# Hugging Face Spaces file view (Space status: Sleeping). File size: 5,981 bytes.
import csv
import gc
import json
import logging
import os
import tempfile
from contextlib import contextmanager
from datetime import datetime
from functools import wraps
from typing import List, Dict, Optional, Tuple, Any, Callable

import matplotlib.pyplot as plt
import pandas as pd
# Module-level logger named after this module; used by the helpers below.
logger = logging.getLogger(__name__)
# Decorators and Context Managers
def handle_errors(default_return=None):
    """Build a decorator that converts exceptions into a fallback value.

    Args:
        default_return: Value returned when the wrapped callable raises.
            When left as ``None``, an ``"Error: <message>"`` string built
            from the exception is returned instead.

    Returns:
        A decorator. The wrapped callable logs any ``Exception`` through the
        module logger and returns the fallback instead of propagating.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                logger.error(f"{func.__name__} failed: {exc}")
                # ``None`` means "no explicit fallback": report the error text.
                if default_return is None:
                    return f"Error: {str(exc)}"
                return default_return
            return result
        return wrapper
    return decorator
@contextmanager
def managed_figure(*args, **kwargs):
    """Yield a new matplotlib figure and always dispose of it afterwards.

    All positional and keyword arguments are forwarded to ``plt.figure``.
    When the ``with`` body finishes, normally or through an exception, the
    figure is closed and a garbage-collection pass is triggered.
    """
    figure = plt.figure(*args, **kwargs)
    try:
        yield figure
    finally:
        # Close first so the collector can reclaim the figure's resources.
        plt.close(figure)
        gc.collect()
class HistoryManager:
    """In-memory list of entries, capped at ``config.MAX_HISTORY_SIZE``."""

    def __init__(self):
        self._history = []

    def add(self, entry: Dict):
        """Store a timestamped copy of *entry*, discarding the oldest on overflow."""
        # Imported at call time rather than at module import time.
        from config import config

        stamped = dict(entry)
        stamped['timestamp'] = datetime.now().isoformat()
        self._history.append(stamped)

        limit = config.MAX_HISTORY_SIZE
        if len(self._history) > limit:
            # Keep only the most recent ``limit`` entries.
            self._history = self._history[-limit:]

    def get_all(self) -> List[Dict]:
        """Return a shallow copy of the stored entries."""
        return list(self._history)

    def clear(self) -> int:
        """Remove every entry and return how many were removed."""
        removed = len(self._history)
        self._history.clear()
        return removed

    def size(self) -> int:
        """Return the number of stored entries."""
        return len(self._history)
class DataHandler:
    """Handles all data operations: exporting results and reading uploads."""

    # Encodings tried in order when decoding an uploaded file.
    # - 'utf-8-sig' reads UTF-8 with or without a byte-order mark, so a BOM
    #   never leaks into the first column name or the returned text.
    # - 'cp1252' must precede 'latin-1': latin-1 maps every byte and therefore
    #   never fails, which would make any encoding listed after it unreachable.
    _ENCODINGS = ('utf-8-sig', 'cp1252', 'latin-1')

    # Strings must be longer than this (after stripping) to count as review text.
    _MIN_TEXT_LENGTH = 10

    _EXPORT_FORMATS = ('csv', 'json')

    @staticmethod
    @handle_errors(default_return=(None, "Export failed"))
    def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
        """Write *data* to a temporary file in the requested format.

        Args:
            data: Entries to export. For CSV, the keys ``timestamp``, ``text``,
                ``sentiment``, ``confidence``, ``pos_prob`` and ``neg_prob``
                are read, with defaults for missing keys.
            format_type: ``'csv'`` or ``'json'``; also used as the file suffix.

        Returns:
            ``(path, message)``. ``path`` is ``None`` when there is nothing to
            export or the format is unsupported. Any exception raised while
            writing is handled by ``handle_errors``, which yields
            ``(None, "Export failed")``.
        """
        if not data:
            return None, "No data to export"
        if format_type not in DataHandler._EXPORT_FORMATS:
            # Previously this produced an empty file and reported success.
            return None, f"Unsupported export format: {format_type}"

        is_csv = format_type == 'csv'
        temp_file = tempfile.NamedTemporaryFile(
            mode='w',
            delete=False,
            suffix=f'.{format_type}',
            encoding='utf-8',
            # The csv module requires newline='' so it controls line endings
            # itself; otherwise rows gain an extra '\r' on Windows.
            newline='' if is_csv else None,
        )
        try:
            # The context manager closes the handle even if writing fails;
            # delete=False keeps the file on disk for the caller.
            with temp_file:
                if is_csv:
                    writer = csv.writer(temp_file)
                    writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Pos_Prob', 'Neg_Prob'])
                    writer.writerows(
                        [
                            entry.get('timestamp', ''),
                            entry.get('text', ''),
                            entry.get('sentiment', ''),
                            f"{entry.get('confidence', 0):.4f}",
                            f"{entry.get('pos_prob', 0):.4f}",
                            f"{entry.get('neg_prob', 0):.4f}",
                        ]
                        for entry in data
                    )
                else:
                    json.dump(data, temp_file, indent=2, ensure_ascii=False)
        except Exception:
            # Do not leave a half-written export behind; re-raise so the
            # decorator logs the failure and returns its default.
            try:
                os.unlink(temp_file.name)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove partial export {temp_file.name}: {cleanup_error}")
            raise

        return temp_file.name, f"Exported {len(data)} entries"

    @staticmethod
    @handle_errors(default_return="")
    def process_file(file) -> str:
        """Extract review text from an uploaded CSV or plain-text file.

        Args:
            file: Either a filesystem path string or an object exposing the
                path as ``.name``. Falsy values yield ``""``.

        Returns:
            For ``.csv`` files (case-insensitive), the selected column's
            reviews joined by newlines; for anything else, the stripped file
            content. On failure an ``"Error..."`` message string is returned
            rather than raising.
        """
        if not file:
            return ""
        try:
            file_path = file if isinstance(file, str) else file.name
            if file_path.lower().endswith('.csv'):
                return DataHandler._read_csv_reviews(file_path)
            return DataHandler._read_text_file(file_path)
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return f"Error processing file: {str(e)}"

    @staticmethod
    def _pick_text_column(df):
        """Return the first column that looks like free text, else the first column.

        A column qualifies when more than 70% of its first ten non-null values
        are strings longer than ``_MIN_TEXT_LENGTH`` characters.
        """
        for col in df.columns:
            sample_values = df[col].dropna().head(10)
            if len(sample_values) == 0:
                continue
            text_count = sum(
                1 for val in sample_values
                if isinstance(val, str) and len(val.strip()) > DataHandler._MIN_TEXT_LENGTH
            )
            if text_count > len(sample_values) * 0.7:
                return col
        return df.columns[0]

    @staticmethod
    def _read_csv_reviews(file_path: str) -> str:
        """Read reviews from a CSV, trying each candidate encoding in turn."""
        for encoding in DataHandler._ENCODINGS:
            try:
                df = pd.read_csv(file_path, encoding=encoding)
                column = DataHandler._pick_text_column(df)
                stripped = (value.strip() for value in df[column].dropna().astype(str))
                # The length filter also discards stray 'nan' strings.
                reviews = [text for text in stripped if len(text) > DataHandler._MIN_TEXT_LENGTH]
            except Exception as e:
                # Best effort: record why this attempt failed, then try the next encoding.
                logger.debug(f"CSV read with encoding {encoding} failed: {e}")
                continue
            if reviews:
                logger.info(f"Successfully read {len(reviews)} reviews from CSV")
                return '\n'.join(reviews)
        return "Error: Could not read CSV file. Please check the file format and encoding."

    @staticmethod
    def _read_text_file(file_path: str) -> str:
        """Return the stripped content of a text file, trying each candidate encoding."""
        for encoding in DataHandler._ENCODINGS:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read().strip()
            except (OSError, UnicodeError) as e:
                logger.debug(f"Text read with encoding {encoding} failed: {e}")
                continue
            if content:
                return content
        return "Error: Could not read text file. Please check the file encoding."