import gradio as gr from faster_whisper import WhisperModel from transformers import AutoTokenizer, AutoModelForCausalLM import torch import requests import base64 import tempfile import os import logging import asyncio import aiohttp from datetime import datetime from concurrent.futures import ThreadPoolExecutor from functools import partial # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Initialize models logger.info("Loading Whisper model...") whisper_model = WhisperModel("tiny", device="cpu", compute_type="int8") logger.info("Loading Qwen 2.5 1.5B-Instruct (fastest quality model)...") model_name = "Qwen/Qwen2.5-1.5B-Instruct" tokenizer = AutoTokenizer.from_pretrained(model_name) model = AutoModelForCausalLM.from_pretrained( model_name, torch_dtype=torch.float32, device_map="cpu", low_cpu_mem_usage=True ) logger.info("All models loaded!") # Search APIs configuration (priority order) TAVILY_API_KEY = os.getenv('TAVILY_API_KEY', '') # Get from environment BRAVE_API_KEY = os.getenv('BRAVE_API_KEY', '') def search_tavily(query): """Priority 1: Tavily AI search (best for AI agents)""" logger.info("[TAVILY] Starting search...") if not TAVILY_API_KEY: logger.warning("[TAVILY] No API key, skipping") return None try: response = requests.post( 'https://api.tavily.com/search', json={ 'api_key': TAVILY_API_KEY, 'query': query, 'max_results': 3, 'include_answer': True }, timeout=3 ) if response.status_code == 200: data = response.json() results = data.get('results', []) context = "" for i, result in enumerate(results[:3], 1): context += f"\n[Tavily {i}] {result.get('title', '')}\n{result.get('content', '')}\n" logger.info(f"[TAVILY] Success - {len(results)} results") return context except Exception as e: logger.error(f"[TAVILY] Error: {str(e)}") return None def search_brave(query): """Priority 2: Brave Search API""" logger.info("[BRAVE] Starting search...") if not BRAVE_API_KEY: logger.warning("[BRAVE] No API key, skipping") return None try: response = requests.get( 'https://api.search.brave.com/res/v1/web/search', params={'q': query, 'count': 3}, headers={'X-Subscription-Token': BRAVE_API_KEY}, timeout=3 ) if response.status_code == 200: data = response.json() results = data.get('web', {}).get('results', []) context = "" for i, result in enumerate(results[:3], 1): context += f"\n[Brave {i}] {result.get('title', '')}\n{result.get('description', '')}\n" logger.info(f"[BRAVE] Success - {len(results)} results") return context except Exception as e: logger.error(f"[BRAVE] Error: {str(e)}") return None def search_searx(query): """Priority 3: Searx (free, unlimited)""" logger.info("[SEARX] Starting search...") # Try multiple public Searx instances searx_instances = [ 'https://searx.be/search', 'https://searx.work/search', 'https://search.sapti.me/search' ] for instance in searx_instances: try: response = requests.get( instance, params={'q': query, 'format': 'json', 'categories': 'general', 'language': 'en'}, timeout=3 ) if response.status_code == 200: data = response.json() results = data.get('results', []) context = "" for i, result in enumerate(results[:3], 1): context += f"\n[Searx {i}] {result.get('title', '')}\n{result.get('content', '')}\n" logger.info(f"[SEARX] Success - {len(results)} results from {instance}") return context except Exception as e: logger.warning(f"[SEARX] Failed {instance}: {str(e)}") continue logger.error("[SEARX] All instances failed") return None def search_duckduckgo_html(query): """Priority 4: DuckDuckGo HTML scraping (fallback)""" logger.info("[DDG] Starting search...") try: response = requests.get( 'https://html.duckduckgo.com/html/', params={'q': query}, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3 ) if response.status_code == 200: # Simple HTML parsing (basic extraction) from html.parser import HTMLParser class DDGParser(HTMLParser): def __init__(self): super().__init__() self.results = [] self.in_result = False self.current_text = "" def handle_starttag(self, tag, attrs): if tag == 'a' and any(k == 'class' and 'result__a' in v for k, v in attrs): self.in_result = True def handle_data(self, data): if self.in_result: self.current_text += data.strip() def handle_endtag(self, tag): if tag == 'a' and self.in_result: self.results.append(self.current_text) self.current_text = "" self.in_result = False parser = DDGParser() parser.feed(response.text) context = "" for i, result in enumerate(parser.results[:3], 1): context += f"\n[DDG {i}] {result}\n" if context: logger.info(f"[DDG] Success - {len(parser.results)} results") return context except Exception as e: logger.error(f"[DDG] Error: {str(e)}") return None def search_parallel(query): """Execute all searches in parallel, return first successful result""" logger.info("[PARALLEL SEARCH] Starting all search engines...") with ThreadPoolExecutor(max_workers=4) as executor: # Submit all searches simultaneously futures = { executor.submit(search_tavily, query): "Tavily", executor.submit(search_brave, query): "Brave", executor.submit(search_searx, query): "Searx", executor.submit(search_duckduckgo_html, query): "DuckDuckGo" } # Priority order: Tavily > Brave > Searx > DDG priority_order = ["Tavily", "Brave", "Searx", "DuckDuckGo"] results = {} # Collect all results for future in futures: engine = futures[future] try: result = future.result(timeout=4) if result: results[engine] = result logger.info(f"[PARALLEL SEARCH] {engine} completed successfully") except Exception as e: logger.error(f"[PARALLEL SEARCH] {engine} failed: {str(e)}") # Return results by priority for engine in priority_order: if engine in results and results[engine]: logger.info(f"[PARALLEL SEARCH] Using {engine} results (highest priority available)") return results[engine], engine logger.error("[PARALLEL SEARCH] All search engines failed") return "Unable to fetch search results. All search engines are unavailable.", "None" def transcribe_audio_base64(audio_base64): """Transcribe audio from base64""" logger.info("[PLUELY STT] Request received") try: audio_bytes = base64.b64decode(audio_base64) logger.info(f"[PLUELY STT] Audio size: {len(audio_bytes)} bytes") with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio: temp_audio.write(audio_bytes) temp_path = temp_audio.name segments, _ = whisper_model.transcribe(temp_path, language="en", beam_size=1) transcription = " ".join([seg.text for seg in segments]) os.unlink(temp_path) logger.info(f"[PLUELY STT] Success: {transcription[:50]}...") return {"text": transcription.strip()} except Exception as e: logger.error(f"[PLUELY STT] Error: {str(e)}") return {"error": str(e)} def generate_answer(text_input): """Generate answer using Qwen 2.5 1.5B""" logger.info(f"[PLUELY AI] Question: {text_input}") try: if not text_input or not text_input.strip(): return "No input provided" current_date = datetime.now().strftime("%B %d, %Y") # Parallel search logger.info("[PLUELY AI] Starting parallel search...") search_results, search_engine = search_parallel(text_input) logger.info(f"[PLUELY AI] Using {search_engine} results ({len(search_results)} chars)") # Enhanced prompt for Qwen 2.5 messages = [ { "role": "system", "content": f"You are a factual assistant. Today is {current_date}. Answer questions using ONLY the provided search results. Be concise (100-120 words)." }, { "role": "user", "content": f"""Search Results: {search_results} Question: {text_input} Instructions: 1. Answer based STRICTLY on the search results above 2. Include relevant dates and facts from search results 3. If search results are insufficient, say so 4. Keep answer to 100-120 words Answer:""" } ] # Apply chat template text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) logger.info("[PLUELY AI] Generating answer...") inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=1500) with torch.no_grad(): outputs = model.generate( **inputs, max_new_tokens=150, temperature=0.4, do_sample=True, top_p=0.9, repetition_penalty=1.1, pad_token_id=tokenizer.eos_token_id ) answer = tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True).strip() # Add source attribution answer_with_source = f"{answer}\n\n**Source:** {search_engine}" logger.info(f"[PLUELY AI] Answer generated ({len(answer)} chars)") return answer_with_source except Exception as e: logger.error(f"[PLUELY AI] Error: {str(e)}") return f"Error: {str(e)}" def process_audio(audio_path, question_text): """Main pipeline""" start_time = time.time() logger.info("="*50) logger.info("[MAIN] New request") if audio_path: logger.info(f"[MAIN] Audio: {audio_path}") try: segments, _ = whisper_model.transcribe(audio_path, language="en", beam_size=1) question = " ".join([seg.text for seg in segments]) logger.info(f"[MAIN] Transcribed: {question}") except Exception as e: logger.error(f"[MAIN] Error: {str(e)}") return f"❌ Error: {str(e)}", 0.0 else: question = question_text logger.info(f"[MAIN] Text: {question}") if not question or not question.strip(): return "❌ No input", 0.0 transcription_time = time.time() - start_time # Generate (includes parallel search) gen_start = time.time() answer = generate_answer(question) gen_time = time.time() - gen_start total_time = time.time() - start_time time_emoji = "🟢" if total_time < 4.0 else "🟡" if total_time < 6.0 else "🔴" logger.info(f"[MAIN] Total: {total_time:.2f}s") logger.info("="*50) timing = f"\n\n{time_emoji} **Performance:** Trans={transcription_time:.2f}s | Search+Gen={gen_time:.2f}s | **Total={total_time:.2f}s**" return answer + timing, total_time def audio_handler(audio_path): return process_audio(audio_path, None) def text_handler(text_input): return process_audio(None, text_input) # Gradio UI with gr.Blocks(title="Fast Q&A - Qwen 1.5B + Multi-Search", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # ⚡ Ultra-Fast Political Q&A System **Parallel multi-search** (Tavily → Brave → Searx → DDG) + **Qwen 2.5 1.5B** **Features:** - Whisper-tiny transcription - 4 search engines running in parallel (uses fastest available) - Qwen 2.5 1.5B-Instruct (2-3s CPU inference) - Search-grounded answers only """) with gr.Tab("🎙️ Audio"): with gr.Row(): with gr.Column(): audio_input = gr.Audio(sources=["microphone", "upload"], type="filepath", label="Record/Upload Audio") audio_submit = gr.Button("🚀 Submit Audio", variant="primary", size="lg") with gr.Column(): audio_output = gr.Textbox(label="Answer", lines=10, show_copy_button=True) audio_time = gr.Number(label="Time (seconds)", precision=2) audio_submit.click(fn=audio_handler, inputs=[audio_input], outputs=[audio_output, audio_time], api_name="audio_query") with gr.Tab("✍️ Text"): with gr.Row(): with gr.Column(): text_input = gr.Textbox(label="Ask anything...", placeholder="Is internet shut down in Bareilly today?", lines=3) text_submit = gr.Button("🚀 Submit Question", variant="primary", size="lg") with gr.Column(): text_output = gr.Textbox(label="Answer", lines=10, show_copy_button=True) text_time = gr.Number(label="Time (seconds)", precision=2) text_submit.click(fn=text_handler, inputs=[text_input], outputs=[text_output, text_time], api_name="text_query") gr.Examples( examples=[ ["Is internet shut down in Bareilly today?"], ["Who won the 2024 US presidential election?"], ["What is current India inflation rate?"], ["Latest Israel Palestine conflict news?"] ], inputs=text_input ) with gr.Tab("🔌 Pluely API"): gr.Markdown(""" ### API Endpoints **STT (Audio → Text):** ``` curl -X POST https://archcoder-basic-app.hf.space/call/transcribe_stt \\ -H "Content-Type: application/json" \\ -d '{"data": ["BASE64_AUDIO"]}' ``` **Response Path:** `data[0].text` **AI (Text → Answer):** ``` curl -X POST https://archcoder-basic-app.hf.space/call/answer_ai \\ -H "Content-Type: application/json" \\ -d '{"data": ["Your question"]}' ``` **Response Path:** `data[0]` --- ### Pluely Configuration **Custom STT Provider:** ``` curl https://archcoder-basic-app.hf.space/call/transcribe_stt -H "Content-Type: application/json" -d '{"data": ["{{AUDIO_BASE64}}"]}' ``` **Custom AI Provider:** ``` curl https://archcoder-basic-app.hf.space/call/answer_ai -H "Content-Type: application/json" -d '{"data": ["{{TEXT}}"]}' ``` """) with gr.Row(visible=False): stt_in = gr.Textbox() stt_out = gr.JSON() ai_in = gr.Textbox() ai_out = gr.Textbox() gr.Button("STT", visible=False).click(fn=transcribe_audio_base64, inputs=[stt_in], outputs=[stt_out], api_name="transcribe_stt") gr.Button("AI", visible=False).click(fn=generate_answer, inputs=[ai_in], outputs=[ai_out], api_name="answer_ai") gr.Markdown(""" --- **Model:** Qwen 2.5 1.5B-Instruct (fastest quality model for CPU) **Search Strategy:** Parallel execution (Tavily → Brave → Searx → DDG by priority) **All requests logged** - Check Logs tab 🟢 < 4s | 🟡 4-6s | 🔴 > 6s """) if __name__ == "__main__": demo.queue(max_size=5) demo.launch()