| import base64 |
| from typing import TypedDict, Annotated, Optional |
| from langchain_openai import ChatOpenAI |
| from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage |
| from langgraph.graph.message import add_messages |
| from langgraph.graph import START, StateGraph |
| from langgraph.prebuilt import ToolNode, tools_condition |
| from moviepy import VideoFileClip |
| import speech_recognition as sr |
| import os, json |
|
|
| |
class AgentState(TypedDict):
    """Shared LangGraph state threaded through the graph's nodes."""
    # Optional path to a file referenced by the current question.
    # Every caller visible in this file passes None — TODO confirm whether
    # it was meant to be populated from the test metadata.
    input_file: Optional[str]
    # Conversation history; the add_messages reducer appends new messages
    # returned by nodes instead of overwriting the list.
    messages: Annotated[list[AnyMessage], add_messages]
|
|
| |
| import base64 |
| import logging |
| from langchain_core.messages import HumanMessage |
| from langchain_openai import ChatOpenAI |
|
|
| |
| vision_llm = ChatOpenAI(model="gpt-4o") |
|
|
| |
def extract_text_from_image(img_path: str) -> str:
    """
    Extract text from an image using GPT-4o vision.

    Args:
        img_path (str): Path to the image file (png, jpeg, gif, webp, ...).

    Returns:
        str: The extracted text, or an "Error ..." message string on failure.
    """
    import mimetypes  # local import: avoids touching the top-of-file import block

    try:
        with open(img_path, "rb") as f:
            image_base64 = base64.b64encode(f.read()).decode("utf-8")

        # Guess the MIME type from the extension instead of hard-coding PNG,
        # so JPEG/GIF/WebP files are labelled correctly in the data URL.
        mime_type, _ = mimetypes.guess_type(img_path)
        if mime_type is None or not mime_type.startswith("image/"):
            mime_type = "image/png"  # original behavior kept as the fallback

        message = HumanMessage(content=[
            {"type": "text", "text": "Extract all the text from this image. Return only the text, no explanations."},
            {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_base64}"}},
        ])

        response = vision_llm.invoke([message])
        return response.content.strip()

    except FileNotFoundError:
        return f"Error: image file not found at {img_path}"
    except Exception as e:
        logging.error(f"Image OCR error: {e}")
        return f"Error extracting text: {e}"
|
|
| |
def video_to_text(video_path: str) -> str:
    """
    Extract spoken text from a video file.

    The audio track is dumped to a temporary WAV file and transcribed via
    Google's free speech-recognition endpoint (network call).

    Args:
        video_path (str): Path to the video file.

    Returns:
        str: The recognized speech, or an "Error ..." message string on failure.
    """
    audio_path = "temp_audio.wav"
    clip = None
    try:
        clip = VideoFileClip(video_path)
        clip.audio.write_audiofile(audio_path, logger=None)

        recognizer = sr.Recognizer()
        with sr.AudioFile(audio_path) as source:
            audio_data = recognizer.record(source)
        text = recognizer.recognize_google(audio_data)

        return text
    except Exception as e:
        # Best-effort tool: surface the failure as text so the agent can
        # read it, instead of crashing the graph run.
        return f"Error extracting text from video: {e}"
    finally:
        # Always release the clip and delete the temporary WAV file — the
        # original leaked both on every call.
        if clip is not None:
            clip.close()
        if os.path.exists(audio_path):
            os.remove(audio_path)
def divide(a: int, b: int) -> float:
    """Return the quotient of ``a`` and ``b`` using true division."""
    quotient = a / b
    return quotient
def extract_text_from_file(file_path: str) -> str:
    """
    Read a plain text file and return its contents.

    Args:
        file_path (str): Path to the .txt file.

    Returns:
        str: The stripped file contents, or an error message string when the
        file is missing or unreadable.
    """
    try:
        handle = open(file_path, "r", encoding="utf-8")
    except FileNotFoundError:
        return f"Error: text file not found at {file_path}"
    except Exception as e:
        return f"Error reading text file: {e}"

    try:
        contents = handle.read()
    except Exception as e:
        return f"Error reading text file: {e}"
    finally:
        handle.close()

    return contents.strip()
|
|
|
|
# Tools exposed to the model; ToolNode dispatches to them by function name.
tools = [extract_text_from_image, divide, video_to_text, extract_text_from_file]

# Main chat model. parallel_tool_calls=False forces at most one tool call
# per model turn, so tool results flow back one at a time.
llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools, parallel_tool_calls=False)
|
|
| |
def call_llm(state: AgentState):
    """Single model turn: prepend the system prompt and invoke the tool-bound LLM."""
    system_prompt = SystemMessage(
        content="""You are an assistant that must always respond with the most concise possible answer.
- Use exactly one word whenever possible.
- In case of numeric answers, don't use the word, output the number instead.
- Do not explain your reasoning.
- Do not add punctuation, commentary, symbols of any kind or extra words.
- If a one‑word answer is not possible, use the shortest phrase that conveys the answer.
"""
    )
    reply = llm_with_tools.invoke([system_prompt] + state["messages"])
    return {
        "messages": [reply],
        "input_file": state["input_file"],
    }
|
|
| |
# Wire the two-node ReAct-style loop: the model either answers (END) or
# requests a tool; tool output is fed back into another model turn.
graph = StateGraph(AgentState)
graph.add_node("call_llm", call_llm)
graph.add_node("tools", ToolNode(tools))


graph.add_edge(START, "call_llm")
# tools_condition routes to "tools" when the last message carries tool
# calls, otherwise to END.
graph.add_conditional_edges("call_llm", tools_condition)
graph.add_edge("tools", "call_llm")


agent = graph.compile()
|
|
| |
# Smoke test: one end-to-end run of the compiled graph.
# NOTE(review): `result` is never inspected and is overwritten by the
# evaluation loop below — presumably just a sanity check; confirm or remove.
messages = [HumanMessage(content="Divide 10 by 2")]
result = agent.invoke({"messages": messages, "input_file": None})
|
|
|
|
| |
# ---- Batch evaluation: answer every question in the test set ----

# Load one JSON object per non-blank line (JSONL).
with open("./test_metadata.jsonl", "r", encoding="utf-8") as f:
    test_data = [json.loads(line) for line in f if line.strip()]

# NOTE(review): defined but never used — presumably meant to resolve
# per-question attachment paths for "input_file"; confirm and wire up or drop.
base_path = "./files"

for item in test_data:
    question = item["Question"]
    try:
        result = agent.invoke(
            {"messages": [HumanMessage(content=question)], "input_file": None}
        )
        # The last message in the final state is the model's answer.
        item["Final answer"] = result["messages"][-1].content
    except Exception as e:
        # One failed API call must not abort the whole run — the original
        # crashed here and lost every answer collected so far.
        item["Final answer"] = f"Error: {e}"

# Write the annotated records back out as JSONL.
with open("./test_with_agent_answers.jsonl", "w", encoding="utf-8") as f:
    for item in test_data:
        f.write(json.dumps(item, ensure_ascii=False) + "\n")

print("Done! Agent answers written to test_with_agent_answers.jsonl")