# backend-deploy/agent.py
# Author: SyedFarooqAlii (commit a426a46)
import asyncio
import json
import logging
import os
import time
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv
# Load environment variables from a local .env file (e.g. API keys/endpoints
# consumed by the retrieval backend) before anything else runs.
load_dotenv()
# Configure root logging once at import time; module-level logger for this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Cached RAGRetriever instance; built lazily on first use so module import
# stays cheap and a failing constructor is reported per-call, not at import.
_retriever = None


def retrieve_information(query: str, top_k: int = 5, threshold: float = 0.3) -> Dict:
    """
    Retrieve information from the knowledge base based on a query.

    Args:
        query: Natural-language query text.
        top_k: Maximum number of chunks to return.
        threshold: Minimum similarity score for a chunk to be included.

    Returns:
        Dict with keys 'query', 'retrieved_chunks', 'total_results' and
        'metadata'. On failure an additional 'error' key holds the message,
        'retrieved_chunks' is empty and 'total_results' is 0 — the function
        never raises.
    """
    global _retriever
    try:
        # Fix: previously a new RAGRetriever was imported and constructed on
        # every call, and it happened *before* the try block, so a failing
        # setup escaped as an uncaught exception instead of the documented
        # error-dict shape. Now the instance is created once and reused.
        if _retriever is None:
            from retrieving import RAGRetriever
            _retriever = RAGRetriever()

        # retrieve() returns a JSON string; parse it into a dict.
        json_response = _retriever.retrieve(query_text=query, top_k=top_k, threshold=threshold)
        results = json.loads(json_response)

        # Normalize each hit to the fields the agent consumes. 'content',
        # 'url', 'position' and 'similarity_score' are treated as required
        # (a missing key is reported via the error path below).
        formatted_results = [
            {
                'content': result['content'],
                'url': result['url'],
                'position': result['position'],
                'similarity_score': result['similarity_score'],
                'chunk_id': result.get('chunk_id', ''),
                'created_at': result.get('created_at', ''),
            }
            for result in results.get('results', [])
        ]

        return {
            'query': query,
            'retrieved_chunks': formatted_results,
            'total_results': len(formatted_results),
            'metadata': results.get('metadata', {}),
        }
    except Exception as e:
        # Same logger instance as the module-level `logger` (getLogger is
        # idempotent per name); spelled out so this block is self-contained.
        logging.getLogger(__name__).error(f"Error in retrieve_information: {e}")
        return {
            'query': query,
            'retrieved_chunks': [],
            'total_results': 0,
            'error': str(e),
            'metadata': {},
        }
class RAGAgent:
    """
    Minimal RAG agent: retrieves curriculum chunks for a query and assembles
    a structured answer payload (answer text, sources, citations, timing,
    coarse confidence label).
    """

    def __init__(self):
        # No components are held yet; retrieval is delegated to the
        # module-level retrieve_information() helper on each query.
        logger.info("RAG Agent initialized with retrieval and generation components")

    def _error_response(self, answer: str, error: Optional[str], start_time: float,
                        session_id: Optional[str], query_type: str) -> Dict:
        """Build the standard failure/empty payload shared by all error paths.

        Deduplicates the three near-identical dicts the previous version
        constructed inline (retrieval error, no-match, unexpected exception).
        """
        return {
            "answer": answer,
            "sources": [],
            "matched_chunks": [],
            "citations": [],
            "error": error,
            "query_time_ms": (time.time() - start_time) * 1000,
            "session_id": session_id,
            "query_type": query_type,
        }

    def query_agent(self, query_text: str, session_id: Optional[str] = None,
                    query_type: str = "global", selected_text: Optional[str] = None) -> Dict:
        """
        Process a query through the RAG system and return a structured response.

        Args:
            query_text: The user's question.
            session_id: Opaque caller-supplied session identifier, echoed back.
            query_type: Echoed back in the response (default "global").
            selected_text: Currently unused; kept for interface compatibility.

        Returns:
            Dict with keys: answer, sources, matched_chunks, citations,
            query_time_ms, session_id, query_type and — on success only —
            confidence. 'error' is None on success, an error string otherwise.
            Never raises.
        """
        start_time = time.time()
        logger.info(f"Processing query through RAG system: '{query_text[:50]}...'")
        try:
            retrieval_result = retrieve_information(query_text, top_k=5, threshold=0.3)
            if retrieval_result.get('error'):
                return self._error_response(
                    "Sorry, I encountered an error retrieving information.",
                    retrieval_result['error'], start_time, session_id, query_type)

            retrieved_chunks = retrieval_result.get('retrieved_chunks', [])
            if not retrieved_chunks:
                return self._error_response(
                    "I couldn't find relevant information in the Physical AI & Humanoid Robotics curriculum to answer your question. Please try asking about specific topics from the curriculum like ROS 2, Digital Twins, AI-Brain, or VLA.",
                    None, start_time, session_id, query_type)

            # Assemble a naive answer from the top-2 chunks (placeholder for a
            # real generation step); each chunk contributes at most 300 chars.
            answer_parts = ["Based on the Physical AI & Humanoid Robotics curriculum:"]
            for chunk in retrieved_chunks[:2]:
                answer_parts.append(f"{chunk.get('content', '')[:300]}...")
            answer = " ".join(answer_parts)

            # One citation per retrieved chunk; chapter/section/page are not
            # available from the retriever and stay empty.
            citations = [
                {
                    "document_id": chunk.get('chunk_id', ''),
                    "title": chunk.get('url', ''),
                    "chapter": "",
                    "section": "",
                    "page_reference": "",
                }
                for chunk in retrieved_chunks
            ]

            query_time_ms = (time.time() - start_time) * 1000
            response = {
                "answer": answer,
                "sources": [chunk.get('url', '') for chunk in retrieved_chunks if chunk.get('url')],
                "matched_chunks": retrieved_chunks,
                "citations": citations,
                "query_time_ms": query_time_ms,
                "session_id": session_id,
                "query_type": query_type,
                "confidence": self._calculate_confidence(retrieved_chunks),
                "error": None,
            }
            logger.info(f"Query processed in {query_time_ms:.2f}ms")
            return response
        except Exception as e:
            logger.error(f"Error processing query: {e}")
            return self._error_response(
                "Sorry, I encountered an error processing your request.",
                str(e), start_time, session_id, query_type)

    def _calculate_confidence(self, matched_chunks: List[Dict]) -> str:
        """
        Map the mean similarity score of the matches to a coarse label:
        >= 0.7 -> "high", >= 0.4 -> "medium", otherwise (or no matches) "low".
        """
        if not matched_chunks:
            return "low"
        avg_score = sum(chunk.get('similarity_score', 0.0) for chunk in matched_chunks) / len(matched_chunks)
        if avg_score >= 0.7:
            return "high"
        elif avg_score >= 0.4:
            return "medium"
        else:
            return "low"
def query_agent(query_text: str) -> Dict:
    """
    Convenience wrapper: build a throwaway RAGAgent and run a single query.
    """
    return RAGAgent().query_agent(query_text)
def run_agent_sync(query_text: str) -> Dict:
    """
    Synchronous entry point to run the agent for direct usage.

    Bug fix: the previous version awaited ``RAGAgent._async_query_agent``,
    a method that does not exist, so every call raised ``AttributeError``
    (from inside ``asyncio.run`` or a worker thread). ``RAGAgent.query_agent``
    is already synchronous, so no event-loop or thread-pool machinery is
    needed at all — call it directly.
    """
    return RAGAgent().query_agent(query_text)
def main():
    """
    Demo driver: run a fixed set of curriculum questions through the agent
    and pretty-print each structured response.
    """
    logger.info("Initializing RAG Agent...")
    agent = RAGAgent()

    # Representative curriculum questions used to exercise the pipeline.
    test_queries = [
        "What is ROS2?",
        "Explain humanoid design principles",
        "How does VLA work?",
        "What are simulation techniques?",
        "Explain AI control systems",
    ]

    print("RAG Agent - Testing Queries")
    print("=" * 50)

    total = len(test_queries)
    for idx, question in enumerate(test_queries, 1):
        print(f"\nQuery {idx}: {question}")
        print("-" * 30)

        result = agent.query_agent(question)

        print(f"Answer: {result['answer']}")

        sources = result.get('sources')
        if sources:
            print(f"Sources: {len(sources)} documents")
            for src in sources[:3]:  # show at most the first 3 sources
                print(f" - {src}")

        chunks = result.get('matched_chunks')
        if chunks:
            print(f"Matched chunks: {len(chunks)}")
            for pos, hit in enumerate(chunks[:2], 1):  # show at most 2 chunks
                text = hit['content']
                preview = text[:100] + "..." if len(text) > 100 else text
                print(f" Chunk {pos}: {preview}")
                print(f" Source: {hit['url']}")
                print(f" Score: {hit['similarity_score']:.3f}")

        print(f"Query time: {result['query_time_ms']:.2f}ms")
        print(f"Confidence: {result.get('confidence', 'unknown')}")

        # Brief pause between queries, skipped after the final one.
        if idx < total:
            time.sleep(1)


if __name__ == "__main__":
    main()