import os
import inspect
import json
import logging
import re
import warnings

warnings.filterwarnings("ignore")

import gradio as gr
import pandas as pd
import requests
import torch
from typing import TypedDict, Annotated, Dict, Any
from json_repair import repair_json
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import AnyMessage, HumanMessage, AIMessage
from langchain_community.retrievers import BM25Retriever
from langchain_core.tools import Tool
from langchain_core.documents import Document
from langgraph.prebuilt import ToolNode, tools_condition
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
# from langchain.agents import create_tool_calling_agent
from langdetect import detect
audio_model_id = "Sandiago21/whisper-large-v2-greek" # update with your model id
audio_pipe = pipeline("automatic-speech-recognition", model=audio_model_id)
# title = "Automatic Speech Recognition (ASR)"
# description = """
# Demo for automatic speech recognition in Greek. Demo uses [Sandiago21/whisper-large-v2-greek](https://huggingface.co/Sandiago21/whisper-large-v2-greek) checkpoint, which is based on OpenAI's
# [Whisper](https://huggingface.co/openai/whisper-large-v2) model and is fine-tuned in Greek Audio dataset
# ")
# """
# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
sentence_transformer_model = SentenceTransformer("all-mpnet-base-v2")
logger = logging.getLogger("agent")
logging.basicConfig(level=logging.INFO)
class Config(object):
def __init__(self):
self.random_state = 42
self.max_len = 256
self.reasoning_max_len = 256
self.temperature = 0.1
self.repetition_penalty = 1.2
self.DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
self.model_name = "Qwen/Qwen2.5-7B-Instruct"
# self.model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B"
# self.reasoning_model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B"
# self.reasoning_model_name = "Qwen/Qwen2.5-7B-Instruct"
# self.reasoning_model_name = "mistralai/Mistral-7B-Instruct-v0.2"
config = Config()
tokenizer = AutoTokenizer.from_pretrained(config.model_name)
model = AutoModelForCausalLM.from_pretrained(
config.model_name,
torch_dtype=torch.float16,
device_map="auto"
)
# reasoning_tokenizer = AutoTokenizer.from_pretrained(config.reasoning_model_name)
# reasoning_model = AutoModelForCausalLM.from_pretrained(
# config.reasoning_model_name,
# torch_dtype=torch.float16,
# device_map="auto"
# )
def generate(prompt):
"""
Generate a text completion from a causal language model given a prompt.
Parameters
----------
prompt : str
Input text prompt used to condition the language model.
Returns
-------
str
The generated continuation text, decoded into a string with special
tokens removed and leading/trailing whitespace stripped.
"""
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=config.max_len,
            do_sample=True,  # temperature only takes effect when sampling is enabled
            temperature=config.temperature,
            repetition_penalty=config.repetition_penalty,
        )
generated = outputs[0][inputs["input_ids"].shape[-1]:]
return tokenizer.decode(generated, skip_special_tokens=True).strip()
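# A minimal smoke-test sketch for `generate` (defined but not called at import
# time); the prompt below is illustrative only, and the output will vary with
# the checkpoint, temperature and repetition penalty configured above.
def _demo_generate():
    return generate("Answer in one word. What is the capital of France?")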
def reasoning_generate(prompt):
    """
    Generate a text completion for reasoning-heavy prompts.

    Identical to `generate` except that it uses `config.reasoning_max_len`
    as the generation budget (kept separate so the two limits can be tuned
    independently).

    Parameters
    ----------
    prompt : str
        Input text prompt used to condition the language model.

    Returns
    -------
    str
        The generated continuation text, decoded into a string with special
        tokens removed and leading/trailing whitespace stripped.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=config.reasoning_max_len,
            do_sample=True,  # temperature only takes effect when sampling is enabled
            temperature=config.temperature,
            repetition_penalty=config.repetition_penalty,
        )
    generated = outputs[0][inputs["input_ids"].shape[-1]:]
    return tokenizer.decode(generated, skip_special_tokens=True).strip()
class Action(BaseModel):
tool: str = Field(...)
args: Dict
# Define the agent state shared across all graph nodes
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    proposed_action: str
    information: str
    output: str
    confidence: float
    judge_explanation: str
    best_query_webpage_information_similarity_score: float
# Registry of tools the planner may propose (tool name -> allowed argument names)
ALLOWED_TOOLS = {
    "web_search": ["query"],
    "visit_webpage": ["url"],
}
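# A small sketch of how the `Action` schema validates a planner proposal;
# the payload below is a hypothetical example, not real planner output.
def _demo_action_validation():
    payload = {"tool": "web_search", "args": {"query": "2025 World Snooker Championship winner"}}
    action = Action.model_validate(payload)
    assert action.tool in ALLOWED_TOOLS
    return action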
def visit_webpage(url: str):
    """
    Fetch and read the content of a webpage.

    Args:
        url: URL of the webpage

    Returns:
        A list of extracted, readable text chunks (truncated): the main
        paragraph/definition text, plus flattened wikitable rows when a
        MediaWiki content container is present.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
    }
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    # Remove scripts/styles
    for tag in soup(["script", "style"]):
        tag.extract()
    content = soup.find("div", {"id": "mw-content-text"})
    # Extract a broader set of elements (not just <p> tags)
    elements = soup.find_all(["p", "dd"])
    main_text = " \n ".join(el.get_text(strip=False) for el in elements)
    # Tables carry a lot of factual answers, so flatten wikitable rows too
    table_texts = []
    if content is not None:
        for table in content.find_all("table", {"class": "wikitable"}):
            for row in table.find_all("tr"):
                cols = [c.get_text(strip=True) for c in row.find_all(["td", "th"])]
                if cols:
                    table_texts.append(" | ".join(cols))
    if len(table_texts) > 0:
        return [main_text[:1000], " \n ".join(table_texts)]
    else:
        return [main_text[:1000]]
def visit_webpage_wiki(url: str):
    """Like `visit_webpage`, but keeps raw cell whitespace in wikitable rows
    and allows a larger table budget (5000 chars)."""
    headers = {
        "User-Agent": "Mozilla/5.0"
    }
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    # Remove scripts/styles
    for tag in soup(["script", "style"]):
        tag.extract()
    content = soup.find("div", {"id": "mw-content-text"})
    # Extract a broader set of elements (not just <p> tags)
    elements = soup.find_all(["p", "dd"])
    main_text = " \n ".join(el.get_text(strip=False) for el in elements)
    # Tables carry a lot of factual answers, so flatten wikitable rows too
    table_texts = []
    if content is not None:
        for table in content.find_all("table", {"class": "wikitable"}):
            for row in table.find_all("tr"):
                cols = [c.get_text(strip=False) for c in row.find_all(["td", "th"])]
                if cols:
                    table_texts.append(" | ".join(cols))
    if len(table_texts) > 0:
        return [main_text[:1000], " \n ".join(table_texts)[:5000]]
    else:
        return [main_text[:1000]]
def visit_webpage_main(url: str):
    """Generic page reader: extracts text from the <body> of any page (not
    just MediaWiki) plus all tables, returning truncated text chunks."""
    headers = {"User-Agent": "Mozilla/5.0"}
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    # Remove scripts/styles
    for tag in soup(["script", "style"]):
        tag.extract()
    # 🔥 Focus on the body, falling back to the whole document if absent
    content = soup.find("body") or soup
    # ✅ Extract a broader set of elements
    elements = content.find_all(["p", "dd", "td", "div"])
    texts = []
    for el in elements:
        text = el.get_text(strip=True)
        if text and len(text) > 30:  # filter noise
            texts.append(text)
    main_text = "\n".join(texts)
    # ✅ Extract all tables (not just wikitable)
    table_texts = []
    for table in soup.find_all("table"):
        for row in table.find_all("tr"):
            cols = [c.get_text(strip=True) for c in row.find_all(["td", "th"])]
            if cols:
                table_texts.append(" | ".join(cols))
    if table_texts:
        return [main_text[:1500], "\n".join(table_texts)[:5000]]
    else:
        return [main_text[:1500]]
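# Hedged usage sketch: all three page readers return a list of text chunks,
# so callers join them before scoring. The URL is an example only and the
# call requires network access.
def _demo_visit(url: str = "https://en.wikipedia.org/wiki/Snooker"):
    return " \n ".join(visit_webpage_main(url))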
def web_search(query: str, num_results: int = 10):
    """
    Search the web for the provided query via DuckDuckGo's HTML endpoint.

    Args:
        query: Search query
        num_results: Maximum number of result URLs to return

    Returns:
        List of result URLs
    """
    url = "https://html.duckduckgo.com/html/"
    headers = {
        "User-Agent": "Mozilla/5.0"
    }
    response = requests.post(url, data={"q": query}, headers=headers, timeout=10)
    soup = BeautifulSoup(response.text, "html.parser")
    return [a.get("href") for a in soup.select(".result__a")[:num_results]]
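# Sketch of the search → read pipeline, assuming network access. Note that
# DuckDuckGo's HTML endpoint can return redirect-style hrefs, so some results
# may need URL-decoding before they resolve; this sketch skips that step.
def _demo_search_and_read(query: str = "2025 World Snooker Championship winner"):
    urls = web_search(query, num_results=3)
    return [visit_webpage_main(u) for u in urls if u]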
def planner_node(state: AgentState):
    """
    Planning node for a tool-using LLM agent.

    The planner enforces:
    - Strict JSON-only output
    - Tool selection constrained to predefined tools
    - Argument generation limited to user-provided information

    Parameters
    ----------
    state : dict
        Agent state dictionary containing:
        - "messages": The conversation so far; the last message holds the
          user's natural-language request.

    Returns
    -------
    dict
        Updated state dictionary with:
        - "proposed_action" (dict): Parsed JSON tool call in the form:
            {
                "tool": "<tool_name>",
                "args": {...}
            }

    Behavior
    --------
    1. Constructs a planning prompt including:
       - Available tools and allowed arguments
       - Strict JSON formatting requirements
       - Example of valid output
    2. Calls the language model via `generate()`.
    3. Extracts the JSON after the final "Response:" marker.
    4. Repairs malformed JSON using `repair_json`.
    5. Stores the parsed action in the agent state.

    Security Notes
    --------------
    - This node does not enforce tool-level authorization.
    - It does not validate hallucinated tools.
    - It does not perform risk scoring.
    - Downstream nodes must implement:
        * Tool whitelist validation
        * Argument validation
        * Risk scoring and mitigation
        * Execution authorization

    Intended Usage
    --------------
    Designed for multi-agent or LangGraph-style workflows where:
        Planner → Tool Executor → Response/Safety → (optional) Judge
    This node represents the *planning layer* of the agent architecture.
    """
user_input = state["messages"][-1].content
prompt = f"""
You are a planning agent.
You MUST return ONLY valid JSON as per the tools specs below ONLY.
No extra text.
    DO NOT invent anything beyond the user request provided. Keep strictly to the information in the user request: the query must be fully relevant to it, with no deviation or hallucination. When possible, the query should be exactly the user request.
The available tools and their respective arguments are: {{
"web_search": ["query"],
"visit_webpage": ["url"],
}}
Return exactly the following format:
Response:
{{
"tool": "...",
"args": {{...}}
}}
    Example user request: Who nominated the only Featured Article on English Wikipedia about a dinosaur that was promoted in November 2016? Example of the valid JSON expected:
Response:
{{"tool": "web_search",
"args": {{"query": "Who nominated the only Featured Article on English Wikipedia about a dinosaur that was promoted in November 2016?",
}}
}}
Return only one Response!
User request:
{user_input}
"""
output = generate(prompt)
state["proposed_action"] = output.split("Response:")[-1]
fixed = repair_json(state["proposed_action"])
data = json.loads(fixed)
state["proposed_action"] = data
return state
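# Illustration of the JSON-repair step used above: `repair_json` can close
# the truncated braces/quotes that small models often emit. The malformed
# string below is a made-up example.
def _demo_repair_json():
    broken = '{"tool": "web_search", "args": {"query": "capital of France"'
    return json.loads(repair_json(broken))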
def safety_node(state: AgentState):
"""
Evaluate the information provided and output the response for the user request.
"""
user_input = state["messages"][-1].content
information = state["information"]
prompt = f"""
You are a response agent.
    You must reason over the user request and the provided information and output the answer to the user's request. Reason carefully over the information provided, if any, and output the answer that satisfies the user's question exactly.
    You MUST ONLY return EXACTLY the answer to the user's question in the following format:
    Response:
    DO NOT add anything additional and return ONLY what is asked, in the format asked.
    ONLY return a response if you are confident about the answer; otherwise return an empty string.
    If you output anything else, it is incorrect.
    If no information is provided, or the information is not relevant, then answer as best you can from your own knowledge.
    Example of a valid response for the user request "Who was the winner of the 2025 World Snooker Championship?":
Response: Zhao Xintong.
Return exactly the above requested format and nothing more!
DO NOT generate any additional text after it!
User request:
{user_input}
Information:
{information}
"""
    raw_output = reasoning_generate(prompt)
    logger.info(f"Raw Output: {raw_output}")

    def _clean(text: str) -> str:
        # Strip any leading "Response:" marker, surrounding quotes and a trailing period
        if "Response:" in text:
            text = text.split("Response:")[-1]
        elif "Response" in text:
            text = text.split("Response")[-1]
        text = text.strip('"').strip()
        if text.endswith("."):
            text = text[:-1]
        return text

    raw = raw_output.strip()
    matches = re.findall(r"Response:\s*([^\n]+)", raw)
    if matches:
        output = matches[-1].strip()  # take the LAST "Response:" occurrence
    else:
        match = re.search(r"Response:\s*([^\n\.]+)", raw)
        if match:
            output = match.group(1).strip()
        else:
            # Fallback: take the first line of the raw model output
            output = raw.split("\n")[0].strip()
    output = _clean(output)
    if output == "":
        # Last resort: re-extract from the raw output and keep the first sentence
        match = re.search(r"Response:\s*([^\n\.]+)", raw)
        if match:
            output = match.group(1).strip()
        else:
            output = raw.split("\n")[0].strip()
        output = _clean(output)
        output = output.split(".")[0]
    state["output"] = output
    logger.info(f"State (Safety Agent): {state}")
    return state
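# Sketch of the "Response:" extraction used in `safety_node` on a fabricated
# raw output; the last occurrence wins, mirroring the logic above.
def _demo_extract_response():
    raw = "Some reasoning...\nResponse: draft answer\nResponse: Zhao Xintong."
    matches = re.findall(r"Response:\s*([^\n]+)", raw)
    return matches[-1].strip().rstrip(".") if matches else ""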
def Judge(state: AgentState):
"""
Evaluate whether the answer provided is indeed based on the information provided or not.
"""
answer = state["output"]
information = state["information"]
user_input = state["messages"][-1].content
prompt = f"""
You are a Judging agent.
You must reason over the user request and judge with a confidence score whether the answer is indeed based on the provided information or not.
Example: User request: Who was the winner of 2025 World Snooker Championship?
Information: Zhao Xintong won the 2025 World Snooker Championship with a dominant 18-12 final victory over Mark Williams in Sheffield on Monday. The 28 year-old becomes the first player from China to win snooker’s premier prize at the Crucible Theatre.
Zhao, who collects a top prize worth £500,000, additionally becomes the first player under amateur status to go all the way to victory in a World Snooker Championship.
The former UK champion entered the competition in the very first qualifying round at the English Institute of Sport last month.
He compiled a dozen century breaks as he fought his way through four preliminary rounds in fantastic fashion to qualify for the Crucible for the third time in his career.
In the final round of the qualifiers known as Judgement Day, Zhao edged Elliot Slessor 10-8 in a high-quality affair during which both players made a hat-trick of tons.
Ironically, that probably represented his sternest test throughout the entire event.
Answer: "Zhao Xintong"
    Response: {{
    "confidence": 1.0,
    "explanation": "Based on the information provided, it is indeed mentioned that Zhao Xintong, the answer provided, won the 2025 World Snooker Championship."
    }}
Example: User request: Who was the winner of 2025 World Snooker Championship?
Information: Zhao Xintong won the 2025 World Snooker Championship with a dominant 18-12 final victory over Mark Williams in Sheffield on Monday. The 28 year-old becomes the first player from China to win snooker’s premier prize at the Crucible Theatre.
Zhao, who collects a top prize worth £500,000, additionally becomes the first player under amateur status to go all the way to victory in a World Snooker Championship.
The former UK champion entered the competition in the very first qualifying round at the English Institute of Sport last month.
He compiled a dozen century breaks as he fought his way through four preliminary rounds in fantastic fashion to qualify for the Crucible for the third time in his career.
In the final round of the qualifiers known as Judgement Day, Zhao edged Elliot Slessor 10-8 in a high-quality affair during which both players made a hat-trick of tons.
Ironically, that probably represented his sternest test throughout the entire event.
Answer: "Ronnie O'sullivan"
Response: {{
"confidence": 0.0,
"explanation": Based on the information provided, it is was Zhao Xingong and not Ronnie O'sullivan who won the 2025 World Snooker Championship.
}}
Example: User request: Who was the winner of 2025 World Snooker Championship?
Information:
Answer: "Ronnie O'sullivan"
Response: {{
"confidence": 0.0,
"explanation": There is no information provided, so cannot answer who won the 2025 World Snooker Championship.
}}
Return exactly the above requested format and nothing more!
DO NOT generate any additional text after it!
Return only what is asked and in the format asked!
User request:
{user_input}
Information:
{information}
Answer:
{answer}
"""
raw_output = generate(prompt)
print(f"Judge raw output: {raw_output}")
output = raw_output.split("Response:")[-1].strip()
fixed = repair_json(output)
data = json.loads(fixed)
state["confidence"] = data["confidence"]
state["judge_explanation"] = data["explanation"]
# logger.info(f"State (Judge Agent): {state}")
return state
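# Sketch of parsing a Judge reply; the raw string is fabricated to mirror
# the format requested in the prompt above.
def _demo_judge_parse():
    raw = 'Response: {"confidence": 1.0, "explanation": "Grounded in the passage."}'
    data = json.loads(repair_json(raw.split("Response:")[-1]))
    return data["confidence"], data["explanation"]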
def route(state: AgentState):
    """Determine the next step from the Judge's confidence score.

    Note: currently unused; the conditional edges wiring this router into
    the graph are commented out below. `AgentState` carries `confidence`
    rather than a separate risk score, so routing is based on that.
    """
    if state["confidence"] < 0.5:
        return "block"
    else:
        return "allow"
def tool_executor(state: AgentState):
    """
    Tool execution node for the agent.

    This node executes the tool call proposed by the planner, validates it
    against the `Action` schema, and updates the agent state with the
    gathered information.

    Parameters
    ----------
    state : dict
        Agent state dictionary containing:
        - "messages": Conversation messages; the last one is the user request.
        - "proposed_action" (dict): Planner-proposed tool call in structured form.

    Returns
    -------
    dict
        Updated state dictionary including:
        - "information" (str): Text gathered from the web (truncated), or "".
        - "best_query_webpage_information_similarity_score" (float): Highest
          cosine similarity between the query and any fetched page.

    Execution Flow
    --------------
    1. Validate the proposed action using the `Action` schema.
    2. Dispatch execution to the appropriate tool implementation:
        * "web_search": search, then read each result with both
          `visit_webpage_wiki` and `visit_webpage_main`, keeping text whose
          embedding similarity to the query exceeds 0.65.
        * "visit_webpage": read a single page directly.
    3. If the planner's query drifts too far from the user request
       (similarity <= 0.80) and the request is in English, fall back to
       searching with the raw user request.
    4. If the tool is unrecognized, no information is gathered.

    Security Considerations
    -----------------------
    - No runtime sandboxing is implemented.
    - No per-tool authorization layer (RBAC) is enforced.
    - Sensitive tools would require elevated approval thresholds,
      human-in-the-loop confirmation, and additional auditing.

    Architectural Role
    ------------------
        Planner → Tool Execution → Response/Safety
    This node represents the controlled execution layer of the agent,
    responsible for translating structured LLM intent into real actions.
    """
try:
user_input = state["messages"][-1].content
user_input_language = detect(user_input)
webpage_result = ""
action = Action.model_validate(state["proposed_action"])
best_query_webpage_information_similarity_score = -1.0
best_webpage_information = ""
webpage_information_complete = ""
if action.tool == "web_search":
logger.info(f"action.tool: {action.tool}")
query_embeddings = sentence_transformer_model.encode_query(state["messages"][-1].content).reshape(1, -1)
query_arg_embeddings = sentence_transformer_model.encode_query(state["proposed_action"]["args"]["query"]).reshape(1, -1)
score = float(cosine_similarity(query_embeddings, query_arg_embeddings)[0][0])
if score > 0.80 or user_input_language != "en":
results = web_search(**action.args)
else:
logger.info(f"Overwriting user query because the Agent suggested query had score: {state["proposed_action"]["args"]["query"]} - {score}")
results = web_search(**{"query": state["messages"][-1].content})
logger.info(f"Webpages - Results: {results}")
            for result in results:
                try:
                    # Read each result with both extractors (wiki-specific, then
                    # generic) and keep text sufficiently similar to the query
                    for reader in (visit_webpage_wiki, visit_webpage_main):
                        webpage_results = reader(result)
                        webpage_result = " \n ".join(webpage_results)
                        query_embeddings = sentence_transformer_model.encode_query(state["proposed_action"]["args"]["query"]).reshape(1, -1)
                        webpage_information_embeddings = sentence_transformer_model.encode_query(webpage_result).reshape(1, -1)
                        query_webpage_information_similarity_score = float(cosine_similarity(query_embeddings, webpage_information_embeddings)[0][0])
                        if query_webpage_information_similarity_score > 0.65:
                            webpage_information_complete += webpage_result
                            webpage_information_complete += " \n "
                            webpage_information_complete += " \n "
                        if query_webpage_information_similarity_score > best_query_webpage_information_similarity_score:
                            best_query_webpage_information_similarity_score = query_webpage_information_similarity_score
                            best_webpage_information = webpage_result
                except Exception as e:
                    logger.info(f"Tool Executor - Exception: {e}")
elif action.tool == "visit_webpage":
try:
webpage_results = visit_webpage(**action.args)
webpage_result = " \n ".join(webpage_results)
# for webpage_result in webpage_results:
query_embeddings = sentence_transformer_model.encode_query(state["proposed_action"]["args"]["query"]).reshape(1, -1)
webpage_information_embeddings = sentence_transformer_model.encode_query(webpage_result).reshape(1, -1)
query_webpage_information_similarity_score = float(cosine_similarity(query_embeddings, webpage_information_embeddings)[0][0])
# logger.info(f"Webpage Information and Similarity Score: {result} - {webpage_result} - {query_webpage_information_similarity_score}")
if query_webpage_information_similarity_score > 0.65:
webpage_information_complete += webpage_result
webpage_information_complete += " \n "
webpage_information_complete += " \n "
if query_webpage_information_similarity_score > best_query_webpage_information_similarity_score:
best_query_webpage_information_similarity_score = query_webpage_information_similarity_score
best_webpage_information = webpage_result
except:
pass
else:
result = "Unknown tool"
if webpage_information_complete == "" and best_query_webpage_information_similarity_score > 0.30:
webpage_information_complete = best_webpage_information
state["information"] = webpage_information_complete[:3000]
state["best_query_webpage_information_similarity_score"] = best_query_webpage_information_similarity_score
    except Exception as e:
        logger.info(f"Tool Executor - Outer exception: {e}")
        state["information"] = ""
        state["best_query_webpage_information_similarity_score"] = -1.0
# logger.info(f"Information: {state['information']}")
# logger.info(f"Information: {state['best_query_webpage_information_similarity_score']}")
return state
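# Isolated sketch of the embedding-similarity gate used in `tool_executor`:
# two texts are embedded and compared with cosine similarity. The 0.80 and
# 0.65 thresholds above are heuristics, not tuned values.
def _demo_similarity(a: str, b: str) -> float:
    ea = sentence_transformer_model.encode_query(a).reshape(1, -1)
    eb = sentence_transformer_model.encode_query(b).reshape(1, -1)
    return float(cosine_similarity(ea, eb)[0][0])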
safe_workflow = StateGraph(AgentState)
# safe_workflow = StateGraph(dict)
safe_workflow.add_node("planner", planner_node)
safe_workflow.add_node("tool_executor", tool_executor)
safe_workflow.add_node("safety", safety_node)
# safe_workflow.add_node("judge", Judge)
# safe_workflow.set_entry_point("planner")
safe_workflow.add_edge(START, "planner")
safe_workflow.add_edge("planner", "tool_executor")
safe_workflow.add_edge("tool_executor", "safety")
# safe_workflow.add_edge("safety", "judge")
# safe_workflow.add_conditional_edges(
# "safety",
# route,
# {
# "allow": "tool_executor",
# "block": END,
# },
# )
# safe_workflow.add_edge("tool_executor", END)
safe_app = safe_workflow.compile()
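# Minimal invocation sketch for the compiled graph; wrapping the question in
# a HumanMessage matches the `add_messages` reducer on `messages`.
def _demo_agent(question: str = "Who won the 2025 World Snooker Championship?"):
    result = safe_app.invoke({"messages": [HumanMessage(content=question)]})
    return result["output"]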
def transcribe_speech(filepath):
if filepath is None:
return "No audio provided"
output = audio_pipe(
filepath,
max_new_tokens=256,
generate_kwargs={
"task": "transcribe",
"language": "greek",
}, # update with the language you've fine-tuned on
chunk_length_s=30,
batch_size=8,
)
return output["text"]
title = "Automatic Speech Recognition (ASR)"
description = """
Demo for automatic speech recognition in Greek. Demo uses [Sandiago21/whisper-large-v2-greek](https://huggingface.co/Sandiago21/whisper-large-v2-greek) checkpoint, which is based on OpenAI's
[Whisper](https://huggingface.co/openai/whisper-large-v2) model and is fine-tuned in Greek Audio dataset
")
"""
mic_transcribe = gr.Interface(
fn=transcribe_speech,
inputs=gr.Audio(sources="microphone", type="filepath"),
outputs=gr.Textbox(),
title=title,
description=description,
)
# greek_translation_pipe = pipeline("translation", model="Helsinki-NLP/opus-mt-tc-big-el-en")
# def translate_from_greek_english(text):
# return greek_translation_pipe(text)[0]["translation_text"]
# --------------------------
# Define user query function
# --------------------------
def answer_question(user_question):
    """
    Send the user input to the LangGraph agent and return its response.
    """
    # Wrap the raw string in a HumanMessage so the `add_messages` reducer
    # treats it as a single message rather than a sequence of characters
    result = safe_app.invoke({"messages": [HumanMessage(content=user_question)]})
    agent_answer = result["output"]
    return agent_answer
def answer_from_audio(audio):
text = transcribe_speech(audio)
# translated_text = translate_from_greek_english(text)
return answer_question(text)
# return text
# --------------------------
# Gradio UI
# --------------------------
with gr.Blocks() as demo:
# gr.TabbedInterface(
# [mic_transcribe, file_transcribe],
# ["Transcribe Microphone", "Transcribe Audio File"],
# )
gr.Markdown("# Ask the Main Agent")
user_input = gr.Textbox(
label="Your Question",
placeholder="Type any question here...",
lines=2
)
# 🎤 AUDIO INPUT (new)
audio_input = gr.Audio(
sources=["microphone"],
type="filepath",
label="Or speak your question"
)
answer_output = gr.Textbox(
label="Agent Response"
)
submit_text_btn = gr.Button("Ask (Text)")
submit_audio_btn = gr.Button("Ask (Voice)")
submit_text_btn.click(
fn=answer_question,
inputs=user_input,
outputs=answer_output
)
submit_audio_btn.click(
fn=answer_from_audio,
inputs=audio_input,
outputs=answer_output
)
demo.launch()