import asyncio
import base64
import logging
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial
from html.parser import HTMLParser

import aiohttp
import gradio as gr
import requests
import torch
from faster_whisper import WhisperModel
from transformers import AutoTokenizer, AutoModelForCausalLM
|
|
| |
# Configure root logging once at import time; everything in this module logs
# through the module-level `logger`.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
| |
# Speech-to-text: the "tiny" Whisper checkpoint, int8 compute on CPU.
logger.info("Loading Whisper model...")
whisper_model = WhisperModel("tiny", compute_type="int8", device="cpu")

# Answer generation: Qwen 2.5 1.5B-Instruct, float32 weights on CPU.
logger.info("Loading Qwen 2.5 1.5B-Instruct (fastest quality model)...")
model_name = "Qwen/Qwen2.5-1.5B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    low_cpu_mem_usage=True,
    device_map="cpu",
    torch_dtype=torch.float32,
)

logger.info("All models loaded!")
|
|
| |
# Search-provider credentials are read from the environment. An empty string
# means "not configured"; search_tavily / search_brave skip themselves then.
TAVILY_API_KEY = os.environ.get('TAVILY_API_KEY', '')
BRAVE_API_KEY = os.environ.get('BRAVE_API_KEY', '')
|
|
def search_tavily(query):
    """Priority 1: Tavily AI search (best for AI agents).

    Args:
        query: Free-text search query.

    Returns:
        A context string with up to three ``[Tavily N]`` title/content
        entries, or ``None`` when no API key is configured, the request
        fails, the HTTP status is not 200, or the response has no results.
    """
    logger.info("[TAVILY] Starting search...")
    if not TAVILY_API_KEY:
        logger.warning("[TAVILY] No API key, skipping")
        return None

    try:
        response = requests.post(
            'https://api.tavily.com/search',
            json={
                'api_key': TAVILY_API_KEY,
                'query': query,
                'max_results': 3,
                # NOTE(review): an answer is requested here, but the `answer`
                # field of the response is never read below.
                'include_answer': True,
            },
            timeout=3,
        )

        # Make rejected requests (bad key, rate limit, ...) visible in the
        # logs instead of silently falling through.
        if response.status_code != 200:
            logger.warning(f"[TAVILY] HTTP {response.status_code}, skipping")
            return None

        results = response.json().get('results') or []
        if not results:
            logger.warning("[TAVILY] No results returned")
            return None

        # `or ''` keeps a JSON null from being rendered as the text "None".
        context = "".join(
            f"\n[Tavily {i}] {result.get('title') or ''}\n{result.get('content') or ''}\n"
            for i, result in enumerate(results[:3], 1)
        )
        logger.info(f"[TAVILY] Success - {len(results)} results")
        return context
    except Exception as e:
        # Best-effort provider: any failure simply means "no result" so the
        # caller can fall back to another engine.
        logger.error(f"[TAVILY] Error: {str(e)}")
        return None
|
|
def search_brave(query):
    """Priority 2: Brave Search API.

    Args:
        query: Free-text search query.

    Returns:
        A context string with up to three ``[Brave N]`` title/description
        entries, or ``None`` when no API key is configured, the request
        fails, the HTTP status is not 200, or the response has no results.
    """
    logger.info("[BRAVE] Starting search...")
    if not BRAVE_API_KEY:
        logger.warning("[BRAVE] No API key, skipping")
        return None

    try:
        response = requests.get(
            'https://api.search.brave.com/res/v1/web/search',
            params={'q': query, 'count': 3},
            headers={'X-Subscription-Token': BRAVE_API_KEY},
            timeout=3,
        )

        # Make rejected requests (bad key, quota, ...) visible in the logs
        # instead of silently falling through.
        if response.status_code != 200:
            logger.warning(f"[BRAVE] HTTP {response.status_code}, skipping")
            return None

        # Results live under web.results; either level may be missing/null.
        results = (response.json().get('web') or {}).get('results') or []
        if not results:
            logger.warning("[BRAVE] No results returned")
            return None

        # `or ''` keeps a JSON null from being rendered as the text "None".
        context = "".join(
            f"\n[Brave {i}] {result.get('title') or ''}\n{result.get('description') or ''}\n"
            for i, result in enumerate(results[:3], 1)
        )
        logger.info(f"[BRAVE] Success - {len(results)} results")
        return context
    except Exception as e:
        # Best-effort provider: any failure simply means "no result" so the
        # caller can fall back to another engine.
        logger.error(f"[BRAVE] Error: {str(e)}")
        return None
|
|
def search_searx(query):
    """Priority 3: Searx (free, unlimited).

    Tries each public instance in turn and returns the results of the first
    one that actually yields results.

    Args:
        query: Free-text search query.

    Returns:
        A context string with up to three ``[Searx N]`` title/content
        entries, or ``None`` when every instance failed or returned nothing.
    """
    logger.info("[SEARX] Starting search...")

    searx_instances = (
        'https://searx.be/search',
        'https://searx.work/search',
        'https://search.sapti.me/search',
    )
    params = {'q': query, 'format': 'json', 'categories': 'general', 'language': 'en'}

    for instance in searx_instances:
        try:
            response = requests.get(instance, params=params, timeout=3)

            if response.status_code != 200:
                logger.warning(f"[SEARX] Failed {instance}: HTTP {response.status_code}")
                continue

            results = response.json().get('results') or []
            # An instance that answers 200 with an empty result list must not
            # end the fallback chain; move on to the next instance.
            if not results:
                logger.warning(f"[SEARX] Failed {instance}: no results")
                continue

            # `or ''` keeps a JSON null from being rendered as the text "None".
            context = "".join(
                f"\n[Searx {i}] {result.get('title') or ''}\n{result.get('content') or ''}\n"
                for i, result in enumerate(results[:3], 1)
            )
            logger.info(f"[SEARX] Success - {len(results)} results from {instance}")
            return context
        except Exception as e:
            # Network error, non-JSON body, unexpected payload shape, ...
            logger.warning(f"[SEARX] Failed {instance}: {str(e)}")
            continue

    logger.error("[SEARX] All instances failed")
    return None
|
|
class DDGParser(HTMLParser):
    """Collect the link text of ``<a class="...result__a...">`` elements.

    After ``feed()``, ``results`` holds one whitespace-normalised title per
    matching anchor, in document order.
    """

    def __init__(self):
        super().__init__()
        self.results = []
        self.in_result = False
        self._chunks = []

    def handle_starttag(self, tag, attrs):
        # A valueless attribute is reported with value None, hence `v and`.
        if tag == 'a' and any(k == 'class' and v and 'result__a' in v for k, v in attrs):
            self.in_result = True
            self._chunks = []

    def handle_data(self, data):
        # Keep the raw text: stripping each chunk would glue words together
        # when the title contains inline tags such as <b>.
        if self.in_result:
            self._chunks.append(data)

    def handle_endtag(self, tag):
        if tag == 'a' and self.in_result:
            # Collapse runs of whitespace/newlines into single spaces.
            title = " ".join("".join(self._chunks).split())
            if title:
                self.results.append(title)
            self._chunks = []
            self.in_result = False


def search_duckduckgo_html(query):
    """Priority 4: DuckDuckGo HTML scraping (fallback).

    Only result titles are extracted from the HTML page.

    Args:
        query: Free-text search query.

    Returns:
        A context string with up to three ``[DDG N]`` title entries, or
        ``None`` when the request fails, the HTTP status is not 200, or no
        result titles are found.
    """
    logger.info("[DDG] Starting search...")
    try:
        response = requests.get(
            'https://html.duckduckgo.com/html/',
            params={'q': query},
            headers={'User-Agent': 'Mozilla/5.0'},
            timeout=3,
        )

        if response.status_code != 200:
            logger.warning(f"[DDG] HTTP {response.status_code}, skipping")
            return None

        parser = DDGParser()
        parser.feed(response.text)

        if not parser.results:
            logger.warning("[DDG] No results parsed")
            return None

        context = "".join(
            f"\n[DDG {i}] {title}\n"
            for i, title in enumerate(parser.results[:3], 1)
        )
        logger.info(f"[DDG] Success - {len(parser.results)} results")
        return context
    except Exception as e:
        # Best-effort provider: any failure simply means "no result".
        logger.error(f"[DDG] Error: {str(e)}")
        return None
|
|
def search_parallel(query):
    """Run all search engines concurrently and pick the best available result.

    All four engines are started at once. Their results are then awaited in
    priority order (Tavily, Brave, Searx, DuckDuckGo) and the first
    non-empty one is returned, i.e. the highest-priority engine that produced
    results -- not necessarily the one that finished first.

    Args:
        query: Free-text search query passed to every engine.

    Returns:
        A ``(context, engine_name)`` tuple. When no engine yields results the
        tuple is a fixed "unable to fetch" message and the string ``"None"``.
    """
    logger.info("[PARALLEL SEARCH] Starting all search engines...")

    # Listed in priority order; the loop below relies on this ordering.
    engines = (
        ("Tavily", search_tavily),
        ("Brave", search_brave),
        ("Searx", search_searx),
        ("DuckDuckGo", search_duckduckgo_html),
    )

    # Deliberately not a `with` block: leaving a `with ThreadPoolExecutor`
    # calls shutdown(wait=True), which would block until the slowest engine
    # finishes even after a usable result (or a timeout) has been obtained.
    executor = ThreadPoolExecutor(max_workers=len(engines))
    try:
        futures = [(name, executor.submit(search, query)) for name, search in engines]

        for engine, future in futures:
            try:
                result = future.result(timeout=4)
            except Exception as e:
                # repr() because a timeout carries an empty message.
                logger.error(f"[PARALLEL SEARCH] {engine} failed: {e!r}")
                continue

            if result:
                logger.info(f"[PARALLEL SEARCH] {engine} completed successfully")
                logger.info(f"[PARALLEL SEARCH] Using {engine} results (highest priority available)")
                return result, engine
    finally:
        # Let still-running lookups finish in the background without
        # holding up the caller.
        executor.shutdown(wait=False)

    logger.error("[PARALLEL SEARCH] All search engines failed")
    return "Unable to fetch search results. All search engines are unavailable.", "None"
|
|
def transcribe_audio_base64(audio_base64):
    """Transcribe base64-encoded audio with the Whisper model.

    Args:
        audio_base64: Audio file contents encoded as base64.

    Returns:
        ``{"text": <transcription>}`` on success, or ``{"error": <message>}``
        if decoding or transcription fails.
    """
    logger.info("[PLUELY STT] Request received")
    temp_path = None
    try:
        audio_bytes = base64.b64decode(audio_base64)
        logger.info(f"[PLUELY STT] Audio size: {len(audio_bytes)} bytes")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
            # Record the path first so the file is cleaned up even if the
            # write itself fails.
            temp_path = temp_audio.name
            temp_audio.write(audio_bytes)

        segments, _ = whisper_model.transcribe(temp_path, language="en", beam_size=1)
        # The segments are consumed here, before the temp file is removed in
        # the `finally` clause below.
        transcription = " ".join(seg.text for seg in segments)

        logger.info(f"[PLUELY STT] Success: {transcription[:50]}...")
        return {"text": transcription.strip()}

    except Exception as e:
        logger.error(f"[PLUELY STT] Error: {str(e)}")
        return {"error": str(e)}

    finally:
        # Always remove the temp file, including on failure paths, so that
        # rejected or broken uploads do not pile up on disk.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"[PLUELY STT] Could not remove temp file: {cleanup_error}")
|
|
def generate_answer(text_input):
    """Generate a search-grounded answer using Qwen 2.5 1.5B.

    Runs the parallel web search for the question, builds a chat prompt from
    the search results, and samples an answer from the model.

    Args:
        text_input: The user's question.

    Returns:
        The answer followed by a ``**Source:** <engine>`` line, the string
        ``"No input provided"`` for empty input, or ``"Error: <message>"`` if
        anything in the pipeline raises.
    """
    logger.info(f"[PLUELY AI] Question: {text_input}")
    try:
        if not text_input or not text_input.strip():
            return "No input provided"

        # Token budgets. The tokenizer truncates on the right by default, so
        # an over-long prompt would lose the question, the instructions and
        # the assistant generation prompt at its end. The search context is
        # therefore trimmed on its own first, leaving headroom for the rest.
        max_prompt_tokens = 1500
        max_context_tokens = 1100

        current_date = datetime.now().strftime("%B %d, %Y")

        logger.info("[PLUELY AI] Starting parallel search...")
        search_results, search_engine = search_parallel(text_input)
        logger.info(f"[PLUELY AI] Using {search_engine} results ({len(search_results)} chars)")

        context_ids = tokenizer(search_results, add_special_tokens=False)["input_ids"]
        if len(context_ids) > max_context_tokens:
            search_results = tokenizer.decode(
                context_ids[:max_context_tokens], skip_special_tokens=True
            )
            logger.info(
                f"[PLUELY AI] Search context trimmed from {len(context_ids)} "
                f"to {max_context_tokens} tokens"
            )

        user_prompt = "\n".join((
            "Search Results:",
            f"{search_results}",
            "",
            f"Question: {text_input}",
            "",
            "Instructions:",
            "1. Answer based STRICTLY on the search results above",
            "2. Include relevant dates and facts from search results",
            "3. If search results are insufficient, say so",
            "4. Keep answer to 100-120 words",
            "",
            "Answer:",
        ))

        messages = [
            {
                "role": "system",
                "content": f"You are a factual assistant. Today is {current_date}. Answer questions using ONLY the provided search results. Be concise (100-120 words)."
            },
            {"role": "user", "content": user_prompt},
        ]

        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )

        logger.info("[PLUELY AI] Generating answer...")
        # Truncation stays on as a safety net (e.g. for an extremely long
        # question); the context trimming above should normally prevent it.
        inputs = tokenizer(
            text, return_tensors="pt", truncation=True, max_length=max_prompt_tokens
        )

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=150,
                temperature=0.4,
                do_sample=True,
                top_p=0.9,
                repetition_penalty=1.1,
                pad_token_id=tokenizer.eos_token_id
            )

        # Decode only the newly generated tokens, not the echoed prompt.
        prompt_length = inputs['input_ids'].shape[1]
        answer = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()

        answer_with_source = f"{answer}\n\n**Source:** {search_engine}"

        logger.info(f"[PLUELY AI] Answer generated ({len(answer)} chars)")
        return answer_with_source

    except Exception as e:
        logger.error(f"[PLUELY AI] Error: {str(e)}")
        return f"Error: {str(e)}"
|
|
def process_audio(audio_path, question_text):
    """Main pipeline: obtain the question, answer it, and report timings.

    Args:
        audio_path: Path of an audio file to transcribe. When falsy,
            ``question_text`` is used as the question instead.
        question_text: The question as text; only used without audio.

    Returns:
        A ``(message, total_seconds)`` tuple. ``message`` is the answer plus
        a performance line; on a transcription error or empty input it is an
        error message and ``total_seconds`` is ``0.0``.
    """
    # perf_counter() is monotonic, so the measured intervals cannot be
    # distorted by system clock adjustments.
    start_time = time.perf_counter()
    logger.info("="*50)
    logger.info("[MAIN] New request")

    if audio_path:
        logger.info(f"[MAIN] Audio: {audio_path}")
        try:
            segments, _ = whisper_model.transcribe(audio_path, language="en", beam_size=1)
            question = " ".join(seg.text for seg in segments)
            logger.info(f"[MAIN] Transcribed: {question}")
        except Exception as e:
            logger.error(f"[MAIN] Error: {str(e)}")
            return f"❌ Error: {str(e)}", 0.0
    else:
        question = question_text
        logger.info(f"[MAIN] Text: {question}")

    if not question or not question.strip():
        return "❌ No input", 0.0

    transcription_time = time.perf_counter() - start_time

    # generate_answer() performs both the web search and the generation,
    # hence the "Search+Gen" label below.
    gen_start = time.perf_counter()
    answer = generate_answer(question)
    gen_time = time.perf_counter() - gen_start

    total_time = time.perf_counter() - start_time
    if total_time < 4.0:
        time_emoji = "🟢"
    elif total_time < 6.0:
        time_emoji = "🟡"
    else:
        time_emoji = "🔴"

    logger.info(f"[MAIN] Total: {total_time:.2f}s")
    logger.info("="*50)

    timing = f"\n\n{time_emoji} **Performance:** Trans={transcription_time:.2f}s | Search+Gen={gen_time:.2f}s | **Total={total_time:.2f}s**"

    return answer + timing, total_time
|
|
def audio_handler(audio_path):
    """Gradio callback for the audio tab: run the pipeline on a recording."""
    return process_audio(audio_path=audio_path, question_text=None)
|
|
def text_handler(text_input):
    """Gradio callback for the text tab: run the pipeline on a typed question."""
    return process_audio(audio_path=None, question_text=text_input)
|
|
| |
# Gradio UI: an audio tab, a text tab, an API help tab, and two hidden
# endpoints (transcribe_stt / answer_ai) exposed for external API callers.
with gr.Blocks(title="Fast Q&A - Qwen 1.5B + Multi-Search", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # ⚡ Ultra-Fast Political Q&A System
    **Parallel multi-search** (Tavily → Brave → Searx → DDG) + **Qwen 2.5 1.5B**

    **Features:**
    - Whisper-tiny transcription
    - 4 search engines queried in parallel (uses the highest-priority engine that returns results)
    - Qwen 2.5 1.5B-Instruct (2-3s CPU inference)
    - Search-grounded answers only
    """)

    with gr.Tab("🎙️ Audio"):
        with gr.Row():
            with gr.Column():
                audio_input = gr.Audio(sources=["microphone", "upload"], type="filepath", label="Record/Upload Audio")
                audio_submit = gr.Button("🚀 Submit Audio", variant="primary", size="lg")
            with gr.Column():
                audio_output = gr.Textbox(label="Answer", lines=10, show_copy_button=True)
                audio_time = gr.Number(label="Time (seconds)", precision=2)

        audio_submit.click(fn=audio_handler, inputs=[audio_input], outputs=[audio_output, audio_time], api_name="audio_query")

    with gr.Tab("✍️ Text"):
        with gr.Row():
            with gr.Column():
                text_input = gr.Textbox(label="Ask anything...", placeholder="Is internet shut down in Bareilly today?", lines=3)
                text_submit = gr.Button("🚀 Submit Question", variant="primary", size="lg")
            with gr.Column():
                text_output = gr.Textbox(label="Answer", lines=10, show_copy_button=True)
                text_time = gr.Number(label="Time (seconds)", precision=2)

        text_submit.click(fn=text_handler, inputs=[text_input], outputs=[text_output, text_time], api_name="text_query")

        gr.Examples(
            examples=[
                ["Is internet shut down in Bareilly today?"],
                ["Who won the 2024 US presidential election?"],
                ["What is current India inflation rate?"],
                ["Latest Israel Palestine conflict news?"]
            ],
            inputs=text_input
        )

    with gr.Tab("🔌 Pluely API"):
        gr.Markdown("""
        ### API Endpoints

        **STT (Audio → Text):**
        ```
        curl -X POST https://archcoder-basic-app.hf.space/call/transcribe_stt \\
          -H "Content-Type: application/json" \\
          -d '{"data": ["BASE64_AUDIO"]}'
        ```
        **Response Path:** `data[0].text`

        **AI (Text → Answer):**
        ```
        curl -X POST https://archcoder-basic-app.hf.space/call/answer_ai \\
          -H "Content-Type: application/json" \\
          -d '{"data": ["Your question"]}'
        ```
        **Response Path:** `data[0]`

        ---

        ### Pluely Configuration

        **Custom STT Provider:**
        ```
        curl https://archcoder-basic-app.hf.space/call/transcribe_stt -H "Content-Type: application/json" -d '{"data": ["{{AUDIO_BASE64}}"]}'
        ```

        **Custom AI Provider:**
        ```
        curl https://archcoder-basic-app.hf.space/call/answer_ai -H "Content-Type: application/json" -d '{"data": ["{{TEXT}}"]}'
        ```
        """)

    # Hidden components and buttons: they exist only so the two functions
    # below are registered under the api_names referenced in the API tab.
    with gr.Row(visible=False):
        stt_in = gr.Textbox()
        stt_out = gr.JSON()
        ai_in = gr.Textbox()
        ai_out = gr.Textbox()

    gr.Button("STT", visible=False).click(fn=transcribe_audio_base64, inputs=[stt_in], outputs=[stt_out], api_name="transcribe_stt")
    gr.Button("AI", visible=False).click(fn=generate_answer, inputs=[ai_in], outputs=[ai_out], api_name="answer_ai")

    gr.Markdown("""
    ---
    **Model:** Qwen 2.5 1.5B-Instruct (fastest quality model for CPU)
    **Search Strategy:** Parallel execution (Tavily → Brave → Searx → DDG by priority)
    **All requests logged** - Check Logs tab

    🟢 < 4s | 🟡 4-6s | 🔴 > 6s
    """)
|
|
# Script entry point: enable the request queue (at most 5 pending requests),
# then start the Gradio server.
if __name__ == "__main__":
    demo.queue(max_size=5).launch()
|
|