"""LangChain tool definitions used by the image, file, audio, video, math,
data and search agents.

NOTE: for every ``@tool`` function the docstring doubles as the tool
description shown to the LLM, so docstrings are kept short and factual and
maintainer-facing notes live in ``#`` comments instead.
"""

import base64
import heapq
import json
import mimetypes
import os
import re
from collections import Counter
from operator import itemgetter
from typing import List

import cv2
import pandas as pd
import requests
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
from langchain_community.tools import (
    ArxivQueryRun,
    DuckDuckGoSearchRun,
    WikipediaQueryRun,
)
from langchain_community.utilities import ArxivAPIWrapper, WikipediaAPIWrapper
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_google_community.search import (
    GoogleSearchAPIWrapper,
    GoogleSearchRun,
)
from langchain_openai import ChatOpenAI
from ultralytics import YOLO
from youtube_transcript_api import YouTubeTranscriptApi

load_dotenv()

HF_TOKEN = os.getenv("HF_TOKEN")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID")

client = InferenceClient(
    provider="hf-inference",
    api_key=HF_TOKEN,
)

llm = ChatOpenAI(model="o4-mini")
vision_llm = ChatOpenAI(model="gpt-4o")

# Hosted Whisper endpoint used by `transcribe_audio`.
_WHISPER_API_URL = (
    "https://router.huggingface.co/hf-inference/models/openai/whisper-large-v3"
)
# Upper bound for the transcription request so a stalled call cannot hang
# the agent forever.
_HTTP_TIMEOUT_SECONDS = 300

# YOLO weights file and the minimum confidence a detection needs to be kept.
_YOLO_WEIGHTS = "yolo11n.pt"
_MIN_CONFIDENCE = 0.5

# Image types that may be declared in the data URL sent to the vision model;
# anything else falls back to JPEG, which is what the code always declared.
_VISION_MIME_TYPES = frozenset(
    {"image/jpeg", "image/png", "image/gif", "image/webp"}
)

# An 11-character YouTube video id preceded by "v=" or "/".
_YOUTUBE_ID_RE = re.compile(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*')


def _ask_vision_llm(question, image_bytes, mime_type="image/jpeg"):
    """Send one image plus a question to the vision model; return its reply."""
    image_base64 = base64.b64encode(image_bytes).decode("utf-8")
    message = [
        HumanMessage(
            content=[
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:{mime_type};base64,{image_base64}"
                    },
                },
            ]
        )
    ]
    return vision_llm.invoke(message).content


def _excel_to_json(file_path):
    """Load the first sheet of an Excel file as a JSON array of row objects."""
    df = pd.read_excel(file_path)
    # DataFrame.to_json serialises timestamps as ISO strings and missing
    # values as null; json.dumps(df.to_dict(...)) raises on Timestamp cells
    # and emits non-standard NaN tokens.
    return df.to_json(orient="records", date_format="iso")


def _confident_labels(result, min_confidence=_MIN_CONFIDENCE):
    """Return class names of detections above the threshold.

    Returns None (not an empty list) when the result carries no boxes at all,
    so callers can tell "no box data" from "nothing confident enough".
    """
    boxes = result.boxes
    if boxes is None:
        return None
    return [
        result.names[int(class_id)]
        for class_id, confidence in zip(boxes.cls.tolist(), boxes.conf.tolist())
        if confidence > min_confidence
    ]


@tool
def analyze_image(img_path: str, question: str) -> str:
    """Analyze an image and answer a question about it.

    Args:
        img_path: Path to a local image file.
        question: The question to answer about the image.
    """
    try:
        with open(img_path, "rb") as image_file:
            image_bytes = image_file.read()

        # Declare the real image type when it is recognisable from the file
        # extension instead of labelling every file as JPEG.
        mime_type = mimetypes.guess_type(img_path)[0]
        if mime_type not in _VISION_MIME_TYPES:
            mime_type = "image/jpeg"

        return _ask_vision_llm(question, image_bytes, mime_type)
    except Exception as e:
        return f"Error analyzing image: {str(e)}"


@tool
def read_excel_file(file_path: str, question: str) -> str:
    """Read and analyze an Excel file to answer a question.

    Args:
        file_path: Path to the Excel file.
        question: The question being investigated.
    """
    # `question` is not used here: the rows are returned as JSON and the
    # calling agent does the reasoning. It stays in the signature so the
    # tool's argument schema is unchanged.
    try:
        return _excel_to_json(file_path)
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"


@tool
def read_python_file(file_path: str, question: str) -> str:
    """Read and analyze a Python file to answer a question.

    Args:
        file_path: Path to the Python source file.
        question: The question to answer about the code.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            code_content = f.read()

        prompt = (
            "Here is Python code from a file:\n\n"
            f"```python\n{code_content}\n```\n\n"
            f"Question: {question}\n\n"
            "Please analyze the code and answer the question."
        )
        response = llm.invoke([HumanMessage(content=prompt)])
        return response.content
    except Exception as e:
        return f"Error reading Python file: {str(e)}"


@tool
def transcribe_audio(file_path: str, question: str) -> str:
    """Transcribe audio file.

    Args:
        file_path: Path to the audio file.
        question: The question being investigated.
    """
    # `question` is not used for transcription; it stays in the signature so
    # the tool's argument schema is unchanged.
    try:
        headers = {
            "Authorization": f"Bearer {HF_TOKEN}",
            # The endpoint needs an explicit content type; MP3 is assumed.
            "Content-Type": "audio/mpeg",
        }
        with open(file_path, "rb") as f:
            audio_bytes = f.read()

        response = requests.post(
            _WHISPER_API_URL,
            headers=headers,
            data=audio_bytes,
            timeout=_HTTP_TIMEOUT_SECONDS,
        )
        payload = json.loads(response.content.decode("utf-8"))
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"

    # The tool is declared to return a string, so unwrap the transcript (or
    # the API's error message) rather than handing back the raw payload.
    if isinstance(payload, dict):
        if "text" in payload:
            return str(payload["text"])
        if "error" in payload:
            return f"Error transcribing audio: {payload['error']}"
    return json.dumps(payload)


# Simple math tools
@tool
def add(a: float, b: float) -> float:
    """Add two numbers."""
    return a + b


# The element type is spelled out so the generated tool schema declares the
# array's item type; a bare `list` yields an array schema without `items`.
@tool
def sum_list(numbers: List[float]) -> float:
    """Sum a list of numbers."""
    return sum(numbers)


# Simple data tools
@tool
def extract_values(data: str, column: str) -> List[float]:
    """Extract all values from a column in JSON data.

    Args:
        data: JSON array of row objects.
        column: Column name to look for (case-insensitive substring match).
    """
    needle = column.lower()
    values = []
    for row in json.loads(data):
        for key, value in row.items():
            if needle not in key.lower():
                continue
            try:
                values.append(float(value))
            except (TypeError, ValueError):
                # Non-numeric cells (text, null, nested values) are skipped
                # on purpose; any other failure should surface.
                continue
    return values


@tool
def filter_rows(data: str, exclude_words: List[str]) -> str:
    """Remove rows containing any of the exclude words.

    Args:
        data: JSON array of row objects.
        exclude_words: Words to match case-insensitively against row values.
    """
    banned = [word.lower() for word in exclude_words]
    kept = []
    for row in json.loads(data):
        row_text = " ".join(str(v).lower() for v in row.values())
        if not any(word in row_text for word in banned):
            kept.append(row)
    return json.dumps(kept)


@tool
def read_excel(file_path: str) -> str:
    """Read any Excel file and return as JSON."""
    # Report failures as text, like the other file tools in this module,
    # instead of raising out of the tool call.
    try:
        return _excel_to_json(file_path)
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"


@tool
def object_detection(video_url: str) -> str:
    """Analyze objects and visual content in a YouTube video.

    Args:
        video_url: URL of the video to analyze.
    """
    try:
        model = YOLO(_YOLO_WEIGHTS)  # Load an official Detect model
        frame_objects = []
        # stream=True yields one result per frame instead of keeping the
        # results for the whole video in memory at once.
        for i, result in enumerate(model.track(video_url, stream=True)):
            labels = _confident_labels(result)
            if labels is None:
                # Frames without box data are left out of the report.
                continue
            frame_objects.append({
                "frame": i,
                "objects": labels,
                "unique_objects": list(set(labels)),
            })
        return json.dumps(frame_objects, indent=2)
    except Exception as e:
        return f"Error analyzing video: {str(e)}"


@tool
def get_youtube_transcript(video_url: str) -> str:
    """Get transcript from a YouTube video.

    Args:
        video_url: URL of the YouTube video.
    """
    try:
        # Extract video ID
        video_id_match = _YOUTUBE_ID_RE.search(video_url)
        if not video_id_match:
            return "Error: Could not extract video ID"
        video_id = video_id_match.group(1)

        # Older youtube-transcript-api releases expose a static
        # get_transcript(); newer ones replace it with an instance fetch()
        # whose result converts to the same list of dicts.
        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
        else:
            transcript = YouTubeTranscriptApi().fetch(video_id).to_raw_data()

        # Format with timestamps
        formatted_transcript = [
            {
                "start": entry['start'],
                "duration": entry['duration'],
                "text": entry['text'],
            }
            for entry in transcript
        ]
        return json.dumps(formatted_transcript, indent=2)
    except Exception as e:
        return f"Error getting transcript: {str(e)}"


# NOTE(review): the @tool decorator was commented out on this function, so it
# is exported as a plain callable (and listed that way in
# analyze_video_tools). It is left undecorated so direct callers keep working.
def analyze_video_content(video_url: str, question: str = "", max_vision_frames: int = 1) -> str:
    """Analyze video content using YOLO for object detection and vision LLM for detailed analysis.

    Args:
        video_url: URL of the video to analyze.
        question: Optional question; when non-blank, the frames with the most
            detected objects are also shown to the vision model.
        max_vision_frames: Maximum number of frames sent to the vision model;
            zero or a negative number sends none.

    Returns:
        A JSON report with per-frame detections and, when a question was
        asked, the vision model's answer for each selected frame. On failure
        an error message string is returned instead.
    """
    try:
        # Frame images are only needed when the vision model will be asked.
        vision_budget = max(max_vision_frames, 0) if question.strip() else 0
        by_object_total = itemgetter("total_objects")

        model = YOLO(_YOLO_WEIGHTS)

        # Step 1: YOLO analysis for all frames
        frame_objects = []
        frames_with_objects = 0
        candidates = []
        # stream=True processes the video frame by frame rather than holding
        # every result (and its image) in memory.
        for i, result in enumerate(model.track(video_url, stream=True)):
            labels = _confident_labels(result) or []
            counts = dict(Counter(labels))
            frame_objects.append({
                "frame": i,
                "objects": labels,
                "unique_objects": list(set(labels)),
                "object_counts": counts,
            })
            if not labels:
                continue
            frames_with_objects += 1

            if vision_budget:
                candidates.append({
                    "frame_index": i,
                    "objects": labels,
                    "object_counts": counts,
                    "total_objects": len(labels),
                    "image": result.orig_img,
                })
                # Keep only the best frames seen so far so that at most
                # vision_budget + 1 images are alive at any time. nlargest is
                # equivalent to a stable descending sort plus slice, so ties
                # still favour earlier frames.
                if len(candidates) > vision_budget:
                    candidates = heapq.nlargest(
                        vision_budget, candidates, key=by_object_total
                    )

        # Step 2: If there's a specific question, use vision LLM on selected frames
        detailed_analyses = []
        selected_frames = heapq.nlargest(
            vision_budget, candidates, key=by_object_total
        )
        for frame in selected_frames:
            analysis = {
                "frame_index": frame["frame_index"],
                "yolo_objects": frame["objects"],
                "yolo_counts": frame["object_counts"],
            }
            try:
                # Encode frame directly to JPEG bytes
                encoded_ok, buffer = cv2.imencode('.jpg', frame["image"])
                if not encoded_ok:
                    raise ValueError("could not encode frame as JPEG")
                analysis["vision_analysis"] = _ask_vision_llm(
                    question, buffer.tobytes()
                )
            except Exception as vision_error:
                # One bad frame must not discard the rest of the report.
                analysis["vision_analysis"] = (
                    f"Vision analysis failed: {str(vision_error)}"
                )
            detailed_analyses.append(analysis)

        # Combine results
        result_data = {
            "video_url": video_url,
            "question": question,
            "total_frames": len(frame_objects),
            "yolo_analysis": frame_objects,
            "frames_with_objects": frames_with_objects,
        }
        if detailed_analyses:
            result_data["detailed_vision_analysis"] = detailed_analyses
            result_data["vision_frames_analyzed"] = len(detailed_analyses)

        return json.dumps(result_data, indent=2)
    except Exception as e:
        return f"Error analyzing video content: {str(e)}"


# The three search tools below originally took no arguments and returned the
# underlying LangChain tool object, so an agent calling them never received
# search results. They now run the query themselves. `query` has a default
# only so that an argument-less call still succeeds (with an error message).
@tool
def google_search(query: str = "") -> str:
    """Search Google and return result snippets for the query.

    Args:
        query: The search query.
    """
    if not query.strip():
        return "Error: no search query provided"
    try:
        api_wrapper = GoogleSearchAPIWrapper(
            google_api_key=GOOGLE_API_KEY,
            google_cse_id=GOOGLE_CSE_ID,
            k=10,  # Number of results
            siterestrict=False,  # Site restrictions
        )
        return GoogleSearchRun(api_wrapper=api_wrapper).invoke(query)
    except Exception as e:
        return f"Error running Google search: {str(e)}"


@tool
def wiki_search(query: str = "") -> str:
    """Search Wikipedia and return summaries of matching pages.

    Args:
        query: The search query.
    """
    if not query.strip():
        return "Error: no search query provided"
    try:
        api_wrapper = WikipediaAPIWrapper()
        return WikipediaQueryRun(api_wrapper=api_wrapper).invoke(query)
    except Exception as e:
        return f"Error running Wikipedia search: {str(e)}"


@tool
def arxiv_search(query: str = "") -> str:
    """Search arXiv and return summaries of matching papers.

    Args:
        query: The search query.
    """
    if not query.strip():
        return "Error: no search query provided"
    try:
        api_wrapper = ArxivAPIWrapper()
        return ArxivQueryRun(api_wrapper=api_wrapper).invoke(query)
    except Exception as e:
        return f"Error running arXiv search: {str(e)}"


def general_tools():
    """Return the tools for the general agent (image, code and audio)."""
    return [
        analyze_image,
        read_python_file,
        transcribe_audio,
    ]


def analyze_video_tools():
    """Return the tools for the video-analysis agent."""
    return [object_detection, analyze_video_content]


def youtube_transcript_tools():
    """Return the tools for the YouTube-transcript agent."""
    return [get_youtube_transcript]


def file_agent_tools():
    """Return the tools for the file agent."""
    return [read_excel]


def math_agent_tools():
    """Return the tools for the math agent."""
    return [add, sum_list]


def data_agent_tools():
    """Return the tools for the data agent."""
    return [extract_values, filter_rows]


def search_agen_tools():
    """Return the tools for the search agent (Google, arXiv, Wikipedia)."""
    return [
        google_search,
        ArxivQueryRun(api_wrapper=ArxivAPIWrapper()),
        WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()),
    ]


# Correctly spelled alias; the original (misspelled) name is kept because
# existing callers import it.
search_agent_tools = search_agen_tools