Spaces:
Sleeping
Sleeping
| """ | |
| Utility functions for TranscriptorAI | |
| """ | |
| import os | |
| import json | |
| import hashlib | |
| import pickle | |
| from datetime import datetime | |
| from typing import Any, Dict, List, Optional | |
| from pathlib import Path | |
| import logging | |
| # ============================================================================ | |
| # LOGGING SETUP | |
| # ============================================================================ | |
| def setup_logging(log_file: str = "transcript_analysis.log", level: str = "INFO"): | |
| """Setup logging configuration""" | |
| logging.basicConfig( | |
| level=getattr(logging, level.upper()), | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler(log_file), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| return logging.getLogger(__name__) | |
| logger = setup_logging() | |
| # ============================================================================ | |
| # CACHING UTILITIES | |
| # ============================================================================ | |
| def get_file_hash(file_path: str) -> str: | |
| """Generate hash for a file for caching purposes""" | |
| hasher = hashlib.md5() | |
| with open(file_path, 'rb') as f: | |
| buf = f.read(65536) # Read in 64kb chunks | |
| while len(buf) > 0: | |
| hasher.update(buf) | |
| buf = f.read(65536) | |
| return hasher.hexdigest() | |
| def cache_result(key: str, data: Any, cache_dir: str = "./.cache") -> bool: | |
| """Cache a result to disk""" | |
| try: | |
| os.makedirs(cache_dir, exist_ok=True) | |
| cache_file = os.path.join(cache_dir, f"{key}.pkl") | |
| with open(cache_file, 'wb') as f: | |
| pickle.dump(data, f) | |
| logger.debug(f"Cached result for key: {key}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to cache result: {e}") | |
| return False | |
| def load_cached_result(key: str, cache_dir: str = "./.cache") -> Optional[Any]: | |
| """Load a cached result from disk""" | |
| try: | |
| cache_file = os.path.join(cache_dir, f"{key}.pkl") | |
| if not os.path.exists(cache_file): | |
| return None | |
| # Check if cache is less than 7 days old | |
| file_age = datetime.now().timestamp() - os.path.getmtime(cache_file) | |
| if file_age > 7 * 24 * 3600: # 7 days | |
| logger.debug(f"Cache expired for key: {key}") | |
| return None | |
| with open(cache_file, 'rb') as f: | |
| data = pickle.load(f) | |
| logger.debug(f"Loaded cached result for key: {key}") | |
| return data | |
| except Exception as e: | |
| logger.error(f"Failed to load cached result: {e}") | |
| return None | |
| def clear_cache(cache_dir: str = "./.cache"): | |
| """Clear all cached files""" | |
| try: | |
| if os.path.exists(cache_dir): | |
| for file in os.listdir(cache_dir): | |
| file_path = os.path.join(cache_dir, file) | |
| os.remove(file_path) | |
| logger.info(f"Cleared cache directory: {cache_dir}") | |
| except Exception as e: | |
| logger.error(f"Failed to clear cache: {e}") | |
| # ============================================================================ | |
| # FILE UTILITIES | |
| # ============================================================================ | |
| def ensure_directory(path: str) -> str: | |
| """Ensure directory exists, create if not""" | |
| os.makedirs(path, exist_ok=True) | |
| return path | |
| def get_unique_filename(base_path: str, extension: str = "") -> str: | |
| """Generate unique filename by adding timestamp""" | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| base = os.path.splitext(base_path)[0] | |
| ext = extension or os.path.splitext(base_path)[1] | |
| return f"{base}_{timestamp}{ext}" | |
| def get_file_size_mb(file_path: str) -> float: | |
| """Get file size in MB""" | |
| return os.path.getsize(file_path) / (1024 * 1024) | |
| def validate_file(file_path: str, max_size_mb: int = 50, allowed_extensions: List[str] = None) -> tuple: | |
| """Validate file exists, size, and extension""" | |
| if allowed_extensions is None: | |
| allowed_extensions = ['.docx', '.pdf'] | |
| if not os.path.exists(file_path): | |
| return False, "File does not exist" | |
| if get_file_size_mb(file_path) > max_size_mb: | |
| return False, f"File exceeds {max_size_mb}MB limit" | |
| ext = os.path.splitext(file_path)[1].lower() | |
| if ext not in allowed_extensions: | |
| return False, f"File type {ext} not supported" | |
| return True, "Valid" | |
| # ============================================================================ | |
| # DATA PROCESSING UTILITIES | |
| # ============================================================================ | |
| def sanitize_text(text: str) -> str: | |
| """Sanitize text for safe processing""" | |
| # Remove null bytes | |
| text = text.replace('\x00', '') | |
| # Normalize whitespace | |
| text = ' '.join(text.split()) | |
| return text.strip() | |
| def truncate_text(text: str, max_length: int, suffix: str = "...") -> str: | |
| """Truncate text to max length with suffix""" | |
| if len(text) <= max_length: | |
| return text | |
| return text[:max_length - len(suffix)] + suffix | |
| def extract_keywords(text: str, top_n: int = 10) -> List[str]: | |
| """Extract top N keywords from text (simple frequency-based)""" | |
| from collections import Counter | |
| import re | |
| # Simple tokenization | |
| words = re.findall(r'\b[a-z]{3,}\b', text.lower()) | |
| # Remove common stop words | |
| stop_words = { | |
| 'the', 'and', 'for', 'are', 'but', 'not', 'you', 'with', | |
| 'this', 'that', 'from', 'they', 'have', 'has', 'was', 'were' | |
| } | |
| words = [w for w in words if w not in stop_words] | |
| # Count and return top N | |
| counter = Counter(words) | |
| return [word for word, count in counter.most_common(top_n)] | |
| # ============================================================================ | |
| # STATISTICS UTILITIES | |
| # ============================================================================ | |
| def calculate_statistics(values: List[float]) -> Dict[str, float]: | |
| """Calculate basic statistics for a list of values""" | |
| if not values: | |
| return {} | |
| import numpy as np | |
| return { | |
| "mean": np.mean(values), | |
| "median": np.median(values), | |
| "std": np.std(values), | |
| "min": np.min(values), | |
| "max": np.max(values), | |
| "count": len(values) | |
| } | |
| def calculate_percentile(values: List[float], percentile: int) -> float: | |
| """Calculate percentile of values""" | |
| import numpy as np | |
| return np.percentile(values, percentile) | |
| # ============================================================================ | |
| # JSON UTILITIES | |
| # ============================================================================ | |
| def save_json(data: Dict, filepath: str, pretty: bool = True) -> bool: | |
| """Save data as JSON file""" | |
| try: | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| if pretty: | |
| json.dump(data, f, indent=2, ensure_ascii=False) | |
| else: | |
| json.dump(data, f, ensure_ascii=False) | |
| logger.debug(f"Saved JSON to: {filepath}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to save JSON: {e}") | |
| return False | |
| def load_json(filepath: str) -> Optional[Dict]: | |
| """Load JSON file""" | |
| try: | |
| with open(filepath, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| logger.debug(f"Loaded JSON from: {filepath}") | |
| return data | |
| except Exception as e: | |
| logger.error(f"Failed to load JSON: {e}") | |
| return None | |
| # ============================================================================ | |
| # PROGRESS TRACKING | |
| # ============================================================================ | |
| class ProgressTracker: | |
| """Simple progress tracker for long operations""" | |
| def __init__(self, total: int, description: str = "Processing"): | |
| self.total = total | |
| self.current = 0 | |
| self.description = description | |
| self.start_time = datetime.now() | |
| def update(self, n: int = 1): | |
| """Update progress""" | |
| self.current = min(self.current + n, self.total) | |
| self._print_progress() | |
| def _print_progress(self): | |
| """Print progress bar""" | |
| percentage = (self.current / self.total) * 100 if self.total > 0 else 0 | |
| bar_length = 40 | |
| filled = int(bar_length * self.current / self.total) if self.total > 0 else 0 | |
| bar = 'β' * filled + '-' * (bar_length - filled) | |
| elapsed = (datetime.now() - self.start_time).total_seconds() | |
| eta = (elapsed / self.current * (self.total - self.current)) if self.current > 0 else 0 | |
| print(f'\r{self.description}: |{bar}| {percentage:.1f}% ({self.current}/{self.total}) ETA: {eta:.0f}s', end='') | |
| if self.current >= self.total: | |
| print() # New line when complete | |
| # ============================================================================ | |
| # ERROR HANDLING UTILITIES | |
| # ============================================================================ | |
| def safe_execute(func, *args, default=None, error_msg="Operation failed", **kwargs): | |
| """Safely execute a function with error handling""" | |
| try: | |
| return func(*args, **kwargs) | |
| except Exception as e: | |
| logger.error(f"{error_msg}: {e}") | |
| return default | |
| # ============================================================================ | |
| # TEXT COMPARISON UTILITIES | |
| # ============================================================================ | |
| def calculate_similarity(text1: str, text2: str) -> float: | |
| """Calculate simple similarity score between two texts""" | |
| words1 = set(text1.lower().split()) | |
| words2 = set(text2.lower().split()) | |
| if not words1 or not words2: | |
| return 0.0 | |
| intersection = words1.intersection(words2) | |
| union = words1.union(words2) | |
| return len(intersection) / len(union) if union else 0.0 | |
| # ============================================================================ | |
| # BATCH PROCESSING UTILITIES | |
| # ============================================================================ | |
| def batch_items(items: List, batch_size: int) -> List[List]: | |
| """Split list into batches""" | |
| return [items[i:i + batch_size] for i in range(0, len(items), batch_size)] | |
| def parallel_process(func, items: List, max_workers: int = 4): | |
| """Process items in parallel""" | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| results = [] | |
| with ThreadPoolExecutor(max_workers=max_workers) as executor: | |
| futures = [executor.submit(func, item) for item in items] | |
| for future in as_completed(futures): | |
| try: | |
| result = future.result() | |
| results.append(result) | |
| except Exception as e: | |
| logger.error(f"Parallel processing error: {e}") | |
| results.append(None) | |
| return results | |
| # ============================================================================ | |
| # EXPORT UTILITIES | |
| # ============================================================================ | |
| def export_to_excel(data: Dict[str, List[Dict]], filepath: str) -> bool: | |
| """Export multiple dataframes to Excel with sheets""" | |
| try: | |
| import pandas as pd | |
| with pd.ExcelWriter(filepath, engine='openpyxl') as writer: | |
| for sheet_name, rows in data.items(): | |
| df = pd.DataFrame(rows) | |
| df.to_excel(writer, sheet_name=sheet_name, index=False) | |
| logger.info(f"Exported to Excel: {filepath}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to export to Excel: {e}") | |
| return False | |
| # ============================================================================ | |
| # VALIDATION UTILITIES | |
| # ============================================================================ | |
| def is_valid_email(email: str) -> bool: | |
| """Basic email validation""" | |
| import re | |
| pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' | |
| return bool(re.match(pattern, email)) | |
| def is_valid_url(url: str) -> bool: | |
| """Basic URL validation""" | |
| import re | |
| pattern = r'^https?://[^\s<>"]+$' | |
| return bool(re.match(pattern, url)) | |
| # ============================================================================ | |
| # MAIN (FOR TESTING) | |
| # ============================================================================ | |
| if __name__ == "__main__": | |
| # Test utilities | |
| print("Testing utilities...") | |
| # Test file operations | |
| test_dir = ensure_directory("./test_output") | |
| print(f"Created test directory: {test_dir}") | |
| # Test JSON operations | |
| test_data = {"key": "value", "number": 42} | |
| save_json(test_data, "./test_output/test.json") | |
| loaded = load_json("./test_output/test.json") | |
| assert loaded == test_data, "JSON save/load failed" | |
| print("β JSON operations work") | |
| # Test statistics | |
| test_values = [1, 2, 3, 4, 5] | |
| stats = calculate_statistics(test_values) | |
| print(f"β Statistics: {stats}") | |
| # Test progress tracker | |
| tracker = ProgressTracker(10, "Test") | |
| for i in range(10): | |
| import time | |
| time.sleep(0.1) | |
| tracker.update() | |
| print("β Progress tracker works") | |
| print("\nβ All utility tests passed!") |