import os import faiss import numpy as np import json from fastapi import FastAPI, HTTPException from fastapi.responses import HTMLResponse, RedirectResponse from pydantic import BaseModel from sentence_transformers import SentenceTransformer from openai import OpenAI from typing import Optional import uvicorn from rank_bm25 import BM25Okapi # ===================== # CONFIG # ===================== HF_TOKEN = os.environ.get("HF_TOKEN", "") LLM_MODEL = "deepseek-ai/DeepSeek-V4-Flash:novita" EMBED_MODEL = "intfloat/multilingual-e5-large" DATA_FILE = "plant_diseases_guide.json" CHUNK_SIZE = 500 CHUNK_OVERLAP = 50 TOP_K = 6 # ===================== # LOAD & CHUNK JSON # ===================== def load_and_chunk(filepath: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP): with open(filepath, "r", encoding="utf-8") as f: data = json.load(f) chunks = [] if isinstance(data, dict) and "diseases" in data: for disease in data["diseases"]: chunk_text = f""" المرض: {disease.get('arabic_name', '')} النبات: {disease.get('plant', '')} النوع: {disease.get('type', '')} مسبب المرض: {disease.get('pathogen', '')} مستوى الخطورة: {disease.get('danger_level', '')} الانتشار في مصر: {disease.get('egypt_prevalence', '')} الأعراض: {chr(10).join(disease.get('symptoms', []))} طرق الوقاية: {chr(10).join(disease.get('prevention', []))} العلاج: {chr(10).join(disease.get('treatment', []))} معلومات إضافية: {disease.get('egypt_notes', '')} {disease.get('danger_details', '')} {disease.get('contagion_method', '')} الظروف المساعدة: {' | '.join(disease.get('favorable_conditions', []))} """.strip() chunks.append(chunk_text) if "general_agriculture_info" in data: general = data["general_agriculture_info"] for section_key, section in general.get("sections", {}).items(): section_text = f""" موضوع: {section.get('title', '')} {chr(10).join(section.get('content', []))} """.strip() chunks.append(section_text) else: if isinstance(data, list): text = "\n".join([json.dumps(item, ensure_ascii=False) for item in data]) else: text = json.dumps(data, ensure_ascii=False, indent=2) start = 0 while start < len(text): end = start + chunk_size chunk = text[start:end].strip() if chunk: chunks.append(chunk) start += chunk_size - overlap return chunks # ===================== # BUILD FAISS INDEX — Cosine Similarity # ===================== def build_index(chunks, embed_model): print("جاري بناء الـ index...") embeddings = embed_model.encode(chunks, show_progress_bar=True) embeddings = np.array(embeddings).astype("float32") faiss.normalize_L2(embeddings) dim = embeddings.shape[1] index = faiss.IndexFlatIP(dim) index.add(embeddings) print(f"تم بناء الـ index بنجاح - {len(chunks)} chunk") return index, embeddings # ===================== # QUERY EXPANSION # ===================== def expand_query(query: str) -> list: variations = [query] replacements = { "بيتلون": "تلون تغير لون", "بيعفن": "عفن تعفن", "بيموت": "يذبل موت ذبول", "عليه بقع": "بقع تبقع", "اصفرار": "اصفرار أوراق صفراء", "بتاع": "خاص بـ", "ليه": "لماذا سبب", "ازاي": "كيف طريقة", "امتا": "متى وقت", "علاج": "علاج معالجة مكافحة", "وقاية": "وقاية وقاء حماية منع", "اعراض": "أعراض علامات", "مرض": "مرض إصابة", "نبات": "نبات محصول زراعة", "فطر": "فطر فطريات مرض فطري", "حشرة": "حشرة آفة", } expanded = query for colloquial, formal in replacements.items(): if colloquial in query: expanded = expanded.replace(colloquial, formal) if expanded != query: variations.append(expanded) return variations # ===================== # QUERY REWRITING — يحل مشكلة الضمائر مع history طويل # ===================== def rewrite_query(query: str, history: list, llm_client: OpenAI) -> str: if not history: return query reference_words = ["ده", "هو", "هي", "دي", "نفسه", "نفسها", "علاجه", "علاجها", "أعراضه", "أعراضها", "خطير", "منتشر", "سببه", "وقايته", "بتاعه", "بتاعها", "فيه", "عنه", "عنها"] if not any(word in query for word in reference_words): return query # بنبعت كل الـ history عشان يلاقي الموضوع حتى لو بعيد history_text = "\n".join([ f"سؤال: {turn['question']}\nجواب: {turn['answer'][:150]}..." for turn in history ]) rewrite_prompt = f"""المحادثة كلها: {history_text} السؤال الجديد: "{query}" بناءً على المحادثة كلها، حدد الموضوع الرئيسي وأعد صياغة السؤال الجديد كسؤال مستقل وكامل بدون ضمائر. أرجع السؤال المعاد صياغته فقط بدون أي كلام تاني.""" try: response = llm_client.chat.completions.create( model=LLM_MODEL, messages=[{"role": "user", "content": rewrite_prompt}], max_tokens=100, temperature=0, ) rewritten = response.choices[0].message.content.strip() print(f"Query rewritten: '{query}' → '{rewritten}'") return rewritten except Exception as e: print(f"Rewrite failed, using original: {e}") return query # ===================== # HYBRID RETRIEVE — FAISS (Cosine) + BM25 + RRF # ===================== def retrieve(query: str, index, chunks, embed_model, bm25: BM25Okapi, top_k: int = TOP_K): queries = expand_query(query) rrf_scores = {} # Dense retrieval (FAISS Cosine) for q in queries: query_vec = embed_model.encode([q]).astype("float32") faiss.normalize_L2(query_vec) distances, indices = index.search(query_vec, top_k * 2) for rank, idx in enumerate(indices[0]): if idx < len(chunks): rrf_scores[idx] = rrf_scores.get(idx, 0) + 1 / (rank + 60) # Sparse retrieval (BM25) for q in queries: bm25_scores = bm25.get_scores(q.split()) top_bm25 = np.argsort(bm25_scores)[::-1][: top_k * 2] for rank, idx in enumerate(top_bm25): rrf_scores[idx] = rrf_scores.get(idx, 0) + 1 / (rank + 60) top_indices = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)[:top_k] return [{"chunk": chunks[i], "score": rrf_scores[i]} for i in top_indices if i < len(chunks)] # ===================== # GENERATE # ===================== def generate_answer(query: str, context_chunks: list, llm_client: OpenAI, history: list = None): context = "\n---\n".join([c["chunk"] for c in context_chunks]) system_prompt = """أنت مساعد ذكي متخصص في أمراض النباتات. استخدم المعلومات المقدمة في كل رسالة للإجابة على السؤال مع مراعاة سياق المحادثة السابقة كاملاً. إذا كان السؤال مرتبطاً بموضوع سبق ذكره في المحادثة، استخدم ذلك السياق مباشرةً دون طلب توضيح. إذا لم تجد الإجابة في المعلومات المتاحة، أجب من معلوماتك العامة بس لازم يبقي سؤال لي علاقه بالزراعه او النباتات، لو بره الحاجات دي قوله مش تخصصي. للتحيات والأسئلة الاعتيادية أجب بشكل طبيعي. لا تستخدم أي تنسيق markdown مثل ** أو ## أو * في إجاباتك، اكتب نص عادي فقط.""" messages = [{"role": "system", "content": system_prompt}] if history: for turn in history: messages.append({"role": "user", "content": turn["question"]}) messages.append({"role": "assistant", "content": turn["answer"]}) user_prompt = f"""المعلومات من قاعدة البيانات: {context} السؤال الحالي: {query} الإجابة:""" messages.append({"role": "user", "content": user_prompt}) response = llm_client.chat.completions.create( model=LLM_MODEL, messages=messages, max_tokens=512, temperature=0.3, ) return response.choices[0].message.content.strip() # ===================== # INIT # ===================== print("جاري تحميل الـ embedding model...") embed_model = SentenceTransformer(EMBED_MODEL) print("جاري تحميل الداتا...") chunks = load_and_chunk(DATA_FILE) index, _ = build_index(chunks, embed_model) print("جاري بناء الـ BM25 index...") tokenized_chunks = [c.split() for c in chunks] bm25 = BM25Okapi(tokenized_chunks) print(f"BM25 جاهز - {len(tokenized_chunks)} chunk") print("جاري الاتصال بـ DeepSeek...") llm_client = OpenAI( base_url="https://router.huggingface.co/v1", api_key=HF_TOKEN, ) app = FastAPI(title="Arabic RAG Chat API") # ===================== # SCHEMAS # ===================== class HistoryTurn(BaseModel): question: str answer: str class QueryRequest(BaseModel): question: str top_k: Optional[int] = TOP_K history: Optional[list[HistoryTurn]] = [] class QueryResponse(BaseModel): answer: str sources: list[str] # ===================== # ENDPOINTS # ===================== @app.get("/") def root(): return RedirectResponse(url="/ui") @app.get("/ui", response_class=HTMLResponse) def ui(): return r"""
اسألني عن أي مرض نباتي — أتذكر المحادثة كلها
اسألني عن أي مرض وهجاوبك من قاعدة البيانات.
ومش محتاج تكرر اسم المرض في كل سؤال — أنا بتذكر!
Enter للإرسال — Shift+Enter لسطر جديد