# feat: :sparkles: create search supervisor (commit f6ff4be, author: thivy)
# Standard library
import base64
import json
import os
import re

# Third-party
import cv2
import pandas as pd
import requests
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
from langchain_community.tools import (
    ArxivQueryRun,
    DuckDuckGoSearchRun,
    WikipediaQueryRun,
)
from langchain_community.utilities import ArxivAPIWrapper, WikipediaAPIWrapper
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_google_community.search import (
    GoogleSearchAPIWrapper,
    GoogleSearchRun,
)
from langchain_openai import ChatOpenAI
from ultralytics import YOLO
from youtube_transcript_api import YouTubeTranscriptApi

# Load API credentials from a local .env file.
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID")

# Hugging Face Inference API client (audio transcription backend).
client = InferenceClient(
    provider="hf-inference",
    api_key=HF_TOKEN,
)

# Text-reasoning model and a separate multimodal model for image analysis.
llm = ChatOpenAI(model="o4-mini")
vision_llm = ChatOpenAI(model="gpt-4o")
@tool
def analyze_image(img_path: str, question: str) -> str:
    """Analyze an image and answer a question about it."""
    try:
        # Read the image and base64-encode it for the vision model.
        with open(img_path, "rb") as image_file:
            encoded = base64.b64encode(image_file.read()).decode("utf-8")

        payload = [
            HumanMessage(
                content=[
                    {"type": "text", "text": question},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{encoded}"},
                    },
                ]
            )
        ]
        return vision_llm.invoke(payload).content
    except Exception as e:
        return f"Error analyzing image: {str(e)}"
@tool
def read_excel_file(file_path: str, question: str) -> str:
    """Read an Excel file and return its rows as a JSON string.

    Args:
        file_path: Path to the .xlsx/.xls file to read.
        question: Unused here; all records are returned so the calling
            agent can reason over them itself.

    Returns:
        JSON array of row records, or an error string on failure.
    """
    try:
        # Read Excel file
        df = pd.read_excel(file_path)
        df_dict = df.to_dict(orient='records')
        # default=str guards against non-JSON-serializable cells
        # (e.g. pandas Timestamp / NaT), which would otherwise raise.
        info = json.dumps(df_dict, default=str)
        return info
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"
@tool
def read_python_file(file_path: str, question: str) -> str:
    """Read and analyze a Python file to answer a question."""
    try:
        # Load the source text.
        with open(file_path, 'r', encoding='utf-8') as handle:
            source_text = handle.read()

        # Same prompt text as before, assembled from explicit pieces.
        prompt = (
            "Here is Python code from a file:\n"
            "```python\n"
            f"{source_text}\n"
            "```\n"
            f"Question: {question}\n"
            "Please analyze the code and answer the question."
        )
        return llm.invoke([HumanMessage(content=prompt)]).content
    except Exception as e:
        return f"Error reading Python file: {str(e)}"
@tool
def transcribe_audio(file_path: str, question: str) -> str:
    """Transcribe an audio file via the Hugging Face Whisper inference API.

    Args:
        file_path: Path to the audio file (MP3 expected per Content-Type).
        question: Unused; kept for a uniform tool signature.

    Returns:
        The transcription text, or an error string on failure.
    """
    try:
        headers = {
            "Authorization": f"Bearer {HF_TOKEN}",
            "Content-Type": "audio/mpeg"  # media type required for MP3 uploads
        }
        API_URL = "https://router.huggingface.co/hf-inference/models/openai/whisper-large-v3"

        with open(file_path, "rb") as f:
            audio_bytes = f.read()

        # Fail fast on HTTP errors instead of parsing an error body, and
        # bound the request so a stuck call cannot hang the agent forever.
        response = requests.post(API_URL, headers=headers, data=audio_bytes, timeout=120)
        response.raise_for_status()
        data = response.json()
        # Whisper responses carry the transcription under "text"; fall back
        # to the raw payload so the declared str return type always holds.
        if isinstance(data, dict) and "text" in data:
            return data["text"]
        return json.dumps(data)
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"
# Simple math tools
@tool
def add(a: float, b: float) -> float:
    """Add two numbers."""
    total = a + b
    return total
@tool
def sum_list(numbers: list) -> float:
    """Sum a list of numbers."""
    # Explicit accumulation; same result as sum(), including 0 for [].
    total = 0
    for value in numbers:
        total += value
    return total
# Simple data tools
@tool
def extract_values(data: str, column: str) -> list:
    """Extract all numeric values from a column in JSON data.

    Args:
        data: JSON string encoding a list of row objects.
        column: Column name, matched case-insensitively as a substring
            against each row's keys.

    Returns:
        List of floats from every matching cell that converts cleanly.
    """
    parsed = json.loads(data)
    values = []
    needle = column.lower()  # hoisted: invariant across all rows
    for row in parsed:
        for key, value in row.items():
            if needle in key.lower():
                # Only catch conversion failures; a bare except would also
                # swallow KeyboardInterrupt/SystemExit and hide real bugs.
                try:
                    values.append(float(value))
                except (TypeError, ValueError):
                    pass
    return values
@tool
def filter_rows(data: str, exclude_words: list) -> str:
    """Remove rows containing any of the exclude words."""
    rows = json.loads(data)
    kept = []
    for row in rows:
        # Flatten the row's values into one lowercase search string.
        text = " ".join(str(cell).lower() for cell in row.values())
        blocked = any(word.lower() in text for word in exclude_words)
        if not blocked:
            kept.append(row)
    return json.dumps(kept)
@tool
def read_excel(file_path: str) -> str:
    """Read any Excel file and return as JSON.

    Returns a JSON array of row records; default=str keeps
    non-serializable cells (e.g. pandas Timestamp) from raising.
    """
    df = pd.read_excel(file_path)
    return json.dumps(df.to_dict(orient='records'), default=str)
@tool
def object_detection(video_url: str) -> str:
    """Analyze objects and visual content in a YouTube video."""
    try:
        detector = YOLO("yolo11n.pt")  # official pretrained Detect model
        tracked = detector.track(video_url)

        per_frame = []
        for index, frame_result in enumerate(tracked):
            # Frames without a boxes attribute are skipped entirely,
            # matching the original control flow.
            if frame_result.boxes is not None:
                boxes = frame_result.boxes
                labels = []
                for box_idx in range(len(boxes)):
                    name = frame_result.names[int(boxes.cls[box_idx].item())]
                    score = float(boxes.conf[box_idx].item())
                    if score > 0.5:  # keep only confident detections
                        labels.append(name)
                per_frame.append({
                    "frame": index,
                    "objects": labels,
                    "unique_objects": list(set(labels))
                })
        return json.dumps(per_frame, indent=2)
    except Exception as e:
        return f"Error analyzing video: {str(e)}"
@tool
def get_youtube_transcript(video_url: str) -> str:
    """Get transcript from a YouTube video."""
    try:
        # Pull the 11-character video ID out of the URL.
        match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', video_url)
        if not match:
            return "Error: Could not extract video ID"

        entries = YouTubeTranscriptApi.get_transcript(match.group(1))
        # Preserve timing information alongside each caption line.
        timed = [
            {
                "start": entry['start'],
                "duration": entry['duration'],
                "text": entry['text'],
            }
            for entry in entries
        ]
        return json.dumps(timed, indent=2)
    except Exception as e:
        return f"Error getting transcript: {str(e)}"
# @tool  # NOTE(review): decorator is commented out, so this is a plain
# function rather than a registered LangChain tool — confirm intentional.
def analyze_video_content(video_url: str, question: str = "", max_vision_frames: int = 1) -> str:
    """Analyze video content using YOLO for object detection and vision LLM for detailed analysis.

    Args:
        video_url: Video URL or path; handed directly to YOLO.track.
        question: Optional question. When non-empty, the busiest frames are
            additionally sent to the vision LLM.
        max_vision_frames: Upper bound on frames sent to the vision LLM.

    Returns:
        JSON string with per-frame YOLO detections (plus vision-LLM analyses
        when a question was given), or an error string on failure.
    """
    try:
        model = YOLO("yolo11n.pt")
        results = model.track(video_url)
        # Step 1: YOLO analysis for all frames
        frame_objects = []
        frames_with_content = []
        for i, result in enumerate(results):
            # One record per frame, even when nothing is detected.
            frame_data = {
                "frame": i,
                "objects": [],
                "unique_objects": [],
                "object_counts": {}
            }
            if result.boxes is not None:
                objects_in_frame = []
                for j in range(len(result.boxes)):
                    class_name = result.names[int(result.boxes.cls[j].item())]
                    confidence = float(result.boxes.conf[j].item())
                    if confidence > 0.5:
                        objects_in_frame.append(class_name)
                # Count objects
                for obj in objects_in_frame:
                    frame_data["object_counts"][obj] = frame_data["object_counts"].get(obj, 0) + 1
                frame_data["objects"] = objects_in_frame
                frame_data["unique_objects"] = list(set(objects_in_frame))
                # Store frame for potential vision analysis
                if objects_in_frame:  # Only store frames with detected objects
                    frames_with_content.append({
                        "frame_index": i,
                        "objects": objects_in_frame,
                        "object_counts": frame_data["object_counts"],
                        "total_objects": len(objects_in_frame),
                        "image": result.orig_img
                    })
            frame_objects.append(frame_data)
        # Step 2: If there's a specific question, use vision LLM on selected frames
        detailed_analyses = []
        if question.strip():
            # Sort frames by total objects and select top frames
            frames_with_content.sort(key=lambda x: x["total_objects"], reverse=True)
            selected_frames = frames_with_content[:max_vision_frames]
            for frame_data in selected_frames:
                try:
                    # Encode frame directly to base64
                    _, buffer = cv2.imencode('.jpg', frame_data["image"])
                    image_bytes = buffer.tobytes()
                    image_base64 = base64.b64encode(image_bytes).decode("utf-8")
                    message = [
                        HumanMessage(
                            content=[
                                {"type": "text", "text": question},
                                {
                                    "type": "image_url",
                                    "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}
                                }
                            ]
                        )
                    ]
                    vision_response = vision_llm.invoke(message)
                    detailed_analyses.append({
                        "frame_index": frame_data["frame_index"],
                        "yolo_objects": frame_data["objects"],
                        "yolo_counts": frame_data["object_counts"],
                        "vision_analysis": vision_response.content
                    })
                except Exception as vision_error:
                    # Best-effort: record the failure per frame and keep going.
                    detailed_analyses.append({
                        "frame_index": frame_data["frame_index"],
                        "yolo_objects": frame_data["objects"],
                        "yolo_counts": frame_data["object_counts"],
                        "vision_analysis": f"Vision analysis failed: {str(vision_error)}"
                    })
        # Combine results
        result_data = {
            "video_url": video_url,
            "question": question,
            "total_frames": len(frame_objects),
            "yolo_analysis": frame_objects,
            "frames_with_objects": len(frames_with_content)
        }
        if detailed_analyses:
            result_data["detailed_vision_analysis"] = detailed_analyses
            result_data["vision_frames_analyzed"] = len(detailed_analyses)
        return json.dumps(result_data, indent=2)
    except Exception as e:
        return f"Error analyzing video content: {str(e)}"
@tool
def google_search():
    """Google search tool"""
    # NOTE(review): because this factory is itself decorated with @tool,
    # invoking it yields a GoogleSearchRun instance rather than search
    # results — confirm the agent framework expects a tool factory here.
    api_wrapper = GoogleSearchAPIWrapper(
        google_api_key=GOOGLE_API_KEY,
        google_cse_id=GOOGLE_CSE_ID,
        k=10,  # Number of results
        siterestrict=False  # Site restrictions
    )
    google_search = GoogleSearchRun(api_wrapper=api_wrapper)
    return google_search
@tool
def wiki_search():
    """Wikipedia search tool."""
    # The docstring doubles as the agent-facing tool description; it
    # previously said "Google search tool", misdescribing this tool.
    api_wrapper = WikipediaAPIWrapper()
    search = WikipediaQueryRun(api_wrapper=api_wrapper)
    return search
@tool
def arxiv_search():
    """Arxiv search tool."""
    # The docstring doubles as the agent-facing tool description; it
    # previously said "Google search tool", misdescribing this tool.
    api_wrapper = ArxivAPIWrapper()
    search = ArxivQueryRun(api_wrapper=api_wrapper)
    return search
def general_tools():
    """Return the general-purpose agent's tools."""
    return [
        analyze_image,
        read_python_file,
        transcribe_audio,
    ]
def analyze_video_tools():
    """Return the video-analysis tools.

    NOTE(review): analyze_video_content has its @tool decorator commented
    out, so it is a plain function here — verify the consumer accepts it.
    """
    return [object_detection, analyze_video_content]
def youtube_transcript_tools():
    """Return the YouTube transcript tools."""
    return [get_youtube_transcript]
def file_agent_tools():
    """Return the file agent's tools."""
    return [read_excel]
def math_agent_tools():
    """Return the math agent's tools."""
    return [add, sum_list]
def data_agent_tools():
    """Return the data agent's tools."""
    return [extract_values, filter_rows]
def search_agen_tools():
    """Return the search agent's tools.

    NOTE(review): the name looks like a typo for "search_agent_tools" but is
    kept to avoid breaking callers. google_search is a @tool factory that
    returns a tool instance, unlike the directly-instantiated arxiv/wiki
    runners — confirm the supervisor expects this mix.
    """
    return [
        google_search,
        ArxivQueryRun(api_wrapper=ArxivAPIWrapper()),
        WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()),
    ]