import gradio as gr
from gradio import blocks
from gradio_client import utils as client_utils
import spaces
import pandas as pd
import torch
from transformers import AutoModelForSequenceClassification, AutoModelForSeq2SeqLM, AutoTokenizer, AutoModel
import plotly.graph_objects as go
import logging
import io
from rapidfuzz import fuzz
import time
import os
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from openpyxl import load_workbook
from openpyxl.utils.dataframe import dataframe_to_rows
import torch.nn.functional as F
import numpy as np
from typing import List, Set, Tuple
import asyncio

# Read the Groq API key from the environment; .get() avoids a KeyError when it is absent
groq_key = os.environ.get('groq_key')

os.environ["ZEROGPU_ENABLED"] = "1"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Force torch to recognize CUDA if ZeroGPU is available
if hasattr(spaces, "GPU_ENABLED"):
    print(f"🔍 Initial spaces.GPU_ENABLED: {spaces.GPU_ENABLED}")
    # Try to activate spaces GPU support
    try:
        spaces.GPU_ENABLED = True
        print("🔍 Set spaces.GPU_ENABLED to True")
    except Exception:
        print("🔍 Could not set spaces.GPU_ENABLED")


# Verify GPU detection after configuration
def verify_gpu():
    print("🔍 GPU Detection:")
    print(f" - torch.cuda.is_available(): {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f" - CUDA Device: {torch.cuda.get_device_name(0)}")
        print(f" - CUDA Version: {torch.version.cuda}")
    print(f" - spaces.GPU_ENABLED: {hasattr(spaces, 'GPU_ENABLED') and spaces.GPU_ENABLED}")
    print(f" - CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES', 'not set')}")
    print(f" - ZEROGPU_ENABLED: {os.environ.get('ZEROGPU_ENABLED', 'not set')}")


# Run verification at startup
verify_gpu()
def patch_gradio():
    # Patch the get_type function to handle boolean schemas
    original_get_type = client_utils.get_type

    def safe_get_type(schema):
        if isinstance(schema, bool):
            return "bool"  # Return a string type for boolean values
        try:
            return original_get_type(schema)
        except Exception:
            return "Any"  # Fallback type for any other issues

    client_utils.get_type = safe_get_type

    # Patch json_schema_to_python_type to handle exceptions
    original_json_schema = client_utils.json_schema_to_python_type

    def safe_json_schema(schema, *args, **kwargs):
        try:
            return original_json_schema(schema, *args, **kwargs)
        except Exception:
            return "Any"  # Fallback for any schema parsing issues

    client_utils.json_schema_to_python_type = safe_json_schema

    # Disable API info generation entirely if it fails
    original_get_api_info = blocks.Blocks.get_api_info

    def safe_get_api_info(self):
        try:
            return original_get_api_info(self)
        except Exception:
            return {"components": [], "dependencies": []}  # Empty API info

    blocks.Blocks.get_api_info = safe_get_api_info
    print("✅ Gradio patched successfully")


# Run the patch
patch_gradio()
def fuzzy_deduplicate(df, column, threshold=55):
    """Deduplicate rows based on fuzzy matching of text content."""
    seen_texts = []
    indices_to_keep = []
    for i, text in enumerate(df[column]):
        if pd.isna(text):
            indices_to_keep.append(i)
            continue
        text = str(text)
        # Keep the row only if it is not too similar to any text already kept
        if not seen_texts or all(fuzz.ratio(text, seen) < threshold for seen in seen_texts):
            seen_texts.append(text)
            indices_to_keep.append(i)
    return df.iloc[indices_to_keep]
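
# Illustrative sketch (not called at runtime): fuzzy_deduplicate on a toy DataFrame.
# The column name and threshold below are example values, not app requirements.
def _fuzzy_deduplicate_example():
    toy = pd.DataFrame({'text': [
        "Компания опубликовала отчетность за квартал",
        "Компания опубликовала отчетность за квартал.",  # near-duplicate of the first
        "Суд принял иск к рассмотрению",                 # clearly different
    ]})
    # With threshold=55 the near-duplicate scores far above 55 and is dropped,
    # so two of the three rows survive.
    return fuzzy_deduplicate(toy, 'text', threshold=55)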

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GPUTaskManager:
    def __init__(self, max_retries=3, retry_delay=30, cleanup_callback=None):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.cleanup_callback = cleanup_callback

    async def run_with_retry(self, task_func, *args, **kwargs):
        """Execute a GPU task with retry logic."""
        for attempt in range(self.max_retries):
            try:
                return await task_func(*args, **kwargs)
            except Exception as e:
                # Retry only quota/abort errors; free GPU memory before waiting
                if "GPU task aborted" in str(e) or "GPU quota" in str(e):
                    if attempt < self.max_retries - 1:
                        if self.cleanup_callback:
                            self.cleanup_callback()
                        torch.cuda.empty_cache()
                        await asyncio.sleep(self.retry_delay)
                        continue
                raise
    @staticmethod
    def batch_process(items, batch_size=3):
        """Split items into smaller batches."""
        # staticmethod so it can be called as gpu_manager.batch_process(...)
        return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

    @staticmethod
    def is_gpu_error(error):
        """Check if an error is GPU-related."""
        error_msg = str(error).lower()
        return any(msg in error_msg for msg in [
            "gpu task aborted",
            "gpu quota",
            "cuda out of memory",
            "device-side assert"
        ])
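
# Illustrative sketch (not called at runtime): wrapping a task with GPUTaskManager.
# The task body here is a made-up placeholder standing in for model inference.
async def _gpu_task_manager_example():
    manager = GPUTaskManager(max_retries=3, retry_delay=1)

    async def flaky_task(x):
        # A real task would run GPU inference here and may raise quota errors
        return x * 2

    result = await manager.run_with_retry(flaky_task, 21)              # -> 42
    batches = manager.batch_process(list(range(7)), batch_size=3)      # -> [[0, 1, 2], [3, 4, 5], [6]]
    return result, batches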

class ProcessControl:
    def __init__(self):
        self.stop_requested = False
        self.error = None

    def request_stop(self):
        self.stop_requested = True

    def should_stop(self):
        return self.stop_requested

    def reset(self):
        self.stop_requested = False
        self.error = None

    def set_error(self, error):
        self.error = error
        self.stop_requested = True

class EventDetector:
    def __init__(self):
        try:
            device = "cuda" if torch.cuda.is_available() else "cpu"
            logger.info(f"Initializing models on device: {device}")
            # Initialize all models
            self.initialize_models(device)
            # Initialize transformer for declusterization
            self.tokenizer_cluster = AutoTokenizer.from_pretrained('sentence-transformers/paraphrase-multilingual-mpnet-base-v2')
            self.model_cluster = AutoModel.from_pretrained('sentence-transformers/paraphrase-multilingual-mpnet-base-v2').to(device)
            self.device = device
            self.initialized = True
            logger.info("All models initialized successfully")
        except Exception as e:
            logger.error(f"Error in EventDetector initialization: {str(e)}")
            raise
    def mean_pooling(self, model_output, attention_mask):
        # Average token embeddings, ignoring padding positions via the attention mask
        token_embeddings = model_output[0]
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    def encode_text(self, text):
        if pd.isna(text):
            text = ""
        text = str(text)
        encoded_input = self.tokenizer_cluster(text, padding=True, truncation=True, max_length=512, return_tensors='pt').to(self.device)
        with torch.no_grad():
            model_output = self.model_cluster(**encoded_input)
        sentence_embeddings = self.mean_pooling(model_output, encoded_input['attention_mask'])
        # L2-normalize so a dot product between two embeddings equals cosine similarity
        return torch.nn.functional.normalize(sentence_embeddings[0], p=2, dim=0)
    def decluster_texts(self, df, text_column, similarity_threshold=0.75, time_threshold=24):
        try:
            if df.empty:
                return df

            # Sort by datetime if available
            if 'datetime' in df.columns:
                df = df.sort_values('datetime')

            indices_to_delete = set()

            # Process each text
            for i in df.index:
                if i in indices_to_delete:  # Skip if already marked for deletion
                    continue

                text1 = df.loc[i, text_column]
                if pd.isna(text1):
                    continue

                text1_embedding = self.encode_text(text1)
                current_cluster = []

                # Compare with other texts
                for j in df.index:
                    if i == j or j in indices_to_delete:  # Skip same text or already marked
                        continue

                    text2 = df.loc[j, text_column]
                    if pd.isna(text2):
                        continue

                    # Check time difference if datetime available
                    if 'datetime' in df.columns:
                        time_diff = pd.to_datetime(df.loc[j, 'datetime']) - pd.to_datetime(df.loc[i, 'datetime'])
                        if abs(time_diff.total_seconds() / 3600) > time_threshold:
                            continue

                    text2_embedding = self.encode_text(text2)
                    # Embeddings are L2-normalized, so the dot product is cosine similarity
                    similarity = torch.dot(text1_embedding, text2_embedding).item()
                    if similarity >= similarity_threshold:
                        current_cluster.append(j)

                # If we found similar texts, keep the longest one
                if current_cluster:
                    current_cluster.append(i)  # Add the current text to the cluster
                    text_lengths = df.loc[current_cluster, text_column].fillna('').str.len()
                    longest_text_idx = text_lengths.idxmax()
                    # Mark all except the longest for deletion
                    indices_to_delete.update(set(current_cluster) - {longest_text_idx})

            # Return DataFrame without deleted rows
            return df.drop(index=list(indices_to_delete))
        except Exception as e:
            logger.error(f"Declusterization error: {str(e)}")
            return df
    def initialize_models(self, device):
        """Initialize all models with GPU support."""
        try:
            from transformers import MarianTokenizer

            # Translation model (ru -> en), loaded directly with safetensors
            self.translator_tokenizer = MarianTokenizer.from_pretrained("Helsinki-NLP/opus-mt-ru-en")
            self.translator_model = AutoModelForSeq2SeqLM.from_pretrained(
                "Helsinki-NLP/opus-mt-ru-en",
                use_safetensors=True,  # Force safetensors
                from_tf=False,         # Not from TensorFlow
                device_map=device
            )

            # Custom translation function used instead of the transformers pipeline;
            # note it returns a plain string
            def translate_fn(text):
                inputs = self.translator_tokenizer(text, return_tensors="pt").to(device)
                outputs = self.translator_model.generate(**inputs)
                return self.translator_tokenizer.decode(outputs[0], skip_special_tokens=True)

            self.translator = translate_fn

            # Same for the reverse (en -> ru) translator
            self.rutranslator_tokenizer = MarianTokenizer.from_pretrained("Helsinki-NLP/opus-mt-en-ru")
            self.rutranslator_model = AutoModelForSeq2SeqLM.from_pretrained(
                "Helsinki-NLP/opus-mt-en-ru",
                use_safetensors=True,
                device_map=device
            )

            def ru_translate_fn(text):
                inputs = self.rutranslator_tokenizer(text, return_tensors="pt").to(device)
                outputs = self.rutranslator_model.generate(**inputs)
                return self.rutranslator_tokenizer.decode(outputs[0], skip_special_tokens=True)

            self.rutranslator = ru_translate_fn

            # Initialize FinBERT
            finbert_tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
            finbert_model = AutoModelForSequenceClassification.from_pretrained(
                "ProsusAI/finbert",
                use_safetensors=True,
                device_map=device
            )

            # Custom classification function instead of the transformers pipeline.
            # Labels come from the model config rather than a hardcoded list,
            # since each of these models orders its classes differently.
            def finbert_sentiment(text):
                inputs = finbert_tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(device)
                outputs = finbert_model(**inputs)
                probs = outputs.logits.softmax(dim=1)[0]
                pred_idx = probs.argmax().item()
                label = finbert_model.config.id2label[pred_idx].lower()
                return {"label": label, "score": probs[pred_idx].item()}

            self.finbert = lambda text: [finbert_sentiment(text)]

            # Initialize RoBERTa
            roberta_tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")
            roberta_model = AutoModelForSequenceClassification.from_pretrained(
                "cardiffnlp/twitter-roberta-base-sentiment",
                use_safetensors=True,
                device_map=device
            )

            def roberta_sentiment(text):
                inputs = roberta_tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(device)
                outputs = roberta_model(**inputs)
                probs = outputs.logits.softmax(dim=1)[0]
                pred_idx = probs.argmax().item()
                # This model's config only exposes LABEL_0/1/2, which map to these classes
                labels = ["negative", "neutral", "positive"]
                return {"label": labels[pred_idx], "score": probs[pred_idx].item()}

            self.roberta = lambda text: [roberta_sentiment(text)]

            # Initialize finbert-tone
            finbert_tone_tokenizer = AutoTokenizer.from_pretrained("yiyanghkust/finbert-tone")
            finbert_tone_model = AutoModelForSequenceClassification.from_pretrained(
                "yiyanghkust/finbert-tone",
                use_safetensors=True,
                device_map=device
            )

            def finbert_tone_sentiment(text):
                inputs = finbert_tone_tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(device)
                outputs = finbert_tone_model(**inputs)
                probs = outputs.logits.softmax(dim=1)[0]
                pred_idx = probs.argmax().item()
                label = finbert_tone_model.config.id2label[pred_idx].lower()
                return {"label": label, "score": probs[pred_idx].item()}

            self.finbert_tone = lambda text: [finbert_tone_sentiment(text)]

            # Initialize MT5 model with safetensors
            self.model_name = "google/mt5-small"
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                legacy=True
            )
            self.model = AutoModelForSeq2SeqLM.from_pretrained(
                self.model_name,
                use_safetensors=True,
                device_map=device
            )

            # Initialize Groq LLM if a key is available
            # (was `if 'groq_key':`, which is always truthy)
            if groq_key:
                self.groq = ChatOpenAI(
                    base_url="https://api.groq.com/openai/v1",
                    model="llama-3.3-70b-versatile",
                    openai_api_key=groq_key,
                    temperature=0.0
                )
            else:
                logger.warning("Groq API key not found, impact estimation will be limited")
                self.groq = None
        except Exception as e:
            logger.error(f"Error in model initialization: {str(e)}")
            raise
    def _translate_text(self, text):
        """Translate Russian text to English."""
        try:
            if not text or not isinstance(text, str):
                return ""
            text = text.strip()
            if not text:
                return ""

            # Split into manageable chunks
            max_length = 450
            chunks = [text[i:i + max_length] for i in range(0, len(text), max_length)]

            translated_chunks = []
            for chunk in chunks:
                # self.translator is the custom function defined in initialize_models
                # and returns a plain string, not a pipeline-style result list
                result = self.translator(chunk)
                translated_chunks.append(result)
                time.sleep(0.1)  # Rate limiting
            return " ".join(translated_chunks)
        except Exception as e:
            logger.error(f"Translation error: {str(e)}")
            return text
    def analyze_sentiment(self, text):
        """Ensemble sentiment analysis with better negative detection."""
        try:
            if not text or not isinstance(text, str):
                return "Neutral"
            text = text.strip()
            if not text:
                return "Neutral"

            # Get predictions with confidence scores
            finbert_result = self.finbert(text)[0]
            roberta_result = self.roberta(text)[0]
            finbert_tone_result = self.finbert_tone(text)[0]

            # Map each model's output through confidence thresholds
            def map_sentiment(result):
                label = result['label'].lower()
                score = result['score']
                # High threshold for positive to reduce false positives
                if label in ['positive', 'pos', 'positive tone'] and score > 0.75:
                    logger.info(f"Positive: {score}")
                    return "Positive"
                # Negative uses the same 0.75 threshold
                elif label in ['negative', 'neg', 'negative tone'] and score > 0.75:
                    logger.info(f"Negative: {score}")
                    return "Negative"
                # High-confidence neutral predictions
                elif label == 'neutral' and score > 0.8:
                    logger.info(f"Neutral: {score}")
                    return "Neutral"
                # Default to negative for uncertain cases in a financial context
                else:
                    return "Negative" if score > 0.4 else "Neutral"

            sentiments = [
                map_sentiment(finbert_result),
                map_sentiment(roberta_result),
                map_sentiment(finbert_tone_result)
            ]

            # Consensus voting across the three models
            if sentiments.count("Negative") >= 2:  # negative requires consensus
                return "Negative"
            if sentiments.count("Positive") >= 2:  # positive requires consensus too
                return "Positive"
            return "Neutral"
        except Exception as e:
            logger.error(f"Sentiment analysis error: {str(e)}")
            return "Neutral"
    def estimate_impact(self, text, entity):
        """Estimate impact using Groq (applied to all texts, especially negatives)."""
        try:
            if not self.groq:
                return "Неопределенный эффект", "Groq API недоступен"

            template = """
            You are a financial analyst. Analyze this news about {entity} and assess its potential impact.

            News: {news}

            Classify the impact into one of these categories:
            1. "Значительный риск убытков" (Significant loss risk)
            2. "Умеренный риск убытков" (Moderate loss risk)
            3. "Незначительный риск убытков" (Minor loss risk)
            4. "Вероятность прибыли" (Potential profit)
            5. "Неопределенный эффект" (Uncertain effect)

            Format your response exactly as:
            Impact: [category]
            Reasoning: [explanation in 2-3 sentences]
            """

            prompt = PromptTemplate(template=template, input_variables=["entity", "news"])
            chain = prompt | self.groq
            response = chain.invoke({
                "entity": entity,
                "news": text
            })

            # Parse response
            response_text = response.content if hasattr(response, 'content') else str(response)
            if "Impact:" in response_text and "Reasoning:" in response_text:
                parts = response_text.split("Reasoning:")
                impact = parts[0].split("Impact:")[1].strip()
                reasoning = parts[1].strip()
            else:
                impact = "Неопределенный эффект"
                reasoning = "Не удалось определить влияние"
            return impact, reasoning
        except Exception as e:
            logger.error(f"Impact estimation error: {str(e)}")
            return "Неопределенный эффект", f"Ошибка анализа: {str(e)}"
    def process_text(self, text, entity):
        """Process text with Groq-driven sentiment analysis."""
        # Force CUDA at function level too
        if torch.cuda.is_available():
            torch.cuda.set_device(0)
            print(f"Using CUDA device: {torch.cuda.get_device_name(0)}")
        try:
            translated_text = self._translate_text(text)
            initial_sentiment = self.analyze_sentiment(translated_text)

            # Always get Groq analysis for all texts
            impact, reasoning = self.estimate_impact(translated_text, entity)
            # rutranslator returns a plain string (see initialize_models)
            reasoning = self.rutranslator(reasoning)

            # Override sentiment based on Groq impact
            final_sentiment = initial_sentiment
            if impact == "Вероятность прибыли":
                final_sentiment = "Positive"

            event_type, event_summary = self.detect_events(text, entity)

            return {
                'translated_text': translated_text,
                'sentiment': final_sentiment,
                'impact': impact,
                'reasoning': reasoning,
                'event_type': event_type,
                'event_summary': event_summary
            }
        except Exception as e:
            logger.error(f"Text processing error: {str(e)}")
            return {
                'translated_text': '',
                'sentiment': 'Neutral',
                'impact': 'Неопределенный эффект',
                'reasoning': f'Ошибка обработки: {str(e)}',
                'event_type': 'Нет',
                'event_summary': ''
            }
    def detect_events(self, text, entity):
        if not text or not entity:
            return "Нет", "Invalid input"
        try:
            # Improved prompt for MT5
            prompt = f"""<s>Analyze this news about {entity}:
            Text: {text}
            Classify this news into ONE of these categories:
            1. "Отчетность" if about: financial reports, revenue, profit, EBITDA, financial results, quarterly/annual reports
            2. "Суд" if about: court cases, lawsuits, arbitration, bankruptcy, legal proceedings
            3. "РЦБ" if about: bonds, securities, defaults, debt restructuring, coupon payments
            4. "Нет" if none of the above
            Provide classification and 2-3 sentence summary focusing on key facts.
            Format response exactly as:
            Category: [category name]
            Summary: [brief factual summary]</s>"""

            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512
            ).to(self.device)

            outputs = self.model.generate(
                **inputs,
                max_length=200,
                num_return_sequences=1,
                do_sample=False,  # greedy decoding, so sampling params are unused
                no_repeat_ngram_size=3
            )

            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            # Extract category and summary
            if "Category:" in response and "Summary:" in response:
                parts = response.split("Summary:")
                category = parts[0].split("Category:")[1].strip()
                summary = parts[1].strip()
                # Validate category
                valid_categories = {"Отчетность", "Суд", "РЦБ", "Нет"}
                category = category if category in valid_categories else "Нет"
                return category, summary
            return "Нет", "Could not classify event"
        except Exception as e:
            logger.error(f"Event detection error: {str(e)}")
            return "Нет", f"Error in event detection: {str(e)}"
    def cleanup(self):
        """Clean up GPU resources."""
        try:
            self.model = None
            self.translator = None
            self.finbert = None
            self.roberta = None
            self.finbert_tone = None
            torch.cuda.empty_cache()
            self.initialized = False
            logger.info("Cleaned up GPU resources")
        except Exception as e:
            logger.error(f"Error in cleanup: {str(e)}")

def create_visualizations(df):
    if df is None or df.empty:
        return None, None
    try:
        sentiments = df['Sentiment'].value_counts()
        fig_sentiment = go.Figure(data=[go.Pie(
            labels=sentiments.index,
            values=sentiments.values,
            marker_colors=['#FF6B6B', '#4ECDC4', '#95A5A6']
        )])
        fig_sentiment.update_layout(title="Распределение тональности")

        events = df['Event_Type'].value_counts()
        fig_events = go.Figure(data=[go.Bar(
            x=events.index,
            y=events.values,
            marker_color='#2196F3'
        )])
        fig_events.update_layout(title="Распределение событий")

        return fig_sentiment, fig_events
    except Exception as e:
        logger.error(f"Visualization error: {e}")
        return None, None

def process_file(file_obj):
    try:
        logger.info("Starting to read Excel file...")
        df = pd.read_excel(file_obj, sheet_name='Публикации')
        logger.info(f"Successfully read Excel file. Shape: {df.shape}")

        # Deduplication
        original_count = len(df)
        df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55)
        logger.info(f"Removed {original_count - len(df)} duplicate entries")

        detector = EventDetector()
        processed_rows = []
        total = len(df)

        # Process in smaller batches with quota management
        BATCH_SIZE = 3        # Reduced batch size
        QUOTA_WAIT_TIME = 60  # Wait time when quota is exceeded

        for batch_start in range(0, total, BATCH_SIZE):
            try:
                batch_end = min(batch_start + BATCH_SIZE, total)
                batch = df.iloc[batch_start:batch_end]

                # Re-initialize models if they were cleaned up (device arg is required)
                if not detector.initialized:
                    detector.initialize_models(detector.device)
                    time.sleep(1)  # Wait after initialization

                for idx, row in batch.iterrows():
                    try:
                        text = str(row.get('Выдержки из текста', ''))
                        if not text.strip():
                            continue
                        entity = str(row.get('Объект', ''))
                        if not entity.strip():
                            continue

                        # Process with GPU quota management
                        event_type = "Нет"
                        event_summary = ""
                        sentiment = "Neutral"
                        try:
                            event_type, event_summary = detector.detect_events(text, entity)
                            time.sleep(1)  # Wait between GPU operations
                            sentiment = detector.analyze_sentiment(text)
                        except Exception as e:
                            if "GPU quota" in str(e):
                                logger.warning("GPU quota exceeded, waiting...")
                                time.sleep(QUOTA_WAIT_TIME)
                                continue
                            else:
                                raise

                        processed_rows.append({
                            'Объект': entity,
                            'Заголовок': str(row.get('Заголовок', '')),
                            'Sentiment': sentiment,
                            'Event_Type': event_type,
                            'Event_Summary': event_summary,
                            'Текст': text[:1000]
                        })
                        logger.info(f"Processed {len(processed_rows)}/{total} rows")
                    except Exception as e:
                        logger.error(f"Error processing row {idx}: {str(e)}")
                        continue

                # Yield intermediate results
                if processed_rows:
                    intermediate_df = pd.DataFrame(processed_rows)
                    yield (
                        intermediate_df,
                        None,
                        None,
                        f"Обработано {len(processed_rows)}/{total} строк"
                    )

                # Wait between batches and release GPU memory
                time.sleep(2)
                torch.cuda.empty_cache()
            except Exception as e:
                logger.error(f"Batch processing error: {str(e)}")
                if "GPU quota" in str(e):
                    time.sleep(QUOTA_WAIT_TIME)
                continue

        # Final results (yielded, not returned: this function is a generator)
        if processed_rows:
            result_df = pd.DataFrame(processed_rows)
            fig_sentiment, fig_events = create_visualizations(result_df)
            yield result_df, fig_sentiment, fig_events, "Обработка завершена!"
        else:
            yield None, None, None, "Нет обработанных данных"
    except Exception as e:
        logger.error(f"File processing error: {str(e)}")
        raise

def create_output_file(df, uploaded_file):
    """Create an Excel file with multiple sheets from the processed DataFrame."""
    try:
        # The template workbook is assumed to ship alongside the app
        wb = load_workbook("sample_file.xlsx")

        # 1. Update 'Публикации' sheet
        ws = wb['Публикации']
        for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), start=1):
            for c_idx, value in enumerate(row, start=1):
                ws.cell(row=r_idx, column=c_idx, value=value)

        # 2. Update 'Мониторинг' sheet with events
        ws = wb['Мониторинг']
        row_idx = 4
        events_df = df[df['Event_Type'] != 'Нет'].copy()
        for _, row in events_df.iterrows():
            ws.cell(row=row_idx, column=5, value=row['Объект'])
            ws.cell(row=row_idx, column=6, value=row['Заголовок'])
            ws.cell(row=row_idx, column=7, value=row['Event_Type'])
            ws.cell(row=row_idx, column=8, value=row['Event_Summary'])
            ws.cell(row=row_idx, column=9, value=row['Выдержки из текста'])
            row_idx += 1

        # 3. Update 'Сводка' sheet
        ws = wb['Сводка']
        unique_entities = df['Объект'].unique()
        entity_stats = []
        for entity in unique_entities:
            entity_df = df[df['Объект'] == entity]
            stats = {
                'Объект': entity,
                'Всего': len(entity_df),
                'Негативные': len(entity_df[entity_df['Sentiment'] == 'Negative']),
                'Позитивные': len(entity_df[entity_df['Sentiment'] == 'Positive'])
            }
            # Take the first reported impact among the entity's negative mentions
            negative_df = entity_df[entity_df['Sentiment'] == 'Negative']
            if len(negative_df) > 0:
                impacts = negative_df['Impact'].dropna()
                if len(impacts) > 0:
                    stats['Impact'] = impacts.iloc[0]
                else:
                    stats['Impact'] = 'Неопределенный эффект'
            else:
                stats['Impact'] = 'Неопределенный эффект'
            entity_stats.append(stats)

        # Sort by number of negative mentions
        entity_stats = sorted(entity_stats, key=lambda x: x['Негативные'], reverse=True)

        # Write to sheet
        row_idx = 4  # Starting row in the 'Сводка' sheet
        for stats in entity_stats:
            ws.cell(row=row_idx, column=5, value=stats['Объект'])
            ws.cell(row=row_idx, column=6, value=stats['Всего'])
            ws.cell(row=row_idx, column=7, value=stats['Негативные'])
            ws.cell(row=row_idx, column=8, value=stats['Позитивные'])
            ws.cell(row=row_idx, column=9, value=stats['Impact'])
            row_idx += 1

        # 4. Update 'Значимые' sheet
        ws = wb['Значимые']
        row_idx = 3
        sentiment_df = df[df['Sentiment'].isin(['Negative', 'Positive'])].copy()
        for _, row in sentiment_df.iterrows():
            ws.cell(row=row_idx, column=3, value=row['Объект'])
            ws.cell(row=row_idx, column=4, value='релевантно')
            ws.cell(row=row_idx, column=5, value=row['Sentiment'])
            ws.cell(row=row_idx, column=6, value=row.get('Impact', '-'))
            ws.cell(row=row_idx, column=7, value=row['Заголовок'])
            ws.cell(row=row_idx, column=8, value=row['Выдержки из текста'])
            row_idx += 1

        # 5. Update 'Анализ' sheet
        ws = wb['Анализ']
        row_idx = 4
        negative_df = df[df['Sentiment'] == 'Negative'].copy()
        for _, row in negative_df.iterrows():
            ws.cell(row=row_idx, column=5, value=row['Объект'])
            ws.cell(row=row_idx, column=6, value=row['Заголовок'])
            ws.cell(row=row_idx, column=7, value="Риск убытка")
            ws.cell(row=row_idx, column=8, value=row.get('Reasoning', '-'))
            ws.cell(row=row_idx, column=9, value=row['Выдержки из текста'])
            row_idx += 1

        # 6. Update 'Тех.приложение' sheet
        if 'Тех.приложение' not in wb.sheetnames:
            wb.create_sheet('Тех.приложение')
        ws = wb['Тех.приложение']
        tech_cols = ['Объект', 'Заголовок', 'Выдержки из текста', 'Translated', 'Sentiment', 'Impact', 'Reasoning']
        tech_df = df[tech_cols].copy()
        for r_idx, row in enumerate(dataframe_to_rows(tech_df, index=False, header=True), start=1):
            for c_idx, value in enumerate(row, start=1):
                ws.cell(row=r_idx, column=c_idx, value=value)

        # Save the workbook into memory
        output = io.BytesIO()
        wb.save(output)
        output.seek(0)
        return output
    except Exception as e:
        logger.error(f"Error creating output file: {str(e)}")
        logger.error(f"DataFrame shape: {df.shape}")
        logger.error(f"Available columns: {df.columns.tolist()}")
        return None
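
# Illustrative sketch (hypothetical helper, not part of the original app):
# create_output_file assumes a template workbook "sample_file.xlsx" containing the
# sheets written above. A minimal blank template could be generated like this.
def _make_blank_template(path="sample_file.xlsx"):
    from openpyxl import Workbook
    wb = Workbook()
    wb.remove(wb.active)  # drop the default sheet
    for name in ['Публикации', 'Мониторинг', 'Сводка', 'Значимые', 'Анализ', 'Тех.приложение']:
        wb.create_sheet(name)
    wb.save(path)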

def process_and_download(file_input, control=None):
    """Synchronous wrapper around the async processing pipeline."""
    # Handle different file input types
    if file_input is None:
        gr.Warning("Пожалуйста, загрузите файл")
        return pd.DataFrame(), None, None, None, "Ожидание файла...", ""

    # Debug logging
    logger.info(f"File input type: {type(file_input)}")
    if hasattr(file_input, '__dict__'):
        logger.info(f"File input attributes: {dir(file_input)}")

    # Convert file input to bytes
    try:
        if hasattr(file_input, 'name'):  # Gradio 4+ NamedString object
            # Different ways to extract file content
            if isinstance(file_input, str):
                # If it's actually a file path
                with open(file_input, 'rb') as f:
                    file_bytes = f.read()
            elif hasattr(file_input, 'value'):
                # Try the value attribute (common in Gradio 4+)
                file_bytes = file_input.value
            elif hasattr(file_input, 'content'):
                # Try the content attribute
                file_bytes = file_input.content
            else:
                # Last resort: convert to string and encode
                file_bytes = str(file_input).encode('utf-8')
        elif isinstance(file_input, dict) and 'path' in file_input:
            # Gradio sometimes returns a dict with a file path
            with open(file_input['path'], 'rb') as f:
                file_bytes = f.read()
        elif isinstance(file_input, (str, bytes)):
            # Either raw bytes or a file path
            if isinstance(file_input, str):
                with open(file_input, 'rb') as f:
                    file_bytes = f.read()
            else:
                file_bytes = file_input
        else:
            raise ValueError(f"Unexpected file input type: {type(file_input)}")
        logger.info(f"Successfully processed file, size: {len(file_bytes) if file_bytes else 0} bytes")
    except Exception as e:
        logger.error(f"File reading error: {str(e)}", exc_info=True)
        return pd.DataFrame(), None, None, None, f"Ошибка чтения файла: {str(e)}", ""

    if control is None:
        control = ProcessControl()

    async def async_process():
        detector = None
        gpu_manager = GPUTaskManager(
            max_retries=3,
            retry_delay=30,
            cleanup_callback=lambda: detector.cleanup() if detector else None
        )
        try:
            file_obj = io.BytesIO(file_bytes)
            logger.info("File loaded into BytesIO successfully")
            detector = EventDetector()

            # Read and deduplicate data with retry
            async def read_and_dedupe():
                df = pd.read_excel(file_obj, sheet_name='Публикации')
                original_count = len(df)
                df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55)
                return df, original_count

            df, original_count = await gpu_manager.run_with_retry(read_and_dedupe)

            # Process in smaller batches with better error handling
            processed_rows = []
            batches = gpu_manager.batch_process(list(df.iterrows()), batch_size=3)
            latest_result = (pd.DataFrame(), None, None, None, "Начало обработки...", "")

            for batch in batches:
                if control.should_stop():
                    return latest_result
                try:
                    # Process the batch with the retry mechanism
                    async def process_batch():
                        batch_results = []
                        for idx, row in batch:
                            text = str(row.get('Выдержки из текста', '')).strip()
                            entity = str(row.get('Объект', '')).strip()
                            if text and entity:
                                results = detector.process_text(text, entity)
                                batch_results.append({
                                    'Объект': entity,
                                    'Заголовок': str(row.get('Заголовок', '')),
                                    'Translated': results['translated_text'],
                                    'Sentiment': results['sentiment'],
                                    'Impact': results['impact'],
                                    'Reasoning': results['reasoning'],
                                    'Event_Type': results['event_type'],
                                    'Event_Summary': results['event_summary'],
                                    'Выдержки из текста': text
                                })
                        return batch_results

                    batch_results = await gpu_manager.run_with_retry(process_batch)
                    processed_rows.extend(batch_results)

                    # Update the latest intermediate result
                    if processed_rows:
                        result_df = pd.DataFrame(processed_rows)
                        latest_result = (
                            result_df,
                            None, None, None,
                            f"Обработано {len(processed_rows)}/{len(df)} строк",
                            f"Удалено {original_count - len(df)} дубликатов"
                        )
                except Exception as e:
                    if gpu_manager.is_gpu_error(e):
                        logger.warning(f"GPU error in batch processing: {str(e)}")
                        continue
                    else:
                        logger.error(f"Non-GPU error in batch processing: {str(e)}")
                finally:
                    torch.cuda.empty_cache()

            # Create final results
            if processed_rows:
                result_df = pd.DataFrame(processed_rows)
                output_bytes_io = create_output_file(result_df, file_obj)
                fig_sentiment, fig_events = create_visualizations(result_df)
                if output_bytes_io:
                    temp_file = "results.xlsx"
                    with open(temp_file, "wb") as f:
                        f.write(output_bytes_io.getvalue())
                    return (
                        result_df,
                        fig_sentiment,
                        fig_events,
                        temp_file,
                        "Обработка завершена!",
                        f"Удалено {original_count - len(df)} дубликатов"
                    )
            return (pd.DataFrame(), None, None, None, "Нет обработанных данных", "")
        except Exception as e:
            error_msg = f"Ошибка анализа: {str(e)}"
            logger.error(error_msg)
            return (pd.DataFrame(), None, None, None, error_msg, "")
        finally:
            if detector:
                detector.cleanup()

    # Run the async function in an event loop
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(async_process())

# Build the interface, passing the shared control object through to processing
def create_interface():
    control = ProcessControl()

    with gr.Blocks(analytics_enabled=False) as app:
        gr.Markdown("# AI-анализ мониторинга новостей v.2.26 + forced cuda")

        with gr.Row():
            file_input = gr.File(
                label="Загрузите Excel файл",
                file_types=[".xlsx"]
            )

        with gr.Row():
            analyze_btn = gr.Button("▶️ Начать анализ", variant="primary")
            stop_btn = gr.Button("⏹️ Остановить", variant="stop")

        with gr.Row():
            status_box = gr.Textbox(
                label="Статус дедупликации",
                interactive=False,
                value=""
            )

        with gr.Row():
            progress = gr.Textbox(
                label="Статус обработки",
                interactive=False,
                value="Ожидание файла..."
            )

        with gr.Row():
            stats = gr.DataFrame(
                label="Результаты анализа",
                interactive=False,
                wrap=True
            )

        with gr.Row():
            with gr.Column(scale=1):
                sentiment_plot = gr.Plot(label="Распределение тональности")
            with gr.Column(scale=1):
                events_plot = gr.Plot(label="Распределение событий")

        with gr.Row():
            file_output = gr.File(
                label="Скачать результаты",
                visible=True,
                interactive=True
            )

        def stop_processing():
            control.request_stop()
            return "Остановка обработки..."

        stop_btn.click(fn=stop_processing, outputs=[progress])

        # Main processing, with the control object passed through
        analyze_btn.click(
            fn=lambda x: process_and_download(x, control),
            inputs=[file_input],
            outputs=[
                stats,
                sentiment_plot,
                events_plot,
                file_output,
                progress,
                status_box
            ]
        )

    return app


# === LAUNCH APP WITH DISABLED API INFO ===
if __name__ == "__main__":
    app = create_interface()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_api=False,  # Disable API info generation
        share=False      # Don't attempt to create sharing links
    )