# rag-backend/generation/groq_llm.py
# imtrt004
# fix: update context window and prompt
# 27128c4
"""Groq-powered streaming answer and quiz generation (DeepMind mode).
Automatically rotates API keys when any key hits its rate limit.
Each key carries its own model_id so different keys can use different models.
Keys are stored in the `groq_api_keys` Supabase table and managed from the
admin panel at /tus → Groq Keys tab.
Reset timing uses the real `retry-after` header from Groq's 429 response so
cooldowns always match Groq's actual rate-limit windows.
"""
from __future__ import annotations
import os
import json
import re
from datetime import datetime, timedelta, UTC
from typing import Generator
from supabase import create_client
# ── Supported models & Groq Developer-plan rate limits ───────────────────────
# Source: https://console.groq.com/docs/rate-limits (Feb 2026)
# Schema per entry: { model_id: { label, rpm, rpd, tpm, tpd } }
#   rpm/rpd = requests per minute / per day; tpm/tpd = tokens per minute / day.
#   A value of 0 means no token limit is tracked here for that dimension.
# NOTE(review): these numbers are advisory display data only — nothing in this
# module enforces them; actual limiting comes from Groq's 429 responses.
GROQ_MODELS: dict[str, dict] = {
    # ── General-purpose chat/completion models ─────────────────────────────
    "llama-3.3-70b-versatile": {
        "label": "Llama 3.3 70B Versatile",
        "rpm": 30, "rpd": 1_000, "tpm": 12_000, "tpd": 100_000,
    },
    "llama-3.1-8b-instant": {
        "label": "Llama 3.1 8B Instant",
        "rpm": 30, "rpd": 14_400, "tpm": 6_000, "tpd": 500_000,
    },
    "allam-2-7b": {
        "label": "Allam 2 7B",
        "rpm": 30, "rpd": 7_000, "tpm": 6_000, "tpd": 500_000,
    },
    "meta-llama/llama-4-maverick-17b-128e-instruct": {
        "label": "Llama 4 Maverick 17B",
        "rpm": 30, "rpd": 1_000, "tpm": 6_000, "tpd": 500_000,
    },
    "meta-llama/llama-4-scout-17b-16e-instruct": {
        "label": "Llama 4 Scout 17B",
        "rpm": 30, "rpd": 1_000, "tpm": 30_000, "tpd": 500_000,
    },
    "meta-llama/llama-guard-4-12b": {
        "label": "Llama Guard 4 12B",
        "rpm": 30, "rpd": 14_400, "tpm": 15_000, "tpd": 500_000,
    },
    "meta-llama/llama-prompt-guard-2-22m": {
        "label": "Llama Prompt Guard 2 22M",
        "rpm": 30, "rpd": 14_400, "tpm": 15_000, "tpd": 500_000,
    },
    "meta-llama/llama-prompt-guard-2-86m": {
        "label": "Llama Prompt Guard 2 86M",
        "rpm": 30, "rpd": 14_400, "tpm": 15_000, "tpd": 500_000,
    },
    "moonshotai/kimi-k2-instruct": {
        "label": "Kimi K2 Instruct",
        "rpm": 60, "rpd": 1_000, "tpm": 10_000, "tpd": 300_000,
    },
    "moonshotai/kimi-k2-instruct-0905": {
        "label": "Kimi K2 Instruct 0905",
        "rpm": 60, "rpd": 1_000, "tpm": 10_000, "tpd": 300_000,
    },
    "qwen/qwen3-32b": {
        "label": "Qwen3 32B",
        "rpm": 60, "rpd": 1_000, "tpm": 6_000, "tpd": 500_000,
    },
    "openai/gpt-oss-20b": {
        "label": "GPT OSS 20B",
        "rpm": 30, "rpd": 1_000, "tpm": 8_000, "tpd": 200_000,
    },
    "openai/gpt-oss-120b": {
        "label": "GPT OSS 120B",
        "rpm": 30, "rpd": 1_000, "tpm": 8_000, "tpd": 200_000,
    },
    "openai/gpt-oss-safeguard-20b": {
        "label": "GPT OSS Safeguard 20B",
        "rpm": 30, "rpd": 1_000, "tpm": 8_000, "tpd": 200_000,
    },
    "groq/compound": {
        "label": "Groq Compound",
        "rpm": 30, "rpd": 250, "tpm": 70_000, "tpd": 0,
    },
    "groq/compound-mini": {
        "label": "Groq Compound Mini",
        "rpm": 30, "rpd": 250, "tpm": 70_000, "tpd": 0,
    },
    # ── Audio/speech models (0 = no token limit tracked for that field) ────
    "canopylabs/orpheus-arabic-saudi": {
        "label": "Orpheus Arabic (Saudi)",
        "rpm": 10, "rpd": 100, "tpm": 1_200, "tpd": 3_600,
    },
    "canopylabs/orpheus-v1-english": {
        "label": "Orpheus v1 English",
        "rpm": 10, "rpd": 100, "tpm": 1_200, "tpd": 3_600,
    },
    "whisper-large-v3": {
        "label": "Whisper Large v3",
        "rpm": 20, "rpd": 2_000, "tpm": 0, "tpd": 0,
    },
    "whisper-large-v3-turbo": {
        "label": "Whisper Large v3 Turbo",
        "rpm": 20, "rpd": 2_000, "tpm": 0, "tpd": 0,
    },
}
# Fallback model when a key row has no model_id set.
# Overridable at deploy time via the GROQ_MODEL environment variable.
DEFAULT_MODEL = os.environ.get("GROQ_MODEL", "llama-3.3-70b-versatile")
# System prompt sent with every chat request. Fixed mojibake (β€’/β€”/β†’ were
# mis-encoded bullets, em dashes, and arrows) so the model receives clean text.
SYSTEM_PROMPT = """You are DeepMind Pro — an expert AI study and research assistant created by Md Tusar Akon.
You are given the user's uploaded document(s) as your primary knowledge source.
CAPABILITIES:
• Solve exam questions, math, and statistical problems step-by-step with full working
• Summarise, explain, and analyse documents at research level
• Answer general knowledge questions from your training when they go beyond the document
• Suggest related concepts, interpretations, and insights based on document content
• Write or explain R / Python code when relevant to the document
CITATION RULES:
When you use information directly from the document context, cite it inline as [[N]]
(e.g., [[1]], [[3]]) immediately after the relevant sentence. Each N corresponds to
[Source N] in the context. Do NOT cite general knowledge you already know from training.
BEHAVIOUR:
• Document questions → use context first, supplement with your knowledge if needed
• General questions (theory, concepts, "what is X") → answer fully from your expertise
• Identity / meta questions (your name, model, training) → answer honestly as DeepMind Pro by Md Tusar Akon
• Exam / problem-set questions → solve them completely — never refuse, never say the answer isn't in the doc
• If context lacks detail, supplement with training knowledge and briefly note you are doing so
• Be thorough, precise, and genuinely helpful — you operate at research level
• NEVER respond with "I couldn't find that in your document" for solvable questions"""
def _build_context(chunks: list) -> str:
"""Format chunks with numbered source headers for [Source N] citation notation."""
parts = []
for i, chunk in enumerate(chunks, 1):
text = chunk.text if hasattr(chunk, "text") else str(chunk)
page_number = chunk.page_number if hasattr(chunk, "page_number") else 1
parts.append(f"[Source {i} \u2014 Page {page_number}]\n{text}")
return "\n\n---\n\n".join(parts)
# Prompt template for quiz generation; `{context}` is filled with document
# text via str.format (hence the doubled {{ }} around the JSON example).
QUIZ_PROMPT = """Based on the context below, generate exactly 10 multiple-choice quiz questions.
Each question must test understanding of the content, not trivia.
Context:
{context}
Respond ONLY with a JSON array, no markdown, no explanation:
[
{{
"question": "...",
"options": ["A) ...", "B) ...", "C) ...", "D) ..."],
"answer": "A",
"explanation": "Brief explanation why"
}},
...
]"""
# Upper bound on key-rotation attempts per request (each attempt burns one key).
_MAX_RETRIES = 6  # max keys to try before giving up
def _supa():
    """Build a fresh Supabase client from SUPABASE_URL / SUPABASE_KEY env vars.

    Raises KeyError if either variable is missing from the environment.
    """
    url = os.environ["SUPABASE_URL"]
    key = os.environ["SUPABASE_KEY"]
    return create_client(url, key)
def _get_available_key() -> tuple[str | None, int | None, str]:
    """Return (api_key_value, row_id, model_id) for the first available key.

    A key is available when it is active and its `rate_limited_until`
    timestamp is unset or in the past. Returns (None, None, DEFAULT_MODEL)
    when no key qualifies or the DB call fails (logged, never raised).
    """
    now = datetime.now(UTC)
    try:
        result = (
            _supa()
            .table("groq_api_keys")
            .select("id, key_value, model_id, rate_limited_until")
            .eq("is_active", True)
            .order("id")
            .execute()
        )
        for row in (result.data or []):
            if _cooldown_expired(row.get("rate_limited_until"), now):
                model = row.get("model_id") or DEFAULT_MODEL
                return row["key_value"], row["id"], model
    except Exception as exc:
        print(f"⚠️ groq_llm: failed to fetch keys from DB: {exc}")
    return None, None, DEFAULT_MODEL


def _cooldown_expired(until: str | None, now: datetime) -> bool:
    """True when `until` (ISO-8601 string from the DB, or None) is in the past.

    Parses the timestamp instead of comparing raw strings: a stored value
    with a trailing 'Z' or a non-UTC offset does not compare correctly
    against `datetime.now(UTC).isoformat()` lexicographically.
    """
    if until is None:
        return True
    try:
        ts = datetime.fromisoformat(until.replace("Z", "+00:00"))
        if ts.tzinfo is None:
            # Naive DB timestamps are assumed UTC — TODO confirm against schema.
            ts = ts.replace(tzinfo=UTC)
        return ts < now
    except ValueError:
        # Unparseable value: fall back to the original lexicographic compare.
        return until < now.isoformat()
def _retry_after_seconds(exc: Exception, fallback: int = 65) -> int:
"""Parse the real retry-after value from Groq's 429 response headers.
Groq sends `retry-after` as a plain integer (seconds). We add a 2-second
safety buffer so we never re-try exactly at the limit boundary.
Falls back to `fallback` if the header is absent or unparseable.
"""
try:
resp = getattr(exc, "response", None)
if resp is not None:
ra = resp.headers.get("retry-after") or resp.headers.get("Retry-After")
if ra:
return max(int(float(ra)) + 2, 5)
except Exception:
pass
return fallback
def _mark_rate_limited(key_id: int, seconds: int = 65) -> None:
    """Mark a key as rate-limited until now+seconds so rotation skips it.

    Best-effort: a DB failure is logged and swallowed so a bookkeeping
    problem never takes down answer generation. Also fixes the previously
    mojibake'd 🔄 glyph in the log line.
    """
    try:
        until = (datetime.now(UTC) + timedelta(seconds=seconds)).isoformat()
        _supa().table("groq_api_keys").update({"rate_limited_until": until}) \
            .eq("id", key_id).execute()
        print(f"🔄 groq_llm: key {key_id} rate-limited for {seconds}s (until {until[:19]}Z)")
    except Exception as exc:
        print(f"⚠️ groq_llm: failed to mark rate limit on key {key_id}: {exc}")
def _inc(key_id: int) -> None:
    """Bump a key's total_requests counter via a DB RPC (best effort)."""
    try:
        client = _supa()
        client.rpc("increment_groq_key_requests", {"key_id": key_id}).execute()
    except Exception:
        pass  # non-critical; counters are advisory
# ── Streaming chat ─────────────────────────────────────────────────────────────
def stream_answer_groq(
    query: str,
    context_chunks: list,
) -> Generator[str, None, None]:
    """Stream a Groq answer, auto-rotating keys on rate-limit errors.

    Yields answer text incrementally. On a 429 the current key is put on
    cooldown (using the server's retry-after) and the next key is tried —
    but only if nothing has been streamed yet; retrying mid-stream would
    replay the answer from the start and duplicate text the user already
    received.

    Args:
        query: The user's question.
        context_chunks: Retrieved document chunks for `_build_context`.

    Yields:
        Answer fragments, or a single explanatory message on failure.
    """
    try:
        from groq import Groq, RateLimitError  # type: ignore[import]
    except ImportError:
        yield "DeepMind mode requires the `groq` package. Please contact support."
        return
    context = _build_context(context_chunks)
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
    ]
    for _attempt in range(_MAX_RETRIES):
        api_key, key_id, model = _get_available_key()
        if not api_key:
            yield "\n\n*DeepMind unavailable — no API keys are configured or all are rate-limited. Please contact the admin.*"
            return
        emitted = False  # have any tokens already reached the client?
        try:
            client = Groq(api_key=api_key)
            stream = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=8192,
                temperature=0.0,  # deterministic answers for study material
                stream=True,
            )
            _inc(key_id)
            for chunk in stream:
                delta = chunk.choices[0].delta.content
                if delta:
                    emitted = True
                    yield delta
            return  # success
        except RateLimitError as exc:
            wait = _retry_after_seconds(exc)
            _mark_rate_limited(key_id, seconds=wait)
            if emitted:
                # 429 mid-stream: a retry would re-stream from the beginning,
                # duplicating everything already shown. Bail out cleanly.
                yield "\n\n*Response interrupted by a rate limit. Please try again in a moment.*"
                return
            continue  # nothing sent yet — safe to rotate to the next key
        except Exception as exc:
            yield f"\n\n*DeepMind error: {exc}*"
            return
    yield "\n\n*All Groq API keys are currently rate-limited. Please try again in a moment.*"
# ── Quiz generation ────────────────────────────────────────────────────────────
def _parse_quiz_json(raw: str) -> list[dict]:
    """Best-effort parse of the model's quiz output into a list of dicts.

    Strips markdown code fences, then tries a direct JSON parse; if the
    model wrapped the array in prose (a common failure mode), falls back
    to the outermost [...] span. Returns [] when nothing parseable is found.
    """
    cleaned = re.sub(r"```json|```", "", raw).strip()
    try:
        parsed = json.loads(cleaned)
        return parsed if isinstance(parsed, list) else []
    except ValueError:
        pass
    # Fallback: extract the outermost JSON array from surrounding prose.
    start, end = cleaned.find("["), cleaned.rfind("]")
    if 0 <= start < end:
        try:
            parsed = json.loads(cleaned[start:end + 1])
            return parsed if isinstance(parsed, list) else []
        except ValueError:
            pass
    return []


def generate_quiz_groq(context_chunks: list) -> list[dict]:
    """Generate 10 quiz questions via the Groq API with key rotation.

    Returns a list of question dicts (question/options/answer/explanation),
    or [] on any failure — callers treat an empty list as "quiz unavailable".
    """
    try:
        from groq import Groq, RateLimitError  # type: ignore[import]
    except ImportError:
        return []
    # Cap context at the first 5 chunks to stay inside the token budget.
    context = "\n\n".join(
        (c.text if hasattr(c, "text") else str(c)) for c in context_chunks[:5]
    )
    messages = [{"role": "user", "content": QUIZ_PROMPT.format(context=context)}]
    for _attempt in range(_MAX_RETRIES):
        api_key, key_id, model = _get_available_key()
        if not api_key:
            return []
        try:
            client = Groq(api_key=api_key)
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=4096,
                temperature=0.1,  # near-deterministic but allows variety
                stream=False,
            )
            _inc(key_id)
            return _parse_quiz_json(response.choices[0].message.content or "")
        except RateLimitError as exc:
            wait = _retry_after_seconds(exc)
            _mark_rate_limited(key_id, seconds=wait)
            continue  # rotate to the next key
        except Exception:
            return []  # best-effort feature: never propagate errors to the UI
    return []