vinay-pepakayala's picture
Upload folder using huggingface_hub
f80c901 verified
from __future__ import annotations
import json
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union
from urllib.request import Request, urlopen
try:
from GitHubIssueTriage.models import (
ActionType,
Difficulty,
DuplicateCandidate,
GoalType,
HiddenGradingTarget,
IssueComment,
IssueSnapshot,
IssueStatus,
IssueTriageState,
Priority,
RepoRules,
Severity,
TaskSpec,
TimelineEvent,
build_initial_state,
)
except ImportError: # pragma: no cover
from models import (
ActionType,
Difficulty,
DuplicateCandidate,
GoalType,
HiddenGradingTarget,
IssueComment,
IssueSnapshot,
IssueStatus,
IssueTriageState,
Priority,
RepoRules,
Severity,
TaskSpec,
TimelineEvent,
build_initial_state,
)
JsonLike = Dict[str, Any]
def _validate_model(model_cls, data: Any):
validator = getattr(model_cls, "model_validate", None)
if callable(validator):
return validator(data)
parser = getattr(model_cls, "parse_obj", None)
if callable(parser):
return parser(data)
raise AttributeError(f"{model_cls.__name__} does not support model validation.")
def _default_allowed_actions() -> List[ActionType]:
return [
ActionType.READ_ISSUE,
ActionType.READ_REPO_RULES,
ActionType.READ_LABEL_DEFINITIONS,
ActionType.READ_TEAM_ROUTING,
ActionType.READ_ASSIGNEE_POOL,
ActionType.READ_MILESTONES,
ActionType.SEARCH_SIMILAR_ISSUES,
ActionType.ADD_LABEL,
ActionType.REMOVE_LABEL,
ActionType.ASSIGN_USER,
ActionType.SET_PRIORITY,
ActionType.SET_MILESTONE,
ActionType.COMMENT,
ActionType.REQUEST_INFO,
ActionType.PROVIDE_INFO,
ActionType.MARK_DUPLICATE,
ActionType.CLOSE_ISSUE,
ActionType.REOPEN_ISSUE,
ActionType.NOOP,
]
def _slugify(text: str) -> str:
slug = re.sub(r"[^a-zA-Z0-9_]+", "_", str(text)).strip("_").lower()
return slug or "item"
_GITHUB_ISSUE_WEB_RE = re.compile(
r"^https?://github\.com/(?P<owner>[^/]+)/(?P<repo>[^/]+)/issues/(?P<number>\d+)(?:/.*)?$"
)
_GITHUB_BLOB_RE = re.compile(
r"^https?://github\.com/(?P<owner>[^/]+)/(?P<repo>[^/]+)/blob/(?P<branch>[^/]+)/(?P<path>.+)$"
)
def _is_url(value: Union[str, Path]) -> bool:
return isinstance(value, str) and value.startswith(("http://", "https://"))
def _headers() -> Dict[str, str]:
headers = {
"User-Agent": "openenv-github-issue-triage-loader/1.0",
"Accept": "application/vnd.github+json, application/json",
}
token = os.getenv("GITHUB_TOKEN") or os.getenv("GH_TOKEN") or os.getenv("HF_TOKEN")
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
def _load_text_source(source: Union[str, Path]) -> str:
if _is_url(source):
req = Request(str(source), headers=_headers())
with urlopen(req, timeout=30) as resp:
return resp.read().decode("utf-8")
with Path(source).open("r", encoding="utf-8") as f:
return f.read()
def _load_json_source(source: Union[str, Path]) -> Any:
return json.loads(_load_text_source(source))
def _unwrap_payload(data: Any, key: str) -> List[Any]:
if isinstance(data, list):
return data
if isinstance(data, dict):
if key in data and isinstance(data[key], list):
return data[key]
if key in data and isinstance(data[key], dict):
return [data[key]]
raise ValueError(f"Unsupported JSON shape. Expected a list or a wrapper with key '{key}'.")
def _normalize_repo_rules_payload(data: Any) -> JsonLike:
if isinstance(data, dict) and "repo_rules" in data and isinstance(data["repo_rules"], dict):
return data["repo_rules"]
if isinstance(data, dict):
return data
raise ValueError("repo_rules source must be a JSON object.")
def _convert_blob_url_to_raw(url: str) -> Optional[str]:
m = _GITHUB_BLOB_RE.match(url)
if not m:
return None
owner = m.group("owner")
repo = m.group("repo")
branch = m.group("branch")
path = m.group("path")
return f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{path}"
def _github_issue_api_url_from_web_url(url: str) -> Optional[str]:
m = _GITHUB_ISSUE_WEB_RE.match(url)
if not m:
return None
owner = m.group("owner")
repo = m.group("repo")
number = m.group("number")
return f"https://api.github.com/repos/{owner}/{repo}/issues/{number}"
def _fetch_json(url: str) -> Any:
req = Request(url, headers=_headers())
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
def _load_json_maybe_github(source: Union[str, Path]) -> Any:
"""
Accepts:
- local JSON file path
- raw GitHub JSON URL
- github.com blob URL
- any direct JSON URL
"""
if not _is_url(source):
return _load_json_source(source)
url = str(source)
raw_blob = _convert_blob_url_to_raw(url)
if raw_blob is not None:
url = raw_blob
return _fetch_json(url)
def _parse_issue_comments(raw_comments: Any) -> List[IssueComment]:
comments: List[IssueComment] = []
if not isinstance(raw_comments, list):
return comments
for item in raw_comments:
if not isinstance(item, dict):
continue
comments.append(
IssueComment(
comment_id=str(item.get("comment_id") or item.get("id") or f"c_{len(comments)}"),
author=str(
item.get("author")
or (item.get("user") or {}).get("login")
or item.get("user_login")
or "unknown"
),
body=str(item.get("body") or ""),
created_at=str(item.get("created_at") or item.get("createdAt") or ""),
edited_at=item.get("edited_at") or item.get("updated_at"),
internal=bool(item.get("internal", False)),
)
)
return comments
def _parse_timeline_events(raw_events: Any) -> List[TimelineEvent]:
events: List[TimelineEvent] = []
if not isinstance(raw_events, list):
return events
for item in raw_events:
if not isinstance(item, dict):
continue
payload = item.get("payload")
if not isinstance(payload, dict):
payload = {}
events.append(
TimelineEvent(
event_id=str(item.get("event_id") or item.get("id") or f"t_{len(events)}"),
event_type=str(item.get("event_type") or item.get("type") or "event"),
actor=str(
item.get("actor")
or (item.get("user") or {}).get("login")
or item.get("user_login")
or "unknown"
),
created_at=str(item.get("created_at") or item.get("createdAt") or ""),
payload={str(k): str(v) for k, v in payload.items()},
)
)
return events
def _issue_status(value: Any) -> IssueStatus:
raw = str(value or "open").lower()
if raw == "closed":
return IssueStatus.CLOSED
return IssueStatus.OPEN
def _priority(value: Any) -> Optional[Priority]:
if value is None:
return None
try:
return Priority(str(value).lower())
except Exception:
return None
def _severity(value: Any) -> Optional[Severity]:
if value is None:
return None
try:
return Severity(str(value).lower())
except Exception:
return None
def _normalize_issue_snapshot(data: JsonLike) -> IssueSnapshot:
"""
Accepts either:
- your internal IssueSnapshot shape
- GitHub REST issue payload
- a small custom JSON issue object
"""
issue_url = data.get("issue_url") or data.get("html_url") or data.get("url")
labels_raw = data.get("labels", [])
labels: List[str] = []
if isinstance(labels_raw, list):
for item in labels_raw:
if isinstance(item, str):
labels.append(item)
elif isinstance(item, dict):
labels.append(str(item.get("name") or item.get("label") or ""))
labels = [x for x in labels if x]
assignees_raw = data.get("assignees", [])
assignees: List[str] = []
if isinstance(assignees_raw, list):
for item in assignees_raw:
if isinstance(item, str):
assignees.append(item)
elif isinstance(item, dict):
assignees.append(str(item.get("login") or item.get("username") or ""))
assignees = [x for x in assignees if x]
comments = _parse_issue_comments(data.get("comments", []))
timeline = _parse_timeline_events(data.get("timeline", []))
linked_duplicates_raw = data.get("linked_duplicates", [])
linked_duplicates = [str(x) for x in linked_duplicates_raw] if isinstance(linked_duplicates_raw, list) else []
milestone_value = data.get("milestone")
if isinstance(milestone_value, dict):
milestone_value = milestone_value.get("title") or milestone_value.get("name")
repo_value = data.get("repo_id") or data.get("repository_id") or ""
if not repo_value:
repository = data.get("repository")
if isinstance(repository, dict):
repo_value = repository.get("full_name") or repository.get("name") or ""
metadata = data.get("metadata")
if not isinstance(metadata, dict):
metadata = {}
return IssueSnapshot(
issue_id=str(data.get("issue_id") or data.get("number") or data.get("id")),
repo_id=str(repo_value),
issue_url=str(issue_url) if issue_url else None,
title=str(data.get("title") or ""),
body=str(data.get("body") or ""),
author=str(
data.get("author")
or (data.get("user") or {}).get("login")
or data.get("user_login")
or "unknown"
),
created_at=str(data.get("created_at") or data.get("createdAt") or ""),
updated_at=data.get("updated_at") or data.get("updatedAt"),
status=_issue_status(data.get("status") or data.get("state")),
labels=labels,
assignees=assignees,
milestone=str(milestone_value) if milestone_value else None,
priority=_priority(data.get("priority")),
severity=_severity(data.get("severity")),
component=(str(data.get("component")) if data.get("component") is not None else None),
comments=comments,
timeline=timeline,
linked_duplicates=linked_duplicates,
is_locked=bool(data.get("is_locked", False)),
metadata={str(k): str(v) for k, v in metadata.items()},
)
def _fetch_github_issue(issue_url: str) -> JsonLike:
api_url = _github_issue_api_url_from_web_url(issue_url)
if api_url is None:
raise ValueError(f"Not a supported GitHub issue URL: {issue_url}")
issue_payload = _fetch_json(api_url)
comments_url = issue_payload.get("comments_url")
comments: List[Any] = []
if comments_url:
try:
comments = _fetch_json(comments_url)
except Exception:
comments = []
normalized: JsonLike = dict(issue_payload)
normalized["issue_url"] = issue_url
normalized["comments"] = comments if isinstance(comments, list) else []
normalized.setdefault(
"repo_id",
issue_payload.get("repository_url")
or (issue_payload.get("repository") or {}).get("full_name")
or "",
)
normalized.setdefault("issue_id", issue_payload.get("number") or issue_payload.get("id"))
normalized.setdefault("author", (issue_payload.get("user") or {}).get("login", "unknown"))
normalized.setdefault("status", issue_payload.get("state", "open"))
normalized.setdefault("labels", issue_payload.get("labels", []))
normalized.setdefault("assignees", issue_payload.get("assignees", []))
normalized.setdefault("milestone", issue_payload.get("milestone"))
normalized.setdefault("body", issue_payload.get("body", ""))
normalized.setdefault("title", issue_payload.get("title", ""))
normalized.setdefault("created_at", issue_payload.get("created_at", ""))
normalized.setdefault("updated_at", issue_payload.get("updated_at"))
return normalized
def _load_issue_item(item: Any, *, live_github: bool = False) -> IssueSnapshot:
if isinstance(item, IssueSnapshot):
return item.model_copy(deep=True)
if isinstance(item, str):
if _is_url(item):
if live_github and _GITHUB_ISSUE_WEB_RE.match(item):
return _normalize_issue_snapshot(_fetch_github_issue(item))
data = _load_json_maybe_github(item)
if isinstance(data, dict):
return _normalize_issue_snapshot(data)
raise ValueError(f"Issue URL did not resolve to a JSON object: {item}")
# Not a URL, treat as file path
data = _load_json_maybe_github(item)
if isinstance(data, dict):
if "issues" in data and isinstance(data["issues"], list) and data["issues"]:
# Assume issues.json format, pick the first issue
issue_data = data["issues"][0]
else:
# Assume single issue dict
issue_data = data
return _normalize_issue_snapshot(issue_data)
raise ValueError(f"Issue file did not contain a JSON object: {item}")
if isinstance(item, dict):
issue_url = item.get("issue_url") or item.get("url")
if live_github and isinstance(issue_url, str) and _GITHUB_ISSUE_WEB_RE.match(issue_url):
return _normalize_issue_snapshot(_fetch_github_issue(issue_url))
return _normalize_issue_snapshot(item)
raise ValueError(f"Unsupported issue source item: {type(item).__name__}")
def load_repo_rules(repo_rules_path: Union[str, Path]) -> RepoRules:
raw = _load_json_maybe_github(repo_rules_path)
payload = _normalize_repo_rules_payload(raw)
if not isinstance(payload, dict):
raise ValueError("repo_rules must be a JSON object.")
return _validate_model(RepoRules, payload)
def load_tasks(tasks_path: Union[str, Path]) -> List[TaskSpec]:
raw = _load_json_maybe_github(tasks_path)
task_items = _unwrap_payload(raw, "tasks")
task_field_names = set(TaskSpec.model_fields.keys())
tasks: List[TaskSpec] = []
for item in task_items:
if not isinstance(item, dict):
continue
task_data = {k: v for k, v in item.items() if k in task_field_names}
tasks.append(_validate_model(TaskSpec, task_data))
return tasks
def load_issues(issues_path: Union[str, Path], *, live_github: bool = False) -> List[IssueSnapshot]:
raw = _load_json_maybe_github(issues_path)
if isinstance(raw, list):
return [_load_issue_item(item, live_github=live_github) for item in raw]
if isinstance(raw, dict) and "issues" in raw:
issues_raw = raw["issues"]
if isinstance(issues_raw, list):
return [_load_issue_item(item, live_github=live_github) for item in issues_raw]
if isinstance(issues_raw, dict):
return [_load_issue_item(issues_raw, live_github=live_github)]
if isinstance(raw, dict):
return [_load_issue_item(raw, live_github=live_github)]
raise ValueError("issues source must be a list, an object with key 'issues', or a single issue object.")
def _build_issue_index(issues: Sequence[IssueSnapshot]) -> Dict[str, IssueSnapshot]:
index: Dict[str, IssueSnapshot] = {}
for issue in issues:
index[issue.issue_id] = issue
return index
def _parse_hidden_target(raw_task: dict) -> Optional[HiddenGradingTarget]:
hidden = raw_task.get("hidden_target")
if not hidden:
return None
if isinstance(hidden, HiddenGradingTarget):
return hidden.model_copy(deep=True)
if isinstance(hidden, dict):
return _validate_model(HiddenGradingTarget, hidden)
raise ValueError("hidden_target must be a dict or HiddenGradingTarget.")
def _parse_candidate_duplicates(raw_task: dict) -> List[DuplicateCandidate]:
raw_candidates = raw_task.get("candidate_duplicates") or []
if not isinstance(raw_candidates, list):
return []
candidates: List[DuplicateCandidate] = []
for item in raw_candidates:
if isinstance(item, DuplicateCandidate):
candidates.append(item.model_copy(deep=True))
elif isinstance(item, dict):
candidates.append(_validate_model(DuplicateCandidate, item))
return candidates
def _infer_goal_type(issue: IssueSnapshot) -> GoalType:
if issue.linked_duplicates:
return GoalType.DUPLICATE_RESOLUTION
body = (issue.body or "").strip().lower()
uncertain_markers = ("not sure", "don't know", "unknown", "intermittent", "cannot reproduce")
if any(marker in body for marker in uncertain_markers):
return GoalType.NEEDS_INFO
return GoalType.TRIAGE_ONLY
def _infer_difficulty(issue: IssueSnapshot, goal_type: GoalType) -> Difficulty:
if goal_type == GoalType.DUPLICATE_RESOLUTION:
return Difficulty.HARD
if goal_type == GoalType.NEEDS_INFO:
return Difficulty.MEDIUM
if issue.severity == Severity.CRITICAL or issue.priority == Priority.P0:
return Difficulty.MEDIUM
return Difficulty.EASY
def _success_criteria_for_goal(goal_type: GoalType) -> List[str]:
if goal_type == GoalType.DUPLICATE_RESOLUTION:
return ["duplicate", "close", "labels"]
if goal_type == GoalType.NEEDS_INFO:
return ["request_info", "labels", "status"]
return ["labels", "assignee", "priority", "milestone"]
def _auto_task_from_issue(issue: IssueSnapshot, existing_ids: set[str]) -> Dict[str, Any]:
goal_type = _infer_goal_type(issue)
difficulty = _infer_difficulty(issue, goal_type)
repo_slug = _slugify(issue.repo_id or "repo")
issue_slug = _slugify(issue.issue_id or "issue")
base_task_id = f"auto_{repo_slug}_{issue_slug}"
task_id = base_task_id
suffix = 2
while task_id in existing_ids:
task_id = f"{base_task_id}_{suffix}"
suffix += 1
existing_ids.add(task_id)
return {
"episode_id": f"ep_{task_id}",
"task_id": task_id,
"difficulty": difficulty.value,
"goal_type": goal_type.value,
"repo_id": issue.repo_id,
"issue_id": issue.issue_id,
"max_steps": 10,
"success_criteria": _success_criteria_for_goal(goal_type),
"allowed_actions": [action.value for action in _default_allowed_actions()],
"hidden_grading_flags": {},
}
def _generate_tasks_from_issues(issues: Sequence[IssueSnapshot]) -> List[Dict[str, Any]]:
generated: List[Dict[str, Any]] = []
seen_ids: set[str] = set()
for issue in issues:
generated.append(_auto_task_from_issue(issue, seen_ids))
return generated
def _generate_hidden_target_from_issue(issue: IssueSnapshot) -> HiddenGradingTarget:
"""
Auto-generate a HiddenGradingTarget from issue metadata and comments.
This extracts:
- gold_labels: from issue.labels and inferred from priority/severity/component
- gold_priority: from issue.priority or extracted from comments
- gold_severity: from issue.severity
- gold_component: from issue.component
- gold_assignee: from first assignee if available
"""
gold_labels: List[str] = []
# Extract explicit labels from the issue
if issue.labels:
gold_labels.extend(issue.labels)
# Infer labels from scalar fields
if issue.priority:
gold_labels.append(f"priority:{issue.priority.value}")
if issue.severity:
gold_labels.append(f"severity:{issue.severity.value}")
if issue.component:
gold_labels.append(f"component:{issue.component}")
# Extract priority (can be overridden by comments)
gold_priority = issue.priority
# Try to extract priority from comments if not already set
if not gold_priority and issue.comments:
for comment in issue.comments:
# Look for priority mentions in comment body
comment_lower = comment.body.lower()
for priority in Priority:
if priority.value in comment_lower:
gold_priority = priority
break
if gold_priority:
break
# Extract first assignee if available
gold_assignee = issue.assignees[0] if issue.assignees else None
return HiddenGradingTarget(
gold_labels=gold_labels,
gold_assignee=gold_assignee,
gold_priority=gold_priority,
gold_milestone=issue.milestone,
gold_severity=issue.severity,
gold_component=issue.component,
gold_duplicate_issue_id=issue.linked_duplicates[0] if issue.linked_duplicates else None,
gold_close_reason=None,
required_missing_fields=[],
expected_requests=[],
expected_comment_keywords=[],
expected_response_style=None,
)
def load_episode_bundle(
*,
repo_rules_path: Union[str, Path],
tasks_path: Optional[Union[str, Path]] = None,
issues_path: Union[str, Path],
live_github: bool = False,
) -> List[IssueTriageState]:
"""
Main loader used by the environment.
Supports:
- local JSON files
- GitHub raw URLs
- github.com blob URLs
- single GitHub issue URLs inside issues.json or issue entries
"""
repo_rules = load_repo_rules(repo_rules_path)
issues = load_issues(issues_path, live_github=live_github)
issue_index = _build_issue_index(issues)
task_items: List[Any]
if tasks_path is None:
task_items = _generate_tasks_from_issues(issues)
else:
try:
tasks_raw = _load_json_maybe_github(tasks_path)
task_items = _unwrap_payload(tasks_raw, "tasks")
except FileNotFoundError:
task_items = _generate_tasks_from_issues(issues)
if not task_items:
task_items = _generate_tasks_from_issues(issues)
episodes: List[IssueTriageState] = []
task_field_names = set(TaskSpec.model_fields.keys())
for raw_task in task_items:
if not isinstance(raw_task, dict):
continue
task_data = {k: v for k, v in raw_task.items() if k in task_field_names}
task = _validate_model(TaskSpec, task_data)
if task.issue_id not in issue_index:
raise ValueError(
f"Issue {task.issue_id!r} referenced by task {task.task_id!r} was not found in issues source."
)
issue = issue_index[task.issue_id].model_copy(deep=True)
episode_id = str(raw_task.get("episode_id") or f"ep_{task.task_id}")
hidden_target = _parse_hidden_target(raw_task)
if hidden_target is None:
hidden_target = _generate_hidden_target_from_issue(issue)
candidate_duplicates = _parse_candidate_duplicates(raw_task)
state = build_initial_state(
episode_id=episode_id,
task=task,
repo_rules=repo_rules,
issue=issue,
candidate_duplicates=candidate_duplicates,
hidden_target=hidden_target,
)
episodes.append(state)
return episodes
def load_episode_bundle_from_paths(
data_dir: Union[str, Path],
*,
live_github: bool = False,
) -> List[IssueTriageState]:
"""
Convenience helper when your data is stored in a folder like:
data/
repo_rules.json
tasks.json
issues.json
"""
base = Path(data_dir)
repo_rules_path = base / "repo_rules.json"
tasks_path = base / "tasks.json"
issues_path = base / "issues.json"
missing = [str(p) for p in [repo_rules_path, issues_path] if not p.exists()]
if missing:
raise FileNotFoundError(f"Missing required files: {', '.join(missing)}")
return load_episode_bundle(
repo_rules_path=repo_rules_path,
tasks_path=tasks_path if tasks_path.exists() else None,
issues_path=issues_path,
live_github=live_github,
)
def load_single_episode(
*,
repo_rules_path: Union[str, Path],
task: dict,
issue: Union[dict, str],
candidate_duplicates: Optional[List[dict]] = None,
live_github: bool = False,
) -> IssueTriageState:
"""
Helper for tests, ad-hoc episodes, or GitHub-URL-backed issue data.
"""
repo_rules = load_repo_rules(repo_rules_path)
task_field_names = set(TaskSpec.model_fields.keys())
task_data = {k: v for k, v in task.items() if k in task_field_names}
task_obj = _validate_model(TaskSpec, task_data)
issue_obj = _load_issue_item(issue, live_github=live_github)
dup_objs = [_validate_model(DuplicateCandidate, x) for x in (candidate_duplicates or [])]
hidden_target = _parse_hidden_target(task)
if hidden_target is None:
hidden_target = _generate_hidden_target_from_issue(issue_obj)
return build_initial_state(
episode_id=str(task.get("episode_id") or f"ep_{task_obj.task_id}"),
task=task_obj,
repo_rules=repo_rules,
issue=issue_obj,
candidate_duplicates=dup_objs,
hidden_target=hidden_target,
)
def load_episode_from_source(
*,
repo_rules_path: Union[str, Path],
issue_source: Union[str, Path, Dict[str, Any]],
live_github: bool = False,
task_id: Optional[str] = None,
max_steps: int = 10,
) -> IssueTriageState:
"""
Build a single episode directly from repo rules + one issue source.
This is the no-tasks.json path.
"""
repo_rules = load_repo_rules(repo_rules_path)
if isinstance(issue_source, dict):
issue = _normalize_issue_snapshot(issue_source)
else:
issue = _load_issue_item(issue_source, live_github=live_github)
generated_task_id = task_id or f"triage_{issue.repo_id.replace('/', '_')}_{issue.issue_id}"
task = TaskSpec(
task_id=generated_task_id,
difficulty=Difficulty.EASY,
goal_type=GoalType.TRIAGE_ONLY,
repo_id=issue.repo_id,
issue_id=issue.issue_id,
max_steps=max_steps,
success_criteria=[],
allowed_actions=_default_allowed_actions(),
hidden_grading_flags={},
repo_rules_url=None,
)
# Auto-generate hidden target from issue if not explicitly provided
hidden_target = _generate_hidden_target_from_issue(issue)
return build_initial_state(
episode_id=f"ep_{generated_task_id}",
task=task,
repo_rules=repo_rules,
issue=issue,
candidate_duplicates=[],
hidden_target=hidden_target,
)