Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """Prompt builders backed by external template files. | |
| Prompt text lives in ``research_suite/prompt_bank`` so experiments can swap | |
| prompt files without editing Python code. | |
| """ | |
| from __future__ import annotations | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from string import Template | |
| from .models import ContextPackage, ValidationResult | |
# Repository root: this module sits one directory below it, so climb one parent
# from the resolved file location.
_PROJECT_ROOT = Path(__file__).resolve().parents[1]

# Default template file for each prompt role, relative to the project root.
# Callers may override any entry through the ``prompt_files`` mapping accepted
# by the builder functions below, which lets experiments swap prompt files
# without touching Python code.
_DEFAULT_PROMPT_FILES = {
    "generation": "research_suite/prompt_bank/generation/base.txt",
    "general_semantic_expert": "research_suite/prompt_bank/judges/general/base.txt",
    "grounding_expert": "research_suite/prompt_bank/judges/grounding/base.txt",
    "structure_expert": "research_suite/prompt_bank/judges/structure/base.txt",
    "constraint_expert": "research_suite/prompt_bank/judges/constraint/base.txt",
    "result_expert": "research_suite/prompt_bank/judges/result/base.txt",
    "syntax_repair": "research_suite/prompt_bank/repair/syntax_fix.txt",
    "repair": "research_suite/prompt_bank/repair/base.txt",
}
| def _format_candidates_section(context: ContextPackage) -> str: | |
| """Format context candidates into a human-readable prompt section.""" | |
| parts: list[str] = [] | |
| if context.entity_candidates: | |
| parts.append("## Entity Candidates") | |
| for entity in context.entity_candidates: | |
| parts.append( | |
| f"- {entity.get('label', '?')} -> <{entity.get('uri', '?')}> " | |
| f"(score: {entity.get('score', 0):.2f})" | |
| ) | |
| if context.relation_candidates: | |
| parts.append("\n## Relation/Property Candidates") | |
| for relation in context.relation_candidates: | |
| parts.append( | |
| f"- {relation.get('label', '?')} -> <{relation.get('uri', '?')}> " | |
| f"(score: {relation.get('score', 0):.2f})" | |
| ) | |
| if context.class_candidates: | |
| parts.append("\n## Class/Type Candidates") | |
| for cls in context.class_candidates: | |
| parts.append( | |
| f"- {cls.get('label', '?')} -> <{cls.get('uri', '?')}> " | |
| f"(score: {cls.get('score', 0):.2f})" | |
| ) | |
| if context.prefix_hints: | |
| parts.append("\n## Available Prefixes") | |
| for prefix, namespace in context.prefix_hints.items(): | |
| parts.append(f"- PREFIX {prefix}: <{namespace}>") | |
| if context.answer_type_hint: | |
| parts.append(f"\n## Expected Answer Type: {context.answer_type_hint}") | |
| if context.notes: | |
| parts.append("\n## Context Notes") | |
| for note in context.notes: | |
| parts.append(f"- {note}") | |
| return "\n".join(parts).strip() | |
| def _format_validation_section(validation: ValidationResult) -> str: | |
| """Format validation results into a reusable prompt section.""" | |
| parts = [ | |
| f"- Parse OK: {validation.parse_ok}", | |
| f"- Execute OK: {validation.execute_ok}", | |
| f"- Timeout: {validation.timeout}", | |
| f"- Result count: {validation.result_count}", | |
| f"- Answer type fit: {validation.answer_type_fit:.2f}", | |
| f"- Schema fit: {validation.schema_fit:.2f}", | |
| ] | |
| if validation.suspicious_flags: | |
| parts.append(f"- Suspicious flags: {', '.join(validation.suspicious_flags)}") | |
| if validation.execution_error: | |
| parts.append(f"- Execution error: {validation.execution_error}") | |
| if validation.result_preview: | |
| parts.append(f"- Result preview (first 3): {validation.result_preview[:3]}") | |
| return "\n".join(parts) | |
| def _resolve_prompt_path(path_str: str) -> Path: | |
| path = Path(path_str) | |
| if path.is_absolute(): | |
| return path | |
| return _PROJECT_ROOT / path | |
def _load_template(path_str: str) -> Template:
    """Read a prompt file from disk and wrap it in a ``string.Template``.

    Raises:
        FileNotFoundError: if the resolved template path does not exist.
    """
    template_path = _resolve_prompt_path(path_str)
    if not template_path.exists():
        raise FileNotFoundError(f"Prompt template not found: {template_path}")
    return Template(template_path.read_text(encoding="utf-8"))
def _render_prompt(
    template_key: str,
    variables: dict[str, object],
    prompt_files: dict[str, str] | None = None,
) -> str:
    """Render the template registered under ``template_key``.

    ``prompt_files`` may override the default file path for the key; every
    variable value is stringified before substitution so templates only ever
    see text.
    """
    overrides = prompt_files or {}
    path_str = overrides.get(template_key, _DEFAULT_PROMPT_FILES[template_key])
    stringified = {name: str(value) for name, value in variables.items()}
    return _load_template(path_str).substitute(stringified)
def build_generation_prompt(
    question: str,
    context: ContextPackage,
    k: int,
    prompt_files: dict[str, str] | None = None,
) -> str:
    """Build the prompt asking the model for ``k`` query candidates."""
    variables = {
        "question": question,
        "context_section": _format_candidates_section(context),
        "k": k,
        # Default to "select" when the context carries no answer-type hint.
        "answer_type_hint": context.answer_type_hint or "select",
    }
    return _render_prompt("generation", variables, prompt_files)
def build_grounding_expert_prompt(
    question: str,
    query: str,
    context: ContextPackage,
    validation: ValidationResult,
    prompt_files: dict[str, str] | None = None,
) -> str:
    """Build the grounding-expert judge prompt for one candidate query."""
    variables = {
        "question": question,
        "query": query,
        "context_section": _format_candidates_section(context),
        "validation_section": _format_validation_section(validation),
        "candidate_id": validation.candidate_id,
    }
    return _render_prompt("grounding_expert", variables, prompt_files)
def build_general_semantic_expert_prompt(
    question: str,
    query: str,
    context: ContextPackage,
    validation: ValidationResult,
    prompt_files: dict[str, str] | None = None,
) -> str:
    """Build the general semantic-expert judge prompt for one candidate query."""
    variables = {
        "question": question,
        "query": query,
        "context_section": _format_candidates_section(context),
        "validation_section": _format_validation_section(validation),
        "candidate_id": validation.candidate_id,
        # Default to "select" when the context carries no answer-type hint.
        "answer_type_hint": context.answer_type_hint or "select",
    }
    return _render_prompt("general_semantic_expert", variables, prompt_files)
def build_structure_expert_prompt(
    question: str,
    query: str,
    context: ContextPackage,
    validation: ValidationResult,
    prompt_files: dict[str, str] | None = None,
) -> str:
    """Build the structure-expert judge prompt for one candidate query."""
    variables = {
        "question": question,
        "query": query,
        "context_section": _format_candidates_section(context),
        "validation_section": _format_validation_section(validation),
        "candidate_id": validation.candidate_id,
        # Default to "select" when the context carries no answer-type hint.
        "answer_type_hint": context.answer_type_hint or "select",
    }
    return _render_prompt("structure_expert", variables, prompt_files)
def build_constraint_expert_prompt(
    question: str,
    query: str,
    context: ContextPackage,
    validation: ValidationResult,
    prompt_files: dict[str, str] | None = None,
) -> str:
    """Build the constraint-expert judge prompt for one candidate query."""
    variables = {
        "question": question,
        "query": query,
        "context_section": _format_candidates_section(context),
        "validation_section": _format_validation_section(validation),
        "candidate_id": validation.candidate_id,
    }
    return _render_prompt("constraint_expert", variables, prompt_files)
def build_result_expert_prompt(
    question: str,
    query: str,
    validation: ValidationResult,
    prompt_files: dict[str, str] | None = None,
) -> str:
    """Build the result-expert judge prompt for one candidate query.

    Unlike the other judges, this prompt needs no retrieval context — it
    judges the query purely from its execution results.
    """
    variables = {
        "question": question,
        "query": query,
        "validation_section": _format_validation_section(validation),
        "candidate_id": validation.candidate_id,
    }
    return _render_prompt("result_expert", variables, prompt_files)
def build_repair_prompt(
    question: str,
    query: str,
    action: str,
    evidence: list[str],
    context: ContextPackage,
    prior_queries: list[str] | None = None,
    prompt_files: dict[str, str] | None = None,
    template_key: str = "repair",
) -> str:
    """Build the prompt instructing the model to repair a failing query.

    ``action`` selects a human-readable description of the repair to apply;
    unknown actions fall back to a generic description. ``prior_queries``
    renders earlier failed attempts so the model does not repeat them.
    ``template_key`` allows routing to a specialized template (e.g.
    ``"syntax_repair"``).
    """
    # Known repair actions and the instruction text shown to the model.
    action_descriptions = {
        "syntax_fix": "Fix SPARQL syntax errors so the query parses correctly.",
        "entity_relink": "Replace wrong entity IRIs with correct ones from the context.",
        "predicate_replace": "Replace wrong predicate/property IRIs with correct ones.",
        "form_fix": "Fix the query form (e.g., change SELECT to ASK, or add COUNT).",
        "projection_fix": "Fix the SELECT projection to return the correct variables.",
        "direction_fix": "Fix the subject/object direction in triple patterns.",
        "constraint_fix": "Fix filters, aggregation, ordering, or other constraints.",
    }

    if evidence:
        evidence_str = "\n".join(f"- {item}" for item in evidence)
    else:
        evidence_str = "- No specific evidence provided."

    if prior_queries:
        prior_attempts_section = "\n".join(
            f"Attempt {attempt_no}:\n```sparql\n{previous}\n```"
            for attempt_no, previous in enumerate(prior_queries, start=1)
        )
    else:
        prior_attempts_section = "None."

    variables = {
        "question": question,
        "query": query,
        "action": action,
        "action_desc": action_descriptions.get(
            action, f"Apply repair action: {action}"
        ),
        "evidence_str": evidence_str,
        "context_section": _format_candidates_section(context),
        "prior_attempts_section": prior_attempts_section,
    }
    return _render_prompt(template_key, variables, prompt_files)