""" Utility functions for database, logging, and data persistence. """ import json import os import logging from datetime import datetime from pathlib import Path # ===== LOGGING SETUP ===== def setup_logging(): """Initialize logging configuration.""" log_dir = Path("logs") log_dir.mkdir(exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_dir / f"{datetime.now().strftime('%Y-%m-%d')}.log"), logging.StreamHandler() ] ) return logging.getLogger(__name__) logger = setup_logging() # ===== JSON DATABASE ===== class JSONDatabase: """Simple JSON-based persistence layer for CRM data.""" def __init__(self, filepath: str = "data/crm_database.json"): self.filepath = Path(filepath) self.filepath.parent.mkdir(parents=True, exist_ok=True) self._ensure_file() def _ensure_file(self): """Create database file if it doesn't exist.""" if not self.filepath.exists(): self.filepath.write_text(json.dumps({"records": []}, indent=2)) logger.info(f"Created new database at {self.filepath}") def load(self) -> list: """Load all records from database.""" try: data = json.loads(self.filepath.read_text()) return data.get("records", []) except Exception as e: logger.error(f"Error loading database: {e}") return [] def save(self, records: list) -> bool: """Save records to database.""" try: data = {"records": records, "last_updated": datetime.now().isoformat()} self.filepath.write_text(json.dumps(data, indent=2, ensure_ascii=False)) logger.info(f"Saved {len(records)} records to database") return True except Exception as e: logger.error(f"Error saving database: {e}") return False def add_record(self, record: dict) -> bool: """Add a single record to database.""" records = self.load() records.append(record) return self.save(records) def delete_record(self, index: int) -> bool: """Delete a record by index.""" records = self.load() if 0 <= index < len(records): records.pop(index) return self.save(records) return False # ===== VALIDATION ===== def validate_url(url: str) -> bool: """Validate if URL is properly formatted.""" if not url or len(url) < 5: return False if not url.startswith(('http://', 'https://', 'www.', 'ftp://')): return False return True def validate_email(email: str) -> bool: """Validate email format.""" import re pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email) is not None def validate_pdf(file) -> tuple[bool, str]: """Validate PDF file size and type.""" if file is None: return False, "No file selected" if file.type != "application/pdf": return False, "File must be PDF" if file.size > 50 * 1024 * 1024: # 50MB limit return False, "File size exceeds 50MB limit" return True, "File is valid" # ===== ERROR HANDLING ===== def safe_api_call(func, *args, timeout=30, **kwargs): """Safely execute API calls with error handling.""" try: return func(*args, **kwargs) except TimeoutError: logger.error(f"Timeout error in {func.__name__}") return None except Exception as e: logger.error(f"Error in {func.__name__}: {e}") return None # ===== CACHING METADATA ===== def get_cache_metadata() -> dict: """Get cache statistics.""" return { "cache_dir": ".streamlit/cache", "cache_enabled": True }