# bronxie / main.py
import base64
from typing import TypedDict, Annotated, Optional
from langchain_openai import ChatOpenAI
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langgraph.graph.message import add_messages
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition
from moviepy import VideoFileClip
import speech_recognition as sr
import os
import json
import logging
# --- State ---
class AgentState(TypedDict):
    input_file: Optional[str]
    messages: Annotated[list[AnyMessage], add_messages]
# --- Tools ---
# Create the vision model once
vision_llm = ChatOpenAI(model="gpt-4o")
# --- IMAGE TOOL ---
def extract_text_from_image(img_path: str) -> str:
    """
    Extract text from an image using GPT-4o vision.
    """
    try:
        with open(img_path, "rb") as f:
            image_base64 = base64.b64encode(f.read()).decode("utf-8")
        message = HumanMessage(content=[
            {"type": "text", "text": "Extract all the text from this image. Return only the text, no explanations."},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}},
        ])
        response = vision_llm.invoke([message])
        return response.content.strip()
    except FileNotFoundError:
        return f"Error: image file not found at {img_path}"
    except Exception as e:
        logging.error(f"Image OCR error: {e}")
        return f"Error extracting text: {e}"
# --- VIDEO TOOL ---
def video_to_text(video_path: str) -> str:
    """
    Extract spoken text from a video file.
    """
    audio_path = "temp_audio.wav"
    try:
        clip = VideoFileClip(video_path)
        if clip.audio is None:
            clip.close()
            return "Error: video has no audio track"
        clip.audio.write_audiofile(audio_path, logger=None)
        clip.close()
        recognizer = sr.Recognizer()
        with sr.AudioFile(audio_path) as source:
            audio_data = recognizer.record(source)
        text = recognizer.recognize_google(audio_data)
        return text
    except Exception as e:
        return f"Error extracting text from video: {e}"
    finally:
        # Remove the temporary audio file if it was created
        if os.path.exists(audio_path):
            os.remove(audio_path)
def divide(a: int, b: int) -> float:
    """Divides two numbers and returns the result."""
    return a / b
def extract_text_from_file(file_path: str) -> str:
    """
    Reads a plain text file and returns its contents.

    Args:
        file_path (str): Path to the .txt file.

    Returns:
        str: The text inside the file, or an error message if not found.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read().strip()
    except FileNotFoundError:
        return f"Error: text file not found at {file_path}"
    except Exception as e:
        return f"Error reading text file: {e}"
tools = [extract_text_from_image, divide, video_to_text, extract_text_from_file]
# --- LLM with tools ---
llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools, parallel_tool_calls=False)
# --- call_llm Node ---
def call_llm(state: AgentState):
    sys_msg = SystemMessage(
        content="""You are an assistant that must always respond with the most concise possible answer.
- Use exactly one word whenever possible.
- For numeric answers, output the number itself, not the word.
- Do not explain your reasoning.
- Do not add punctuation, commentary, symbols of any kind, or extra words.
- If a one-word answer is not possible, use the shortest phrase that conveys the answer.
"""
    )
    return {
        "messages": [llm_with_tools.invoke([sys_msg] + state["messages"])],
        "input_file": state["input_file"],
    }
# --- Build Graph ---
graph = StateGraph(AgentState)
graph.add_node("call_llm", call_llm)
graph.add_node("tools", ToolNode(tools))
graph.add_edge(START, "call_llm")
graph.add_conditional_edges("call_llm", tools_condition)
graph.add_edge("tools", "call_llm")
agent = graph.compile()
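# Optional: inspect the compiled graph. A minimal sketch, assuming langgraph's
# compiled graph exposes get_graph() with a Mermaid export; uncomment to print
# the graph topology.
# print(agent.get_graph().draw_mermaid())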
# --- Run Example ---
messages = [HumanMessage(content="Divide 10 by 2")]
result = agent.invoke({"messages": messages, "input_file": None})
print(result["messages"][-1].content)  # print the agent's concise answer
# --- Batch answering: load questions, run the agent, write the answers ---
# Load the questions from the JSONL file
with open("./test_metadata.jsonl", "r", encoding="utf-8") as f:
    test_data = [json.loads(line) for line in f if line.strip()]
# Base path where any referenced files are located (not yet wired into the loop;
# see the sketch at the end of this file)
base_path = "./files"
# Loop through all questions and fill in the agent's answers
for item in test_data:
    q = item["Question"]
    # Wrap the question as a HumanMessage
    messages = [HumanMessage(content=q)]
    # Run it through the agent
    result = agent.invoke({"messages": messages, "input_file": None})
    # Take the agent's last message as the answer
    agent_answer = result["messages"][-1].content
    # Replace the placeholder "Final answer" with the agent's answer
    item["Final answer"] = agent_answer
# Save the updated data into a new file
with open("./test_with_agent_answers.jsonl", "w", encoding="utf-8") as f:
for item in test_data:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
print("Done! Agent answers written to test_with_agent_answers.jsonl")