JadeAssistant / tools.py
Madras1's picture
Upload 4 files
26dc96c verified
import json
import os
import logging
from datetime import datetime
from duckduckgo_search import DDGS
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def get_current_datetime() -> str:
"""Returns the current date and time as a formatted string."""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def search_web(query: str) -> str:
"""Searches the web using DuckDuckGo and returns the top results."""
try:
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=3))
if not results:
return "No results found."
formatted_results = []
for i, res in enumerate(results, 1):
formatted_results.append(f"{i}. {res['title']}: {res['body']} (URL: {res['href']})")
return "\n".join(formatted_results)
except Exception as e:
return f"Error searching web: {str(e)}"
class ReminderManager:
def __init__(self, storage_file="telegram_jade/reminders.json"):
self.storage_file = storage_file
self.reminders = []
self._load_reminders()
def _load_reminders(self):
if os.path.exists(self.storage_file):
try:
with open(self.storage_file, 'r') as f:
self.reminders = json.load(f)
except json.JSONDecodeError:
logger.error(f"Error decoding {self.storage_file}. Starting with empty reminders.")
self.reminders = []
else:
self.reminders = []
def _save_reminders(self):
# Ensure directory exists
os.makedirs(os.path.dirname(self.storage_file), exist_ok=True)
with open(self.storage_file, 'w') as f:
json.dump(self.reminders, f, indent=2)
def add_reminder(self, chat_id: int, time_str: str, message: str):
"""
Adds a reminder.
time_str format: "YYYY-MM-DD HH:MM:SS"
"""
# Generate ID safely
if self.reminders:
new_id = max(r["id"] for r in self.reminders) + 1
else:
new_id = 1
reminder = {
"id": new_id,
"chat_id": chat_id,
"time": time_str,
"message": message,
"status": "pending"
}
self.reminders.append(reminder)
self._save_reminders()
return f"Reminder set for {time_str}: {message}"
def list_reminders(self, chat_id: int):
"""Lists pending reminders for a specific chat."""
user_reminders = [r for r in self.reminders if r["chat_id"] == chat_id and r["status"] == "pending"]
if not user_reminders:
return "No pending reminders."
result = "Pending Reminders:\n"
for r in user_reminders:
result += f"- [{r['time']}] {r['message']} (ID: {r['id']})\n"
return result
def delete_reminder(self, chat_id: int, reminder_id: int):
"""Deletes a reminder by ID."""
for i, r in enumerate(self.reminders):
if r["id"] == reminder_id and r["chat_id"] == chat_id:
del self.reminders[i]
self._save_reminders()
return f"Reminder ID {reminder_id} deleted."
return f"Reminder ID {reminder_id} not found."
def get_due_reminders(self):
"""
Returns a list of reminders that are due now or in the past and mark them as sent?
Actually, the scheduler will likely handle the trigger logic, but we need a way to
retrieve 'active' reminders to schedule them on startup.
"""
return [r for r in self.reminders if r["status"] == "pending"]
def mark_as_sent(self, reminder_id: int):
for r in self.reminders:
if r["id"] == reminder_id:
r["status"] = "sent"
self._save_reminders()
return