from __future__ import annotations

import json
from collections.abc import Iterable, Mapping, Sequence
from contextlib import suppress
from pathlib import Path
from typing import Any, Protocol
from uuid import uuid4

from slop_farmer.config import PrSearchRefreshOptions, RepoRef, resolve_github_token
from slop_farmer.data.github_api import GitHubClient
from slop_farmer.data.normalize import normalize_pr_file, normalize_pull_request
from slop_farmer.data.search_duckdb import (
    connect_pr_search_db,
    fetch_rows,
    get_candidate_cluster_rows,
    get_cluster,
    get_cluster_ids_for_prs,
    get_cluster_members,
    get_contributor,
    get_contributor_pulls,
    get_document,
    get_feature,
    get_pair_neighbor_row,
    get_run_counts,
    get_scope_run_artifact,
    get_shared_cluster_ids,
    get_similar_pr_rows,
    insert_rows,
    replace_active_run,
    resolve_active_run,
    update_run_status,
)
from slop_farmer.reports.pr_scope import PrScopeClusterOptions
from slop_farmer.reports.pr_search_scope import (
    build_pr_scope_search_artifacts,
    build_scope_feature_for_pull_request,
    build_scope_feature_idf_for_indexed_documents,
    iso_timestamp,
    load_pr_search_snapshot,
    rank_scope_cluster_candidates,
    rank_scope_feature_matches,
    resolve_pr_search_snapshot_dir,
    scope_feature_pair_explanation,
    scope_options_from_settings,
)


class ProbeClientLike(Protocol):
    def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]: ...

    def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]: ...


def run_pr_search_refresh(options: PrSearchRefreshOptions) -> dict[str, Any]:
    snapshot_dir = resolve_pr_search_snapshot_dir(options)
    snapshot = load_pr_search_snapshot(snapshot_dir)
    repo = str(snapshot["repo"])
    db_path = resolve_pr_search_db_path(options.db, output_dir=options.output_dir)
    started_at = iso_timestamp()
    scope_options = PrScopeClusterOptions(
        include_closed=options.include_closed,
        include_drafts=options.include_drafts,
    )
    artifacts = build_pr_scope_search_artifacts(
        snapshot["pull_requests"],
        snapshot["pr_files"],
        options=scope_options,
        suppression_rules=options.cluster_suppression_rules,
        limit_prs=options.limit_prs,
    )
    run_id = uuid4().hex
    source_type = "hf_dataset_repo" if options.hf_repo_id else "local_snapshot"
    connection = connect_pr_search_db(db_path)
    try:
        insert_rows(
            connection,
            "pr_search_runs",
            [
                {
                    "id": run_id,
                    "repo": repo,
                    "snapshot_id": snapshot["snapshot_id"],
                    "snapshot_dir": str(snapshot_dir),
                    "source_type": source_type,
                    "hf_repo_id": options.hf_repo_id,
                    "hf_revision": options.hf_revision,
                    "started_at": started_at,
                    "finished_at": None,
                    "status": "running",
                    "settings_json": artifacts["settings_json"],
                    "notes": None,
                }
            ],
        )
        connection.execute("BEGIN")
        created_at = iso_timestamp()
        insert_rows(
            connection,
            "pr_search_documents",
            _scoped_rows(artifacts["documents"], run_id=run_id, repo=repo),
        )
        insert_rows(
            connection,
            "pr_search_contributors",
            _contributor_rows(
                snapshot["contributors"],
                run_id=run_id,
                repo=repo,
                snapshot_id=str(snapshot["snapshot_id"]),
            ),
        )
        insert_rows(
            connection,
            "pr_scope_features",
            _scoped_rows(
                artifacts["features"],
                run_id=run_id,
                repo=repo,
                computed_at=created_at,
            ),
        )
        insert_rows(
            connection,
            "pr_scope_run_artifacts",
            _scoped_rows(
                [artifacts["run_artifact"]],
                run_id=run_id,
                repo=repo,
                computed_at=created_at,
            ),
        )
        insert_rows(
            connection,
            "pr_scope_neighbors",
            _scoped_rows(
                artifacts["neighbors"],
                run_id=run_id,
                repo=repo,
                created_at=created_at,
            ),
        )
        insert_rows(
            connection,
            "pr_scope_clusters",
            _scoped_rows(
                artifacts["clusters"],
                run_id=run_id,
                repo=repo,
                created_at=created_at,
            ),
        )
        insert_rows(
            connection,
            "pr_scope_cluster_members",
            _scoped_rows(artifacts["cluster_members"], run_id=run_id, repo=repo),
        )
        insert_rows(
            connection,
            "pr_scope_cluster_candidates",
            _scoped_rows(artifacts["cluster_candidates"], run_id=run_id, repo=repo),
        )
        finished_at = iso_timestamp()
        update_run_status(
            connection,
            run_id=run_id,
            status="succeeded",
            finished_at=finished_at,
        )
        if options.replace_active:
            replace_active_run(
                connection,
                repo=repo,
                run_id=run_id,
                activated_at=finished_at,
            )
        connection.execute("COMMIT")
        counts = get_run_counts(connection, run_id=run_id)
        return {
            "db_path": str(db_path),
            "run_id": run_id,
            "repo": repo,
            "snapshot_id": snapshot["snapshot_id"],
            "snapshot_dir": str(snapshot_dir),
            "source_type": source_type,
            "active_updated": bool(options.replace_active),
            "row_counts": counts,
        }
    except Exception as exc:
        with suppress(Exception):
            connection.execute("ROLLBACK")
            update_run_status(
                connection,
                run_id=run_id,
                status="failed",
                finished_at=iso_timestamp(),
                notes=str(exc),
            )
        raise
    finally:
        connection.close()


def get_pr_search_status(db_path: Path, *, repo: str | None = None) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        return {
            **_without_json_fields(active_run),
            "settings": _json_dict(active_run.get("settings_json")),
            "row_counts": get_run_counts(connection, run_id=str(active_run["id"])),
        }
    finally:
        connection.close()


def get_pr_search_similar(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 10,
) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        document = _require_document(connection, run_id=run_id, pr_number=pr_number)
        similar_rows = get_similar_pr_rows(
            connection, run_id=run_id, pr_number=pr_number, limit=limit
        )
        cluster_ids_by_pr = get_cluster_ids_for_prs(
            connection,
            run_id=run_id,
            pr_numbers=[int(row["neighbor_pr_number"]) for row in similar_rows],
        )
        results = []
        for row in similar_rows:
            results.append(
                {
                    **_without_json_fields(row),
                    "neighbor_title": _require_document(
                        connection,
                        run_id=run_id,
                        pr_number=int(row["neighbor_pr_number"]),
                    )["title"],
                    "cluster_ids": cluster_ids_by_pr.get(int(row["neighbor_pr_number"]), []),
                    "shared_filenames": _json_list(row.get("shared_filenames_json")),
                    "shared_directories": _json_list(row.get("shared_directories_json")),
                }
            )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "pr": document,
            "similar_prs": results,
            "similar_count": len(results),
        }
    finally:
        connection.close()


def get_pr_search_candidate_clusters(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 5,
) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        document = _require_document(connection, run_id=run_id, pr_number=pr_number)
        rows = get_candidate_cluster_rows(
            connection, run_id=run_id, pr_number=pr_number, limit=limit
        )
        candidates = []
        for row in rows:
            evidence = _json_dict(row.get("evidence_json"))
            candidates.append(
                {
                    **_without_json_fields(row),
                    "shared_filenames": _json_list(row.get("shared_filenames_json")),
                    "shared_directories": _json_list(row.get("shared_directories_json")),
                    "evidence": evidence,
                    "matched_member_pr_numbers": evidence.get("matched_member_pr_numbers") or [],
                    "reason": evidence.get("reason") or "",
                }
            )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "pr": document,
            "candidate_clusters": candidates,
            "candidate_cluster_count": len(candidates),
        }
    finally:
        connection.close()


def get_pr_search_contributor(
    db_path: Path,
    *,
    author_login: str,
    repo: str | None = None,
) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        contributor = _require_contributor(connection, run_id=run_id, author_login=author_login)
        pulls = _document_rows(
            get_contributor_pulls(connection, run_id=run_id, author_login=author_login, limit=20)
        )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "contributor": contributor,
            "pulls": pulls,
            "pull_count": len(pulls),
        }
    finally:
        connection.close()


def get_pr_search_contributor_pulls(
    db_path: Path,
    *,
    author_login: str,
    repo: str | None = None,
    limit: int = 20,
) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        contributor = _require_contributor(connection, run_id=run_id, author_login=author_login)
        pulls = _document_rows(
            get_contributor_pulls(connection, run_id=run_id, author_login=author_login, limit=limit)
        )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "contributor": contributor,
            "pulls": pulls,
            "pull_count": len(pulls),
        }
    finally:
        connection.close()


def get_pr_search_pull_contributor(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        document = _require_document(connection, run_id=run_id, pr_number=pr_number)
        author_login = str(document.get("author_login") or "").strip()
        if not author_login:
            raise ValueError(f"PR #{pr_number} does not have an indexed author_login.")
        contributor = _require_contributor(connection, run_id=run_id, author_login=author_login)
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "pr": _without_json_fields(document),
            "contributor": contributor,
        }
    finally:
        connection.close()


def get_pr_search_similar_lookup(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 10,
    mode: str = "auto",
    client: ProbeClientLike | None = None,
) -> dict[str, Any]:
    resolved_mode = _normalize_lookup_mode(mode)
    if resolved_mode != "live":
        try:
            result = get_pr_search_similar(db_path, pr_number=pr_number, repo=repo, limit=limit)
        except ValueError as exc:
            if resolved_mode == "indexed" or not _is_index_miss(exc):
                raise
        else:
            result["query"] = {
                "pr_number": pr_number,
                "mode_requested": resolved_mode,
                "mode_used": "indexed",
                "source": "active_index",
            }
            return result
    live_result = probe_pr_search_live(
        db_path,
        pr_number=pr_number,
        repo=repo,
        limit=limit,
        client=client,
    )
    return {
        "repo": live_result["repo"],
        "snapshot_id": live_result["snapshot_id"],
        "run_id": live_result["run_id"],
        "query": {
            "pr_number": pr_number,
            "mode_requested": resolved_mode,
            "mode_used": "live",
            "source": live_result["probe_source"]["provider"],
        },
        "pr": live_result["probe_pr"],
        "probe_source": live_result["probe_source"],
        "similar_prs": live_result["similar_prs"],
        "similar_count": len(live_result["similar_prs"]),
    }


def get_pr_search_clusters(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 5,
    mode: str = "auto",
    client: ProbeClientLike | None = None,
) -> dict[str, Any]:
    resolved_mode = _normalize_lookup_mode(mode)
    if resolved_mode != "live":
        try:
            result = _get_pr_search_clusters_indexed(
                db_path,
                pr_number=pr_number,
                repo=repo,
                limit=limit,
            )
        except ValueError as exc:
            if resolved_mode == "indexed" or not _is_index_miss(exc):
                raise
        else:
            result["query"] = {
                "pr_number": pr_number,
                "mode_requested": resolved_mode,
                "mode_used": "indexed",
                "source": "active_index",
            }
            return result
    live_result = probe_pr_search_live(
        db_path,
        pr_number=pr_number,
        repo=repo,
        limit=limit,
        client=client,
    )
    return {
        "repo": live_result["repo"],
        "snapshot_id": live_result["snapshot_id"],
        "run_id": live_result["run_id"],
        "query": {
            "pr_number": pr_number,
            "mode_requested": resolved_mode,
            "mode_used": "live",
            "source": live_result["probe_source"]["provider"],
        },
        "pr": live_result["probe_pr"],
        "probe_source": live_result["probe_source"],
        "assigned_clusters": [],
        "candidate_clusters": live_result["candidate_clusters"],
        "assigned_cluster_count": 0,
        "candidate_cluster_count": len(live_result["candidate_clusters"]),
    }


def get_pr_search_cluster(
    db_path: Path,
    *,
    cluster_id: str,
    repo: str | None = None,
) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        cluster = get_cluster(connection, run_id=run_id, cluster_id=cluster_id)
        if cluster is None:
            raise ValueError(f"Cluster {cluster_id!r} was not found in the active run.")
        members = get_cluster_members(connection, run_id=run_id, cluster_id=cluster_id)
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "cluster": _cluster_summary(cluster),
            "members": members,
            "member_count": len(members),
        }
    finally:
        connection.close()


def list_pr_search_clusters(
    db_path: Path,
    *,
    repo: str | None = None,
    limit: int = 50,
) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        rows = fetch_rows(
            connection,
            """
            SELECT
                cl.*,
                d.title AS representative_title,
                d.html_url AS representative_html_url,
                d.state AS representative_state,
                d.draft AS representative_draft
            FROM pr_scope_clusters AS cl
            LEFT JOIN pr_search_documents AS d
                ON d.run_id = cl.run_id AND d.pr_number = cl.representative_pr_number
            WHERE cl.run_id = ?
            ORDER BY cl.cluster_size DESC, cl.average_similarity DESC, cl.cluster_id
            LIMIT ?
            """,
            [run_id, limit],
        )
        clusters = []
        for index, row in enumerate(rows, start=1):
            clusters.append({"rank": index, **_cluster_summary(row)})
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "clusters": clusters,
            "cluster_count": len(clusters),
        }
    finally:
        connection.close()


def explain_pr_search_pair(
    db_path: Path,
    *,
    left_pr_number: int,
    right_pr_number: int,
    repo: str | None = None,
) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        left_document = _require_document(connection, run_id=run_id, pr_number=left_pr_number)
        right_document = _require_document(connection, run_id=run_id, pr_number=right_pr_number)
        neighbor_row = get_pair_neighbor_row(
            connection,
            run_id=run_id,
            left_pr_number=left_pr_number,
            right_pr_number=right_pr_number,
        )
        shared_cluster_ids = get_shared_cluster_ids(
            connection,
            run_id=run_id,
            left_pr_number=left_pr_number,
            right_pr_number=right_pr_number,
        )
        if neighbor_row is not None:
            shared_filenames = _json_list(neighbor_row.get("shared_filenames_json"))
            shared_directories = _json_list(neighbor_row.get("shared_directories_json"))
            return {
                "repo": active_run["repo"],
                "snapshot_id": active_run["snapshot_id"],
                "run_id": run_id,
                "materialized": True,
                "left_pr": left_document,
                "right_pr": right_document,
                "pair": {
                    "similarity": neighbor_row["similarity"],
                    "content_similarity": neighbor_row["content_similarity"],
                    "size_similarity": neighbor_row["size_similarity"],
                    "breadth_similarity": neighbor_row["breadth_similarity"],
                    "concentration_similarity": neighbor_row["concentration_similarity"],
                    "shared_filenames": shared_filenames,
                    "shared_directories": shared_directories,
                },
                "shared_cluster_ids": shared_cluster_ids,
            }
        left_feature = _require_feature(connection, run_id=run_id, pr_number=left_pr_number)
        right_feature = _require_feature(connection, run_id=run_id, pr_number=right_pr_number)
        pair = scope_feature_pair_explanation(
            left_feature,
            right_feature,
            options=scope_options_from_settings(_json_dict(active_run.get("settings_json"))),
        )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "materialized": False,
            "left_pr": left_document,
            "right_pr": right_document,
            "pair": pair,
            "shared_cluster_ids": shared_cluster_ids,
        }
    finally:
        connection.close()


def probe_pr_search_live(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 10,
    client: ProbeClientLike | None = None,
) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        repo_slug = repo or str(active_run["repo"])
        repo_ref = RepoRef.parse(repo_slug)
        settings = scope_options_from_settings(_json_dict(active_run.get("settings_json")))
        indexed_documents = fetch_rows(
            connection,
            """
            SELECT * FROM pr_search_documents WHERE run_id = ? ORDER BY pr_number
            """,
            [run_id],
        )
        indexed_features = fetch_rows(
            connection,
            """
            SELECT * FROM pr_scope_features WHERE run_id = ? ORDER BY pr_number
            """,
            [run_id],
        )
        run_artifact = get_scope_run_artifact(connection, run_id=run_id)
        cluster_rows = fetch_rows(
            connection,
            """
            SELECT * FROM pr_scope_clusters WHERE run_id = ? ORDER BY cluster_id
            """,
            [run_id],
        )
        cluster_member_rows = fetch_rows(
            connection,
            """
            SELECT cluster_id, pr_number
            FROM pr_scope_cluster_members
            WHERE run_id = ?
            ORDER BY cluster_id, pr_number
            """,
            [run_id],
        )
        cluster_members: dict[str, list[int]] = {}
        for row in cluster_member_rows:
            cluster_members.setdefault(str(row["cluster_id"]), []).append(int(row["pr_number"]))
        probe_client = client or GitHubClient(token=resolve_github_token())
        extracted_at = iso_timestamp()
        pr_detail = probe_client.get_pull_request(repo_ref.owner, repo_ref.name, pr_number)
        pr_row = normalize_pull_request(
            repo_ref.slug,
            pr_detail,
            pr_detail,
            str(active_run["snapshot_id"]),
            extracted_at,
        )
        pr_files = [
            normalize_pr_file(
                repo_ref.slug,
                pr_number,
                item,
                str(active_run["snapshot_id"]),
                extracted_at,
            )
            for item in probe_client.iter_pull_files(repo_ref.owner, repo_ref.name, pr_number)
        ]
        feature_idf = (
            _json_float_dict(run_artifact.get("idf_json")) if run_artifact is not None else {}
        )
        if not feature_idf:
            snapshot = load_pr_search_snapshot(Path(str(active_run["snapshot_dir"])))
            feature_idf = build_scope_feature_idf_for_indexed_documents(
                indexed_documents,
                snapshot["pr_files"],
                options=settings,
            )
        query_feature = build_scope_feature_for_pull_request(
            pr_row,
            pr_files,
            feature_idf=feature_idf,
            options=settings,
        )
        similarity_rows = rank_scope_feature_matches(
            query_feature,
            indexed_features,
            options=settings,
            limit=limit,
        )
        cluster_ids_by_pr = get_cluster_ids_for_prs(
            connection,
            run_id=run_id,
            pr_numbers=[int(row["right_pr_number"]) for row in similarity_rows],
        )
        live_similar_prs = []
        for row in similarity_rows:
            indexed_document = _require_document(
                connection,
                run_id=run_id,
                pr_number=int(row["right_pr_number"]),
            )
            live_similar_prs.append(
                {
                    **row,
                    "neighbor_pr_number": int(row["right_pr_number"]),
                    "neighbor_title": indexed_document["title"],
                    "cluster_ids": cluster_ids_by_pr.get(int(row["right_pr_number"]), []),
                }
            )
        assigned_cluster_ids = set(
            get_cluster_ids_for_prs(connection, run_id=run_id, pr_numbers=[pr_number]).get(
                pr_number, []
            )
        )
        candidate_clusters = rank_scope_cluster_candidates(
            similarity_rows=similarity_rows,
            clusters=cluster_rows,
            cluster_members=cluster_members,
            assigned_cluster_ids=assigned_cluster_ids,
            limit=min(5, max(limit, 1)),
        )
        cluster_by_id = {str(row["cluster_id"]): row for row in cluster_rows}
        for row in candidate_clusters:
            cluster = cluster_by_id[row["cluster_id"]]
            row.update(
                {
                    "representative_pr_number": cluster["representative_pr_number"],
                    "cluster_size": cluster["cluster_size"],
                    "average_similarity": cluster["average_similarity"],
                    "summary": cluster["summary"],
                    "shared_filenames": _json_list(cluster.get("shared_filenames_json")),
                    "shared_directories": _json_list(cluster.get("shared_directories_json")),
                    "matched_member_pr_numbers": row["evidence"].get("matched_member_pr_numbers")
                    or [],
                    "reason": row["evidence"].get("reason") or "",
                }
            )
        return {
            "repo": repo_slug,
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "probe_pr": {
                "pr_number": pr_number,
                "title": pr_row.get("title") or "",
                "html_url": pr_row.get("html_url"),
                "base_ref": pr_row.get("base_ref"),
                "changed_files": int(pr_row.get("changed_files") or 0),
            },
            "probe_source": _probe_source_metadata(
                probe_client,
                owner=repo_ref.owner,
                repo=repo_ref.name,
                number=pr_number,
            ),
            "similar_prs": live_similar_prs,
            "candidate_clusters": candidate_clusters,
        }
    finally:
        connection.close()


def probe_pr_search_github(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 10,
    client: ProbeClientLike | None = None,
) -> dict[str, Any]:
    return probe_pr_search_live(
        db_path,
        pr_number=pr_number,
        repo=repo,
        limit=limit,
        client=client,
    )


def resolve_pr_search_db_path(db_path: Path | None, *, output_dir: Path) -> Path:
    return (db_path or output_dir / "state" / "pr-search.duckdb").resolve()


def _scoped_rows(rows: list[dict[str, Any]], **extra: Any) -> list[dict[str, Any]]:
    return [{**extra, **row} for row in rows]


def _get_pr_search_clusters_indexed(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 5,
) -> dict[str, Any]:
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        document = _require_document(connection, run_id=run_id, pr_number=pr_number)
        candidate_rows = get_candidate_cluster_rows(
            connection,
            run_id=run_id,
            pr_number=pr_number,
            limit=limit,
        )
        assigned_cluster_ids = get_cluster_ids_for_prs(
            connection,
            run_id=run_id,
            pr_numbers=[pr_number],
        ).get(pr_number, [])
        assigned_clusters = []
        for cluster_id in assigned_cluster_ids:
            cluster = get_cluster(connection, run_id=run_id, cluster_id=cluster_id)
            if cluster is None:
                continue
            assigned_clusters.append(_cluster_summary(cluster))
        candidates = []
        for row in candidate_rows:
            evidence = _json_dict(row.get("evidence_json"))
            candidates.append(
                {
                    **_without_json_fields(row),
                    "shared_filenames": _json_list(row.get("shared_filenames_json")),
                    "shared_directories": _json_list(row.get("shared_directories_json")),
                    "evidence": evidence,
                    "matched_member_pr_numbers": evidence.get("matched_member_pr_numbers") or [],
                    "reason": evidence.get("reason") or "",
                }
            )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "pr": document,
            "assigned_clusters": assigned_clusters,
            "candidate_clusters": candidates,
            "assigned_cluster_count": len(assigned_clusters),
            "candidate_cluster_count": len(candidates),
        }
    finally:
        connection.close()


def _require_document(connection: Any, *, run_id: str, pr_number: int) -> dict[str, Any]:
    document = get_document(connection, run_id=run_id, pr_number=pr_number)
    if document is None:
        raise ValueError(f"PR #{pr_number} was not found in the active indexed universe.")
    return document


def _require_feature(connection: Any, *, run_id: str, pr_number: int) -> dict[str, Any]:
    feature = get_feature(connection, run_id=run_id, pr_number=pr_number)
    if feature is None:
        raise ValueError(f"No scope feature row was found for PR #{pr_number}.")
    return feature


def _require_contributor(connection: Any, *, run_id: str, author_login: str) -> dict[str, Any]:
    contributor = get_contributor(connection, run_id=run_id, author_login=author_login)
    if contributor is None:
        raise ValueError(
            f"Contributor {author_login!r} was not found in the active indexed universe."
        )
    return _contributor_row(contributor)


def _json_list(raw: Any) -> list[str]:
    if isinstance(raw, list):
        return [str(item) for item in raw]
    if isinstance(raw, str) and raw:
        payload = json.loads(raw)
        if isinstance(payload, list):
            return [str(item) for item in payload]
    return []


def _json_dict(raw: Any) -> dict[str, Any]:
    if isinstance(raw, dict):
        return dict(raw)
    if isinstance(raw, str) and raw:
        payload = json.loads(raw)
        if isinstance(payload, dict):
            return payload
    return {}


def _json_float_dict(raw: Any) -> dict[str, float]:
    payload = _json_dict(raw)
    return {str(key): float(value) for key, value in payload.items()}


def _cluster_summary(cluster: dict[str, Any]) -> dict[str, Any]:
    return {
        **_without_json_fields(cluster),
        "shared_filenames": _json_list(cluster.get("shared_filenames_json")),
        "shared_directories": _json_list(cluster.get("shared_directories_json")),
    }


def _without_json_fields(row: Mapping[str, Any]) -> dict[str, Any]:
    return {str(key): value for key, value in row.items() if not str(key).endswith("_json")}


def _document_rows(rows: Sequence[Mapping[str, Any]]) -> list[dict[str, Any]]:
    return [_without_json_fields(row) for row in rows]


def _contributor_rows(
    rows: list[Mapping[str, Any]],
    *,
    run_id: str,
    repo: str,
    snapshot_id: str,
) -> list[dict[str, Any]]:
    return [
        {
            "run_id": run_id,
            "repo": repo,
            "snapshot_id": snapshot_id,
            "report_generated_at": row.get("report_generated_at"),
            "window_days": row.get("window_days"),
            "author_login": row.get("author_login"),
            "name": row.get("name"),
            "profile_url": row.get("profile_url"),
            "repo_pull_requests_url": row.get("repo_pull_requests_url"),
            "repo_issues_url": row.get("repo_issues_url"),
            "repo_first_seen_at": row.get("repo_first_seen_at"),
            "repo_last_seen_at": row.get("repo_last_seen_at"),
            "repo_primary_artifact_count": row.get("repo_primary_artifact_count"),
            "repo_artifact_count": row.get("repo_artifact_count"),
            "snapshot_issue_count": row.get("snapshot_issue_count"),
            "snapshot_pr_count": row.get("snapshot_pr_count"),
            "snapshot_comment_count": row.get("snapshot_comment_count"),
            "snapshot_review_count": row.get("snapshot_review_count"),
            "snapshot_review_comment_count": row.get("snapshot_review_comment_count"),
            "repo_association": row.get("repo_association"),
            "new_to_repo": row.get("new_to_repo"),
            "first_seen_in_snapshot": row.get("first_seen_in_snapshot"),
            "report_reason": row.get("report_reason"),
            "account_age_days": row.get("account_age_days"),
            "young_account": row.get("young_account"),
            "follow_through_score": row.get("follow_through_score"),
            "breadth_score": row.get("breadth_score"),
            "automation_risk_signal": row.get("automation_risk_signal"),
            "heuristic_note": row.get("heuristic_note"),
            "public_orgs_json": row.get("public_orgs"),
            "visible_authored_pr_count": row.get("visible_authored_pr_count"),
            "merged_pr_count": row.get("merged_pr_count"),
            "closed_unmerged_pr_count": row.get("closed_unmerged_pr_count"),
            "open_pr_count": row.get("open_pr_count"),
            "merged_pr_rate": row.get("merged_pr_rate"),
            "closed_unmerged_pr_rate": row.get("closed_unmerged_pr_rate"),
            "still_open_pr_rate": row.get("still_open_pr_rate"),
            "distinct_repos_with_authored_prs": row.get("distinct_repos_with_authored_prs"),
            "distinct_repos_with_open_prs": row.get("distinct_repos_with_open_prs"),
            "fetch_error": row.get("fetch_error"),
        }
        for row in rows
    ]


def _contributor_row(row: Mapping[str, Any]) -> dict[str, Any]:
    return {
        **_without_json_fields(row),
        "public_orgs": _json_list(row.get("public_orgs_json")),
    }


def _normalize_lookup_mode(mode: str) -> str:
    normalized = mode.strip().lower()
    if normalized not in {"auto", "indexed", "live"}:
        raise ValueError(f"Unsupported mode {mode!r}; expected auto, indexed, or live.")
    return normalized


def _is_index_miss(exc: ValueError) -> bool:
    return "active indexed universe" in str(exc)


def _probe_source_metadata(
    client: Any,
    *,
    owner: str,
    repo: str,
    number: int,
) -> dict[str, Any]:
    metadata: dict[str, Any] = {"provider": _probe_provider(client)}
    base_url = getattr(client, "base_url", None)
    if isinstance(base_url, str) and base_url:
        metadata["base_url"] = base_url
    status_method = getattr(client, "get_pull_request_status", None)
    if not callable(status_method):
        return metadata
    try:
        status = status_method(owner, repo, number)
    except Exception as exc:
        metadata["status_error"] = str(exc)
        return metadata
    metadata.update(_normalize_probe_status(status))
    return metadata


def _normalize_probe_status(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        return {}
    indexed = raw.get("indexed")
    if indexed is None:
        indexed = raw.get("is_indexed")
    freshness = raw.get("index_freshness")
    if freshness is None:
        freshness = raw.get("freshness")
    last_indexed_at = raw.get("last_indexed_at")
    if last_indexed_at is None:
        last_indexed_at = raw.get("indexed_at")
    normalized: dict[str, Any] = {}
    if indexed is not None:
        normalized["indexed"] = bool(indexed)
    if freshness is not None:
        normalized["index_freshness"] = str(freshness)
    if last_indexed_at is not None:
        normalized["last_indexed_at"] = str(last_indexed_at)
    return normalized


def _probe_provider(client: Any) -> str:
    provider = getattr(client, "provider", None)
    if isinstance(provider, str) and provider:
        return provider
    if isinstance(client, GitHubClient):
        return "github"
    return "live"
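

# Usage sketch (illustrative only, not executed as part of this module): it assumes an
# already-constructed PrSearchRefreshOptions instance named `options` and a hypothetical
# PR #123 in the indexed repository; the flow below simply chains the public entry points
# defined above.
#
#     refresh = run_pr_search_refresh(options)
#     db_path = Path(refresh["db_path"])
#     status = get_pr_search_status(db_path)
#     lookup = get_pr_search_similar_lookup(db_path, pr_number=123, mode="auto")
#     clusters = list_pr_search_clusters(db_path, limit=10)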