import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


class Storage:
    """Handles JSON-based storage for leads, tickets, and conversations.

    Each record type lives in its own JSON file under ``base_path``. Every
    file holds a single top-level object with one key (``"leads"``,
    ``"tickets"`` or ``"conversations"``) mapping to a list of record dicts.
    """

    def __init__(self, base_path: str = "data"):
        """Create the storage directory if needed and set the file paths.

        Args:
            base_path: Directory that holds the JSON files. Missing parent
                directories are created as well.
        """
        self.base_path = Path(base_path)
        # parents=True so a nested path such as "var/data" does not fail.
        self.base_path.mkdir(parents=True, exist_ok=True)

        self.leads_file = self.base_path / "leads.json"
        self.tickets_file = self.base_path / "support_tickets.json"
        self.conversations_file = self.base_path / "conversations.json"

    def _load_json(self, filepath: Path) -> Dict:
        """Load a JSON file, falling back to an empty dict.

        Args:
            filepath: File to read.

        Returns:
            The decoded top-level object, or ``{}`` when the file does not
            exist, cannot be decoded, or does not contain a JSON object.
        """
        if not filepath.exists():
            return {}

        try:
            with open(filepath, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # Best-effort read: keep going, but leave a trace of the problem.
            logger.warning("Ignoring unreadable JSON file %s: %s", filepath, exc)
            return {}

        if not isinstance(data, dict):
            # Callers use data.get(...); any other top-level type would crash.
            logger.warning("Ignoring %s: top-level JSON value is not an object", filepath)
            return {}
        return data

    def _save_json(self, filepath: Path, data: Dict):
        """Write ``data`` to ``filepath`` as indented JSON.

        The data is serialised to a sibling ``.tmp`` file first and then moved
        over the target, so a failure during serialisation leaves the existing
        file untouched instead of truncated.

        Args:
            filepath: Destination file.
            data: JSON-serialisable object to store.

        Raises:
            TypeError: If ``data`` contains values ``json`` cannot serialise.
            OSError: If the file cannot be written or replaced.
        """
        tmp_path = filepath.with_name(filepath.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            tmp_path.replace(filepath)
        finally:
            # Only still present when the write or the replace failed.
            if tmp_path.exists():
                tmp_path.unlink()

    @staticmethod
    def _records(data: Dict, collection: str) -> List[Dict]:
        """Return the list stored under ``collection``, or a new empty list."""
        records = data.get(collection)
        return records if isinstance(records, list) else []

    def _append_record(
        self,
        filepath: Path,
        collection: str,
        prefix: str,
        record: Dict,
        timestamp_key: str,
        extra: Optional[Dict] = None,
    ) -> str:
        """Stamp ``record`` with an ID and timestamp, append it, and persist.

        Args:
            filepath: JSON file holding the collection.
            collection: Top-level key of the record list inside the file.
            prefix: ID prefix (``"lead"``, ``"ticket"`` or ``"conv"``).
            record: Record to store; it is modified in place.
            timestamp_key: Key under which the ISO timestamp is stored.
            extra: Additional fields set on the record after the timestamp.

        Returns:
            The generated record ID.
        """
        data = self._load_json(filepath)
        records = self._records(data, collection)

        # One clock reading, so the ID and the stored timestamp agree.
        now = datetime.now()
        record_id = f"{prefix}_{len(records) + 1}_{now.timestamp()}"
        record["id"] = record_id
        record[timestamp_key] = now.isoformat()
        if extra:
            record.update(extra)

        records.append(record)
        data[collection] = records
        self._save_json(filepath, data)
        return record_id

    # Lead Management
    def save_lead(self, lead_data: Dict) -> str:
        """
        Save a new lead.

        Args:
            lead_data: Lead information. Modified in place: ``"id"`` and
                ``"created_at"`` are set on it.

        Returns:
            Lead ID
        """
        return self._append_record(
            self.leads_file, "leads", "lead", lead_data, "created_at"
        )

    def get_all_leads(self) -> List[Dict]:
        """Get all saved leads."""
        return self._records(self._load_json(self.leads_file), "leads")

    def update_lead(self, lead_id: str, updated_data: Dict) -> bool:
        """Update the first lead whose ``"id"`` equals ``lead_id``.

        Args:
            lead_id: ID of the lead to update.
            updated_data: Fields merged into the stored lead.

        Returns:
            True if a lead was found and saved, False if no lead matched
            (in which case the file is not rewritten).
        """
        data = self._load_json(self.leads_file)
        leads = self._records(data, "leads")

        for lead in leads:
            if isinstance(lead, dict) and lead.get("id") == lead_id:
                lead.update(updated_data)
                data["leads"] = leads
                self._save_json(self.leads_file, data)
                return True
        return False

    # Support Ticket Management
    def save_support_ticket(self, ticket_data: Dict) -> str:
        """
        Save a new support ticket.

        Args:
            ticket_data: Ticket information. Modified in place: ``"id"``,
                ``"created_at"`` and ``"status"`` (always ``"open"``) are
                set on it.

        Returns:
            Ticket ID
        """
        return self._append_record(
            self.tickets_file,
            "tickets",
            "ticket",
            ticket_data,
            "created_at",
            extra={"status": "open"},
        )

    def get_all_tickets(self) -> List[Dict]:
        """Get all support tickets."""
        return self._records(self._load_json(self.tickets_file), "tickets")

    # Conversation Logging
    def save_conversation(self, conversation_data: Dict) -> str:
        """
        Save conversation log.

        Args:
            conversation_data: Conversation information. Modified in place:
                ``"id"`` and ``"timestamp"`` are set on it.

        Returns:
            Conversation ID
        """
        return self._append_record(
            self.conversations_file,
            "conversations",
            "conv",
            conversation_data,
            "timestamp",
        )

    def get_all_conversations(self) -> List[Dict]:
        """Get all conversation logs."""
        return self._records(
            self._load_json(self.conversations_file), "conversations"
        )

    def get_summary(self) -> Dict:
        """Get storage summary: the record count of each collection."""
        return {
            "total_leads": len(self.get_all_leads()),
            "total_tickets": len(self.get_all_tickets()),
            "total_conversations": len(self.get_all_conversations()),
        }