|
|
import os |
|
|
from pprint import pprint |
|
|
import requests |
|
|
from langchain_core.tools import tool |
|
|
from vector_store import get_vector_store |
|
|
try: |
|
|
from ddgs import DDGS |
|
|
except ImportError: |
|
|
DDGS = None |
|
|
try: |
|
|
from docling.document_converter import DocumentConverter |
|
|
except ImportError: |
|
|
DocumentConverter = None |
|
|
|
|
|
|
|
|
@tool |
|
|
def get_current_weather(city: str) -> dict: |
|
|
"""Get the current weather for a specific city. Returns temperature, condition, etc.""" |
|
|
api_key = os.getenv("OPENWEATHERMAP_API_KEY") |
|
|
if not api_key: |
|
|
return {"error": "Weather API key not configured."} |
|
|
|
|
|
url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=metric" |
|
|
try: |
|
|
response = requests.get(url, timeout=10) |
|
|
if response.status_code == 200: |
|
|
return response.json() |
|
|
return {"error": f"API Error: {response.text}"} |
|
|
except Exception as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
@tool |
|
|
def get_weather_forecast(city: str) -> dict: |
|
|
"""Get the 5-day weather forecast for a city. Useful for checking future weather.""" |
|
|
api_key = os.getenv("OPENWEATHERMAP_API_KEY") |
|
|
if not api_key: |
|
|
return {"error": "Weather API key not configured."} |
|
|
|
|
|
url = f"http://api.openweathermap.org/data/2.5/forecast?q={city}&appid={api_key}&units=metric" |
|
|
try: |
|
|
response = requests.get(url, timeout=10) |
|
|
if response.status_code == 200: |
|
|
return response.json() |
|
|
return {"error": f"API Error: {response.text}"} |
|
|
except Exception as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
@tool |
|
|
def schedule_meeting(title: str, description: str, start_time: str, end_time: str, participants: str, location: str = "") -> str: |
|
|
""" |
|
|
Schedule a meeting in the database. |
|
|
|
|
|
Args: |
|
|
title: Meeting title |
|
|
description: Meeting description (can include weather info) |
|
|
start_time: Start time in format 'YYYY-MM-DD HH:MM:SS' |
|
|
end_time: End time in format 'YYYY-MM-DD HH:MM:SS' |
|
|
participants: Comma-separated list of participant names |
|
|
location: Meeting location |
|
|
|
|
|
Returns: |
|
|
Success or error message |
|
|
""" |
|
|
try: |
|
|
from database import engine |
|
|
from sqlmodel import Session |
|
|
from models import Meeting |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
start_dt = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") |
|
|
end_dt = datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S") |
|
|
|
|
|
meeting = Meeting( |
|
|
title=title, |
|
|
description=description, |
|
|
location=location, |
|
|
start_time=start_dt, |
|
|
end_time=end_dt, |
|
|
participants=participants |
|
|
) |
|
|
|
|
|
with Session(engine) as session: |
|
|
session.add(meeting) |
|
|
session.commit() |
|
|
session.refresh(meeting) |
|
|
|
|
|
return f"✅ Meeting scheduled successfully! ID: {meeting.id}, Title: {title}, Time: {start_time} to {end_time}" |
|
|
|
|
|
except Exception as e: |
|
|
return f"❌ Failed to schedule meeting: {e}" |
|
|
|
|
|
@tool |
|
|
def cancel_meetings(date_filter: str = "all", meeting_ids: str = "") -> str: |
|
|
""" |
|
|
Cancel/delete meetings from the database. |
|
|
|
|
|
Args: |
|
|
date_filter: Filter for which meetings to cancel - "all", "today", "tomorrow", or specific date "YYYY-MM-DD" |
|
|
meeting_ids: Optional comma-separated list of specific meeting IDs to cancel (e.g., "1,2,3") |
|
|
|
|
|
Returns: |
|
|
Success message with count of cancelled meetings |
|
|
""" |
|
|
try: |
|
|
from database import engine |
|
|
from sqlmodel import Session, select |
|
|
from models import Meeting |
|
|
from datetime import datetime, timedelta |
|
|
|
|
|
with Session(engine) as session: |
|
|
|
|
|
if meeting_ids: |
|
|
|
|
|
ids = [int(id.strip()) for id in meeting_ids.split(",")] |
|
|
meetings = session.exec(select(Meeting).where(Meeting.id.in_(ids))).all() |
|
|
else: |
|
|
|
|
|
if date_filter == "today": |
|
|
today = datetime.now().date() |
|
|
meetings = session.exec( |
|
|
select(Meeting).where( |
|
|
(Meeting.start_time >= today) & |
|
|
(Meeting.start_time < today + timedelta(days=1)) |
|
|
) |
|
|
).all() |
|
|
elif date_filter == "tomorrow": |
|
|
tomorrow = (datetime.now() + timedelta(days=1)).date() |
|
|
meetings = session.exec( |
|
|
select(Meeting).where( |
|
|
(Meeting.start_time >= tomorrow) & |
|
|
(Meeting.start_time < tomorrow + timedelta(days=1)) |
|
|
) |
|
|
).all() |
|
|
elif date_filter == "all": |
|
|
meetings = session.exec(select(Meeting)).all() |
|
|
else: |
|
|
|
|
|
try: |
|
|
target_date = datetime.strptime(date_filter, "%Y-%m-%d").date() |
|
|
meetings = session.exec( |
|
|
select(Meeting).where( |
|
|
(Meeting.start_time >= target_date) & |
|
|
(Meeting.start_time < target_date + timedelta(days=1)) |
|
|
) |
|
|
).all() |
|
|
except ValueError: |
|
|
return f"❌ Invalid date format: {date_filter}. Use 'today', 'tomorrow', 'all', or 'YYYY-MM-DD'" |
|
|
|
|
|
if not meetings: |
|
|
return f"No meetings found to cancel for filter: {date_filter}" |
|
|
|
|
|
|
|
|
cancelled_titles = [f"'{m.title}' at {m.start_time}" for m in meetings] |
|
|
for meeting in meetings: |
|
|
session.delete(meeting) |
|
|
|
|
|
session.commit() |
|
|
|
|
|
return f"✅ Cancelled {len(meetings)} meeting(s):\n" + "\n".join(f" • {title}" for title in cancelled_titles) |
|
|
|
|
|
except Exception as e: |
|
|
return f"❌ Failed to cancel meetings: {e}" |
|
|
|
|
|
|
|
|
@tool |
|
|
def duckduckgo_search(query: str) -> str: |
|
|
"""Perform a DuckDuckGo search and return relevant results.""" |
|
|
if not DDGS: |
|
|
return "DuckDuckGo Search library not installed. Install with: pip install ddgs" |
|
|
try: |
|
|
with DDGS() as ddgs: |
|
|
|
|
|
results = list(ddgs.text( |
|
|
query, |
|
|
region='wt-wt', |
|
|
safesearch='moderate', |
|
|
timelimit='y', |
|
|
max_results=5 |
|
|
)) |
|
|
|
|
|
if not results: |
|
|
return "No search results found." |
|
|
|
|
|
|
|
|
formatted = [] |
|
|
for i, result in enumerate(results, 1): |
|
|
title = result.get('title', 'No title') |
|
|
body = result.get('body', 'No description') |
|
|
url = result.get('href', 'No URL') |
|
|
|
|
|
|
|
|
if len(body) > 300: |
|
|
body = body[:297] + "..." |
|
|
|
|
|
formatted.append(f"**Result {i}: {title}**\n{body}\nSource: {url}") |
|
|
print("\n\n".join(formatted)) |
|
|
return "\n\n".join(formatted) |
|
|
except Exception as e: |
|
|
return f"Search failed: {str(e)[:200]}" |
|
|
|
|
|
|
|
|
@tool |
|
|
def read_document_with_docling(file_path: str) -> str: |
|
|
"""Read and parse a PDF or Text document using Docling to extract text.""" |
|
|
if not DocumentConverter: |
|
|
return "Docling library not installed." |
|
|
try: |
|
|
converter = DocumentConverter() |
|
|
result = converter.convert(file_path) |
|
|
return result.document.export_to_markdown() |
|
|
except Exception as e: |
|
|
return f"Error reading document: {e}" |
|
|
|
|
|
@tool |
|
|
def ingest_document_to_vector_store(file_path: str, document_id: str, is_temporary: bool = True) -> str: |
|
|
""" |
|
|
Ingest a document into the vector store for semantic search. |
|
|
First parses the document, then chunks and embeds it into ChromaDB. |
|
|
|
|
|
Args: |
|
|
file_path: Path to the document file (PDF or text) |
|
|
document_id: Unique identifier for this document |
|
|
is_temporary: If True, stores in memory (session only). If False, stores to disk. |
|
|
|
|
|
Returns: |
|
|
Status message with number of chunks created |
|
|
""" |
|
|
try: |
|
|
|
|
|
if not DocumentConverter: |
|
|
return "Docling library not installed." |
|
|
|
|
|
|
|
|
try: |
|
|
from docling.datamodel.base_models import InputFormat |
|
|
from docling.datamodel.pipeline_options import PdfPipelineOptions |
|
|
from docling.document_converter import PdfFormatOption |
|
|
|
|
|
pipeline_options = PdfPipelineOptions() |
|
|
pipeline_options.do_ocr = False |
|
|
pipeline_options.do_table_structure = False |
|
|
|
|
|
pipeline_options.do_picture_classification = False |
|
|
pipeline_options.do_picture_description = False |
|
|
pipeline_options.do_code_enrichment = False |
|
|
pipeline_options.do_formula_enrichment = False |
|
|
pipeline_options.generate_picture_images = False |
|
|
|
|
|
converter = DocumentConverter( |
|
|
format_options={ |
|
|
InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options) |
|
|
} |
|
|
) |
|
|
except Exception as config_error: |
|
|
|
|
|
print(f"⚠️ Using simple converter due to: {config_error}") |
|
|
converter = DocumentConverter() |
|
|
|
|
|
result = converter.convert(file_path) |
|
|
document_text = result.document.export_to_markdown() |
|
|
|
|
|
|
|
|
|
|
|
vector_store = get_vector_store(is_persistent=not is_temporary) |
|
|
|
|
|
num_chunks = vector_store.ingest_document( |
|
|
document_text=document_text, |
|
|
document_id=document_id, |
|
|
metadata={"file_path": file_path}, |
|
|
chunk_size=500, |
|
|
chunk_overlap=50 |
|
|
) |
|
|
|
|
|
store_type = "temporary (in-memory)" if is_temporary else "persistent (disk)" |
|
|
return f"Successfully ingested document '{document_id}' into {store_type} vector store. Created {num_chunks} chunks." |
|
|
|
|
|
except Exception as e: |
|
|
return f"Document ingestion failed: {e}" |
|
|
|
|
|
|
|
|
@tool |
|
|
def search_vector_store(query: str, document_id: str = "", top_k: int = 3, search_type: str = "persistent") -> str: |
|
|
""" |
|
|
Search the vector store for relevant document chunks. |
|
|
|
|
|
Args: |
|
|
query: Search query text |
|
|
document_id: Optional specific document to search within (empty string searches all documents) |
|
|
top_k: Number of top results to return (default: 3) |
|
|
search_type: "persistent" (default) or "temporary" (for uploaded files) |
|
|
|
|
|
Returns: |
|
|
Formatted search results with similarity scores |
|
|
""" |
|
|
try: |
|
|
is_persistent = (search_type == "persistent") |
|
|
vector_store = get_vector_store(is_persistent=is_persistent) |
|
|
|
|
|
|
|
|
doc_id = document_id if document_id else None |
|
|
|
|
|
results = vector_store.similarity_search( |
|
|
query=query, |
|
|
top_k=top_k, |
|
|
document_id=doc_id |
|
|
) |
|
|
|
|
|
if not results: |
|
|
return f"No relevant documents found in {search_type} vector store." |
|
|
|
|
|
|
|
|
output = f"{search_type.capitalize()} Vector Store Search Results:\n\n" |
|
|
for i, (chunk_text, score, metadata) in enumerate(results, 1): |
|
|
output += f"Result {i} (Similarity: {score:.3f}):\n" |
|
|
output += f"{chunk_text}\n" |
|
|
output += f"[Document: {metadata.get('document_id', 'unknown')}]\n\n" |
|
|
|
|
|
return output |
|
|
|
|
|
except Exception as e: |
|
|
return f"Vector store search failed: {e}" |