"""Low-level analysis workflow helpers for pipeline orchestration."""
import json
import shutil
from pathlib import Path
from tempfile import NamedTemporaryFile
from . import history, incremental, pitfalls
from .check_parsing import extract_check_ids
from .config_utils import detect_platform, normalize_repo_url, sanitize_repo_name
from .reporting import build_counters, build_run_metadata, write_report_file
from .reporting import build_record_entry as build_shared_record_entry
def resolve_per_repo_paths(analysis_root: Path, repo_url: str) -> dict[str, Path]:
    """Compute per-repository output paths within the analysis root.

    The repository URL is sanitized into a folder name; all per-repo
    artifacts live directly inside that folder.
    """
    folder = analysis_root / sanitize_repo_name(repo_url)
    artifact_names = {
        "somef_output": "somef_output.json",
        "pitfall_output": "pitfall.jsonld",
        "issue_report": "issue_report.md",
        "report": "report.json",
    }
    paths: dict[str, Path] = {"repo_folder": folder}
    for key, filename in artifact_names.items():
        paths[key] = folder / filename
    return paths
def copy_previous_repo_artifacts(
    previous_repo_folder: Path, current_repo_folder: Path
) -> None:
    """Copy previous snapshot repository artifacts into current snapshot folder.

    Only the four known artifact files are copied; anything else in the
    previous folder is ignored. Missing artifacts are skipped silently.
    """
    current_repo_folder.mkdir(parents=True, exist_ok=True)
    artifact_names = [
        "somef_output.json",
        "pitfall.jsonld",
        "issue_report.md",
        "report.json",
    ]
    # copy2 preserves file metadata (timestamps) alongside contents.
    for source in (previous_repo_folder / name for name in artifact_names):
        if source.exists():
            shutil.copy2(source, current_repo_folder / source.name)
def load_previous_repo_record(
    previous_snapshot_root: Path | None, repo_url: str
) -> dict | None:
    """Load previous per-repo record from previous snapshot if available.

    Looks first at the per-repository ``report.json`` (first record wins);
    if absent, falls back to scanning the snapshot-level ``run_report.json``
    for a record whose normalized ``repo_url`` matches. Returns ``None`` when
    nothing usable is found.
    """
    if previous_snapshot_root is None:
        return None
    # Preferred source: the per-repository report from the last run.
    per_repo_report = (
        previous_snapshot_root / sanitize_repo_name(repo_url) / "report.json"
    )
    if per_repo_report.exists():
        with open(per_repo_report, encoding="utf-8") as handle:
            payload = json.load(handle)
        record_list = payload.get("records") if isinstance(payload, dict) else None
        if isinstance(record_list, list) and record_list:
            first = record_list[0]
            if isinstance(first, dict):
                return first
    # Fallback: search the run-level report for a matching repository URL.
    run_report_path = previous_snapshot_root / "run_report.json"
    if run_report_path.exists():
        with open(run_report_path, encoding="utf-8") as handle:
            payload = json.load(handle)
        record_list = payload.get("records") if isinstance(payload, dict) else None
        if isinstance(record_list, list):
            target = normalize_repo_url(repo_url)
            for candidate in record_list:
                if not isinstance(candidate, dict):
                    continue
                url_value = candidate.get("repo_url")
                if isinstance(url_value, str) and normalize_repo_url(url_value) == target:
                    return candidate
    return None
def build_analysis_counters(records: list[dict[str, object]]) -> dict[str, int]:
    """Build analysis counters using the unified report schema.

    Thin stage-local wrapper around the shared reporting helper so the
    analysis stage has its own entry point.
    """
    counters = build_counters(records)
    return counters
def build_analysis_run_report(
    records: list[dict[str, object]],
    *,
    dry_run: bool,
    run_root: Path,
    analysis_summary_file: Path,
    previous_report: Path | None,
) -> dict[str, object]:
    """Build run-level report payload from analysis decision records.

    The payload combines run metadata, aggregate counters, and the raw
    per-repository records.
    """
    metadata = build_run_metadata(
        dry_run=dry_run,
        run_root=run_root,
        analysis_summary_file=analysis_summary_file,
        previous_report=previous_report,
    )
    return {
        "run_metadata": metadata,
        "counters": build_analysis_counters(records),
        "records": records,
    }
def is_previous_issue_open(previous_record: dict[str, object]) -> bool:
    """Infer whether previous issue was open from stored metadata only.

    Decision order: an explicit ``previous_issue_state`` wins; then a prior
    "closed" action; then the presence of a real (non-simulated) issue URL.
    """
    raw_state = previous_record.get("previous_issue_state")
    normalized_state = raw_state.lower() if isinstance(raw_state, str) else ""
    if normalized_state in ("open", "opened"):
        return True
    if normalized_state in ("closed", "close"):
        return False
    # If the previous analysis already closed the issue, treat it as closed
    # regardless of whether previous_issue_state was persisted.
    if previous_record.get("action") == "closed":
        return False
    url = previous_record.get("issue_url")
    if not url:
        url = previous_record.get("previous_issue_url")
    if not (isinstance(url, str) and url):
        return False
    # A simulated issue was never actually opened on the platform.
    return previous_record.get("issue_persistence") != "simulated"
def build_record_entry(
    *,
    run_root: Path,
    repo_url: str,
    platform: str | None,
    pitfalls_count: int,
    warnings_count: int,
    analysis_date: str,
    metacheck_version: str,
    pitfalls_ids: list[str],
    warnings_ids: list[str],
    action: str,
    reason_code: str,
    findings_signature: str,
    current_commit_id: str | None,
    previous_commit_id: str | None,
    previous_issue_url: str | None,
    previous_issue_state: str | None,
    dry_run: bool,
    issue_persistence: str,
    issue_url: str | None,
    file_path: Path,
    error: str | None = None,
) -> dict[str, object]:
    """Build a per-repository analysis record payload.

    Delegates to the shared reporting helper; the only field this stage
    supplies itself is ``bot_version``, taken from the pitfalls package.
    """
    shared_kwargs: dict[str, object] = dict(
        run_root=run_root,
        repo_url=repo_url,
        platform=platform,
        pitfalls_count=pitfalls_count,
        warnings_count=warnings_count,
        issue_url=issue_url,
        analysis_date=analysis_date,
        bot_version=pitfalls.__version__,
        metacheck_version=metacheck_version,
        pitfalls_ids=pitfalls_ids,
        warnings_ids=warnings_ids,
        action=action,
        reason_code=reason_code,
        findings_signature=findings_signature,
        current_commit_id=current_commit_id,
        previous_commit_id=previous_commit_id,
        previous_issue_url=previous_issue_url,
        previous_issue_state=previous_issue_state,
        dry_run=dry_run,
        issue_persistence=issue_persistence,
        file_path=file_path,
        error=error,
    )
    return build_shared_record_entry(**shared_kwargs)
def write_analysis_repo_report(
    repo_folder: Path,
    record: dict[str, object],
    *,
    dry_run: bool,
    run_root: Path,
    analysis_summary_file: Path,
    previous_report: Path | None,
) -> None:
    """Write per-repository analysis report using analysis-stage counters.

    Wraps the single record in a list so the shared report writer can apply
    the unified schema.
    """
    report_path = repo_folder / "report.json"
    write_report_file(
        report_file=report_path,
        records=[record],
        dry_run=dry_run,
        run_root=run_root,
        analysis_summary_file=analysis_summary_file,
        previous_report=previous_report,
    )
def create_analysis_record(
    *,
    run_root: Path,
    repo_url: str,
    repo_folder: Path,
    previous_record: dict[str, object] | None,
    current_commit_id: str | None,
    dry_run: bool,
    custom_message: str | None,
) -> dict[str, object]:
    """Create a decision record for a repository without platform API calls.

    Loads the pitfall analysis artifact from ``repo_folder``, compares it with
    the previous snapshot's record (if any), and asks the incremental engine
    which action to simulate. Failures never propagate: a missing pitfall
    file or any exception during analysis yields an ``action="failed"``
    record instead.

    Side effect: when findings exist, a ready-to-post ``issue_report.md`` is
    written into ``repo_folder``.
    """
    pitfall_file = repo_folder / "pitfall.jsonld"
    if not pitfall_file.exists():
        return build_record_entry(
            run_root=run_root,
            repo_url=repo_url,
            platform=detect_platform_from_repo_url(repo_url),
            pitfalls_count=0,
            warnings_count=0,
            analysis_date="unknown",
            metacheck_version="unknown",
            pitfalls_ids=[],
            warnings_ids=[],
            action="failed",
            reason_code="missing_pitfall_file",
            findings_signature="",
            current_commit_id=current_commit_id,
            previous_commit_id=None,
            previous_issue_url=None,
            previous_issue_state=None,
            dry_run=dry_run,
            issue_persistence="none",
            issue_url=None,
            file_path=pitfall_file,
            error=f"Missing pitfall file: {pitfall_file}",
        )
    try:
        data = pitfalls.load_pitfalls(pitfall_file)
        # The artifact may carry a canonical repository URL; prefer it.
        detected_repo_url = pitfalls.get_repository_url(data)
        if detected_repo_url:
            repo_url = detected_repo_url
        pitfalls_list = pitfalls.get_pitfalls_list(data)
        warnings_list = pitfalls.get_warnings_list(data)
        pitfalls_count = len(pitfalls_list)
        warnings_count = len(warnings_list)
        checks = data.get("checks", [])
        pitfalls_ids, warnings_ids = extract_check_ids(
            checks if isinstance(checks, list) else []
        )
        analysis_date = str(data.get("dateCreated", "unknown"))
        metacheck_version = pitfalls.get_metacheck_version(data)
        current_signature = history.findings_signature(pitfalls_ids, warnings_ids)
        has_findings = (pitfalls_count + warnings_count) > 0
        if has_findings:
            # Persist a ready-to-post issue body next to the other artifacts.
            formatted = pitfalls.format_report(repo_url, data)
            issue_body = pitfalls.create_issue_body(formatted, custom_message)
            (repo_folder / "issue_report.md").write_text(issue_body, encoding="utf-8")
        platform = detect_platform_from_repo_url(repo_url)
        (
            previous_issue_url,
            previous_issue_state,
            previous_commit_id,
            previous_signature,
            previous_issue_open,
        ) = _summarize_previous_record(previous_record)
        # Assume the repo changed unless both commit ids are known and equal.
        repo_updated = True
        if (
            previous_commit_id
            and current_commit_id
            and previous_commit_id != "Unknown"
            and current_commit_id != "Unknown"
        ):
            repo_updated = previous_commit_id != current_commit_id
        decision = incremental.evaluate(
            previous_exists=previous_record is not None,
            unsubscribed=False,
            repo_updated=repo_updated,
            has_findings=has_findings,
            identical_findings=current_signature == previous_signature,
            previous_issue_open=previous_issue_open,
        )
        # Only action, persistence, and issue_url vary per outcome; every
        # other record field is identical, so dispatch through a table
        # instead of four near-identical build_record_entry calls.
        outcome_map: dict[str, tuple[str, str, str | None]] = {
            "create": ("simulated_created", "simulated", None),
            "comment": ("updated_by_comment", "simulated", previous_issue_url),
            "close": ("closed", "simulated", previous_issue_url),
        }
        action, issue_persistence, issue_url = outcome_map.get(
            decision.action, ("skipped", "none", None)
        )
        return build_record_entry(
            run_root=run_root,
            repo_url=repo_url,
            platform=platform,
            pitfalls_count=pitfalls_count,
            warnings_count=warnings_count,
            analysis_date=analysis_date,
            metacheck_version=metacheck_version,
            pitfalls_ids=pitfalls_ids,
            warnings_ids=warnings_ids,
            action=action,
            reason_code=decision.reason,
            findings_signature=current_signature,
            current_commit_id=current_commit_id,
            previous_commit_id=previous_commit_id,
            previous_issue_url=previous_issue_url,
            previous_issue_state=previous_issue_state,
            dry_run=dry_run,
            issue_persistence=issue_persistence,
            issue_url=issue_url,
            file_path=pitfall_file,
        )
    except Exception as exc:
        # Defensive boundary: a single repository must never abort the run,
        # so any failure is folded into a "failed" record.
        return build_record_entry(
            run_root=run_root,
            repo_url=repo_url,
            platform=detect_platform_from_repo_url(repo_url),
            pitfalls_count=0,
            warnings_count=0,
            analysis_date="unknown",
            metacheck_version="unknown",
            pitfalls_ids=[],
            warnings_ids=[],
            action="failed",
            reason_code="exception",
            findings_signature="",
            current_commit_id=current_commit_id,
            previous_commit_id=(
                extract_previous_commit(previous_record)
                if previous_record is not None
                else None
            ),
            previous_issue_url=None,
            previous_issue_state=None,
            dry_run=dry_run,
            issue_persistence="none",
            issue_url=None,
            file_path=pitfall_file,
            error=str(exc),
        )


def _summarize_previous_record(
    previous_record: dict[str, object] | None,
) -> tuple[str | None, str | None, str | None, str, bool]:
    """Extract (issue_url, issue_state, commit_id, signature, issue_open)
    from a previous-run record; safe defaults when no record exists."""
    if previous_record is None:
        return None, None, None, "", False
    # Prefer the issue_url written last run, falling back to the one it
    # carried over from the run before that.
    issue_url_value = previous_record.get("issue_url")
    if not isinstance(issue_url_value, str) or not issue_url_value:
        issue_url_value = previous_record.get("previous_issue_url")
    previous_issue_url = (
        str(issue_url_value) if isinstance(issue_url_value, str) else None
    )
    previous_issue_state: str | None = None
    state_value = previous_record.get("previous_issue_state")
    if isinstance(state_value, str) and state_value:
        previous_issue_state = state_value
    previous_commit_id = extract_previous_commit(previous_record)
    # Rebuild the findings signature from the stored id lists, discarding
    # any non-string entries that may have crept into the JSON.
    pitfalls_ids_value = previous_record.get("pitfalls_ids")
    warnings_ids_value = previous_record.get("warnings_ids")
    previous_signature = history.findings_signature(
        (
            [item for item in pitfalls_ids_value if isinstance(item, str)]
            if isinstance(pitfalls_ids_value, list)
            else None
        ),
        (
            [item for item in warnings_ids_value if isinstance(item, str)]
            if isinstance(warnings_ids_value, list)
            else None
        ),
    )
    return (
        previous_issue_url,
        previous_issue_state,
        previous_commit_id,
        previous_signature,
        is_previous_issue_open(previous_record),
    )