""" Utility functions for TranscriptorAI """ import os import json import hashlib import pickle from datetime import datetime from typing import Any, Dict, List, Optional from pathlib import Path import logging # ============================================================================ # LOGGING SETUP # ============================================================================ def setup_logging(log_file: str = "transcript_analysis.log", level: str = "INFO"): """Setup logging configuration""" logging.basicConfig( level=getattr(logging, level.upper()), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) return logging.getLogger(__name__) logger = setup_logging() # ============================================================================ # CACHING UTILITIES # ============================================================================ def get_file_hash(file_path: str) -> str: """Generate hash for a file for caching purposes""" hasher = hashlib.md5() with open(file_path, 'rb') as f: buf = f.read(65536) # Read in 64kb chunks while len(buf) > 0: hasher.update(buf) buf = f.read(65536) return hasher.hexdigest() def cache_result(key: str, data: Any, cache_dir: str = "./.cache") -> bool: """Cache a result to disk""" try: os.makedirs(cache_dir, exist_ok=True) cache_file = os.path.join(cache_dir, f"{key}.pkl") with open(cache_file, 'wb') as f: pickle.dump(data, f) logger.debug(f"Cached result for key: {key}") return True except Exception as e: logger.error(f"Failed to cache result: {e}") return False def load_cached_result(key: str, cache_dir: str = "./.cache") -> Optional[Any]: """Load a cached result from disk""" try: cache_file = os.path.join(cache_dir, f"{key}.pkl") if not os.path.exists(cache_file): return None # Check if cache is less than 7 days old file_age = datetime.now().timestamp() - os.path.getmtime(cache_file) if file_age > 7 * 24 * 3600: # 7 days logger.debug(f"Cache expired for key: {key}") return None with open(cache_file, 'rb') as f: data = pickle.load(f) logger.debug(f"Loaded cached result for key: {key}") return data except Exception as e: logger.error(f"Failed to load cached result: {e}") return None def clear_cache(cache_dir: str = "./.cache"): """Clear all cached files""" try: if os.path.exists(cache_dir): for file in os.listdir(cache_dir): file_path = os.path.join(cache_dir, file) os.remove(file_path) logger.info(f"Cleared cache directory: {cache_dir}") except Exception as e: logger.error(f"Failed to clear cache: {e}") # ============================================================================ # FILE UTILITIES # ============================================================================ def ensure_directory(path: str) -> str: """Ensure directory exists, create if not""" os.makedirs(path, exist_ok=True) return path def get_unique_filename(base_path: str, extension: str = "") -> str: """Generate unique filename by adding timestamp""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") base = os.path.splitext(base_path)[0] ext = extension or os.path.splitext(base_path)[1] return f"{base}_{timestamp}{ext}" def get_file_size_mb(file_path: str) -> float: """Get file size in MB""" return os.path.getsize(file_path) / (1024 * 1024) def validate_file(file_path: str, max_size_mb: int = 50, allowed_extensions: List[str] = None) -> tuple: """Validate file exists, size, and extension""" if allowed_extensions is None: allowed_extensions = ['.docx', '.pdf'] if not os.path.exists(file_path): return 
False, "File does not exist" if get_file_size_mb(file_path) > max_size_mb: return False, f"File exceeds {max_size_mb}MB limit" ext = os.path.splitext(file_path)[1].lower() if ext not in allowed_extensions: return False, f"File type {ext} not supported" return True, "Valid" # ============================================================================ # DATA PROCESSING UTILITIES # ============================================================================ def sanitize_text(text: str) -> str: """Sanitize text for safe processing""" # Remove null bytes text = text.replace('\x00', '') # Normalize whitespace text = ' '.join(text.split()) return text.strip() def truncate_text(text: str, max_length: int, suffix: str = "...") -> str: """Truncate text to max length with suffix""" if len(text) <= max_length: return text return text[:max_length - len(suffix)] + suffix def extract_keywords(text: str, top_n: int = 10) -> List[str]: """Extract top N keywords from text (simple frequency-based)""" from collections import Counter import re # Simple tokenization words = re.findall(r'\b[a-z]{3,}\b', text.lower()) # Remove common stop words stop_words = { 'the', 'and', 'for', 'are', 'but', 'not', 'you', 'with', 'this', 'that', 'from', 'they', 'have', 'has', 'was', 'were' } words = [w for w in words if w not in stop_words] # Count and return top N counter = Counter(words) return [word for word, count in counter.most_common(top_n)] # ============================================================================ # STATISTICS UTILITIES # ============================================================================ def calculate_statistics(values: List[float]) -> Dict[str, float]: """Calculate basic statistics for a list of values""" if not values: return {} import numpy as np return { "mean": np.mean(values), "median": np.median(values), "std": np.std(values), "min": np.min(values), "max": np.max(values), "count": len(values) } def calculate_percentile(values: List[float], percentile: int) -> float: """Calculate percentile of values""" import numpy as np return np.percentile(values, percentile) # ============================================================================ # JSON UTILITIES # ============================================================================ def save_json(data: Dict, filepath: str, pretty: bool = True) -> bool: """Save data as JSON file""" try: with open(filepath, 'w', encoding='utf-8') as f: if pretty: json.dump(data, f, indent=2, ensure_ascii=False) else: json.dump(data, f, ensure_ascii=False) logger.debug(f"Saved JSON to: {filepath}") return True except Exception as e: logger.error(f"Failed to save JSON: {e}") return False def load_json(filepath: str) -> Optional[Dict]: """Load JSON file""" try: with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) logger.debug(f"Loaded JSON from: {filepath}") return data except Exception as e: logger.error(f"Failed to load JSON: {e}") return None # ============================================================================ # PROGRESS TRACKING # ============================================================================ class ProgressTracker: """Simple progress tracker for long operations""" def __init__(self, total: int, description: str = "Processing"): self.total = total self.current = 0 self.description = description self.start_time = datetime.now() def update(self, n: int = 1): """Update progress""" self.current = min(self.current + n, self.total) self._print_progress() def _print_progress(self): """Print progress bar""" percentage 

# ============================================================================
# ERROR HANDLING UTILITIES
# ============================================================================

def safe_execute(func, *args, default=None, error_msg="Operation failed", **kwargs):
    """Safely execute a function, returning `default` on any exception."""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logger.error(f"{error_msg}: {e}")
        return default

# ============================================================================
# TEXT COMPARISON UTILITIES
# ============================================================================

def calculate_similarity(text1: str, text2: str) -> float:
    """Calculate a simple Jaccard similarity score between two texts."""
    words1 = set(text1.lower().split())
    words2 = set(text2.lower().split())
    if not words1 or not words2:
        return 0.0
    intersection = words1.intersection(words2)
    union = words1.union(words2)
    return len(intersection) / len(union) if union else 0.0

# ============================================================================
# BATCH PROCESSING UTILITIES
# ============================================================================

def batch_items(items: List, batch_size: int) -> List[List]:
    """Split a list into batches of at most batch_size items."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

def parallel_process(func, items: List, max_workers: int = 4):
    """Process items in parallel threads.

    Note: results are collected in completion order, not input order.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(func, item) for item in items]
        for future in as_completed(futures):
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                logger.error(f"Parallel processing error: {e}")
                results.append(None)
    return results
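
# A small sketch combining the two helpers above (illustrative only):
# batch_items chunks the work and parallel_process fans the chunks out to a
# thread pool. `func` here takes one whole batch; per parallel_process's
# behavior, batch results come back in completion order.
def process_in_batches(func, items: List, batch_size: int = 10,
                       max_workers: int = 4) -> List:
    """Split items into batches and process each batch in a thread pool."""
    batches = batch_items(items, batch_size)
    return parallel_process(func, batches, max_workers=max_workers)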

# ============================================================================
# EXPORT UTILITIES
# ============================================================================

def export_to_excel(data: Dict[str, List[Dict]], filepath: str) -> bool:
    """Export multiple tables to an Excel workbook, one sheet per key."""
    try:
        import pandas as pd
        with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
            for sheet_name, rows in data.items():
                df = pd.DataFrame(rows)
                df.to_excel(writer, sheet_name=sheet_name, index=False)
        logger.info(f"Exported to Excel: {filepath}")
        return True
    except Exception as e:
        logger.error(f"Failed to export to Excel: {e}")
        return False

# ============================================================================
# VALIDATION UTILITIES
# ============================================================================

def is_valid_email(email: str) -> bool:
    """Basic email validation."""
    import re
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

def is_valid_url(url: str) -> bool:
    """Basic URL validation."""
    import re
    pattern = r'^https?://[^\s<>"]+$'
    return bool(re.match(pattern, url))

# ============================================================================
# MAIN (FOR TESTING)
# ============================================================================

if __name__ == "__main__":
    import time

    # Test utilities
    print("Testing utilities...")

    # Test file operations
    test_dir = ensure_directory("./test_output")
    print(f"Created test directory: {test_dir}")

    # Test JSON operations
    test_data = {"key": "value", "number": 42}
    save_json(test_data, "./test_output/test.json")
    loaded = load_json("./test_output/test.json")
    assert loaded == test_data, "JSON save/load failed"
    print("✓ JSON operations work")

    # Test statistics
    test_values = [1, 2, 3, 4, 5]
    stats = calculate_statistics(test_values)
    print(f"✓ Statistics: {stats}")

    # Test progress tracker
    tracker = ProgressTracker(10, "Test")
    for i in range(10):
        time.sleep(0.1)
        tracker.update()
    print("✓ Progress tracker works")

    print("\n✓ All utility tests passed!")