Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import logging | |
| import tempfile | |
| from pathlib import Path | |
| from typing import List,Tuple,Any | |
| import numpy as np | |
| import PyPDF2 | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| import gradio as gr | |
| from gtts import gTTS | |
| import requests | |
| import math | |
| import ast | |
| import json | |
# Optional dependency: SymPy is tried first by the calculator when available.
try:
    import sympy as sp
    SYMPY_OK = True
except Exception:
    SYMPY_OK = False

# Optional dependency: the Groq SDK powers every LLM-backed answer.
try:
    from groq import Groq
    GROQ_OK = True
except ImportError:
    GROQ_OK = False
    print("Groq library not installed!")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# SECURITY: the API key must come from the environment (e.g. a Space secret).
# A real key used to be embedded here as the fallback value; anything that was
# ever committed that way has to be treated as compromised and revoked.
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")

# Shared Groq client; stays None when the SDK or the key is missing, or when
# construction fails, and callers check it before every request.
groq_client = None
if GROQ_OK:
    if GROQ_API_KEY:
        try:
            groq_client = Groq(api_key=GROQ_API_KEY)
            print("Groq client initialized successfully!")
        except Exception as e:
            groq_client = None
            print(f"Groq initialization error: {e}")
    else:
        print("GROQ_API_KEY is not set; LLM features are disabled.")
class SafeEval(ast.NodeVisitor):
    """Evaluate a parsed arithmetic expression without calling ``eval``.

    Only numeric constants, the binary operators ``+ - * / % ** //``, unary
    ``+``/``-``, and positional calls to the names in ``ALLOWED_NAMES`` are
    accepted. Every other node raises ``ValueError``.
    """

    # Everything public in ``math`` plus a few builtins and constants.
    ALLOWED_NAMES = {n: getattr(math, n) for n in dir(math) if not n.startswith("__")}
    ALLOWED_NAMES.update({"abs": abs, "round": round, "pi": math.pi, "e": math.e})

    # Upper bound (in bits) for the result of an integer power. Expressions
    # such as 9**9**9 would otherwise keep the interpreter busy indefinitely.
    MAX_POW_BITS = 1_000_000

    _BINARY_OPS = {
        ast.Add: lambda a, b: a + b,
        ast.Sub: lambda a, b: a - b,
        ast.Mult: lambda a, b: a * b,
        ast.Div: lambda a, b: a / b,
        ast.Mod: lambda a, b: a % b,
        ast.FloorDiv: lambda a, b: a // b,
    }
    _UNARY_OPS = {
        ast.UAdd: lambda a: +a,
        ast.USub: lambda a: -a,
    }

    def visit(self, node):
        """Return the value of ``node``; raise ``ValueError`` if it is not allowed."""
        if isinstance(node, ast.Expression):
            return self.visit(node.body)
        if isinstance(node, ast.BinOp):
            left = self.visit(node.left)
            right = self.visit(node.right)
            return self._binop(node.op, left, right)
        if isinstance(node, ast.UnaryOp):
            return self._unaryop(node.op, self.visit(node.operand))
        # ast.Constant is the only literal node checked here; the legacy
        # ast.Num alias is deprecated, so it is deliberately not referenced.
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is a subclass of int; True/False are not arithmetic input.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
            raise ValueError(f"Unsupported constant: {value!r}")
        if isinstance(node, ast.Call):
            func = node.func
            if isinstance(func, ast.Name) and func.id in self.ALLOWED_NAMES:
                if node.keywords:
                    # Silently dropping keywords would change the result.
                    raise ValueError("Keyword arguments are not supported")
                target = self.ALLOWED_NAMES[func.id]
                if not callable(target):
                    raise ValueError(f"'{func.id}' is not a function")
                args = [self.visit(a) for a in node.args]
                return target(*args)
        if isinstance(node, ast.Name):
            if node.id in self.ALLOWED_NAMES:
                return self.ALLOWED_NAMES[node.id]
            raise ValueError(f"Use of name '{node.id}' is not allowed")
        raise ValueError(f"Unsupported expression: {ast.dump(node)}")

    def _binop(self, op, a, b):
        """Apply binary operator node ``op`` to the evaluated operands."""
        if isinstance(op, ast.Pow):
            if (
                isinstance(a, int)
                and isinstance(b, int)
                and b > 0
                and a.bit_length() * b > self.MAX_POW_BITS
            ):
                raise ValueError("Exponent too large")
            return a ** b
        handler = self._BINARY_OPS.get(type(op))
        if handler is None:
            raise ValueError("Unsupported binary operator")
        return handler(a, b)

    def _unaryop(self, op, a):
        """Apply unary operator node ``op`` to the evaluated operand."""
        handler = self._UNARY_OPS.get(type(op))
        if handler is None:
            raise ValueError("Unsupported unary operator")
        return handler(a)
def safe_calc_eval(expr: str):
    """Evaluate an arithmetic expression given as text.

    ``^``, ``×`` and ``÷`` are mapped to ``**``, ``*`` and ``/``. The letter
    ``x``/``X`` is treated as multiplication only when it sits between two
    operands ("3 x 4", "2x(5)"), so names such as ``exp`` or ``max`` survive.

    SymPy is tried first when it is installed; otherwise, or when SymPy
    fails, the expression goes through ``SafeEval``.

    Returns:
        ``(True, result_text)`` on success, ``(False, "Calc error: ...")``
        otherwise.
    """
    expr = expr.strip()
    expr = expr.replace('^', '**').replace('×', '*').replace('÷', '/')
    expr = re.sub(r'(?<=[\d)])\s*[xX]\s*(?=[\d(.])', '*', expr)

    if SYMPY_OK:
        try:
            # NOTE(security): sympify() evaluates its input internally and is
            # documented as unsafe for unsanitised text. It is kept because
            # the output format depends on it; consider restricting `expr`
            # to arithmetic characters before this call.
            result = sp.sympify(expr)
            numeric = float(result.evalf())
            return True, str(numeric)
        except Exception:
            # Best effort: anything SymPy cannot handle falls through to the
            # restricted AST evaluator below.
            pass

    try:
        node = ast.parse(expr, mode='eval')
        val = SafeEval().visit(node)
        return True, str(val)
    except Exception as e:
        return False, f"Calc error: {e}"
def get_stock_price(symbol: str) -> dict:
    """Fetch a quote for ``symbol`` from the Yahoo Finance chart endpoint.

    Args:
        symbol: Ticker symbol; case and surrounding whitespace are ignored.

    Returns:
        On success a dict with ``success=True`` and the keys ``symbol``,
        ``name``, ``price``, ``change``, ``change_percent``,
        ``previous_close``, ``currency`` and ``exchange``. On any failure
        ``{"success": False, "error": <message>}``; this function does not
        raise.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    symbol = (symbol or "").upper().strip()
    if not symbol:
        # Nothing to look up; avoid a pointless network round trip.
        return {"success": False, "error": "No stock symbol provided"}

    try:
        # The symbol comes from free-form user text, so escape it before it
        # becomes part of the URL path.
        url = f"https://query1.finance.yahoo.com/v8/finance/chart/{quote(symbol, safe='')}"
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        resp = requests.get(url, headers=headers, timeout=10)
        resp.raise_for_status()
        data = resp.json()

        results = (data.get("chart") or {}).get("result") or []
        meta = (results[0].get("meta") or {}) if results else {}
        current_price = meta.get("regularMarketPrice")
        if current_price is None:
            # A missing/null price is "no data", not a quote of 0.
            return {"success": False, "error": f"No data for {symbol}"}

        previous_close = meta.get("previousClose") or 0
        change = current_price - previous_close if previous_close else 0
        change_percent = (change / previous_close * 100) if previous_close else 0
        return {
            "success": True,
            "symbol": symbol,
            "name": meta.get("shortName") or symbol,
            "price": round(current_price, 2),
            "change": round(change, 2),
            "change_percent": round(change_percent, 2),
            "previous_close": round(previous_close, 2),
            "currency": meta.get("currency") or "USD",
            "exchange": meta.get("exchangeName") or "Unknown",
        }
    except Exception as e:
        logger.error(f"Stock API error: {e}")
        return {"success": False, "error": str(e)}
def extract_stock_symbol(question: str) -> str:
    """Guess the ticker symbol a question refers to.

    Resolution order:
      1. A known company name or ticker appearing as a whole word.
      2. A 2-5 letter token the user typed in capitals (e.g. "IBM").
      3. Any 2-5 letter word that is not a common filler word.

    Returns:
        The ticker in upper case, or ``""`` when nothing plausible is found.
    """
    question_upper = question.upper()
    # NOTE(review): confirm these tickers are still current (e.g. whether
    # CareCloud still trades as MTBC).
    known_stocks = {
        "CARECLOUD": "MTBC", "CARE CLOUD": "MTBC", "MTBC": "MTBC",
        "APPLE": "AAPL", "GOOGLE": "GOOGL", "ALPHABET": "GOOGL",
        "MICROSOFT": "MSFT", "AMAZON": "AMZN", "TESLA": "TSLA",
        "META": "META", "FACEBOOK": "META", "NVIDIA": "NVDA",
        "NETFLIX": "NFLX", "INTEL": "INTC", "AMD": "AMD",
        "PAYPAL": "PYPL", "DISNEY": "DIS", "WALMART": "WMT",
        "NIKE": "NKE", "BOEING": "BA", "UBER": "UBER",
        "ZOOM": "ZM", "SPOTIFY": "SPOT",
    }
    for name, symbol in known_stocks.items():
        # Whole-word match: "INTEL" must not fire on "INTELLIGENCE", nor
        # "META" on "METADATA".
        if re.search(rf'\b{re.escape(name)}\b', question_upper):
            logger.info(f"Found stock: {name} -> {symbol}")
            return symbol

    common_words = {
        'THE', 'AND', 'FOR', 'ARE', 'BUT', 'NOT', 'YOU', 'ALL',
        'STOCK', 'PRICE', 'CURRENT', 'TELL', 'ABOUT', 'WHAT', 'HOW',
        # Short filler words that the 2-5 letter pattern would otherwise
        # return as if they were tickers.
        'IS', 'OF', 'IN', 'ON', 'AT', 'TO', 'IT', 'ME', 'MY', 'DO', 'DOES',
        'CAN', 'GET', 'GIVE', 'SHOW', 'NOW', 'TODAY', 'FROM', 'WITH',
        'THIS', 'THAT', 'WHO', 'WHY', 'WHEN', 'WHERE', 'WHICH', 'HAS',
        'HAVE', 'WAS', 'WERE', 'WILL', 'BE', 'AN', 'AS', 'BY', 'OR', 'IF',
        'SO', 'UP', 'SHARE', 'VALUE', 'WORTH', 'MUCH', 'MANY', 'LAST',
        'LIVE', 'NYSE',
    }
    ticker_shape = r'\b[A-Z]{2,5}\b'
    # Tokens typed in capitals are the strongest hint, so scan the original
    # text before falling back to the upper-cased copy.
    for candidates in (re.findall(ticker_shape, question),
                       re.findall(ticker_shape, question_upper)):
        for word in candidates:
            if word not in common_words:
                return word
    return ""
def web_search(query: str, max_results: int = 5) -> List[dict]:
    """Search DuckDuckGo's HTML endpoint and scrape titles and snippets.

    The page is split on the ``result__a`` marker; for each fragment the
    first text node is taken as the title and the first text node after
    ``result__snippet`` as the snippet.

    Args:
        query: Search text.
        max_results: Maximum number of result fragments to inspect.

    Returns:
        A list of ``{"title": str, "snippet": str}`` dicts. Empty when the
        request fails or nothing could be parsed; this function does not
        raise.
    """
    # Local import keeps this block self-contained.
    from html import unescape

    text_node = re.compile(r'>([^<]+)<')

    def first_text(fragment: str) -> str:
        """Return the first text node in ``fragment``, entity-decoded and stripped."""
        found = text_node.search(fragment)
        # Scraped markup carries entities such as &amp; or &#x27;.
        return unescape(found.group(1)).strip() if found else ""

    try:
        resp = requests.get(
            "https://html.duckduckgo.com/html/",
            params={"q": query},
            timeout=10,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        resp.raise_for_status()

        results = []
        # A negative limit would otherwise slice from the end of the list.
        limit = max(0, max_results)
        for part in resp.text.split('result__a')[1:limit + 1]:
            title = first_text(part)
            snippet = ""
            if 'result__snippet' in part:
                snippet = first_text(part.split('result__snippet')[1])
            if title or snippet:
                results.append({"title": title, "snippet": snippet})
        return results
    except Exception as e:
        logger.error(f"Web search error: {e}")
        return []
class AgenticRAGAgent:
    """Chat agent that routes each question to one tool.

    Available tools: stock lookup, calculator, retrieval over uploaded PDFs
    (sentence-transformer embeddings in a FAISS inner-product index) and web
    search. Answers are optionally written by a Groq-hosted LLM and read
    aloud through gTTS.
    """

    # Model used for every Groq chat completion.
    LLM_MODEL = "llama-3.3-70b-versatile"

    # Explicit emoji blocks. A single wide range (U+24C2..U+1F251) would also
    # delete CJK and other scripts, and would still miss U+1F900 and above.
    _EMOJI_PATTERN = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # regional indicators (flags)
        "\U0001F900-\U0001F9FF"  # supplemental symbols & pictographs
        "\U0001FA70-\U0001FAFF"  # symbols & pictographs extended-A
        "\U0001F200-\U0001F251"  # enclosed ideographic supplement
        "\U00002600-\U000026FF"  # miscellaneous symbols
        "\U00002702-\U000027B0"  # dingbats
        "\U00002B00-\U00002BFF"  # miscellaneous symbols and arrows
        "\U000024C2"             # circled M
        "\U0000FE0F"             # variation selector-16
        "]+",
        flags=re.UNICODE,
    )

    # Whole-word matching so that "shared" or "intelligence" do not count.
    _STOCK_KEYWORD_RE = re.compile(
        r'\b(?:stock|share|price|trading|ticker|nasdaq|nyse|market)s?\b'
    )
    _COMPANY_RE = re.compile(
        r'\b(?:carecloud|mtbc|apple|google|microsoft|amazon|tesla|meta|'
        r'nvidia|netflix|intel|amd)\b'
    )

    # Input made only of digits, operators, brackets, dots and whitespace.
    _PURE_MATH_RE = re.compile(r'[\d\s\+\-\*\/\^\(\)\.xX×÷]+')
    # number (operator number)+ embedded in a sentence; decimals and a sign
    # after the operator are allowed.
    _EMBEDDED_MATH_RE = re.compile(
        r'\d+(?:\.\d+)?(?:\s*[\*xX×\+\-\/÷\^]\s*[\+\-]?\d+(?:\.\d+)?)+'
    )
    _TO_PYTHON_OPS = str.maketrans({'x': '*', 'X': '*', '×': '*', '÷': '/'})

    _SOURCE_LABELS = {
        "PDF": "📄 Source: PDF Documents",
        "Web": "🌐 Source: Web Search",
        "Calculator": "🧮 Source: Calculator",
        "Stock": "📈 Source: Yahoo Finance (Real-time)",
    }

    def __init__(self):
        """Load the embedding model and set default settings."""
        self.chunks = []          # list of {"content": str}, parallel to the index rows
        self.index = None         # FAISS index, built by upload_pdfs()
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
        self.temperature = 0.3
        self.max_tokens = 1000
        self.chunk_size = 512
        self.chunk_overlap = 50
        self.retrieval_k = 10
        self.enable_web_search = True
        self.enable_calculations = True
        self.enable_fact_checking = True
        self.enable_analysis = True
        self.enable_stock_lookup = True
        self.relevance_threshold = 0.35
        self.pdf_loaded = False
        print("AgenticRAGAgent initialized")

    # ------------------------------------------------------------------ voice

    def remove_emojis(self, text: str) -> str:
        """Return ``text`` with emoji and pictographic symbols removed."""
        return self._EMOJI_PATTERN.sub(r'', text)

    def clean_for_voice(self, text: str) -> str:
        """Strip emoji and markdown punctuation, then collapse whitespace."""
        text = self.remove_emojis(text)
        text = re.sub(r'[\*_`#\[\]\|]', '', text)
        return re.sub(r'\s+', ' ', text).strip()

    def generate_voice(self, text: str):
        """Synthesize up to the first 500 cleaned characters of ``text``.

        Returns:
            Path of a newly created ``.mp3`` temp file, or ``None`` when the
            text is empty/too short or synthesis fails. The file is not
            deleted by this class.
        """
        if not text or not text.strip():
            return None
        clean = self.clean_for_voice(text)
        if len(clean) < 5:
            return None
        out_path = None
        try:
            tts = gTTS(text=clean[:500], lang='en', slow=False)
            # Reserve a name and close the handle before gTTS writes to it,
            # so no descriptor is leaked.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp:
                out_path = tmp.name
            tts.save(out_path)
            return out_path
        except Exception as e:
            logger.error(f"Voice error: {e}")
            if out_path and os.path.exists(out_path):
                try:
                    os.remove(out_path)  # do not leave an empty/partial file behind
                except OSError:
                    pass
            return None

    # ------------------------------------------------------------- ingestion

    def upload_pdfs(self, files):
        """Copy PDFs into ``sample_data/``, extract text and rebuild the index.

        The new index replaces any previously loaded documents.

        Args:
            files: Iterable of file paths or file-like objects (anything with
                ``.name`` and optionally ``.read()``).

        Returns:
            A human-readable status message.
        """
        if not files:
            return "No files selected."
        folder = Path("sample_data")
        folder.mkdir(exist_ok=True)

        # range() rejects a zero step and a negative one yields no chunks, so
        # guard against overlap >= chunk_size.
        step = max(1, self.chunk_size - self.chunk_overlap)

        all_chunks = []
        count = 0
        for file in files:
            filename = str(file.name) if hasattr(file, 'name') else str(file)
            if not filename.lower().endswith('.pdf'):
                continue
            dest = folder / Path(filename).name
            try:
                if hasattr(file, 'read'):
                    content = file.read()
                else:
                    with open(filename, 'rb') as src:
                        content = src.read()
                with open(dest, "wb") as f:
                    f.write(content)
            except Exception as e:
                logger.warning(f"Skipping {filename}: could not store file ({e})")
                continue

            try:
                with open(dest, 'rb') as f:
                    reader = PyPDF2.PdfReader(f)
                    pages = [page.extract_text() for page in reader.pages]
            except Exception as e:
                logger.warning(f"Skipping {filename}: could not read PDF ({e})")
                continue

            text = "".join(t + " " for t in pages if t)
            if text.strip():
                pieces = (text[i:i + self.chunk_size] for i in range(0, len(text), step))
                all_chunks.extend({"content": str(p.strip())} for p in pieces if p.strip())
                count += 1

        if not all_chunks:
            return "No readable text in PDFs."

        vecs = self.embedder.encode([c["content"] for c in all_chunks], show_progress_bar=True)
        vecs = np.asarray(vecs, dtype='float32')
        norms = np.linalg.norm(vecs, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # avoid NaNs for all-zero embeddings
        vecs = vecs / norms

        # Inner product on unit vectors equals cosine similarity.
        self.index = faiss.IndexFlatIP(vecs.shape[1])
        self.index.add(vecs.astype('float32'))
        self.chunks = all_chunks
        self.pdf_loaded = True
        return f"Loaded {count} PDF(s) with {len(all_chunks)} chunks!"

    # --------------------------------------------------------------- routing

    def is_stock_question(self, question: str) -> Tuple[bool, str]:
        """Return ``(True, symbol)`` when the question looks like a stock query."""
        question_lower = question.lower()
        has_keyword = bool(self._STOCK_KEYWORD_RE.search(question_lower))
        has_company = bool(self._COMPANY_RE.search(question_lower))
        if has_keyword or has_company:
            symbol = extract_stock_symbol(question)
            if symbol:
                logger.info(f"Stock question detected: {symbol}")
                return True, symbol
        return False, ""

    def is_calculation_question(self, question: str) -> Tuple[bool, str]:
        """Return ``(True, expression)`` when the question contains arithmetic.

        Checked in order: input that consists only of arithmetic characters
        (kept whole, so brackets and precedence survive), then the longest
        ``number op number ...`` run inside a sentence, then a "calculate"
        style keyword with at least two integers.
        """
        stripped = question.strip()

        if self._PURE_MATH_RE.fullmatch(stripped) and re.search(r'\d', stripped):
            return True, stripped.translate(self._TO_PYTHON_OPS)

        embedded = self._EMBEDDED_MATH_RE.search(question)
        if embedded:
            expr = embedded.group(0).translate(self._TO_PYTHON_OPS)
            logger.info(f"Math expression found: {expr}")
            return True, expr

        calc_keywords = ['calculate', 'compute', 'solve', 'calcuate', 'calc']
        question_lower = question.lower()
        if any(kw in question_lower for kw in calc_keywords):
            nums = re.findall(r'\d+', question)
            if len(nums) >= 2:
                # Heuristic: with no explicit operator the first two numbers
                # are multiplied.
                return True, f"{nums[0]}*{nums[1]}"
        return False, ""

    def is_pdf_related_question(self, question: str) -> bool:
        """Return True when the question mentions a document-related keyword."""
        pdf_keywords = ['pdf', 'document', 'file', 'attached', 'uploaded', 'summarize',
                        'summary', 'in the document', 'from the document', 'the paper']
        question_lower = question.lower()
        return any(kw in question_lower for kw in pdf_keywords)

    def is_general_knowledge_question(self, question: str) -> bool:
        """Return True for explanatory questions that are neither stock nor math."""
        question_lower = question.lower()
        if 'stock' in question_lower or 'price' in question_lower:
            return False
        if re.search(r'\d+\s*[\*\+\-\/]\s*\d+', question):
            return False
        general_triggers = ['what is ai', 'how does', 'explain', 'tell me about',
                            'history of', 'future of', 'definition']
        return any(t in question_lower for t in general_triggers)

    def check_context_relevance(self, question: str, context: str, scores: np.ndarray) -> Tuple[bool, float]:
        """Decide whether retrieved ``context`` is good enough to answer from.

        Relevant means: the best similarity score reaches
        ``self.relevance_threshold`` AND at least 30% of the question's
        non-stop-word terms (longer than two characters) occur in the context.

        Returns:
            ``(is_relevant, best_score)``.
        """
        if not context:
            return False, 0.0
        max_score = float(np.max(scores)) if len(scores) > 0 else 0.0
        stop_words = {'what', 'is', 'the', 'a', 'how', 'tell', 'me', 'about', 'stock', 'price'}
        q_terms = [w.lower() for w in re.findall(r'\b\w+\b', question)
                   if w.lower() not in stop_words and len(w) > 2]
        haystack = context.lower()
        matches = sum(1 for t in q_terms if t in haystack)
        coverage = matches / len(q_terms) if q_terms else 0
        is_relevant = max_score >= self.relevance_threshold and coverage >= 0.3
        return is_relevant, max_score

    def determine_tool(self, question: str) -> Tuple[str, str]:
        """Pick the tool for ``question``.

        Returns:
            ``(tool, extra)`` where tool is one of ``'stock'``,
            ``'calculator'``, ``'pdf'``, ``'check_pdf'`` or ``'web'`` and
            extra is the ticker / expression for the first two, else ``''``.
        """
        logger.info(f"Determining tool for: {question}")

        if self.enable_stock_lookup:
            is_stock, symbol = self.is_stock_question(question)
            if is_stock and symbol:
                logger.info(f"Tool: STOCK,Symbol: {symbol}")
                return 'stock', symbol

        if self.enable_calculations:
            is_calc, expr = self.is_calculation_question(question)
            if is_calc and expr:
                logger.info(f"Tool: CALCULATOR,Expression: {expr}")
                return 'calculator', expr

        if self.is_pdf_related_question(question) and self.pdf_loaded:
            logger.info("Tool: PDF")
            return 'pdf', ''

        if self.is_general_knowledge_question(question):
            logger.info("Tool: WEB")
            return 'web', ''

        if self.pdf_loaded:
            # Try the documents first; ask() falls back to the web when the
            # retrieved context is not relevant.
            return 'check_pdf', ''

        logger.info("Tool: WEB (default)")
        return 'web', ''

    def perform_analysis(self, answer: str, tools_used: List[str]) -> str:
        """Return a footer listing the sources used and the answer's word count."""
        if not self.enable_analysis or not answer:
            return ""
        analysis = [self._SOURCE_LABELS[t] for t in tools_used if t in self._SOURCE_LABELS]
        analysis.append(f"📊 Response: {len(answer.split())} words")
        return "\n\n[📊 Analysis]\n• " + "\n• ".join(analysis)

    # ------------------------------------------------------------- answering

    def _chat(self, prompt: str) -> str:
        """Send a single-turn prompt to Groq; callers check ``groq_client`` first."""
        resp = groq_client.chat.completions.create(
            model=self.LLM_MODEL,
            messages=[{"role": "user", "content": prompt}],
            temperature=self.temperature,
            max_tokens=self.max_tokens,
        )
        # The message content can be None; treat that as an empty answer.
        return (resp.choices[0].message.content or "").strip()

    def _retrieve_context(self, question: str):
        """Return ``(joined_chunks, scores)`` for the top ``retrieval_k`` hits."""
        q_vec = self.embedder.encode([question])
        q_vec = q_vec / np.linalg.norm(q_vec)
        scores, indices = self.index.search(q_vec.astype('float32'), k=self.retrieval_k)
        # FAISS pads missing neighbours with -1, which would otherwise index
        # the LAST chunk.
        hits = [self.chunks[i]["content"] for i in indices[0] if 0 <= i < len(self.chunks)]
        return "\n\n".join(hits), scores[0]

    @staticmethod
    def _format_stock_reply(stock_data: dict) -> str:
        """Render a successful ``get_stock_price`` result as markdown."""
        rising = stock_data["change"] >= 0
        change_emoji = "📈" if rising else "📉"
        sign = "+" if rising else ""
        return (
            f"## 📈 {stock_data['name']} ({stock_data['symbol']})\n"
            f"**Current Price:** ${stock_data['price']} {stock_data['currency']}\n"
            f"**Change:** {change_emoji} {sign}${stock_data['change']} ({sign}{stock_data['change_percent']}%)\n"
            f"**Previous Close:** ${stock_data['previous_close']}\n"
            f"**Exchange:** {stock_data['exchange']}\n"
            "*Real-time data from Yahoo Finance*"
        )

    def ask(self, question: str, history: List) -> Tuple[List, Any]:
        """Answer ``question`` and append ``[question, reply]`` to ``history``.

        Returns:
            ``(history, audio_path_or_None)``. ``history`` is the same list
            object that was passed in (or a new list if it was not a list).
        """
        if not isinstance(question, str):
            question = str(question) if question else ""
        if not isinstance(history, list):
            history = []
        question = question.strip()
        if not question:
            return history, None

        if question.lower() in ("hi", "hello", "hey"):
            reply = "👋 Hi! I can help with:\n• 📈 Stock prices (try: 'stock price of MTBC')\n• 🧮 Calculations (try: '2*4')\n• 📄 PDF questions\n• 🌐 Web search"
            history.append([question, reply])
            return history, self.generate_voice(reply)

        tools_used = []
        reply = ""
        tool, extra = self.determine_tool(question)
        logger.info(f"Selected tool: {tool},extra: {extra}")

        # STOCK TOOL
        if tool == 'stock' and extra:
            stock_data = get_stock_price(extra)
            if stock_data.get("success"):
                reply = self._format_stock_reply(stock_data)
                tools_used.append("Stock")
            else:
                tool = 'web'  # unknown ticker: let web search have a go

        # CALCULATOR TOOL
        if tool == 'calculator' and extra:
            ok, result = safe_calc_eval(extra)
            if ok:
                reply = (
                    "## 🧮 Calculator\n"
                    f"**Expression:** `{extra}`\n"
                    f"**Result:** **{result}**"
                )
            else:
                reply = f"Calculation error: {result}"
            tools_used.append("Calculator")

        # PDF TOOL
        if tool in ('pdf', 'check_pdf') and self.index is not None:
            try:
                context, scores = self._retrieve_context(question)
                if tool == 'pdf' or self.check_context_relevance(question, context, scores)[0]:
                    prompt = f"Document:\n{context}\n\nQuestion: {question}\n\nAnswer based on the document:"
                    if groq_client:
                        reply = self._chat(prompt)
                    # Credit the PDF only once the answer was produced (or
                    # when no LLM is configured at all).
                    tools_used.append("PDF")
                else:
                    tool = 'web'
            except Exception as e:
                logger.error(f"PDF error: {e}")
                tool = 'web'

        # WEB SEARCH TOOL
        if tool == 'web' and not reply and self.enable_web_search:
            results = web_search(question)
            if results:
                tools_used.append("Web")
                top = results[:3]
                web_text = "\n".join([f"- {r['title']}: {r['snippet']}" for r in top])
                prompt = f"Web results:\n{web_text}\n\nQuestion: {question}\n\nProvide a helpful answer:"
                if groq_client:
                    try:
                        reply = self._chat(prompt)
                        reply += "\n\n🌐 **Web Sources:**\n" + "\n".join([f"• {r['title']}" for r in top])
                    except Exception as e:
                        reply = f"Error: {e}"
                else:
                    reply = "Web results:\n" + web_text

        # FALLBACK: plain LLM answer without any tool context
        if not reply:
            if groq_client:
                try:
                    reply = self._chat(question)
                    tools_used.append("LLM")
                except Exception as e:
                    reply = f"Error: {e}"
            else:
                reply = "Unable to process request."

        analysis = self.perform_analysis(reply, tools_used)
        if analysis:
            reply += analysis
        logger.info(f"Tools used: {tools_used}")
        history.append([question, reply])
        return history, self.generate_voice(reply)

    def update_settings(self, temp, tokens, chunk_size, overlap, k, web, calc, fact, analysis):
        """Store new settings from the UI and return a confirmation message.

        Chunk size and overlap only take effect on the next ``upload_pdfs``
        call.
        """
        self.temperature = float(temp)
        self.max_tokens = int(tokens)
        self.chunk_size = int(chunk_size)
        self.chunk_overlap = int(overlap)
        self.retrieval_k = int(k)
        self.enable_web_search = bool(web)
        self.enable_calculations = bool(calc)
        self.enable_fact_checking = bool(fact)
        self.enable_analysis = bool(analysis)
        return f"Settings updated! Temp={temp},Tokens={tokens}"
def create_interface():
    """Build and return the Gradio Blocks UI wired to a fresh AgenticRAGAgent.

    The left column holds the chat, the input row, the clear button and the
    audio player; the right column holds the PDF uploader and the settings
    accordion.
    """
    # NOTE(review): the indentation of this function was lost in the source
    # this was recovered from; the widget nesting below is a reconstruction -
    # confirm against the running app.
    agent = AgenticRAGAgent()

    banner_html = (
        "\n"
        "<div style=\"text-align:center;padding:20px;background:linear-gradient(135deg,#667eea 0%,#764ba2 100%);border-radius:15px;\">\n"
        "<h1 style=\"color:white;\">🤖 AI Research Agent</h1>\n"
        "<p style=\"color:white;\">📈 Stocks | 🧮 Calculator | 📄 PDF | 🌐 Web Search</p>\n"
        "</div>\n"
    )

    def respond(message, history):
        """Run one chat turn and convert the pair history into role/content dicts."""
        new_history, audio = agent.ask(message, history)
        display = []
        for turn in new_history:
            if not (isinstance(turn, list) and len(turn) == 2):
                continue
            user_text, bot_text = turn
            display.extend([
                {"role": "user", "content": str(user_text)},
                {"role": "assistant", "content": str(bot_text)},
            ])
        # First output clears the textbox.
        return "", new_history, display, audio

    with gr.Blocks(title="AI Research Agent") as interface:
        chat_memory = gr.State([])
        gr.HTML(banner_html)

        with gr.Row():
            with gr.Column(scale=2):
                chatbot = gr.Chatbot(label="Chat", height=500)
                with gr.Row():
                    msg = gr.Textbox(
                        placeholder="Try: 'stock price of MTBC' or '2*4' or 'summarize the PDF'",
                        scale=4,
                    )
                    submit_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear")
                audio_output = gr.Audio(label="Voice", autoplay=True)

            with gr.Column(scale=1):
                pdf_upload = gr.Files(file_types=[".pdf"], label="Upload PDFs")
                upload_status = gr.Textbox(label="Status", interactive=False)
                with gr.Accordion("Settings", open=False):
                    temp = gr.Slider(0, 1, value=0.3, label="Temperature")
                    tokens = gr.Slider(100, 2000, value=1000, label="Max Tokens")
                    chunk = gr.Slider(256, 1024, value=512, label="Chunk Size")
                    overlap = gr.Slider(0, 200, value=50, label="Overlap")
                    k = gr.Slider(3, 15, value=10, label="Retrieval K")
                    web = gr.Checkbox(value=True, label="Web Search")
                    calc = gr.Checkbox(value=True, label="Calculator")
                    fact = gr.Checkbox(value=True, label="Fact Check")
                    analysis = gr.Checkbox(value=True, label="Analysis")
                    apply_btn = gr.Button("Apply")
                    status = gr.Textbox(label="Settings Status")

        # Button click and Enter in the textbox trigger the same handler.
        chat_inputs = [msg, chat_memory]
        chat_outputs = [msg, chat_memory, chatbot, audio_output]
        submit_btn.click(respond, chat_inputs, chat_outputs)
        msg.submit(respond, chat_inputs, chat_outputs)

        clear_btn.click(lambda: ([], []), outputs=[chat_memory, chatbot])
        pdf_upload.change(agent.upload_pdfs, [pdf_upload], [upload_status])

        setting_inputs = [temp, tokens, chunk, overlap, k, web, calc, fact, analysis]
        apply_btn.click(agent.update_settings, setting_inputs, [status])

    return interface
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all interfaces, port 7860.
    print("Starting AI Research Agent...")
    app = create_interface()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
    )