Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os | |
| import time | |
| from datetime import datetime | |
| from typing import Dict, Any, Optional, Callable | |
| from pathlib import Path | |
| import json | |
| import logging | |
| from dataclasses import dataclass, asdict | |
| from threading import Thread, Event | |
| import queue | |
| from src.core.log_parser import VampireBotLogParser, BotAnalysis | |
| from src.analysis.market_analysis import TechnicalAnalysisEngine | |
@dataclass
class RealTimeConfig:
    """Configuration for the real-time integration pipeline.

    Instances are built with keyword arguments and serialized with
    ``dataclasses.asdict`` (see ``RealTimeProcessor.get_status``), so the
    ``@dataclass`` decorator is required — it was missing in the original.
    """

    log_file_path: str                  # path of the bot log file to watch
    check_interval: float = 1.0         # polling interval, in seconds
    max_queue_size: int = 100           # bound on the internal event queue
    enable_notifications: bool = True   # whether subscribers get notified
    auto_analysis: bool = True          # run analysis automatically on new data
    backup_logs: bool = True            # keep backup copies of processed logs
@dataclass
class BotEvent:
    """A single real-time event emitted by the bot pipeline.

    Instances are constructed with keyword arguments throughout the file,
    so the ``@dataclass`` decorator is required — it was missing in the
    original.
    """

    timestamp: datetime        # when the event was created (local time)
    event_type: str            # 'new_analysis', 'fibonacci_alert', 'signal_change'
    data: Dict[str, Any]       # event-type-specific payload
    priority: str = 'normal'   # 'low', 'normal', 'high', 'critical'
class FileWatcher:
    """Polls a file on disk and fires a callback with its full contents
    whenever the file's mtime or size changes."""

    def __init__(self, file_path: str, callback: Callable[[str], None]):
        self.file_path = Path(file_path)
        self.callback = callback
        self.last_modified = 0
        self.last_size = 0
        self.running = False
        self._stop_event = Event()

    def start(self):
        """Begin watching: snapshot the file's current mtime/size as the baseline."""
        self.running = True
        self._stop_event.clear()
        if self.file_path.exists():
            info = self.file_path.stat()
            self.last_modified = info.st_mtime
            self.last_size = info.st_size

    def stop(self):
        """Stop watching."""
        self.running = False
        self._stop_event.set()

    def check_changes(self) -> bool:
        """Compare the file against the last snapshot.

        On a detected change, read the whole file and pass its text to the
        callback. Returns True only when the change was read successfully;
        missing files and stat/read errors yield False (errors are logged).
        """
        if not self.file_path.exists():
            return False
        try:
            info = self.file_path.stat()
            mtime, size = info.st_mtime, info.st_size
            if mtime > self.last_modified or size != self.last_size:
                # Update the snapshot before reading so a failed read is
                # not retried for the same modification.
                self.last_modified = mtime
                self.last_size = size
                try:
                    content = self.file_path.read_text(encoding='utf-8')
                    self.callback(content)
                    return True
                except Exception as e:
                    logging.error(f"Erro ao ler arquivo: {e}")
        except Exception as e:
            logging.error(f"Erro ao verificar arquivo: {e}")
        return False
class RealTimeProcessor:
    """Real-time processor for bot log data.

    Parses newly written log content, detects noteworthy transitions
    (new analysis, Fibonacci alerts, signal changes) and dispatches them
    as BotEvent objects to subscribed callbacks through a bounded queue
    drained by a daemon worker thread.
    """

    def __init__(self, config: RealTimeConfig):
        self.config = config
        self.log_parser = VampireBotLogParser()
        self.technical_engine = TechnicalAnalysisEngine()
        self.event_queue = queue.Queue(maxsize=config.max_queue_size)
        self.subscribers = []
        self.running = False
        self.last_analysis: Optional[BotAnalysis] = None
        # Setup logging
        self.logger = logging.getLogger(__name__)

    def subscribe(self, callback: Callable[[BotEvent], None]):
        """Register a callback to receive BotEvent notifications."""
        self.subscribers.append(callback)

    def unsubscribe(self, callback: Callable[[BotEvent], None]):
        """Remove a previously registered callback (no-op if absent)."""
        if callback in self.subscribers:
            self.subscribers.remove(callback)

    def _notify_subscribers(self, event: BotEvent):
        """Deliver *event* to every subscriber; a failing callback is
        logged and does not block delivery to the remaining ones."""
        for callback in self.subscribers:
            try:
                callback(event)
            except Exception as e:
                self.logger.error(f"Erro ao notificar subscriber: {e}")

    def _enqueue(self, event: BotEvent, drop_oldest: bool = False):
        """Put *event* on the bounded queue.

        When the queue is full the event is silently dropped, unless
        ``drop_oldest`` is True, in which case the oldest queued event is
        discarded to make room for the new one.
        """
        try:
            self.event_queue.put_nowait(event)
        except queue.Full:
            if not drop_oldest:
                return
            self.logger.warning("Fila de eventos cheia, removendo evento mais antigo")
            try:
                self.event_queue.get_nowait()
                self.event_queue.put_nowait(event)
            except (queue.Empty, queue.Full):
                # BUG FIX: the original only caught queue.Empty here, so a
                # queue.Full raised by the re-put (race with another
                # producer) escaped to the outer handler. Drop the event.
                pass

    def _process_new_log_data(self, log_content: str):
        """Parse freshly read log content and emit the resulting events.

        Emits up to three events per call: 'new_analysis' when the parsed
        timestamp differs from the last one seen, 'fibonacci_alert' when
        the analysis carries alerts, and 'signal_change' when the signal
        differs from the previous analysis.
        """
        try:
            bot_analysis = self.log_parser.parse_log(log_content)
            if bot_analysis:
                # A new analysis is one whose timestamp differs from the
                # previously processed one.
                is_new_analysis = (
                    self.last_analysis is None or
                    bot_analysis.timestamp != self.last_analysis.timestamp
                )
                if is_new_analysis:
                    self._enqueue(
                        BotEvent(
                            timestamp=datetime.now(),
                            event_type='new_analysis',
                            data=asdict(bot_analysis),
                            priority='normal'
                        ),
                        drop_oldest=True
                    )
                # Fibonacci alerts raised by the parser
                if bot_analysis.fibonacci_analysis and bot_analysis.fibonacci_analysis.alerts:
                    self._enqueue(BotEvent(
                        timestamp=datetime.now(),
                        event_type='fibonacci_alert',
                        data={
                            'alerts': bot_analysis.fibonacci_analysis.alerts,
                            'signal': bot_analysis.fibonacci_analysis.signal,
                            'confidence': bot_analysis.fibonacci_analysis.confidence
                        },
                        priority='high'
                    ))
                # Trading-signal transition relative to the previous analysis
                if (self.last_analysis and
                        bot_analysis.fibonacci_analysis and
                        self.last_analysis.fibonacci_analysis and
                        bot_analysis.fibonacci_analysis.signal != self.last_analysis.fibonacci_analysis.signal):
                    self._enqueue(BotEvent(
                        timestamp=datetime.now(),
                        event_type='signal_change',
                        data={
                            'old_signal': self.last_analysis.fibonacci_analysis.signal,
                            'new_signal': bot_analysis.fibonacci_analysis.signal,
                            'confidence': bot_analysis.fibonacci_analysis.confidence
                        },
                        priority='critical'
                    ))
                self.last_analysis = bot_analysis
        except Exception as e:
            self.logger.error(f"Erro ao processar log: {e}")

    def _event_processor_loop(self):
        """Worker loop: drain the event queue and notify subscribers."""
        while self.running:
            try:
                try:
                    event = self.event_queue.get(timeout=0.1)
                    self._notify_subscribers(event)
                    self.event_queue.task_done()
                except queue.Empty:
                    continue
            except Exception as e:
                self.logger.error(f"Erro no loop de eventos: {e}")
                time.sleep(0.1)

    def start(self):
        """Start the worker threads and begin watching the log file.

        Idempotent: calling start() while already running is a no-op.
        """
        if self.running:
            return
        self.running = True
        self.logger.info("Iniciando processamento em tempo real")
        # Event-dispatch thread (daemon so it never blocks interpreter exit)
        self.event_thread = Thread(target=self._event_processor_loop, daemon=True)
        self.event_thread.start()
        # File watcher feeding _process_new_log_data with new log content
        self.file_watcher = FileWatcher(
            self.config.log_file_path,
            self._process_new_log_data
        )
        self.file_watcher.start()
        # Polling thread driving the watcher
        self.monitor_thread = Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()

    def stop(self):
        """Signal shutdown; the daemon loops exit once `running` is False."""
        if not self.running:
            return
        self.logger.info("Parando processamento em tempo real")
        self.running = False
        if hasattr(self, 'file_watcher'):
            self.file_watcher.stop()

    def _monitor_loop(self):
        """Polling loop: check the log file every `check_interval` seconds."""
        while self.running:
            try:
                self.file_watcher.check_changes()
                time.sleep(self.config.check_interval)
            except Exception as e:
                self.logger.error(f"Erro no monitoramento: {e}")
                time.sleep(1)

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of processor state for diagnostics."""
        return {
            'running': self.running,
            'queue_size': self.event_queue.qsize(),
            'subscribers_count': len(self.subscribers),
            'last_analysis_time': self.last_analysis.timestamp if self.last_analysis else None,
            'config': asdict(self.config)
        }
class RealTimeIntegration:
    """Top-level facade: wires a RealTimeProcessor to a log file and keeps
    a bounded in-memory history of the events it emits."""

    def __init__(self, log_file_path: str):
        self.config = RealTimeConfig(log_file_path=log_file_path)
        self.processor = RealTimeProcessor(self.config)
        self.event_history = []
        self.max_history = 1000  # cap on retained events
        # Setup logging
        self.logger = logging.getLogger(__name__)
        # Default subscriber records every event into the history
        self.processor.subscribe(self._default_event_handler)

    def _default_event_handler(self, event: BotEvent):
        """Default subscriber: record the event and log notable transitions."""
        self.event_history.append(event)
        # Trim history to the most recent max_history entries
        if len(self.event_history) > self.max_history:
            self.event_history = self.event_history[-self.max_history:]
        self.logger.info(f"Evento {event.event_type} - Prioridade: {event.priority}")
        # Type-specific logging
        if event.event_type == 'signal_change':
            self.logger.warning(
                f"MUDANÇA DE SINAL: {event.data['old_signal']} -> {event.data['new_signal']} "
                f"(Confiança: {event.data['confidence']}%)"
            )
        elif event.event_type == 'fibonacci_alert':
            self.logger.info(f"Alerta Fibonacci: {len(event.data['alerts'])} alertas")

    def start(self):
        """Start the real-time pipeline."""
        self.processor.start()
        self.logger.info(f"Integração em tempo real iniciada para: {self.config.log_file_path}")

    def stop(self):
        """Stop the real-time pipeline."""
        self.processor.stop()
        self.logger.info("Integração em tempo real parada")

    def get_recent_events(self, limit: int = 10) -> list[BotEvent]:
        """Return up to *limit* most recent events (oldest first)."""
        return self.event_history[-limit:] if self.event_history else []

    def get_status(self) -> Dict[str, Any]:
        """Return processor status plus event-history statistics."""
        processor_status = self.processor.get_status()
        now = datetime.now()
        return {
            **processor_status,
            'event_history_size': len(self.event_history),
            # BUG FIX: the original used timedelta.seconds, which is only the
            # sub-day component — a 2-day-old event could still count as
            # "recent". total_seconds() gives the true elapsed time.
            'recent_events': len([e for e in self.event_history if
                                  (now - e.timestamp).total_seconds() < 300])  # last 5 min
        }
# Usage example
if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(level=logging.INFO)

    # Create the integration for the target log file
    integration = RealTimeIntegration("d:/hugging_face_spaces/text")

    # Custom subscriber that prints every event
    def custom_handler(event: BotEvent):
        print(f"[{event.timestamp}] {event.event_type}: {event.priority}")

    integration.processor.subscribe(custom_handler)

    try:
        # Start the pipeline
        integration.start()
        # Keep the main thread alive, reporting recent activity
        while True:
            time.sleep(1)
            status = integration.get_status()
            if status['recent_events'] > 0:
                print(f"Eventos recentes: {status['recent_events']}")
    except KeyboardInterrupt:
        print("Parando integração...")
    finally:
        # BUG FIX: stop() originally ran only on KeyboardInterrupt, so any
        # other exception left the worker threads running; a finally block
        # guarantees shutdown on every exit path.
        integration.stop()