Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| from typing import List, Optional | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import chromadb | |
| from sentence_transformers import SentenceTransformer | |
| from huggingface_hub import snapshot_download | |
| from groq import Groq # โ ุงุณุชุจุฏุงู InferenceClient ุจู Groq | |
# =============================================
# AgriRAG Pro V5 - Multilingual Smart Edition
# (Powered by Groq - Free & Fast ⚡)
# =============================================
# --- Runtime configuration (read once at import time) ---------------------
HF_TOKEN = os.environ.get("HF_TOKEN")  # token passed to snapshot_download
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")  # Groq API key
DATASET_REPO_ID = "ahmedsaeed2515/AgriRAG-DB"  # HF dataset holding the ChromaDB files

# Groq-hosted chat model (described by the author as the best free, very fast option).
GROQ_MODEL = "llama-3.3-70b-versatile"

# --- Logging ----------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("AgriRAG-Pro-V5")

# --- FastAPI application ----------------------------------------------------
app = FastAPI(
    title="AgriRAG Pro: Multilingual Smart Edition (Groq)",
    version="5.1",
)
# NOTE(review): wildcard origins together with allow_credentials=True is not a
# valid CORS combination for browsers; FastAPI's docs say origins must be listed
# explicitly when credentials are allowed. Confirm whether credentials are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
class AISystem:
    """Multilingual retrieval-augmented agricultural assistant.

    Construction downloads the knowledge-base dataset snapshot, loads a
    multilingual sentence-embedding model, opens the persistent ChromaDB
    collection and creates a Groq chat client. Any startup failure is logged
    (with traceback) and re-raised.

    The per-question pipeline is:
    ``translate_query`` -> ``multilingual_search`` -> ``generate_answer``.
    """

    # Only these roles from client-supplied history are forwarded to the LLM.
    # Anything else (e.g. an injected "system" turn or an invalid role that the
    # chat API would reject) is dropped.
    _HISTORY_ROLES = frozenset({"user", "assistant"})

    # Number of most recent (valid) history turns sent to the LLM.
    _HISTORY_LIMIT = 4

    _NO_CONTEXT_MARKER = "[No relevant knowledge found in the database for this query.]"

    def __init__(self):
        """Download data, load models and create clients; re-raise on any failure."""
        try:
            logger.info("โณ Initializing Knowledge Engine V5 (Groq Edition)...")
            # Pulls the dataset repo (which includes ./chroma_db) into the CWD.
            snapshot_download(
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                local_dir=".",
                token=HF_TOKEN,
            )
            self.embed_model = SentenceTransformer(
                "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
            )
            self.client = chromadb.PersistentClient(path="./chroma_db")
            self.collection = self.client.get_collection("agricultural_knowledge")
            # Groq replaces the earlier HuggingFace InferenceClient.
            self.llm = Groq(api_key=GROQ_API_KEY)
            logger.info("โ AgriRAG Pro V5 (Groq) is Online & Ready!")
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception(f"Startup Failure: {e}")
            raise

    # ------------------------------------------------------------------
    # Helper: single entry point for Groq chat completions
    # ------------------------------------------------------------------
    def _groq_chat(self, messages: list, max_tokens: int = 1024, temperature: float = 0.0) -> str:
        """Run one Groq chat completion and return the stripped reply text.

        Args:
            messages: OpenAI-style ``{"role", "content"}`` message dicts.
            max_tokens: Completion token limit.
            temperature: Sampling temperature.

        Returns:
            The first choice's text, stripped; ``""`` if the API returned no content.
        """
        response = self.llm.chat.completions.create(
            model=GROQ_MODEL,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
        )
        # message.content is optional in the chat-completions schema; avoid
        # calling .strip() on None.
        content = response.choices[0].message.content
        return (content or "").strip()

    # ------------------------------------------------------------------
    # STEP 1: Translate user question to English and Hindi using the LLM
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_translation(raw: str, question: str) -> dict:
        """Parse the translator's JSON reply into a complete translation dict.

        Markdown code fences are stripped before decoding. Missing, empty or
        non-string fields fall back to ``question`` (or ``"en"`` for the
        language code), so the result always has all three keys.

        Args:
            raw: The LLM reply text.
            question: The original user question, used as fallback text.

        Returns:
            ``{"detected_lang": str (lower-cased), "en": str, "hi": str}``.

        Raises:
            ValueError: If ``raw`` is not valid JSON (``json.JSONDecodeError``
                subclasses ``ValueError``) or does not decode to an object.
        """
        import json

        cleaned = raw.replace("```json", "").replace("```", "").strip()
        data = json.loads(cleaned)
        if not isinstance(data, dict):
            raise ValueError(f"expected a JSON object, got {type(data).__name__}")

        defaults = {"detected_lang": "en", "en": question, "hi": question}
        parsed = {}
        for key, default in defaults.items():
            value = data.get(key)
            parsed[key] = value.strip() if isinstance(value, str) and value.strip() else default
        parsed["detected_lang"] = parsed["detected_lang"].lower()
        return parsed

    def translate_query(self, question: str) -> dict:
        """Detect the question's language and translate it to English and Hindi.

        Uses a single LLM call. Never raises: on any failure it logs a warning
        and returns the original question for both translations.

        Returns:
            ``{"detected_lang": "...", "en": "...", "hi": "..."}``.
        """
        prompt = (
            "You are a translation assistant. Given the user's question below:\n"
            '1. Detect the language (respond with the ISO 639-1 code, e.g. "ar", "en", "hi", "fr").\n'
            "2. Translate the question to English.\n"
            "3. Translate the question to Hindi (Devanagari script).\n"
            "Respond ONLY in this exact JSON format (no markdown, no explanation):\n"
            '{"detected_lang": "<code>", "en": "<english translation>", "hi": "<hindi translation>"}\n'
            f"User question: {question}"
        )
        try:
            raw = self._groq_chat(
                messages=[{"role": "user", "content": prompt}],
                max_tokens=256,
                temperature=0.0,
            )
            translations = self._parse_translation(raw, question)
            logger.info(f"๐ Translations: {translations}")
            return translations
        except Exception as e:
            logger.warning(f"Translation failed, falling back to original: {e}")
            return {"detected_lang": "en", "en": question, "hi": question}

    # ------------------------------------------------------------------
    # STEP 2: Multi-lingual vector search
    # ------------------------------------------------------------------
    def multilingual_search(self, translations: dict, top_k: int = 8) -> list:
        """Search ChromaDB with the English and Hindi queries and merge results.

        Results are restricted to three allowed ``source`` values. Documents
        are de-duplicated by exact text (the first metadata seen wins), and at
        most ``2 * top_k`` are returned in first-seen order. Per-query search
        errors are logged and skipped.

        Returns:
            A list of ``{"doc": str, "meta": dict | None}`` dicts.
        """
        sources_filter = {
            "source": {
                "$in": ["Plant_Diseases_QA", "KisanVaani_Farmers", "SARTHI_Advisory"]
            }
        }
        top_k = max(1, top_k)  # n_results must be positive for Chroma

        # Distinct, non-empty queries only: the translation fallback returns
        # the same text for "en" and "hi", which would otherwise search twice.
        queries = []
        for key in ("en", "hi"):
            query = translations.get(key)
            if isinstance(query, str) and query.strip() and query not in queries:
                queries.append(query)

        all_chunks = {}  # doc_text -> metadata (dedup by content)
        for query in queries:
            try:
                vec = self.embed_model.encode([query]).tolist()
                results = self.collection.query(
                    query_embeddings=vec,
                    n_results=top_k,
                    where=sources_filter,
                )
                for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
                    if doc and doc not in all_chunks:
                        all_chunks[doc] = meta
            except Exception as e:
                logger.warning(f"Search error for query '{query}': {e}")

        merged = [{"doc": doc, "meta": meta} for doc, meta in all_chunks.items()][: top_k * 2]
        logger.info(f"๐ Retrieved {len(merged)} unique knowledge chunks")
        return merged

    # ------------------------------------------------------------------
    # STEP 3: Generate smart, language-matched answer
    # ------------------------------------------------------------------
    @classmethod
    def _select_history(cls, history, limit: int = _HISTORY_LIMIT) -> list:
        """Return the last ``limit`` history turns whose role is user/assistant.

        Filtering happens before slicing, so dropped turns do not reduce the
        number of valid turns kept. ``None`` history is treated as empty.
        """
        valid = [
            {"role": msg["role"], "content": str(msg.get("content", ""))}
            for msg in (history or [])
            if msg.get("role") in cls._HISTORY_ROLES
        ]
        # valid[-0:] would return everything, so a non-positive limit is explicit.
        return valid[-limit:] if limit > 0 else []

    @classmethod
    def _build_context(cls, chunks: list) -> str:
        """Format retrieved chunks as numbered ``[Source i | name]`` sections.

        Returns an explicit marker when there are no chunks, instead of an
        empty context the model might misread.
        """
        if not chunks:
            return cls._NO_CONTEXT_MARKER
        return "\n\n".join(
            f"[Source {i} | {(item.get('meta') or {}).get('source', 'Unknown')}]:\n{item['doc']}"
            for i, item in enumerate(chunks, start=1)
        )

    def generate_answer(
        self,
        question: str,
        chunks: list,
        detected_lang: str,
        history: list,
    ) -> str:
        """Ask the LLM for a cited answer in the user's language.

        Args:
            question: The user's original question.
            chunks: Output of ``multilingual_search``.
            detected_lang: Language code the answer must be written in.
            history: Prior ``{"role", "content"}`` turns; only the last four
                user/assistant turns are forwarded.

        Returns:
            The model's answer text.
        """
        context_text = self._build_context(chunks)
        # Dash/arrow characters were mis-encoded in the prompt; ASCII is used
        # so the model never sees stray garbage characters.
        system_prompt = (
            "You are 'AgriRAG Pro', an expert global agricultural scientist and advisor.\n"
            "STRICT RULES - follow ALL of them:\n"
            f"1. LANGUAGE: The user's language code is '{detected_lang}'. You MUST reply in that EXACT language.\n"
            "- Arabic (ar) -> reply fully in Arabic\n"
            "- English (en) -> reply fully in English\n"
            "- Hindi (hi) -> reply fully in Hindi\n"
            "- Any other language -> reply in that language\n"
            "Never mix languages in your answer.\n"
            "2. SMART SELECTION: You are given knowledge chunks from multiple sources.\n"
            "Read all chunks, then select ONLY the information that is accurate and directly answers the question.\n"
            "Ignore chunks that are about different crops, diseases, or regions.\n"
            "3. NO REPETITION: Never repeat the same sentence or advice more than once.\n"
            "4. STRUCTURED & INTERACTIVE:\n"
            "- Use clear headings and bullet points.\n"
            "- At the end, ask the user ONE follow-up question to help them further (in their language).\n"
            "5. CITATIONS: Cite sources you used as [1], [2], etc.\n"
            "6. IF NO RELEVANT INFO: If none of the chunks actually answer the question, say so honestly\n"
            "and give general best-practice advice IN THE USER'S LANGUAGE. Do NOT fabricate sources."
        )

        messages = [{"role": "system", "content": system_prompt}]
        messages.extend(self._select_history(history))
        messages.append(
            {
                "role": "user",
                "content": f"Knowledge Base Context:\n{context_text}\n\nUser Question: {question}",
            }
        )
        return self._groq_chat(
            messages=messages,
            max_tokens=1024,
            temperature=0.2,
        )
# Module-level singleton. Constructing it downloads the dataset snapshot, loads
# the embedding model, opens ChromaDB and creates the Groq client, so this work
# happens at import time; AISystem.__init__ re-raises, so a failure aborts the import.
ai = AISystem()
# ------------------------------------------------------------------
# Models
# ------------------------------------------------------------------
class Message(BaseModel):
    """One prior chat turn supplied by the client."""

    # Forwarded to the LLM as the message role; no restriction on the value
    # is enforced by this model.
    role: str
    content: str


class ChatRequest(BaseModel):
    """Request body for the chat endpoint."""

    # The user's question; its language is detected downstream.
    question: str
    # NOTE(review): Optional means a client may send null, so consumers must
    # treat None as "no history".
    history: Optional[List[Message]] = []
    # Passed to AISystem.multilingual_search as top_k; no bounds enforced here.
    top_k: int = 8
| # ------------------------------------------------------------------ | |
| # Endpoint | |
| # ------------------------------------------------------------------ | |
def _run_chat_pipeline(request: ChatRequest) -> dict:
    """Run the blocking chat pipeline (LLM + embedding + vector search) for one request."""
    logger.info(f"๐ฅ Question: {request.question}")
    # Step 1: Detect language & translate
    translations = ai.translate_query(request.question)
    detected_lang = translations.get("detected_lang", "en")
    # Step 2: Multi-lingual search
    chunks = ai.multilingual_search(translations, top_k=request.top_k)
    # Step 3: Generate smart answer.
    # `history` is Optional in ChatRequest, so a client may send null. Building
    # dicts explicitly works on pydantic v1 and v2 (where .dict() is deprecated).
    history_dicts = [
        {"role": m.role, "content": m.content} for m in (request.history or [])
    ]
    answer = ai.generate_answer(
        question=request.question,
        chunks=chunks,
        detected_lang=detected_lang,
        history=history_dicts,
    )
    return {
        "answer": answer,
        "detected_language": detected_lang,
        "sources_used": [c["meta"] for c in chunks],
    }


async def chat_endpoint(request: ChatRequest):
    """Answer one chat turn: detect/translate, retrieve, then generate.

    The pipeline makes synchronous network and model calls, so it runs in a
    worker thread rather than blocking the event loop for every other request.

    NOTE(review): no route decorator (e.g. ``@app.post(...)``) is visible on
    this function in this copy of the file; confirm it is registered with ``app``.

    Returns:
        ``{"answer", "detected_language", "sources_used"}``.

    Raises:
        HTTPException: 500 with the error text if any pipeline step fails.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        return await run_in_threadpool(_run_chat_pipeline, request)
    except Exception as e:
        logger.exception(f"Execution Error: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
async def health():
    """Liveness check: report that the service process is up."""
    status_text = "AgriRAG Pro V5 (Groq โก) is running โ "
    payload = {"status": status_text}
    return payload
| # # ============================================================================= | |
| # # AgriRAG Pro โ FINAL PRODUCTION EDITION | |
| # # Version: 6.0 | BrandCode | 2025 | |
| # # Author notes: Zero-tolerance for failures. Full multilingual. HF-Spaces ready. | |
| # # ============================================================================= | |
| # from __future__ import annotations | |
| # import json | |
| # import logging | |
| # import os | |
| # import re | |
| # import time | |
| # import unicodedata | |
| # import uuid | |
| # from collections import deque | |
| # from contextlib import asynccontextmanager | |
| # from functools import wraps | |
| # from threading import Lock | |
| # from typing import AsyncGenerator, Dict, List, Optional, Tuple | |
| # import numpy as np | |
| # from fastapi import FastAPI, HTTPException, Request, status | |
| # from fastapi.middleware.cors import CORSMiddleware | |
| # from fastapi.responses import StreamingResponse | |
| # from huggingface_hub import InferenceClient, snapshot_download | |
| # from pydantic import BaseModel, Field, field_validator | |
| # from sentence_transformers import SentenceTransformer | |
| # try: | |
| # import chromadb | |
| # except ImportError as exc: | |
| # raise RuntimeError("chromadb is required โ pip install chromadb") from exc | |
| # # ============================================================================= | |
| # # SECTION 1 โ CONFIGURATION (single place for every tunable value) | |
| # # ============================================================================= | |
| # class Config: | |
| # # โโ Hugging Face โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # HF_TOKEN: str = os.environ.get("HF_TOKEN", "") | |
| # LLM_MODEL: str = os.environ.get("LLM_MODEL", "Qwen/Qwen2.5-7B-Instruct") | |
| # EMBED_MODEL: str = os.environ.get("EMBED_MODEL", "BAAI/bge-m3") # BGE-M3 โ best multilingual | |
| # DATASET_REPO_ID: str = os.environ.get("DATASET_REPO_ID", "ahmedsaeed2515/AgriRAG-DB") | |
| # # โโ ChromaDB โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # CHROMA_PATH: str = os.environ.get("CHROMA_PATH", "./chroma_db") | |
| # COLLECTION_NAME: str = os.environ.get("COLLECTION_NAME", "agricultural_knowledge") | |
| # ALLOWED_SOURCES: list = ["Plant_Diseases_QA", "KisanVaani_Farmers", "SARTHI_Advisory"] | |
| # # โโ Search / RAG โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # DEFAULT_TOP_K: int = 8 # chunks fetched per query | |
| # MAX_TOP_K: int = 20 # upper bound allowed from caller | |
| # RERANK_TOP_N: int = 6 # final chunks sent to LLM after re-ranking | |
| # # โโ LLM generation โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # LLM_MAX_TOKENS: int = 800 | |
| # LLM_TEMPERATURE: float = 0.30 | |
| # LLM_TOP_P: float = 0.90 | |
| # LLM_REP_PENALTY: float = 1.15 # KEY FIX for repetition loop | |
| # LLM_RETRY_COUNT: int = 3 | |
| # LLM_RETRY_DELAY: float = 2.0 # seconds between retries (doubles each time) | |
| # # โโ Translation โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # TRANSLATE_MAX_TOKENS: int = 256 | |
| # # โโ Input validation โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # MAX_QUESTION_LEN: int = 1000 | |
| # MAX_HISTORY_MSGS: int = 10 # stored; only last 4 sent to LLM | |
| # MAX_CONTEXT_CHARS: int = 12_000 # truncate knowledge context if too long | |
| # # โโ Semantic cache โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # CACHE_MAX_SIZE: int = 256 | |
| # CACHE_SIM_THRESH: float = 0.92 # cosine similarity above this = cache hit | |
| # # โโ Loop detector โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # LOOP_MIN_WORDS: int = 20 # don't check short replies | |
| # LOOP_WINDOW: int = 30 # look at last N tokens | |
| # LOOP_UNIQUE_MIN: int = 5 # if unique < this โ loop detected | |
| # # โโ Misc โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # APP_VERSION: str = "6.0" | |
| # LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO") | |
| # # ============================================================================= | |
| # # SECTION 2 โ LOGGING | |
| # # ============================================================================= | |
| # logging.basicConfig( | |
| # level=getattr(logging, Config.LOG_LEVEL, logging.INFO), | |
| # format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s", | |
| # datefmt="%Y-%m-%d %H:%M:%S", | |
| # ) | |
| # logger = logging.getLogger("AgriRAG-Pro") | |
| # # ============================================================================= | |
| # # SECTION 3 โ UTILITY HELPERS | |
| # # ============================================================================= | |
| # # โโ 3.1 Retry decorator with exponential back-off โโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # def retry(max_attempts: int = 3, base_delay: float = 2.0, exceptions=(Exception,)): | |
| # """Decorator: retry on exception with exponential back-off.""" | |
| # def decorator(func): | |
| # @wraps(func) | |
| # def wrapper(*args, **kwargs): | |
| # last_exc: Exception = RuntimeError("unknown") | |
| # for attempt in range(1, max_attempts + 1): | |
| # try: | |
| # return func(*args, **kwargs) | |
| # except exceptions as exc: | |
| # last_exc = exc | |
| # if attempt < max_attempts: | |
| # delay = base_delay * (2 ** (attempt - 1)) | |
| # logger.warning( | |
| # f"[retry] {func.__name__} attempt {attempt}/{max_attempts} " | |
| # f"failed: {exc}. Retrying in {delay:.1f}s โฆ" | |
| # ) | |
| # time.sleep(delay) | |
| # else: | |
| # logger.error( | |
| # f"[retry] {func.__name__} exhausted {max_attempts} attempts." | |
| # ) | |
| # raise last_exc | |
| # return wrapper | |
| # return decorator | |
| # # โโ 3.2 Safe JSON parser with regex fall-back โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # _JSON_FENCE_RE = re.compile(r"```[\w]*", re.MULTILINE) | |
| # def safe_parse_json(raw: str) -> Optional[dict]: | |
| # """ | |
| # Parse JSON from an LLM response that may include markdown fences or extra text. | |
| # Returns None on total failure (caller must handle). | |
| # """ | |
| # if not raw: | |
| # return None | |
| # # 1. Strip markdown fences | |
| # cleaned = _JSON_FENCE_RE.sub("", raw).replace("```", "").strip() | |
| # # 2. Try direct parse | |
| # try: | |
| # return json.loads(cleaned) | |
| # except json.JSONDecodeError: | |
| # pass | |
| # # 3. Extract first JSON object with regex | |
| # match = re.search(r"\{[^{}]*\}", cleaned, re.DOTALL) | |
| # if match: | |
| # try: | |
| # return json.loads(match.group()) | |
| # except json.JSONDecodeError: | |
| # pass | |
| # # 4. Extract individual fields with regex (last resort) | |
| # result: Dict[str, str] = {} | |
| # for key in ("detected_lang", "en", "hi"): | |
| # m = re.search(rf'"{key}"\s*:\s*"([^"]+)"', cleaned) | |
| # if m: | |
| # result[key] = m.group(1) | |
| # return result if result else None | |
| # # โโ 3.3 Script-based language heuristics (zero-dependency fallback) โโโโโโโโโโ | |
| # _ARABIC_RE = re.compile(r"[\u0600-\u06FF]") | |
| # _DEVANAGARI_RE = re.compile(r"[\u0900-\u097F]") # Hindi | |
| # _CHINESE_RE = re.compile(r"[\u4E00-\u9FFF]") | |
| # _CYRILLIC_RE = re.compile(r"[\u0400-\u04FF]") # Russian | |
| # _LATIN_RE = re.compile(r"[a-zA-Z]") | |
| # def heuristic_lang(text: str) -> str: | |
| # """Detect language from Unicode script. Returns ISO 639-1 code.""" | |
| # if not text: | |
| # return "en" | |
| # counts = { | |
| # "ar": len(_ARABIC_RE.findall(text)), | |
| # "hi": len(_DEVANAGARI_RE.findall(text)), | |
| # "zh": len(_CHINESE_RE.findall(text)), | |
| # "ru": len(_CYRILLIC_RE.findall(text)), | |
| # "en": len(_LATIN_RE.findall(text)), | |
| # } | |
| # dominant = max(counts, key=counts.get) | |
| # # If the dominant count is tiny, default to English | |
| # return dominant if counts[dominant] > 2 else "en" | |
| # # โโ 3.4 Repetition / loop detector โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # def detect_and_fix_loop(text: str) -> Tuple[bool, str]: | |
| # """ | |
| # Detect if the LLM response contains a repetition loop. | |
| # Returns (was_looping: bool, cleaned_text: str). | |
| # """ | |
| # words = text.split() | |
| # if len(words) < Config.LOOP_MIN_WORDS: | |
| # return False, text | |
| # window = words[-Config.LOOP_WINDOW:] | |
| # unique = set(window) | |
| # if len(unique) < Config.LOOP_UNIQUE_MIN: | |
| # # Find the last 'clean' sentence before the loop starts | |
| # sentences = re.split(r"(?<=[.!?ุเฅค\n])\s+", text) | |
| # clean_sentences = [] | |
| # running_words: deque = deque(maxlen=Config.LOOP_WINDOW) | |
| # for sent in sentences: | |
| # sent_words = sent.split() | |
| # running_words.extend(sent_words) | |
| # if len(set(list(running_words)[-Config.LOOP_WINDOW:])) >= Config.LOOP_UNIQUE_MIN: | |
| # clean_sentences.append(sent) | |
| # else: | |
| # break # loop starts here, stop | |
| # cleaned = " ".join(clean_sentences).strip() | |
| # if not cleaned: | |
| # cleaned = text[:500] + "\n\n[Response truncated due to generation error]" | |
| # return True, cleaned | |
| # return False, text | |
| # # โโ 3.5 Context window guard: truncate knowledge context if too long โโโโโโโโโ | |
| # def truncate_context(context: str, max_chars: int = Config.MAX_CONTEXT_CHARS) -> str: | |
| # if len(context) <= max_chars: | |
| # return context | |
| # truncated = context[:max_chars] | |
| # last_newline = truncated.rfind("\n") | |
| # return (truncated[:last_newline] if last_newline > 0 else truncated) + \ | |
| # "\n\n[... context truncated for length ...]" | |
| # # ============================================================================= | |
| # # SECTION 4 โ SEMANTIC CACHE (thread-safe in-memory) | |
| # # ============================================================================= | |
| # class SemanticCache: | |
| # """ | |
| # Thread-safe LRU-like semantic cache. | |
| # Stores (embedding, answer, detected_lang) tuples. | |
| # Cache hit when cosine_similarity >= threshold. | |
| # """ | |
| # def __init__(self, max_size: int = Config.CACHE_MAX_SIZE, | |
| # threshold: float = Config.CACHE_SIM_THRESH): | |
| # self._cache: List[Tuple[np.ndarray, str, str]] = [] # (emb, answer, lang) | |
| # self._lock = Lock() | |
| # self.max_size = max_size | |
| # self.threshold = threshold | |
| # self.hits = 0 | |
| # self.misses = 0 | |
| # def _cosine_sim(self, a: np.ndarray, b: np.ndarray) -> float: | |
| # denom = (np.linalg.norm(a) * np.linalg.norm(b)) | |
| # return float(np.dot(a, b) / denom) if denom > 1e-9 else 0.0 | |
| # def lookup(self, emb: np.ndarray) -> Optional[Tuple[str, str]]: | |
| # """Return (answer, detected_lang) if cache hit, else None.""" | |
| # with self._lock: | |
| # for stored_emb, answer, lang in self._cache: | |
| # if self._cosine_sim(emb, stored_emb) >= self.threshold: | |
| # self.hits += 1 | |
| # return answer, lang | |
| # self.misses += 1 | |
| # return None | |
| # def store(self, emb: np.ndarray, answer: str, lang: str) -> None: | |
| # with self._lock: | |
| # if len(self._cache) >= self.max_size: | |
| # self._cache.pop(0) # evict oldest | |
| # self._cache.append((emb, answer, lang)) | |
| # @property | |
| # def stats(self) -> dict: | |
| # return { | |
| # "size": len(self._cache), | |
| # "hits": self.hits, | |
| # "misses": self.misses, | |
| # "hit_rate": round(self.hits / max(1, self.hits + self.misses), 3), | |
| # } | |
| # # ============================================================================= | |
| # # SECTION 5 โ CORE AI SYSTEM | |
| # # ============================================================================= | |
| # class AISystem: | |
| # """ | |
| # Encapsulates all AI logic: embedding, retrieval, translation, generation. | |
| # Designed for zero unhandled exceptions after initialization. | |
| # """ | |
| # def __init__(self): | |
| # self._validate_config() | |
| # self.cache = SemanticCache() | |
| # self._request_count = 0 | |
| # self._error_count = 0 | |
| # self._ready = False | |
| # logger.info("=" * 60) | |
| # logger.info(f" AgriRAG Pro v{Config.APP_VERSION} โ Starting Up") | |
| # logger.info("=" * 60) | |
| # self._load_dataset() | |
| # self._load_embed_model() | |
| # self._load_chromadb() | |
| # self._load_llm() | |
| # self._ready = True | |
| # logger.info("โ AgriRAG Pro is ONLINE and ready for production traffic.") | |
| # # โโ Startup helpers โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # @staticmethod | |
| # def _validate_config(): | |
| # if not Config.HF_TOKEN: | |
| # raise EnvironmentError( | |
| # "HF_TOKEN environment variable is not set. " | |
| # "Set it in your Hugging Face Space secrets." | |
| # ) | |
| # @retry(max_attempts=3, base_delay=5.0) | |
| # def _load_dataset(self): | |
| # logger.info(f"โฌ๏ธ Downloading dataset: {Config.DATASET_REPO_ID}") | |
| # snapshot_download( | |
| # repo_id=Config.DATASET_REPO_ID, | |
| # repo_type="dataset", | |
| # local_dir=".", | |
| # token=Config.HF_TOKEN, | |
| # ignore_patterns=["*.md", "*.txt"], # skip docs, only get DB files | |
| # ) | |
| # logger.info("โ Dataset downloaded.") | |
| # @retry(max_attempts=2, base_delay=3.0) | |
| # def _load_embed_model(self): | |
| # logger.info(f"๐ง Loading embedding model: {Config.EMBED_MODEL}") | |
| # self.embed_model = SentenceTransformer(Config.EMBED_MODEL) | |
| # logger.info("โ Embedding model loaded.") | |
| # def _load_chromadb(self): | |
| # logger.info(f"๐๏ธ Opening ChromaDB at: {Config.CHROMA_PATH}") | |
| # self.chroma_client = chromadb.PersistentClient(path=Config.CHROMA_PATH) | |
| # try: | |
| # self.collection = self.chroma_client.get_collection(Config.COLLECTION_NAME) | |
| # count = self.collection.count() | |
| # logger.info(f"โ ChromaDB collection '{Config.COLLECTION_NAME}' opened โ {count} documents.") | |
| # except Exception as exc: | |
| # raise RuntimeError( | |
| # f"ChromaDB collection '{Config.COLLECTION_NAME}' not found. " | |
| # f"Make sure the dataset download succeeded and the DB is built. " | |
| # f"Original error: {exc}" | |
| # ) from exc | |
| # def _load_llm(self): | |
| # logger.info(f"๐ค Connecting to LLM: {Config.LLM_MODEL}") | |
| # self.llm = InferenceClient(model=Config.LLM_MODEL, token=Config.HF_TOKEN) | |
| # logger.info("โ LLM client ready.") | |
| # # ========================================================================= | |
| # # STEP 1 โ Encode query for cache lookup + retrieval | |
| # # ========================================================================= | |
| # def encode_query(self, text: str) -> np.ndarray: | |
| # """Embed a query string. Returns normalized numpy array.""" | |
| # vec = self.embed_model.encode( | |
| # [text], | |
| # normalize_embeddings=True, # BGE-M3 expects normalized for cosine | |
| # show_progress_bar=False, | |
| # ) | |
| # return vec[0] | |
| # # ========================================================================= | |
| # # STEP 2 โ Language detection + translation | |
| # # ========================================================================= | |
| # @retry(max_attempts=2, base_delay=1.0) | |
| # def _call_translate_llm(self, question: str) -> Optional[dict]: | |
| # """Single LLM call for language detection + translation.""" | |
| # prompt = ( | |
| # "TASK: Detect language and translate the text below.\n" | |
| # "OUTPUT FORMAT: Return ONLY a valid JSON object with exactly these 3 keys.\n" | |
| # "No preamble, no explanation, no markdown fences.\n\n" | |
| # '{"detected_lang": "<ISO-639-1 code>", "en": "<English translation>", "hi": "<Hindi translation in Devanagari>"}\n\n' | |
| # f"TEXT: {question}" | |
| # ) | |
| # response = self.llm.chat_completion( | |
| # messages=[{"role": "user", "content": prompt}], | |
| # max_tokens=Config.TRANSLATE_MAX_TOKENS, | |
| # temperature=0.0, | |
| # ) | |
| # raw = response.choices[0].message.content.strip() | |
| # return safe_parse_json(raw) | |
| # def translate_query(self, question: str) -> dict: | |
| # """ | |
| # Returns: {"detected_lang": str, "en": str, "hi": str} | |
| # Never raises โ falls back gracefully to heuristics. | |
| # """ | |
| # # Try LLM-based detection first | |
| # try: | |
| # result = self._call_translate_llm(question) | |
| # if result and all(k in result for k in ("detected_lang", "en", "hi")): | |
| # # Validate language code is sane (2-3 lowercase chars) | |
| # lang = str(result["detected_lang"]).strip().lower()[:5] | |
| # if not re.match(r"^[a-z]{2,3}$", lang): | |
| # lang = heuristic_lang(question) | |
| # logger.info(f"๐ Lang: {lang} | EN: {result['en'][:60]}โฆ") | |
| # return { | |
| # "detected_lang": lang, | |
| # "en": str(result.get("en", question)), | |
| # "hi": str(result.get("hi", question)), | |
| # } | |
| # except Exception as exc: | |
| # logger.warning(f"Translation LLM failed: {exc}. Using heuristic fallback.") | |
| # # Heuristic fallback โ we can't translate, but at least detect language | |
| # lang = heuristic_lang(question) | |
| # logger.info(f"๐ Heuristic lang: {lang}") | |
| # return {"detected_lang": lang, "en": question, "hi": question} | |
| # # ========================================================================= | |
| # # STEP 3 โ Multilingual vector search + cosine re-ranking | |
| # # ========================================================================= | |
| # def _search_single(self, query: str, top_k: int) -> List[Tuple[str, dict, float]]: | |
| # """ | |
| # Search ChromaDB for a single query string. | |
| # Returns list of (doc_text, metadata, distance). | |
| # """ | |
| # vec = self.embed_model.encode( | |
| # [query], normalize_embeddings=True, show_progress_bar=False | |
| # ).tolist() | |
| # filter_clause = {"source": {"$in": Config.ALLOWED_SOURCES}} | |
| # results = self.collection.query( | |
| # query_embeddings=vec, | |
| # n_results=min(top_k, self.collection.count()), | |
| # where=filter_clause, | |
| # include=["documents", "metadatas", "distances"], | |
| # ) | |
| # items = [] | |
| # docs = results.get("documents", [[]])[0] | |
| # metas = results.get("metadatas", [[]])[0] | |
| # distances = results.get("distances", [[]])[0] | |
| # for doc, meta, dist in zip(docs, metas, distances): | |
| # if doc and doc.strip(): | |
| # items.append((doc, meta, dist)) | |
| # return items | |
| # def _cosine_rerank( | |
| # self, | |
| # query_emb: np.ndarray, | |
| # candidates: List[Tuple[str, dict]], | |
| # top_n: int, | |
| # ) -> List[dict]: | |
| # """ | |
| # Re-rank candidates by cosine similarity with the query embedding. | |
| # Returns top_n as list of {"doc": ..., "meta": ..., "score": ...}. | |
| # """ | |
| # if not candidates: | |
| # return [] | |
| # texts = [c[0] for c in candidates] | |
| # chunk_embs = self.embed_model.encode( | |
| # texts, normalize_embeddings=True, show_progress_bar=False | |
| # ) | |
| # scores = np.dot(chunk_embs, query_emb) # dot product of normalized = cosine | |
| # ranked = sorted( | |
| # zip(scores, candidates), key=lambda x: x[0], reverse=True | |
| # ) | |
| # return [ | |
| # {"doc": doc, "meta": meta, "score": float(score)} | |
| # for score, (doc, meta) in ranked[:top_n] | |
| # ] | |
| # def multilingual_search( | |
| # self, | |
| # translations: dict, | |
| # query_emb: np.ndarray, | |
| # top_k: int = Config.DEFAULT_TOP_K, | |
| # ) -> List[dict]: | |
| # """ | |
| # Search ChromaDB with EN + HI queries, deduplicate, re-rank, return top chunks. | |
| # Never raises โ returns empty list on complete failure. | |
| # """ | |
| # queries = [translations["en"], translations["hi"]] | |
| # # If original query is a 3rd language and is different, add it too | |
| # original = translations.get("original", "") | |
| # if original and original not in queries: | |
| # queries.append(original) | |
| # seen_docs: Dict[str, Tuple[str, dict]] = {} # doc_text โ (doc, meta) | |
| # for q in queries: | |
| # if not q or not q.strip(): | |
| # continue | |
| # try: | |
| # items = self._search_single(q, top_k) | |
| # for doc, meta, _dist in items: | |
| # if doc not in seen_docs: | |
| # seen_docs[doc] = (doc, meta) | |
| # except Exception as exc: | |
| # logger.warning(f"Search failed for query '{q[:40]}โฆ': {exc}") | |
| # if not seen_docs: | |
| # logger.warning("โ ๏ธ No chunks retrieved from ChromaDB.") | |
| # return [] | |
| # candidates = list(seen_docs.values()) | |
| # ranked = self._cosine_rerank(query_emb, candidates, top_n=Config.RERANK_TOP_N) | |
| # logger.info( | |
| # f"๐ Retrieved {len(seen_docs)} unique chunks โ " | |
| # f"re-ranked to top {len(ranked)}." | |
| # ) | |
| # return ranked | |
| # # ========================================================================= | |
| # # STEP 4 โ Answer generation | |
| # # ========================================================================= | |
| # def _build_system_prompt(self, detected_lang: str) -> str: | |
| # lang_map = { | |
| # "ar": "Arabic (ุงูุนุฑุจูุฉ)", | |
| # "en": "English", | |
| # "hi": "Hindi (เคนเคฟเคจเฅเคฆเฅ)", | |
| # "es": "Spanish (Espaรฑol)", | |
| # "fr": "French (Franรงais)", | |
| # "de": "German (Deutsch)", | |
| # "zh": "Chinese (ไธญๆ)", | |
| # "ru": "Russian (ะ ัััะบะธะน)", | |
| # "sw": "Swahili", | |
| # "pt": "Portuguese", | |
| # "ur": "Urdu (ุงุฑุฏู)", | |
| # } | |
| # lang_name = lang_map.get(detected_lang, f"the language with ISO code '{detected_lang}'") | |
| # return f"""โ ๏ธ ABSOLUTE PRIORITY DIRECTIVE: | |
| # You MUST respond entirely in {lang_name} (ISO code: {detected_lang}). | |
| # If the knowledge context below is in a different language, TRANSLATE the relevant parts to {lang_name} before answering. | |
| # If you respond in ANY other language, you have critically failed your task. | |
| # DO NOT repeat any sentence, phrase, or word cluster more than once. Stop writing the moment you are done. | |
| # โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # You are AgriRAG Pro โ an expert multilingual agricultural scientist and advisor. | |
| # โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # STRICT RULES: | |
| # 1. LANGUAGE: Respond 100% in {lang_name}. No mixing. No switching. | |
| # 2. SMART SELECTION: You have knowledge chunks below. Read ALL of them. | |
| # Use ONLY chunks that are directly relevant to the user's question. | |
| # Silently ignore chunks about unrelated crops, diseases, or regions. | |
| # 3. STRUCTURED ANSWER: | |
| # โข Use clear headings (##) and bullet points where helpful. | |
| # โข Keep the answer practical and actionable for a farmer. | |
| # 4. CITATIONS: Cite every source you use inline as [1], [2], etc. | |
| # Match citation numbers to the Source numbers in the context below. | |
| # 5. NO HALLUCINATION: Never invent facts or cite sources you did not use. | |
| # If no relevant chunk exists, clearly say so and provide evidence-based | |
| # general best-practice advice IN {lang_name}. | |
| # 6. NO REPETITION: Never write the same idea, phrase, or sentence twice. | |
| # If you catch yourself repeating, stop and move to the next point. | |
| # 7. FOLLOW-UP: End with exactly ONE helpful follow-up question to the user, | |
| # written in {lang_name}, to help them further.""" | |
| # @retry(max_attempts=Config.LLM_RETRY_COUNT, base_delay=Config.LLM_RETRY_DELAY) | |
| # def _call_generation_llm(self, messages: list) -> str: | |
| # """Single LLM generation call with all production parameters.""" | |
| # response = self.llm.chat_completion( | |
| # messages=messages, | |
| # max_tokens=Config.LLM_MAX_TOKENS, | |
| # temperature=Config.LLM_TEMPERATURE, | |
| # top_p=Config.LLM_TOP_P, | |
| # repetition_penalty=Config.LLM_REP_PENALTY, # โ KEY FIX | |
| # ) | |
| # return response.choices[0].message.content.strip() | |
| # def generate_answer( | |
| # self, | |
| # question: str, | |
| # chunks: List[dict], | |
| # detected_lang: str, | |
| # history: List[dict], | |
| # ) -> Tuple[str, bool]: | |
| # """ | |
| # Generate answer. Returns (answer_text, was_loop_detected). | |
| # Never raises — returns a safe error message on complete failure. | |
| # """ | |
| # # Build context | |
| # if chunks: | |
| # context_parts = [ | |
| # f"[Source {i + 1} | {item['meta'].get('source', 'Unknown')} " | |
| # f"| relevance: {item.get('score', 0):.2f}]:\n{item['doc']}" | |
| # for i, item in enumerate(chunks) | |
| # ] | |
| # context_text = truncate_context("\n\n".join(context_parts)) | |
| # else: | |
| # context_text = "[No relevant knowledge found in the database for this query.]" | |
| # system_prompt = self._build_system_prompt(detected_lang) | |
| # messages = [{"role": "system", "content": system_prompt}] | |
| # # Include last 4 history messages (avoid context overflow) | |
| # for msg in history[-4:]: | |
| # role = msg.get("role", "user") | |
| # content = msg.get("content", "") | |
| # if role in ("user", "assistant") and content: | |
| # messages.append({"role": role, "content": content[:2000]}) # cap each msg | |
| # user_prompt = ( | |
| # f"โโโ KNOWLEDGE BASE CONTEXT โโโ\n{context_text}\n\n" | |
| # f"โโโ USER QUESTION โโโ\n{question}" | |
| # ) | |
| # messages.append({"role": "user", "content": user_prompt}) | |
| # try: | |
| # raw_answer = self._call_generation_llm(messages) | |
| # except Exception as exc: | |
| # logger.error(f"LLM generation failed after all retries: {exc}") | |
| # # Return a safe, language-aware sorry message | |
| # sorry_msgs = { | |
| # "ar": "عذراً، حدث خطأ في معالجة طلبك. يرجى المحاولة مرة أخرى.", | |
| # "hi": "क्षमा करें, आपके अनुरोध को संसाधित करने में त्रुटि हुई। कृपया पुनः प्रयास करें।", | |
| # "es": "Lo sentimos, ocurrió un error al procesar su solicitud. Por favor, inténtelo de nuevo.", | |
| # "fr": "Désolé, une erreur s'est produite. Veuillez réessayer.", | |
| # } | |
| # return sorry_msgs.get(detected_lang, | |
| # "Sorry, an error occurred while processing your request. Please try again."), False | |
| # # Check and fix loops before returning | |
| # was_loop, cleaned = detect_and_fix_loop(raw_answer) | |
| # if was_loop: | |
| # logger.warning("๐ Repetition loop detected and cleaned in LLM output.") | |
| # return cleaned, was_loop | |
| # # ========================================================================= | |
| # # Full pipeline (sync) | |
| # # ========================================================================= | |
| # def pipeline( | |
| # self, | |
| # question: str, | |
| # history: List[dict], | |
| # top_k: int = Config.DEFAULT_TOP_K, | |
| # request_id: str = "", | |
| # ) -> dict: | |
| # """ | |
| # End-to-end pipeline: translate → embed → cache check → search → generate. | |
| # Returns standardised response dict. Never raises. | |
| # """ | |
| # self._request_count += 1 | |
| # t_start = time.time() | |
| # log_prefix = f"[{request_id}]" if request_id else "" | |
| # try: | |
| # # โโ Translate + detect lang โโ | |
| # translations = self.translate_query(question) | |
| # # Store original for possible 3rd-language search | |
| # translations["original"] = question | |
| # detected_lang = translations["detected_lang"] | |
| # # โโ Embed query (shared for cache + rerank) โโ | |
| # query_emb = self.encode_query(translations["en"]) | |
| # # โโ Cache lookup โโ | |
| # cached = self.cache.lookup(query_emb) | |
| # if cached: | |
| # cached_answer, cached_lang = cached | |
| # logger.info(f"{log_prefix} ๐ฏ Cache HIT โ returning cached answer.") | |
| # return { | |
| # "answer": cached_answer, | |
| # "detected_language": cached_lang, | |
| # "sources_used": [], | |
| # "cache_hit": True, | |
| # "loop_detected": False, | |
| # "latency_ms": int((time.time() - t_start) * 1000), | |
| # "request_id": request_id, | |
| # } | |
| # # โโ Vector search + re-rank โโ | |
| # effective_top_k = min(top_k, Config.MAX_TOP_K) | |
| # chunks = self.multilingual_search(translations, query_emb, effective_top_k) | |
| # # โโ Generate โโ | |
| # answer, was_loop = self.generate_answer( | |
| # question=question, | |
| # chunks=chunks, | |
| # detected_lang=detected_lang, | |
| # history=history, | |
| # ) | |
| # # โโ Store in cache โโ | |
| # self.cache.store(query_emb, answer, detected_lang) | |
| # latency = int((time.time() - t_start) * 1000) | |
| # logger.info(f"{log_prefix} โ Pipeline done in {latency}ms | lang={detected_lang} | chunks={len(chunks)}") | |
| # return { | |
| # "answer": answer, | |
| # "detected_language": detected_lang, | |
| # "sources_used": [c["meta"] for c in chunks], | |
| # "cache_hit": False, | |
| # "loop_detected": was_loop, | |
| # "latency_ms": latency, | |
| # "request_id": request_id, | |
| # } | |
| # except Exception as exc: | |
| # self._error_count += 1 | |
| # logger.exception(f"{log_prefix} ๐ฅ Unhandled exception in pipeline: {exc}") | |
| # return { | |
| # "answer": "An unexpected error occurred. Please try again.", | |
| # "detected_language": "en", | |
| # "sources_used": [], | |
| # "cache_hit": False, | |
| # "loop_detected": False, | |
| # "latency_ms": int((time.time() - t_start) * 1000), | |
| # "request_id": request_id, | |
| # "error": str(exc), | |
| # } | |
| # # โโ Streaming pipeline โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # def stream_answer( | |
| # self, | |
| # question: str, | |
| # chunks: List[dict], | |
| # detected_lang: str, | |
| # history: List[dict], | |
| # ) -> AsyncGenerator[str, None]: | |
| # """ | |
| # Generator that yields answer tokens one by one for streaming. | |
| # Handles loop detection post-stream. | |
| # """ | |
| # context_parts = [ | |
| # f"[Source {i+1} | {item['meta'].get('source','?')}]:\n{item['doc']}" | |
| # for i, item in enumerate(chunks) | |
| # ] if chunks else ["[No relevant knowledge found.]"] | |
| # context_text = truncate_context("\n\n".join(context_parts)) | |
| # system_prompt = self._build_system_prompt(detected_lang) | |
| # messages = [{"role": "system", "content": system_prompt}] | |
| # for msg in history[-4:]: | |
| # if msg.get("role") in ("user", "assistant") and msg.get("content"): | |
| # messages.append({"role": msg["role"], "content": msg["content"][:2000]}) | |
| # messages.append({"role": "user", "content": ( | |
| # f"โโโ KNOWLEDGE BASE CONTEXT โโโ\n{context_text}\n\n" | |
| # f"โโโ USER QUESTION โโโ\n{question}" | |
| # )}) | |
| # stream = self.llm.chat_completion( | |
| # messages=messages, | |
| # max_tokens=Config.LLM_MAX_TOKENS, | |
| # temperature=Config.LLM_TEMPERATURE, | |
| # top_p=Config.LLM_TOP_P, | |
| # repetition_penalty=Config.LLM_REP_PENALTY, | |
| # stream=True, | |
| # ) | |
| # buffer = [] | |
| # for chunk in stream: | |
| # delta = chunk.choices[0].delta.content or "" | |
| # if delta: | |
| # buffer.append(delta) | |
| # yield delta | |
| # # Post-stream loop check (log only, user already received text) | |
| # full = "".join(buffer) | |
| # was_loop, _ = detect_and_fix_loop(full) | |
| # if was_loop: | |
| # logger.warning("๐ Streaming: loop detected in completed output.") | |
| # @property | |
| # def system_stats(self) -> dict: | |
| # return { | |
| # "version": Config.APP_VERSION, | |
| # "ready": self._ready, | |
| # "total_requests": self._request_count, | |
| # "total_errors": self._error_count, | |
| # "error_rate": round( | |
| # self._error_count / max(1, self._request_count), 3 | |
| # ), | |
| # "cache": self.cache.stats, | |
| # "chroma_doc_count": self.collection.count() if self._ready else -1, | |
| # "llm_model": Config.LLM_MODEL, | |
| # "embed_model": Config.EMBED_MODEL, | |
| # } | |
| # # ============================================================================= | |
| # # SECTION 6 — APP STARTUP / SHUTDOWN (lifespan) | |
| # # ============================================================================= | |
| # ai: Optional[AISystem] = None | |
| # @asynccontextmanager | |
| # async def lifespan(app: FastAPI): | |
| # """Manage startup and shutdown of the AI system.""" | |
| # global ai | |
| # logger.info("๐ FastAPI lifespan: starting AISystem โฆ") | |
| # try: | |
| # ai = AISystem() | |
| # except Exception as exc: | |
| # logger.critical(f"๐ฅ AISystem failed to initialize: {exc}") | |
| # # Do NOT re-raise โ allow FastAPI to start so /health can explain the error | |
| # ai = None | |
| # yield | |
| # logger.info("๐ FastAPI lifespan: shutting down.") | |
| # # ============================================================================= | |
| # # SECTION 7 — FASTAPI APP | |
| # # ============================================================================= | |
| # app = FastAPI( | |
| # title="AgriRAG Pro", | |
| # description="Production-grade multilingual agricultural AI assistant", | |
| # version=Config.APP_VERSION, | |
| # lifespan=lifespan, | |
| # docs_url="/docs", | |
| # redoc_url="/redoc", | |
| # ) | |
| # app.add_middleware( | |
| # CORSMiddleware, | |
| # allow_origins=["*"], | |
| # allow_credentials=True, | |
| # allow_methods=["*"], | |
| # allow_headers=["*"], | |
| # ) | |
| # # ============================================================================= | |
| # # SECTION 8 — REQUEST / RESPONSE MODELS | |
| # # ============================================================================= | |
| # class Message(BaseModel): | |
| # role: str = Field(..., pattern="^(user|assistant)$") | |
| # content: str = Field(..., min_length=1, max_length=4000) | |
| # class ChatRequest(BaseModel): | |
| # question: str = Field(..., min_length=1, max_length=Config.MAX_QUESTION_LEN) | |
| # history: List[Message] = Field(default_factory=list, max_length=Config.MAX_HISTORY_MSGS) | |
| # top_k: int = Field(default=Config.DEFAULT_TOP_K, ge=1, le=Config.MAX_TOP_K) | |
| # @field_validator("question") | |
| # @classmethod | |
| # def clean_question(cls, v: str) -> str: | |
| # # Normalize Unicode, strip control characters | |
| # v = unicodedata.normalize("NFC", v) | |
| # v = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", v) | |
| # return v.strip() | |
| # class ChatResponse(BaseModel): | |
| # answer: str | |
| # detected_language: str | |
| # sources_used: List[dict] | |
| # cache_hit: bool | |
| # loop_detected: bool | |
| # latency_ms: int | |
| # request_id: str | |
| # # ============================================================================= | |
| # # SECTION 9 — ENDPOINTS | |
| # # ============================================================================= | |
| # def _require_ai() -> AISystem: | |
| # """Guard: raise 503 if AI system failed to initialize.""" | |
| # if ai is None or not ai._ready: | |
| # raise HTTPException( | |
| # status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
| # detail="AI system is not ready. Check startup logs for initialization errors.", | |
| # ) | |
| # return ai | |
| # # โโ /ask (standard JSON response) โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # @app.post("/ask", response_model=ChatResponse, summary="Ask an agricultural question") | |
| # async def ask_endpoint(request: ChatRequest, http_request: Request): | |
| # system = _require_ai() | |
| # request_id = str(uuid.uuid4())[:8] | |
| # logger.info(f"[{request_id}] ๐ฅ Question: {request.question[:80]}โฆ") | |
| # history = [m.model_dump() for m in request.history] | |
| # result = system.pipeline( | |
| # question=request.question, | |
| # history=history, | |
| # top_k=request.top_k, | |
| # request_id=request_id, | |
| # ) | |
| # if "error" in result and result.get("answer", "").startswith("An unexpected"): | |
| # raise HTTPException( | |
| # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| # detail=result.get("error", "Unknown error"), | |
| # ) | |
| # return ChatResponse(**{k: result[k] for k in ChatResponse.model_fields}) | |
| # # โโ /ask/stream (SSE streaming) โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # @app.post("/ask/stream", summary="Ask with streaming response (SSE)") | |
| # async def ask_stream_endpoint(request: ChatRequest): | |
| # system = _require_ai() | |
| # request_id = str(uuid.uuid4())[:8] | |
| # logger.info(f"[{request_id}] ๐ Stream request: {request.question[:80]}โฆ") | |
| # translations = system.translate_query(request.question) | |
| # translations["original"] = request.question | |
| # detected_lang = translations["detected_lang"] | |
| # query_emb = system.encode_query(translations["en"]) | |
| # chunks = system.multilingual_search(translations, query_emb, request.top_k) | |
| # history = [m.model_dump() for m in request.history] | |
| # async def token_generator() -> AsyncGenerator[str, None]: | |
| # try: | |
| # for token in system.stream_answer(request.question, chunks, detected_lang, history): | |
| # yield f"data: {json.dumps({'token': token})}\n\n" | |
| # yield f"data: {json.dumps({'done': True, 'lang': detected_lang})}\n\n" | |
| # except Exception as exc: | |
| # logger.error(f"[{request_id}] Streaming error: {exc}") | |
| # yield f"data: {json.dumps({'error': str(exc)})}\n\n" | |
| # return StreamingResponse( | |
| # token_generator(), | |
| # media_type="text/event-stream", | |
| # headers={ | |
| # "Cache-Control": "no-cache", | |
| # "X-Request-ID": request_id, | |
| # }, | |
| # ) | |
| # # โโ /health (fast liveness probe) โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # @app.get("/health", summary="Liveness probe") | |
| # async def health(): | |
| # if ai is None or not ai._ready: | |
| # raise HTTPException( | |
| # status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
| # detail="System not ready", | |
| # ) | |
| # return {"status": "ok", "version": Config.APP_VERSION} | |
| # # โโ /health/deep (full readiness probe) โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # @app.get("/health/deep", summary="Full readiness probe — checks all components") | |
| # async def health_deep(): | |
| # checks = {} | |
| # # HF Token | |
| # checks["hf_token"] = "ok" if Config.HF_TOKEN else "MISSING" | |
| # # ChromaDB | |
| # try: | |
| # if ai and ai._ready: | |
| # count = ai.collection.count() | |
| # checks["chromadb"] = f"ok ({count} documents)" | |
| # else: | |
| # checks["chromadb"] = "not_ready" | |
| # except Exception as exc: | |
| # checks["chromadb"] = f"error: {exc}" | |
| # # Embedding model | |
| # try: | |
| # if ai and ai._ready: | |
| # _ = ai.encode_query("test") | |
| # checks["embed_model"] = "ok" | |
| # else: | |
| # checks["embed_model"] = "not_ready" | |
| # except Exception as exc: | |
| # checks["embed_model"] = f"error: {exc}" | |
| # # LLM (lightweight check — just verify client exists) | |
| # checks["llm"] = "ok" if (ai and ai._ready and ai.llm) else "not_ready" | |
| # all_ok = all(v == "ok" or v.startswith("ok") for v in checks.values()) | |
| # return { | |
| # "status": "ok" if all_ok else "degraded", | |
| # "components": checks, | |
| # "version": Config.APP_VERSION, | |
| # } | |
| # # โโ /stats โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # @app.get("/stats", summary="Runtime statistics") | |
| # async def stats(): | |
| # system = _require_ai() | |
| # return system.system_stats | |
| # # โโ /info โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| # @app.get("/", summary="API info") | |
| # async def root(): | |
| # return { | |
| # "name": "AgriRAG Pro", | |
| # "version": Config.APP_VERSION, | |
| # "description": "Multilingual agricultural AI assistant", | |
| # "endpoints": { | |
| # "/ask": "POST — Standard JSON question/answer", | |
| # "/ask/stream": "POST — Streaming SSE question/answer", | |
| # "/health": "GET — Liveness probe", | |
| # "/health/deep": "GET — Full readiness probe", | |
| # "/stats": "GET — Runtime statistics", | |
| # "/docs": "GET — Interactive API documentation", | |
| # }, | |
| # } | |