Spaces:
Sleeping
Sleeping
| import matplotlib.pyplot as plt | |
| import pandas as pd | |
| import csv | |
| import json | |
| import tempfile | |
| import gc | |
| import logging | |
| from datetime import datetime | |
| from functools import wraps | |
| from contextlib import contextmanager | |
| from typing import List, Dict, Optional, Tuple, Any, Callable | |
| logger = logging.getLogger(__name__) | |
| # Decorators and Context Managers | |
| def handle_errors(default_return=None): | |
| """Centralized error handling decorator""" | |
| def decorator(func: Callable) -> Callable: | |
| def wrapper(*args, **kwargs): | |
| try: | |
| return func(*args, **kwargs) | |
| except Exception as e: | |
| logger.error(f"{func.__name__} failed: {e}") | |
| return default_return if default_return is not None else f"Error: {str(e)}" | |
| return wrapper | |
| return decorator | |
| def managed_figure(*args, **kwargs): | |
| """Context manager for matplotlib figures to prevent memory leaks""" | |
| fig = plt.figure(*args, **kwargs) | |
| try: | |
| yield fig | |
| finally: | |
| plt.close(fig) | |
| gc.collect() | |
| class HistoryManager: | |
| """Simplified history management""" | |
| def __init__(self): | |
| self._history = [] | |
| def add(self, entry: Dict): | |
| from config import config | |
| self._history.append({**entry, 'timestamp': datetime.now().isoformat()}) | |
| if len(self._history) > config.MAX_HISTORY_SIZE: | |
| self._history = self._history[-config.MAX_HISTORY_SIZE:] | |
| def get_all(self) -> List[Dict]: | |
| return self._history.copy() | |
| def clear(self) -> int: | |
| count = len(self._history) | |
| self._history.clear() | |
| return count | |
| def size(self) -> int: | |
| return len(self._history) | |
| class DataHandler: | |
| """Handles all data operations""" | |
| def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]: | |
| """Universal data export""" | |
| if not data: | |
| return None, "No data to export" | |
| temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, | |
| suffix=f'.{format_type}', encoding='utf-8') | |
| if format_type == 'csv': | |
| writer = csv.writer(temp_file) | |
| writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Pos_Prob', 'Neg_Prob']) | |
| for entry in data: | |
| writer.writerow([ | |
| entry.get('timestamp', ''), | |
| entry.get('text', ''), | |
| entry.get('sentiment', ''), | |
| f"{entry.get('confidence', 0):.4f}", | |
| f"{entry.get('pos_prob', 0):.4f}", | |
| f"{entry.get('neg_prob', 0):.4f}" | |
| ]) | |
| elif format_type == 'json': | |
| json.dump(data, temp_file, indent=2, ensure_ascii=False) | |
| temp_file.close() | |
| return temp_file.name, f"Exported {len(data)} entries" | |
| def process_file(file) -> str: | |
| """Process uploaded file with improved CSV handling""" | |
| if not file: | |
| return "" | |
| try: | |
| file_path = file.name | |
| if file_path.endswith('.csv'): | |
| for encoding in ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']: | |
| try: | |
| df = pd.read_csv(file_path, encoding=encoding) | |
| text_columns = [] | |
| for col in df.columns: | |
| sample_values = df[col].dropna().head(10) | |
| if len(sample_values) > 0: | |
| text_count = sum(1 for val in sample_values | |
| if isinstance(val, str) and len(str(val).strip()) > 10) | |
| if text_count > len(sample_values) * 0.7: | |
| text_columns.append(col) | |
| if text_columns: | |
| selected_column = text_columns[0] | |
| else: | |
| selected_column = df.columns[0] | |
| reviews = df[selected_column].dropna().astype(str).tolist() | |
| cleaned_reviews = [] | |
| for review in reviews: | |
| review = review.strip() | |
| if len(review) > 10 and review.lower() != 'nan': | |
| cleaned_reviews.append(review) | |
| if cleaned_reviews: | |
| logger.info(f"Successfully read {len(cleaned_reviews)} reviews from CSV") | |
| return '\n'.join(cleaned_reviews) | |
| except Exception as e: | |
| continue | |
| return "Error: Could not read CSV file. Please check the file format and encoding." | |
| else: | |
| for encoding in ['utf-8', 'latin-1', 'cp1252']: | |
| try: | |
| with open(file_path, 'r', encoding=encoding) as f: | |
| content = f.read().strip() | |
| if content: | |
| return content | |
| except Exception as e: | |
| continue | |
| return "Error: Could not read text file. Please check the file encoding." | |
| except Exception as e: | |
| logger.error(f"File processing error: {e}") | |
| return f"Error processing file: {str(e)}" |