# Source code for sw_metadata_bot.publish

"""Publish issues from an existing analysis snapshot."""

import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import cast

import click

from . import constants, github_api, gitlab_api, pitfalls, utils
from .config.config_utils import (
    append_opt_out_to_config,
    detect_platform,
    sanitize_repo_name,
)
from .config.schemas import BotConfig
from .reporting import build_counters, write_report_file

# Upper bound on publish attempts per record: a failed record whose stored
# "retry_attempt" count has reached this value is no longer eligible for retry.
MAX_PUBLISH_RETRY_ATTEMPTS = 3


class FakeIssueClient:
    """Issue client used only for local publish simulation.

    The client never performs network access. Every mutating call is recorded
    on the instance so callers can inspect what a publish run would have done:

    * ``created``   -- ``(repo_url, title, body)`` tuples, one per created issue
    * ``commented`` -- issue URLs that received a comment
    * ``closed``    -- issue URLs that were closed
    """

    def __init__(self, comments_for=None, issue_for=None):
        """Initialize the fake issue client.

        Args:
            comments_for: Optional callable taking an issue URL and returning
                the list of comment strings to report for that issue.
                Defaults to "no comments".
            issue_for: Optional callable taking an issue URL and returning the
                issue data dictionary to report for that issue. Defaults to
                an open issue (``{"state": "open"}``).
        """
        self._comments_for = comments_for or (lambda url: [])
        # A fresh dict is produced on every call so callers may mutate the result.
        self._issue_for = issue_for or (lambda url: {"state": "open"})
        self.created: list[tuple[str, str, str]] = []
        self.commented: list[str] = []
        self.closed: list[str] = []

    def create_issue(self, repo_url: str, title: str, body: str) -> str:
        """Record an issue creation and return a simulated issue URL."""
        self.created.append((repo_url, title, body))
        return f"{repo_url}/issues/99"

    def get_issue(self, issue_url: str) -> dict[str, object]:
        """Return simulated issue data for ``issue_url``.

        The state is ``"open"`` unless an ``issue_for`` callable was supplied
        at construction time.
        """
        return self._issue_for(issue_url)

    def get_issue_comments(self, issue_url: str) -> list[str]:
        """Return the simulated comments supplied by ``comments_for``."""
        return self._comments_for(issue_url)

    def add_issue_comment(self, issue_url: str, body: str) -> None:
        """Record that a comment was added to ``issue_url`` (the body is not kept)."""
        self.commented.append(issue_url)

    def close_issue(self, issue_url: str) -> None:
        """Record that ``issue_url`` was closed."""
        self.closed.append(issue_url)
def _is_unsubscribe_comment(comment: str) -> bool: """Return True when a comment is exactly the unsubscribe keyword.""" return comment.strip().lower() == "unsubscribe" def _now_utc_iso() -> str: """Return a UTC timestamp suitable for report persistence.""" return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def _parse_utc_datetime(value: object) -> datetime | None: """Parse an ISO UTC timestamp persisted in publish records.""" if not isinstance(value, str) or not value: return None try: return datetime.fromisoformat(value.replace("Z", "+00:00")) except ValueError: return None def _retry_after_seconds_from_error(error_text: str) -> int: """Infer a retry delay from a publish error string.""" lowered = error_text.lower() if "429" in lowered or "rate limit" in lowered or "too many requests" in lowered: return 300 if ( "timeout" in lowered or "temporarily unavailable" in lowered or "connection" in lowered ): return 60 if any(code in lowered for code in ["500", "502", "503", "504"]): return 120 return 30 def _is_transient_publish_error(error_text: str) -> bool: """Return True when the error likely represents a transient API failure.""" lowered = error_text.lower() if any(code in lowered for code in ["401", "403", "404"]): return False if "unauthorized" in lowered or "forbidden" in lowered or "not found" in lowered: return False if "invalid token" in lowered or "insufficient" in lowered: return False return True def _clear_failure_metadata(record: dict[str, object]) -> None: """Remove retry/failure bookkeeping after a successful publish action.""" record.pop("error", None) record.pop("retry_attempt", None) record.pop("is_transient_error", None) record.pop("retry_after_seconds", None) record.pop("failed_at", None) def _resolve_retry_action(record: dict[str, object]) -> str | None: """Resolve the original action to re-attempt for a failed publish record.""" last_publish_action = record.get("last_publish_action") if isinstance(last_publish_action, str) and 
last_publish_action: return last_publish_action # Backward-compatible fallback for failed records created before retry metadata. simulated_issue_url = record.get("simulated_issue_url") if isinstance(simulated_issue_url, str) and simulated_issue_url: return "simulated_created" return None def _can_retry_failed_record(record: dict[str, object]) -> bool: """Return True when a failed record is eligible for a new publish attempt.""" if record.get("is_transient_error") is False: return False retry_attempt = record.get("retry_attempt") attempt_count = retry_attempt if isinstance(retry_attempt, int) else 0 if attempt_count >= MAX_PUBLISH_RETRY_ATTEMPTS: return False retry_after_value = record.get("retry_after_seconds") retry_after_seconds = retry_after_value if isinstance(retry_after_value, int) else 0 failed_at = _parse_utc_datetime(record.get("failed_at")) if failed_at is None or retry_after_seconds <= 0: return True return datetime.now(timezone.utc) >= failed_at + timedelta( seconds=retry_after_seconds ) def _build_counters(records: list[dict[str, object]]) -> dict[str, int]: """Build publish outcome counters from report records.""" return build_counters(records) def _detect_platform_for_publish(repo_url: str, record: dict[str, object]) -> str: """Resolve platform for publish from record metadata and repository URL.""" value = record.get("platform") if isinstance(value, str) and value: if value in {"github", "gitlab", "gitlab.com"}: return value platform = detect_platform(repo_url) if platform is None: raise click.ClickException(f"Unsupported platform for repository: {repo_url}") return platform def _load_publish_body(analysis_root: Path, repo_url: str) -> str: """Load issue body from report file, with pitfall-based fallback if needed.""" repo_folder = analysis_root / sanitize_repo_name(repo_url) issue_report_file = repo_folder / "issue_report.md" if issue_report_file.exists(): return issue_report_file.read_text(encoding="utf-8") pitfall_file = repo_folder / 
"pitfall.jsonld" if not pitfall_file.exists(): raise click.ClickException( f"Missing issue body and pitfall file for repository: {repo_url}" ) data = pitfalls.load_pitfalls(pitfall_file) config_file = analysis_root / constants.FILENAME_CONFIG_SNAPSHOT custom_message = None if config_file.exists(): config = BotConfig.from_json(config_file) custom_message = config.get_custom_issue_message() report = pitfalls.format_report(repo_url, data) return pitfalls.create_issue_body(report, custom_message) def _issue_url_for_publish(record: dict[str, object]) -> str | None: """Return best available issue URL from record lineage fields.""" current = record.get("issue_url") if isinstance(current, str) and current: return current previous = record.get("previous_issue_url") if isinstance(previous, str) and previous: return previous simulated = record.get("simulated_issue_url") if isinstance(simulated, str) and simulated: return simulated return None def _issue_is_closed(issue_data: dict[str, object] | None) -> bool: """Return True when issue data indicates the issue is already closed.""" if not isinstance(issue_data, dict): return False state_value = issue_data.get("state") return isinstance(state_value, str) and state_value.strip().lower() in { "closed", "close", } def _write_per_repo_report( analysis_root: Path, record: dict[str, object], analysis_summary_file: Path | None, previous_report: Path | None, ) -> None: """Persist a single-record per-repo report alongside repository artifacts.""" repo_url = record.get("repo_url") if not isinstance(repo_url, str) or not repo_url: return write_report_file( report_file=analysis_root / sanitize_repo_name(repo_url) / constants.FILENAME_REPORT, records=[record], dry_run=False, run_root=analysis_root.parent, analysis_summary_file=analysis_summary_file, previous_report=previous_report, )
# Keys used both for in-memory skip counters and in the persisted run metadata.
_SKIP_IDEMPOTENT = "idempotency_skipped_records"
_SKIP_FAILED_RETRY = "failed_retry_skipped_records"


def _optional_path(value: object) -> Path | None:
    """Return ``value`` as a ``Path`` when it is a string, otherwise ``None``."""
    return Path(value) if isinstance(value, str) else None


def _persist_unsubscribe_opt_out(
    analysis_root: Path, input_config_file: Path | None, repo_url: str
) -> None:
    """Append an opt-out for ``repo_url`` to every config file that exists.

    The snapshot config inside ``analysis_root`` is updated when present, and so
    is the original input config. A relative input config path is resolved
    against the parent of ``analysis_root``.
    """
    snapshot_config = analysis_root / constants.FILENAME_CONFIG_SNAPSHOT
    if snapshot_config.exists():
        append_opt_out_to_config(snapshot_config, repo_url, explicit=False)

    if input_config_file is None:
        return
    original_config = input_config_file
    if not original_config.is_absolute():
        original_config = analysis_root.parent / original_config
    if original_config.exists():
        append_opt_out_to_config(original_config, repo_url, explicit=False)


def _mark_record_unsubscribed(record: dict[str, object]) -> None:
    """Turn ``record`` into a final "skipped because unsubscribed" record."""
    record["action"] = constants.ACTION_SKIPPED
    record["reason_code"] = constants.REASON_CODE_UNSUBSCRIBE
    record["unsubscribe_detected"] = True
    record["dry_run"] = False
    record["issue_persistence"] = "none"
    record.pop("simulated_issue_url", None)


def _mark_publish_failure(
    record: dict[str, object], exc: Exception, attempted_action: str | None
) -> None:
    """Store failure and retry bookkeeping on ``record`` after a publish error.

    ``attempted_action`` is remembered as ``last_publish_action`` so a later
    ``--retry-failed`` run knows what to re-attempt; it is ``None`` when the
    failure happened before a publish action had been chosen.
    """
    error_text = str(exc)
    record["action"] = constants.ACTION_FAILED
    record["reason_code"] = constants.REASON_CODE_PUBLISH_EXCEPTION
    record["error"] = error_text
    # dry_run=True keeps the record out of the "already published" fast path.
    record["dry_run"] = True
    record["is_transient_error"] = _is_transient_publish_error(error_text)
    record["retry_after_seconds"] = _retry_after_seconds_from_error(error_text)
    previous_retry_attempt = record.get("retry_attempt")
    record["retry_attempt"] = (
        previous_retry_attempt + 1 if isinstance(previous_retry_attempt, int) else 1
    )
    record["failed_at"] = _now_utc_iso()
    if attempted_action and attempted_action != constants.ACTION_FAILED:
        record["last_publish_action"] = attempted_action


def _mark_record_posted(record: dict[str, object], issue_url: str) -> None:
    """Store the bookkeeping shared by every action that posted to an issue."""
    record["issue_url"] = issue_url
    record["dry_run"] = False
    record["issue_persistence"] = "posted"
    record.pop("simulated_issue_url", None)
    _clear_failure_metadata(record)


def _process_publish_record(
    record: dict[str, object],
    repo_url: str,
    analysis_root: Path,
    retry_failed: bool,
    input_config_file: Path | None,
    client_for_platform,
    skip_counts: dict[str, int],
) -> bool:
    """Apply the publish decision stored in ``record``, mutating it in place.

    Any exception raised while publishing is captured on the record as a
    failure instead of propagating, so one repository cannot abort the run.

    Args:
        record: Mutable copy of one run-report record.
        repo_url: Repository URL of the record (already validated).
        analysis_root: Analysis snapshot folder.
        retry_failed: Whether previously failed records may be re-attempted.
        input_config_file: Original input config path from the run metadata.
        client_for_platform: Callable mapping a platform name to an issue client.
        skip_counts: Counters incremented for records left untouched.

    Returns:
        True when the per-repo report should be rewritten for this record,
        False when the record was skipped without modification of its outcome.
    """
    attempted_action: str | None = None
    try:
        action = str(record.get("action", ""))
        platform = _detect_platform_for_publish(repo_url, record)
        issue_url = _issue_url_for_publish(record)

        # A skipped record with a known issue is still checked for opt-out.
        if action == constants.ACTION_SKIPPED and issue_url:
            comments = client_for_platform(platform).get_issue_comments(issue_url)
            unsubscribe_detected = any(
                _is_unsubscribe_comment(comment) for comment in comments
            )
            record["unsubscribe_detected"] = unsubscribe_detected
            if unsubscribe_detected:
                _persist_unsubscribe_opt_out(
                    analysis_root, input_config_file, repo_url
                )
                _mark_record_unsubscribed(record)
                return True

        # Idempotency: anything already published for real is left alone.
        if (
            record.get("dry_run") is False
            and record.get("action") != constants.ACTION_FAILED
        ):
            skip_counts[_SKIP_IDEMPOTENT] += 1
            return False

        if action == constants.ACTION_FAILED:
            if not retry_failed or not _can_retry_failed_record(record):
                skip_counts[_SKIP_FAILED_RETRY] += 1
                return False
            retry_action = _resolve_retry_action(record)
            if retry_action is None:
                skip_counts[_SKIP_FAILED_RETRY] += 1
                record["reason_code"] = "missing_retry_action"
                return False
            action = retry_action
            record["action"] = retry_action
            platform = _detect_platform_for_publish(repo_url, record)
            issue_url = _issue_url_for_publish(record)

        attempted_action = action

        if action in {constants.ACTION_UPDATED_BY_COMMENT, constants.ACTION_CLOSED}:
            if not issue_url:
                raise click.ClickException(
                    f"Missing issue URL for publish action {action}: {repo_url}"
                )
            issue_client = client_for_platform(platform)
            comments = issue_client.get_issue_comments(issue_url)
            if any(_is_unsubscribe_comment(comment) for comment in comments):
                # Honour the opt-out and skip publishing entirely.
                _persist_unsubscribe_opt_out(
                    analysis_root, input_config_file, repo_url
                )
                _mark_record_unsubscribed(record)
                return True

            issue_data = issue_client.get_issue(issue_url)
            if (
                action == constants.ACTION_UPDATED_BY_COMMENT
                and _issue_is_closed(issue_data)
            ):
                # Do not comment on a closed issue; open a fresh one instead.
                record["action"] = constants.ACTION_SIMULATED_CREATED
                record["reason_code"] = "changed_and_issue_closed"
                record["previous_issue_url"] = issue_url
                record.pop("issue_url", None)
                action = constants.ACTION_SIMULATED_CREATED

        if action == constants.ACTION_SIMULATED_CREATED:
            body = _load_publish_body(analysis_root, repo_url)
            title = "Automated Metadata Quality Report from CodeMetaSoft"
            issue_client = client_for_platform(platform)
            created_url = issue_client.create_issue(repo_url, title, body)
            record["action"] = constants.ACTION_CREATED
            _mark_record_posted(record, created_url)
        elif action == constants.ACTION_UPDATED_BY_COMMENT:
            if not issue_url:
                raise click.ClickException(
                    f"Missing previous issue URL for repo: {repo_url}"
                )
            body = _load_publish_body(analysis_root, repo_url)
            issue_client = client_for_platform(platform)
            issue_client.add_issue_comment(
                issue_url,
                f"New analysis detected updated findings.\n\n{body}",
            )
            _mark_record_posted(record, issue_url)
        elif action == constants.ACTION_CLOSED:
            if not issue_url:
                raise click.ClickException(
                    f"Missing previous issue URL for repo: {repo_url}"
                )
            issue_client = client_for_platform(platform)
            issue_client.add_issue_comment(
                issue_url,
                "The latest analysis no longer reports metadata pitfalls/warnings. "
                "Closing this issue.",
            )
            issue_client.close_issue(issue_url)
            record["issue_url"] = issue_url
            record["previous_issue_state"] = "closed"
            record["dry_run"] = False
            record["issue_persistence"] = "posted"
            record.pop("simulated_issue_url", None)
            _clear_failure_metadata(record)
        elif action == constants.ACTION_SKIPPED:
            record["dry_run"] = False
            record["issue_persistence"] = "none"
            record.pop("simulated_issue_url", None)
            _clear_failure_metadata(record)
        elif attempted_action == constants.ACTION_FAILED:
            # The stored retry action was itself "failed": nothing to re-attempt.
            skip_counts[_SKIP_FAILED_RETRY] += 1
        else:
            record["dry_run"] = False
            record.pop("simulated_issue_url", None)
            _clear_failure_metadata(record)
    except Exception as exc:
        _mark_publish_failure(record, exc, attempted_action)
    return True


def publish_analysis(
    analysis_root: Path,
    retry_failed: bool = False,
    github_client: github_api.GitHubAPI | None = None,
    gitlab_client: gitlab_api.GitLabAPI | None = None,
) -> None:
    """Publish issues from an existing analysis snapshot without re-running analysis.

    Reads the run report in ``analysis_root``, applies the decision stored on
    each record (create, comment, close, skip), and rewrites both the per-repo
    reports and the run report with the outcome. Records that were already
    published are left untouched, which makes repeated invocations idempotent.

    Args:
        analysis_root: Snapshot folder containing the run report.
        retry_failed: Re-attempt records marked as failed when they are eligible.
        github_client: Issue client to use for GitHub; created lazily when omitted.
        gitlab_client: Issue client to use for GitLab; created lazily when omitted.

    Raises:
        click.ClickException: If the run report is missing or malformed.
    """
    run_report_file = analysis_root / constants.FILENAME_RUN_REPORT
    try:
        run_report = utils.load_json_file(
            run_report_file, required=True, description="run report"
        )
    except FileNotFoundError as exc:
        raise click.ClickException(
            f"Missing run_report.json in {analysis_root}"
        ) from exc
    except (ValueError, json.JSONDecodeError) as exc:
        raise click.ClickException(
            f"Invalid run_report.json format in {run_report_file}: {exc}"
        ) from exc

    run_metadata = (
        run_report.get("run_metadata") if isinstance(run_report, dict) else None
    )
    if not isinstance(run_metadata, dict):
        run_metadata = {}

    analysis_summary_file = _optional_path(run_metadata.get("analysis_summary_file"))
    previous_report = _optional_path(run_metadata.get("previous_report_source"))
    input_config_file = _optional_path(run_metadata.get("input_config_file"))

    records = run_report.get("records") if isinstance(run_report, dict) else None
    if not isinstance(records, list):
        raise click.ClickException(
            f"Invalid run_report.json format in {run_report_file}: records must be a list"
        )

    github_client_instance = github_client
    gitlab_client_instance = gitlab_client

    def issue_client_for_platform(platform: str):
        """Return lazily initialized issue client for the requested platform."""
        nonlocal github_client_instance, gitlab_client_instance
        if platform == "github":
            if github_client_instance is None:
                github_client_instance = github_api.GitHubAPI(dry_run=False)
            return github_client_instance
        if platform in {"gitlab", "gitlab.com"}:
            if gitlab_client_instance is None:
                gitlab_client_instance = gitlab_api.GitLabAPI(dry_run=False)
            return gitlab_client_instance
        raise click.ClickException(f"Unsupported platform for publish: {platform}")

    updated_records: list[dict[str, object]] = []
    skip_counts = {_SKIP_IDEMPOTENT: 0, _SKIP_FAILED_RETRY: 0}

    for raw_record in records:
        if not isinstance(raw_record, dict):
            continue
        record = dict(raw_record)
        repo_url = record.get("repo_url")
        if not isinstance(repo_url, str) or not repo_url:
            updated_records.append(record)
            continue

        report_needed = _process_publish_record(
            record,
            repo_url,
            analysis_root,
            retry_failed,
            input_config_file,
            issue_client_for_platform,
            skip_counts,
        )
        # Every record is appended exactly once, whatever its outcome.
        updated_records.append(record)
        if report_needed:
            _write_per_repo_report(
                analysis_root,
                record,
                analysis_summary_file,
                previous_report,
            )

    run_report = write_report_file(
        report_file=run_report_file,
        records=updated_records,
        dry_run=False,
        run_root=analysis_root.parent,
        analysis_summary_file=analysis_summary_file,
        previous_report=previous_report,
        input_config_file=input_config_file,
    )

    run_metadata_candidate = run_report.get("run_metadata")
    if isinstance(run_metadata_candidate, dict):
        run_metadata_written = cast(dict[str, object], run_metadata_candidate)
    else:
        run_metadata_written = {}
        run_report["run_metadata"] = run_metadata_written

    run_metadata_written["published_at"] = _now_utc_iso()
    run_metadata_written[_SKIP_IDEMPOTENT] = skip_counts[_SKIP_IDEMPOTENT]
    run_metadata_written[_SKIP_FAILED_RETRY] = skip_counts[_SKIP_FAILED_RETRY]

    # Rewrite the run report so the publish metadata added above is persisted.
    with open(run_report_file, "w", encoding="utf-8") as f:
        json.dump(run_report, f, indent=2)
@click.command()
@click.option(
    "--analysis-root",
    type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
    required=True,
    help="Existing analysis snapshot folder containing run_report.json.",
)
@click.option(
    "--retry-failed",
    is_flag=True,
    default=False,
    help="Retry records previously marked as failed when they are eligible for retry.",
)
def publish_command(analysis_root: Path, retry_failed: bool) -> None:
    """Publish issues using precomputed decisions from an analysis snapshot."""
    # Thin CLI wrapper: real API clients are created lazily by publish_analysis.
    publish_analysis(analysis_root, retry_failed=retry_failed)


@click.command()
@click.option(
    "--analysis-root",
    type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
    required=True,
    help="Existing analysis snapshot folder containing run_report.json.",
)
@click.option(
    "--retry-failed",
    is_flag=True,
    default=False,
    help="Retry records previously marked as failed when they are eligible for retry.",
)
@click.option(
    "--unsubscribe",
    is_flag=True,
    default=False,
    help="Simulate an unsubscribe comment on all issue comment checks.",
)
@click.option(
    "--fake-comment",
    multiple=True,
    help="Fake issue comment text returned for all issue URLs. Can be repeated.",
)
def simulate_publish_command(
    analysis_root: Path,
    retry_failed: bool,
    unsubscribe: bool,
    fake_comment: tuple[str, ...],
) -> None:
    """Simulate publish using a local fake issue client without external API access."""
    # The synthetic unsubscribe comment, when requested, precedes the
    # user-supplied comments.
    canned_comments = (["unsubscribe"] if unsubscribe else []) + list(fake_comment)

    def comments_for_any_issue(url):
        # Hand out a copy so callers cannot mutate the shared comment list.
        return list(canned_comments)

    # One fake client stands in for both platforms, so no network call is made.
    simulated_client = FakeIssueClient(comments_for=comments_for_any_issue)
    publish_analysis(
        analysis_root,
        retry_failed=retry_failed,
        github_client=simulated_client,
        gitlab_client=simulated_client,
    )