Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import time | |
| import logging | |
| from typing import List, Dict, Any, Optional | |
| from groq import Groq | |
logger = logging.getLogger(__name__)

# Override via OPS_BRAIN_MODEL env var for fast A/B testing.
# An unset, empty, or whitespace-only value falls back to the built-in default,
# so a blank deployment variable cannot yield an empty model name.
DEFAULT_MODEL = os.getenv("OPS_BRAIN_MODEL", "").strip() or "llama-3.3-70b-versatile"

# Per-request timeout, in seconds, passed to the Groq chat-completions call.
GROQ_TIMEOUT_S = 15.0
# Total number of attempts made per prompt (the first call counts as attempt 1).
GROQ_MAX_RETRIES = 2
# Base backoff in seconds; the pause after failed attempt k is this value * k.
GROQ_RETRY_BACKOFF_S = 1.5
| def _safe_json_loads(raw: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Parse JSON from a model response, tolerating stray prose / markdown fences. | |
| Returns None if unparseable. | |
| """ | |
| if not raw: | |
| return None | |
| text = raw.strip() | |
| # Strip ```json ... ``` fences if present. | |
| if text.startswith("```"): | |
| first_nl = text.find("\n") | |
| if first_nl != -1: | |
| text = text[first_nl + 1:] | |
| if text.endswith("```"): | |
| text = text[:-3] | |
| # First attempt: strict. | |
| try: | |
| return json.loads(text) | |
| except json.JSONDecodeError: | |
| pass | |
| # Second attempt: find the first '{' and last '}' and try that slice. | |
| start = text.find("{") | |
| end = text.rfind("}") | |
| if start != -1 and end != -1 and end > start: | |
| try: | |
| return json.loads(text[start : end + 1]) | |
| except json.JSONDecodeError: | |
| pass | |
| logger.error("ops_brain: could not parse JSON from model output: %r", raw[:300]) | |
| return None | |
def _groq_chat(client: Groq, prompt: str, model: str) -> Optional[Dict[str, Any]]:
    """
    Send a single user prompt to Groq in JSON mode and return the decoded reply.

    Up to GROQ_MAX_RETRIES attempts are made. An attempt fails when the call
    raises or when the reply cannot be decoded by _safe_json_loads. A pause of
    GROQ_RETRY_BACKOFF_S * attempt seconds separates consecutive attempts.
    Returns None once every attempt has failed.
    """
    failure: Optional[Exception] = None
    attempt = 0
    while attempt < GROQ_MAX_RETRIES:
        attempt += 1
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"},
                timeout=GROQ_TIMEOUT_S,
            )
            result = _safe_json_loads(response.choices[0].message.content)
            if result is not None:
                return result
            # The call succeeded but the payload was unusable; remember why.
            failure = ValueError("json parse failed")
        except Exception as exc:  # groq.GroqError, httpx.TimeoutException, etc.
            failure = exc
            logger.warning("ops_brain: groq attempt %d/%d failed: %s", attempt, GROQ_MAX_RETRIES, exc)

        # No pause after the final attempt - there is nothing left to wait for.
        if attempt == GROQ_MAX_RETRIES:
            break
        time.sleep(GROQ_RETRY_BACKOFF_S * attempt)

    logger.error("ops_brain: giving up after %d attempts. last_err=%s", GROQ_MAX_RETRIES, failure)
    return None
class OpsManagerAI:
    """
    Groq-backed helper that turns store reports into structured data.

    Both public methods return a fixed fallback dict instead of raising when
    the model call fails or yields something that is not a usable JSON object.
    """

    def __init__(self, api_key: str, model: str = DEFAULT_MODEL):
        """
        Create the Groq client.

        Args:
            api_key: Groq API key; surrounding whitespace is ignored.
            model: Model name sent with every request.

        Raises:
            ValueError: If api_key is empty or whitespace-only.
        """
        if not api_key or not api_key.strip():
            raise ValueError("OpsManagerAI: api_key is empty")
        # The check above tolerates surrounding whitespace (e.g. a trailing
        # newline from an env file), so strip it rather than send it along
        # as part of the credential.
        self.client = Groq(api_key=api_key.strip())
        self.model = model

    def process_telegram_message(self, text: str) -> Dict[str, Any]:
        """
        Parses store reports into structured JSON using Groq.

        Returns the model's dict when it is a JSON object with a truthy
        "store_id". Otherwise returns a safe-default dict (with store_id=None),
        so the bot never crashes the handler thread.
        """
        # NOTE(review): `text` is user-supplied and interpolated verbatim into
        # the prompt; treat every field of the result as untrusted model output.
        prompt = f"""
        You are a Professional AI Operations Manager. Your task is to parse store reports into a strict JSON format.
        The user provides reports in a specific template like:
        'Daily Update [ Store Name ] Date: [Date] 💵 Sales:; (Value) 🛍️Transactions: [Value] 📊Average Transaction (AT): [Value] ⬆️⬇️AT Yesterday[ Value]'
        Input Text: {text}
        Required JSON Output:
        {{
            "store_id": "Extract the name inside the brackets [ ]",
            "metrics": {{
                "sales": float or null,
                "inventory_status": "Good|Warning|Critical",
                "staffing": "OK|Understaffed|Overstaffed"
            }},
            "issues": ["List any mentioned anomalies or low stock, otherwise empty"],
            "analysis": "A brief analytical summary of the store's health today",
            "actions_needed": ["Concrete, actionable steps based on the sales/AT trends"]
        }}
        Return ONLY the JSON object. No preamble.
        """
        parsed = _groq_chat(self.client, prompt, self.model)
        # isinstance guard: a non-object JSON reply (list, string, ...) must take
        # the fallback path instead of raising AttributeError on .get().
        if isinstance(parsed, dict) and parsed.get("store_id"):
            return parsed
        # Safe fallback so caller can decide to reply with an error instead of crashing.
        return {
            "store_id": None,
            "metrics": {"sales": None, "inventory_status": None, "staffing": None},
            "issues": [],
            "analysis": "AI could not extract a valid store report from the message.",
            "actions_needed": [],
        }

    def generate_hot_list_analysis(self, all_stores_data: List[Dict]) -> Dict[str, Any]:
        """
        Analyze the full fleet to find critical areas.

        Args:
            all_stores_data: Per-store records; values that are not JSON-native
                are rendered with str() when building the prompt.

        Returns:
            The model's JSON object, or a fixed fallback dict when the data
            cannot be serialized or the model yields no usable object.
        """
        fallback = {
            "top_performers": [],
            "critical_stores": [],
            "global_dev_area": "AI analysis unavailable.",
            "strategic_priority": "Manual review required.",
        }
        try:
            # default=str: deliberately stringify non-JSON-native values (dates,
            # decimals, ...) since the output is only prompt text for the model.
            # ensure_ascii=False keeps non-ASCII store names readable as-is.
            fleet_json = json.dumps(all_stores_data, default=str, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            # E.g. unsupported dict key types or circular references.
            logger.error("ops_brain: could not serialize store data: %s", e)
            return fallback
        prompt = f"""
        Analyze the following store data and provide a strategic operation summary.
        Data: {fleet_json}
        Return JSON:
        {{
            "top_performers": ["Store name"],
            "critical_stores": ["Store name"],
            "global_dev_area": "General area needing improvement",
            "strategic_priority": "Top immediate action for the owner"
        }}
        Return ONLY JSON.
        """
        parsed = _groq_chat(self.client, prompt, self.model)
        if isinstance(parsed, dict):
            return parsed
        return fallback