File size: 5,981 Bytes
3670fc5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import matplotlib.pyplot as plt
import pandas as pd
import csv
import json
import tempfile
import gc
import logging
from datetime import datetime
from functools import wraps
from contextlib import contextmanager
from typing import List, Dict, Optional, Tuple, Any, Callable


# Module-level logger; helpers in this file report errors through it.
logger = logging.getLogger(__name__)


# Decorators and Context Managers
def handle_errors(default_return=None):
    """Centralized error handling decorator.

    Converts any exception raised by the wrapped callable into a return
    value instead of propagating it.

    Args:
        default_return: Value to return when the wrapped call raises.
            If ``None`` (the default), an ``"Error: <message>"`` string
            is returned instead.

    Returns:
        A decorator that preserves the wrapped function's metadata
        (via ``functools.wraps``).
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # Log under the wrapped function's own module and keep the
                # full traceback (plain logger.error discarded it); lazy
                # %-args avoid formatting cost when the level is disabled.
                logging.getLogger(func.__module__).exception(
                    "%s failed: %s", func.__name__, e)
                return default_return if default_return is not None else f"Error: {str(e)}"
        return wrapper
    return decorator


@contextmanager
def managed_figure(*args, **kwargs):
    """Yield a fresh matplotlib figure and guarantee it is released.

    The figure is created with the given arguments and always closed on
    exit (followed by a GC pass), so figures never accumulate in
    pyplot's internal registry and leak memory.
    """
    figure = plt.figure(*args, **kwargs)
    try:
        yield figure
    finally:
        # Unconditionally tear the figure down, even on error in the body.
        plt.close(figure)
        gc.collect()


class HistoryManager:
    """In-memory, size-bounded log of analysis entries."""

    def __init__(self):
        # Oldest entries sit at the front; trimming drops from the front.
        self._history: List[Dict] = []

    def add(self, entry: Dict):
        """Append a copy of *entry* stamped with the current time, then
        discard the oldest entries beyond the configured cap."""
        from config import config
        stamped = dict(entry)
        stamped['timestamp'] = datetime.now().isoformat()
        self._history.append(stamped)
        overflow = len(self._history) - config.MAX_HISTORY_SIZE
        if overflow > 0:
            del self._history[:overflow]

    def get_all(self) -> List[Dict]:
        """Return a shallow copy so callers cannot mutate the log."""
        return list(self._history)

    def clear(self) -> int:
        """Empty the log and report how many entries were removed."""
        removed, self._history = len(self._history), []
        return removed

    def size(self) -> int:
        """Number of entries currently held."""
        return len(self._history)


class DataHandler:
    """Handles all data operations: exporting history and reading uploads."""

    @staticmethod
    @handle_errors(default_return=(None, "Export failed"))
    def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
        """Universal data export to a named temporary file.

        Args:
            data: History entries (dicts) to export.
            format_type: ``'csv'`` or ``'json'``.

        Returns:
            ``(path, message)`` on success, ``(None, message)`` on failure.
        """
        if not data:
            return None, "No data to export"
        if format_type not in ('csv', 'json'):
            # Previously an unknown format produced an empty file and a
            # bogus success message; fail explicitly instead.
            return None, f"Unsupported export format: {format_type}"

        # newline='' is required by the csv module for text-mode files so
        # row terminators are not translated twice (e.g. \r\r\n on Windows).
        temp_file = tempfile.NamedTemporaryFile(
            mode='w', delete=False, suffix=f'.{format_type}',
            encoding='utf-8', newline='' if format_type == 'csv' else None)
        try:
            if format_type == 'csv':
                writer = csv.writer(temp_file)
                writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence',
                                 'Pos_Prob', 'Neg_Prob'])
                for entry in data:
                    writer.writerow([
                        entry.get('timestamp', ''),
                        entry.get('text', ''),
                        entry.get('sentiment', ''),
                        f"{entry.get('confidence', 0):.4f}",
                        f"{entry.get('pos_prob', 0):.4f}",
                        f"{entry.get('neg_prob', 0):.4f}",
                    ])
            else:  # 'json' (validated above)
                json.dump(data, temp_file, indent=2, ensure_ascii=False)
        finally:
            # Close even when a write raises so the handle never leaks;
            # the decorator turns the exception into the failure return.
            temp_file.close()
        return temp_file.name, f"Exported {len(data)} entries"

    @staticmethod
    @handle_errors(default_return="")
    def process_file(file) -> str:
        """Extract newline-joined text from an uploaded CSV or text file.

        Args:
            file: Upload object exposing a ``.name`` path (e.g. a Gradio
                file wrapper — TODO confirm against callers), or falsy.

        Returns:
            The extracted text, an ``"Error: ..."`` message, or ``""``
            when no file was supplied.
        """
        if not file:
            return ""

        try:
            file_path = file.name
            if file_path.endswith('.csv'):
                return DataHandler._read_csv_reviews(file_path)
            return DataHandler._read_plain_text(file_path)
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return f"Error processing file: {str(e)}"

    @staticmethod
    def _read_csv_reviews(file_path: str) -> str:
        """Read a CSV trying several encodings; join the cleaned rows of
        its most text-like column."""
        for encoding in ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']:
            try:
                df = pd.read_csv(file_path, encoding=encoding)
                column = DataHandler._detect_text_column(df)
                rows = df[column].dropna().astype(str).tolist()
                # Drop short fragments and pandas' stringified NaN values.
                cleaned = [r.strip() for r in rows
                           if len(r.strip()) > 10 and r.strip().lower() != 'nan']
                if cleaned:
                    logger.info(f"Successfully read {len(cleaned)} reviews from CSV")
                    return '\n'.join(cleaned)
            except Exception as e:
                # Was a silent `continue`; keep trying but leave a trace.
                logger.debug(f"CSV read failed with encoding {encoding}: {e}")
                continue
        return "Error: Could not read CSV file. Please check the file format and encoding."

    @staticmethod
    def _detect_text_column(df) -> str:
        """Return the first column whose sampled values are >70% free text,
        falling back to the first column."""
        for col in df.columns:
            sample = df[col].dropna().head(10)
            if len(sample) == 0:
                continue
            text_count = sum(1 for val in sample
                             if isinstance(val, str) and len(str(val).strip()) > 10)
            if text_count > len(sample) * 0.7:
                return col
        # No column looks like prose; keep the original fallback.
        return df.columns[0]

    @staticmethod
    def _read_plain_text(file_path: str) -> str:
        """Read a text file trying several encodings; skip encodings that
        fail or yield only whitespace."""
        for encoding in ['utf-8', 'latin-1', 'cp1252']:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read().strip()
            except Exception as e:
                logger.debug(f"Text read failed with encoding {encoding}: {e}")
                continue
            if content:
                return content
        return "Error: Could not read text file. Please check the file encoding."