Spaces:
Sleeping
Sleeping
"""
PHI Prompts Loader.

Loads benchmark prompt files and applies type-specific rules:
- success_story: Date moved back 180 days
- email: Only run for unique email_signature (first occurrence)
- transcript: No special rules
"""
from __future__ import annotations

import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

from benchmark.schemas import PromptItem
# Per-prompt-type adjustment (in days) applied to the prompt's original date
# before computing the search window; 0 means the date is used as-is.
DATE_ADJUSTMENTS = {
    "success_story": -180,  # success stories: move the end date back 180 days
    "email": 0,
    "transcript": 0,
}
| def _parse_date(date_str: str) -> datetime: | |
| """Try parsing a date string in various formats.""" | |
| for fmt in [ | |
| "%Y-%m-%d", | |
| "%m/%d/%Y", | |
| "%B %d, %Y", | |
| "%B, %Y", | |
| "%b %d, %Y", | |
| "%d %b %Y", | |
| ]: | |
| try: | |
| return datetime.strptime(date_str, fmt) | |
| except ValueError: | |
| continue | |
| # Try python-dateutil as fallback | |
| try: | |
| from dateutil import parser | |
| return parser.parse(date_str) | |
| except Exception: | |
| return datetime.now() | |
def calculate_search_dates(
    original_date: str,
    prompt_type: str,
    years_back: int = 10,
) -> tuple[str, str]:
    """
    Calculate search date range based on prompt type.
    Returns (start_date, end_date) as YYYY-MM-DD strings.
    """
    # The (possibly adjusted) prompt date anchors the end of the window.
    anchor = _parse_date(original_date)
    offset_days = DATE_ADJUSTMENTS.get(prompt_type, 0)
    if offset_days:
        anchor = anchor + timedelta(days=offset_days)
    # Window reaches back years_back * 365 days (leap days not accounted for).
    window_start = anchor - timedelta(days=365 * years_back)
    iso = "%Y-%m-%d"
    return window_start.strftime(iso), anchor.strftime(iso)
def _infer_prompt_type(path: Path) -> str:
    """Infer the prompt type ('success_story'/'email'/'transcript') from a file name."""
    stem = path.stem.lower()
    if "success" in stem:
        return "success_story"
    if "email" in stem:
        return "email"
    if "transcript" in stem:
        return "transcript"
    return "unknown"


def load_prompts(
    input_files: list[str | Path],
    deduplicate_emails: bool = True,
) -> list[PromptItem]:
    """
    Load prompts from one or more PHI-prompts JSON files.

    Each file's prompt type is inferred from its name; per-type date rules
    are applied via calculate_search_dates. Missing files are skipped with
    a warning rather than raising.

    Args:
        input_files: Paths to JSON files (each containing a list of objects).
        deduplicate_emails: If True, only keep first occurrence of each email_signature.

    Returns:
        List of PromptItem objects with sequential ids across all files.
    """
    items: list[PromptItem] = []
    seen_emails: set[str] = set()
    idx = 0
    for input_file in input_files:
        path = Path(input_file)
        if not path.exists():
            print(f"Warning: Input file not found: {input_file}")
            continue
        prompt_type = _infer_prompt_type(path)
        # Explicit encoding: JSON files are UTF-8 regardless of platform default.
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
        for item_data in data:
            # Email deduplication: keep only the first item per email_signature.
            if prompt_type == "email" and deduplicate_emails:
                email_sig = item_data.get("email_signature")
                if email_sig:
                    if email_sig in seen_emails:
                        continue
                    seen_emails.add(email_sig)
            # Items without a date default to today (search window anchored at "now").
            original_date = item_data.get("date", datetime.now().strftime("%Y-%m-%d"))
            start_date, end_date = calculate_search_dates(original_date, prompt_type)
            items.append(PromptItem(
                id=idx,
                customer=item_data.get("customer", ""),
                dr_prompt=item_data.get("dr_prompt", ""),
                date=original_date,
                prompt_type=prompt_type,
                search_start_date=start_date,
                search_end_date=end_date,
                seller=item_data.get("seller", ""),
                products=item_data.get("products", []),
                cluster=item_data.get("cluster", ""),
            ))
            idx += 1
    return items