# src/slop_farmer/reports/pr_search_service.py
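"""Service layer for the PR scope-similarity search index.

Refreshes DuckDB-backed search runs from PR snapshots and serves
indexed lookups (similar PRs, clusters, contributors) as well as
live GitHub probes ranked against the active run.
"""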
from __future__ import annotations
import json
from collections.abc import Iterable, Mapping, Sequence
from contextlib import suppress
from pathlib import Path
from typing import Any, Protocol
from uuid import uuid4
from slop_farmer.config import PrSearchRefreshOptions, RepoRef, resolve_github_token
from slop_farmer.data.github_api import GitHubClient
from slop_farmer.data.normalize import normalize_pr_file, normalize_pull_request
from slop_farmer.data.search_duckdb import (
connect_pr_search_db,
fetch_rows,
get_candidate_cluster_rows,
get_cluster,
get_cluster_ids_for_prs,
get_cluster_members,
get_contributor,
get_contributor_pulls,
get_document,
get_feature,
get_pair_neighbor_row,
get_run_counts,
get_scope_run_artifact,
get_shared_cluster_ids,
get_similar_pr_rows,
insert_rows,
replace_active_run,
resolve_active_run,
update_run_status,
)
from slop_farmer.reports.pr_scope import PrScopeClusterOptions
from slop_farmer.reports.pr_search_scope import (
build_pr_scope_search_artifacts,
build_scope_feature_for_pull_request,
build_scope_feature_idf_for_indexed_documents,
iso_timestamp,
load_pr_search_snapshot,
rank_scope_cluster_candidates,
rank_scope_feature_matches,
resolve_pr_search_snapshot_dir,
scope_feature_pair_explanation,
scope_options_from_settings,
)
class ProbeClientLike(Protocol):
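    """Minimal client surface required for live PR probes."""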
def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]: ...
def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]: ...
def run_pr_search_refresh(options: PrSearchRefreshOptions) -> dict[str, Any]:
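    """Build a new search run from a snapshot and persist it to DuckDB.

    Writes documents, contributors, scope features, neighbors, and cluster
    artifacts under a fresh run id, optionally promotes it to the active
    run, and returns a summary with row counts.
    """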
snapshot_dir = resolve_pr_search_snapshot_dir(options)
snapshot = load_pr_search_snapshot(snapshot_dir)
repo = str(snapshot["repo"])
db_path = resolve_pr_search_db_path(options.db, output_dir=options.output_dir)
started_at = iso_timestamp()
scope_options = PrScopeClusterOptions(
include_closed=options.include_closed,
include_drafts=options.include_drafts,
)
artifacts = build_pr_scope_search_artifacts(
snapshot["pull_requests"],
snapshot["pr_files"],
options=scope_options,
suppression_rules=options.cluster_suppression_rules,
limit_prs=options.limit_prs,
)
run_id = uuid4().hex
source_type = "hf_dataset_repo" if options.hf_repo_id else "local_snapshot"
connection = connect_pr_search_db(db_path)
try:
insert_rows(
connection,
"pr_search_runs",
[
{
"id": run_id,
"repo": repo,
"snapshot_id": snapshot["snapshot_id"],
"snapshot_dir": str(snapshot_dir),
"source_type": source_type,
"hf_repo_id": options.hf_repo_id,
"hf_revision": options.hf_revision,
"started_at": started_at,
"finished_at": None,
"status": "running",
"settings_json": artifacts["settings_json"],
"notes": None,
}
],
)
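        # The run row above is written before BEGIN so it survives a ROLLBACK,
        # letting the failure handler below mark the run as failed.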
connection.execute("BEGIN")
created_at = iso_timestamp()
insert_rows(
connection,
"pr_search_documents",
_scoped_rows(artifacts["documents"], run_id=run_id, repo=repo),
)
insert_rows(
connection,
"pr_search_contributors",
_contributor_rows(
snapshot["contributors"],
run_id=run_id,
repo=repo,
snapshot_id=str(snapshot["snapshot_id"]),
),
)
insert_rows(
connection,
"pr_scope_features",
_scoped_rows(
artifacts["features"],
run_id=run_id,
repo=repo,
computed_at=created_at,
),
)
insert_rows(
connection,
"pr_scope_run_artifacts",
_scoped_rows(
[artifacts["run_artifact"]],
run_id=run_id,
repo=repo,
computed_at=created_at,
),
)
insert_rows(
connection,
"pr_scope_neighbors",
_scoped_rows(
artifacts["neighbors"],
run_id=run_id,
repo=repo,
created_at=created_at,
),
)
insert_rows(
connection,
"pr_scope_clusters",
_scoped_rows(
artifacts["clusters"],
run_id=run_id,
repo=repo,
created_at=created_at,
),
)
insert_rows(
connection,
"pr_scope_cluster_members",
_scoped_rows(artifacts["cluster_members"], run_id=run_id, repo=repo),
)
insert_rows(
connection,
"pr_scope_cluster_candidates",
_scoped_rows(artifacts["cluster_candidates"], run_id=run_id, repo=repo),
)
finished_at = iso_timestamp()
update_run_status(
connection,
run_id=run_id,
status="succeeded",
finished_at=finished_at,
)
if options.replace_active:
replace_active_run(
connection,
repo=repo,
run_id=run_id,
activated_at=finished_at,
)
connection.execute("COMMIT")
counts = get_run_counts(connection, run_id=run_id)
return {
"db_path": str(db_path),
"run_id": run_id,
"repo": repo,
"snapshot_id": snapshot["snapshot_id"],
"snapshot_dir": str(snapshot_dir),
"source_type": source_type,
"active_updated": bool(options.replace_active),
"row_counts": counts,
}
except Exception as exc:
with suppress(Exception):
connection.execute("ROLLBACK")
update_run_status(
connection,
run_id=run_id,
status="failed",
finished_at=iso_timestamp(),
notes=str(exc),
)
raise
finally:
connection.close()
def get_pr_search_status(db_path: Path, *, repo: str | None = None) -> dict[str, Any]:
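    """Return metadata, parsed settings, and row counts for the active run."""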
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
return {
**_without_json_fields(active_run),
"settings": _json_dict(active_run.get("settings_json")),
"row_counts": get_run_counts(connection, run_id=str(active_run["id"])),
}
finally:
connection.close()
def get_pr_search_similar(
db_path: Path,
*,
pr_number: int,
repo: str | None = None,
limit: int = 10,
) -> dict[str, Any]:
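    """Return the top indexed neighbors for a PR, with shared-path evidence."""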
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
run_id = str(active_run["id"])
document = _require_document(connection, run_id=run_id, pr_number=pr_number)
similar_rows = get_similar_pr_rows(
connection, run_id=run_id, pr_number=pr_number, limit=limit
)
cluster_ids_by_pr = get_cluster_ids_for_prs(
connection,
run_id=run_id,
pr_numbers=[int(row["neighbor_pr_number"]) for row in similar_rows],
)
results = []
for row in similar_rows:
results.append(
{
**_without_json_fields(row),
"neighbor_title": _require_document(
connection,
run_id=run_id,
pr_number=int(row["neighbor_pr_number"]),
)["title"],
"cluster_ids": cluster_ids_by_pr.get(int(row["neighbor_pr_number"]), []),
"shared_filenames": _json_list(row.get("shared_filenames_json")),
"shared_directories": _json_list(row.get("shared_directories_json")),
}
)
return {
"repo": active_run["repo"],
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"pr": document,
"similar_prs": results,
"similar_count": len(results),
}
finally:
connection.close()
def get_pr_search_candidate_clusters(
db_path: Path,
*,
pr_number: int,
repo: str | None = None,
limit: int = 5,
) -> dict[str, Any]:
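    """Return ranked candidate clusters for a PR, with decoded evidence."""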
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
run_id = str(active_run["id"])
document = _require_document(connection, run_id=run_id, pr_number=pr_number)
rows = get_candidate_cluster_rows(
connection, run_id=run_id, pr_number=pr_number, limit=limit
)
candidates = []
for row in rows:
evidence = _json_dict(row.get("evidence_json"))
candidates.append(
{
**_without_json_fields(row),
"shared_filenames": _json_list(row.get("shared_filenames_json")),
"shared_directories": _json_list(row.get("shared_directories_json")),
"evidence": evidence,
"matched_member_pr_numbers": evidence.get("matched_member_pr_numbers") or [],
"reason": evidence.get("reason") or "",
}
)
return {
"repo": active_run["repo"],
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"pr": document,
"candidate_clusters": candidates,
"candidate_cluster_count": len(candidates),
}
finally:
connection.close()
def get_pr_search_contributor(
db_path: Path,
*,
author_login: str,
repo: str | None = None,
) -> dict[str, Any]:
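    """Return a contributor's profile plus up to 20 of their indexed pulls."""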
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
run_id = str(active_run["id"])
contributor = _require_contributor(connection, run_id=run_id, author_login=author_login)
pulls = _document_rows(
get_contributor_pulls(connection, run_id=run_id, author_login=author_login, limit=20)
)
return {
"repo": active_run["repo"],
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"contributor": contributor,
"pulls": pulls,
"pull_count": len(pulls),
}
finally:
connection.close()
def get_pr_search_contributor_pulls(
db_path: Path,
*,
author_login: str,
repo: str | None = None,
limit: int = 20,
) -> dict[str, Any]:
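    """Like get_pr_search_contributor, but with a caller-supplied pull limit."""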
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
run_id = str(active_run["id"])
contributor = _require_contributor(connection, run_id=run_id, author_login=author_login)
pulls = _document_rows(
get_contributor_pulls(connection, run_id=run_id, author_login=author_login, limit=limit)
)
return {
"repo": active_run["repo"],
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"contributor": contributor,
"pulls": pulls,
"pull_count": len(pulls),
}
finally:
connection.close()
def get_pr_search_pull_contributor(
db_path: Path,
*,
pr_number: int,
repo: str | None = None,
) -> dict[str, Any]:
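    """Resolve a PR to its indexed author and return that contributor's profile."""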
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
run_id = str(active_run["id"])
document = _require_document(connection, run_id=run_id, pr_number=pr_number)
author_login = str(document.get("author_login") or "").strip()
if not author_login:
raise ValueError(f"PR #{pr_number} does not have an indexed author_login.")
contributor = _require_contributor(connection, run_id=run_id, author_login=author_login)
return {
"repo": active_run["repo"],
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"pr": _without_json_fields(document),
"contributor": contributor,
}
finally:
connection.close()
def get_pr_search_similar_lookup(
db_path: Path,
*,
pr_number: int,
repo: str | None = None,
limit: int = 10,
mode: str = "auto",
client: ProbeClientLike | None = None,
) -> dict[str, Any]:
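    """Similar-PR lookup with mode routing.

    "indexed" and "live" force their respective paths; "auto" serves from
    the active index and falls back to a live probe on an index miss.
    """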
resolved_mode = _normalize_lookup_mode(mode)
if resolved_mode != "live":
try:
result = get_pr_search_similar(db_path, pr_number=pr_number, repo=repo, limit=limit)
except ValueError as exc:
if resolved_mode == "indexed" or not _is_index_miss(exc):
raise
else:
result["query"] = {
"pr_number": pr_number,
"mode_requested": resolved_mode,
"mode_used": "indexed",
"source": "active_index",
}
return result
live_result = probe_pr_search_live(
db_path,
pr_number=pr_number,
repo=repo,
limit=limit,
client=client,
)
return {
"repo": live_result["repo"],
"snapshot_id": live_result["snapshot_id"],
"run_id": live_result["run_id"],
"query": {
"pr_number": pr_number,
"mode_requested": resolved_mode,
"mode_used": "live",
"source": live_result["probe_source"]["provider"],
},
"pr": live_result["probe_pr"],
"probe_source": live_result["probe_source"],
"similar_prs": live_result["similar_prs"],
"similar_count": len(live_result["similar_prs"]),
}
def get_pr_search_clusters(
db_path: Path,
*,
pr_number: int,
repo: str | None = None,
limit: int = 5,
mode: str = "auto",
client: ProbeClientLike | None = None,
) -> dict[str, Any]:
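    """Cluster lookup with the same mode routing as the similar-PR lookup.

    The live path returns no assigned clusters, only ranked candidates.
    """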
resolved_mode = _normalize_lookup_mode(mode)
if resolved_mode != "live":
try:
result = _get_pr_search_clusters_indexed(
db_path,
pr_number=pr_number,
repo=repo,
limit=limit,
)
except ValueError as exc:
if resolved_mode == "indexed" or not _is_index_miss(exc):
raise
else:
result["query"] = {
"pr_number": pr_number,
"mode_requested": resolved_mode,
"mode_used": "indexed",
"source": "active_index",
}
return result
live_result = probe_pr_search_live(
db_path,
pr_number=pr_number,
repo=repo,
limit=limit,
client=client,
)
return {
"repo": live_result["repo"],
"snapshot_id": live_result["snapshot_id"],
"run_id": live_result["run_id"],
"query": {
"pr_number": pr_number,
"mode_requested": resolved_mode,
"mode_used": "live",
"source": live_result["probe_source"]["provider"],
},
"pr": live_result["probe_pr"],
"probe_source": live_result["probe_source"],
"assigned_clusters": [],
"candidate_clusters": live_result["candidate_clusters"],
"assigned_cluster_count": 0,
"candidate_cluster_count": len(live_result["candidate_clusters"]),
}
def get_pr_search_cluster(
db_path: Path,
*,
cluster_id: str,
repo: str | None = None,
) -> dict[str, Any]:
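    """Return a single cluster's summary and member rows from the active run."""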
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
run_id = str(active_run["id"])
cluster = get_cluster(connection, run_id=run_id, cluster_id=cluster_id)
if cluster is None:
raise ValueError(f"Cluster {cluster_id!r} was not found in the active run.")
members = get_cluster_members(connection, run_id=run_id, cluster_id=cluster_id)
return {
"repo": active_run["repo"],
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"cluster": _cluster_summary(cluster),
"members": members,
"member_count": len(members),
}
finally:
connection.close()
def list_pr_search_clusters(
db_path: Path,
*,
repo: str | None = None,
limit: int = 50,
) -> dict[str, Any]:
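    """List the active run's clusters, largest and most cohesive first."""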
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
run_id = str(active_run["id"])
rows = fetch_rows(
connection,
"""
SELECT
cl.*,
d.title AS representative_title,
d.html_url AS representative_html_url,
d.state AS representative_state,
d.draft AS representative_draft
FROM pr_scope_clusters AS cl
LEFT JOIN pr_search_documents AS d
ON d.run_id = cl.run_id AND d.pr_number = cl.representative_pr_number
WHERE cl.run_id = ?
ORDER BY cl.cluster_size DESC, cl.average_similarity DESC, cl.cluster_id
LIMIT ?
""",
[run_id, limit],
)
clusters = []
for index, row in enumerate(rows, start=1):
clusters.append({"rank": index, **_cluster_summary(row)})
return {
"repo": active_run["repo"],
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"clusters": clusters,
"cluster_count": len(clusters),
}
finally:
connection.close()
def explain_pr_search_pair(
db_path: Path,
*,
left_pr_number: int,
right_pr_number: int,
repo: str | None = None,
) -> dict[str, Any]:
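    """Explain why two indexed PRs are similar.

    Uses the materialized neighbor row when present; otherwise recomputes
    the pairwise explanation from the stored scope features.
    """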
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
run_id = str(active_run["id"])
left_document = _require_document(connection, run_id=run_id, pr_number=left_pr_number)
right_document = _require_document(connection, run_id=run_id, pr_number=right_pr_number)
neighbor_row = get_pair_neighbor_row(
connection,
run_id=run_id,
left_pr_number=left_pr_number,
right_pr_number=right_pr_number,
)
shared_cluster_ids = get_shared_cluster_ids(
connection,
run_id=run_id,
left_pr_number=left_pr_number,
right_pr_number=right_pr_number,
)
if neighbor_row is not None:
shared_filenames = _json_list(neighbor_row.get("shared_filenames_json"))
shared_directories = _json_list(neighbor_row.get("shared_directories_json"))
return {
"repo": active_run["repo"],
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"materialized": True,
"left_pr": left_document,
"right_pr": right_document,
"pair": {
"similarity": neighbor_row["similarity"],
"content_similarity": neighbor_row["content_similarity"],
"size_similarity": neighbor_row["size_similarity"],
"breadth_similarity": neighbor_row["breadth_similarity"],
"concentration_similarity": neighbor_row["concentration_similarity"],
"shared_filenames": shared_filenames,
"shared_directories": shared_directories,
},
"shared_cluster_ids": shared_cluster_ids,
}
left_feature = _require_feature(connection, run_id=run_id, pr_number=left_pr_number)
right_feature = _require_feature(connection, run_id=run_id, pr_number=right_pr_number)
pair = scope_feature_pair_explanation(
left_feature,
right_feature,
options=scope_options_from_settings(_json_dict(active_run.get("settings_json"))),
)
return {
"repo": active_run["repo"],
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"materialized": False,
"left_pr": left_document,
"right_pr": right_document,
"pair": pair,
"shared_cluster_ids": shared_cluster_ids,
}
finally:
connection.close()
def probe_pr_search_live(
db_path: Path,
*,
pr_number: int,
repo: str | None = None,
limit: int = 10,
client: ProbeClientLike | None = None,
) -> dict[str, Any]:
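    """Fetch a PR from the live provider and rank it against the active index.

    Rebuilds the query feature from fresh PR metadata and file listings,
    then scores it against the indexed features and cluster artifacts.
    """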
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
run_id = str(active_run["id"])
repo_slug = repo or str(active_run["repo"])
repo_ref = RepoRef.parse(repo_slug)
settings = scope_options_from_settings(_json_dict(active_run.get("settings_json")))
indexed_documents = fetch_rows(
connection,
"""
SELECT *
FROM pr_search_documents
WHERE run_id = ?
ORDER BY pr_number
""",
[run_id],
)
indexed_features = fetch_rows(
connection,
"""
SELECT *
FROM pr_scope_features
WHERE run_id = ?
ORDER BY pr_number
""",
[run_id],
)
run_artifact = get_scope_run_artifact(connection, run_id=run_id)
cluster_rows = fetch_rows(
connection,
"""
SELECT *
FROM pr_scope_clusters
WHERE run_id = ?
ORDER BY cluster_id
""",
[run_id],
)
cluster_member_rows = fetch_rows(
connection,
"""
SELECT cluster_id, pr_number
FROM pr_scope_cluster_members
WHERE run_id = ?
ORDER BY cluster_id, pr_number
""",
[run_id],
)
cluster_members: dict[str, list[int]] = {}
for row in cluster_member_rows:
cluster_members.setdefault(str(row["cluster_id"]), []).append(int(row["pr_number"]))
probe_client = client or GitHubClient(token=resolve_github_token())
extracted_at = iso_timestamp()
pr_detail = probe_client.get_pull_request(repo_ref.owner, repo_ref.name, pr_number)
pr_row = normalize_pull_request(
repo_ref.slug,
pr_detail,
pr_detail,
str(active_run["snapshot_id"]),
extracted_at,
)
pr_files = [
normalize_pr_file(
repo_ref.slug,
pr_number,
item,
str(active_run["snapshot_id"]),
extracted_at,
)
for item in probe_client.iter_pull_files(repo_ref.owner, repo_ref.name, pr_number)
]
feature_idf = (
_json_float_dict(run_artifact.get("idf_json")) if run_artifact is not None else {}
)
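        # Recompute the IDF table from the snapshot if the stored run
        # artifact does not carry one.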
if not feature_idf:
snapshot = load_pr_search_snapshot(Path(str(active_run["snapshot_dir"])))
feature_idf = build_scope_feature_idf_for_indexed_documents(
indexed_documents,
snapshot["pr_files"],
options=settings,
)
query_feature = build_scope_feature_for_pull_request(
pr_row,
pr_files,
feature_idf=feature_idf,
options=settings,
)
similarity_rows = rank_scope_feature_matches(
query_feature,
indexed_features,
options=settings,
limit=limit,
)
cluster_ids_by_pr = get_cluster_ids_for_prs(
connection,
run_id=run_id,
pr_numbers=[int(row["right_pr_number"]) for row in similarity_rows],
)
live_similar_prs = []
for row in similarity_rows:
indexed_document = _require_document(
connection,
run_id=run_id,
pr_number=int(row["right_pr_number"]),
)
live_similar_prs.append(
{
**row,
"neighbor_pr_number": int(row["right_pr_number"]),
"neighbor_title": indexed_document["title"],
"cluster_ids": cluster_ids_by_pr.get(int(row["right_pr_number"]), []),
}
)
assigned_cluster_ids = set(
get_cluster_ids_for_prs(connection, run_id=run_id, pr_numbers=[pr_number]).get(
pr_number, []
)
)
candidate_clusters = rank_scope_cluster_candidates(
similarity_rows=similarity_rows,
clusters=cluster_rows,
cluster_members=cluster_members,
assigned_cluster_ids=assigned_cluster_ids,
limit=min(5, max(limit, 1)),
)
cluster_by_id = {str(row["cluster_id"]): row for row in cluster_rows}
for row in candidate_clusters:
cluster = cluster_by_id[row["cluster_id"]]
row.update(
{
"representative_pr_number": cluster["representative_pr_number"],
"cluster_size": cluster["cluster_size"],
"average_similarity": cluster["average_similarity"],
"summary": cluster["summary"],
"shared_filenames": _json_list(cluster.get("shared_filenames_json")),
"shared_directories": _json_list(cluster.get("shared_directories_json")),
"matched_member_pr_numbers": row["evidence"].get("matched_member_pr_numbers")
or [],
"reason": row["evidence"].get("reason") or "",
}
)
return {
"repo": repo_slug,
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"probe_pr": {
"pr_number": pr_number,
"title": pr_row.get("title") or "",
"html_url": pr_row.get("html_url"),
"base_ref": pr_row.get("base_ref"),
"changed_files": int(pr_row.get("changed_files") or 0),
},
"probe_source": _probe_source_metadata(
probe_client,
owner=repo_ref.owner,
repo=repo_ref.name,
number=pr_number,
),
"similar_prs": live_similar_prs,
"candidate_clusters": candidate_clusters,
}
finally:
connection.close()
def probe_pr_search_github(
db_path: Path,
*,
pr_number: int,
repo: str | None = None,
limit: int = 10,
client: ProbeClientLike | None = None,
) -> dict[str, Any]:
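    """Thin wrapper that delegates to probe_pr_search_live."""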
return probe_pr_search_live(
db_path,
pr_number=pr_number,
repo=repo,
limit=limit,
client=client,
)
def resolve_pr_search_db_path(db_path: Path | None, *, output_dir: Path) -> Path:
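    """Resolve the search DB path, defaulting to <output_dir>/state/pr-search.duckdb."""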
return (db_path or output_dir / "state" / "pr-search.duckdb").resolve()
def _scoped_rows(rows: list[dict[str, Any]], **extra: Any) -> list[dict[str, Any]]:
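    """Merge scope columns into each row; per-row values win on key collisions."""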
return [{**extra, **row} for row in rows]
def _get_pr_search_clusters_indexed(
db_path: Path,
*,
pr_number: int,
repo: str | None = None,
limit: int = 5,
) -> dict[str, Any]:
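    """Indexed-only cluster lookup: assigned clusters plus ranked candidates."""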
connection = connect_pr_search_db(db_path, read_only=True)
try:
active_run = resolve_active_run(connection, repo=repo)
run_id = str(active_run["id"])
document = _require_document(connection, run_id=run_id, pr_number=pr_number)
candidate_rows = get_candidate_cluster_rows(
connection,
run_id=run_id,
pr_number=pr_number,
limit=limit,
)
assigned_cluster_ids = get_cluster_ids_for_prs(
connection,
run_id=run_id,
pr_numbers=[pr_number],
).get(pr_number, [])
assigned_clusters = []
for cluster_id in assigned_cluster_ids:
cluster = get_cluster(connection, run_id=run_id, cluster_id=cluster_id)
if cluster is None:
continue
assigned_clusters.append(_cluster_summary(cluster))
candidates = []
for row in candidate_rows:
evidence = _json_dict(row.get("evidence_json"))
candidates.append(
{
**_without_json_fields(row),
"shared_filenames": _json_list(row.get("shared_filenames_json")),
"shared_directories": _json_list(row.get("shared_directories_json")),
"evidence": evidence,
"matched_member_pr_numbers": evidence.get("matched_member_pr_numbers") or [],
"reason": evidence.get("reason") or "",
}
)
return {
"repo": active_run["repo"],
"snapshot_id": active_run["snapshot_id"],
"run_id": run_id,
"pr": document,
"assigned_clusters": assigned_clusters,
"candidate_clusters": candidates,
"assigned_cluster_count": len(assigned_clusters),
"candidate_cluster_count": len(candidates),
}
finally:
connection.close()
def _require_document(connection: Any, *, run_id: str, pr_number: int) -> dict[str, Any]:
document = get_document(connection, run_id=run_id, pr_number=pr_number)
if document is None:
raise ValueError(f"PR #{pr_number} was not found in the active indexed universe.")
return document
def _require_feature(connection: Any, *, run_id: str, pr_number: int) -> dict[str, Any]:
feature = get_feature(connection, run_id=run_id, pr_number=pr_number)
if feature is None:
raise ValueError(f"No scope feature row was found for PR #{pr_number}.")
return feature
def _require_contributor(connection: Any, *, run_id: str, author_login: str) -> dict[str, Any]:
contributor = get_contributor(connection, run_id=run_id, author_login=author_login)
if contributor is None:
raise ValueError(
f"Contributor {author_login!r} was not found in the active indexed universe."
)
return _contributor_row(contributor)
def _json_list(raw: Any) -> list[str]:
if isinstance(raw, list):
return [str(item) for item in raw]
if isinstance(raw, str) and raw:
payload = json.loads(raw)
if isinstance(payload, list):
return [str(item) for item in payload]
return []
def _json_dict(raw: Any) -> dict[str, Any]:
if isinstance(raw, dict):
return dict(raw)
if isinstance(raw, str) and raw:
payload = json.loads(raw)
if isinstance(payload, dict):
return payload
return {}
def _json_float_dict(raw: Any) -> dict[str, float]:
payload = _json_dict(raw)
return {str(key): float(value) for key, value in payload.items()}
def _cluster_summary(cluster: dict[str, Any]) -> dict[str, Any]:
return {
**_without_json_fields(cluster),
"shared_filenames": _json_list(cluster.get("shared_filenames_json")),
"shared_directories": _json_list(cluster.get("shared_directories_json")),
}
def _without_json_fields(row: Mapping[str, Any]) -> dict[str, Any]:
return {str(key): value for key, value in row.items() if not str(key).endswith("_json")}
def _document_rows(rows: Sequence[Mapping[str, Any]]) -> list[dict[str, Any]]:
return [_without_json_fields(row) for row in rows]
def _contributor_rows(
rows: list[Mapping[str, Any]],
*,
run_id: str,
repo: str,
snapshot_id: str,
) -> list[dict[str, Any]]:
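    """Project snapshot contributor records onto the pr_search_contributors columns.

    Note that the snapshot's ``public_orgs`` value is stored as the
    ``public_orgs_json`` column.
    """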
return [
{
"run_id": run_id,
"repo": repo,
"snapshot_id": snapshot_id,
"report_generated_at": row.get("report_generated_at"),
"window_days": row.get("window_days"),
"author_login": row.get("author_login"),
"name": row.get("name"),
"profile_url": row.get("profile_url"),
"repo_pull_requests_url": row.get("repo_pull_requests_url"),
"repo_issues_url": row.get("repo_issues_url"),
"repo_first_seen_at": row.get("repo_first_seen_at"),
"repo_last_seen_at": row.get("repo_last_seen_at"),
"repo_primary_artifact_count": row.get("repo_primary_artifact_count"),
"repo_artifact_count": row.get("repo_artifact_count"),
"snapshot_issue_count": row.get("snapshot_issue_count"),
"snapshot_pr_count": row.get("snapshot_pr_count"),
"snapshot_comment_count": row.get("snapshot_comment_count"),
"snapshot_review_count": row.get("snapshot_review_count"),
"snapshot_review_comment_count": row.get("snapshot_review_comment_count"),
"repo_association": row.get("repo_association"),
"new_to_repo": row.get("new_to_repo"),
"first_seen_in_snapshot": row.get("first_seen_in_snapshot"),
"report_reason": row.get("report_reason"),
"account_age_days": row.get("account_age_days"),
"young_account": row.get("young_account"),
"follow_through_score": row.get("follow_through_score"),
"breadth_score": row.get("breadth_score"),
"automation_risk_signal": row.get("automation_risk_signal"),
"heuristic_note": row.get("heuristic_note"),
"public_orgs_json": row.get("public_orgs"),
"visible_authored_pr_count": row.get("visible_authored_pr_count"),
"merged_pr_count": row.get("merged_pr_count"),
"closed_unmerged_pr_count": row.get("closed_unmerged_pr_count"),
"open_pr_count": row.get("open_pr_count"),
"merged_pr_rate": row.get("merged_pr_rate"),
"closed_unmerged_pr_rate": row.get("closed_unmerged_pr_rate"),
"still_open_pr_rate": row.get("still_open_pr_rate"),
"distinct_repos_with_authored_prs": row.get("distinct_repos_with_authored_prs"),
"distinct_repos_with_open_prs": row.get("distinct_repos_with_open_prs"),
"fetch_error": row.get("fetch_error"),
}
for row in rows
]
def _contributor_row(row: Mapping[str, Any]) -> dict[str, Any]:
return {
**_without_json_fields(row),
"public_orgs": _json_list(row.get("public_orgs_json")),
}
def _normalize_lookup_mode(mode: str) -> str:
normalized = mode.strip().lower()
if normalized not in {"auto", "indexed", "live"}:
raise ValueError(f"Unsupported mode {mode!r}; expected auto, indexed, or live.")
return normalized
def _is_index_miss(exc: ValueError) -> bool:
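    """Detect index misses via the message raised by the _require_* helpers."""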
return "active indexed universe" in str(exc)
def _probe_source_metadata(
client: Any,
*,
owner: str,
repo: str,
number: int,
) -> dict[str, Any]:
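    """Describe the probe client, including index status when it exposes one."""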
metadata: dict[str, Any] = {"provider": _probe_provider(client)}
base_url = getattr(client, "base_url", None)
if isinstance(base_url, str) and base_url:
metadata["base_url"] = base_url
status_method = getattr(client, "get_pull_request_status", None)
if not callable(status_method):
return metadata
try:
status = status_method(owner, repo, number)
except Exception as exc:
metadata["status_error"] = str(exc)
return metadata
metadata.update(_normalize_probe_status(status))
return metadata
def _normalize_probe_status(raw: Any) -> dict[str, Any]:
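    """Normalize provider status payloads onto indexed/freshness/last-indexed keys."""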
if not isinstance(raw, dict):
return {}
indexed = raw.get("indexed")
if indexed is None:
indexed = raw.get("is_indexed")
freshness = raw.get("index_freshness")
if freshness is None:
freshness = raw.get("freshness")
last_indexed_at = raw.get("last_indexed_at")
if last_indexed_at is None:
last_indexed_at = raw.get("indexed_at")
normalized: dict[str, Any] = {}
if indexed is not None:
normalized["indexed"] = bool(indexed)
if freshness is not None:
normalized["index_freshness"] = str(freshness)
if last_indexed_at is not None:
normalized["last_indexed_at"] = str(last_indexed_at)
return normalized
def _probe_provider(client: Any) -> str:
provider = getattr(client, "provider", None)
if isinstance(provider, str) and provider:
return provider
if isinstance(client, GitHubClient):
return "github"
return "live"