File size: 17,918 Bytes
d5b7ee9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
"""FinBERT sentiment analysis — lazy-loaded singleton, cached inference."""
 
from __future__ import annotations
 
import logging
import threading
from typing import Callable
 
logger = logging.getLogger(__name__)


def _cap_open_file_limit(target_limit: int = 256) -> int | None:
    """Lower the process's soft RLIMIT_NOFILE to at most *target_limit*.

    Best-effort: the soft limit is never raised and never set above the hard
    limit (an unlimited hard limit is treated as "no cap").

    Returns:
        The soft limit in effect afterwards, or None when it could not be
        read or changed (e.g. the ``resource`` module is unavailable on
        Windows, or ``setrlimit`` rejects the value).
    """
    try:
        import resource

        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        if soft != resource.RLIM_INFINITY and soft <= target_limit:
            return soft  # already low enough; never raise the limit
        # min() against RLIM_INFINITY (-1 on some platforms) would pick the
        # sentinel and make the soft limit unlimited, so special-case it.
        if hard == resource.RLIM_INFINITY:
            new_soft = target_limit
        else:
            new_soft = min(target_limit, hard)
        resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
    except (ImportError, ValueError, OSError) as exc:
        logger.debug("Could not adjust file descriptor limit: %s", exc)
        return None
    logger.info("Auto-adjusted file descriptor limit from %s to %s", soft, new_soft)
    return new_soft


# The file descriptor limit is normally set in __main__.py at startup; this
# import-time call is kept for backward compatibility when the module is
# imported directly.
_cap_open_file_limit()

_MODEL_NAME = "ProsusAI/finbert"
_LABELS = ["positive", "negative", "neutral"]
 
 
class FinBERTAnalyzer:
    """
    Lazy-loaded FinBERT wrapper.

    ``transformers``/``torch`` are imported only when :meth:`load` runs, so
    constructing the analyzer is cheap. If loading fails, :attr:`load_error`
    holds a human-readable explanation, and the ``analyze_*`` methods return a
    neutral result (``{"label": "neutral", "score": 0.5}``) per headline.

    Usage:
        analyzer = FinBERTAnalyzer()
        analyzer.load(progress_callback=lambda msg: print(msg))
        results = analyzer.analyze_batch(["Apple beats earnings", "Market crashes"])
    """

    _instance: FinBERTAnalyzer | None = None
    _lock = threading.Lock()

    # First line of the troubleshooting text appended to load errors. It is
    # also used to detect whether that text has already been added.
    _COMPAT_HELP_HEADER = (
        "This is a known compatibility issue between Python 3.12+ "
        "and the transformers library."
    )

    def __init__(self) -> None:
        self._model = None
        self._tokenizer = None
        self._loaded = False
        self._load_error: str | None = None
        self._device: str = "cpu"
        # Ensures the fds_to_keep fallback chain runs at most once per (re)load.
        self._tried_fds_workaround: bool = False

    @classmethod
    def get_instance(cls) -> FinBERTAnalyzer:
        """Return the shared analyzer, creating it on first call."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created it.
                if cls._instance is None:
                    cls._instance = FinBERTAnalyzer()
        assert cls._instance is not None
        return cls._instance

    @property
    def is_loaded(self) -> bool:
        """True once the tokenizer and model have been loaded successfully."""
        return self._loaded

    @property
    def load_error(self) -> str | None:
        """Description of the most recent load failure, or None."""
        return self._load_error

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _neutral() -> dict:
        """Return a fresh neutral result (a new dict per call, so none are aliased)."""
        return {"label": "neutral", "score": 0.5}

    @staticmethod
    def _reporter(
        progress_callback: Callable[[str], None] | None,
    ) -> Callable[[str], None]:
        """Build a function that forwards a message to *progress_callback* and logs it at INFO."""

        def _cb(msg: str) -> None:
            if progress_callback:
                progress_callback(msg)
            logger.info(msg)

        return _cb

    @staticmethod
    def _quiet_hf_environment() -> None:
        """Set env vars that silence HF logging/telemetry and tqdm progress bars."""
        import os

        os.environ["TOKENIZERS_PARALLELISM"] = "false"
        os.environ["TRANSFORMERS_VERBOSITY"] = "error"
        os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"
        # Disable tqdm to avoid threading issues
        os.environ["TQDM_DISABLE"] = "1"

    @staticmethod
    def _detect_device(torch) -> str:
        """Return "cuda" (NVIDIA or ROCm), "mps" (Apple Metal), or "cpu".

        ROCm builds of torch expose their GPUs through ``torch.cuda``, so no
        separate ``torch.version.hip`` check is needed. A hip check on its own
        would select "cuda" on a ROCm build with no usable GPU, and moving the
        model there would then fail.
        """
        if torch.cuda.is_available():
            return "cuda"
        mps = getattr(torch.backends, "mps", None)
        if mps is not None and mps.is_available():
            return "mps"
        return "cpu"

    @staticmethod
    def _cap_fd_soft_limit(target: int) -> tuple[int, int, int] | None:
        """Lower the soft RLIMIT_NOFILE to at most *target*, never raising it.

        Returns:
            ``(old_soft, new_soft, hard)``, or None if the limit could not be
            read or changed (e.g. there is no ``resource`` module on Windows).
        """
        try:
            import resource

            soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
            if soft != resource.RLIM_INFINITY and soft <= target:
                return soft, soft, hard
            # Don't let min() pick the RLIM_INFINITY sentinel (-1 on some platforms).
            new_soft = target if hard == resource.RLIM_INFINITY else min(target, hard)
            resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
            return soft, new_soft, hard
        except (ImportError, ValueError, OSError) as exc:
            logger.debug("Could not adjust file descriptor limit: %s", exc)
            return None

    @classmethod
    def _with_compat_help(cls, summary: str) -> str:
        """Return *summary* followed by Python/transformers troubleshooting steps."""
        import sys

        # Report the transformers version actually imported, if any, instead
        # of a hard-coded guess.
        transformers_mod = sys.modules.get("transformers")
        tf_version = getattr(transformers_mod, "__version__", "unknown")
        return (
            f"{summary}\n"
            f"\n"
            f"{cls._COMPAT_HELP_HEADER}\n"
            f"Your Python version: {sys.version}\n"
            f"\n"
            f"To resolve this issue:\n"
            f"  1. Downgrade to Python 3.11 (most reliable solution)\n"
            f"     - Use pyenv: pyenv install 3.11 && pyenv local 3.11\n"
            f"  2. Or upgrade to the latest transformers: pip install -U transformers\n"
            f"     - Detected transformers version: {tf_version}\n"
            f"  3. Or run with sentiment disabled: trading-cli --no-sentiment\n"
            f"\n"
            f"The app will continue without sentiment analysis."
        )

    def _load_weights(
        self,
        use_fast: bool,
        cb: Callable[[str], None] | None = None,
    ) -> None:
        """Load the tokenizer and model for ``_MODEL_NAME`` and move the model to ``self._device``.

        Args:
            use_fast: Use the Rust "fast" tokenizer (True) or the Python one.
            cb: Optional reporter for progress messages.

        Raises:
            Whatever transformers/torch raise; callers handle failures.
        """
        from transformers import AutoModelForSequenceClassification, AutoTokenizer

        if cb:
            cb("Loading FinBERT tokenizer...")
        self._tokenizer = AutoTokenizer.from_pretrained(_MODEL_NAME, use_fast=use_fast)

        if cb:
            cb("Loading FinBERT model weights (~500MB)...")
        # low_cpu_mem_usage speeds up loading.
        # CRITICAL: Do NOT use device_map="auto", as it can trigger subprocess
        # issues. Load on CPU first, then move to the device manually.
        model = AutoModelForSequenceClassification.from_pretrained(
            _MODEL_NAME,
            low_cpu_mem_usage=True,
            device_map=None,
            trust_remote_code=False,
        )
        model.eval()
        self._model = model.to(self._device)

    def _mark_loaded(self, cb: Callable[[str], None]) -> bool:
        """Record a successful load and clear any stale error from earlier attempts."""
        cb(f"FinBERT ready on {self._device.upper()} ✓")
        self._loaded = True
        self._load_error = None
        return True

    # ------------------------------------------------------------------ #
    # Loading
    # ------------------------------------------------------------------ #

    def reload(self, progress_callback: Callable[[str], None] | None = None) -> bool:
        """
        Reset error state and attempt to load again.
        Returns True on success, False on failure.
        """
        self._loaded = False
        self._load_error = None  # Will be set by load() if it fails
        self._model = None
        self._tokenizer = None
        self._tried_fds_workaround = False  # Reset workaround flag for fresh attempt
        return self.load(progress_callback)

    def load(self, progress_callback: Callable[[str], None] | None = None) -> bool:
        """
        Load the model from the HuggingFace Hub (or the local cache).

        On a failure whose message mentions ``fds_to_keep``, the fallback chain
        (``_load_with_fds_workaround``) is tried once. If that also fails,
        ``load_error`` gets troubleshooting steps.

        Args:
            progress_callback: Optional callable that receives progress and
                error messages.

        Returns:
            True on success, False on failure (see ``load_error``).
        """
        if self._loaded:
            return True

        _cb = self._reporter(progress_callback)

        try:
            self._quiet_hf_environment()

            import transformers
            transformers.logging.set_verbosity_error()

            import torch
            self._device = self._detect_device(torch)
            if self._device == "cuda":
                is_rocm = getattr(torch.version, "hip", None) is not None
                gpu_kind = "AMD ROCm GPU" if is_rocm else "CUDA GPU"
                _cb(f"Using {gpu_kind}: {torch.cuda.get_device_name(0)}")
            elif self._device == "mps":
                _cb("Using Apple Metal (MPS)")
            else:
                # Don't restrict threads - let PyTorch use available cores
                _cb(f"Using CPU ({torch.get_num_threads()} threads)")

            self._load_weights(use_fast=True, cb=_cb)
            return self._mark_loaded(_cb)

        except Exception as exc:
            import traceback

            msg = f"FinBERT load failed: {exc}"
            logger.error(msg)
            logger.error("Full traceback:\n%s", traceback.format_exc())
            self._load_error = msg
            if progress_callback:
                progress_callback(msg)

            # Only fds_to_keep errors get a retry, and only once.
            if "fds_to_keep" not in str(exc) or getattr(self, "_tried_fds_workaround", False):
                return False

            self._tried_fds_workaround = True
            logger.info("Attempting retry with fds_to_keep workaround...")
            success = self._load_with_fds_workaround(progress_callback)
            if not success:
                detail = self._load_error or ""
                # The last-resort loader already appends troubleshooting help.
                # Otherwise add it here, keeping both the original and the
                # retry error.
                if self._COMPAT_HELP_HEADER not in detail:
                    combined = f"{msg}\n{detail}" if detail and detail != msg else msg
                    self._load_error = self._with_compat_help(combined)
            return success

    def _load_with_fds_workaround(self, progress_callback) -> bool:
        """Fallback loader for the fds_to_keep error.

        Caps the soft open-file limit at 128 and the CPU thread count at 4,
        then retries. If the retry also fails with fds_to_keep, this escalates
        to ``_load_with_subprocess_isolation``.

        Returns:
            True on success, False on failure (see ``load_error``).
        """
        if self._loaded:
            return True

        _cb = self._reporter(progress_callback)

        try:
            self._quiet_hf_environment()

            # Force a lower file descriptor limit for the workaround attempt.
            limits = self._cap_fd_soft_limit(128)
            if limits is not None:
                old_soft, new_soft, hard = limits
                _cb(f"Current file descriptor limit: soft={old_soft}, hard={hard}")
                if new_soft != old_soft:
                    _cb(f"Lowered file descriptor limit from {old_soft} to {new_soft} (emergency fallback)")

            import transformers
            transformers.logging.set_verbosity_error()

            import torch
            self._device = self._detect_device(torch)
            if self._device == "cpu":
                # Limit CPU threads for more stable loading
                torch.set_num_threads(min(torch.get_num_threads(), 4))

            _cb(f"Retrying FinBERT load on {self._device.upper()} ({torch.get_num_threads()} threads)...")
            self._load_weights(use_fast=True)
            return self._mark_loaded(_cb)

        except Exception as exc:
            import traceback

            msg = f"FinBERT load failed (workaround attempt): {exc}"
            logger.error(msg)
            self._load_error = msg
            if progress_callback:
                progress_callback(msg)
            logger.debug("Workaround load traceback:\n%s", traceback.format_exc())

            # If still failing with fds_to_keep, make one final attempt.
            if "fds_to_keep" in str(exc):
                logger.info("Attempting final retry with subprocess isolation...")
                return self._load_with_subprocess_isolation(progress_callback)
            return False

    def _load_with_subprocess_isolation(self, progress_callback) -> bool:
        """Final loader for fds_to_keep failures, using maximum isolation.

        Despite the name, no subprocess is spawned. The model is loaded
        in-process with:
        - the soft open-file limit capped at 64,
        - a single CPU thread,
        - the slow (non-Rust) tokenizer.

        On failure, ``load_error`` includes troubleshooting steps.

        Returns:
            True on success, False on failure.
        """
        if self._loaded:
            return True

        _cb = self._reporter(progress_callback)

        try:
            import os

            self._quiet_hf_environment()
            # Additional isolation for Python 3.14.
            # NOTE(review): presumably these only affect native thread pools
            # that have not been initialised yet in this process — confirm.
            os.environ["RAYON_RS_NUM_CPUS"] = "1"
            os.environ["OMP_NUM_THREADS"] = "1"

            limits = self._cap_fd_soft_limit(64)
            if limits is not None and limits[1] != limits[0]:
                _cb(f"Set file descriptor limit to {limits[1]} (maximum isolation)")

            import transformers
            transformers.logging.set_verbosity_error()

            import torch
            self._device = self._detect_device(torch)
            if self._device == "cpu":
                torch.set_num_threads(1)  # Single thread for maximum isolation

            _cb(f"Loading with subprocess isolation on {self._device.upper()}...")
            # Use the slow tokenizer to avoid Rust tokenizer threading issues.
            self._load_weights(use_fast=False)
            return self._mark_loaded(_cb)

        except Exception as exc:
            import traceback

            msg = f"FinBERT load failed (subprocess isolation): {exc}"
            logger.error(msg)
            self._load_error = self._with_compat_help(msg)
            if progress_callback:
                progress_callback(msg)
            logger.debug("Subprocess isolation traceback:\n%s", traceback.format_exc())
            return False

    # ------------------------------------------------------------------ #
    # Inference
    # ------------------------------------------------------------------ #

    def analyze_with_cache(self, headlines: list[str], conn) -> list[dict]:
        """
        Analyze headlines, checking the SQLite cache first to avoid re-inference.

        Uncached headlines are processed in batches via ``analyze_batch``, and
        their results are stored in the cache. Neutral fallback results (model
        not loaded, or a failed batch) are returned but not cached. Otherwise
        a later cache lookup would keep serving them after real inference
        starts working.

        Args:
            headlines: Headline texts, in order.
            conn: Database connection passed through to the cache helpers.

        Returns:
            One ``{"label": str, "score": float}`` dict per headline, in order.
        """
        from trading_cli.data.db import get_cached_sentiment, cache_sentiment

        results: list[dict | None] = []
        pending: list[tuple[int, str]] = []  # (position, text) of cache misses

        for position, text in enumerate(headlines):
            cached = get_cached_sentiment(conn, text)
            if cached:
                results.append(cached)
            else:
                results.append(None)  # placeholder, filled in below
                pending.append((position, text))

        if pending:
            fresh = self.analyze_batch([text for _, text in pending])
            fallback = self._neutral()
            for (position, text), res in zip(pending, fresh):
                results[position] = res
                if res == fallback:
                    # Indistinguishable from the fallback. Skipping the cache
                    # write only costs a recomputation next time.
                    continue
                try:
                    cache_sentiment(conn, text, res["label"], res["score"])
                except Exception as exc:
                    # Caching is best-effort; a failed write must not lose the result.
                    logger.debug("Could not cache sentiment for headline: %s", exc)

        return [r or self._neutral() for r in results]

    def analyze_batch(
        self,
        headlines: list[str],
        batch_size: int = 50,
    ) -> list[dict]:
        """
        Run FinBERT inference on a list of headlines.

        Args:
            headlines: Headline texts to classify.
            batch_size: Number of headlines per forward pass; must be >= 1.

        Returns:
            One ``{"label": str, "score": float}`` dict per input headline,
            in order. Falls back to ``{"label": "neutral", "score": 0.5}`` for
            every headline if the model is not loaded, and for every headline
            in a batch whose inference fails.

        Raises:
            ValueError: If the model is loaded and ``batch_size`` < 1.
        """
        if not headlines:
            return []
        if not self._loaded:
            logger.warning("FinBERT not loaded — returning neutral for all headlines")
            return [self._neutral() for _ in headlines]
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        import torch

        results: list[dict] = []
        for start in range(0, len(headlines), batch_size):
            batch = headlines[start : start + batch_size]
            try:
                id2label = self._model.config.id2label
                inputs = self._tokenizer(
                    batch,
                    padding=True,
                    truncation=True,
                    max_length=512,
                    return_tensors="pt",
                ).to(self._device)  # Move inputs to the model's device
                with torch.no_grad():
                    outputs = self._model(**inputs)
                probs = torch.nn.functional.softmax(outputs.logits, dim=-1)

                # Build the whole batch first, so a mid-batch failure cannot
                # leave partial rows plus a full set of fallbacks behind.
                batch_results: list[dict] = []
                for prob_row in probs:
                    idx = int(prob_row.argmax())
                    label = id2label[idx].lower()
                    # Normalise label variants (ProsusAI uses "positive","negative","neutral")
                    if label not in _LABELS:
                        label = "neutral"
                    batch_results.append({"label": label, "score": float(prob_row[idx])})
            except Exception as exc:
                logger.error("FinBERT inference error on batch %d: %s", start, exc)
                batch_results = [self._neutral() for _ in batch]
            results.extend(batch_results)
        return results