File size: 11,013 Bytes
df1a0e5
 
 
22f855f
 
df1a0e5
22f855f
df1a0e5
 
22f855f
413f406
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df1a0e5
 
 
 
1512af4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df1a0e5
 
 
 
413f406
 
 
 
 
 
 
 
 
 
 
df1a0e5
22f855f
 
 
 
413f406
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22f855f
413f406
 
 
22f855f
413f406
 
 
 
 
 
 
 
 
 
 
 
 
22f855f
413f406
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df1a0e5
 
 
413f406
 
 
 
 
 
 
df1a0e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22f855f
df1a0e5
 
 
 
 
 
 
22f855f
 
413f406
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22f855f
 
 
 
 
 
 
 
 
 
413f406
 
 
 
 
22f855f
 
413f406
22f855f
413f406
 
22f855f
 
413f406
22f855f
 
 
 
df1a0e5
22f855f
413f406
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
from typing import TypedDict, Annotated, List
import operator
import os
import base64
import requests
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langgraph.graph import StateGraph, END, START
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
from langchain_community.document_loaders import YoutubeLoader, WikipediaLoader
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_experimental.utilities import PythonREPL
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.tools import tool
from langchain_community.tools import YouTubeSearchTool
# Playwright Imports (Optional)
try:
    from langchain_community.agent_toolkits import PlaywrightBrowserToolkit
    from langchain_community.tools.playwright.utils import create_sync_playwright_browser
except ImportError:
    PlaywrightBrowserToolkit = None
    create_sync_playwright_browser = None

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from dotenv import load_dotenv

load_dotenv()

# Configure tracing
# Arize/OpenInference tracing is strictly optional: it activates only when
# both ARIZE_SPACE_ID and ARIZE_API_KEY env vars are set, and any missing
# library or setup failure is reported to stdout without aborting the agent.
try:
    if os.getenv("ARIZE_SPACE_ID") and os.getenv("ARIZE_API_KEY"):
        from arize.otel import register
        from openinference.instrumentation.google_genai import GoogleGenAIInstrumentor
        from openinference.instrumentation.langchain import LangChainInstrumentor
        
        tracer_provider = register(
            space_id=os.getenv("ARIZE_SPACE_ID"),
            api_key=os.getenv("ARIZE_API_KEY"),
            project_name=os.getenv("ARIZE_PROJECT_NAME", "langgraph-agent-test")
        )
        # Instrument both the Gemini SDK and LangChain so spans cover the
        # whole graph run, not just model calls.
        GoogleGenAIInstrumentor().instrument(tracer_provider=tracer_provider)
        LangChainInstrumentor().instrument(tracer_provider=tracer_provider)
        print("Tracing configured with Arize.")
    else:
        print("Arize tracing skipped: ARIZE_SPACE_ID or ARIZE_API_KEY not set.")
except ImportError:
    print("Tracing libraries not installed. Skipping tracing.")
except Exception as e:
    print(f"Error configuring tracing: {e}")

# 1. Define the state
class AgentState(TypedDict):
    """Graph state: the running conversation history."""
    # operator.add tells LangGraph to APPEND messages returned by each node
    # rather than replace the list.
    messages: Annotated[List[BaseMessage], operator.add]

# Helper to split and save documents to Chroma
def save_to_chroma(docs):
    """Chunk *docs* and persist them into the shared Chroma vector store.

    Silently no-ops when the module-level ``vector_store`` is absent or
    uninitialized, or when *docs* is empty. Indexing errors are printed
    rather than raised so a failed save never breaks a tool call.
    """
    store = globals().get("vector_store")
    if not (store and docs):
        return
    try:
        splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        chunks = splitter.split_documents(docs)
        if chunks:
            store.add_documents(chunks)
    except Exception as e:
        print(f"Error saving to Chroma: {e}")

# 2. Define the tools
@tool
def get_youtube_transcript(url: str) -> str:
    """Retrieves the transcript of a YouTube video given its URL."""
    try:
        docs = YoutubeLoader.from_youtube_url(url, add_video_info=True).load()
        if not docs:
             return "No transcript found. Please search Google for the video title or ID."

        # Persist the transcript so search_knowledge_base can find it later.
        save_to_chroma(docs)

        formatted = [
            f"Metadata: {doc.metadata}\nContent: {doc.page_content}"
            for doc in docs
        ]
        return "\n\n".join(formatted)
    except Exception as e:
        return f"Error getting transcript: {e}. Please try searching Google for the video URL or ID."

@tool
def calculator(expression: str) -> str:
    """Calculates a mathematical expression using Python. Example: '2 + 2', '34 * 5', 'import math; math.sqrt(2)'"""
    try:
        repl = PythonREPL()
        code = expression
        # PythonREPL only shows what is printed, so wrap the result in
        # print() — but only where that produces valid Python. The old
        # blanket f"print({expression})" broke the docstring's own example:
        # print(import math; math.sqrt(2)) is a SyntaxError.
        if "print" not in expression:
            try:
                compile(expression, "<calc>", "eval")
                # Pure expression (e.g. '2 + 2'): wrap the whole thing.
                code = f"print({expression})"
            except SyntaxError:
                # Statement sequence (e.g. 'import math; math.sqrt(2)'):
                # wrap only the trailing part, and only if it is itself
                # a valid expression.
                head, sep, tail = expression.rpartition(";")
                tail = tail.strip()
                if sep and tail:
                    try:
                        compile(tail, "<calc>", "eval")
                        code = f"{head}; print({tail})"
                    except SyntaxError:
                        pass  # leave the input untouched
        return repl.run(code)
    except Exception as e:
        return f"Error calculating: {e}"

@tool
def search_wikipedia(query: str) -> str:
    """Search Wikipedia for a query. Useful for factual lists and biographies."""
    try:
        docs = WikipediaLoader(query=query, load_max_docs=3).load()
        # Persist results so search_knowledge_base can query them later.
        save_to_chroma(docs)
        snippets = [doc.page_content[:10000] for doc in docs]
        return "\n\n".join(snippets)
    except Exception as e:
        return f"Error searching Wikipedia: {e}"

# ChromaDB RAG Tool
# Persistent vector store shared by all tools (via save_to_chroma). Stays
# None — disabling RAG persistence — if embeddings or Chroma setup fails.
vector_store = None
try:
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    vector_store = Chroma(
        collection_name="agent_memory",
        embedding_function=embeddings,
        persist_directory="./chroma_db"
    )
except Exception as e:
    print(f"Warning: ChromaDB initialization failed. RAG features disabled. Error: {e}")

@tool
def search_knowledge_base(query: str) -> str:
    """Searches for relevant documents in the persistent knowledge base (memory of previous searches)."""
    # Explicit guard: when Chroma init failed, vector_store is None and the
    # old code returned a cryptic AttributeError string from the except.
    if vector_store is None:
        return "Knowledge base unavailable (vector store not initialized)."
    try:
        retriever = vector_store.as_retriever()
        docs = retriever.invoke(query)
        if not docs:
            return "No relevant information found."
        return "\n".join([d.page_content for d in docs])
    except Exception as e:
        return f"Error searching knowledge base: {e}"

@tool
def browse_page(url: str) -> str:
    """Browses a web page and extracts text using Playwright. Use this to read content from specific URLs."""
    if not create_sync_playwright_browser:
        return "Browsing unavailable (Playwright not installed)."
    browser = None
    try:
        browser = create_sync_playwright_browser(headless=True)
        page = browser.new_page()
        page.goto(url)
        text = page.inner_text("body")
        # Persist the page through the shared helper — same chunking path as
        # the Wikipedia/YouTube tools (the old inline copy duplicated it).
        save_to_chroma([Document(page_content=text, metadata={"source": url})])
        return text[:10000]
    except Exception as e:
        return f"Error browsing: {e}"
    finally:
        # Always release the browser, even when goto/inner_text raised —
        # the old code leaked it on any error before browser.close().
        if browser is not None:
            try:
                browser.close()
            except Exception:
                pass

@tool
def search_youtube_videos(query: str) -> str:
    """Search for YouTube videos. Provide only the search keywords."""
    try:
        # Named to avoid shadowing the imported `tool` decorator.
        yt_search = YouTubeSearchTool()
        # ", 3" caps the result count at 3 (YouTubeSearchTool input format).
        return yt_search.run(f"{query}, 3")
    except Exception as e:
        return f"Error searching YouTube: {e}"

# Combine Tools (Native Google Search is enabled via model param)
# search_knowledge_base queries the persistent Chroma store that the other
# tools populate via save_to_chroma.
tools = [get_youtube_transcript, calculator, search_wikipedia, search_knowledge_base, search_youtube_videos, browse_page]
tool_node = ToolNode(tools)

# 3. Define the model
# temperature=0 for deterministic answers; google_search_retrieval enables
# the model's built-in Google Search grounding.
LLM = "gemini-2.0-flash"
model = ChatGoogleGenerativeAI(
    model=LLM, 
    temperature=0, 
    max_retries=5,
    google_search_retrieval=True
)
model = model.bind_tools(tools)

# 4. Define the agent node
def should_continue(state):
    """Graph router: 'continue' (run tools) when the last message carries
    tool calls, otherwise 'end'."""
    last = state['messages'][-1]
    return "continue" if last.tool_calls else "end"

def call_model(state):
    """Invoke the tool-bound Gemini model on the full message history and
    return its reply for LangGraph to append to the state."""
    reply = model.invoke(state['messages'])
    return {"messages": [reply]}

# 5. Create the graph
# Loop: agent -> (tool calls pending? -> action -> agent ...) -> END
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, {"continue": "action", "end": END})
workflow.add_edge("action", "agent")
app = workflow.compile()

class LangGraphAgent:
    """Callable wrapper around the compiled LangGraph workflow.

    Usage: ``answer = LangGraphAgent()(question, task_id)`` — returns the
    text after the model's "FINAL ANSWER:" marker when present, otherwise
    the full response text.
    """

    def __init__(self):
        # Reuse the module-level compiled graph; no per-instance state.
        self.app = app

    def __call__(self, question: str, task_id: str = None) -> str:
        """Run the agent on *question*.

        If *task_id* is given, an attachment (image/audio/video/PDF) is
        fetched from the scoring service and attached to the message as a
        base64 data URI so the model can analyze it.
        """
        messages = [
            SystemMessage(content="""You are a helpful assistant with multimodal capabilities (Vision, Audio, PDF analysis).
Step 1: ALWAYS START by performing a Google Search (or using Wikipedia/YouTube) to gather up-to-date information. Do not answer from memory.
Step 2: If a URL is provided, search for the **EXACT URL** string on Google first to identify the video/page title. Do not add keywords yet. **DO NOT use the 'youtube_search' tool for this step; use Google Search.**
Step 3: Once you have the title, search for that title to find descriptions or summaries.
Step 4: Analyze the information found. If you cannot access a specific page or video directly (e.g. empty transcript), DO NOT GIVE UP. Use Google Search to find descriptions, summaries, or discussions from reliable sources.
Step 5: If you identify relevant Wikipedia pages or YouTube videos, use the specific tools ('search_wikipedia', 'get_youtube_transcript') to ingest them into your Knowledge Base.
Step 6: Reason to find the exact answer. Verify your findings by cross-referencing multiple sources if possible. You can use 'search_knowledge_base' to connect facts you have saved.
Step 7: Output the final answer strictly in this format:
FINAL ANSWER: [ANSWER]

Do not include "FINAL ANSWER:" in the [ANSWER] part itself.
Example:
Thinking: ...
FINAL ANSWER: 3

If the question involves an image, video, or audio file provided in the context, analyze it to answer.
"""),
        ]
        
        # Multimodal message parts: always the question text, optionally a file.
        content = []
        content.append({"type": "text", "text": question})

        if task_id:
            image_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
            try:
                # Check headers first — cheap HEAD request to learn the MIME
                # type before downloading the body.
                response = requests.head(image_url, timeout=5)
                mime_type = response.headers.get("Content-Type", "")
                
                # Allow images, audio, video, pdf
                if response.status_code == 200 and any(t in mime_type for t in ["image/", "audio/", "video/", "application/pdf"]):
                    # Fetch the file
                    img_response = requests.get(image_url, timeout=10)
                    if img_response.status_code == 200:
                        file_data = base64.b64encode(img_response.content).decode("utf-8")
                        content.append({
                            "type": "image_url", # LangChain uses this key for multimodal data URI
                            "image_url": {"url": f"data:{mime_type};base64,{file_data}"}
                        })
            except Exception as e:
                # Best-effort: a missing/unreachable file should not stop the run.
                print(f"Error checking/fetching file: {e}")

        messages.append(HumanMessage(content=content))
        
        inputs = {"messages": messages}
        final_state = self.app.invoke(inputs)
        result = final_state['messages'][-1].content
        
        def extract_text(content):
            """Flatten str / list / dict message content to plain text."""
            if isinstance(content, str):
                return content
            if isinstance(content, list):
                return " ".join([extract_text(c) for c in content])
            if isinstance(content, dict):
                return content.get('text', str(content))
            return str(content)

        text_result = extract_text(result)
        # Strip everything up to the last "FINAL ANSWER:" marker, per the
        # system prompt's output contract.
        if "FINAL ANSWER:" in text_result:
            return text_result.split("FINAL ANSWER:")[-1].strip()
        return text_result