| """ |
| Aria β Hybrid RAG + Web Search Engine |
| |
| Architecture: |
| User Query |
| β |
| βΌ |
| Classifier β Portfolio query? β FAISS RAG β Groq LLM β Answer |
| General query? β DuckDuckGo β Groq LLM β Answer |
| """ |
| import os |
| import re |
| import json |
| import logging |
| import asyncio |
| from typing import List, Dict, Optional |
| from dotenv import load_dotenv |
|
|
| load_dotenv() |
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| |
|
|
| PORTFOLIO_PROMPT = """You are Aria, a personal AI assistant built by Abhishek Kumar (AI/ML Engineer from India). |
| |
| When answering questions about Abhishek: |
| - Use ONLY the provided context β never make up facts |
| - Be friendly, warm, and concise |
| - Refer to him as "Abhishek" (third person) |
| - If info is not in context: "I don't have that specific detail about Abhishek. You can contact him at abhishek.pathak01111@gmail.com" |
| |
| If asked who you are: "I'm Aria, Abhishek Kumar's personal AI assistant. He built me to help you!".""" |
|
|
| GENERAL_PROMPT = """You are Aria, a smart personal AI assistant built by Abhishek Kumar (AI/ML Engineer from India). |
| |
| When answering general questions: |
| - Use the provided web search results as your primary source |
| - Be accurate, clear, and helpful |
| - Cite sources when useful |
| - If search results are insufficient: "I couldn't find reliable information for that question." |
| - Never hallucinate or invent facts |
| - Match response length to question complexity |
| |
| If asked who you are: "I'm Aria, Abhishek Kumar's personal AI assistant. He built me to help with anything β about him or any topic!".""" |
|
|
| |
| |
| |
|
|
| PORTFOLIO_KEYWORDS = [ |
| "abhishek", "abhishek kumar", "abhishek bhardwaj", |
| "aria", "who are you", "who built you", "who made you", |
| "your creator", "your owner", "your developer", |
| "skills", "projects", "experience", "education", "background", |
| "resume", "cv", "portfolio", "work", "internship", |
| "certification", "achievement", "hackathon", "cgc", "landran", |
| "contact", "email", "phone", "linkedin", "github", |
| "hire", "available", "freelance", "reach", "connect", |
| "neurachat", "mrabhi", "langchain", "langgraph", |
| "his skills", "his projects", "his experience", "his work", |
| "tell me about him", "about him", |
| ] |
|
|
| GENERAL_KEYWORDS = [ |
| "weather", "news", "today", "current", "latest", "recent", |
| "history", "geography", "science", "math", "physics", "chemistry", |
| "biology", "politics", "economy", "war", "conflict", |
| "country", "city", "capital", "population", "culture", |
| "cricket", "football", "movie", "actor", "singer", |
| "recipe", "food", "health", "medicine", |
| "stock", "crypto", "bitcoin", "market", "finance", |
| "travel", "tourism", "hotel", "flight", |
| "mahabharata", "ramayana", "mythology", "temple", "festival", |
| "iran", "israel", "russia", "ukraine", "china", "usa", |
| "delhi", "mumbai", "bangalore", "chandigarh", |
| "what is", "how does", "how to", "explain", "define", |
| "difference between", "compare", "versus", "calculate", |
| "poem", "story", "joke", "planet", "space", "galaxy", |
| ] |
|
|
|
|
| def classify_query(query: str) -> str: |
| """Returns 'portfolio' or 'general'.""" |
| q = query.lower().strip() |
|
|
| portfolio_hits = [kw for kw in PORTFOLIO_KEYWORDS if kw in q] |
| general_hits = [kw for kw in GENERAL_KEYWORDS if kw in q] |
|
|
| |
| if portfolio_hits: |
| strong = [k for k in portfolio_hits if k in ["abhishek", "aria", "you", "your", "hire"]] |
| if strong or not general_hits: |
| logger.info(f"[Classifier] PORTFOLIO β hits: {portfolio_hits[:3]}") |
| return "portfolio" |
|
|
| |
| if general_hits and not portfolio_hits: |
| logger.info(f"[Classifier] GENERAL β hits: {general_hits[:3]}") |
| return "general" |
|
|
| |
| personal = ["your skills", "your projects", "your experience", |
| "what do you know", "your work", "your education"] |
| if any(p in q for p in personal): |
| return "portfolio" |
|
|
| |
| logger.info(f"[Classifier] GENERAL β default") |
| return "general" |
|
|
|
|
| |
| |
| |
|
|
| class WebSearch: |
| """DuckDuckGo-based web search. Free, no API key needed.""" |
|
|
| def __init__(self, max_results: int = 5): |
| self.max_results = max_results |
|
|
| def search(self, query: str) -> str: |
| """Search and return formatted context string.""" |
| try: |
| from duckduckgo_search import DDGS |
| results = [] |
| with DDGS() as ddgs: |
| raw = list(ddgs.text(query, max_results=self.max_results)) |
| for i, r in enumerate(raw, 1): |
| results.append( |
| f"[Result {i}] {r.get('title', '')}\n" |
| f"URL: {r.get('href', '')}\n" |
| f"{r.get('body', '')}" |
| ) |
| context = "\n\n---\n\n".join(results) |
| logger.info(f"[WebSearch] '{query[:50]}' β {len(raw)} results") |
| return context |
| except Exception as e: |
| logger.error(f"[WebSearch] Failed: {e}") |
| return "" |
|
|
|
|
| |
| |
| |
|
|
| class GroqLLM: |
| def __init__(self, api_key: str, model: str, max_tokens: int = 1024, temperature: float = 0.7): |
| self.api_key = api_key |
| self.model = model |
| self.max_tokens = max_tokens |
| self.temperature = temperature |
| self._client = None |
|
|
| @property |
| def client(self): |
| if self._client is None: |
| from groq import Groq |
| self._client = Groq(api_key=self.api_key) |
| return self._client |
|
|
| def generate(self, system: str, messages: List[Dict], user_msg: str) -> str: |
| full = [{"role": "system", "content": system}] |
| full.extend(messages[-6:]) |
| full.append({"role": "user", "content": user_msg}) |
| resp = self.client.chat.completions.create( |
| model=self.model, messages=full, |
| max_tokens=self.max_tokens, temperature=self.temperature, stream=False |
| ) |
| return resp.choices[0].message.content |
|
|
| def stream(self, system: str, messages: List[Dict], user_msg: str): |
| full = [{"role": "system", "content": system}] |
| full.extend(messages[-6:]) |
| full.append({"role": "user", "content": user_msg}) |
| resp = self.client.chat.completions.create( |
| model=self.model, messages=full, |
| max_tokens=self.max_tokens, temperature=self.temperature, stream=True |
| ) |
| for chunk in resp: |
| delta = chunk.choices[0].delta |
| if delta and delta.content: |
| yield delta.content |
|
|
|
|
| |
| |
| |
|
|
| class RAGEngine: |
| """ |
| Hybrid engine: |
| - Portfolio queries β FAISS RAG β Groq |
| - General queries β DuckDuckGo β Groq |
| """ |
|
|
| def __init__(self, pipeline, llm_model: str, max_tokens: int = 1024, temperature: float = 0.7): |
| self.pipeline = pipeline |
| self.max_tokens = max_tokens |
| self.temperature = temperature |
|
|
| api_key = self._get_key() |
| self.llm = GroqLLM(api_key, llm_model, max_tokens, temperature) |
| self.web = WebSearch(max_results=5) |
| logger.info(f"Aria RAG Engine initialized β model: {llm_model}") |
|
|
| def _get_key(self) -> str: |
| load_dotenv() |
| return os.environ.get("GROQ_API_KEY", "").strip() or \ |
| os.environ.get("ANTHROPIC_API_KEY", "").strip() |
|
|
| def _rag_context(self, query: str, top_k: int = 5) -> tuple: |
| """Retrieve from FAISS and format context.""" |
| chunks = self.pipeline.search(query, k=top_k) |
| if not chunks: |
| return [], "" |
| good = [c for c in chunks if c.get("score", 0) > 0.25] |
| parts = [f"[{c['metadata'].get('source','')}]\n{c['text']}" for c in good] |
| return good, "\n\n---\n\n".join(parts) |
|
|
| |
|
|
| def generate(self, query: str, conversation_history=None, top_k: int = 5) -> Dict: |
| history = conversation_history or [] |
| q_type = classify_query(query) |
|
|
| if q_type == "portfolio": |
| chunks, context = self._rag_context(query, top_k) |
| if not context: |
| answer = ("I don't have specific details about that in Abhishek's portfolio. " |
| "Contact him at abhishek.pathak01111@gmail.com or visit mrabhi-7208.netlify.app") |
| else: |
| user_msg = f"Context about Abhishek:\n{context}\n\nQuestion: {query}\n\nAnswer from context only." |
| answer = self.llm.generate(PORTFOLIO_PROMPT, history, user_msg) |
| sources = [{"source": c["metadata"].get("source",""), "score": round(c.get("score",0),3)} |
| for c in (chunks or [])[:3]] |
| else: |
| context = self.web.search(query) |
| if context: |
| user_msg = f"Web search results:\n{context}\n\nQuestion: {query}\n\nAnswer from search results." |
| else: |
| user_msg = f"Question: {query}\n\nAnswer from your knowledge, or say you couldn't find reliable info." |
| answer = self.llm.generate(GENERAL_PROMPT, history, user_msg) |
| sources = [{"source": "web search", "query": query}] |
|
|
| return { |
| "answer": answer, |
| "query_type": q_type, |
| "sources": sources, |
| "model": self.llm.model, |
| "tokens_used": {"input": 0, "output": 0} |
| } |
|
|
| |
|
|
| async def generate_stream(self, query: str, conversation_history=None, top_k: int = 5): |
| history = conversation_history or [] |
| q_type = classify_query(query) |
|
|
| |
| yield f"__QTYPE__{q_type}" |
|
|
| if q_type == "portfolio": |
| chunks, context = self._rag_context(query, top_k) |
| if not context: |
| yield ("I don't have specific details about that in Abhishek's portfolio. " |
| "You can contact him at abhishek.pathak01111@gmail.com") |
| return |
| user_msg = f"Context about Abhishek:\n{context}\n\nQuestion: {query}\n\nAnswer from context only." |
| system = PORTFOLIO_PROMPT |
| else: |
| |
| loop = asyncio.get_event_loop() |
| context = await loop.run_in_executor(None, self.web.search, query) |
| if context: |
| user_msg = f"Web search results:\n{context}\n\nQuestion: {query}\n\nAnswer from search results." |
| else: |
| user_msg = f"Question: {query}\n\nAnswer from knowledge, or say couldn't find reliable info." |
| system = GENERAL_PROMPT |
|
|
| for chunk in self.llm.stream(system, history, user_msg): |
| yield chunk |