cr-agent / tools.py
Sibi Krishnamoorthy
first commit
fd06b5a
import os
from pprint import pprint
import requests
from langchain_core.tools import tool
from vector_store import get_vector_store
try:
from ddgs import DDGS
except ImportError:
DDGS = None
try:
from docling.document_converter import DocumentConverter
except ImportError:
DocumentConverter = None
# Weather Tools
@tool
def get_current_weather(city: str) -> dict:
"""Get the current weather for a specific city. Returns temperature, condition, etc."""
api_key = os.getenv("OPENWEATHERMAP_API_KEY")
if not api_key:
return {"error": "Weather API key not configured."}
url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=metric"
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
return response.json()
return {"error": f"API Error: {response.text}"}
except Exception as e:
return {"error": str(e)}
@tool
def get_weather_forecast(city: str) -> dict:
"""Get the 5-day weather forecast for a city. Useful for checking future weather."""
api_key = os.getenv("OPENWEATHERMAP_API_KEY")
if not api_key:
return {"error": "Weather API key not configured."}
url = f"http://api.openweathermap.org/data/2.5/forecast?q={city}&appid={api_key}&units=metric"
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
return response.json()
return {"error": f"API Error: {response.text}"}
except Exception as e:
return {"error": str(e)}
@tool
def schedule_meeting(title: str, description: str, start_time: str, end_time: str, participants: str, location: str = "") -> str:
"""
Schedule a meeting in the database.
Args:
title: Meeting title
description: Meeting description (can include weather info)
start_time: Start time in format 'YYYY-MM-DD HH:MM:SS'
end_time: End time in format 'YYYY-MM-DD HH:MM:SS'
participants: Comma-separated list of participant names
location: Meeting location
Returns:
Success or error message
"""
try:
from database import engine
from sqlmodel import Session
from models import Meeting
from datetime import datetime
# Convert string datetime to datetime objects for SQLite
start_dt = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
meeting = Meeting(
title=title,
description=description,
location=location,
start_time=start_dt,
end_time=end_dt,
participants=participants
)
with Session(engine) as session:
session.add(meeting)
session.commit()
session.refresh(meeting)
return f"✅ Meeting scheduled successfully! ID: {meeting.id}, Title: {title}, Time: {start_time} to {end_time}"
except Exception as e:
return f"❌ Failed to schedule meeting: {e}"
@tool
def cancel_meetings(date_filter: str = "all", meeting_ids: str = "") -> str:
"""
Cancel/delete meetings from the database.
Args:
date_filter: Filter for which meetings to cancel - "all", "today", "tomorrow", or specific date "YYYY-MM-DD"
meeting_ids: Optional comma-separated list of specific meeting IDs to cancel (e.g., "1,2,3")
Returns:
Success message with count of cancelled meetings
"""
try:
from database import engine
from sqlmodel import Session, select
from models import Meeting
from datetime import datetime, timedelta
with Session(engine) as session:
# Build query based on filters
if meeting_ids:
# Cancel specific meeting IDs
ids = [int(id.strip()) for id in meeting_ids.split(",")]
meetings = session.exec(select(Meeting).where(Meeting.id.in_(ids))).all()
else:
# Cancel by date filter
if date_filter == "today":
today = datetime.now().date()
meetings = session.exec(
select(Meeting).where(
(Meeting.start_time >= today) &
(Meeting.start_time < today + timedelta(days=1))
)
).all()
elif date_filter == "tomorrow":
tomorrow = (datetime.now() + timedelta(days=1)).date()
meetings = session.exec(
select(Meeting).where(
(Meeting.start_time >= tomorrow) &
(Meeting.start_time < tomorrow + timedelta(days=1))
)
).all()
elif date_filter == "all":
meetings = session.exec(select(Meeting)).all()
else:
# Try parsing as specific date
try:
target_date = datetime.strptime(date_filter, "%Y-%m-%d").date()
meetings = session.exec(
select(Meeting).where(
(Meeting.start_time >= target_date) &
(Meeting.start_time < target_date + timedelta(days=1))
)
).all()
except ValueError:
return f"❌ Invalid date format: {date_filter}. Use 'today', 'tomorrow', 'all', or 'YYYY-MM-DD'"
if not meetings:
return f"No meetings found to cancel for filter: {date_filter}"
# Delete meetings
cancelled_titles = [f"'{m.title}' at {m.start_time}" for m in meetings]
for meeting in meetings:
session.delete(meeting)
session.commit()
return f"✅ Cancelled {len(meetings)} meeting(s):\n" + "\n".join(f" • {title}" for title in cancelled_titles)
except Exception as e:
return f"❌ Failed to cancel meetings: {e}"
# Web Tools
@tool
def duckduckgo_search(query: str) -> str:
"""Perform a DuckDuckGo search and return relevant results."""
if not DDGS:
return "DuckDuckGo Search library not installed. Install with: pip install ddgs"
try:
with DDGS() as ddgs:
# Use better search parameters for more relevant results
results = list(ddgs.text(
query,
region='wt-wt', # Global results
safesearch='moderate',
timelimit='y', # Last year for fresher results
max_results=5
))
if not results:
return "No search results found."
# Format results with better structure
formatted = []
for i, result in enumerate(results, 1):
title = result.get('title', 'No title')
body = result.get('body', 'No description')
url = result.get('href', 'No URL')
# Truncate body to avoid token overflow
if len(body) > 300:
body = body[:297] + "..."
formatted.append(f"**Result {i}: {title}**\n{body}\nSource: {url}")
print("\n\n".join(formatted))
return "\n\n".join(formatted)
except Exception as e:
return f"Search failed: {str(e)[:200]}"
# Document Tools
@tool
def read_document_with_docling(file_path: str) -> str:
"""Read and parse a PDF or Text document using Docling to extract text."""
if not DocumentConverter:
return "Docling library not installed."
try:
converter = DocumentConverter()
result = converter.convert(file_path)
return result.document.export_to_markdown()
except Exception as e:
return f"Error reading document: {e}"
@tool
def ingest_document_to_vector_store(file_path: str, document_id: str, is_temporary: bool = True) -> str:
"""
Ingest a document into the vector store for semantic search.
First parses the document, then chunks and embeds it into ChromaDB.
Args:
file_path: Path to the document file (PDF or text)
document_id: Unique identifier for this document
is_temporary: If True, stores in memory (session only). If False, stores to disk.
Returns:
Status message with number of chunks created
"""
try:
# First parse the document
if not DocumentConverter:
return "Docling library not installed."
# Configure lightweight pipeline - no vision models, faster processing
try:
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import PdfFormatOption
pipeline_options = PdfPipelineOptions()
pipeline_options.do_ocr = False # Keep OCR for text extraction
pipeline_options.do_table_structure = False # Disable table detection (slow)
# Disable slow enrichment features
pipeline_options.do_picture_classification = False
pipeline_options.do_picture_description = False
pipeline_options.do_code_enrichment = False
pipeline_options.do_formula_enrichment = False
pipeline_options.generate_picture_images = False
converter = DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
}
)
except Exception as config_error:
# Fallback to simple converter if advanced options fail
print(f"⚠️ Using simple converter due to: {config_error}")
converter = DocumentConverter()
result = converter.convert(file_path)
document_text = result.document.export_to_markdown()
# Ingest into vector store
# Use temporary store for uploads by default, unless specified otherwise
vector_store = get_vector_store(is_persistent=not is_temporary)
num_chunks = vector_store.ingest_document(
document_text=document_text,
document_id=document_id,
metadata={"file_path": file_path},
chunk_size=500,
chunk_overlap=50
)
store_type = "temporary (in-memory)" if is_temporary else "persistent (disk)"
return f"Successfully ingested document '{document_id}' into {store_type} vector store. Created {num_chunks} chunks."
except Exception as e:
return f"Document ingestion failed: {e}"
@tool
def search_vector_store(query: str, document_id: str = "", top_k: int = 3, search_type: str = "persistent") -> str:
"""
Search the vector store for relevant document chunks.
Args:
query: Search query text
document_id: Optional specific document to search within (empty string searches all documents)
top_k: Number of top results to return (default: 3)
search_type: "persistent" (default) or "temporary" (for uploaded files)
Returns:
Formatted search results with similarity scores
"""
try:
is_persistent = (search_type == "persistent")
vector_store = get_vector_store(is_persistent=is_persistent)
# Convert empty string to None for the vector store
doc_id = document_id if document_id else None
results = vector_store.similarity_search(
query=query,
top_k=top_k,
document_id=doc_id
)
if not results:
return f"No relevant documents found in {search_type} vector store."
# Format results
output = f"{search_type.capitalize()} Vector Store Search Results:\n\n"
for i, (chunk_text, score, metadata) in enumerate(results, 1):
output += f"Result {i} (Similarity: {score:.3f}):\n"
output += f"{chunk_text}\n"
output += f"[Document: {metadata.get('document_id', 'unknown')}]\n\n"
return output
except Exception as e:
return f"Vector store search failed: {e}"