Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import os | |
| from typing import Any, Dict, List, Optional, TYPE_CHECKING | |
| from .models import StrategicObjective | |
| if TYPE_CHECKING: | |
| from openai.types.chat import ChatCompletionUserMessageParam | |
| def _alignment_label(score: float) -> str: | |
| if score >= 0.75: | |
| return "Strong" | |
| if score >= 0.55: | |
| return "Medium" | |
| return "Weak" | |
| class RAGEngine: | |
| """Retrieval-Augmented Generation (RAG) helper for improvement suggestions. | |
| Responsibilities: | |
| - Accept a strategic objective, current alignment score, and top-K retrieved action tasks | |
| - Build a structured prompt with clear SYSTEM / CONTEXT / INSTRUCTIONS sections | |
| - Optionally call an LLM (OpenAI API via env vars) and parse structured JSON | |
| - Fallback to deterministic, rule-based suggestions if LLM is unavailable | |
| """ | |
| def __init__(self, model: Optional[str] = None) -> None: | |
| # Model name can be overridden via env var OPENAI_MODEL | |
| # Use a widely supported default; allow override via env or arg | |
| self.model = model or os.environ.get("OPENAI_MODEL") or "gpt-4o-mini" | |
| self.api_key = os.environ.get("OPENAI_API_KEY") | |
| # ------------------------- Prompt Construction ------------------------- | |
| def build_prompt( | |
| self, | |
| strategy: StrategicObjective, | |
| current_score: float, | |
| retrieved_actions: List[Dict[str, Any]], | |
| ) -> ChatCompletionUserMessageParam: | |
| system = "You are an AI business analyst." | |
| actions_lines = [] | |
| for i, a in enumerate(retrieved_actions, start=1): | |
| title = a.get("title") or a.get("metadata", {}).get("title") or "Action" | |
| sim = a.get("similarity") | |
| actions_lines.append(f"{i}. {title} (similarity: {sim:.2f})") | |
| actions_block = "\n".join(actions_lines) if actions_lines else "(none)" | |
| context = ( | |
| f"Strategic Objective:\n\n{strategy.title}\n\n" | |
| f"Description:\n\n{strategy.description}\n\n" | |
| f"Current Alignment Score:\n{current_score:.2f} ({_alignment_label(current_score)})\n\n" | |
| f"Retrieved Action Tasks:\n{actions_block}" | |
| ) | |
| instructions = ( | |
| "- Explain why alignment is at the current level\n" | |
| "- Suggest 3 new action tasks\n" | |
| "- Suggest 2 measurable KPIs\n" | |
| "- Suggest timeline and ownership\n" | |
| "- Keep suggestions realistic for AutoBridge" | |
| ) | |
| response_format = ( | |
| "Respond in strict JSON with keys: " | |
| "explanation (string), suggested_actions (string[3]), " | |
| "kpis (string[2]), timeline_and_ownership (object with keys: owner, start, end), " | |
| "risks (string[1..3])." | |
| ) | |
| content = ( | |
| "SYSTEM:\n" | |
| + system | |
| + "\n\n" | |
| + "CONTEXT:\n" | |
| + context | |
| + "\n\n" | |
| + "INSTRUCTIONS:\n" | |
| + instructions | |
| + "\n\n" | |
| + "RESPONSE_FORMAT:\n" | |
| + response_format | |
| ) | |
| return { | |
| "role": "user", | |
| "content": content, | |
| } | |
| # ------------------------- LLM Invocation ------------------------- | |
| def _call_openai(self, user_msg: ChatCompletionUserMessageParam) -> Optional[str]: | |
| if not self.api_key: | |
| print("RAGEngine: OPENAI_API_KEY not set; using fallback.") | |
| return None | |
| try: | |
| from openai import OpenAI # type: ignore | |
| client = OpenAI(api_key=self.api_key) | |
| completion = client.chat.completions.create( | |
| model=self.model, | |
| messages=[ | |
| { | |
| "role": "system", | |
| "content": "You are an AI business analyst.", | |
| }, | |
| user_msg, | |
| ], | |
| ) | |
| print(f"RAGEngine: OpenAI call succeeded (model={self.model}).") | |
| return completion.choices[0].message.content | |
| except Exception as e: | |
| print(f"RAGEngine: OpenAI call failed: {e!r}; using fallback.") | |
| return None | |
| # ------------------------- Parsing and Fallback ------------------------- | |
| def _parse_json(self, text: Optional[str]) -> Optional[Dict[str, Any]]: | |
| if not text: | |
| return None | |
| # Try to locate first JSON object in text | |
| text = text.strip() | |
| start = text.find("{") | |
| end = text.rfind("}") | |
| if start != -1 and end != -1 and end > start: | |
| snippet = text[start : end + 1] | |
| try: | |
| return json.loads(snippet) | |
| except Exception: | |
| pass | |
| # Last resort | |
| try: | |
| return json.loads(text) | |
| except Exception: | |
| return None | |
| def _fallback_rule_based( | |
| self, | |
| strategy: StrategicObjective, | |
| current_score: float, | |
| retrieved_actions: List[Dict[str, Any]], | |
| ) -> Dict[str, Any]: | |
| label = _alignment_label(current_score) | |
| # Simple templates for MSc-friendly determinism | |
| if label == "Weak": | |
| expl = ( | |
| "Current actions only partially address the strategy's outcomes; coverage is sparse " | |
| "and lacks clear ownership/timelines, resulting in low similarity scores." | |
| ) | |
| actions = [ | |
| "Publish a standardized landed-cost breakdown with supplier lane mapping", | |
| "Automate ingestion of duties/freight/insurance components with validations", | |
| "Launch monthly variance review with supplier scorecards and corrective actions", | |
| ] | |
| kpis = [ | |
| "Cost variance across lanes < 3%", | |
| "Data coverage of cost components ≥ 90% of SKUs", | |
| ] | |
| timeline = { | |
| "owner": "Finance Ops", | |
| "start": "2026-02-15", | |
| "end": "2026-05-31", | |
| } | |
| risks = [ | |
| "Supplier data quality or delays may limit transparency", | |
| "Insufficient engineering capacity for integrations", | |
| ] | |
| elif label == "Medium": | |
| expl = ( | |
| "Alignment is improving but gaps remain in data completeness and process rigor; " | |
| "consolidation and clearer deliverables would strengthen coverage." | |
| ) | |
| actions = [ | |
| "Unify overlapping cost tools into a single authoritative calculator", | |
| "Backfill historical cost data and set validation thresholds", | |
| "Define escalation playbooks for clearance delays and exceptions", | |
| ] | |
| kpis = [ | |
| "Audit pass rate ≥ 95%", | |
| "Average clearance time < 24h", | |
| ] | |
| timeline = { | |
| "owner": "Operations", | |
| "start": "2026-03-01", | |
| "end": "2026-06-30", | |
| } | |
| risks = [ | |
| "Fragmented ownership across finance and operations", | |
| ] | |
| else: # Strong | |
| expl = ( | |
| "Actions strongly map to the strategy. Focus on monitoring, risk management, and " | |
| "sustaining improvements through governance routines." | |
| ) | |
| actions = [ | |
| "Introduce quarterly retrospectives with supplier and ops stakeholders", | |
| "Automate anomaly detection for lane cost spikes", | |
| "Publish transparency dashboards for leadership review", | |
| ] | |
| kpis = [ | |
| "Early-warning alerts resolved within 48h", | |
| "Quarterly savings achieved vs. target", | |
| ] | |
| timeline = { | |
| "owner": "Data Science", | |
| "start": "2026-03-15", | |
| "end": "2026-07-31", | |
| } | |
| risks = [ | |
| "Model drift or changing tariffs impacting reliability", | |
| ] | |
| return { | |
| "explanation": expl, | |
| "suggested_actions": actions, | |
| "kpis": kpis, | |
| "timeline_and_ownership": timeline, | |
| "risks": risks, | |
| } | |
| # ------------------------- Public API ------------------------- | |
| def generate( | |
| self, | |
| strategy: StrategicObjective, | |
| current_score: float, | |
| retrieved_actions: List[Dict[str, Any]], | |
| ) -> Dict[str, Any]: | |
| user_msg = self.build_prompt(strategy, current_score, retrieved_actions) | |
| text = self._call_openai(user_msg) | |
| parsed = self._parse_json(text) | |
| if parsed: | |
| return parsed | |
| return self._fallback_rule_based(strategy, current_score, retrieved_actions) | |