|
|
import gradio as gr |
|
|
from faster_whisper import WhisperModel |
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
|
from duckduckgo_search import DDGS |
|
|
import time |
|
|
import torch |
|
|
import base64 |
|
|
import tempfile |
|
|
import os |
|
|
import logging |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, |
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
|
|
) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
logger.info("Loading Whisper model...") |
|
|
whisper_model = WhisperModel("tiny", device="cpu", compute_type="int8") |
|
|
|
|
|
logger.info("Loading Phi-2 model (faster inference)...") |
|
|
model_name = "microsoft/phi-2" |
|
|
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True) |
|
|
model = AutoModelForCausalLM.from_pretrained( |
|
|
model_name, |
|
|
torch_dtype=torch.float32, |
|
|
device_map="cpu", |
|
|
low_cpu_mem_usage=True, |
|
|
trust_remote_code=True |
|
|
) |
|
|
tokenizer.pad_token = tokenizer.eos_token |
|
|
|
|
|
|
|
|
ddgs = DDGS(timeout=3) |
|
|
logger.info("All models loaded successfully!") |
|
|
|
|
|
def search_web(query, max_results=3): |
|
|
"""Perform web search using DuckDuckGo""" |
|
|
logger.info(f"[SEARCH] Query: {query}") |
|
|
try: |
|
|
results = ddgs.text( |
|
|
keywords=query, |
|
|
region='wt-wt', |
|
|
safesearch='moderate', |
|
|
timelimit='m', |
|
|
max_results=max_results |
|
|
) |
|
|
|
|
|
context = "" |
|
|
for i, result in enumerate(results[:max_results], 1): |
|
|
title = result.get('title', '') |
|
|
body = result.get('body', '') |
|
|
context += f"\n[Source {i}] {title}\n{body}\n" |
|
|
logger.info(f"[SEARCH] Result {i}: {title[:50]}...") |
|
|
|
|
|
if not context: |
|
|
logger.warning("[SEARCH] No results found!") |
|
|
return "No search results found." |
|
|
|
|
|
logger.info(f"[SEARCH] Successfully retrieved {max_results} results") |
|
|
return context.strip() |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"[SEARCH] Error: {str(e)}") |
|
|
return f"Search failed: {str(e)}" |
|
|
|
|
|
def transcribe_audio_base64(audio_base64): |
|
|
"""Transcribe audio from base64 string (for Pluely STT endpoint)""" |
|
|
logger.info("[PLUELY STT] Received audio transcription request") |
|
|
try: |
|
|
audio_bytes = base64.b64decode(audio_base64) |
|
|
logger.info(f"[PLUELY STT] Decoded audio size: {len(audio_bytes)} bytes") |
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio: |
|
|
temp_audio.write(audio_bytes) |
|
|
temp_path = temp_audio.name |
|
|
|
|
|
logger.info(f"[PLUELY STT] Transcribing audio...") |
|
|
segments, _ = whisper_model.transcribe(temp_path, language="en", beam_size=1) |
|
|
transcription = " ".join([seg.text for seg in segments]) |
|
|
|
|
|
os.unlink(temp_path) |
|
|
|
|
|
logger.info(f"[PLUELY STT] Transcription successful: {transcription[:50]}...") |
|
|
return {"text": transcription.strip()} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"[PLUELY STT] Error: {str(e)}") |
|
|
return {"error": f"Transcription failed: {str(e)}"} |
|
|
|
|
|
def generate_answer(text_input): |
|
|
"""Generate answer using ONLY search results""" |
|
|
logger.info(f"[PLUELY AI] Received question: {text_input}") |
|
|
try: |
|
|
if not text_input or text_input.strip() == "": |
|
|
return "No input provided" |
|
|
|
|
|
current_date = datetime.now().strftime("%B %d, %Y") |
|
|
|
|
|
|
|
|
logger.info("[PLUELY AI] Starting web search...") |
|
|
search_results = search_web(text_input, max_results=3) |
|
|
logger.info(f"[PLUELY AI] Search results length: {len(search_results)} chars") |
|
|
|
|
|
|
|
|
prompt = f"""You are a fact-checker assistant. Today is {current_date}. |
|
|
|
|
|
CRITICAL INSTRUCTION: You MUST ONLY use information from the search results below. DO NOT use your training knowledge. |
|
|
|
|
|
Web Search Results: |
|
|
{search_results} |
|
|
|
|
|
Question: {text_input} |
|
|
|
|
|
Instructions: |
|
|
1. Read the search results carefully |
|
|
2. Answer ONLY based on what's in the search results |
|
|
3. If search results don't contain the answer, say "The search results don't provide enough information" |
|
|
4. Include relevant dates and facts from the search results |
|
|
5. Keep answer to 100-150 words |
|
|
|
|
|
Answer based STRICTLY on search results:""" |
|
|
|
|
|
logger.info("[PLUELY AI] Generating answer...") |
|
|
inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1500).to("cpu") |
|
|
|
|
|
with torch.no_grad(): |
|
|
outputs = model.generate( |
|
|
**inputs, |
|
|
max_new_tokens=200, |
|
|
temperature=0.4, |
|
|
do_sample=True, |
|
|
top_p=0.9, |
|
|
repetition_penalty=1.2, |
|
|
pad_token_id=tokenizer.eos_token_id |
|
|
) |
|
|
|
|
|
response = tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True) |
|
|
answer = response.strip() |
|
|
|
|
|
logger.info(f"[PLUELY AI] Answer generated ({len(answer)} chars): {answer[:100]}...") |
|
|
return answer |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"[PLUELY AI] Error: {str(e)}") |
|
|
return f"Error: {str(e)}" |
|
|
|
|
|
def process_audio(audio_path, question_text): |
|
|
"""Main pipeline - returns tuple (answer, time)""" |
|
|
start_time = time.time() |
|
|
logger.info("="*50) |
|
|
logger.info("[MAIN] New request received") |
|
|
|
|
|
|
|
|
if audio_path: |
|
|
logger.info(f"[MAIN] Audio file provided: {audio_path}") |
|
|
try: |
|
|
segments, _ = whisper_model.transcribe(audio_path, language="en", beam_size=1) |
|
|
question = " ".join([seg.text for seg in segments]) |
|
|
logger.info(f"[MAIN] Transcription: {question}") |
|
|
except Exception as e: |
|
|
logger.error(f"[MAIN] Transcription error: {str(e)}") |
|
|
return f"❌ Transcription error: {str(e)}", 0.0 |
|
|
else: |
|
|
question = question_text |
|
|
logger.info(f"[MAIN] Text input: {question}") |
|
|
|
|
|
if not question or question.strip() == "": |
|
|
logger.warning("[MAIN] No input provided") |
|
|
return "❌ No input provided", 0.0 |
|
|
|
|
|
transcription_time = time.time() - start_time |
|
|
|
|
|
|
|
|
search_start = time.time() |
|
|
search_results = search_web(question, max_results=3) |
|
|
search_time = time.time() - search_start |
|
|
|
|
|
|
|
|
llm_start = time.time() |
|
|
answer = generate_answer(question) |
|
|
llm_time = time.time() - llm_start |
|
|
|
|
|
total_time = time.time() - start_time |
|
|
time_emoji = "🟢" if total_time < 4.0 else "🟡" if total_time < 6.0 else "🔴" |
|
|
|
|
|
logger.info(f"[MAIN] Total time: {total_time:.2f}s (Trans={transcription_time:.2f}s, Search={search_time:.2f}s, LLM={llm_time:.2f}s)") |
|
|
logger.info("="*50) |
|
|
|
|
|
timing_info = f"\n\n{time_emoji} **Performance:** Trans={transcription_time:.2f}s | Search={search_time:.2f}s | LLM={llm_time:.2f}s | **Total={total_time:.2f}s**" |
|
|
|
|
|
return answer + timing_info, total_time |
|
|
|
|
|
|
|
|
def audio_handler(audio_path): |
|
|
"""Wrapper for audio input""" |
|
|
return process_audio(audio_path, None) |
|
|
|
|
|
def text_handler(text_input): |
|
|
"""Wrapper for text input""" |
|
|
return process_audio(None, text_input) |
|
|
|
|
|
|
|
|
with gr.Blocks(title="Fast Political Q&A - Phi-2", theme=gr.themes.Soft()) as demo: |
|
|
gr.Markdown(""" |
|
|
# ⚡ Fast Political Q&A System |
|
|
**Search-grounded answers** - Powered by Phi-2 (2.7B) |
|
|
|
|
|
**Features:** Whisper-tiny + Phi-2 (fast CPU inference) + DuckDuckGo + Search-only responses |
|
|
""") |
|
|
|
|
|
with gr.Tab("🎙️ Audio Input"): |
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
audio_input = gr.Audio( |
|
|
sources=["microphone", "upload"], |
|
|
type="filepath", |
|
|
label="Record or upload audio" |
|
|
) |
|
|
audio_submit = gr.Button("🚀 Submit Audio", variant="primary", size="lg") |
|
|
|
|
|
with gr.Column(): |
|
|
audio_output = gr.Textbox(label="Search-Grounded Answer", lines=10, show_copy_button=True) |
|
|
audio_time = gr.Number(label="Response Time (seconds)", precision=2) |
|
|
|
|
|
audio_submit.click( |
|
|
fn=audio_handler, |
|
|
inputs=[audio_input], |
|
|
outputs=[audio_output, audio_time], |
|
|
api_name="audio_query" |
|
|
) |
|
|
|
|
|
with gr.Tab("✍️ Text Input"): |
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
text_input = gr.Textbox( |
|
|
label="Type your question", |
|
|
placeholder="Is internet shut down in Bareilly today?", |
|
|
lines=3 |
|
|
) |
|
|
text_submit = gr.Button("🚀 Submit Text", variant="primary", size="lg") |
|
|
|
|
|
with gr.Column(): |
|
|
text_output = gr.Textbox(label="Search-Grounded Answer", lines=10, show_copy_button=True) |
|
|
text_time = gr.Number(label="Response Time (seconds)", precision=2) |
|
|
|
|
|
text_submit.click( |
|
|
fn=text_handler, |
|
|
inputs=[text_input], |
|
|
outputs=[text_output, text_time], |
|
|
api_name="text_query" |
|
|
) |
|
|
|
|
|
gr.Examples( |
|
|
examples=[ |
|
|
["Is internet shut down in Bareilly today?"], |
|
|
["Who won the 2024 US presidential election?"], |
|
|
["What is the current inflation rate in India?"], |
|
|
["What happened in Israel Palestine conflict today?"] |
|
|
], |
|
|
inputs=text_input |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("🔌 Pluely Integration"): |
|
|
gr.Markdown(""" |
|
|
## API Endpoints (All requests logged in console) |
|
|
|
|
|
### STT Endpoint |
|
|
``` |
|
|
curl -X POST https://archcoder-basic-app.hf.space/call/transcribe_stt \\ |
|
|
-H "Content-Type: application/json" \\ |
|
|
-d '{"data": ["BASE64_AUDIO_DATA"]}' |
|
|
``` |
|
|
|
|
|
### AI Endpoint |
|
|
``` |
|
|
curl -X POST https://archcoder-basic-app.hf.space/call/answer_ai \\ |
|
|
-H "Content-Type: application/json" \\ |
|
|
-d '{"data": ["Your question here"]}' |
|
|
``` |
|
|
|
|
|
## Pluely Configuration |
|
|
|
|
|
**STT Provider:** |
|
|
``` |
|
|
curl https://archcoder-basic-app.hf.space/call/transcribe_stt -H "Content-Type: application/json" -d '{"data": ["{{AUDIO_BASE64}}"]}' |
|
|
``` |
|
|
**Response Path:** `data[0].text` |
|
|
|
|
|
**AI Provider:** |
|
|
``` |
|
|
curl https://archcoder-basic-app.hf.space/call/answer_ai -H "Content-Type: application/json" -d '{"data": ["{{TEXT}}"]}' |
|
|
``` |
|
|
**Response Path:** `data[0]` |
|
|
""") |
|
|
|
|
|
|
|
|
with gr.Row(visible=False): |
|
|
stt_input = gr.Textbox() |
|
|
stt_output = gr.JSON() |
|
|
ai_input = gr.Textbox() |
|
|
ai_output = gr.Textbox() |
|
|
|
|
|
stt_btn = gr.Button("STT", visible=False) |
|
|
stt_btn.click( |
|
|
fn=transcribe_audio_base64, |
|
|
inputs=[stt_input], |
|
|
outputs=[stt_output], |
|
|
api_name="transcribe_stt" |
|
|
) |
|
|
|
|
|
ai_btn = gr.Button("AI", visible=False) |
|
|
ai_btn.click( |
|
|
fn=generate_answer, |
|
|
inputs=[ai_input], |
|
|
outputs=[ai_output], |
|
|
api_name="answer_ai" |
|
|
) |
|
|
|
|
|
gr.Markdown(""" |
|
|
--- |
|
|
**Model:** Phi-2 (2.7B) - Fast CPU inference, excellent reasoning |
|
|
**Output:** 100-150 words based STRICTLY on web search results |
|
|
**Logging:** All Pluely requests logged in console (check Logs tab) |
|
|
|
|
|
🟢 = Under 4s | 🟡 = 4-6s | 🔴 = Over 6s |
|
|
""") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.queue(max_size=5) |
|
|
demo.launch() |
|
|
|