| import os | |
| import sqlite3 | |
| import requests | |
| import openai | |
| import gradio as gr | |
| import asyncio | |
| from gtts import gTTS | |
| from typing_extensions import TypedDict | |
| from langgraph.graph import StateGraph, START, END | |
| import csv | |
| openai.api_key = os.getenv("OPENAI_API_KEY") | |
| def init_db_from_csv(csv_path: str = "transactions.csv") -> None: | |
| conn = sqlite3.connect("shop.db") | |
| cur = conn.cursor() | |
| cur.execute( | |
| "CREATE TABLE IF NOT EXISTS transactions (date TEXT, product TEXT, amount REAL)" | |
| ) | |
| with open(csv_path, newline='') as f: | |
| reader = csv.DictReader(f) | |
| rows = [(row["date"], row["product"], float(row["amount"])) for row in reader] | |
| cur.execute("DELETE FROM transactions") | |
| cur.executemany( | |
| "INSERT INTO transactions (date, product, amount) VALUES (?, ?, ?)", rows | |
| ) | |
| conn.commit() | |
| conn.close() | |
| init_db_from_csv() | |
| def db_agent(query: str) -> str: | |
| try: | |
| conn = sqlite3.connect("shop.db") | |
| cur = conn.cursor() | |
| cur.execute( | |
| """ | |
| SELECT product, SUM(amount) AS revenue | |
| FROM transactions | |
| WHERE date = date('now') | |
| GROUP BY product | |
| ORDER BY revenue DESC | |
| LIMIT 1 | |
| """ | |
| ) | |
| row = cur.fetchone() | |
| if row: | |
| return f"Top product today: {row[0]} with ₹{row[1]:,.2f}" | |
| return "No transactions found for today." | |
| except sqlite3.OperationalError as e: | |
| return f"Database error: {e}. Please check 'transactions' table in shop.db." | |
| def web_search_agent(query: str) -> str: | |
| try: | |
| resp = requests.get( | |
| "https://serpapi.com/search", | |
| params={"q": query, "api_key": os.getenv("SERPAPI_KEY")} | |
| ) | |
| snippet = resp.json().get("organic_results", [{}])[0].get("snippet", "").strip() | |
| if snippet: | |
| return llm_agent(f"Summarize: {snippet}") | |
| except Exception: | |
| pass | |
| return llm_agent(query) | |
| def llm_agent(query: str) -> str: | |
| response = openai.chat.completions.create( | |
| model="gpt-4o-mini", | |
| messages=[ | |
| {"role": "system", "content": "You are a helpful assistant."}, | |
| {"role": "user", "content": query}, | |
| ], | |
| temperature=0.2, | |
| ) | |
| return response.choices[0].message.content.strip() | |
| def stt_agent(audio_path: str) -> str: | |
| with open(audio_path, "rb") as afile: | |
| transcript = openai.audio.transcriptions.create( | |
| model="whisper-1", | |
| file=afile | |
| ) | |
| return transcript.text.strip() | |
| def tts_agent(text: str, lang: str = 'en') -> str: | |
| tts = gTTS(text=text, lang=lang) | |
| out_path = "response_audio.mp3" | |
| tts.save(out_path) | |
| return out_path | |
| class State(TypedDict): | |
| query: str | |
| result: str | |
| def route_fn(state: State) -> str: | |
| q = state["query"].lower() | |
| if any(k in q for k in ["max revenue", "revenue"]): | |
| return "db" | |
| if any(k in q for k in ["who", "what", "when", "where"]): | |
| return "web" | |
| return "llm" | |
| def router_node(state: State) -> dict: | |
| return {"query": state["query"]} | |
| def db_node(state: State) -> dict: | |
| return {"result": db_agent(state["query"]) } | |
| def web_node(state: State) -> dict: | |
| return {"result": web_search_agent(state["query"]) } | |
| def llm_node(state: State) -> dict: | |
| return {"result": llm_agent(state["query"]) } | |
| builder = StateGraph(State) | |
| builder.add_node("router", router_node) | |
| builder.set_entry_point("router") | |
| builder.set_conditional_entry_point( | |
| route_fn, | |
| path_map={"db": "db", "web": "web", "llm": "llm"} | |
| ) | |
| builder.add_node("db", db_node) | |
| builder.add_node("web", web_node) | |
| builder.add_node("llm", llm_node) | |
| builder.add_edge(START, "router") | |
| builder.add_edge("db", END) | |
| builder.add_edge("web", END) | |
| builder.add_edge("llm", END) | |
| graph = builder.compile() | |
| def handle_query(audio_or_text: str): | |
| is_audio = audio_or_text.endswith('.wav') or audio_or_text.endswith('.mp3') | |
| if is_audio: | |
| query = stt_agent(audio_or_text) | |
| else: | |
| query = audio_or_text | |
| state = graph.invoke({"query": query}) | |
| response = state["result"] | |
| if is_audio: | |
| audio_path = tts_agent(response) | |
| return response, audio_path | |
| return response | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## Shop Voice-Box Assistant (Speech In/Out)") | |
| inp = gr.Audio(sources=["microphone"], type="filepath", label="Speak or type your question or upload transactions.csv separately in root") | |
| out_text = gr.Textbox(label="Answer (text)") | |
| out_audio = gr.Audio(label="Answer (speech)") | |
| submit = gr.Button("Submit") | |
| gr.Examples( | |
| examples=[ | |
| ["What is the max revenue product today?"], | |
| ["Who invented the light bulb?"], | |
| ["Tell me a joke about cats."], | |
| ], | |
| inputs=inp, | |
| outputs=[out_text, out_audio], | |
| ) | |
| submit.click(fn=handle_query, inputs=inp, outputs=[out_text, out_audio]) | |
| if __name__ == "__main__": | |
| demo.launch(share=False, server_name="0.0.0.0", server_port=7860) | |