Spaces:
Runtime error
Runtime error
| """FinBERT sentiment analysis — lazy-loaded singleton, cached inference.""" | |
| from __future__ import annotations | |
| import logging | |
| import threading | |
| from typing import Callable | |
logger = logging.getLogger(__name__)

# The file descriptor limit is normally set in __main__.py at startup.
# This import-time adjustment is kept for backward compatibility when the
# module is imported directly. It caps the soft RLIMIT_NOFILE at 256; the
# loaders further down in this file lower it again when they hit the
# "fds_to_keep" error.
try:
    import resource

    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    target_limit = 256
    # RLIM_INFINITY may be represented as -1, so it has to be compared
    # explicitly: a plain ``soft > target_limit`` would treat an unlimited
    # soft limit as "already low enough", and ``min(target_limit, hard)``
    # would pick the unlimited hard limit as the new soft limit.
    if soft == resource.RLIM_INFINITY or soft > target_limit:
        if hard == resource.RLIM_INFINITY:
            new_soft = target_limit
        else:
            new_soft = min(target_limit, hard)
        resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
        logger.info(
            "Auto-adjusted file descriptor limit from %s to %s", soft, new_soft
        )
except Exception as e:
    # Deliberately best effort: the ``resource`` module is not available on
    # every platform and the limit may not be adjustable. Importing this
    # module must never fail because of it.
    logger.debug("Could not adjust file descriptor limit: %s", e)

# HuggingFace Hub identifier of the model loaded by FinBERTAnalyzer.
_MODEL_NAME = "ProsusAI/finbert"
# Labels accepted from the model; anything else is reported as "neutral".
_LABELS = ["positive", "negative", "neutral"]
class FinBERTAnalyzer:
    """
    Lazy-loaded FinBERT wrapper.

    The model and tokenizer are only fetched when ``load()`` is called.
    Until then (or if loading fails) every analysis call returns a neutral
    placeholder result instead of raising.

    Usage:
        analyzer = FinBERTAnalyzer()
        analyzer.load(progress_callback=lambda msg: print(msg))
        results = analyzer.analyze_batch(["Apple beats earnings", "Market crashes"])
    """

    # Process-wide shared instance handed out by get_instance().
    _instance: FinBERTAnalyzer | None = None
    # Guards creation of the shared instance.
    _lock = threading.Lock()

    def __init__(self) -> None:
        self._model = None
        self._tokenizer = None
        self._loaded = False
        self._load_error: str | None = None
        self._device: str = "cpu"
        # Set once the fds_to_keep retry chain has been attempted so that
        # load() does not retry endlessly.
        self._tried_fds_workaround: bool = False

    @classmethod
    def get_instance(cls) -> FinBERTAnalyzer:
        """Return the shared analyzer, creating it on first use."""
        instance = cls._instance
        if instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                instance = cls._instance
                if instance is None:
                    instance = FinBERTAnalyzer()
                    cls._instance = instance
        return instance

    # NOTE(review): ``is_loaded`` and ``load_error`` are plain methods in the
    # visible source. If callers access them as attributes, a ``@property``
    # decorator may have been lost - confirm against the call sites.
    def is_loaded(self) -> bool:
        """Return True once the model and tokenizer are ready for inference."""
        return self._loaded

    def load_error(self) -> str | None:
        """Return the message of the last failed load attempt, or None."""
        return self._load_error

    def reload(self, progress_callback: Callable[[str], None] | None = None) -> bool:
        """
        Reset error state and attempt to load again.

        Returns True on success, False on failure.
        """
        self._loaded = False
        self._load_error = None  # Will be set by load() if it fails
        self._model = None
        self._tokenizer = None
        self._tried_fds_workaround = False  # Allow the retry chain to run again
        return self.load(progress_callback)

    # ------------------------------------------------------------------
    # Private helpers shared by the three loading strategies
    # ------------------------------------------------------------------

    @staticmethod
    def _neutral_results(count: int) -> list[dict]:
        """Return ``count`` independent neutral placeholder results."""
        # A comprehension (not ``[d] * count``) so that every entry is its own
        # dict and mutating one result cannot affect the others.
        return [{"label": "neutral", "score": 0.5} for _ in range(count)]

    @staticmethod
    def _make_reporter(
        progress_callback: Callable[[str], None] | None,
    ) -> Callable[[str], None]:
        """Build a function that forwards a message to the callback and the log."""

        def _report(msg: str) -> None:
            if progress_callback:
                progress_callback(msg)
            logger.info(msg)

        return _report

    @staticmethod
    def _quiet_hf_environment() -> None:
        """Set the environment variables every loading strategy relies on."""
        import os

        os.environ["TOKENIZERS_PARALLELISM"] = "false"
        os.environ["TRANSFORMERS_VERBOSITY"] = "error"
        os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"
        # Disable tqdm to avoid threading issues
        os.environ["TQDM_DISABLE"] = "1"

    @staticmethod
    def _detect_device(torch) -> str:
        """Return "cuda", "mps" or "cpu" depending on what ``torch`` can use."""
        # ROCm builds of PyTorch are driven through the torch.cuda interface,
        # so they are covered by the first check. There is intentionally no
        # separate "HIP build" branch: selecting "cuda" while
        # torch.cuda.is_available() is False would only make the later
        # ``.to("cuda")`` fail instead of falling back to the CPU.
        if torch.cuda.is_available():
            return "cuda"
        if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            return "mps"
        return "cpu"

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def load(self, progress_callback: Callable[[str], None] | None = None) -> bool:
        """
        Load model from HuggingFace Hub (or local cache).

        ``progress_callback``, when given, receives human-readable status
        messages, including the failure message if loading fails.

        If loading fails with an error mentioning "fds_to_keep", the fallback
        strategies are tried once per instance (until ``reload()`` resets
        the flag).

        Returns True on success, False on failure. On failure the message is
        available through ``load_error()``.
        """
        if self._loaded:
            return True

        report = self._make_reporter(progress_callback)

        try:
            self._quiet_hf_environment()

            import transformers

            transformers.logging.set_verbosity_error()

            import torch

            self._device = self._detect_device(torch)
            if self._device == "cuda":
                report(f"Using CUDA GPU: {torch.cuda.get_device_name(0)}")
            elif self._device == "mps":
                report("Using Apple Metal (MPS)")
            else:
                # Threads are not restricted here - PyTorch uses its default.
                report(f"Using CPU ({torch.get_num_threads()} threads)")

            report("Loading FinBERT tokenizer...")
            from transformers import AutoTokenizer

            self._tokenizer = AutoTokenizer.from_pretrained(
                _MODEL_NAME,
                use_fast=True,  # Fast tokenizer is much quicker
            )

            report("Loading FinBERT model weights (~500MB)...")
            from transformers import AutoModelForSequenceClassification

            # CRITICAL: do NOT use device_map="auto" as it can trigger
            # subprocess issues. Load first, then move to the device manually.
            self._model = AutoModelForSequenceClassification.from_pretrained(
                _MODEL_NAME,
                low_cpu_mem_usage=True,
                device_map=None,  # Avoid subprocess spawning
                trust_remote_code=False,
            )
            self._model.eval()
            self._model = self._model.to(self._device)

            report(f"FinBERT ready on {self._device.upper()} ✓")
            self._loaded = True
            # Drop any message left over from an earlier failed attempt so
            # load_error() never contradicts is_loaded().
            self._load_error = None
            return True
        except Exception as exc:
            import sys
            import traceback

            full_traceback = traceback.format_exc()
            msg = f"FinBERT load failed: {exc}"
            logger.error(msg)
            logger.error("Full traceback:\n%s", full_traceback)
            self._load_error = msg
            if progress_callback:
                progress_callback(msg)

            # If it's the fds_to_keep error, try once more with additional workarounds
            already_retried = getattr(self, "_tried_fds_workaround", False)
            if "fds_to_keep" in str(exc) and not already_retried:
                self._tried_fds_workaround = True
                logger.info("Attempting retry with fds_to_keep workaround...")
                logger.info("Original traceback:\n%s", full_traceback)

                original_error = msg
                success = self._load_with_fds_workaround(progress_callback)
                # NOTE(review): the fallback loaders below always set
                # _load_error when they fail, so this branch only runs if a
                # loader returns False without recording a message.
                if not success and not self._load_error:
                    self._load_error = (
                        f"{original_error}\n"
                        f"\n"
                        f"This is a known issue with Python 3.12+ and transformers.\n"
                        f"Your Python version: {sys.version}\n"
                        f"\n"
                        f"To fix this, consider:\n"
                        f"  1. Downgrade to Python 3.11 (recommended)\n"
                        # Quoted so a shell does not treat ">=" as a redirect.
                        f"  2. Or upgrade transformers: pip install -U \"transformers>=4.45.0\"\n"
                        f"  3. Or use the --no-sentiment flag to skip FinBERT loading"
                    )
                return success

            return False

    def _load_with_fds_workaround(self, progress_callback) -> bool:
        """
        Fallback loading method with additional workarounds for fds_to_keep error.

        Lowers the soft file descriptor limit to 128 and caps CPU threads at 4
        before loading. If this attempt also fails with "fds_to_keep", the
        subprocess-isolation strategy is tried as a last resort.

        Returns True on success, False on failure.
        """
        if self._loaded:
            return True

        report = self._make_reporter(progress_callback)

        try:
            self._quiet_hf_environment()

            # Try to lower file descriptor limit if it's very high
            try:
                import resource

                soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
                report(f"Current file descriptor limit: soft={soft}, hard={hard}")

                target_limit = 128
                unlimited = resource.RLIM_INFINITY
                # RLIM_INFINITY may be -1, so handle it explicitly instead of
                # relying on numeric comparison / min().
                if soft == unlimited or soft > target_limit:
                    new_soft = target_limit if hard == unlimited else min(target_limit, hard)
                    resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
                    report(
                        f"Lowered file descriptor limit from {soft} to {new_soft} (emergency fallback)"
                    )
            except (ImportError, ValueError, OSError) as e:
                logger.debug("Could not adjust file descriptor limit: %s", e)

            import transformers

            transformers.logging.set_verbosity_error()

            import torch

            self._device = self._detect_device(torch)
            if self._device == "cpu":
                # Limit CPU threads for more stable loading
                torch.set_num_threads(min(torch.get_num_threads(), 4))

            report(
                f"Retrying FinBERT load on {self._device.upper()} ({torch.get_num_threads()} threads)..."
            )

            from transformers import AutoTokenizer, AutoModelForSequenceClassification

            self._tokenizer = AutoTokenizer.from_pretrained(
                _MODEL_NAME,
                use_fast=True,
            )

            # device_map stays None: device_map="auto" can trigger subprocess
            # issues, so the model is moved to the device manually below.
            self._model = AutoModelForSequenceClassification.from_pretrained(
                _MODEL_NAME,
                low_cpu_mem_usage=True,
                device_map=None,
            )
            self._model.eval()
            self._model = self._model.to(self._device)

            report(f"FinBERT ready on {self._device.upper()} ✓")
            self._loaded = True
            # The first attempt recorded its failure; it no longer applies.
            self._load_error = None
            return True
        except Exception as exc:
            import traceback

            msg = f"FinBERT load failed (workaround attempt): {exc}"
            logger.error(msg)
            self._load_error = msg
            if progress_callback:
                progress_callback(msg)
            logger.debug("Workaround load traceback:\n%s", traceback.format_exc())

            # If still failing with fds_to_keep, try one more time with subprocess isolation
            if "fds_to_keep" in str(exc):
                logger.info("Attempting final retry with subprocess isolation...")
                return self._load_with_subprocess_isolation(progress_callback)

            return False

    def _load_with_subprocess_isolation(self, progress_callback) -> bool:
        """
        Final attempt: load model with maximum subprocess isolation for Python 3.14+.

        Restricts native thread pools to one thread, sets the soft file
        descriptor limit to 64 and uses the slow (pure Python) tokenizer.

        Returns True on success, False on failure; on failure ``load_error()``
        carries the error plus troubleshooting advice.
        """
        if self._loaded:
            return True

        report = self._make_reporter(progress_callback)

        try:
            import os

            self._quiet_hf_environment()
            # Additional isolation for Python 3.14
            os.environ["RAYON_RS_NUM_CPUS"] = "1"
            os.environ["OMP_NUM_THREADS"] = "1"

            # Force file descriptor limit to minimum
            try:
                import resource

                soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
                resource.setrlimit(resource.RLIMIT_NOFILE, (64, hard))
                report("Set file descriptor limit to 64 (maximum isolation)")
            except Exception as e:
                # Best effort only - keep going, but leave a trace in the log.
                logger.debug("Could not adjust file descriptor limit: %s", e)

            import transformers

            transformers.logging.set_verbosity_error()

            import torch

            self._device = self._detect_device(torch)
            if self._device == "cpu":
                torch.set_num_threads(1)  # Single thread for maximum isolation

            report(f"Loading with subprocess isolation on {self._device.upper()}...")

            from transformers import AutoTokenizer, AutoModelForSequenceClassification

            # Use slow tokenizer to avoid Rust subprocess issues
            self._tokenizer = AutoTokenizer.from_pretrained(
                _MODEL_NAME,
                use_fast=False,
            )
            self._model = AutoModelForSequenceClassification.from_pretrained(
                _MODEL_NAME,
                low_cpu_mem_usage=True,
            )
            self._model.eval()
            self._model = self._model.to(self._device)

            report(f"FinBERT ready on {self._device.upper()} ✓")
            self._loaded = True
            # Earlier attempts recorded their failures; they no longer apply.
            self._load_error = None
            return True
        except Exception as exc:
            import sys
            import traceback

            msg = f"FinBERT load failed (subprocess isolation): {exc}"
            logger.error(msg)
            if progress_callback:
                progress_callback(msg)
            logger.debug("Subprocess isolation traceback:\n%s", traceback.format_exc())

            # Report the version that is actually installed rather than a
            # hard-coded one.
            try:
                from importlib.metadata import version as _dist_version

                transformers_version = _dist_version("transformers")
            except ImportError:
                # Also covers PackageNotFoundError (an ImportError subclass).
                transformers_version = "unknown"

            self._load_error = (
                f"{msg}\n"
                f"\n"
                f"This is a known compatibility issue between Python 3.12+ and the transformers library.\n"
                f"Your Python version: {sys.version}\n"
                f"\n"
                f"To resolve this issue:\n"
                f"  1. Downgrade to Python 3.11 (most reliable solution)\n"
                f"     - Use pyenv: pyenv install 3.11 && pyenv local 3.11\n"
                f"  2. Or upgrade to the latest transformers: pip install -U transformers\n"
                f"     - Note: As of now, you have transformers {transformers_version}\n"
                f"  3. Or run with sentiment disabled: trading-cli --no-sentiment\n"
                f"\n"
                f"The app will continue without sentiment analysis."
            )
            return False

    # ------------------------------------------------------------------
    # Inference
    # ------------------------------------------------------------------

    def analyze_with_cache(self, headlines: list[str], conn) -> list[dict]:
        """
        Analyze headlines, checking SQLite cache first to avoid re-inference.

        Uncached headlines are batch-processed and then stored in the cache.
        ``conn`` is passed through unchanged to the cache helpers from
        ``trading_cli.data.db``.

        Returns one {"label": str, "score": float} dict per headline, in
        input order.
        """
        from trading_cli.data.db import get_cached_sentiment, cache_sentiment

        results: list[dict | None] = []
        pending: list[tuple[int, str]] = []  # (position in results, headline)

        for position, text in enumerate(headlines):
            cached = get_cached_sentiment(conn, text)
            if cached:
                results.append(cached)
            else:
                results.append(None)  # placeholder, filled in below
                pending.append((position, text))

        if pending:
            # When the model is not loaded, analyze_batch() only returns
            # neutral placeholders. Those must not be written to the cache,
            # otherwise they would be served as real results later on.
            model_ready = self._loaded
            fresh = self.analyze_batch([text for _, text in pending])
            for (position, text), res in zip(pending, fresh):
                results[position] = res
                if not model_ready:
                    continue
                try:
                    cache_sentiment(conn, text, res["label"], res["score"])
                except Exception as exc:
                    # A cache write failure must not lose the result.
                    logger.debug("Could not cache sentiment result: %s", exc)

        return [r or {"label": "neutral", "score": 0.5} for r in results]

    def analyze_batch(
        self,
        headlines: list[str],
        batch_size: int = 50,
    ) -> list[dict]:
        """
        Run FinBERT inference on a list of headlines.

        Returns list of {"label": str, "score": float} dicts, one per input
        headline and in input order. Falls back to
        {"label": "neutral", "score": 0.5} for every headline if the model is
        not loaded, and for every headline of a batch whose inference fails.

        Raises ValueError if the model is loaded and ``batch_size`` is not
        positive.
        """
        if not headlines:
            return []
        if not self._loaded:
            logger.warning("FinBERT not loaded — returning neutral for all headlines")
            return self._neutral_results(len(headlines))
        if batch_size < 1:
            # range() rejects a zero step and yields nothing for a negative
            # one, which would silently drop every headline.
            raise ValueError("batch_size must be a positive integer")

        import torch

        results: list[dict] = []
        for start in range(0, len(headlines), batch_size):
            batch = headlines[start : start + batch_size]
            try:
                inputs = self._tokenizer(
                    batch,
                    padding=True,
                    truncation=True,
                    max_length=512,
                    return_tensors="pt",
                ).to(self._device)  # Move inputs to the model's device
                with torch.no_grad():
                    outputs = self._model(**inputs)
                probs = torch.nn.functional.softmax(outputs.logits, dim=-1)

                id2label = self._model.config.id2label
                # Collect into a local list first: if anything fails halfway
                # through the batch, no partial rows end up in ``results`` and
                # the output stays aligned with the input.
                batch_results: list[dict] = []
                for prob_row in probs:
                    idx = int(prob_row.argmax())
                    label = id2label[idx].lower()
                    # Normalise label variants (ProsusAI uses "positive","negative","neutral")
                    if label not in _LABELS:
                        label = "neutral"
                    batch_results.append({"label": label, "score": float(prob_row[idx])})
            except Exception as exc:
                logger.error("FinBERT inference error on batch %d: %s", start, exc)
                results.extend(self._neutral_results(len(batch)))
            else:
                results.extend(batch_results)
        return results