basic_app / app.py
ArchCoder's picture
Update app.py
c51f8c4 verified
raw
history blame
16.7 kB
import asyncio
import base64
import logging
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial

import aiohttp
import gradio as gr
import requests
import torch
from faster_whisper import WhisperModel
from transformers import AutoModelForCausalLM, AutoTokenizer
# Setup logging
# Configure the root logger at INFO level with "timestamp - level - message"
# lines; every function in this file logs through the module-level `logger`.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Initialize models
# NOTE: both models are loaded here, at import time, so importing this module
# blocks until the loads finish.
logger.info("Loading Whisper model...")
# Speech-to-text: faster-whisper "tiny" model on CPU with int8 compute type.
whisper_model = WhisperModel("tiny", device="cpu", compute_type="int8")
logger.info("Loading Qwen 2.5 1.5B-Instruct (fastest quality model)...")
# Answer generation: tokenizer and causal LM are loaded from the same model id.
model_name = "Qwen/Qwen2.5-1.5B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# float32 weights placed on CPU, loaded with low_cpu_mem_usage enabled.
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float32,
device_map="cpu",
low_cpu_mem_usage=True
)
logger.info("All models loaded!")
# Search APIs configuration (priority order)
# Both keys default to an empty string when the environment variable is unset;
# search_tavily / search_brave skip themselves when their key is falsy.
TAVILY_API_KEY = os.getenv('TAVILY_API_KEY', '') # Get from environment
BRAVE_API_KEY = os.getenv('BRAVE_API_KEY', '')
def search_tavily(query):
    """Priority 1: query the Tavily search API and format the top hits.

    Args:
        query: Free-text search query.

    Returns:
        A context string with up to three "[Tavily N] title / content"
        entries when the API answers with HTTP 200 (empty string when the
        reply contains no results), or None when no API key is configured,
        the reply status is not 200, or any exception occurs.
    """
    logger.info("[TAVILY] Starting search...")
    if not TAVILY_API_KEY:
        logger.warning("[TAVILY] No API key, skipping")
        return None

    payload = {
        'api_key': TAVILY_API_KEY,
        'query': query,
        'max_results': 3,
        'include_answer': True,
    }
    try:
        reply = requests.post('https://api.tavily.com/search', json=payload, timeout=3)
        if reply.status_code != 200:
            return None
        hits = reply.json().get('results', [])
        snippets = [
            f"\n[Tavily {rank}] {hit.get('title', '')}\n{hit.get('content', '')}\n"
            for rank, hit in enumerate(hits[:3], 1)
        ]
        logger.info(f"[TAVILY] Success - {len(hits)} results")
        return "".join(snippets)
    except Exception as err:
        # Best-effort engine: any failure is logged and reported as "no result".
        logger.error(f"[TAVILY] Error: {str(err)}")
        return None
def search_brave(query):
    """Priority 2: query the Brave Search web API and format the top hits.

    Args:
        query: Free-text search query.

    Returns:
        A context string with up to three "[Brave N] title / description"
        entries when the API answers with HTTP 200 (empty string when the
        reply contains no web results), or None when no API key is
        configured, the reply status is not 200, or any exception occurs.
    """
    logger.info("[BRAVE] Starting search...")
    if not BRAVE_API_KEY:
        logger.warning("[BRAVE] No API key, skipping")
        return None

    try:
        reply = requests.get(
            'https://api.search.brave.com/res/v1/web/search',
            params={'q': query, 'count': 3},
            headers={'X-Subscription-Token': BRAVE_API_KEY},
            timeout=3,
        )
        if reply.status_code != 200:
            return None
        # Web hits live under the nested "web" -> "results" keys.
        hits = reply.json().get('web', {}).get('results', [])
        snippets = [
            f"\n[Brave {rank}] {hit.get('title', '')}\n{hit.get('description', '')}\n"
            for rank, hit in enumerate(hits[:3], 1)
        ]
        logger.info(f"[BRAVE] Success - {len(hits)} results")
        return "".join(snippets)
    except Exception as err:
        # Best-effort engine: any failure is logged and reported as "no result".
        logger.error(f"[BRAVE] Error: {str(err)}")
        return None
def search_searx(query):
    """Priority 3: query public Searx instances, falling back one by one.

    Instances are tried in order. An instance is skipped (and the next one
    tried) when the request raises, the reply status is not 200, or the reply
    contains no results, so one empty or throttled instance no longer hides
    results available from the others.

    Args:
        query: Free-text search query.

    Returns:
        A context string with up to three "[Searx N] title / content"
        entries from the first instance that yields results, or None when
        every instance fails or comes back empty.
    """
    logger.info("[SEARX] Starting search...")
    # Try multiple public Searx instances
    searx_instances = [
        'https://searx.be/search',
        'https://searx.work/search',
        'https://search.sapti.me/search',
    ]
    params = {'q': query, 'format': 'json', 'categories': 'general', 'language': 'en'}
    for instance in searx_instances:
        try:
            response = requests.get(instance, params=params, timeout=3)
            if response.status_code != 200:
                logger.warning(f"[SEARX] {instance} returned HTTP {response.status_code}")
                continue
            results = response.json().get('results', [])
            context = "".join(
                f"\n[Searx {i}] {result.get('title', '')}\n{result.get('content', '')}\n"
                for i, result in enumerate(results[:3], 1)
            )
            if not context:
                # A 200 reply with zero hits is not a usable answer; keep going.
                logger.warning(f"[SEARX] No results from {instance}")
                continue
            logger.info(f"[SEARX] Success - {len(results)} results from {instance}")
            return context
        except Exception as e:
            logger.warning(f"[SEARX] Failed {instance}: {str(e)}")
    logger.error("[SEARX] All instances failed")
    return None
def search_duckduckgo_html(query):
    """Priority 4: scrape result titles from the DuckDuckGo HTML endpoint.

    Only the text of anchors whose class contains "result__a" is collected.
    Text chunks of one anchor are joined as-is and whitespace is normalised
    afterwards, so words split across nested tags or character references
    keep their spacing.

    Args:
        query: Free-text search query.

    Returns:
        A context string with up to three "[DDG N] title" entries, or None
        when the reply status is not 200, no titles were found, or any
        exception occurs.
    """
    logger.info("[DDG] Starting search...")
    try:
        response = requests.get(
            'https://html.duckduckgo.com/html/',
            params={'q': query},
            headers={'User-Agent': 'Mozilla/5.0'},
            timeout=3,
        )
        if response.status_code != 200:
            logger.warning(f"[DDG] HTTP {response.status_code}")
            return None

        # Simple HTML parsing (basic extraction)
        from html.parser import HTMLParser

        class DDGParser(HTMLParser):
            """Collect the visible text of every result-title anchor."""

            def __init__(self):
                super().__init__()
                self.results = []
                self.in_result = False
                self.current_parts = []

            def handle_starttag(self, tag, attrs):
                # Valueless attributes arrive with value None, hence the `v and` guard.
                if tag == 'a' and any(
                    k == 'class' and v and 'result__a' in v for k, v in attrs
                ):
                    self.in_result = True
                    self.current_parts = []

            def handle_data(self, data):
                # Text may arrive in several chunks; keep them raw so inner
                # spaces survive until the anchor closes.
                if self.in_result:
                    self.current_parts.append(data)

            def handle_endtag(self, tag):
                if tag == 'a' and self.in_result:
                    title = " ".join("".join(self.current_parts).split())
                    if title:
                        self.results.append(title)
                    self.current_parts = []
                    self.in_result = False

        parser = DDGParser()
        parser.feed(response.text)
        parser.close()
        context = "".join(
            f"\n[DDG {i}] {result}\n"
            for i, result in enumerate(parser.results[:3], 1)
        )
        if context:
            logger.info(f"[DDG] Success - {len(parser.results)} results")
            return context
    except Exception as e:
        logger.error(f"[DDG] Error: {str(e)}")
    return None
def search_parallel(query):
    """Run all search engines concurrently and return the best available hit.

    All four engines are started at once. Results are then examined in
    priority order (Tavily > Brave > Searx > DuckDuckGo) and the first engine
    that produced a non-empty result wins; lower-priority engines are not
    waited for once a winner is found. Every engine shares one 4-second
    deadline measured from launch, and the pool is shut down without waiting
    so a slow engine cannot delay the caller past that deadline.

    Args:
        query: Free-text search query.

    Returns:
        A ``(context, engine_name)`` tuple. When no engine yields a result,
        ``context`` is a fixed "unable to fetch" message and ``engine_name``
        is the string "None".
    """
    logger.info("[PARALLEL SEARCH] Starting all search engines...")
    # Priority order: Tavily > Brave > Searx > DDG
    engines = [
        ("Tavily", search_tavily),
        ("Brave", search_brave),
        ("Searx", search_searx),
        ("DuckDuckGo", search_duckduckgo_html),
    ]
    executor = ThreadPoolExecutor(max_workers=len(engines))
    try:
        # Submit all searches simultaneously
        futures = [(name, executor.submit(fn, query)) for name, fn in engines]
        deadline = time.monotonic() + 4
        for engine, future in futures:
            remaining = max(0.0, deadline - time.monotonic())
            try:
                result = future.result(timeout=remaining)
            except Exception as e:
                # Includes the timeout raised when the deadline has passed.
                logger.error(f"[PARALLEL SEARCH] {engine} failed: {str(e)}")
                continue
            if result:
                logger.info(f"[PARALLEL SEARCH] {engine} completed successfully")
                logger.info(f"[PARALLEL SEARCH] Using {engine} results (highest priority available)")
                return result, engine
    finally:
        # Do not block on still-running requests; they end on their own
        # request timeouts and their results are simply discarded.
        executor.shutdown(wait=False)
    logger.error("[PARALLEL SEARCH] All search engines failed")
    return "Unable to fetch search results. All search engines are unavailable.", "None"
def transcribe_audio_base64(audio_base64):
    """Transcribe base64-encoded audio with the Whisper model.

    The decoded bytes are written to a temporary ".wav" file that is handed
    to ``whisper_model.transcribe``. The file is always removed afterwards,
    including when decoding or transcription fails.

    Args:
        audio_base64: Base64 text of the audio payload.

    Returns:
        ``{"text": <transcription>}`` on success, or ``{"error": <message>}``
        when any step raises.
    """
    logger.info("[PLUELY STT] Request received")
    temp_path = None
    try:
        audio_bytes = base64.b64decode(audio_base64)
        logger.info(f"[PLUELY STT] Audio size: {len(audio_bytes)} bytes")
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
            # Record the path first so cleanup still happens if the write fails.
            temp_path = temp_audio.name
            temp_audio.write(audio_bytes)
        segments, _ = whisper_model.transcribe(temp_path, language="en", beam_size=1)
        # Consume the segments here, while the temp file still exists.
        transcription = " ".join(seg.text for seg in segments)
        logger.info(f"[PLUELY STT] Success: {transcription[:50]}...")
        return {"text": transcription.strip()}
    except Exception as e:
        logger.error(f"[PLUELY STT] Error: {str(e)}")
        return {"error": str(e)}
    finally:
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"[PLUELY STT] Could not remove temp file: {cleanup_error}")
def generate_answer(text_input):
    """Answer a question with Qwen 2.5 1.5B, grounded on web search results.

    The question is searched via ``search_parallel``; the search context and
    the question are placed in a chat prompt and the model generates up to
    150 new tokens. The search context is capped to a token budget *before*
    the prompt is built: the final prompt is truncated on the right at 1500
    tokens, and without the cap a long context would push the question, the
    instructions and the assistant generation marker out of the prompt.

    Args:
        text_input: The user's question.

    Returns:
        The generated answer followed by a "**Source:** <engine>" line,
        "No input provided" for empty/blank input, or "Error: <message>"
        when any step raises.
    """
    logger.info(f"[PLUELY AI] Question: {text_input}")
    try:
        if not text_input or not text_input.strip():
            return "No input provided"
        current_date = datetime.now().strftime("%B %d, %Y")
        # Parallel search
        logger.info("[PLUELY AI] Starting parallel search...")
        search_results, search_engine = search_parallel(text_input)
        logger.info(f"[PLUELY AI] Using {search_engine} results ({len(search_results)} chars)")
        # Keep the context within budget so the tail of the prompt survives
        # the 1500-token truncation applied below.
        max_context_tokens = 1000
        context_ids = tokenizer.encode(search_results, add_special_tokens=False)
        if len(context_ids) > max_context_tokens:
            search_results = tokenizer.decode(
                context_ids[:max_context_tokens], skip_special_tokens=True
            )
            logger.info(f"[PLUELY AI] Search context capped to {max_context_tokens} tokens")
        # Enhanced prompt for Qwen 2.5
        messages = [
            {
                "role": "system",
                "content": f"You are a factual assistant. Today is {current_date}. Answer questions using ONLY the provided search results. Be concise (100-120 words)."
            },
            {
                "role": "user",
                "content": f"""Search Results:
{search_results}
Question: {text_input}
Instructions:
1. Answer based STRICTLY on the search results above
2. Include relevant dates and facts from search results
3. If search results are insufficient, say so
4. Keep answer to 100-120 words
Answer:"""
            }
        ]
        # Apply chat template
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        logger.info("[PLUELY AI] Generating answer...")
        # Truncation stays as a backstop (e.g. for a very long question).
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=1500)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=150,
                temperature=0.4,
                do_sample=True,
                top_p=0.9,
                repetition_penalty=1.1,
                pad_token_id=tokenizer.eos_token_id
            )
        # Decode only the newly generated tokens, not the echoed prompt.
        prompt_length = inputs['input_ids'].shape[1]
        answer = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()
        # Add source attribution
        answer_with_source = f"{answer}\n\n**Source:** {search_engine}"
        logger.info(f"[PLUELY AI] Answer generated ({len(answer)} chars)")
        return answer_with_source
    except Exception as e:
        logger.error(f"[PLUELY AI] Error: {str(e)}")
        return f"Error: {str(e)}"
def process_audio(audio_path, question_text):
    """Main pipeline: obtain the question, answer it, and report timings.

    When ``audio_path`` is truthy the audio is transcribed with Whisper and
    the transcription becomes the question; otherwise ``question_text`` is
    used. The question is then passed to ``generate_answer``.

    Durations are measured with ``time.perf_counter()``, which is meant for
    interval timing and is unaffected by wall-clock adjustments.

    Args:
        audio_path: Path of an audio file, or a falsy value for text input.
        question_text: Question text used when no audio is supplied.

    Returns:
        A ``(text, seconds)`` tuple: the answer with an appended performance
        line and the total elapsed seconds. On transcription failure or
        empty input, an error message and ``0.0``.
    """
    start_time = time.perf_counter()
    logger.info("=" * 50)
    logger.info("[MAIN] New request")
    if audio_path:
        logger.info(f"[MAIN] Audio: {audio_path}")
        try:
            segments, _ = whisper_model.transcribe(audio_path, language="en", beam_size=1)
            question = " ".join(seg.text for seg in segments)
            logger.info(f"[MAIN] Transcribed: {question}")
        except Exception as e:
            logger.error(f"[MAIN] Error: {str(e)}")
            return f"❌ Error: {str(e)}", 0.0
    else:
        question = question_text
        logger.info(f"[MAIN] Text: {question}")
    if not question or not question.strip():
        return "❌ No input", 0.0
    transcription_time = time.perf_counter() - start_time
    # Generate (includes parallel search)
    gen_start = time.perf_counter()
    answer = generate_answer(question)
    gen_time = time.perf_counter() - gen_start
    total_time = time.perf_counter() - start_time
    # Traffic-light marker: green under 4 s, yellow under 6 s, red otherwise.
    time_emoji = "🟢" if total_time < 4.0 else "🟡" if total_time < 6.0 else "🔴"
    logger.info(f"[MAIN] Total: {total_time:.2f}s")
    logger.info("=" * 50)
    timing = f"\n\n{time_emoji} **Performance:** Trans={transcription_time:.2f}s | Search+Gen={gen_time:.2f}s | **Total={total_time:.2f}s**"
    return answer + timing, total_time
def audio_handler(audio_path):
    """Gradio callback for the audio tab: run the pipeline with audio only."""
    return process_audio(audio_path=audio_path, question_text=None)
def text_handler(text_input):
    """Gradio callback for the text tab: run the pipeline with text only."""
    return process_audio(audio_path=None, question_text=text_input)
# Gradio UI
# Three tabs (audio, text, API docs) plus two hidden components groups that
# exist only to expose the "transcribe_stt" and "answer_ai" API endpoints.
with gr.Blocks(title="Fast Q&A - Qwen 1.5B + Multi-Search", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# ⚡ Ultra-Fast Political Q&A System
**Parallel multi-search** (Tavily → Brave → Searx → DDG) + **Qwen 2.5 1.5B**
**Features:**
- Whisper-tiny transcription
- 4 search engines running in parallel (uses fastest available)
- Qwen 2.5 1.5B-Instruct (2-3s CPU inference)
- Search-grounded answers only
""")

    # --- Audio tab: record/upload audio, show answer and elapsed seconds ---
    with gr.Tab("🎙️ Audio"):
        with gr.Row():
            with gr.Column():
                audio_input = gr.Audio(
                    sources=["microphone", "upload"],
                    type="filepath",
                    label="Record/Upload Audio",
                )
                audio_submit = gr.Button("🚀 Submit Audio", variant="primary", size="lg")
            with gr.Column():
                audio_output = gr.Textbox(label="Answer", lines=10, show_copy_button=True)
                audio_time = gr.Number(label="Time (seconds)", precision=2)
        audio_submit.click(
            fn=audio_handler,
            inputs=[audio_input],
            outputs=[audio_output, audio_time],
            api_name="audio_query",
        )

    # --- Text tab: typed question, show answer and elapsed seconds ---
    with gr.Tab("✍️ Text"):
        with gr.Row():
            with gr.Column():
                text_input = gr.Textbox(
                    label="Ask anything...",
                    placeholder="Is internet shut down in Bareilly today?",
                    lines=3,
                )
                text_submit = gr.Button("🚀 Submit Question", variant="primary", size="lg")
            with gr.Column():
                text_output = gr.Textbox(label="Answer", lines=10, show_copy_button=True)
                text_time = gr.Number(label="Time (seconds)", precision=2)
        text_submit.click(
            fn=text_handler,
            inputs=[text_input],
            outputs=[text_output, text_time],
            api_name="text_query",
        )
        gr.Examples(
            examples=[
                ["Is internet shut down in Bareilly today?"],
                ["Who won the 2024 US presidential election?"],
                ["What is current India inflation rate?"],
                ["Latest Israel Palestine conflict news?"],
            ],
            inputs=text_input,
        )

    # --- API tab: usage notes plus the hidden endpoint wiring ---
    with gr.Tab("🔌 Pluely API"):
        gr.Markdown("""
### API Endpoints
**STT (Audio → Text):**
```
curl -X POST https://archcoder-basic-app.hf.space/call/transcribe_stt \\
-H "Content-Type: application/json" \\
-d '{"data": ["BASE64_AUDIO"]}'
```
**Response Path:** `data[0].text`
**AI (Text → Answer):**
```
curl -X POST https://archcoder-basic-app.hf.space/call/answer_ai \\
-H "Content-Type: application/json" \\
-d '{"data": ["Your question"]}'
```
**Response Path:** `data[0]`
---
### Pluely Configuration
**Custom STT Provider:**
```
curl https://archcoder-basic-app.hf.space/call/transcribe_stt -H "Content-Type: application/json" -d '{"data": ["{{AUDIO_BASE64}}"]}'
```
**Custom AI Provider:**
```
curl https://archcoder-basic-app.hf.space/call/answer_ai -H "Content-Type: application/json" -d '{"data": ["{{TEXT}}"]}'
```
""")
        # Invisible inputs/outputs and buttons: never shown, they only give
        # the two API functions named endpoints.
        with gr.Row(visible=False):
            stt_in = gr.Textbox()
            stt_out = gr.JSON()
            ai_in = gr.Textbox()
            ai_out = gr.Textbox()
            stt_button = gr.Button("STT", visible=False)
            stt_button.click(
                fn=transcribe_audio_base64,
                inputs=[stt_in],
                outputs=[stt_out],
                api_name="transcribe_stt",
            )
            ai_button = gr.Button("AI", visible=False)
            ai_button.click(
                fn=generate_answer,
                inputs=[ai_in],
                outputs=[ai_out],
                api_name="answer_ai",
            )

    gr.Markdown("""
---
**Model:** Qwen 2.5 1.5B-Instruct (fastest quality model for CPU)
**Search Strategy:** Parallel execution (Tavily → Brave → Searx → DDG by priority)
**All requests logged** - Check Logs tab
🟢 < 4s | 🟡 4-6s | 🔴 > 6s
""")
if __name__ == "__main__":
    # Enable the request queue with at most 5 waiting requests, then serve.
    demo.queue(max_size=5).launch()