|
|
import os |
|
|
import asyncio |
|
|
import time |
|
|
from typing import Optional |
|
|
from datetime import datetime, timezone |
|
|
import httpx |
|
|
import trafilatura |
|
|
import gradio as gr |
|
|
from dateutil import parser as dateparser |
|
|
from fastapi import FastAPI |
|
|
from pydantic import BaseModel |
|
|
import google.generativeai as genai |
|
|
|
|
|
from analytics import record_request, last_n_days_df, last_n_days_avg_time_df |
|
|
|
|
|
|
|
|
# Prompt for "normal" research mode: a short, query-focused summary.
# Placeholders: {query}, {context_text}. (Typos in the original wording —
# "shot form", "releted" — fixed, since they degrade LLM instruction quality.)
PROMPT_NORMAL = """
Based on the user's original query, provide a concise summary in short form of the following text. Focus only on query-related information, mention the source URL, and present the answer in correct timeline order.

USER'S QUERY: "{query}"

TEXT TO SUMMARIZE:
---
{context_text}
---
"""
|
|
# Prompt for "deep" research mode: a detailed, source-attributed report.
# Placeholders: {current_date}, {query}, {context_text}. Instruction 3 in the
# original was garbled/self-contradictory and has been rewritten coherently.
PROMPT_DEEP = """
As a meticulous research analyst, your task is to synthesize the information from the provided web search results into a maximally detailed and comprehensive report.

**Current Date:** {current_date}.

**VERY IMPORTANT:** Your top priority is to provide information relevant to this current date and the future. If the user's query is about a recurring event (like an exam), you MUST focus on the upcoming or current event.

**User's Original Query:** "{query}"

**Instructions:**

1. You are a researcher who does deep research on the query and explains it in detail without leaving out any topic, adding as much detail as possible from the web pages.

2. Do not give your own opinion; report only what the sources say. For every piece of information, state which source it came from.

3. In the result, include only details related to the query; ignore content that is completely unrelated to the query's topic, and write the detailed summary in timeline order.

**Provided Search Results:**
---
{context_text}
---
"""
|
|
|
|
|
|
|
|
async def search_web_logic(query: str, serper_api_key: str, search_type: str, num_results: int) -> str:
    """Search the web via the Serper API and return extracted page content.

    Queries Serper's /search or /news endpoint, fetches each result URL
    concurrently, extracts readable text with trafilatura, and joins the
    pages into one markdown-formatted string. Records timing metrics via
    ``analytics.record_request`` on success.

    Args:
        query: The web search query.
        serper_api_key: Caller-supplied key for google.serper.dev.
        search_type: ``"search"`` (organic) or ``"news"``; any other value
            falls back to ``"search"``.
        num_results: Desired result count, clamped to 1..20.

    Returns:
        A markdown report of the extracted pages, or a human-readable
        error / empty-result message. Never raises: all failures are
        reported as strings so callers can display them directly.
    """
    start_time = time.time()
    if not serper_api_key:
        return "Error: Serper API Key is required."

    # Normalize inputs instead of failing on out-of-range/unknown values.
    num_results = max(1, min(20, num_results))
    if search_type not in ("search", "news"):
        search_type = "search"

    try:
        endpoint = (
            "https://google.serper.dev/news"
            if search_type == "news"
            else "https://google.serper.dev/search"
        )
        payload = {"q": query, "num": num_results}
        headers = {"X-API-KEY": serper_api_key, "Content-Type": "application/json"}

        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(endpoint, headers=headers, json=payload)
        if resp.status_code != 200:
            return f"Error: Search API returned status {resp.status_code}."

        # News results arrive under "news", organic results under "organic".
        results = resp.json().get("news" if search_type == "news" else "organic", [])
        if not results:
            return f"No {search_type} results found for '{query}'."

        # Fetch all result pages concurrently; individual failures come back
        # as exception objects in the gathered list and are skipped below.
        urls = [r["link"] for r in results]
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            tasks = [client.get(u) for u in urls]
            responses = await asyncio.gather(*tasks, return_exceptions=True)

        chunks, successful_extractions = [], 0
        for meta, response in zip(results, responses):
            # Skip failed fetches and pages trafilatura could not extract.
            if isinstance(response, Exception) or not (body := trafilatura.extract(response.text)):
                continue
            successful_extractions += 1
            if search_type == "news":
                # Serper news dates are free-form; parse leniently, fall back to "Unknown".
                try:
                    date_iso = dateparser.parse(meta.get("date", ""), fuzzy=True).strftime("%Y-%m-%d")
                except Exception:
                    date_iso = "Unknown"
                chunk = f"## {meta['title']}\n**Source:** {meta.get('source', 'Unknown')} | **Date:** {date_iso}\n**URL:** {meta['link']}\n\n{body.strip()}\n"
            else:
                domain = meta["link"].split("/")[2].replace("www.", "")
                chunk = f"## {meta['title']}\n**Domain:** {domain}\n**URL:** {meta['link']}\n\n{body.strip()}\n"
            chunks.append(chunk)

        if not chunks:
            # BUG FIX: the original lacked the f-prefix, so the literal text
            # "{query}" was returned instead of the actual query.
            return f"Found results for '{query}', but couldn't extract content."

        summary = f"Successfully extracted content from {successful_extractions}/{len(results)} results.\n\n---\n\n"
        await record_request(time.time() - start_time, num_results)
        return summary + "\n---\n".join(chunks)
    except Exception as e:
        # Best-effort tool boundary: surface errors as text rather than raising.
        return f"An error occurred during web search: {str(e)}"
|
|
|
|
|
|
|
|
async def summarize_with_gemini(text_to_summarize: str, query: str, gemini_key: str, model_name: str, research_mode: str) -> str:
    """Summarize scraped text with a Gemini model.

    Selects the deep-research or normal prompt template based on
    ``research_mode``, fills it with the query, the scraped text, and
    today's UTC date, then returns the model's text response. On any
    failure, a warning string is returned so the caller can fall back
    to the raw scraped text.
    """
    try:
        genai.configure(api_key=gemini_key)
        llm = genai.GenerativeModel(model_name)
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        # Deep mode produces a detailed report; anything else gets the short-summary prompt.
        template = PROMPT_DEEP if research_mode == 'deep' else PROMPT_NORMAL
        filled_prompt = template.format(query=query, context_text=text_to_summarize, current_date=today)

        result = await llm.generate_content_async(filled_prompt)
        return result.text
    except Exception as e:
        return f"\n\n--- ⚠️ Gemini Summarization Failed ---\nError: {str(e)}\nReturning raw text instead."
|
|
|
|
|
|
|
|
async def search_and_summarize(query, serper_api_key, search_type, num_results, gemini_api_key, gemini_model, research_mode):
    """Run the full pipeline: web search + scrape, then optional Gemini summary.

    If no Gemini key is given, or the search step reported an error, the
    raw scraped text is returned unchanged. If summarization itself fails,
    the raw text is returned with the failure notice appended.
    """
    raw_text = await search_web_logic(query, serper_api_key, search_type, num_results)

    # Guard clauses: skip summarization entirely when it can't or shouldn't run.
    if not gemini_api_key or "Error:" in raw_text:
        return raw_text

    summary = await summarize_with_gemini(raw_text, query, gemini_api_key, gemini_model, research_mode)
    if "⚠️ Gemini Summarization Failed" in summary:
        # Fall back to the scraped text, keeping the warning visible.
        return raw_text + summary
    return summary
|
|
|
|
|
|
|
|
# FastAPI application; the Gradio UI is mounted onto it at the bottom of this file.
app = FastAPI()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SearchRequest(BaseModel):
    # Request body for POST /api/search; mirrors the Gradio UI inputs.
    query: str  # the web search query
    serper_api_key: str  # caller-supplied Serper API key (required)
    search_type: str = "search"  # "search" (organic) or "news"
    num_results: int = 4  # clamped to 1..20 by search_web_logic
    gemini_api_key: Optional[str] = None  # omit/None to skip AI summarization
    # NOTE(review): this default differs from the Gradio UI's
    # "gemini-1.5-flash-latest" — confirm which model is intended.
    gemini_model: Optional[str] = "gemini-2.5-flash-lite"
    research_mode: str = "normal"  # "normal" (short summary) or "deep" (detailed report)
|
|
|
|
|
@app.post("/api/search")
async def api_search(request: SearchRequest):
    """JSON API endpoint: run the search-and-summarize pipeline for one request."""
    answer = await search_and_summarize(
        request.query,
        request.serper_api_key,
        request.search_type,
        request.num_results,
        request.gemini_api_key,
        request.gemini_model,
        request.research_mode,
    )
    return {"result": answer}
|
|
|
|
|
|
|
|
def create_gradio_app():
    """Build the Gradio UI: an App tab wired to search_and_summarize, plus an Analytics tab."""
    with gr.Blocks(title="Web Search & Summarize UI") as demo:
        gr.Markdown("# 🔍 AI Search & Summarize")
        with gr.Tabs():
            with gr.Tab("App"):
                # Search inputs (passed through to search_web_logic).
                gr.Markdown("### Step 1: Web Search")
                query_input = gr.Textbox(label="Search Query")
                serper_api_key_input = gr.Textbox(label="Your Serper API Key", type="password")
                with gr.Row():
                    search_type_input = gr.Radio(["search", "news"], value="search", label="Search Type")
                    num_results_input = gr.Slider(1, 20, value=4, step=1, label="Number of Results")

                # Summarization inputs (passed through to summarize_with_gemini).
                gr.Markdown("### Step 2: AI Summarization")
                research_mode_input = gr.Radio(["normal", "deep"], value="normal", label="Research Mode", info="Normal for fast summary, Deep for detailed report.")
                gemini_api_key_input = gr.Textbox(label="Your Gemini API Key", type="password", placeholder="Leave empty to skip summarization")
                # NOTE(review): this default ("gemini-1.5-flash-latest") differs from
                # SearchRequest's "gemini-2.5-flash-lite" — confirm which is intended.
                gemini_model_input = gr.Textbox(label="Gemini Model", value="gemini-1.5-flash-latest")
                search_button = gr.Button("Search & Summarize", variant="primary")
                output = gr.Textbox(label="Result", lines=25, max_lines=40)

                # Wire the button to the async pipeline; input order must match
                # search_and_summarize's parameter order.
                search_button.click(
                    fn=search_and_summarize,
                    inputs=[query_input, serper_api_key_input, search_type_input, num_results_input, gemini_api_key_input, gemini_model_input, research_mode_input],
                    outputs=output
                )
            with gr.Tab("Analytics"):
                # Bar charts fed from the analytics module's daily aggregates.
                requests_plot = gr.BarPlot(x="date", y="count", title="Daily Requests")
                avg_time_plot = gr.BarPlot(x="date", y="avg_time", title="Avg. Response Time (s)")
                # Refresh both plots with the last 14 days of data on page load.
                def update_analytics(): return last_n_days_df(14), last_n_days_avg_time_df(14)
                demo.load(update_analytics, [], [requests_plot, avg_time_plot])
    return demo
|
|
|
|
|
|
|
|
# Mount the Gradio UI at the root path so "/" serves the web interface
# while "/api/search" remains a plain JSON endpoint on the same app.
gradio_ui = create_gradio_app()
app = gr.mount_gradio_app(app, gradio_ui, path="/")

if __name__ == "__main__":
    import uvicorn
    # 7860 is Gradio's default port.
    uvicorn.run(app, host="0.0.0.0", port=7860)