"""Low-level analysis workflow helpers for pipeline orchestration."""
import json
import shutil
from dataclasses import dataclass
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any
from . import __version__, constants, history, incremental, pitfalls, utils
from .check_parsing import extract_check_ids
from .codemeta_runtime import evaluate_and_persist_codemeta_status, load_codemeta_status
from .config_utils import detect_platform, normalize_repo_url, sanitize_repo_name
from .reporting import (
RecordAnalysis,
RecordLifecycle,
build_counters,
build_run_metadata,
write_report_file,
)
from .reporting import build_record_entry as build_shared_record_entry
from .rsmetacheck_wrapper import run_rsmetacheck
@dataclass(frozen=True)
class CurrentAnalysisContext:
"""Parsed current-analysis state needed to build a decision record."""
repo_url: str
pitfall_file: Path
data: dict[str, Any]
pitfalls_count: int
warnings_count: int
pitfalls_ids: list[str]
warnings_ids: list[str]
analysis_date: str
rsmetacheck_version: str
findings_signature: str
has_findings: bool
codemeta_status: str
codemeta_missing: bool
codemeta_generated: bool
generated_codemeta: dict[str, Any] | None
@dataclass(frozen=True)
class PreviousAnalysisContext:
"""Previous-analysis state needed for incremental decision making."""
previous_exists: bool
previous_issue_url: str | None
previous_issue_state: str | None
previous_commit_id: str | None
previous_signature: str
previous_issue_open: bool
previous_codemeta_missing: bool
repo_updated: bool
def _load_generated_codemeta(repo_folder: Path) -> dict[str, Any] | None:
"""Load generated codemeta payload when rsmetacheck created one."""
generated_codemeta_file = repo_folder / constants.FILENAME_CODEMETA_GENERATED
if not generated_codemeta_file.exists():
return None
with open(generated_codemeta_file, encoding="utf-8") as f:
loaded_generated = json.load(f)
return loaded_generated if isinstance(loaded_generated, dict) else None
def _load_current_analysis_context(
repo_url: str,
repo_folder: Path,
) -> CurrentAnalysisContext:
"""Load the current repository analysis outputs into a typed context."""
pitfall_file = repo_folder / constants.FILENAME_PITFALL
data = pitfalls.load_pitfalls(pitfall_file)
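    # Prefer the repository URL recorded in the analysis output over the caller-supplied one.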
detected_repo_url = pitfalls.get_repository_url(data)
resolved_repo_url = detected_repo_url if detected_repo_url else repo_url
pitfalls_list = pitfalls.get_pitfalls_list(data)
warnings_list = pitfalls.get_warnings_list(data)
pitfalls_count = len(pitfalls_list)
warnings_count = len(warnings_list)
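    # "checks" may be absent or malformed; fall back to an empty list before extracting check IDs.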
checks = data.get("checks", [])
pitfalls_ids, warnings_ids = extract_check_ids(
checks if isinstance(checks, list) else []
)
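    # Signature of the current findings, compared against the previous run's to detect identical results.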
findings_signature = history.findings_signature(pitfalls_ids, warnings_ids)
codemeta_status_data = load_codemeta_status(repo_folder)
codemeta_status_raw = codemeta_status_data.get("status")
codemeta_status = (
codemeta_status_raw if isinstance(codemeta_status_raw, str) else "unknown"
)
return CurrentAnalysisContext(
repo_url=resolved_repo_url,
pitfall_file=pitfall_file,
data=data,
pitfalls_count=pitfalls_count,
warnings_count=warnings_count,
pitfalls_ids=pitfalls_ids,
warnings_ids=warnings_ids,
analysis_date=str(data.get("dateCreated", "unknown")),
rsmetacheck_version=pitfalls.get_rsmetacheck_version(data),
findings_signature=findings_signature,
has_findings=(pitfalls_count + warnings_count) > 0,
codemeta_status=codemeta_status,
codemeta_missing=codemeta_status == "missing",
codemeta_generated=bool(codemeta_status_data.get("generated", False)),
generated_codemeta=_load_generated_codemeta(repo_folder),
)
def _load_previous_analysis_context(
previous_record: dict[str, object] | None,
current_commit_id: str | None,
force_analysis: bool = False,
) -> PreviousAnalysisContext:
"""Load previous-analysis state used by the incremental decision tree."""
if previous_record is None:
return PreviousAnalysisContext(
previous_exists=False,
previous_issue_url=None,
previous_issue_state=None,
previous_commit_id=None,
previous_signature="",
previous_issue_open=False,
previous_codemeta_missing=False,
repo_updated=True,
)
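    # Prefer the recorded "issue_url"; fall back to "previous_issue_url" when it is missing or empty.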
issue_url_value = previous_record.get("issue_url")
if not isinstance(issue_url_value, str) or not issue_url_value:
issue_url_value = previous_record.get("previous_issue_url")
    previous_issue_url = (
        issue_url_value if isinstance(issue_url_value, str) else None
    )
previous_state_value = previous_record.get("previous_issue_state")
previous_issue_state = (
previous_state_value
if isinstance(previous_state_value, str) and previous_state_value
else None
)
previous_commit_id = extract_previous_commit(previous_record)
previous_pitfalls_ids = previous_record.get("pitfalls_ids")
previous_warnings_ids = previous_record.get("warnings_ids")
previous_signature = history.findings_signature(
(
[value for value in previous_pitfalls_ids if isinstance(value, str)]
if isinstance(previous_pitfalls_ids, list)
else None
),
(
[value for value in previous_warnings_ids if isinstance(value, str)]
if isinstance(previous_warnings_ids, list)
else None
),
)
    previous_codemeta_missing = (
        previous_record.get("codemeta_status") == "missing"
    )
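    # Default to "updated"; only when analysis is not forced and both commit IDs are known can the comparison say otherwise.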
repo_updated = True
if not force_analysis and (
previous_commit_id
and current_commit_id
and previous_commit_id != "Unknown"
and current_commit_id != "Unknown"
):
repo_updated = previous_commit_id != current_commit_id
return PreviousAnalysisContext(
previous_exists=True,
previous_issue_url=previous_issue_url,
previous_issue_state=previous_issue_state,
previous_commit_id=previous_commit_id,
previous_signature=previous_signature,
previous_issue_open=is_previous_issue_open(previous_record),
previous_codemeta_missing=previous_codemeta_missing,
repo_updated=repo_updated,
)
def _build_decision_record(
*,
run_root: Path,
repo_url: str,
platform: str | None,
current_analysis: CurrentAnalysisContext,
previous_analysis: PreviousAnalysisContext,
current_commit_id: str | None,
dry_run: bool,
action: str,
reason_code: str,
) -> dict[str, object]:
"""Build the persisted analysis record for a resolved incremental action."""
issue_url = None
issue_persistence = "none"
if action == "simulated_created":
issue_persistence = "simulated"
elif action in {"updated_by_comment", "closed"}:
issue_persistence = "simulated"
issue_url = previous_analysis.previous_issue_url
return build_shared_record_entry(
run_root=run_root,
repo_url=repo_url,
platform=platform,
analysis=RecordAnalysis(
analysis_date=current_analysis.analysis_date,
bot_version=__version__,
rsmetacheck_version=current_analysis.rsmetacheck_version,
pitfalls_count=current_analysis.pitfalls_count,
warnings_count=current_analysis.warnings_count,
pitfalls_ids=current_analysis.pitfalls_ids,
warnings_ids=current_analysis.warnings_ids,
),
lifecycle=RecordLifecycle(
issue_url=issue_url,
action=action,
reason_code=reason_code,
previous_issue_url=previous_analysis.previous_issue_url,
previous_issue_state=previous_analysis.previous_issue_state,
findings_signature=current_analysis.findings_signature,
current_commit_id=current_commit_id,
previous_commit_id=previous_analysis.previous_commit_id,
dry_run=dry_run,
issue_persistence=issue_persistence,
codemeta_generated=current_analysis.codemeta_generated,
codemeta_status=current_analysis.codemeta_status,
file_path=current_analysis.pitfall_file,
),
)
def resolve_per_repo_paths(analysis_root: Path, repo_url: str) -> dict[str, Path]:
"""Compute per-repository output paths within the analysis root."""
sanitized_name = sanitize_repo_name(repo_url)
repo_folder = analysis_root / sanitized_name
return {
"repo_folder": repo_folder,
"somef_output": repo_folder / constants.FILENAME_SOMEF_OUTPUT,
"pitfall_output": repo_folder / constants.FILENAME_PITFALL,
"issue_report": repo_folder / constants.FILENAME_ISSUE_REPORT,
"codemeta_status": repo_folder / constants.FILENAME_CODEMETA_STATUS,
"codemeta_generated": repo_folder / constants.FILENAME_CODEMETA_GENERATED,
"report": repo_folder / constants.FILENAME_REPORT,
}
def copy_previous_repo_artifacts(
previous_repo_folder: Path, current_repo_folder: Path
) -> None:
"""Copy previous snapshot repository artifacts into current snapshot folder."""
current_repo_folder.mkdir(parents=True, exist_ok=True)
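    # Copy only the artifacts that actually exist in the previous snapshot; missing files are skipped.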
for name in (
constants.FILENAME_SOMEF_OUTPUT,
constants.FILENAME_PITFALL,
constants.FILENAME_ISSUE_REPORT,
constants.FILENAME_CODEMETA_STATUS,
constants.FILENAME_CODEMETA_GENERATED,
constants.FILENAME_REPORT,
):
src = previous_repo_folder / name
if src.exists():
shutil.copy2(src, current_repo_folder / name)
def load_previous_repo_record(
previous_snapshot_root: Path | None, repo_url: str
) -> dict | None:
"""Load previous per-repo record from previous snapshot if available."""
if previous_snapshot_root is None:
return None
repo_folder = previous_snapshot_root / sanitize_repo_name(repo_url)
report_path = repo_folder / constants.FILENAME_REPORT
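    # First try the per-repo report inside the previous snapshot.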
try:
data = utils.load_json_file(
report_path, required=False, description="previous report"
)
records = data.get("records") if isinstance(data, dict) else None
if isinstance(records, list) and records:
record = records[0]
if isinstance(record, dict):
return record
except (ValueError, json.JSONDecodeError):
pass
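    # Fall back to the run-level report and match records by normalized repository URL.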
run_report = previous_snapshot_root / constants.FILENAME_RUN_REPORT
try:
data = utils.load_json_file(
run_report, required=False, description="previous run report"
)
records = data.get("records") if isinstance(data, dict) else None
if isinstance(records, list):
normalized = normalize_repo_url(repo_url)
for record in records:
if not isinstance(record, dict):
continue
value = record.get("repo_url")
if isinstance(value, str) and normalize_repo_url(value) == normalized:
return record
except (ValueError, json.JSONDecodeError):
pass
return None
def build_analysis_counters(records: list[dict[str, object]]) -> dict[str, int]:
"""Build analysis counters using the unified report schema."""
return build_counters(records)
def build_analysis_run_report(
records: list[dict[str, object]],
*,
dry_run: bool,
run_root: Path,
analysis_summary_file: Path,
previous_report: Path | None,
) -> dict[str, object]:
"""Build run-level report payload from analysis decision records."""
return {
"run_metadata": build_run_metadata(
dry_run=dry_run,
run_root=run_root,
analysis_summary_file=analysis_summary_file,
previous_report=previous_report,
),
"counters": build_analysis_counters(records),
"records": records,
}
def is_previous_issue_open(previous_record: dict[str, object]) -> bool:
"""Infer whether previous issue was open from stored metadata only."""
state_value = previous_record.get("previous_issue_state")
    state = state_value.lower() if isinstance(state_value, str) else ""
if state in {"open", "opened"}:
return True
if state in {"closed", "close"}:
return False
# If the previous analysis already closed the issue, treat it as closed
# regardless of whether previous_issue_state was persisted.
if previous_record.get("action") == "closed":
return False
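    # No explicit state: treat the issue as open only if a real (non-simulated) issue URL was recorded.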
issue_url = previous_record.get("issue_url") or previous_record.get(
"previous_issue_url"
)
if not isinstance(issue_url, str) or not issue_url:
return False
issue_persistence = previous_record.get("issue_persistence")
if issue_persistence == "simulated":
return False
return True
def build_record_entry(
*,
run_root: Path,
repo_url: str,
platform: str | None,
pitfalls_count: int,
warnings_count: int,
analysis_date: str,
rsmetacheck_version: str,
pitfalls_ids: list[str],
warnings_ids: list[str],
action: str,
reason_code: str,
findings_signature: str,
current_commit_id: str | None,
previous_commit_id: str | None,
previous_issue_url: str | None,
previous_issue_state: str | None,
dry_run: bool,
issue_persistence: str,
issue_url: str | None,
file_path: Path,
codemeta_generated: bool | None = None,
codemeta_status: str | None = None,
error: str | None = None,
) -> dict[str, object]:
"""Build a per-repository analysis record payload."""
return build_shared_record_entry(
run_root=run_root,
repo_url=repo_url,
platform=platform,
analysis=RecordAnalysis(
analysis_date=analysis_date,
bot_version=__version__,
rsmetacheck_version=rsmetacheck_version,
pitfalls_count=pitfalls_count,
warnings_count=warnings_count,
pitfalls_ids=pitfalls_ids,
warnings_ids=warnings_ids,
),
lifecycle=RecordLifecycle(
issue_url=issue_url,
action=action,
reason_code=reason_code,
previous_issue_url=previous_issue_url,
previous_issue_state=previous_issue_state,
findings_signature=findings_signature,
current_commit_id=current_commit_id,
previous_commit_id=previous_commit_id,
dry_run=dry_run,
issue_persistence=issue_persistence,
codemeta_generated=codemeta_generated,
codemeta_status=codemeta_status,
file_path=file_path,
error=error,
),
)
def write_analysis_repo_report(
repo_folder: Path,
record: dict[str, object],
*,
dry_run: bool,
run_root: Path,
analysis_summary_file: Path,
previous_report: Path | None,
) -> None:
"""Write per-repository analysis report using analysis-stage counters."""
write_report_file(
report_file=repo_folder / constants.FILENAME_REPORT,
records=[record],
dry_run=dry_run,
run_root=run_root,
analysis_summary_file=analysis_summary_file,
previous_report=previous_report,
)
def create_analysis_record(
*,
run_root: Path,
repo_url: str,
repo_folder: Path,
previous_record: dict[str, object] | None,
current_commit_id: str | None,
dry_run: bool,
custom_message: str | None,
force_analysis: bool = False,
) -> dict[str, object]:
"""Create a decision record for a repository without platform API calls."""
pitfall_file = repo_folder / constants.FILENAME_PITFALL
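    # Without a pitfall output there is nothing to evaluate; persist a failure record instead of raising.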
if not pitfall_file.exists():
return build_record_entry(
run_root=run_root,
repo_url=repo_url,
platform=detect_repo_platform(repo_url),
pitfalls_count=0,
warnings_count=0,
analysis_date="unknown",
rsmetacheck_version="unknown",
pitfalls_ids=[],
warnings_ids=[],
action="failed",
reason_code="missing_pitfall_file",
findings_signature="",
current_commit_id=current_commit_id,
previous_commit_id=None,
previous_issue_url=None,
previous_issue_state=None,
dry_run=dry_run,
issue_persistence="none",
issue_url=None,
file_path=pitfall_file,
error=f"Missing pitfall file: {pitfall_file}",
)
try:
current_analysis = _load_current_analysis_context(repo_url, repo_folder)
repo_url = current_analysis.repo_url
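        # Persist the would-be issue body whenever there is something to report (findings or missing codemeta).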
if current_analysis.has_findings or current_analysis.codemeta_missing:
formatted = pitfalls.format_report(
repo_url,
current_analysis.data,
codemeta_missing=current_analysis.codemeta_missing,
generated_codemeta=current_analysis.generated_codemeta,
)
issue_body = pitfalls.create_issue_body(formatted, custom_message)
(repo_folder / constants.FILENAME_ISSUE_REPORT).write_text(
issue_body, encoding="utf-8"
)
platform = detect_repo_platform(repo_url)
previous_analysis = _load_previous_analysis_context(
previous_record,
current_commit_id,
force_analysis=force_analysis,
)
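        # Feed current vs. previous state through the incremental decision tree.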
decision = incremental.evaluate(
previous_exists=previous_analysis.previous_exists,
unsubscribed=False,
repo_updated=previous_analysis.repo_updated,
has_findings=current_analysis.has_findings,
identical_findings=(
current_analysis.findings_signature
== previous_analysis.previous_signature
),
previous_issue_open=previous_analysis.previous_issue_open,
codemeta_missing=current_analysis.codemeta_missing,
previous_codemeta_missing=previous_analysis.previous_codemeta_missing,
)
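        # Translate the decision into the persisted lifecycle action; anything unmapped is recorded as skipped.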
if decision.action == "create":
return _build_decision_record(
run_root=run_root,
repo_url=repo_url,
platform=platform,
current_analysis=current_analysis,
previous_analysis=previous_analysis,
current_commit_id=current_commit_id,
dry_run=dry_run,
action="simulated_created",
reason_code=decision.reason,
)
if decision.action == "comment":
return _build_decision_record(
run_root=run_root,
repo_url=repo_url,
platform=platform,
current_analysis=current_analysis,
previous_analysis=previous_analysis,
current_commit_id=current_commit_id,
dry_run=dry_run,
action="updated_by_comment",
reason_code=decision.reason,
)
if decision.action == "close":
return _build_decision_record(
run_root=run_root,
repo_url=repo_url,
platform=platform,
current_analysis=current_analysis,
previous_analysis=previous_analysis,
current_commit_id=current_commit_id,
dry_run=dry_run,
action="closed",
reason_code=decision.reason,
)
return _build_decision_record(
run_root=run_root,
repo_url=repo_url,
platform=platform,
current_analysis=current_analysis,
previous_analysis=previous_analysis,
current_commit_id=current_commit_id,
dry_run=dry_run,
action="skipped",
reason_code=decision.reason,
)
except Exception as exc:
return build_record_entry(
run_root=run_root,
repo_url=repo_url,
platform=detect_repo_platform(repo_url),
pitfalls_count=0,
warnings_count=0,
analysis_date="unknown",
rsmetacheck_version="unknown",
pitfalls_ids=[],
warnings_ids=[],
action="failed",
reason_code="exception",
findings_signature="",
current_commit_id=current_commit_id,
previous_commit_id=(
extract_previous_commit(previous_record)
if previous_record is not None
else None
),
previous_issue_url=None,
previous_issue_state=None,
dry_run=dry_run,
issue_persistence="none",
issue_url=None,
file_path=pitfall_file,
error=str(exc),
)