"""AVII PRO engine: Groq chat completions, Tavily web search and a local ChromaDB store."""

import asyncio
import datetime
import io
import os
import uuid
from typing import AsyncGenerator

import chromadb
from chromadb.utils import embedding_functions
from dotenv import load_dotenv
from groq import Groq
from pypdf import PdfReader
from tavily import TavilyClient

# Load variables from a .env file (used for local development).
load_dotenv()

# ==============================================================================
# 1. ENGINE CONFIGURATION (GROQ + TAVILY)
# ==============================================================================
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")

# Init clients. A missing key leaves the corresponding client as None instead
# of raising at import time.
if not GROQ_API_KEY:
    print("❌ ERROR: GROQ_API_KEY missing! Engine won't work.")
    client = None
else:
    client = Groq(api_key=GROQ_API_KEY)

tavily = TavilyClient(api_key=TAVILY_API_KEY) if TAVILY_API_KEY else None

# ==============================================================================
# 2. DATABASE SETUP (LOCAL SENTENCE-TRANSFORMERS)
# ==============================================================================
db_client = None
collection = None

try:
    # Best local embedding choice: all-MiniLM-L6-v2 (fast, accurate and free).
    local_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name="all-MiniLM-L6-v2"
    )

    db_client = chromadb.PersistentClient(path="./avii_vector_storage")
    collection = db_client.get_or_create_collection(
        name="avii_knowledge_base",
        embedding_function=local_ef,
    )
    print("✅ Engine: Local Vector DB Initialized (Sentence-Transformers)")
except Exception as e:
    # Any failure here disables the vector store; `collection` stays None.
    print(f"⚠️ DB Init Error: {e}")
    collection = None

# ==============================================================================
# 3.
# ASYNC HELPER
# ==============================================================================
async def run_sync_in_thread(func, *args, **kwargs):
    """Run a blocking callable in a worker thread and return its result.

    Args:
        func: The synchronous callable to run.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns. Exceptions raised by ``func`` propagate.
    """
    return await asyncio.to_thread(func, *args, **kwargs)


# ==============================================================================
# 4. INTERNAL HELPER FUNCTIONS
# ==============================================================================
# Inputs treated as plain greetings: no intent call and no retrieval for these.
_GREETINGS = frozenset({"hi", "hello", "hey", "sup", "how are you"})

# Sentinel returned by next() once the streamed response is exhausted.
_END_OF_STREAM = object()


async def determine_intent(user_query: str) -> str:
    """Classify a query as "WEB", "PDF" or "CHAT" with one short Groq call.

    Args:
        user_query: The raw user message.

    Returns:
        "WEB" or "PDF" when the model answers with exactly that label
        (surrounding punctuation is ignored). "CHAT" in every other case:
        no Groq client, a plain greeting, an API failure, or any other reply.
    """
    if not client:
        return "CHAT"

    if user_query.lower().strip() in _GREETINGS:
        return "CHAT"

    prompt = f"""
    Classify the intent of this user query:
    1. WEB: Current events, news, or facts.
    2. PDF: Specific questions about files/documents.
    3. CHAT: General conversation, coding, or math.

    Query: "{user_query}"
    Output ONLY one word: WEB, PDF, or CHAT
    """
    try:
        # The Groq SDK call is blocking, so keep it off the event loop.
        # NOTE(review): confirm this model id against Groq's supported-models
        # list; a rejected id makes every call fail and fall back to CHAT.
        response = await run_sync_in_thread(
            client.chat.completions.create,
            model="llama-3-8b-8192",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=5,
        )
        reply = response.choices[0].message.content or ""
    except Exception as e:
        # Best effort: classification failures degrade to plain chat, but are
        # logged so a misconfiguration does not go unnoticed.
        print(f"⚠️ Intent Check Error: {e}")
        return "CHAT"

    # Tolerate answers such as "WEB." or "**PDF**".
    intent = reply.strip().upper().strip(" .,!?:\"'*`")
    return intent if intent in ("WEB", "PDF") else "CHAT"


def _search_web_sync(query: str):
    """Run a Tavily search and format the hits as prompt context.

    Args:
        query: The search query.

    Returns:
        One "Source/Content" entry per result (content cut to 400 characters),
        or "" when Tavily is not configured, the search fails, or there are
        no results.
    """
    if not tavily:
        print("❌ Tavily Client is None. API Key missing?")
        return ""
    try:
        # Backend console log.
        print(f"⚙️ [AVII PRO LOG] Searching Tavily for: {query}")
        results = tavily.search(query=query, max_results=3, search_depth="advanced")

        entries = []
        for r in results.get("results", []):
            # .get() keeps one malformed result from discarding all the others.
            title = r.get("title", "")
            content = (r.get("content") or "")[:400]
            entries.append(f"Source: {title}\nContent: {content}\n---\n")
        return "".join(entries)
    except Exception as e:
        # If the API rate limit is hit, it shows up here.
        print(f"❌ Tavily Search Error: {e}")
        return ""


def _search_pdf_sync(query: str, user_id: str):
    """Query the local ChromaDB collection for one user's document chunks.

    Args:
        query: Text to search for.
        user_id: Only chunks whose ``user_id`` metadata equals this are searched.

    Returns:
        Up to five matching chunks joined by newlines, or None when there is
        no collection, nothing matches, or the query fails.
    """
    if not collection:
        return None
    try:
        results = collection.query(
            query_texts=[query],
            n_results=5,
            where={"user_id": str(user_id)},
        )
        documents = results["documents"]
        if documents and documents[0]:
            return "\n".join(documents[0])
        return None
    except Exception as e:
        print(f"⚠️ PDF Search Error: {e}")
        return None


# ==============================================================================
# 5. CORE PUBLIC FUNCTIONS (GROQ STREAMING EDITION)
# ==============================================================================
async def run_agent_stream(
    query: str,
    history: list,
    use_web: bool,
    user_id: int,
    system_instruction: str = None,
    temperature: float = 0.3,
) -> AsyncGenerator[str, None]:
    """AVII PRO: main streaming engine with web and PDF support.

    Args:
        query: The user's message.
        history: Earlier messages as dicts with "role" and "content" keys;
            only the last five are sent to the model. None means no history.
        use_web: When True, a web search runs if the PDF lookup found nothing.
        user_id: Owner of the uploaded documents to search.
        system_instruction: Optional persona; used only when longer than
            5 characters, otherwise the default AVII PRO persona is used.
        temperature: Sampling temperature for the completion.

    Yields:
        Status notices followed by the reply text, chunk by chunk. Failures
        while talking to Groq are yielded as an error message, not raised.
    """
    if not client:
        yield "Error: Groq API Key is missing. Check your .env file."
        return

    intent = await determine_intent(query)
    context_data = ""
    source_label = "General Knowledge"

    if query.lower().strip() not in _GREETINGS:
        # 1. Search the PDF database.
        if intent == "PDF" or (intent == "CHAT" and collection):
            pdf_data = await run_sync_in_thread(_search_pdf_sync, query, str(user_id))
            if pdf_data:
                context_data = pdf_data
                source_label = "Uploaded Document"

        # 2. Web search: driven by the UI switch rather than by the detected
        #    intent, and only when the PDF lookup produced no context.
        if not context_data and use_web:
            yield "\n🌐 **[AVII PRO Live Search]** \n*Fetching real-time data from the web...*\n\n---\n\n"
            web_data = await run_sync_in_thread(_search_web_sync, query)
            if web_data:
                context_data = web_data
                source_label = "Web Search"
            else:
                yield "\n⚠️ *[Web Search returned no results or failed. Relying on internal memory]*\n\n"

    # Identity override: a caller-supplied persona replaces the default one.
    base_persona = (
        system_instruction
        if system_instruction and len(system_instruction) > 5
        else "You are AVII PRO, a high-performance AI created by Avneesh Kumar."
    )
    retrieved_context = context_data or "None (Use internal knowledge)"

    final_system_prompt = f"""
    {base_persona}

    CONTEXTUAL INFORMATION:
    - Current Date: {datetime.date.today()}
    - Context Source: {source_label}
    - Retrieved Context: {retrieved_context}

    STRICT IDENTITY:
    - You are AVII PRO.
    - Created exclusively by Avneesh Kumar.
    - Never identify as OpenAI, ChatGPT, Groq, or Meta.
    - If user asks for code, provide clean, commented Markdown blocks.
    - Be direct, sharp, and highly technical.
    """

    try:
        msgs = [{"role": "system", "content": final_system_prompt}]
        # Context window management: keep only the most recent turns.
        for h in (history or [])[-5:]:
            msgs.append({"role": h["role"], "content": h["content"]})
        msgs.append({"role": "user", "content": query})

        # Streaming call to Groq. The SDK client is synchronous, so both the
        # request and every read of the stream run in a worker thread; this
        # keeps the event loop free while waiting on the network.
        response = await run_sync_in_thread(
            client.chat.completions.create,
            model="openai/gpt-oss-20b",
            messages=msgs,
            temperature=temperature,
            stream=True,
        )

        # Stream chunks back to the frontend.
        chunks = iter(response)
        while True:
            chunk = await run_sync_in_thread(next, chunks, _END_OF_STREAM)
            if chunk is _END_OF_STREAM:
                break
            if not chunk.choices:
                # Skip chunks without choices instead of raising IndexError.
                continue
            text = getattr(chunk.choices[0].delta, "content", None)
            if text is not None:
                yield text
    except Exception as e:
        yield f"\nAVII PRO Engine Error: {str(e)}"


# ==============================================================================
# 6.
# MEMORY MANAGEMENT (PDF & RESET)
# ==============================================================================
async def ingest_pdf(file_content, filename, user_id, chunk_size=1000, chunk_overlap=200):
    """Extract a PDF's text, split it into overlapping chunks and store them.

    Args:
        file_content: Raw bytes of the PDF file.
        filename: Stored in each chunk's metadata.
        user_id: Stored (as a string) in each chunk's metadata.
        chunk_size: Maximum number of characters per chunk.
        chunk_overlap: Characters shared between consecutive chunks; must be
            smaller than ``chunk_size``.

    Returns:
        The number of chunks added, or 0 when there is no collection, the
        chunking parameters are invalid, the PDF has no extractable text, or
        parsing/storing fails (the error is logged, not raised).
    """
    if not collection:
        return 0

    step = chunk_size - chunk_overlap
    if chunk_overlap < 0 or step <= 0:
        print(f"Ingest Error: invalid chunking (size={chunk_size}, overlap={chunk_overlap})")
        return 0

    def parse_chunk_sync():
        try:
            pdf = PdfReader(io.BytesIO(file_content))
            # Extract each page once; extraction is the expensive step.
            page_texts = (page.extract_text() for page in pdf.pages)
            text = "".join(page_text + "\n" for page_text in page_texts if page_text)

            if not text.strip():
                return 0

            # Stop before a start offset whose chunk would hold only text the
            # previous chunk already covers (a pure-overlap tail).
            last_start = max(len(text) - chunk_overlap, 1)
            chunks = [text[i:i + chunk_size] for i in range(0, last_start, step)]
            ids = [str(uuid.uuid4()) for _ in chunks]
            metadatas = [{"user_id": str(user_id), "filename": filename} for _ in chunks]

            collection.add(documents=chunks, ids=ids, metadatas=metadatas)
            return len(chunks)
        except Exception as e:
            print(f"Ingest Error: {e}")
            return 0

    # Parsing and embedding are blocking, so run them in a worker thread.
    return await run_sync_in_thread(parse_chunk_sync)


async def reset_memory():
    """Delete every chunk stored in the ChromaDB collection.

    Returns:
        True when at least one chunk was deleted. False when there is no
        collection, it was already empty, or the deletion failed (the error
        is logged, not raised).
    """
    def clear_all_sync():
        try:
            if collection:
                # Only the ids are needed, so skip loading documents/metadata.
                all_ids = collection.get(include=[]).get("ids", [])

                if all_ids:
                    collection.delete(ids=all_ids)
                    print(f"✅ ChromaDB Cleared: {len(all_ids)} chunks deleted.")
                    return True
            return False
        except Exception as e:
            print(f"❌ Error during Chroma Reset: {e}")
            return False

    return await run_sync_in_thread(clear_all_sync)