Spaces:
Running
Running
| import os | |
| import asyncio | |
| import io | |
| import uuid | |
| import datetime | |
| from typing import AsyncGenerator | |
| from groq import Groq | |
| from tavily import TavilyClient | |
| from pypdf import PdfReader | |
| import chromadb | |
| from chromadb.utils import embedding_functions | |
| from dotenv import load_dotenv | |
# Load variables from a local .env file (convenience for local development).
load_dotenv()

# ==============================================================================
# 1. ENGINE CONFIGURATION (GROQ + TAVILY)
# ==============================================================================
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")

# Build the API clients. Each client is left as None when its key is absent,
# so importing this module never fails just because a key is not configured.
if GROQ_API_KEY:
    client = Groq(api_key=GROQ_API_KEY)
else:
    print("❌ ERROR: GROQ_API_KEY missing! Engine won't work.")
    client = None

if TAVILY_API_KEY:
    tavily = TavilyClient(api_key=TAVILY_API_KEY)
else:
    tavily = None
| # ============================================================================== | |
| # 2. DATABASE SETUP (LOCAL SENTENCE-TRANSFORMERS) | |
| # ============================================================================== | |
# Both handles start as None so later code can test them even if setup fails.
db_client = None
collection = None

try:
    # Local embedding model of choice: all-MiniLM-L6-v2 (fast, accurate, free).
    local_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name="all-MiniLM-L6-v2",
    )
    # Persistent on-disk store, created next to the working directory.
    db_client = chromadb.PersistentClient(path="./avii_vector_storage")
    collection = db_client.get_or_create_collection(
        name="avii_knowledge_base",
        embedding_function=local_ef,
    )
    print("✅ Engine: Local Vector DB Initialized (Sentence-Transformers)")
except Exception as init_error:
    # Any failure disables the vector store instead of aborting the import.
    print(f"⚠️ DB Init Error: {init_error}")
    collection = None
| # ============================================================================== | |
| # 3. ASYNC HELPER | |
| # ============================================================================== | |
async def run_sync_in_thread(func, *args, **kwargs):
    """Run a blocking callable in a worker thread and return its result.

    Positional and keyword arguments are forwarded to ``func`` unchanged.
    Awaiting this coroutine lets the event loop keep running while the
    synchronous code executes.
    """
    outcome = await asyncio.to_thread(func, *args, **kwargs)
    return outcome
| # ============================================================================== | |
| # 4. INTERNAL HELPER FUNCTIONS | |
| # ============================================================================== | |
async def determine_intent(user_query: str) -> str:
    """Classify a user query as ``"WEB"``, ``"PDF"`` or ``"CHAT"`` via Groq.

    Args:
        user_query: Raw text typed by the user.

    Returns:
        ``"WEB"`` or ``"PDF"`` when the model answers with exactly that label
        (surrounding whitespace/punctuation is ignored); ``"CHAT"`` in every
        other case: no Groq client, a plain greeting, an unrecognised answer,
        or any error while calling the API.
    """
    if not client:
        return "CHAT"

    # Skip the API round-trip entirely for trivial greetings.
    q_lower = user_query.lower().strip()
    if q_lower in {"hi", "hello", "hey", "sup", "how are you"}:
        return "CHAT"

    prompt = f"""
    Classify the intent of this user query:
    1. WEB: Current events, news, or facts.
    2. PDF: Specific questions about files/documents.
    3. CHAT: General conversation, coding, or math.
    Query: "{user_query}"
    Output ONLY one word: WEB, PDF, or CHAT
    """

    def _classify_sync():
        # NOTE(review): confirm this model id against Groq's current model
        # list; if the API rejects it, every query silently becomes CHAT.
        return client.chat.completions.create(
            model="llama-3-8b-8192",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=5,
        )

    try:
        # The Groq client is synchronous: run it in a worker thread so the
        # event loop is not blocked while waiting for the network.
        response = await run_sync_in_thread(_classify_sync)
        raw_answer = response.choices[0].message.content or ""
    except Exception as exc:
        # Best-effort classifier: fall back to CHAT, but make the failure
        # visible instead of swallowing it.
        print(f"⚠️ Intent check failed, defaulting to CHAT: {exc}")
        return "CHAT"

    # Tolerate answers such as 'WEB.' or '"PDF"'.
    intent = raw_answer.strip().strip(" .,:;!\"'`*").upper()
    return intent if intent in ("WEB", "PDF") else "CHAT"
def _search_web_sync(query: str) -> str:
    """Run a Tavily web search and format the hits as prompt context.

    Args:
        query: Search string sent to Tavily.

    Returns:
        One ``Source: ... / Content: ...`` section per usable result, each
        terminated by a ``---`` line, concatenated into a single string.
        Returns ``""`` when the Tavily client is not configured, when no
        result has content, or when the search raises.
    """
    if not tavily:
        print("❌ Tavily Client is None. API Key missing?")
        return ""
    try:
        print(f"⚙️ [AVII PRO LOG] Searching Tavily for: {query}")  # Backend console log
        results = tavily.search(query=query, max_results=3, search_depth="advanced")
        sections = []
        for hit in results.get('results', []):
            content = hit.get('content')
            if not content:
                # A hit without text adds nothing to the context; skipping it
                # also keeps one malformed entry from discarding the rest.
                continue
            title = hit.get('title') or "Untitled"
            # Cap each snippet at 400 characters to keep the prompt small.
            sections.append(f"Source: {title}\nContent: {content[:400]}\n---\n")
        return "".join(sections)
    except Exception as e:
        # API-limit and network errors surface here.
        print(f"❌ Tavily Search Error: {e}")
        return ""
def _search_pdf_sync(query: str, user_id: str):
    """Query the local ChromaDB collection for chunks relevant to ``query``.

    Args:
        query: Text to search for.
        user_id: Owner of the documents; only chunks whose ``user_id``
            metadata equals ``str(user_id)`` are considered.

    Returns:
        Up to five matching chunks joined with newlines, or ``None`` when the
        collection is unavailable, nothing matches, or the query fails.
    """
    if not collection:
        return None
    try:
        results = collection.query(
            query_texts=[query],
            n_results=5,
            where={"user_id": str(user_id)},
        )
        documents = results['documents']
        # 'documents' holds one list per query text; only one query was sent.
        if documents and documents[0]:
            return "\n".join(documents[0])
        return None
    except Exception as exc:
        # Stay best-effort, but report the failure instead of hiding it.
        print(f"❌ PDF Search Error: {exc}")
        return None
| # ============================================================================== | |
| # 5. CORE PUBLIC FUNCTIONS (GROQ STREAMING EDITION) | |
| # ============================================================================== | |
async def run_agent_stream(
    query: str,
    history: list,
    use_web: bool,
    user_id: int,
    system_instruction: str = None,
    temperature: float = 0.3
) -> AsyncGenerator[str, None]:
    """AVII PRO main streaming engine with web and PDF context support.

    Args:
        query: The user's current message.
        history: Earlier messages as dicts with ``"role"`` and ``"content"``
            keys; only the last five are sent to the model. ``None`` is
            treated as an empty history.
        use_web: When true, fall back to a web search if no document context
            was found.
        user_id: Used (as a string) to scope the document search.
        system_instruction: Optional persona text; ignored unless it is longer
            than five characters.
        temperature: Sampling temperature forwarded to the Groq API.

    Yields:
        Status banners for the web search, then the model's answer chunk by
        chunk. Errors are yielded as text rather than raised.
    """
    if not client:
        yield "Error: Groq API Key is missing. Check your .env file."
        return

    intent = await determine_intent(query)
    context_data = ""
    source_label = "General Knowledge"

    if query.lower().strip() not in ["hi", "hello", "hey"]:
        # 1. Search PDF database
        if intent == "PDF" or (intent == "CHAT" and collection):
            pdf_data = await run_sync_in_thread(_search_pdf_sync, query, str(user_id))
            if pdf_data:
                context_data = pdf_data
                source_label = "Uploaded Document"

        # 2. Web search whenever the UI toggle is on and no document context
        #    was found (the classified intent is not consulted here).
        if not context_data and use_web:
            yield "\n🌐 **[AVII PRO Live Search]** \n*Fetching real-time data from the web...*\n\n---\n\n"
            web_data = await run_sync_in_thread(_search_web_sync, query)
            if web_data:
                context_data = web_data
                source_label = "Web Search"
            else:
                yield "\n⚠️ *[Web Search returned no results or failed. Relying on internal memory]*\n\n"

    # Identity override: a caller-supplied persona wins only if non-trivial.
    base_persona = system_instruction if system_instruction and len(system_instruction) > 5 else "You are AVII PRO, a high-performance AI created by Avneesh Kumar."
    retrieved_context = context_data or "None (Use internal knowledge)"
    final_system_prompt = f"""
    {base_persona}
    CONTEXTUAL INFORMATION:
    - Current Date: {datetime.date.today()}
    - Context Source: {source_label}
    - Retrieved Context: {retrieved_context}
    STRICT IDENTITY:
    - You are AVII PRO.
    - Created exclusively by Avneesh Kumar.
    - Never identify as OpenAI, ChatGPT, Groq, or Meta.
    - If user asks for code, provide clean, commented Markdown blocks.
    - Be direct, sharp, and highly technical.
    """

    try:
        msgs = [{"role": "system", "content": final_system_prompt}]
        # Context window management: keep only the five most recent turns.
        for h in (history or [])[-5:]:
            msgs.append({"role": h["role"], "content": h["content"]})
        msgs.append({"role": "user", "content": query})

        # The Groq client is synchronous. Both opening the stream and pulling
        # each chunk block on the network, so they run in worker threads to
        # keep the event loop free for other requests.
        response = await run_sync_in_thread(
            client.chat.completions.create,
            model="openai/gpt-oss-20b",
            messages=msgs,
            temperature=temperature,
            stream=True,
        )

        chunk_iter = iter(response)
        end_of_stream = object()  # sentinel: avoids StopIteration crossing threads
        while True:
            chunk = await run_sync_in_thread(next, chunk_iter, end_of_stream)
            if chunk is end_of_stream:
                break
            if not chunk.choices:
                # Defensive: skip chunks that carry no choices.
                continue
            piece = getattr(chunk.choices[0].delta, 'content', None)
            if piece is not None:
                yield piece
    except Exception as e:
        yield f"\nAVII PRO Engine Error: {str(e)}"
| # ============================================================================== | |
| # 6. MEMORY MANAGEMENT (PDF & RESET) | |
| # ============================================================================== | |
async def ingest_pdf(file_content, filename, user_id, *, chunk_size=1000, chunk_overlap=200):
    """Extract text from a PDF, chunk it and store it in the ChromaDB collection.

    Args:
        file_content: Raw PDF bytes (wrapped in ``io.BytesIO`` for parsing).
        filename: Stored in each chunk's metadata.
        user_id: Stored (as a string) in each chunk's metadata.
        chunk_size: Maximum characters per chunk.
        chunk_overlap: Characters shared between consecutive chunks; must be
            non-negative and smaller than ``chunk_size``.

    Returns:
        Number of chunks stored; ``0`` when the collection is unavailable, the
        PDF yields no text, or parsing/storing fails.

    Raises:
        ValueError: If ``chunk_size``/``chunk_overlap`` are inconsistent.
    """
    if not collection:
        return 0
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap

    def parse_chunk_sync():
        try:
            pdf = PdfReader(io.BytesIO(file_content))
            # Extract each page exactly once; pages without text are skipped.
            page_texts = (page.extract_text() for page in pdf.pages)
            text = "".join(f"{page_text}\n" for page_text in page_texts if page_text)
            if not text.strip():
                return 0
            # Sliding window: each chunk starts `step` characters after the
            # previous one, so neighbours share `chunk_overlap` characters.
            chunks = [text[i:i + chunk_size] for i in range(0, len(text), step)]
            ids = [str(uuid.uuid4()) for _ in chunks]
            metadatas = [{"user_id": str(user_id), "filename": filename} for _ in chunks]
            collection.add(documents=chunks, ids=ids, metadatas=metadatas)
            return len(chunks)
        except Exception as e:
            print(f"Ingest Error: {e}")
            return 0

    # Parsing and embedding are blocking work; keep them off the event loop.
    return await run_sync_in_thread(parse_chunk_sync)
async def reset_memory():
    """Delete every chunk stored in the ChromaDB collection.

    Returns:
        ``True`` when a collection exists and was processed (including the
        case where it was already empty); ``False`` when there is no
        collection or the reset raised an error.
    """
    def clear_all_sync():
        try:
            if not collection:
                return False
            stored_ids = collection.get().get('ids', [])
            if stored_ids:
                collection.delete(ids=stored_ids)
                print(f"✅ ChromaDB Cleared: {len(stored_ids)} chunks deleted.")
            return True
        except Exception as e:
            print(f"❌ Error during Chroma Reset: {e}")
            return False

    outcome = await run_sync_in_thread(clear_all_sync)
    return outcome