import csv
import json
import io
import tempfile
import re
import numpy as np
from datetime import datetime
from functools import lru_cache
from collections import Counter
from typing import List, Dict, Optional, Tuple
import nltk
from nltk.corpus import stopwords
from config import config
from models import handle_errors

# Initialize NLTK resources; fall back to a minimal English stop-word set
# if the download or corpus lookup fails (e.g. no network access)
try:
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    STOP_WORDS = set(stopwords.words('english'))
except Exception:
    STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
# Simplified Text Processing
class TextProcessor:
    """Optimized text processing with multi-language support"""

    @staticmethod
    def clean_text(text: str, remove_punctuation: bool = True, remove_numbers: bool = False) -> str:
        """Clean text with language awareness"""
        text = text.strip()
        # Don't clean Chinese text aggressively
        if re.search(r'[\u4e00-\u9fff]', text):
            return text
        text = text.lower()
        if remove_numbers:
            text = re.sub(r'\d+', '', text)
        if remove_punctuation:
            text = re.sub(r'[^\w\s]', '', text)
        words = text.split()
        cleaned_words = [w for w in words if w not in STOP_WORDS and len(w) >= config.MIN_WORD_LENGTH]
        return ' '.join(cleaned_words)

    @staticmethod
    def parse_batch_input(text: str) -> List[str]:
        """Parse batch input from a textarea, one entry per non-empty line"""
        lines = text.strip().split('\n')
        return [line.strip() for line in lines if line.strip()]
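
# Usage sketch (illustrative; assumes a small config.MIN_WORD_LENGTH, e.g. 2):
#
#   TextProcessor.clean_text("The quick, brown fox!")       # -> "quick brown fox"
#   TextProcessor.clean_text("这是一个测试")                  # returned unchanged (Chinese detected)
#   TextProcessor.parse_batch_input("first\n\n  second  ")   # -> ["first", "second"]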
# Enhanced History Manager
class HistoryManager:
    """Enhanced history management with filtering"""

    def __init__(self):
        self._history = []

    def add(self, entry: Dict):
        """Add entry with timestamp"""
        entry['timestamp'] = datetime.now().isoformat()
        self._history.append(entry)
        if len(self._history) > config.MAX_HISTORY_SIZE:
            self._history = self._history[-config.MAX_HISTORY_SIZE:]

    def add_batch(self, entries: List[Dict]):
        """Add multiple entries"""
        for entry in entries:
            self.add(entry)

    def get_all(self) -> List[Dict]:
        return self._history.copy()

    def get_recent(self, n: int = 10) -> List[Dict]:
        return self._history[-n:] if self._history else []
    def filter_by(self, sentiment: Optional[str] = None, language: Optional[str] = None,
                  min_confidence: Optional[float] = None) -> List[Dict]:
        """Filter history by sentiment, language, and/or minimum confidence"""
        filtered = self._history
        if sentiment:
            filtered = [h for h in filtered if h['sentiment'] == sentiment]
        if language:
            filtered = [h for h in filtered if h.get('language', 'en') == language]
        if min_confidence is not None:
            filtered = [h for h in filtered if h['confidence'] >= min_confidence]
        return filtered
    def clear(self) -> int:
        count = len(self._history)
        self._history.clear()
        return count

    def size(self) -> int:
        return len(self._history)

    def get_stats(self) -> Dict:
        """Get comprehensive statistics"""
        if not self._history:
            return {}
        sentiments = [item['sentiment'] for item in self._history]
        confidences = [item['confidence'] for item in self._history]
        languages = [item.get('language', 'en') for item in self._history]
        return {
            'total_analyses': len(self._history),
            'positive_count': sentiments.count('Positive'),
            'negative_count': sentiments.count('Negative'),
            'neutral_count': sentiments.count('Neutral'),
            'avg_confidence': np.mean(confidences),
            'max_confidence': np.max(confidences),
            'min_confidence': np.min(confidences),
            'languages_detected': len(set(languages)),
            'most_common_language': Counter(languages).most_common(1)[0][0] if languages else 'en'
        }
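
# Usage sketch (illustrative; entry keys mirror those read by get_stats above):
#
#   history = HistoryManager()
#   history.add({'text': 'Great!', 'sentiment': 'Positive', 'confidence': 0.97, 'language': 'en'})
#   history.filter_by(sentiment='Positive', min_confidence=0.9)   # -> [that entry]
#   history.get_stats()['total_analyses']                         # -> 1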
# Universal Data Handler
class DataHandler:
    """Enhanced data operations"""

    @staticmethod
    def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
        """Export data to a temporary CSV or JSON file"""
        if not data:
            return None, "No data to export"
        temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                suffix=f'.{format_type}', encoding='utf-8')
        if format_type == 'csv':
            writer = csv.writer(temp_file)
            writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Language',
                             'Pos_Prob', 'Neg_Prob', 'Neu_Prob', 'Word_Count'])
            for entry in data:
                writer.writerow([
                    entry.get('timestamp', ''),
                    entry.get('text', ''),
                    entry.get('sentiment', ''),
                    f"{entry.get('confidence', 0):.4f}",
                    entry.get('language', 'en'),
                    f"{entry.get('pos_prob', 0):.4f}",
                    f"{entry.get('neg_prob', 0):.4f}",
                    f"{entry.get('neu_prob', 0):.4f}",
                    entry.get('word_count', 0)
                ])
        elif format_type == 'json':
            json.dump(data, temp_file, indent=2, ensure_ascii=False)
        temp_file.close()
        return temp_file.name, f"Exported {len(data)} entries"
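
    # Usage sketch (illustrative): export_data returns a (path, message) pair,
    # e.g. path, msg = DataHandler.export_data(history.get_all(), 'csv').
    # Since the temp file is created with delete=False, the caller is expected
    # to remove it once it has been served or downloaded.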
    @staticmethod
    def process_file(file) -> str:
        """Process an uploaded .csv or plain-text file, returning one text per line"""
        if not file:
            return ""
        content = file.read()
        if isinstance(content, bytes):
            content = content.decode('utf-8')
        if file.name.endswith('.csv'):
            csv_file = io.StringIO(content)
            reader = csv.reader(csv_file)
            try:
                next(reader)  # Skip header
                texts = []
                for row in reader:
                    if row and row[0].strip():
                        text = row[0].strip().strip('"')
                        if text:
                            texts.append(text)
                return '\n'.join(texts)
            except Exception:
                # Fall back to naive line splitting if CSV parsing fails
                lines = content.strip().split('\n')[1:]
                texts = []
                for line in lines:
                    if line.strip():
                        text = line.strip().strip('"')
                        if text:
                            texts.append(text)
                return '\n'.join(texts)
        return content
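
if __name__ == '__main__':
    # Minimal smoke test (illustrative only; assumes `config` and `models` are
    # importable and that config defines MIN_WORD_LENGTH and MAX_HISTORY_SIZE)
    history = HistoryManager()
    history.add({'text': 'I love this', 'sentiment': 'Positive', 'confidence': 0.95,
                 'language': 'en', 'pos_prob': 0.95, 'neg_prob': 0.02, 'neu_prob': 0.03,
                 'word_count': 3})
    print(history.get_stats())
    path, message = DataHandler.export_data(history.get_all(), 'json')
    print(message, '->', path)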