# NOTE(review): the three lines below were non-Python extraction artifacts
# (file byte count, git-blame commit hashes, and a line-number gutter).
# They are preserved here as a comment so the module remains importable.
# File size: 6,041 Bytes | f6fc677 33426c9 ... | 1 2 3 ... 149 |
import os
import random
import asyncio
import ssl
from dotenv import load_dotenv
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
# from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from langfuse import get_client
from rich.pretty import pprint
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import base64
# Import tool functions and initializers from tools.py
from tools import (
get_tavily_tool,
get_arxiv_reader,
get_wikipedia_reader,
get_wikipedia_tool,
get_arxiv_tool,
get_search_tool,
get_calculator_tool,
get_hub_stats_tool,
get_hub_stats,
)
# Load environment variables (Langfuse keys/host, tool API keys) from env.local
# before any class below reads os.environ / os.getenv.
load_dotenv("env.local")
class LlamaIndexAgent:
    """LlamaIndex-based research agent.

    Wires search / calculator / Wikipedia / arXiv / HF-Hub tools (factories
    imported from tools.py) into an ``AgentWorkflow`` backed by a HuggingFace
    Inference API LLM, and traces every query through Langfuse via
    OpenTelemetry (OTLP/HTTP exporter).
    """

    def __init__(self):
        # Tool initializations using imported factory functions from tools.py.
        self.tavily_tool = get_tavily_tool()
        self.arxiv_reader = get_arxiv_reader()
        self.wikipedia_reader = get_wikipedia_reader()
        self.wikipedia_tool = get_wikipedia_tool(self.wikipedia_reader)
        self.arxiv_tool = get_arxiv_tool(self.arxiv_reader)
        self.search_tool = get_search_tool()
        self.calculator_tool = get_calculator_tool()
        self.hub_stats_tool = get_hub_stats_tool()

        # System prompt lives in a sibling text file so it can be edited
        # without touching code. Raises FileNotFoundError if absent.
        with open("system_prompt.txt", "r") as f:
            self.system_prompt = f.read()
        print("system_prompt loaded:", self.system_prompt[:80], "...")
        print("DEBUG: search_tool:", self.search_tool, type(self.search_tool))
        print("DEBUG: calculator_tool:", self.calculator_tool, type(self.calculator_tool))
        print("DEBUG: wikipedia_tool:", self.wikipedia_tool, type(self.wikipedia_tool))
        print("DEBUG: arxiv_tool:", self.arxiv_tool, type(self.arxiv_tool))
        print("DEBUG: hub_stats_tool:", self.hub_stats_tool, type(self.hub_stats_tool))

        # search_tool and calculator_tool are unpacked (lists of tools) while
        # the others are single tool objects — hence the mixed splat below.
        all_tools = [*self.search_tool, *self.calculator_tool, self.wikipedia_tool, self.arxiv_tool, self.hub_stats_tool]
        print("DEBUG: All tools list:", all_tools)
        print("DEBUG: Types in all_tools:", [type(t) for t in all_tools])

        # LLM and agent workflow. streaming=False because responses are
        # consumed whole; 60s client timeout guards against hung inference.
        self.llm = HuggingFaceInferenceAPI(model_name="Qwen/Qwen2.5-Coder-32B-Instruct", streaming=False, client_kwargs={"timeout": 60})
        self.alfred = AgentWorkflow.from_tools_or_functions(
            all_tools,
            llm=self.llm,
            system_prompt=self.system_prompt
        )

        # Langfuse OTLP exporter configuration. Fail fast with a clear
        # message if LANGFUSE_HOST is unset — the previous code raised an
        # opaque TypeError from `None + "/api/public/otel"` instead.
        langfuse_host = os.environ.get("LANGFUSE_HOST")
        if not langfuse_host:
            raise RuntimeError("LANGFUSE_HOST environment variable is not set")
        LANGFUSE_AUTH = base64.b64encode(
            f"{os.getenv('LANGFUSE_PUBLIC_KEY')}:{os.getenv('LANGFUSE_SECRET_KEY')}".encode()
        ).decode()
        os.environ['OTEL_EXPORTER_OTLP_ENDPOINT'] = langfuse_host + "/api/public/otel"
        os.environ['OTEL_EXPORTER_OTLP_HEADERS'] = f"Authorization=Basic {LANGFUSE_AUTH}"

        # Set up OpenTelemetry tracing. SimpleSpanProcessor exports each span
        # synchronously, so no batch-flush scheduling is needed.
        self.tracer_provider = TracerProvider()
        self.tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
        trace.set_tracer_provider(self.tracer_provider)
        self.instrumentor = LlamaIndexInstrumentor(
            public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
            secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
            host=langfuse_host
        )
        # Guard so repeated run_query calls (or tenacity retries) do not
        # re-instrument LlamaIndex on every invocation.
        self._instrumented = False

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type((
            aiohttp.client_exceptions.ClientConnectionError,
            aiohttp.client_exceptions.ClientOSError,
            ssl.SSLError,
            KeyError,
            ConnectionError
        ))
    )
    async def run_query(self, query: str):
        """Run ``query`` through the agent workflow inside a Langfuse span.

        Transient network/SSL errors are re-raised so tenacity retries (up
        to 3 attempts with exponential backoff); any other exception is
        returned as an ``"AGENT ERROR: ..."`` string instead of propagating.
        """
        # Instrument LlamaIndex with OpenTelemetry exactly once per agent.
        if not getattr(self, "_instrumented", False):
            self.instrumentor.instrument()
            self._instrumented = True
        langfuse = get_client()  # Picks up the LANGFUSE_* environment variables.
        # Wrap the LlamaIndex call in a Langfuse span for trace context.
        with langfuse.start_as_current_span(name="llamaindex-query") as span:
            span.update_trace(user_id="user_123", input={"query": query})
            try:
                response = await self.alfred.run(query)
            except aiohttp.client_exceptions.ClientConnectionError as e:
                span.update_trace(output={"response": f"Connection error: {e}"})
                raise  # Re-raise for retry logic
            except aiohttp.client_exceptions.ClientOSError as e:
                span.update_trace(output={"response": f"Client OS error: {e}"})
                raise  # Re-raise for retry logic
            except ssl.SSLError as e:
                span.update_trace(output={"response": f"SSL error: {e}"})
                raise  # Re-raise for retry logic
            except (KeyError, ConnectionError) as e:
                span.update_trace(output={"response": f"Session/Connection error: {e}"})
                raise  # Re-raise for retry logic
            except Exception as e:
                span.update_trace(output={"response": f"General error: {e}"})
                return f"AGENT ERROR: {e}"
            span.update_trace(output={"response": str(response)})
        # Flush telemetry after each query. force_flush (not shutdown!) keeps
        # the provider alive so the agent can serve multiple queries — the
        # previous code shut the provider down after the first query, making
        # any subsequent run_query export spans to a dead provider.
        langfuse.flush()
        self.tracer_provider.force_flush()
        return response
def main():
    """Smoke-test entry point: ask the agent one hard-coded question and print it."""
    query = "what is the capital of maharashtra?"
    agent = LlamaIndexAgent()
    print(f"Running query: {query}")
    answer = asyncio.run(agent.run_query(query))
    print("\n🎩 Agents's Response:")
    print(answer)


if __name__ == "__main__":
    main()