"""Pipeline command to run analysis workflows."""
import json
import re
from datetime import datetime, timezone
from pathlib import Path
import click
from . import analysis_runtime, commit_lookup, pitfalls
from .config_utils import (
copy_config_to_analysis_root,
get_custom_message,
get_opt_out_repositories,
get_repositories,
load_config,
resolve_output_root,
resolve_run_name,
resolve_snapshot_tag,
sanitize_repo_name,
)
from .reporting import build_record_entry
from .rsmetacheck_wrapper import rsmetacheck_command
# Matches snapshot tags of the form YYYYMMDD or YYYYMMDD_N (date + optional
# numeric collision suffix); group 1 = date, group 2 = suffix (may be absent).
SNAPSHOT_TAG_PATTERN = re.compile(r"^(\d{8})(?:_(\d+))?$")
# Splits any tag ending in "_N" into its base (group 1) and numeric suffix
# (group 2); used when incrementing a colliding snapshot tag.
SNAPSHOT_INCREMENT_PATTERN = re.compile(r"^(.+?)_(\d+)$")
def _resolve_unique_snapshot_tag(
run_root: Path, snapshot_tag: str | None
) -> str | None:
"""Return a non-colliding snapshot tag by adding or incrementing numeric suffixes."""
if snapshot_tag is None:
return None
candidate_path = run_root / snapshot_tag
if not candidate_path.exists():
return snapshot_tag
match = SNAPSHOT_INCREMENT_PATTERN.fullmatch(snapshot_tag)
if match is None:
base_tag = snapshot_tag
suffix = 2
else:
base_tag = match.group(1)
suffix = int(match.group(2)) + 1
while True:
candidate = f"{base_tag}_{suffix}"
if not (run_root / candidate).exists():
return candidate
suffix += 1
def _snapshot_sort_key(snapshot_tag: str) -> tuple[str, int] | None:
"""Return sortable key for snapshot tags matching YYYYMMDD or YYYYMMDD_N."""
match = SNAPSHOT_TAG_PATTERN.fullmatch(snapshot_tag)
if match is None:
return None
date_part, suffix_part = match.group(1), match.group(2)
suffix = int(suffix_part) if suffix_part is not None else 0
return (date_part, suffix)
def _find_latest_previous_snapshot_root(
    output_root: Path,
    run_name: str,
    current_snapshot_tag: str | None,
) -> Path | None:
    """Find latest previous snapshot root from same run folder."""
    run_root = output_root / run_name
    if not (run_root.exists() and run_root.is_dir()):
        return None
    keyed_snapshots: list[tuple[tuple[str, int], Path]] = []
    for entry in run_root.iterdir():
        if not entry.is_dir():
            continue
        sort_key = _snapshot_sort_key(entry.name)
        if sort_key is None:
            # Not a date-style snapshot folder; ignore it.
            continue
        if entry.name == current_snapshot_tag:
            # Never treat the snapshot currently being written as "previous".
            continue
        # A usable snapshot has at least one report in any known layout:
        # per-repo <snapshot>/<repo>/report.json, legacy issues_out/report.json,
        # or a snapshot-level run_report.json.
        new_layout = any(
            sub.is_dir() and (sub / "report.json").exists()
            for sub in entry.iterdir()
        )
        old_layout = (entry / "issues_out" / "report.json").exists()
        run_report = (entry / "run_report.json").exists()
        if new_layout or old_layout or run_report:
            keyed_snapshots.append((sort_key, entry))
    if not keyed_snapshots:
        return None
    # Highest (date, suffix) key wins, i.e. the most recent snapshot.
    return max(keyed_snapshots)[1]
def find_latest_previous_report(
    output_root: Path,
    run_name: str,
    current_snapshot_tag: str | None,
) -> Path | None:
    """Find latest previous report path from same run folder.

    Prefers the current ``run_report.json`` layout and falls back to the
    legacy ``issues_out/report.json`` layout. Returns ``None`` when no
    earlier snapshot with a readable report exists.

    NOTE(review): a stray ``[docs]`` line (a Sphinx HTML extraction artifact)
    preceded this function and would raise ``NameError`` at import time; it
    has been removed.
    """
    snapshot_root = _find_latest_previous_snapshot_root(
        output_root=output_root,
        run_name=run_name,
        current_snapshot_tag=current_snapshot_tag,
    )
    if snapshot_root is None:
        return None
    # Preferred, current layout: snapshot-level run report.
    run_report = snapshot_root / "run_report.json"
    if run_report.exists():
        return run_report
    # Legacy layout fallback.
    legacy_report = snapshot_root / "issues_out" / "report.json"
    if legacy_report.exists():
        return legacy_report
    return None
def _snapshot_root_from_report_path(report_path: Path | None) -> Path | None:
"""Resolve snapshot root directory from a report file path."""
if report_path is None:
return None
if report_path.name == "run_report.json":
return report_path.parent
if report_path.name == "report.json" and report_path.parent.name == "issues_out":
return report_path.parent.parent
if report_path.name == "report.json":
return report_path.parent.parent
return report_path.parent
def run_pipeline(
    config_file: Path,
    dry_run: bool,
    snapshot_tag: str | None,
    previous_report: Path | None,
) -> None:
    """Run analysis and write issue decision records without API side effects.

    Args:
        config_file: Unified JSON configuration file driving the run.
        dry_run: When True, decisions are computed and recorded but no
            external (API) actions are performed.
        snapshot_tag: Optional snapshot folder suffix; an existing folder of
            the same name gets a numeric suffix via
            ``_resolve_unique_snapshot_tag``.
        previous_report: Explicit previous ``run_report.json``; when ``None``
            the latest prior snapshot report is discovered automatically.

    NOTE(review): a stray ``[docs]`` line (a Sphinx HTML extraction artifact)
    preceded this function and would raise ``NameError`` at import time; it
    has been removed.
    """
    # Resolve configuration-driven inputs and output locations.
    config = load_config(config_file)
    repositories = get_repositories(config)
    custom_message = get_custom_message(config)
    opt_out_repos = get_opt_out_repositories(config)
    output_root = resolve_output_root(config, config_file)
    run_folder_name = resolve_run_name(config, config_file)
    requested_snapshot_tag = resolve_snapshot_tag(config, snapshot_tag)
    run_root = output_root / run_folder_name
    resolved_snapshot_tag = _resolve_unique_snapshot_tag(
        run_root=run_root,
        snapshot_tag=requested_snapshot_tag,
    )
    analysis_root = (
        run_root / resolved_snapshot_tag if resolved_snapshot_tag else run_root
    )
    analysis_output_file = analysis_root / "analysis_results.json"
    # Fix: create the snapshot folder BEFORE copying the config into it. The
    # original order attempted the copy first, which fails if the copy helper
    # does not create the destination directory itself (mkdir is idempotent,
    # so this reordering is safe either way).
    analysis_root.mkdir(parents=True, exist_ok=True)
    copy_config_to_analysis_root(config_file, analysis_root)
    # Fall back to auto-discovery of the most recent prior report when the
    # caller did not supply one explicitly.
    resolved_previous_report = previous_report
    if resolved_previous_report is None:
        resolved_previous_report = find_latest_previous_report(
            output_root=output_root,
            run_name=run_folder_name,
            current_snapshot_tag=resolved_snapshot_tag,
        )
    previous_snapshot_root = _snapshot_root_from_report_path(resolved_previous_report)
    evaluated_repositories: dict[str, dict[str, str]] = {}
    run_records: list[dict[str, object]] = []
    for repo_url in repositories:
        per_repo = analysis_runtime.resolve_per_repo_paths(analysis_root, repo_url)
        repo_folder = per_repo["repo_folder"]
        repo_folder.mkdir(parents=True, exist_ok=True)
        previous_record = analysis_runtime.load_previous_repo_record(
            previous_snapshot_root, repo_url
        )
        previous_commit_id = (
            analysis_runtime.extract_previous_commit(previous_record)
            if previous_record
            else None
        )
        # Best-effort HEAD lookup: remote/network failures must not abort the
        # whole run, so any exception degrades to "commit unknown".
        try:
            current_commit_id = commit_lookup.get_repo_head_commit(repo_url)
        except Exception:
            current_commit_id = None
        # Reuse the previous snapshot's artifacts only when both commit ids
        # are known and equal; the sentinel string "Unknown" never matches.
        reused_previous = False
        if (
            previous_snapshot_root is not None
            and previous_record is not None
            and previous_commit_id
            and current_commit_id
            and previous_commit_id != "Unknown"
            and current_commit_id != "Unknown"
            and current_commit_id == previous_commit_id
        ):
            previous_repo_folder = previous_snapshot_root / sanitize_repo_name(repo_url)
            if previous_repo_folder.exists():
                analysis_runtime.copy_previous_repo_artifacts(
                    previous_repo_folder, repo_folder
                )
                reused_previous = True
        if not reused_previous:
            analysis_runtime.run_metacheck_for_repo(
                repo_url, repo_folder, rsmetacheck_command
            )
        normalized_repo = analysis_runtime.normalize_repo_url(repo_url)
        if normalized_repo in opt_out_repos:
            # Opt-out repositories get a "skipped" record instead of analysis.
            record = build_record_entry(
                run_root=run_root,
                repo_url=repo_url,
                platform=analysis_runtime.detect_platform_from_repo_url(repo_url),
                pitfalls_count=0,
                warnings_count=0,
                issue_url=None,
                analysis_date=datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
                bot_version=pitfalls.__version__,
                metacheck_version="unknown",
                pitfalls_ids=[],
                warnings_ids=[],
                action="skipped",
                reason_code="in_opt_out_list",
                dry_run=dry_run,
                issue_persistence="none",
                current_commit_id=current_commit_id,
                file_path=repo_folder / "pitfall.jsonld",
            )
        else:
            record = analysis_runtime.create_analysis_record(
                run_root=run_root,
                repo_url=repo_url,
                repo_folder=repo_folder,
                previous_record=previous_record,
                current_commit_id=current_commit_id,
                dry_run=dry_run,
                custom_message=custom_message,
            )
        analysis_runtime.write_analysis_repo_report(
            repo_folder,
            record,
            dry_run=dry_run,
            run_root=run_root,
            analysis_summary_file=analysis_output_file,
            previous_report=resolved_previous_report,
        )
        run_records.append(record)
        evaluated_repositories[sanitize_repo_name(repo_url)] = {
            "url": repo_url,
            "commit_id": current_commit_id or "Unknown",
        }
    # Write the run-level summary and the aggregated run report.
    analysis_summary = {
        "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "summary": {"evaluated_repositories": evaluated_repositories},
    }
    with open(analysis_output_file, "w", encoding="utf-8") as f:
        json.dump(analysis_summary, f, indent=2)
    run_report = analysis_runtime.build_analysis_run_report(
        run_records,
        dry_run=dry_run,
        run_root=run_root,
        analysis_summary_file=analysis_output_file,
        previous_report=resolved_previous_report,
    )
    run_report_file = analysis_root / "run_report.json"
    with open(run_report_file, "w", encoding="utf-8") as f:
        json.dump(run_report, f, indent=2)
# Click CLI wrapper around run_pipeline; hard-codes dry_run=True so this
# command never performs external (API) side effects.
@click.command()
@click.option(
    "--config-file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    required=True,
    help="Unified JSON configuration file.",
)
@click.option(
    "--snapshot-tag",
    type=str,
    default=None,
    help="Optional snapshot suffix folder (for example 2026-03).",
)
@click.option(
    "--previous-report",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    default=None,
    help="Previous run_report.json used for incremental issue handling.",
)
def run_analysis_command(
    config_file: Path,
    snapshot_tag: str | None,
    previous_report: Path | None,
) -> None:
    """Run analysis and compute issue lifecycle decisions in dry-run mode.

    Thin CLI shim: all work is delegated to :func:`run_pipeline` with
    ``dry_run=True``; click validates that the config and optional previous
    report paths exist before this function is invoked.
    """
    run_pipeline(
        config_file=config_file,
        dry_run=True,
        snapshot_tag=snapshot_tag,
        previous_report=previous_report,
    )