# NOTE(review): removed scraped page artifacts ("Spaces: Runtime error",
# file-size line, commit hash, and a line-number gutter) that were not part
# of the actual source file.
import os
import uuid
import asyncio
import json
import requests
from typing import AsyncGenerator
from dotenv import load_dotenv
# Load .env so env vars are available when starting Uvicorn directly
load_dotenv()
# Groq configuration (OpenAI-compatible chat-completions endpoint).
GROQ_API_KEY: str = os.getenv("GROQ_API_KEY", "").strip()
GROQ_MODEL: str = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant").strip()
GROQ_URL: str = "https://api.groq.com/openai/v1/chat/completions"
# Optional switch to simulate local behavior (no external calls)
GROQ_DISABLED: bool = os.getenv("GROQ_DISABLED", "").lower() in {"1", "true", "yes"}
# Reasonable (connect, read) timeouts for generation/streaming, in seconds
DEFAULT_TIMEOUT = (10, 120)
# Base headers for Groq API requests.
# NOTE(review): when GROQ_API_KEY is empty, Authorization is set to "" — this
# is never sent because every caller below bails out early when the key is
# missing, but confirm before reusing HEADERS elsewhere.
HEADERS = {
    "Authorization": f"Bearer {GROQ_API_KEY}" if GROQ_API_KEY else "",
    "Content-Type": "application/json",
}
def generate_report_id() -> str:
    """Return a fresh, unique identifier (UUID4 string) for one report run."""
    return f"{uuid.uuid4()}"
def stream_event(kind: str, data):
    """
    Encode one SSE event payload as a JSON string.

    The FastAPI route wraps the result as `data: <json>\n\n`; the frontend
    can safely decode it with json.loads(payload).
    """
    event = {"kind": kind, "data": data}
    return json.dumps(event, ensure_ascii=False)
def _chunk(text: str, n: int):
"""Split text into small pieces to render a smoother streaming experience."""
for i in range(0, len(text), n):
yield text[i : i + n]
async def run_researcher_async(topic: str) -> str:
    """
    Researcher step: produce compact factual bullets about *topic*.

    Returns canned fallback bullets when Groq is disabled or no API key is
    configured, and a fallback string on any network/API error (this
    coroutine never raises).
    """
    if GROQ_DISABLED or not GROQ_API_KEY:
        return (
            f"- What is '{topic}'?\n"
            f"- 3–5 key facts\n"
            f"- Common use cases\n"
            f"- Simple examples\n"
        )
    payload = {
        "model": GROQ_MODEL,
        "messages": [
            {"role": "system", "content": "You are a concise researcher."},
            {
                "role": "user",
                "content": f"Provide compact, factual bullet points about '{topic}'. "
                f"Max 8 bullets. Avoid filler text.",
            },
        ],
        "temperature": 0.5,
    }

    def _call() -> str:
        # Blocking HTTP round-trip; executed off the event loop below.
        r = requests.post(GROQ_URL, headers=HEADERS, json=payload, timeout=DEFAULT_TIMEOUT)
        r.raise_for_status()
        return r.json()["choices"][0]["message"]["content"]

    try:
        # BUG FIX: `requests` is synchronous — calling it inline would block
        # the event loop for up to DEFAULT_TIMEOUT. Run it in a worker thread.
        return await asyncio.to_thread(_call)
    except Exception as e:
        # Fallback on any network/API error
        return f"[fallback researcher due to error: {e}]\n- Background\n- Key points\n- Examples"
async def run_analyst_async(researcher_notes: str) -> str:
    """
    Analyst step: extract key insights and implications from researcher notes.

    Returns canned fallback text when Groq is disabled or no API key is
    configured, and a fallback string on any network/API error (this
    coroutine never raises).
    """
    if GROQ_DISABLED or not GROQ_API_KEY:
        return "- 3 key insights\n- 2 implications\n- 1 trade-off\n"
    payload = {
        "model": GROQ_MODEL,
        "messages": [
            {"role": "system", "content": "You extract insights cleanly."},
            {
                "role": "user",
                "content": f"From these notes, produce exactly 3 insights and 2 implications:\n{researcher_notes}",
            },
        ],
        "temperature": 0.5,
    }

    def _call() -> str:
        # Blocking HTTP round-trip; executed off the event loop below.
        r = requests.post(GROQ_URL, headers=HEADERS, json=payload, timeout=DEFAULT_TIMEOUT)
        r.raise_for_status()
        return r.json()["choices"][0]["message"]["content"]

    try:
        # BUG FIX: same as the researcher step — keep the synchronous
        # `requests` call off the event loop via a worker thread.
        return await asyncio.to_thread(_call)
    except Exception as e:
        return f"[fallback analyst due to error: {e}]\n- Insight 1\n- Insight 2\n- Insight 3\n- Implication A\n- Implication B"
async def run_writer_token_stream(
    topic: str,
    researcher_notes: str,
    analyst_notes: str,
) -> AsyncGenerator[str, None]:
    """
    Writer step: stream the final report as small token-like chunks.

    Yields small string chunks; the caller accumulates them or forwards them
    as SSE tokens. Uses a locally simulated stream when Groq is disabled or
    no API key is configured. Network/HTTP errors raised while contacting
    Groq propagate to the consumer of the generator (same contract as an
    inline `raise_for_status()`).
    """
    writer_prompt = (
        "Write a clear, beginner-friendly report with markdown headings:\n"
        "Sections: Introduction, Key Concepts, Insights, Practical Tips, Conclusion.\n"
        "Use concise language and bullets where helpful.\n\n"
        f"Topic: {topic}\n\n"
        f"Researcher Notes:\n{researcher_notes}\n\n"
        f"Analyst Notes:\n{analyst_notes}\n"
    )
    # Local simulated streaming if GROQ is disabled or key missing
    if GROQ_DISABLED or not GROQ_API_KEY:
        simulated = [
            f"## {topic}\n\n",
            "### Introduction\n",
            "This response is streaming locally to simulate real-time typing.\n\n",
            "### Key Concepts\n",
            "- Concept A\n- Concept B\n\n",
            "### Insights\n",
            "- Insight 1\n- Insight 2\n\n",
            "### Practical Tips\n",
            "- Tip 1\n- Tip 2\n\n",
            "### Conclusion\n",
            "Short summary.\n",
        ]
        for piece in simulated:
            for small in _chunk(piece, 20):
                yield small
                await asyncio.sleep(0.015)
        return

    # Real streaming via Groq's OpenAI-compatible API
    payload = {
        "model": GROQ_MODEL,
        "messages": [
            {"role": "system", "content": "You are a clear, helpful technical writer."},
            {"role": "user", "content": writer_prompt},
        ],
        "temperature": 0.6,
        "stream": True,
    }

    # BUG FIX: `requests` is synchronous; iterating the SSE response inline
    # would block the event loop between every network read. Run the blocking
    # read in a worker thread and hand chunks (or a raised exception) back to
    # this coroutine through an asyncio.Queue.
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking end-of-stream

    def _produce() -> None:
        """Worker thread: read Groq's `data: ...` SSE lines and enqueue chunks."""
        try:
            with requests.post(
                GROQ_URL, headers=HEADERS, json=payload, stream=True, timeout=DEFAULT_TIMEOUT
            ) as resp:
                resp.raise_for_status()
                for line in resp.iter_lines(decode_unicode=True):
                    if not line or not line.startswith("data: "):
                        continue
                    data = line[6:].strip()
                    if data == "[DONE]":
                        break
                    try:
                        obj = json.loads(data)
                        delta = obj["choices"][0]["delta"].get("content", "")
                    except Exception:
                        # Skip malformed lines gracefully
                        continue
                    if not delta:
                        continue
                    # Forward tiny chunks to update the UI frequently
                    for small in _chunk(delta, 20):
                        loop.call_soon_threadsafe(queue.put_nowait, small)
        except Exception as exc:
            # Forward errors so the async consumer sees them.
            loop.call_soon_threadsafe(queue.put_nowait, exc)
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, done)

    producer = asyncio.create_task(asyncio.to_thread(_produce))
    while True:
        item = await queue.get()
        if item is done:
            break
        if isinstance(item, Exception):
            raise item
        yield item
    await producer  # worker has already signalled `done`; this reaps the task