# app.py — AI Research Agent (Hugging Face Space by Zahid0123, commit 08c651b)
import os
import re
import logging
import tempfile
from pathlib import Path
from typing import List,Tuple,Any
import numpy as np
import PyPDF2
from sentence_transformers import SentenceTransformer
import faiss
import gradio as gr
from gtts import gTTS
import requests
import math
import ast
import json
# --- Optional dependencies ---------------------------------------------------
# sympy is preferred for exact arithmetic; SafeEval (a restricted AST
# evaluator) is the fallback when sympy is unavailable.
try:
    import sympy as sp
    SYMPY_OK = True
except Exception:
    SYMPY_OK = False

try:
    from groq import Groq
    GROQ_OK = True
except ImportError:
    GROQ_OK = False
    print("Groq library not installed!")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# SECURITY: credentials must come from the environment only — never commit
# an API key to source control. Without the env var, the app still runs but
# LLM features are disabled (groq_client stays None).
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")

groq_client = None
if GROQ_OK and GROQ_API_KEY:
    try:
        groq_client = Groq(api_key=GROQ_API_KEY)
        print("Groq client initialized successfully!")
    except Exception as e:
        groq_client = None
        print(f"Groq initialization error: {e}")
class SafeEval(ast.NodeVisitor):
    """Evaluate arithmetic expression ASTs without ``eval()``.

    Permits only numeric literals, unary +/-, the basic binary arithmetic
    operators, and names/calls from a whitelist (everything public in
    ``math`` plus ``abs``/``round``/``pi``/``e``). Any other node raises
    ``ValueError``, so untrusted input cannot execute arbitrary code.
    """

    # Whitelisted names usable as constants or callables in expressions.
    ALLOWED_NAMES = {n: getattr(math, n) for n in dir(math) if not n.startswith("__")}
    ALLOWED_NAMES.update({"abs": abs, "round": round, "pi": math.pi, "e": math.e})

    def visit(self, node):
        """Recursively evaluate *node* and return its numeric value.

        Raises:
            ValueError: on any disallowed name or unsupported node type.
        """
        if isinstance(node, ast.Expression):
            return self.visit(node.body)
        if isinstance(node, ast.BinOp):
            return self._binop(node.op, self.visit(node.left), self.visit(node.right))
        if isinstance(node, ast.UnaryOp):
            return self._unaryop(node.op, self.visit(node.operand))
        # ast.Constant covers numeric literals on Python 3.8+ (the old
        # ast.Num branch was removed — it is deprecated); restricting to
        # int/float rejects strings, bytes, None, etc.
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.Call):
            func = node.func
            if isinstance(func, ast.Name) and func.id in self.ALLOWED_NAMES:
                args = [self.visit(a) for a in node.args]
                return self.ALLOWED_NAMES[func.id](*args)
        if isinstance(node, ast.Name):
            if node.id in self.ALLOWED_NAMES:
                return self.ALLOWED_NAMES[node.id]
            raise ValueError(f"Use of name '{node.id}' is not allowed")
        raise ValueError(f"Unsupported expression: {ast.dump(node)}")

    def _binop(self, op, a, b):
        """Apply a whitelisted binary operator to evaluated operands."""
        if isinstance(op, ast.Add): return a + b
        if isinstance(op, ast.Sub): return a - b
        if isinstance(op, ast.Mult): return a * b
        if isinstance(op, ast.Div): return a / b
        if isinstance(op, ast.Mod): return a % b
        if isinstance(op, ast.Pow): return a ** b
        if isinstance(op, ast.FloorDiv): return a // b
        raise ValueError("Unsupported binary operator")

    def _unaryop(self, op, a):
        """Apply unary plus/minus to an evaluated operand."""
        if isinstance(op, ast.UAdd): return +a
        if isinstance(op, ast.USub): return -a
        raise ValueError("Unsupported unary operator")
def safe_calc_eval(expr: str):
    """Safely evaluate an arithmetic expression string.

    Normalizes common user notation ('^' for power, 'x'/'X'/'×' between
    digits for multiplication, '÷' for division), then tries sympy when
    available and finally falls back to the restricted AST evaluator.

    Returns:
        (True, result_string) on success, (False, error_message) on failure.
    """
    expr = expr.strip()
    expr = expr.replace('^', '**')
    # Only treat 'x'/'X' as multiplication when it sits between digits
    # (e.g. "2x3", "2 x 3"). A blanket replace('x','*') would corrupt
    # function names such as "exp" or "max".
    expr = re.sub(r'(\d)\s*[xX]\s*(\d)', r'\1*\2', expr)
    expr = expr.replace('×', '*').replace('÷', '/')
    # Best-effort sympy pass: any failure (including sympy being missing)
    # falls through to the AST path below.
    try:
        if SYMPY_OK:
            numeric = float(sp.sympify(expr).evalf())
            return True, str(numeric)
    except Exception:
        pass
    try:
        node = ast.parse(expr, mode='eval')
        return True, str(SafeEval().visit(node))
    except Exception as e:
        return False, f"Calc error: {e}"
def get_stock_price(symbol: str) -> dict:
    """Fetch a real-time quote for *symbol* from Yahoo Finance's chart API.

    Returns a dict with ``success: True`` plus quote fields on success,
    or ``success: False`` with an ``error`` message otherwise.
    """
    ticker = symbol.upper().strip()
    try:
        endpoint = f"https://query1.finance.yahoo.com/v8/finance/chart/{ticker}"
        ua = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        response = requests.get(endpoint, headers=ua, timeout=10)
        response.raise_for_status()
        data = response.json()
        if "chart" in data and "result" in data["chart"] and data["chart"]["result"]:
            meta = data["chart"]["result"][0].get("meta", {})
            price = meta.get("regularMarketPrice", 0)
            prev = meta.get("previousClose", 0)
            # Guard against a missing/zero previous close (avoid div-by-zero).
            delta = price - prev if prev else 0
            pct = (delta / prev * 100) if prev else 0
            return {
                "success": True,
                "symbol": ticker,
                "name": meta.get("shortName", ticker),
                "price": round(price, 2),
                "change": round(delta, 2),
                "change_percent": round(pct, 2),
                "previous_close": round(prev, 2),
                "currency": meta.get("currency", "USD"),
                "exchange": meta.get("exchangeName", "Unknown"),
            }
        return {"success": False, "error": f"No data for {ticker}"}
    except Exception as e:
        logger.error(f"Stock API error: {e}")
        return {"success": False, "error": str(e)}
def extract_stock_symbol(question: str) -> str:
    """Map a natural-language question to a ticker symbol.

    First scans a table of well-known company names; failing that, returns
    the first 2-5 letter ALL-CAPS token that is not a common English word.
    Returns "" when nothing plausible is found.
    """
    upper_q = question.upper()
    name_to_ticker = {
        "CARECLOUD": "MTBC", "CARE CLOUD": "MTBC", "MTBC": "MTBC",
        "APPLE": "AAPL", "GOOGLE": "GOOGL", "ALPHABET": "GOOGL",
        "MICROSOFT": "MSFT", "AMAZON": "AMZN", "TESLA": "TSLA",
        "META": "META", "FACEBOOK": "META", "NVIDIA": "NVDA",
        "NETFLIX": "NFLX", "INTEL": "INTC", "AMD": "AMD",
        "PAYPAL": "PYPL", "DISNEY": "DIS", "WALMART": "WMT",
        "NIKE": "NKE", "BOEING": "BA", "UBER": "UBER",
        "ZOOM": "ZM", "SPOTIFY": "SPOT",
    }
    for company, ticker in name_to_ticker.items():
        if company in upper_q:
            logger.info(f"Found stock: {company} -> {ticker}")
            return ticker
    # Fallback: first short ALL-CAPS token that isn't an everyday word.
    skip = {'THE', 'AND', 'FOR', 'ARE', 'BUT', 'NOT', 'YOU', 'ALL',
            'STOCK', 'PRICE', 'CURRENT', 'TELL', 'ABOUT', 'WHAT', 'HOW'}
    candidates = (w for w in re.findall(r'\b[A-Z]{2,5}\b', upper_q) if w not in skip)
    return next(candidates, "")
def web_search(query: str, max_results: int = 5) -> List[dict]:
    """Best-effort DuckDuckGo HTML search.

    Returns up to *max_results* dicts with "title" and "snippet" keys;
    returns [] on any network or parsing failure.
    """
    try:
        response = requests.get(
            "https://html.duckduckgo.com/html/",
            params={"q": query},
            timeout=10,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        response.raise_for_status()
        hits = []
        # The result page marks each link with class "result__a"; split on
        # that marker and scrape crude title/snippet text from each piece.
        for fragment in response.text.split('result__a')[1:max_results + 1]:
            title = ""
            snippet = ""
            try:
                m = re.search(r'>([^<]+)<', fragment)
                if m:
                    title = m.group(1)
            except Exception:
                pass
            try:
                if 'result__snippet' in fragment:
                    tail = fragment.split('result__snippet')[1]
                    m = re.search(r'>([^<]+)<', tail)
                    if m:
                        snippet = m.group(1)
            except Exception:
                pass
            if title or snippet:
                hits.append({"title": title.strip(), "snippet": snippet.strip()})
        return hits
    except Exception as e:
        logger.error(f"Web search error: {e}")
        return []
class AgenticRAGAgent:
    """Question-answering agent that routes each query to a tool — stock
    quote, calculator, PDF retrieval (RAG), web search, or a plain LLM
    call — and returns a text reply plus an optional gTTS voice clip.
    """
    def __init__(self):
        # Retrieval state: text chunks extracted from uploaded PDFs and the
        # FAISS inner-product index built over their normalized embeddings.
        self.chunks = []
        self.index = None
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
        # LLM generation parameters (adjustable via update_settings).
        self.temperature = 0.3
        self.max_tokens = 1000
        # PDF chunking / retrieval parameters.
        self.chunk_size = 512
        self.chunk_overlap = 50
        self.retrieval_k = 10
        # Feature flags (toggled from the Settings accordion in the UI;
        # note only enable_analysis is currently consulted in this class).
        self.enable_web_search = True
        self.enable_calculations = True
        self.enable_fact_checking = True
        self.enable_analysis = True
        self.enable_stock_lookup = True
        # Minimum retrieval score for PDF context to count as relevant.
        self.relevance_threshold = 0.35
        self.pdf_loaded = False
        print("AgenticRAGAgent initialized")
    def remove_emojis(self,text: str) -> str:
        """Strip common emoji codepoint ranges from *text* (for TTS)."""
        emoji_pattern = re.compile("["
            u"\U0001F600-\U0001F64F"
            u"\U0001F300-\U0001F5FF"
            u"\U0001F680-\U0001F6FF"
            u"\U0001F1E0-\U0001F1FF"
            u"\U00002702-\U000027B0"
            u"\U000024C2-\U0001F251"
            "]+",flags=re.UNICODE)
        return emoji_pattern.sub(r'',text)
    def clean_for_voice(self,text: str) -> str:
        """Remove emojis, markdown punctuation and extra whitespace so the
        reply reads naturally when spoken."""
        text = self.remove_emojis(text)
        text = re.sub(r'[\*_`#\[\]\|]','',text)
        text = re.sub(r'\s+',' ',text).strip()
        return text
    def generate_voice(self,text: str):
        """Synthesize *text* to an MP3 via gTTS.

        Returns the temp-file path, or None when the text is empty/too
        short or synthesis fails."""
        if not text or not text.strip():
            return None
        clean = self.clean_for_voice(text)
        if len(clean) < 5:
            return None
        try:
            # Only the first 500 characters are spoken to keep TTS fast.
            tts = gTTS(text=clean[:500],lang='en',slow=False)
            tmp = tempfile.NamedTemporaryFile(delete=False,suffix=".mp3")
            tts.save(tmp.name)
            return tmp.name
        except Exception as e:
            logger.error(f"Voice error: {e}")
            return None
    def upload_pdfs(self,files):
        """Save uploaded PDFs, extract and chunk their text, embed the
        chunks, and (re)build the FAISS index.

        Returns a status message string for the UI."""
        if not files:
            return "No files selected."
        folder = Path("sample_data")
        folder.mkdir(exist_ok=True)
        all_chunks = []
        count = 0
        for file in files:
            # Gradio may hand us a file-like object or just a path string.
            filename = str(file.name) if hasattr(file,'name') else str(file)
            if not filename.lower().endswith('.pdf'):
                continue
            dest = folder / Path(filename).name
            try:
                content = file.read() if hasattr(file,'read') else open(filename,'rb').read()
                with open(dest,"wb") as f:
                    f.write(content)
            except Exception as e:
                continue
            text = ""
            try:
                with open(dest,'rb') as f:
                    reader = PyPDF2.PdfReader(f)
                    for page in reader.pages:
                        t = page.extract_text()
                        if t:
                            text += t + " "
            except Exception as e:
                continue
            if text.strip():
                # Fixed-size sliding-window chunking with overlap.
                chunks = [text[i:i+self.chunk_size] for i in range(0,len(text),self.chunk_size - self.chunk_overlap)]
                all_chunks.extend([{"content": str(c.strip())} for c in chunks if c.strip()])
                count += 1
        if not all_chunks:
            return "No readable text in PDFs."
        vecs = self.embedder.encode([c["content"] for c in all_chunks],show_progress_bar=True)
        # L2-normalize so inner product behaves as cosine similarity.
        vecs = vecs / np.linalg.norm(vecs,axis=1,keepdims=True)
        dim = vecs.shape[1]
        self.index = faiss.IndexFlatIP(dim)
        self.index.add(vecs.astype('float32'))
        self.chunks = all_chunks
        self.pdf_loaded = True
        return f"Loaded {count} PDF(s) with {len(all_chunks)} chunks!"
    def is_stock_question(self,question: str) -> Tuple[bool,str]:
        """Return (True, ticker) when the question looks stock-related and a
        symbol can be extracted; otherwise (False, "")."""
        question_lower = question.lower()
        stock_keywords = ['stock','share','price','trading','ticker','nasdaq','nyse','market']
        known_companies = ['carecloud','mtbc','apple','google','microsoft','amazon',
                           'tesla','meta','nvidia','netflix','intel','amd']
        has_keyword = any(kw in question_lower for kw in stock_keywords)
        has_company = any(co in question_lower for co in known_companies)
        if has_keyword or has_company:
            symbol = extract_stock_symbol(question)
            if symbol:
                logger.info(f"Stock question detected: {symbol}")
                return True,symbol
        return False,""
    def is_calculation_question(self,question: str) -> Tuple[bool,str]:
        """Detect arithmetic questions.

        Returns (True, normalized_expression) or (False, "")."""
        question_lower = question.lower()
        calc_keywords = ['calculate','compute','solve','calcuate','calc']
        has_calc_word = any(kw in question_lower for kw in calc_keywords)
        # Inline expression embedded in prose, e.g. "what is 2*4" / "3 x 5".
        math_match = re.search(r'(\d+)\s*[\*xX×\+\-\/÷\^]\s*(\d+)',question)
        if math_match:
            expr = math_match.group(0)
            expr = expr.replace('x','*').replace('X','*').replace('×','*').replace('÷','/')
            logger.info(f"Math expression found: {expr}")
            return True,expr
        # Whole question is a bare arithmetic expression.
        pure_math = re.match(r'^[\d\s\+\-\*\/\^\(\)\.xX×÷]+$',question.strip())
        if pure_math:
            expr = question.strip()
            expr = expr.replace('x','*').replace('X','*').replace('×','*').replace('÷','/')
            return True,expr
        # "calculate ..." with two numbers: assumes multiplication is meant.
        if has_calc_word:
            nums = re.findall(r'\d+',question)
            if len(nums) >= 2:
                expr = f"{nums[0]}*{nums[1]}"
                return True,expr
        return False,""
    def is_pdf_related_question(self,question: str) -> bool:
        """True when the question explicitly references uploaded documents."""
        pdf_keywords = ['pdf','document','file','attached','uploaded','summarize',
                        'summary','in the document','from the document','the paper']
        question_lower = question.lower()
        return any(kw in question_lower for kw in pdf_keywords)
    def is_general_knowledge_question(self,question: str) -> bool:
        """Heuristic for questions best served by a web search (excludes
        stock- and math-looking questions)."""
        question_lower = question.lower()
        if 'stock' in question_lower or 'price' in question_lower:
            return False
        if re.search(r'\d+\s*[\*\+\-\/]\s*\d+',question):
            return False
        general_triggers = ['what is ai','how does','explain','tell me about',
                            'history of','future of','definition']
        return any(t in question_lower for t in general_triggers)
    def check_context_relevance(self,question: str,context: str,scores: np.ndarray) -> Tuple[bool,float]:
        """Decide whether retrieved PDF context plausibly answers the question.

        Combines the best retrieval score with crude term coverage (the share
        of non-stopword question terms appearing in the context).
        Returns (is_relevant, best_score)."""
        if not context:
            return False,0.0
        max_score = float(np.max(scores)) if len(scores) > 0 else 0.0
        stop_words = {'what','is','the','a','how','tell','me','about','stock','price'}
        q_terms = [w.lower() for w in re.findall(r'\b\w+\b',question) if w.lower() not in stop_words and len(w) > 2]
        matches = sum(1 for t in q_terms if t in context.lower())
        coverage = matches / len(q_terms) if q_terms else 0
        is_relevant = max_score >= self.relevance_threshold and coverage >= 0.3
        return is_relevant,max_score
    def determine_tool(self,question: str) -> Tuple[str,str]:
        """Route the question to a tool.

        Returns (tool, extra): extra is a ticker for 'stock', an expression
        for 'calculator', and "" for 'pdf'/'check_pdf'/'web'."""
        logger.info(f"Determining tool for: {question}")
        is_stock,symbol = self.is_stock_question(question)
        if is_stock and symbol:
            logger.info(f"Tool: STOCK,Symbol: {symbol}")
            return 'stock',symbol
        is_calc,expr = self.is_calculation_question(question)
        if is_calc and expr:
            logger.info(f"Tool: CALCULATOR,Expression: {expr}")
            return 'calculator',expr
        if self.is_pdf_related_question(question):
            if self.pdf_loaded:
                logger.info("Tool: PDF")
                return 'pdf',''
        if self.is_general_knowledge_question(question):
            logger.info("Tool: WEB")
            return 'web',''
        # Ambiguous question: try the PDFs first when loaded ('check_pdf'
        # falls back to web search when the context proves irrelevant).
        if self.pdf_loaded:
            return 'check_pdf',''
        logger.info("Tool: WEB (default)")
        return 'web',''
    def perform_analysis(self,answer: str,tools_used: List[str]) -> str:
        """Build the trailing "[📊 Analysis]" footer listing the sources used
        and the response length; "" when analysis is disabled or no answer."""
        if not self.enable_analysis or not answer:
            return ""
        analysis = []
        for tool in tools_used:
            if tool == "PDF":
                analysis.append("📄 Source: PDF Documents")
            elif tool == "Web":
                analysis.append("🌐 Source: Web Search")
            elif tool == "Calculator":
                analysis.append("🧮 Source: Calculator")
            elif tool == "Stock":
                analysis.append("📈 Source: Yahoo Finance (Real-time)")
        word_count = len(answer.split())
        analysis.append(f"📊 Response: {word_count} words")
        if analysis:
            return "\n\n[📊 Analysis]\n• " + "\n• ".join(analysis)
        return ""
    def ask(self,question: str,history: List) -> Tuple[List,Any]:
        """Answer *question*, appending a [question, reply] pair to *history*.

        Returns (history, audio_file_path_or_None)."""
        global groq_client
        # Defensive input normalization (Gradio may pass unexpected types).
        if not isinstance(question,str):
            question = str(question) if question else ""
        if not isinstance(history,list):
            history = []
        question = question.strip()
        if not question:
            return history,None
        # Canned greeting shortcut — skips all tools.
        if question.lower() in ["hi","hello","hey"]:
            reply = "👋 Hi! I can help with:\n• 📈 Stock prices (try: 'stock price of MTBC')\n• 🧮 Calculations (try: '2*4')\n• 📄 PDF questions\n• 🌐 Web search"
            history.append([question,reply])
            return history,self.generate_voice(reply)
        tools_used = []
        reply = ""
        tool,extra = self.determine_tool(question)
        logger.info(f"Selected tool: {tool},extra: {extra}")
        # STOCK TOOL
        if tool == 'stock' and extra:
            stock_data = get_stock_price(extra)
            if stock_data.get("success"):
                change_emoji = "📈" if stock_data["change"] >= 0 else "📉"
                sign = "+" if stock_data["change"] >= 0 else ""
                reply = f"""## 📈 {stock_data['name']} ({stock_data['symbol']})
**Current Price:** ${stock_data['price']} {stock_data['currency']}
**Change:** {change_emoji} {sign}${stock_data['change']} ({sign}{stock_data['change_percent']}%)
**Previous Close:** ${stock_data['previous_close']}
**Exchange:** {stock_data['exchange']}
*Real-time data from Yahoo Finance*"""
                tools_used.append("Stock")
            else:
                # Quote lookup failed — fall through to web search.
                tool = 'web'
        # CALCULATOR TOOL
        if tool == 'calculator' and extra:
            ok,result = safe_calc_eval(extra)
            if ok:
                reply = f"""## 🧮 Calculator
**Expression:** `{extra}`
**Result:** **{result}**"""
                tools_used.append("Calculator")
            else:
                reply = f"Calculation error: {result}"
                tools_used.append("Calculator")
        # PDF TOOL (retrieval-augmented answer over the FAISS index)
        if tool in ['pdf','check_pdf'] and self.index:
            try:
                q_vec = self.embedder.encode([question])
                q_vec = q_vec / np.linalg.norm(q_vec)
                scores,indices = self.index.search(q_vec.astype('float32'),k=self.retrieval_k)
                context_list = [self.chunks[i]["content"] for i in indices[0] if i < len(self.chunks)]
                context = "\n\n".join(context_list)
                # Explicit 'pdf' requests always answer from the documents;
                # 'check_pdf' only does so when the context looks relevant.
                if tool == 'pdf' or self.check_context_relevance(question,context,scores[0])[0]:
                    tools_used.append("PDF")
                    prompt = f"Document:\n{context}\n\nQuestion: {question}\n\nAnswer based on the document:"
                    if groq_client:
                        resp = groq_client.chat.completions.create(
                            model="llama-3.3-70b-versatile",
                            messages=[{"role": "user","content": prompt}],
                            temperature=self.temperature,
                            max_tokens=self.max_tokens
                        )
                        reply = resp.choices[0].message.content.strip()
                else:
                    tool = 'web'
            except Exception as e:
                logger.error(f"PDF error: {e}")
                tool = 'web'
        # WEB SEARCH TOOL (also the fallback target of the tools above)
        if tool == 'web' and not reply:
            results = web_search(question)
            if results:
                tools_used.append("Web")
                web_text = "\n".join([f"- {r['title']}: {r['snippet']}" for r in results[:3]])
                prompt = f"Web results:\n{web_text}\n\nQuestion: {question}\n\nProvide a helpful answer:"
                if groq_client:
                    try:
                        resp = groq_client.chat.completions.create(
                            model="llama-3.3-70b-versatile",
                            messages=[{"role": "user","content": prompt}],
                            temperature=self.temperature,
                            max_tokens=self.max_tokens
                        )
                        reply = resp.choices[0].message.content.strip()
                        reply += "\n\n🌐 **Web Sources:**\n" + "\n".join([f"• {r['title']}" for r in results[:3]])
                    except Exception as e:
                        reply = f"Error: {e}"
                else:
                    # No LLM available: return the raw search results.
                    reply = "Web results:\n" + web_text
        # FALLBACK: plain LLM answer with no tool context.
        if not reply:
            if groq_client:
                try:
                    resp = groq_client.chat.completions.create(
                        model="llama-3.3-70b-versatile",
                        messages=[{"role": "user","content": question}],
                        temperature=self.temperature,
                        max_tokens=self.max_tokens
                    )
                    reply = resp.choices[0].message.content.strip()
                    tools_used.append("LLM")
                except Exception as e:
                    reply = f"Error: {e}"
            else:
                reply = "Unable to process request."
        # Add analysis
        analysis = self.perform_analysis(reply,tools_used)
        if analysis:
            reply += analysis
        logger.info(f"Tools used: {tools_used}")
        history.append([question,reply])
        return history,self.generate_voice(reply)
    def update_settings(self,temp,tokens,chunk_size,overlap,k,web,calc,fact,analysis):
        """Apply values from the Settings panel; returns a confirmation
        string for the status textbox."""
        self.temperature = float(temp)
        self.max_tokens = int(tokens)
        self.chunk_size = int(chunk_size)
        self.chunk_overlap = int(overlap)
        self.retrieval_k = int(k)
        self.enable_web_search = bool(web)
        self.enable_calculations = bool(calc)
        self.enable_fact_checking = bool(fact)
        self.enable_analysis = bool(analysis)
        return f"Settings updated! Temp={temp},Tokens={tokens}"
def create_interface():
    """Build the Gradio Blocks UI wired to a fresh AgenticRAGAgent.

    Returns the (un-launched) gr.Blocks application."""
    agent = AgenticRAGAgent()
    with gr.Blocks(title="AI Research Agent") as interface:
        # Server-side chat history kept as [user, assistant] pairs; the
        # Chatbot component is fed a converted role/content view of it.
        chat_memory = gr.State([])
        gr.HTML("""
<div style="text-align:center;padding:20px;background:linear-gradient(135deg,#667eea 0%,#764ba2 100%);border-radius:15px;">
<h1 style="color:white;">🤖 AI Research Agent</h1>
<p style="color:white;">📈 Stocks | 🧮 Calculator | 📄 PDF | 🌐 Web Search</p>
</div>
""")
        with gr.Row():
            with gr.Column(scale=2):
                chatbot = gr.Chatbot(label="Chat",height=500)
                with gr.Row():
                    msg = gr.Textbox(placeholder="Try: 'stock price of MTBC' or '2*4' or 'summarize the PDF'",scale=4)
                    submit_btn = gr.Button("Send",variant="primary")
                clear_btn = gr.Button("Clear")
                audio_output = gr.Audio(label="Voice",autoplay=True)
            with gr.Column(scale=1):
                pdf_upload = gr.Files(file_types=[".pdf"],label="Upload PDFs")
                upload_status = gr.Textbox(label="Status",interactive=False)
                with gr.Accordion("Settings",open=False):
                    temp = gr.Slider(0,1,value=0.3,label="Temperature")
                    tokens = gr.Slider(100,2000,value=1000,label="Max Tokens")
                    chunk = gr.Slider(256,1024,value=512,label="Chunk Size")
                    overlap = gr.Slider(0,200,value=50,label="Overlap")
                    k = gr.Slider(3,15,value=10,label="Retrieval K")
                    web = gr.Checkbox(value=True,label="Web Search")
                    calc = gr.Checkbox(value=True,label="Calculator")
                    fact = gr.Checkbox(value=True,label="Fact Check")
                    analysis = gr.Checkbox(value=True,label="Analysis")
                    apply_btn = gr.Button("Apply")
                    status = gr.Textbox(label="Settings Status")
        def respond(message,history):
            # Run the agent, then convert the pair history into the
            # role/content message dicts the Chatbot component displays.
            new_history,audio = agent.ask(message,history)
            display = []
            for item in new_history:
                if isinstance(item,list) and len(item) == 2:
                    display.append({"role": "user","content": str(item[0])})
                    display.append({"role": "assistant","content": str(item[1])})
            # Outputs: clear textbox, updated state, chat display, audio.
            return "",new_history,display,audio
        # Event wiring: send on button click or Enter; clear resets both
        # the state and the display; uploads rebuild the index.
        submit_btn.click(respond,[msg,chat_memory],[msg,chat_memory,chatbot,audio_output])
        msg.submit(respond,[msg,chat_memory],[msg,chat_memory,chatbot,audio_output])
        clear_btn.click(lambda: ([],[]),outputs=[chat_memory,chatbot])
        pdf_upload.change(agent.upload_pdfs,[pdf_upload],[upload_status])
        apply_btn.click(agent.update_settings,[temp,tokens,chunk,overlap,k,web,calc,fact,analysis],[status])
    return interface
# Script entry point: build the UI and serve it on all interfaces.
if __name__ == "__main__":
    print("Starting AI Research Agent...")
    application = create_interface()
    application.launch(server_name="0.0.0.0", server_port=7860, show_error=True)