Spaces:
Sleeping
Sleeping
| """Planner: turns a user request into a JSON tool-plan via Azure OpenAI *Responses*.""" | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, List, Any | |
| import yaml | |
| from services.llm_client import LLMClient | |
| from config.settings import settings | |
| from services.cost_tracker import CostTracker | |
# YAML file holding the prompt templates: two directory levels above this
# file, under "config/prompts.yaml".
_PROMPTS_FILE = Path(__file__).parent.parent / "config" / "prompts.yaml"
# Configure logging
# NOTE(review): this runs when the module is imported and configures the
# process-wide root logger at INFO level (basicConfig does nothing if the root
# logger already has handlers) — confirm a library module should do this.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Planner:
    """Generate a plan with the Responses API; fall back to a static template if parsing fails.

    ``build_plan`` is the entry point. For the "Unique Indices Strategy" it
    returns the hard-coded plan directly; for any other strategy it asks the
    LLM for a JSON plan and falls back to the hard-coded plan when the call
    fails or the reply is not a JSON object containing "steps" and "fields".

    ``_load_prompt``, ``_static_plan`` and the other underscore helpers take
    no ``self`` and are therefore declared as static methods, so that the
    ``self._load_prompt(...)`` / ``self._static_plan(...)`` call sites work.
    """

    class _PromptTemplate:
        """Prompt text plus the literal-placeholder substitution used to fill it."""

        def __init__(self, s: str):
            # Raw template text as loaded from the prompts file (or the default).
            self.s = s

        def format_json(self, **kwargs):
            """Return the template with known placeholder phrases replaced.

            Recognised keyword arguments: ``fields``, ``field_descriptions``,
            ``doc_preview``, ``pdf_meta``, ``strategy``, ``unique_indices`` and
            ``unique_indices_descriptions``. A placeholder is only replaced
            when its value is truthy; otherwise the template text is left
            untouched. Substitution is plain ``str.replace`` on fixed phrases,
            not ``str.format``.
            """
            fields = kwargs.get("fields", [])
            field_descriptions = kwargs.get("field_descriptions", {})
            doc_preview = kwargs.get("doc_preview", "")
            pdf_meta = kwargs.get("pdf_meta", {})
            strategy = kwargs.get("strategy", "Original Strategy")
            unique_indices = kwargs.get("unique_indices", [])
            unique_indices_descriptions = kwargs.get("unique_indices_descriptions", {})

            formatted = self.s
            if fields:
                # Ensure fields is a flat list of strings
                fields_json = json.dumps([str(f) for f in fields])
                formatted = formatted.replace("<same list you received>", fields_json)
            if pdf_meta:
                # The combined "pdf_meta / field_descriptions ..." phrase contains
                # the shorter field_descriptions phrase. It must be handled first,
                # and must carry the field descriptions itself, otherwise one of
                # the two values is lost depending on replacement order.
                meta_text = f"document metadata: {json.dumps(pdf_meta)}"
                if field_descriptions:
                    meta_text += f" / field descriptions: {json.dumps(field_descriptions)}"
                formatted = formatted.replace(
                    "pdf_meta / field_descriptions for extra context", meta_text
                )
            if field_descriptions:
                formatted = formatted.replace(
                    "field_descriptions for extra context",
                    f"field descriptions: {json.dumps(field_descriptions)}",
                )
            if doc_preview:
                # Only the first 1000 characters of the preview go into the prompt.
                formatted = formatted.replace(
                    "a few kB of raw text from the uploaded document",
                    f"document preview: {doc_preview[:1000]}...",
                )
            if strategy:
                formatted = formatted.replace(
                    "strategy for extraction", f"extraction strategy: {strategy}"
                )
            if unique_indices:
                formatted = formatted.replace(
                    "unique indices for extraction",
                    f"unique indices: {json.dumps(unique_indices)}",
                )
            if unique_indices_descriptions:
                formatted = formatted.replace(
                    "unique indices descriptions for extra context",
                    f"unique indices descriptions: {json.dumps(unique_indices_descriptions)}",
                )
            return formatted

    def __init__(self, cost_tracker=None) -> None:
        """Load the "planner" prompt and create the LLM client.

        Args:
            cost_tracker: tracker handed to the LLM client on each call; a new
                ``CostTracker`` is created only when this is ``None``.
        """
        self.prompt_template = self._load_prompt("planner")
        self.llm = LLMClient(settings)
        # Explicit None check: a caller-supplied tracker that evaluates as
        # falsy must not be silently swapped for a fresh one.
        self.cost_tracker = cost_tracker if cost_tracker is not None else CostTracker()
        logger.info("Planner initialized with prompt template")

    # --------------------------------------------------
    def build_plan(
        self,
        pdf_meta: Dict[str, Any],
        fields: List[str],
        doc_preview: str | None = None,
        field_descs: Dict | None = None,
        strategy: str = "Original Strategy",
        unique_indices: List[str] | None = None,
        unique_indices_descriptions: Dict[str, str] | None = None,
    ) -> Dict[str, Any]:
        """Return a JSON dict representing the execution plan.

        Args:
            pdf_meta: document metadata; embedded in the prompt and copied into
                an LLM-produced plan under "pdf_meta".
            fields: field names the plan should cover.
            doc_preview: optional document text; only its first 1000 characters
                are placed in the prompt.
            field_descs: optional descriptions for ``fields``.
            strategy: "Unique Indices Strategy" skips the LLM entirely; any
                other value goes through the LLM with a static fallback.
            unique_indices: optional list attached to the plan when non-empty.
            unique_indices_descriptions: optional mapping attached to the plan
                when non-empty.

        Returns:
            A dict with at least "steps", "fields", "pdf_meta" and "strategy".
            LLM failures and unusable replies are logged and replaced by the
            static plan rather than raised.
        """
        logger.info(f"Building plan for strategy: {strategy}")
        logger.info(f"Fields: {fields}")
        logger.info(f"Unique indices: {unique_indices}")
        logger.info(f"Unique indices descriptions: {unique_indices_descriptions}")
        logger.info(f"Field descriptions: {field_descs}")

        # For Unique Indices Strategy, use static plan directly
        if strategy == "Unique Indices Strategy":
            logger.info("Using static plan for Unique Indices Strategy")
            return self._static_plan(
                fields, strategy, unique_indices, unique_indices_descriptions, field_descs
            )

        # For Original Strategy, try LLM first
        user_context = {
            "pdf_meta": pdf_meta,
            "doc_preview": doc_preview or "",
            "fields": fields,
            "field_descriptions": field_descs or {},
            "strategy": strategy,
            "unique_indices": unique_indices or [],
            "unique_indices_descriptions": unique_indices_descriptions or {},
        }
        logger.info(f"Building plan for fields: {fields}")
        logger.info(f"Using strategy: {strategy}")
        if unique_indices:
            logger.info(f"Unique indices: {unique_indices}")
            logger.info(f"Unique indices descriptions: {unique_indices_descriptions}")
        logger.debug(f"User context: {user_context}")

        prompt = self.prompt_template.format_json(**user_context)
        logger.debug(f"Generated prompt: {prompt}")

        try:
            logger.info("Calling LLM to generate plan")
            raw = self.llm.responses(
                prompt,
                temperature=0.0,
                ctx={"cost_tracker": self.cost_tracker},
                description="Execution Plan Generation",
            )
            logger.debug(f"Raw LLM response: {raw}")
        except Exception as llm_exc:
            # Deliberate best-effort boundary: any client failure is logged and
            # the static plan is used instead.
            logger.error(f"Planner: LLM call failed: {llm_exc}")
            logger.exception("Full traceback:")
        else:
            plan = self._parse_plan(raw)
            if plan is not None:
                logger.info("Plan successfully generated with required structure")
                # Add pdf_meta and strategy info to the plan
                plan["pdf_meta"] = pdf_meta
                plan["strategy"] = strategy
                return self._attach_optional_context(
                    plan, unique_indices, unique_indices_descriptions, field_descs
                )

        # ---------- fallback static plan ----------
        logger.info("Falling back to static plan")
        return self._static_plan(
            fields, strategy, unique_indices, unique_indices_descriptions, field_descs
        )

    # --------------------------------------------------
    @staticmethod
    def _parse_plan(raw: Any) -> Dict[str, Any] | None:
        """Decode the LLM reply; return the plan dict, or None if it is unusable.

        A reply is usable when it decodes to a JSON object that has both a
        "steps" and a "fields" key. Every rejection is logged.
        """
        try:
            logger.info("Parsing LLM response as JSON")
            plan = json.loads(raw)
        except json.JSONDecodeError as parse_exc:
            logger.error(f"Planner: Failed to parse LLM output as JSON. Output: {raw}")
            logger.error(f"JSON parsing error: {parse_exc}")
            return None
        except Exception as parse_exc:
            # e.g. TypeError when the client returned something that is not text.
            logger.error(f"Planner: Unexpected error parsing LLM output: {parse_exc}")
            logger.error(f"LLM output: {raw}")
            return None
        logger.debug(f"Parsed plan: {plan}")

        # ensure minimal structure exists
        if not isinstance(plan, dict):
            logger.error(f"Planner: LLM output is not a JSON object. Output: {raw}")
            return None
        missing_keys = [key for key in ("steps", "fields") if key not in plan]
        if missing_keys:
            logger.error(
                f"Planner: LLM output missing required keys: {missing_keys}. Output: {raw}"
            )
            return None
        return plan

    @staticmethod
    def _attach_optional_context(
        plan: Dict[str, Any],
        unique_indices: List[str] | None,
        unique_indices_descriptions: Dict[str, str] | None,
        field_descs: Dict | None,
    ) -> Dict[str, Any]:
        """Copy the optional inputs that are non-empty into ``plan`` and return it."""
        if unique_indices:
            plan["unique_indices"] = unique_indices
        if unique_indices_descriptions:
            plan["unique_indices_descriptions"] = unique_indices_descriptions
        if field_descs:
            plan["field_descriptions"] = field_descs
        return plan

    # --------------------------------------------------
    @staticmethod
    def _load_prompt(name: str):
        """Return the prompt template called ``name`` from the prompts YAML file.

        Any failure to read or parse the file is logged and a built-in default
        prompt is used instead; the same happens when the file does not hold a
        mapping or the entry is not a string.
        """
        default_prompt = "You are a planning agent. Produce a JSON tool plan."
        try:
            data = yaml.safe_load(_PROMPTS_FILE.read_text(encoding="utf-8"))
            logger.debug(f"Loaded prompt template for '{name}'")
        except Exception as e:
            logger.error(f"Failed to load prompt template: {e}")
            data = {}
        # An empty YAML file loads as None, and a list/scalar file has no .get().
        if not isinstance(data, dict):
            data = {}
        template = data.get(name, default_prompt)
        if not isinstance(template, str):
            logger.error(f"Prompt template '{name}' is not a string; using default prompt")
            template = default_prompt
        return Planner._PromptTemplate(template)

    # --------------------------------------------------
    @staticmethod
    def _static_plan(
        fields: List[str],
        strategy: str = "Original Strategy",
        unique_indices: List[str] | None = None,
        unique_indices_descriptions: Dict[str, str] | None = None,
        field_descs: Dict | None = None,
    ) -> Dict[str, Any]:
        """Return a hard-coded plan to guarantee offline functionality.

        "Unique Indices Strategy" yields the four-step unique-indices pipeline;
        any other strategy yields PDFAgent, TableAgent and a ForEachField loop
        around FieldMapper. Optional arguments are attached only when non-empty.
        """
        logger.info("Generating static fallback plan")
        logger.info(f"Strategy: {strategy}")
        logger.info(f"Fields: {fields}")
        logger.info(f"Unique indices: {unique_indices}")
        logger.info(f"Unique indices descriptions: {unique_indices_descriptions}")
        logger.info(f"Field descriptions: {field_descs}")

        if strategy == "Unique Indices Strategy":
            steps = [
                {"tool": "PDFAgent", "args": {}},
                {"tool": "TableAgent", "args": {}},
                {"tool": "UniqueIndicesCombinator", "args": {}},
                {"tool": "UniqueIndicesLoopAgent", "args": {}},
            ]
            logger.info("Generated plan for Unique Indices Strategy")
        else:
            steps = [
                {"tool": "PDFAgent", "args": {}},
                {"tool": "TableAgent", "args": {}},
                {
                    "tool": "ForEachField",
                    "loop": [
                        {"tool": "FieldMapper", "args": {"field": "$field"}},
                    ],
                },
            ]
            logger.info("Generated plan for Original Strategy")
        logger.info(f"Steps: {steps}")

        # NOTE(review): the static plan always carries an empty "pdf_meta",
        # whereas an LLM-produced plan gets the caller's pdf_meta — confirm
        # that downstream tools do not need it on this path.
        plan = {
            "steps": steps,
            "fields": fields,
            "pdf_meta": {},
            "strategy": strategy,
        }
        Planner._attach_optional_context(
            plan, unique_indices, unique_indices_descriptions, field_descs
        )
        # default=str keeps this log line from raising when a caller-supplied
        # value is not JSON-serializable; the returned plan itself is untouched.
        logger.info(f"Final plan: {json.dumps(plan, indent=2, default=str)}")
        return plan