Source code for sw_metadata_bot.publish

"""Publish issues from an existing analysis snapshot."""

import json
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import cast

import click

from . import github_api, gitlab_api, pitfalls
from .config_utils import (
    detect_platform,
    get_custom_message,
    load_config,
    sanitize_repo_name,
)
from .reporting import build_counters, write_report_file

MAX_PUBLISH_RETRY_ATTEMPTS = 3


def _is_unsubscribe_comment(comment: str) -> bool:
    """Return True when a comment is exactly the unsubscribe keyword."""
    return comment.strip().lower() == "unsubscribe"


def _now_utc_iso() -> str:
    """Return a UTC timestamp suitable for report persistence."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _parse_utc_datetime(value: object) -> datetime | None:
    """Parse an ISO UTC timestamp persisted in publish records."""
    if not isinstance(value, str) or not value:
        return None
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None


def _retry_after_seconds_from_error(error_text: str) -> int:
    """Infer a retry delay from a publish error string."""
    lowered = error_text.lower()
    if "429" in lowered or "rate limit" in lowered or "too many requests" in lowered:
        return 300
    if (
        "timeout" in lowered
        or "temporarily unavailable" in lowered
        or "connection" in lowered
    ):
        return 60
    if any(code in lowered for code in ["500", "502", "503", "504"]):
        return 120
    return 30


def _is_transient_publish_error(error_text: str) -> bool:
    """Return True when the error likely represents a transient API failure."""
    lowered = error_text.lower()
    if any(code in lowered for code in ["401", "403", "404"]):
        return False
    if "unauthorized" in lowered or "forbidden" in lowered or "not found" in lowered:
        return False
    if "invalid token" in lowered or "insufficient" in lowered:
        return False
    return True


def _clear_failure_metadata(record: dict[str, object]) -> None:
    """Remove retry/failure bookkeeping after a successful publish action."""
    record.pop("error", None)
    record.pop("retry_attempt", None)
    record.pop("is_transient_error", None)
    record.pop("retry_after_seconds", None)
    record.pop("failed_at", None)


def _resolve_retry_action(record: dict[str, object]) -> str | None:
    """Resolve the original action to re-attempt for a failed publish record."""
    last_publish_action = record.get("last_publish_action")
    if isinstance(last_publish_action, str) and last_publish_action:
        return last_publish_action

    # Backward-compatible fallback for failed records created before retry metadata.
    simulated_issue_url = record.get("simulated_issue_url")
    if isinstance(simulated_issue_url, str) and simulated_issue_url:
        return "simulated_created"
    return None


def _can_retry_failed_record(record: dict[str, object]) -> bool:
    """Return True when a failed record is eligible for a new publish attempt."""
    if record.get("is_transient_error") is False:
        return False

    retry_attempt = record.get("retry_attempt")
    attempt_count = retry_attempt if isinstance(retry_attempt, int) else 0
    if attempt_count >= MAX_PUBLISH_RETRY_ATTEMPTS:
        return False

    retry_after_value = record.get("retry_after_seconds")
    retry_after_seconds = retry_after_value if isinstance(retry_after_value, int) else 0
    failed_at = _parse_utc_datetime(record.get("failed_at"))
    if failed_at is None or retry_after_seconds <= 0:
        return True

    return datetime.now(timezone.utc) >= failed_at + timedelta(
        seconds=retry_after_seconds
    )


def _build_counters(records: list[dict[str, object]]) -> dict[str, int]:
    """Build publish outcome counters from report records.

    Thin module-private wrapper: ``records`` is forwarded unchanged to
    ``build_counters`` from the ``.reporting`` module and its result is
    returned as-is.
    """
    return build_counters(records)


def _detect_platform_for_publish(repo_url: str, record: dict[str, object]) -> str:
    """Resolve platform for publish from record metadata and repository URL."""
    value = record.get("platform")
    if isinstance(value, str) and value:
        if value in {"github", "gitlab", "gitlab.com"}:
            return value

    platform = detect_platform(repo_url)
    if platform is None:
        raise click.ClickException(f"Unsupported platform for repository: {repo_url}")
    return platform


def _load_publish_body(analysis_root: Path, repo_url: str) -> str:
    """Return the issue body text for a repository in the snapshot.

    The pre-rendered ``issue_report.md`` in the repository's folder is used
    when it exists. Otherwise the body is rebuilt from ``pitfall.jsonld``,
    adding the custom message from the snapshot's ``config.json`` when that
    file is present.

    Raises:
        click.ClickException: When neither the rendered report nor the
            pitfall file exists for the repository.
    """
    repo_dir = analysis_root / sanitize_repo_name(repo_url)

    rendered_body = repo_dir / "issue_report.md"
    if rendered_body.exists():
        return rendered_body.read_text(encoding="utf-8")

    pitfall_path = repo_dir / "pitfall.jsonld"
    if not pitfall_path.exists():
        raise click.ClickException(
            f"Missing issue body and pitfall file for repository: {repo_url}"
        )

    pitfall_data = pitfalls.load_pitfalls(pitfall_path)
    config_path = analysis_root / "config.json"
    custom_message = (
        get_custom_message(load_config(config_path)) if config_path.exists() else None
    )
    rendered_report = pitfalls.format_report(repo_url, pitfall_data)
    return pitfalls.create_issue_body(rendered_report, custom_message)


def _issue_url_for_publish(record: dict[str, object]) -> str | None:
    """Return best available issue URL from record lineage fields."""
    current = record.get("issue_url")
    if isinstance(current, str) and current:
        return current
    previous = record.get("previous_issue_url")
    if isinstance(previous, str) and previous:
        return previous
    simulated = record.get("simulated_issue_url")
    if isinstance(simulated, str) and simulated:
        return simulated
    return None


def _write_per_repo_report(
    analysis_root: Path,
    record: dict[str, object],
    analysis_summary_file: Path | None,
    previous_report: Path | None,
) -> None:
    """Persist a single-record per-repo report alongside repository artifacts."""
    repo_url = record.get("repo_url")
    if not isinstance(repo_url, str) or not repo_url:
        return

    write_report_file(
        report_file=analysis_root / sanitize_repo_name(repo_url) / "report.json",
        records=[record],
        dry_run=False,
        run_root=analysis_root.parent,
        analysis_summary_file=analysis_summary_file,
        previous_report=previous_report,
    )


def publish_analysis(analysis_root: Path, retry_failed: bool = False) -> None:
    """Publish issues from an existing analysis snapshot without re-running analysis.

    Reads ``run_report.json`` from ``analysis_root`` and, for every record
    that has not been published yet, performs the action stored on the
    record (create an issue, comment on one, close one, or skip). Each
    processed record is written to its per-repository ``report.json`` and
    the run report is rewritten at the end with publish metadata added.

    Records that are already published (``dry_run`` is ``False`` and the
    action is not ``failed``) are carried over untouched and counted in
    ``idempotency_skipped_records``. Failed records are only re-attempted
    when ``retry_failed`` is set and the record is eligible; the others are
    counted in ``failed_retry_skipped_records``.

    Args:
        analysis_root: Snapshot folder containing ``run_report.json`` and one
            sub-folder per analysed repository.
        retry_failed: Re-attempt records whose action is ``failed``.

    Raises:
        click.ClickException: When ``run_report.json`` is missing, when its
            ``records`` entry is not a list, or when the platform of a record
            cannot be resolved. Errors raised while talking to the issue
            tracker are not propagated; they are stored on the record, which
            is then marked ``failed``.
    """
    run_report_file = analysis_root / "run_report.json"
    if not run_report_file.exists():
        raise click.ClickException(f"Missing run_report.json in {analysis_root}")

    with open(run_report_file, encoding="utf-8") as f:
        run_report = json.load(f)

    # Tolerate a report without usable run metadata.
    run_metadata = (
        run_report.get("run_metadata") if isinstance(run_report, dict) else None
    )
    if not isinstance(run_metadata, dict):
        run_metadata = {}
    analysis_summary_value = run_metadata.get("analysis_summary_file")
    previous_report_value = run_metadata.get("previous_report_source")
    analysis_summary_file = (
        Path(analysis_summary_value)
        if isinstance(analysis_summary_value, str)
        else None
    )
    previous_report = (
        Path(previous_report_value) if isinstance(previous_report_value, str) else None
    )

    records = run_report.get("records") if isinstance(run_report, dict) else None
    if not isinstance(records, list):
        raise click.ClickException(
            f"Invalid run_report.json format in {run_report_file}: records must be a list"
        )

    # API clients are created on first use so a snapshot that only touches
    # one platform never constructs the client for the other.
    github_client: github_api.GitHubAPI | None = None
    gitlab_client: gitlab_api.GitLabAPI | None = None

    def issue_client_for_platform(platform: str):
        """Return lazily initialized issue client for the requested platform."""
        nonlocal github_client, gitlab_client
        if platform == "github":
            if github_client is None:
                github_client = github_api.GitHubAPI(dry_run=False)
            return github_client
        if platform in {"gitlab", "gitlab.com"}:
            if gitlab_client is None:
                gitlab_client = gitlab_api.GitLabAPI(dry_run=False)
            return gitlab_client
        raise click.ClickException(f"Unsupported platform for publish: {platform}")

    updated_records: list[dict[str, object]] = []
    skipped_published = 0
    skipped_failed_retry = 0

    for raw_record in records:
        if not isinstance(raw_record, dict):
            continue
        # Work on a copy so the loaded report is never mutated.
        record = dict(raw_record)

        repo_url = record.get("repo_url")
        if not isinstance(repo_url, str) or not repo_url:
            updated_records.append(record)
            continue

        # Idempotency: anything already published is carried over as-is.
        if record.get("dry_run") is False and record.get("action") != "failed":
            skipped_published += 1
            updated_records.append(record)
            continue

        action = str(record.get("action", ""))
        if action == "failed":
            if not retry_failed:
                skipped_failed_retry += 1
                updated_records.append(record)
                continue
            if not _can_retry_failed_record(record):
                skipped_failed_retry += 1
                updated_records.append(record)
                continue
            retry_action = _resolve_retry_action(record)
            if retry_action is None:
                skipped_failed_retry += 1
                record["reason_code"] = "missing_retry_action"
                updated_records.append(record)
                continue
            action = retry_action
            record["action"] = retry_action

        platform = _detect_platform_for_publish(repo_url, record)
        issue_url = _issue_url_for_publish(record)
        attempted_action = action
        unsubscribe_detected = False

        try:
            # Before touching an existing issue, honour an "unsubscribe"
            # comment left on it.
            if action in {"updated_by_comment", "closed"}:
                if not issue_url:
                    raise click.ClickException(
                        f"Missing issue URL for publish action {action}: {repo_url}"
                    )
                issue_client = issue_client_for_platform(platform)
                comments = issue_client.get_issue_comments(issue_url)
                unsubscribe_detected = any(
                    _is_unsubscribe_comment(comment) for comment in comments
                )

            if unsubscribe_detected:
                # The record is appended and persisted by the shared tail
                # below, outside the ``try``. Appending here would duplicate
                # it if a later statement raised and the handler re-marked
                # it as failed.
                record["action"] = "skipped"
                record["reason_code"] = "unsubscribe"
                record["unsubscribe_detected"] = True
                record["dry_run"] = False
                record["issue_persistence"] = "none"
                record.pop("simulated_issue_url", None)
            elif action == "simulated_created":
                body = _load_publish_body(analysis_root, repo_url)
                title = "Automated Metadata Quality Report from CodeMetaSoft"
                issue_client = issue_client_for_platform(platform)
                created_url = issue_client.create_issue(repo_url, title, body)
                record["action"] = "created"
                record["issue_url"] = created_url
                record["dry_run"] = False
                record["issue_persistence"] = "posted"
                record.pop("simulated_issue_url", None)
                _clear_failure_metadata(record)
            elif action == "updated_by_comment":
                if not issue_url:
                    raise click.ClickException(
                        f"Missing previous issue URL for repo: {repo_url}"
                    )
                body = _load_publish_body(analysis_root, repo_url)
                issue_client = issue_client_for_platform(platform)
                issue_client.add_issue_comment(
                    issue_url,
                    f"New analysis detected updated findings.\n\n{body}",
                )
                record["issue_url"] = issue_url
                record["dry_run"] = False
                record["issue_persistence"] = "posted"
                record.pop("simulated_issue_url", None)
                _clear_failure_metadata(record)
            elif action == "closed":
                if not issue_url:
                    raise click.ClickException(
                        f"Missing previous issue URL for repo: {repo_url}"
                    )
                issue_client = issue_client_for_platform(platform)
                issue_client.add_issue_comment(
                    issue_url,
                    "The latest analysis no longer reports metadata pitfalls/warnings. "
                    "Closing this issue.",
                )
                issue_client.close_issue(issue_url)
                record["issue_url"] = issue_url
                record["previous_issue_state"] = "closed"
                record["dry_run"] = False
                record["issue_persistence"] = "posted"
                record.pop("simulated_issue_url", None)
                _clear_failure_metadata(record)
            elif action == "skipped":
                record["dry_run"] = False
                record["issue_persistence"] = "none"
                record.pop("simulated_issue_url", None)
                _clear_failure_metadata(record)
            elif attempted_action == "failed":
                # The retry resolved to "failed" itself: nothing to publish,
                # keep the failure bookkeeping as it is.
                skipped_failed_retry += 1
            else:
                # Any other action needs no remote call; mark it published.
                record["dry_run"] = False
                record.pop("simulated_issue_url", None)
                _clear_failure_metadata(record)
        except Exception as exc:
            # Per-record boundary: one failing repository must not abort the
            # whole run. The error and retry hints are stored on the record.
            record["action"] = "failed"
            record["reason_code"] = "publish_exception"
            error_text = str(exc)
            record["error"] = error_text
            record["dry_run"] = True
            record["is_transient_error"] = _is_transient_publish_error(error_text)
            record["retry_after_seconds"] = _retry_after_seconds_from_error(error_text)
            previous_retry_attempt = record.get("retry_attempt")
            retry_attempt = (
                previous_retry_attempt + 1
                if isinstance(previous_retry_attempt, int)
                else 1
            )
            record["retry_attempt"] = retry_attempt
            record["failed_at"] = _now_utc_iso()
            # Remember what was attempted so a later --retry-failed run can
            # repeat the same action.
            if attempted_action and attempted_action != "failed":
                record["last_publish_action"] = attempted_action

        updated_records.append(record)
        _write_per_repo_report(
            analysis_root,
            record,
            analysis_summary_file,
            previous_report,
        )

    run_report = write_report_file(
        report_file=run_report_file,
        records=updated_records,
        dry_run=False,
        run_root=analysis_root.parent,
        analysis_summary_file=analysis_summary_file,
        previous_report=previous_report,
    )
    # NOTE(review): assumes write_report_file returns the report mapping it
    # wrote; the publish metadata is added to it and the file rewritten.
    run_metadata_candidate = run_report.get("run_metadata")
    if isinstance(run_metadata_candidate, dict):
        run_metadata_written = cast(dict[str, object], run_metadata_candidate)
    else:
        run_metadata_written = {}
        run_report["run_metadata"] = run_metadata_written
    run_metadata_written["published_at"] = _now_utc_iso()
    run_metadata_written["idempotency_skipped_records"] = skipped_published
    run_metadata_written["failed_retry_skipped_records"] = skipped_failed_retry

    with open(run_report_file, "w", encoding="utf-8") as f:
        json.dump(run_report, f, indent=2)
# Command-line entry point: a click command that forwards its two options to
# publish_analysis.
@click.command()
# --analysis-root is mandatory and must name an existing directory; click
# passes it on as a pathlib.Path.
@click.option(
    "--analysis-root",
    type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
    required=True,
    help="Existing analysis snapshot folder containing run_report.json.",
)
# --retry-failed is a boolean flag that is off unless given.
@click.option(
    "--retry-failed",
    is_flag=True,
    default=False,
    help="Retry records previously marked as failed when they are eligible for retry.",
)
def publish_command(analysis_root: Path, retry_failed: bool) -> None:
    """Publish issues using precomputed decisions from an analysis snapshot."""
    # All work happens in publish_analysis; this wrapper only adapts the CLI.
    publish_analysis(analysis_root, retry_failed=retry_failed)