|
|
import gradio as gr |
|
|
from faster_whisper import WhisperModel |
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
|
import torch |
|
|
import requests |
|
|
import base64 |
|
|
import tempfile |
|
|
import os |
|
|
import logging |
|
|
import time |
|
|
import json |
|
|
from datetime import datetime |
|
|
from html.parser import HTMLParser |
|
|
from fastapi import FastAPI, Request, Query |
|
|
from fastapi.responses import JSONResponse |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
import uvicorn |
|
|
|
|
|
# Timestamped logging so request handling and model latency show up clearly
# in the HF Spaces "Logs" tab.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)


logger.info("Loading models...")

# Speech-to-text model, int8-quantized on CPU to fit Space resources.
# NOTE(review): whisper_model is not referenced anywhere else in this file —
# confirm whether a transcription endpoint was removed or is still planned.
whisper_model = WhisperModel("tiny", device="cpu", compute_type="int8")

# Small instruct LLM used by generate_answer(); kept in float32 on CPU.
model_name = "HuggingFaceTB/SmolLM2-360M-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float32,
    device_map="cpu",
    low_cpu_mem_usage=True  # stream weights in to reduce peak RAM during load
)
logger.info("Models loaded!")
|
|
|
|
|
def search_parallel(query):
    """Search DuckDuckGo's HTML endpoint for *query*.

    Returns:
        tuple[str, str]: (results_text, engine_name). On any failure —
        network error, non-200 status, or no parseable results — falls back
        to ("No search results", "None") so callers can always unpack.
    """
    logger.info(f"[SEARCH] Query: {query}")

    class DDGParser(HTMLParser):
        """Collects the link text of <a class="result__a"> result anchors."""

        def __init__(self):
            super().__init__()
            self.results = []
            self.in_result = False
            self.current_text = ""

        def handle_starttag(self, tag, attrs):
            # Attribute values may be None for valueless attributes — guard
            # before the `in` test (the original could raise TypeError here).
            if tag == 'a' and any(k == 'class' and v and 'result__a' in v for k, v in attrs):
                self.in_result = True

        def handle_data(self, data):
            if self.in_result and data.strip():
                self.current_text += data.strip() + " "

        def handle_endtag(self, tag):
            if tag == 'a' and self.in_result:
                if self.current_text:
                    # Cap each snippet at 120 chars to keep the prompt short.
                    self.results.append(self.current_text.strip()[:120])
                self.current_text = ""
                self.in_result = False

    try:
        response = requests.get(
            'https://html.duckduckgo.com/html/',
            params={'q': query},
            headers={'User-Agent': 'Mozilla/5.0'},
            timeout=1.5  # keep total answer latency low; search is best-effort
        )
        if response.status_code == 200:
            parser = DDGParser()
            parser.feed(response.text)
            result = "\n".join(f"• {r}" for r in parser.results[:2]) if parser.results else "No results"
            logger.info(f"[SEARCH] ✓ Found {len(parser.results)} results")
            return result, "DuckDuckGo"
    except Exception as e:
        logger.error(f"[SEARCH] Error: {str(e)}")
    # BUG FIX: the original fell off the end (implicitly returning None) on a
    # non-200 response, which broke tuple unpacking in generate_answer().
    return "No search results", "None"
|
|
|
|
|
def generate_answer(text_input):
    """Answer *text_input* with SmolLM2, grounded on live DuckDuckGo results.

    Returns the answer text with a "**Source:**" footer, or an error string;
    never raises.
    """
    logger.info(f"[AI] Question: {text_input}")

    try:
        # Guard: reject missing/whitespace-only questions up front.
        if not text_input or not text_input.strip():
            return "No input provided"

        current_date = datetime.now().strftime("%B %d, %Y")

        # Best-effort web search for grounding context (timed for the logs).
        search_start = time.time()
        search_results, search_engine = search_parallel(text_input)
        logger.info(f"[AI] Search: {time.time() - search_start:.2f}s")

        # Hand-rolled ChatML prompt (system + user turns).
        system_msg = f"Today is {current_date}. Answer briefly (60-80 words)."
        user_msg = f"Search:\n{search_results}\n\nQ: {text_input}\nA:"
        prompt = (
            f"<|im_start|>system\n{system_msg}<|im_end|>\n"
            f"<|im_start|>user\n{user_msg}<|im_end|>\n"
            f"<|im_start|>assistant\n"
        )

        gen_start = time.time()
        encoded = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=800)

        with torch.no_grad():
            generated = model.generate(
                **encoded,
                max_new_tokens=80,
                temperature=0.7,
                do_sample=True,
                top_p=0.9,
                top_k=40,
                repetition_penalty=1.15,
                pad_token_id=tokenizer.eos_token_id
            )

        # Decode only the newly generated tokens (strip the prompt prefix).
        prompt_len = encoded['input_ids'].shape[1]
        answer = tokenizer.decode(generated[0][prompt_len:], skip_special_tokens=True).strip()
        logger.info(f"[AI] Gen: {time.time() - gen_start:.2f}s")
        logger.info(f"[AI] Answer: {answer[:100]}...")

        return f"{answer}\n\n**Source:** {search_engine}"

    except Exception as e:
        logger.error(f"[AI] Error: {str(e)}")
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
# FastAPI app hosting the JSON API; the Gradio UI is mounted onto it below.
app = FastAPI()

# Wide-open CORS so any origin (e.g. the Pluely desktop client) can call
# the API directly from a browser/webview.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
@app.middleware("http") |
|
|
async def log_requests(request: Request, call_next): |
|
|
"""Log all requests""" |
|
|
logger.info("="*80) |
|
|
logger.info(f"[REQUEST] Method: {request.method}") |
|
|
logger.info(f"[REQUEST] URL: {request.url}") |
|
|
logger.info(f"[REQUEST] Headers: {dict(request.headers)}") |
|
|
logger.info(f"[REQUEST] Query params: {dict(request.query_params)}") |
|
|
|
|
|
|
|
|
if request.method == "POST": |
|
|
body = await request.body() |
|
|
logger.info(f"[REQUEST] Raw body ({len(body)} bytes): {body}") |
|
|
try: |
|
|
body_str = body.decode('utf-8') |
|
|
logger.info(f"[REQUEST] Body as string: {body_str}") |
|
|
body_json = json.loads(body_str) |
|
|
logger.info(f"[REQUEST] Body as JSON: {body_json}") |
|
|
except Exception as e: |
|
|
logger.error(f"[REQUEST] Body parse error: {str(e)}") |
|
|
|
|
|
response = await call_next(request) |
|
|
logger.info(f"[RESPONSE] Status: {response.status_code}") |
|
|
logger.info("="*80) |
|
|
return response |
|
|
|
|
|
@app.post("/api/ai") |
|
|
async def api_ai_post(request: Request): |
|
|
"""AI endpoint - POST""" |
|
|
try: |
|
|
body = await request.body() |
|
|
|
|
|
if not body: |
|
|
return JSONResponse({"error": "Empty body"}, status_code=400) |
|
|
|
|
|
data = json.loads(body.decode('utf-8')) |
|
|
logger.info(f"[API POST] Parsed: {data}") |
|
|
|
|
|
question = data.get("text", "") |
|
|
if not question: |
|
|
return JSONResponse({"error": "No 'text' field"}, status_code=400) |
|
|
|
|
|
answer = generate_answer(question) |
|
|
return JSONResponse({"answer": answer}) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"[API POST] Error: {str(e)}") |
|
|
return JSONResponse({"error": str(e)}, status_code=500) |
|
|
|
|
|
@app.get("/api/ai") |
|
|
async def api_ai_get(text: str = Query(default="", description="Question")): |
|
|
"""AI endpoint - GET""" |
|
|
try: |
|
|
logger.info(f"[API GET] text param: '{text}'") |
|
|
|
|
|
if not text: |
|
|
return JSONResponse({"error": "No text parameter"}, status_code=400) |
|
|
|
|
|
answer = generate_answer(text) |
|
|
return JSONResponse({"answer": answer}) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"[API GET] Error: {str(e)}") |
|
|
return JSONResponse({"error": str(e)}, status_code=500) |
|
|
|
|
|
@app.get("/health") |
|
|
async def health(): |
|
|
return {"status": "ok", "model": "SmolLM2-360M", "endpoints": ["/api/ai (GET/POST)"]} |
|
|
|
|
|
|
|
|
with gr.Blocks(title="Fast Q&A") as demo: |
|
|
gr.Markdown(""" |
|
|
# ⚡ Fast Q&A - SmolLM2-360M |
|
|
|
|
|
## 🎯 Pluely Configuration |
|
|
|
|
|
### Method 1: GET Request (RECOMMENDED - Works with Pluely) |
|
|
|
|
|
**Curl Command for Pluely:** |
|
|
``` |
|
|
curl https://archcoder-basic-app.hf.space/api/ai?text={{TEXT}} |
|
|
``` |
|
|
|
|
|
**Response Path:** `answer` |
|
|
|
|
|
**Streaming:** OFF |
|
|
|
|
|
--- |
|
|
|
|
|
### Method 2: POST Request (Alternative) |
|
|
|
|
|
**Curl Command for Pluely:** |
|
|
``` |
|
|
curl -X POST https://archcoder-basic-app.hf.space/api/ai -H "Content-Type: application/json" -d {\"text\":\"{{TEXT}}\"} |
|
|
``` |
|
|
|
|
|
**Response Path:** `answer` |
|
|
|
|
|
**Streaming:** OFF |
|
|
|
|
|
--- |
|
|
|
|
|
## 🧪 Test Manually |
|
|
|
|
|
**Windows CMD:** |
|
|
``` |
|
|
curl "https://archcoder-basic-app.hf.space/api/ai?text=Who+is+the+president" |
|
|
``` |
|
|
|
|
|
**PowerShell:** |
|
|
``` |
|
|
Invoke-RestMethod -Uri "https://archcoder-basic-app.hf.space/api/ai?text=Who is the president" |
|
|
``` |
|
|
|
|
|
**Browser:** |
|
|
``` |
|
|
https://archcoder-basic-app.hf.space/api/ai?text=Who is the president |
|
|
``` |
|
|
""") |
|
|
|
|
|
with gr.Tab("Test"): |
|
|
test_input = gr.Textbox(label="Question", placeholder="Ask anything...") |
|
|
test_btn = gr.Button("🚀 Test") |
|
|
test_output = gr.Textbox(label="Answer", lines=8) |
|
|
test_btn.click(fn=generate_answer, inputs=[test_input], outputs=[test_output]) |
|
|
|
|
|
with gr.Tab("Logs"): |
|
|
gr.Markdown(""" |
|
|
## How to Check Logs |
|
|
|
|
|
1. Go to your Hugging Face Space |
|
|
2. Click on **"Logs"** tab at the top |
|
|
3. You'll see all requests with: |
|
|
- Request method and URL |
|
|
- Headers |
|
|
- Body content |
|
|
- Response |
|
|
|
|
|
This helps debug what Pluely is actually sending! |
|
|
""") |
|
|
|
|
|
# Mount the Gradio UI at "/" on top of the FastAPI app so the custom
# /api/ai and /health routes coexist with the web interface.
app = gr.mount_gradio_app(app, demo, path="/")

if __name__ == "__main__":
    # HF Spaces expects the server to listen on port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)
|
|
|