Spaces:
Sleeping
Sleeping
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_community.tools import (
    DuckDuckGoSearchRun,
    WikipediaQueryRun,
    ArxivQueryRun
)
from langchain_google_community.search import (
    GoogleSearchAPIWrapper,
    GoogleSearchRun
)
from langchain_community.utilities import WikipediaAPIWrapper, ArxivAPIWrapper
from langchain_openai import ChatOpenAI
import base64
import pandas as pd
import os  # duplicate `import os` removed
from huggingface_hub import InferenceClient
import json
import requests
from youtube_transcript_api import YouTubeTranscriptApi
from ultralytics import YOLO
import cv2
import re
from dotenv import load_dotenv

# Pull credentials from a local .env file so they need not be exported manually.
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID")

# Shared Hugging Face inference client (declared here; transcription below
# calls the HTTP endpoint directly with HF_TOKEN).
client = InferenceClient(
    provider="hf-inference",
    api_key=HF_TOKEN,
)

# Text LLM for code/file analysis; separate vision-capable LLM for images/frames.
llm = ChatOpenAI(model="o4-mini")
vision_llm = ChatOpenAI(model="gpt-4o")
def analyze_image(img_path: str, question: str) -> str:
    """Analyze an image with the vision LLM and answer a question about it.

    Args:
        img_path: Path to a local image file.
        question: Natural-language question about the image.

    Returns:
        The model's answer text, or an ``"Error analyzing image: ..."``
        string if reading the file or invoking the model fails.
    """
    import mimetypes  # local import: only needed by this tool
    try:
        with open(img_path, "rb") as image_file:
            image_bytes = image_file.read()
        image_base64 = base64.b64encode(image_bytes).decode("utf-8")
        # Fix: the MIME type was hard-coded to image/jpeg, mislabelling
        # PNG/GIF/WebP files in the data URL. Guess from the filename and
        # fall back to the old jpeg default when unknown.
        mime_type = mimetypes.guess_type(img_path)[0] or "image/jpeg"
        message = [
            HumanMessage(
                content=[
                    {"type": "text", "text": question},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:{mime_type};base64,{image_base64}"}
                    }
                ]
            )
        ]
        response = vision_llm.invoke(message)
        return response.content
    except Exception as e:
        return f"Error analyzing image: {str(e)}"
def read_excel_file(file_path: str, question: str) -> str:
    """Load an Excel file and return all its rows as a JSON string.

    Note: ``question`` is accepted for tool-signature compatibility but is
    not used here — the raw row data is returned and the calling agent does
    the actual analysis.

    Returns:
        JSON-encoded list of row dicts, or an ``"Error reading Excel file: ..."``
        string on failure.
    """
    try:
        records = pd.read_excel(file_path).to_dict(orient='records')
        return json.dumps(records)
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"
def read_python_file(file_path: str, question: str) -> str:
    """Read a Python source file and have the text LLM answer a question about it.

    Returns:
        The model's answer, or an ``"Error reading Python file: ..."`` string
        if the file cannot be read or the model call fails.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            source_text = handle.read()
        prompt = f"""Here is Python code from a file:
```python
{source_text}
```
Question: {question}
Please analyze the code and answer the question."""
        return llm.invoke([HumanMessage(content=prompt)]).content
    except Exception as e:
        return f"Error reading Python file: {str(e)}"
def transcribe_audio(file_path: str, question: str) -> str:
    """Transcribe an audio file via the Hugging Face Whisper HTTP endpoint.

    ``question`` is unused; it exists for tool-signature compatibility.

    NOTE(review): on success this returns the parsed JSON payload (a dict),
    not a str as annotated — kept as-is to preserve caller behavior.
    On failure it returns an ``"Error transcribing audio: ..."`` string.
    """
    try:
        API_URL = "https://router.huggingface.co/hf-inference/models/openai/whisper-large-v3"
        headers = {
            "Authorization": f"Bearer {HF_TOKEN}",
            "Content-Type": "audio/mpeg"  # Add this line for MP3 files
        }
        # The nested query() helper was inlined — it was only called once.
        with open(file_path, "rb") as audio_file:
            payload = audio_file.read()
        response = requests.request("POST", API_URL, headers=headers, data=payload)
        return json.loads(response.content.decode("utf-8"))
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"
# Simple math tools
def add(a: float, b: float) -> float:
    """Return the sum of the two given numbers."""
    total = a + b
    return total
def sum_list(numbers: list) -> float:
    """Return the total of all numbers in the list (0 for an empty list)."""
    total = 0
    for value in numbers:
        total += value
    return total
# Simple data tools
def extract_values(data: str, column: str) -> list:
    """Extract numeric values from columns whose name contains ``column``.

    Args:
        data: JSON string encoding a list of row dicts.
        column: Case-insensitive substring matched against each column name.

    Returns:
        A list of floats from every matching cell; cells that cannot be
        coerced to float are silently skipped.
    """
    values = []
    for row in json.loads(data):
        for key, value in row.items():
            if column.lower() in key.lower():
                # Fix: the bare `except:` also swallowed KeyboardInterrupt,
                # SystemExit and genuine bugs. Only coercion failures should
                # be skipped.
                try:
                    values.append(float(value))
                except (TypeError, ValueError):
                    pass
    return values
def filter_rows(data: str, exclude_words: list) -> str:
    """Drop rows whose values contain any of the given words (case-insensitive).

    Args:
        data: JSON string encoding a list of row dicts.
        exclude_words: Words whose presence (as a substring of the row's
            concatenated values) disqualifies a row.

    Returns:
        JSON string with only the rows that matched none of the words.
    """
    banned = [word.lower() for word in exclude_words]
    kept = []
    for row in json.loads(data):
        haystack = " ".join(str(cell).lower() for cell in row.values())
        # all-not-in is the De Morgan twin of the original not-any-in test.
        if all(term not in haystack for term in banned):
            kept.append(row)
    return json.dumps(kept)
def read_excel(file_path: str) -> str:
    """Return the contents of an Excel file as a JSON array of row objects.

    Unlike read_excel_file above, this propagates exceptions to the caller.
    """
    records = pd.read_excel(file_path).to_dict(orient='records')
    return json.dumps(records)
def object_detection(video_url: str) -> str:
    """Run YOLO tracking over a video and report detected objects per frame.

    Args:
        video_url: Video URL or path, passed straight to YOLO's tracker.

    Returns:
        JSON string listing, per frame that produced boxes, the confident
        (conf > 0.5) object labels and their unique set; or an
        ``"Error analyzing video: ..."`` string on failure.
    """
    try:
        detector = YOLO("yolo11n.pt")  # official Detect model
        per_frame = []
        for index, frame_result in enumerate(detector.track(video_url)):
            boxes = frame_result.boxes
            if boxes is None:
                continue  # frames without boxes are not reported
            labels = []
            for box_idx in range(len(boxes)):
                score = float(boxes.conf[box_idx].item())
                if score > 0.5:  # keep only high-confidence detections
                    labels.append(frame_result.names[int(boxes.cls[box_idx].item())])
            per_frame.append({
                "frame": index,
                "objects": labels,
                "unique_objects": list(set(labels))
            })
        return json.dumps(per_frame, indent=2)
    except Exception as e:
        return f"Error analyzing video: {str(e)}"
def get_youtube_transcript(video_url: str) -> str:
    """Fetch a YouTube video's transcript as JSON entries with timestamps.

    Args:
        video_url: Any YouTube URL form containing an 11-character video id
            after ``v=`` or ``/``.

    Returns:
        JSON string of ``{start, duration, text}`` entries, or an error
        string if the id can't be extracted or the fetch fails.
    """
    try:
        # Eleven base64-ish chars following "v=" or "/".
        match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', video_url)
        if match is None:
            return "Error: Could not extract video ID"
        entries = YouTubeTranscriptApi.get_transcript(match.group(1))
        timed = [
            {"start": entry['start'], "duration": entry['duration'], "text": entry['text']}
            for entry in entries
        ]
        return json.dumps(timed, indent=2)
    except Exception as e:
        return f"Error getting transcript: {str(e)}"
# @tool  -- NOTE(review): decorator left disabled; the function is exposed via analyze_video_tools()
def analyze_video_content(video_url: str, question: str = "", max_vision_frames: int = 1) -> str:
    """Analyze video content using YOLO for object detection and vision LLM for detailed analysis.

    Two-stage pipeline: (1) YOLO tracking over every frame, counting
    confident (conf > 0.5) detections; (2) only when `question` is non-empty,
    the frames with the most detected objects (up to `max_vision_frames`)
    are JPEG-encoded and sent to the vision LLM with the question.

    Args:
        video_url: Video URL or path, passed straight to YOLO's tracker.
        question: Optional question that triggers the vision-LLM stage.
        max_vision_frames: Upper bound on frames sent to the vision LLM.

    Returns:
        JSON string with per-frame YOLO results, frame counts, and (when the
        vision stage ran) per-frame vision analyses; or an
        "Error analyzing video content: ..." string on failure.
    """
    try:
        model = YOLO("yolo11n.pt")
        results = model.track(video_url)
        # Step 1: YOLO analysis for all frames
        frame_objects = []        # one summary dict per frame (all frames)
        frames_with_content = []  # candidates for the vision stage (keeps raw image)
        for i, result in enumerate(results):
            frame_data = {
                "frame": i,
                "objects": [],
                "unique_objects": [],
                "object_counts": {}
            }
            if result.boxes is not None:
                objects_in_frame = []
                for j in range(len(result.boxes)):
                    class_name = result.names[int(result.boxes.cls[j].item())]
                    confidence = float(result.boxes.conf[j].item())
                    if confidence > 0.5:  # keep only high-confidence detections
                        objects_in_frame.append(class_name)
                # Count objects per class label
                for obj in objects_in_frame:
                    frame_data["object_counts"][obj] = frame_data["object_counts"].get(obj, 0) + 1
                frame_data["objects"] = objects_in_frame
                frame_data["unique_objects"] = list(set(objects_in_frame))
                # Store frame for potential vision analysis
                if objects_in_frame:  # Only store frames with detected objects
                    frames_with_content.append({
                        "frame_index": i,
                        "objects": objects_in_frame,
                        "object_counts": frame_data["object_counts"],
                        "total_objects": len(objects_in_frame),
                        "image": result.orig_img  # raw frame kept for cv2 re-encoding below
                    })
            frame_objects.append(frame_data)
        # Step 2: If there's a specific question, use vision LLM on selected frames
        detailed_analyses = []
        if question.strip():
            # Sort frames by total objects (descending) and take the busiest ones
            frames_with_content.sort(key=lambda x: x["total_objects"], reverse=True)
            selected_frames = frames_with_content[:max_vision_frames]
            for frame_data in selected_frames:
                try:
                    # Encode frame directly to base64 (no temp file on disk)
                    _, buffer = cv2.imencode('.jpg', frame_data["image"])
                    image_bytes = buffer.tobytes()
                    image_base64 = base64.b64encode(image_bytes).decode("utf-8")
                    message = [
                        HumanMessage(
                            content=[
                                {"type": "text", "text": question},
                                {
                                    "type": "image_url",
                                    "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}
                                }
                            ]
                        )
                    ]
                    vision_response = vision_llm.invoke(message)
                    detailed_analyses.append({
                        "frame_index": frame_data["frame_index"],
                        "yolo_objects": frame_data["objects"],
                        "yolo_counts": frame_data["object_counts"],
                        "vision_analysis": vision_response.content
                    })
                except Exception as vision_error:
                    # Per-frame vision failures are recorded, not fatal for the run
                    detailed_analyses.append({
                        "frame_index": frame_data["frame_index"],
                        "yolo_objects": frame_data["objects"],
                        "yolo_counts": frame_data["object_counts"],
                        "vision_analysis": f"Vision analysis failed: {str(vision_error)}"
                    })
        # Combine results into the final report
        result_data = {
            "video_url": video_url,
            "question": question,
            "total_frames": len(frame_objects),
            "yolo_analysis": frame_objects,
            "frames_with_objects": len(frames_with_content)
        }
        if detailed_analyses:
            result_data["detailed_vision_analysis"] = detailed_analyses
            result_data["vision_frames_analyzed"] = len(detailed_analyses)
        return json.dumps(result_data, indent=2)
    except Exception as e:
        return f"Error analyzing video content: {str(e)}"
def google_search():
    """Build and return a Google Programmable Search tool."""
    wrapper = GoogleSearchAPIWrapper(
        google_api_key=GOOGLE_API_KEY,
        google_cse_id=GOOGLE_CSE_ID,
        k=10,  # Number of results
        siterestrict=False  # Site restrictions
    )
    return GoogleSearchRun(api_wrapper=wrapper)
def wiki_search():
    """Build and return a Wikipedia query tool.

    (Docstring previously said "Google search tool" — a copy-paste error.)
    """
    api_wrapper = WikipediaAPIWrapper()
    search = WikipediaQueryRun(api_wrapper=api_wrapper)
    return search
def arxiv_search():
    """Build and return an Arxiv query tool.

    (Docstring previously said "Google search tool" — a copy-paste error.)
    """
    api_wrapper = ArxivAPIWrapper()
    search = ArxivQueryRun(api_wrapper=api_wrapper)
    return search
def general_tools():
    """Return the tool set for the general-purpose agent: image, code and audio."""
    return [
        analyze_image,
        read_python_file,
        transcribe_audio,
    ]
def analyze_video_tools():
    """Return the video-analysis tool set (YOLO detection + combined pipeline)."""
    return [object_detection, analyze_video_content]
def youtube_transcript_tools():
    """Return the YouTube-transcript tool set."""
    return [get_youtube_transcript]
def file_agent_tools():
    """Return the file agent's tool set (Excel reading)."""
    return [read_excel]
def math_agent_tools():
    """Return the math agent's tool set (addition and list summing)."""
    return [add, sum_list]
def data_agent_tools():
    """Return the data agent's tool set (column extraction and row filtering)."""
    return [extract_values, filter_rows]
def search_agen_tools():
    """Return the search agent's tool set: Google, Arxiv and Wikipedia.

    Fix: the list previously contained the `google_search` factory function
    itself rather than the tool it builds; it is now called so the agent
    receives an actual GoogleSearchRun instance, consistent with the other
    two (already-instantiated) entries.

    NOTE(review): the name "search_agen_tools" (sic) is kept — renaming
    would break existing callers.
    """
    tools = [
        google_search(),
        ArxivQueryRun(api_wrapper=ArxivAPIWrapper()),
        WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
    ]
    return tools