# NOTE: scraper artifacts ("Spaces:", "Runtime error") removed from the head
# of this module; the source below is the actual pipeline code.
import os
import uuid
import asyncio
import json

import requests
from dotenv import load_dotenv

from typing import AsyncGenerator

# Load .env so env vars are available when starting Uvicorn directly.
load_dotenv()

# Groq configuration (OpenAI-compatible chat completions endpoint).
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "").strip()
GROQ_MODEL = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant").strip()
GROQ_URL = "https://api.groq.com/openai/v1/chat/completions"

# Optional switch to simulate local behavior (no external calls).
GROQ_DISABLED = os.getenv("GROQ_DISABLED", "").lower() in {"1", "true", "yes"}

# (connect, read) timeouts in seconds for generation/streaming requests.
DEFAULT_TIMEOUT = (10, 120)

# Base headers for the Groq API. The Authorization value is empty when no key
# is configured; every caller short-circuits to a local fallback in that case,
# so the empty header is never actually sent to Groq.
HEADERS = {
    "Authorization": f"Bearer {GROQ_API_KEY}" if GROQ_API_KEY else "",
    "Content-Type": "application/json",
}
def generate_report_id() -> str:
    """Return a fresh UUID4 string used to identify a single report."""
    return f"{uuid.uuid4()}"
def stream_event(kind: str, data) -> str:
    """
    Serialize one SSE event payload as a JSON string.

    The FastAPI route emits it as `data: <json>\n\n`; the frontend can
    safely recover the envelope with json.loads(payload).
    """
    envelope = {"kind": kind, "data": data}
    return json.dumps(envelope, ensure_ascii=False)
| def _chunk(text: str, n: int): | |
| """Split text into small pieces to render a smoother streaming experience.""" | |
| for i in range(0, len(text), n): | |
| yield text[i : i + n] | |
async def run_researcher_async(topic: str) -> str:
    """
    Researcher step: produce compact factual bullets about *topic*.

    Returns canned fallback text when Groq is disabled/unconfigured, or when
    the API call fails for any reason (best-effort, never raises).
    """
    if GROQ_DISABLED or not GROQ_API_KEY:
        return (
            f"- What is '{topic}'?\n"
            f"- 3–5 key facts\n"
            f"- Common use cases\n"
            f"- Simple examples\n"
        )
    payload = {
        "model": GROQ_MODEL,
        "messages": [
            {"role": "system", "content": "You are a concise researcher."},
            {
                "role": "user",
                "content": f"Provide compact, factual bullet points about '{topic}'. "
                f"Max 8 bullets. Avoid filler text.",
            },
        ],
        "temperature": 0.5,
    }
    try:
        # requests is blocking; run it in a worker thread so the event loop
        # (and any concurrent SSE streams) are not stalled for up to the full
        # connect+read timeout.
        r = await asyncio.to_thread(
            requests.post, GROQ_URL, headers=HEADERS, json=payload, timeout=DEFAULT_TIMEOUT
        )
        r.raise_for_status()
        return r.json()["choices"][0]["message"]["content"]
    except Exception as e:
        # Deliberate best-effort: any network/API/parse error degrades to a stub.
        return f"[fallback researcher due to error: {e}]\n- Background\n- Key points\n- Examples"
async def run_analyst_async(researcher_notes: str) -> str:
    """
    Analyst step: extract key insights and implications from researcher notes.

    Returns canned fallback text when Groq is disabled/unconfigured, or when
    the API call fails for any reason (best-effort, never raises).
    """
    if GROQ_DISABLED or not GROQ_API_KEY:
        return "- 3 key insights\n- 2 implications\n- 1 trade-off\n"
    payload = {
        "model": GROQ_MODEL,
        "messages": [
            {"role": "system", "content": "You extract insights cleanly."},
            {
                "role": "user",
                "content": f"From these notes, produce exactly 3 insights and 2 implications:\n{researcher_notes}",
            },
        ],
        "temperature": 0.5,
    }
    try:
        # requests is blocking; run it in a worker thread so the event loop
        # (and any concurrent SSE streams) are not stalled for up to the full
        # connect+read timeout.
        r = await asyncio.to_thread(
            requests.post, GROQ_URL, headers=HEADERS, json=payload, timeout=DEFAULT_TIMEOUT
        )
        r.raise_for_status()
        return r.json()["choices"][0]["message"]["content"]
    except Exception as e:
        # Deliberate best-effort: any network/API/parse error degrades to a stub.
        return f"[fallback analyst due to error: {e}]\n- Insight 1\n- Insight 2\n- Insight 3\n- Implication A\n- Implication B"
async def run_writer_token_stream(
    topic: str,
    researcher_notes: str,
    analyst_notes: str,
) -> AsyncGenerator[str, None]:
    """
    Writer step: stream the final report as small token-like chunks for smooth UI updates.

    Yields strings (small chunks). Caller accumulates or forwards as SSE tokens.

    NOTE(review): unlike the researcher/analyst steps, the real-API path here
    has no fallback — raise_for_status() propagates to the caller. Also,
    requests is a blocking client, so iterating the Groq stream appears to
    stall the event loop between chunks; an async HTTP client would avoid
    that — TODO confirm with the route's concurrency requirements.
    """
    # Prompt assembled from the two upstream steps plus the topic.
    writer_prompt = (
        "Write a clear, beginner-friendly report with markdown headings:\n"
        "Sections: Introduction, Key Concepts, Insights, Practical Tips, Conclusion.\n"
        "Use concise language and bullets where helpful.\n\n"
        f"Topic: {topic}\n\n"
        f"Researcher Notes:\n{researcher_notes}\n\n"
        f"Analyst Notes:\n{analyst_notes}\n"
    )
    # Local simulated streaming if GROQ is disabled or key missing.
    if GROQ_DISABLED or not GROQ_API_KEY:
        simulated = [
            f"## {topic}\n\n",
            "### Introduction\n",
            "This response is streaming locally to simulate real-time typing.\n\n",
            "### Key Concepts\n",
            "- Concept A\n- Concept B\n\n",
            "### Insights\n",
            "- Insight 1\n- Insight 2\n\n",
            "### Practical Tips\n",
            "- Tip 1\n- Tip 2\n\n",
            "### Conclusion\n",
            "Short summary.\n",
        ]
        for piece in simulated:
            # 20-char chunks with a ~15ms pause mimic token-by-token output.
            for small in _chunk(piece, 20):
                yield small
                await asyncio.sleep(0.015)
        return
    # Real streaming via Groq's OpenAI-compatible API.
    payload = {
        "model": GROQ_MODEL,
        "messages": [
            {"role": "system", "content": "You are a clear, helpful technical writer."},
            {"role": "user", "content": writer_prompt},
        ],
        "temperature": 0.6,
        # stream=True makes Groq emit SSE "data: {...}" lines per delta.
        "stream": True,
    }
    # Using requests stream; iterate server-sent "data: ..." lines.
    with requests.post(
        GROQ_URL, headers=HEADERS, json=payload, stream=True, timeout=DEFAULT_TIMEOUT
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines(decode_unicode=True):
            # SSE keep-alives arrive as empty lines; skip them.
            if not line:
                continue
            # Only "data: " lines carry payloads; ignore other SSE fields.
            if not line.startswith("data: "):
                continue
            # len("data: ") == 6 — strip the SSE prefix.
            data = line[6:].strip()
            # OpenAI-compatible streams terminate with a literal [DONE].
            if data == "[DONE]":
                break
            try:
                obj = json.loads(data)
                # Each event carries an incremental "delta"; content may be
                # absent on role/finish events, hence the .get default.
                delta = obj["choices"][0]["delta"].get("content", "")
                if not delta:
                    continue
                # Yield tiny chunks to update UI frequently.
                for small in _chunk(delta, 20):
                    yield small
            except Exception:
                # Skip malformed lines gracefully.
                continue