| """ |
| Utility functions for logging setup, JSON data persistence, input validation, and safe API calls. |
| """ |
| import json |
| import os |
| import logging |
| from datetime import datetime |
| from pathlib import Path |
|
|
| |
def setup_logging():
    """Configure root logging to a dated file plus the console.

    Creates a ``logs`` directory under the current working directory and,
    if the root logger has no handlers yet, installs a file handler
    (``logs/YYYY-MM-DD.log``, dated at call time) and a stream handler at
    INFO level.

    Returns:
        The logger named after this module.
    """
    log_dir = Path("logs")
    log_dir.mkdir(exist_ok=True)

    # basicConfig() ignores its arguments once the root logger already has
    # handlers. Building the FileHandler unconditionally would then open a
    # log file that is never attached to anything and never closed.
    if not logging.getLogger().handlers:
        log_file = log_dir / f"{datetime.now().strftime('%Y-%m-%d')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                # Explicit encoding so non-ASCII messages are written the
                # same way on every platform.
                logging.FileHandler(log_file, encoding="utf-8"),
                logging.StreamHandler(),
            ],
        )
    return logging.getLogger(__name__)
|
|
# Module-wide logger. Calling setup_logging() here means importing this
# module creates the "logs" directory and configures logging as a side effect.
logger = setup_logging()
|
|
| |
class JSONDatabase:
    """Simple JSON-based persistence layer for CRM data.

    Records are stored as a list under the ``"records"`` key of a single
    JSON document at ``filepath``. Every mutation rewrites the whole file.
    The file is always written as UTF-8.
    """

    def __init__(self, filepath: str = "data/crm_database.json"):
        """Open the database at *filepath*, creating it (and parents) if needed."""
        self.filepath = Path(filepath)
        self.filepath.parent.mkdir(parents=True, exist_ok=True)
        self._ensure_file()

    def _ensure_file(self):
        """Create database file if it doesn't exist."""
        if not self.filepath.exists():
            self._write_text(json.dumps({"records": []}, indent=2))
            logger.info(f"Created new database at {self.filepath}")

    def _write_text(self, text: str) -> None:
        """Replace the database file with *text* without a truncated window.

        The text is written to a sibling temporary file first and then moved
        over the real file, so a crash mid-write leaves the previous contents
        intact instead of a half-written document.
        """
        tmp_path = self.filepath.with_name(self.filepath.name + ".tmp")
        try:
            tmp_path.write_text(text, encoding="utf-8")
            os.replace(tmp_path, self.filepath)
        finally:
            # No-op after a successful replace; removes the leftover otherwise.
            tmp_path.unlink(missing_ok=True)

    def _read_records(self) -> list:
        """Parse the file and return its record list, raising on any problem."""
        try:
            raw = self.filepath.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            # Files written before UTF-8 was enforced used the platform
            # default encoding; fall back so they stay readable.
            raw = self.filepath.read_text()
        records = json.loads(raw).get("records", [])
        if not isinstance(records, list):
            raise ValueError('"records" is not a list')
        return records

    def _load_for_update(self):
        """Return the current records for a read-modify-write, or ``None``.

        A missing file is treated as an empty database. Any other read
        failure returns ``None`` so callers abort rather than overwrite a
        file whose contents could not be read.
        """
        try:
            return self._read_records()
        except FileNotFoundError:
            return []
        except Exception as e:
            logger.error(f"Refusing to modify unreadable database: {e}")
            return None

    def load(self) -> list:
        """Load all records from database.

        Returns:
            The list of records, or an empty list if the file cannot be
            read or parsed (the error is logged).
        """
        try:
            return self._read_records()
        except Exception as e:
            logger.error(f"Error loading database: {e}")
            return []

    def save(self, records: list) -> bool:
        """Save records to database.

        Writes ``records`` together with a ``last_updated`` ISO timestamp.

        Returns:
            ``True`` on success, ``False`` if serialization or writing
            failed (the error is logged and the old file is left in place).
        """
        try:
            data = {"records": records, "last_updated": datetime.now().isoformat()}
            self._write_text(json.dumps(data, indent=2, ensure_ascii=False))
            logger.info(f"Saved {len(records)} records to database")
            return True
        except Exception as e:
            logger.error(f"Error saving database: {e}")
            return False

    def add_record(self, record: dict) -> bool:
        """Add a single record to database.

        Returns:
            ``True`` if the record was persisted; ``False`` if the existing
            file could not be read or the save failed.
        """
        records = self._load_for_update()
        if records is None:
            return False
        records.append(record)
        return self.save(records)

    def delete_record(self, index: int) -> bool:
        """Delete a record by index.

        Returns:
            ``True`` if the record was removed and saved; ``False`` if
            ``index`` is out of range (negative indexes are rejected), the
            file could not be read, or the save failed.
        """
        records = self._load_for_update()
        if records is None:
            return False
        if 0 <= index < len(records):
            records.pop(index)
            return self.save(records)
        return False
|
|
| |
def validate_url(url: str) -> bool:
    """Validate if URL is properly formatted.

    A value is accepted when it either starts with ``www.`` followed by at
    least one more character, or is an ``http``/``https``/``ftp`` URL with a
    non-empty host. Scheme and ``www.`` prefix are matched case-insensitively.

    Args:
        url: Candidate URL. Non-string or empty values are rejected.

    Returns:
        ``True`` if the value looks like a usable URL, ``False`` otherwise.
    """
    from urllib.parse import urlsplit

    if not isinstance(url, str) or not url:
        return False
    # Whitespace is never valid inside a URL; check before parsing.
    if any(ch.isspace() for ch in url):
        return False
    if url.lower().startswith("www."):
        return len(url) > len("www.")
    try:
        parts = urlsplit(url)
        host = parts.hostname
    except ValueError:
        # urlsplit rejects malformed netlocs such as unbalanced IPv6 brackets.
        return False
    # Requiring a host rejects scheme-only inputs like "http://".
    return parts.scheme in ("http", "https", "ftp") and bool(host)
|
|
def validate_email(email: str) -> bool:
    """Validate email format.

    Accepts ``local@domain`` where the local part uses letters, digits and
    ``._%+-``, and the domain is one or more dot-separated labels (letters,
    digits, inner hyphens) ending in an alphabetic TLD of two or more letters.

    Args:
        email: Candidate address. Non-string values are rejected.

    Returns:
        ``True`` if the whole string matches the expected shape.
    """
    import re

    if not isinstance(email, str):
        return False
    pattern = (
        r'[a-zA-Z0-9._%+-]+@'
        r'(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?\.)+'
        r'[a-zA-Z]{2,}'
    )
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let "user@example.com\n" through.
    return re.fullmatch(pattern, email) is not None
|
|
def validate_pdf(file, *, max_size_mb: int = 50) -> tuple[bool, str]:
    """Validate PDF file size and type.

    Args:
        file: Object exposing a ``type`` attribute (MIME type string) and a
            ``size`` attribute compared against a byte limit, or ``None``
            when nothing was selected.
        max_size_mb: Largest accepted size in mebibytes. Defaults to 50.

    Returns:
        A ``(is_valid, message)`` pair; ``message`` explains the first
        failed check or confirms validity.
    """
    if file is None:
        return False, "No file selected"
    if file.type != "application/pdf":
        return False, "File must be PDF"
    if file.size > max_size_mb * 1024 * 1024:
        return False, f"File size exceeds {max_size_mb}MB limit"
    return True, "File is valid"
|
|
| |
def safe_api_call(func, *args, timeout=30, **kwargs):
    """Call ``func(*args, **kwargs)``, logging and swallowing any exception.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        timeout: Accepted for backward compatibility only. It is neither
            enforced here nor forwarded to ``func``; pass the callee's own
            timeout option through ``kwargs`` under whatever name it expects.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns, or ``None`` if it raised.
    """
    # functools.partial objects and callable instances have no __name__.
    # Looking it up unguarded inside the handlers would raise AttributeError
    # and replace the very error this wrapper is meant to absorb.
    label = getattr(func, "__name__", None) or repr(func)
    try:
        return func(*args, **kwargs)
    except TimeoutError:
        logger.error(f"Timeout error in {label}")
        return None
    except Exception as e:
        logger.error(f"Error in {label}: {e}")
        return None
|
|
| |
def get_cache_metadata() -> dict:
    """Return the fixed cache settings: the cache directory and enabled flag."""
    metadata = {}
    metadata["cache_dir"] = ".streamlit/cache"
    metadata["cache_enabled"] = True
    return metadata
|
|