Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""Centralized access to Hugging Face models with ensemble sentiment."""
from __future__ import annotations
import logging
import threading
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Sequence
from config import HUGGINGFACE_MODELS, get_settings
# Environment setup must happen BEFORE transformers is imported below:
# we silence TensorFlow/Keras probing and force the PyTorch framework.
import os
import sys
# Quiet transformers advisory warnings and reduce its log verbosity.
os.environ.setdefault('TRANSFORMERS_NO_ADVISORY_WARNINGS', '1')
os.environ.setdefault('TRANSFORMERS_VERBOSITY', 'error')
# Suppress TensorFlow C++ logging (3 = errors only), in case TF loads anyway.
os.environ.setdefault('TF_CPP_MIN_LOG_LEVEL', '3')
# NOTE(review): TRANSFORMERS_FRAMEWORK is not a documented transformers env
# var — the explicit framework='pt' passed at pipeline() time below is what
# actually pins PyTorch; confirm this variable has any effect.
os.environ.setdefault('TRANSFORMERS_FRAMEWORK', 'pt')
# Mock tf_keras to prevent transformers from trying to import it; this keeps
# a broken tf-keras installation from raising during framework detection.
class TfKerasMock:
    """Stand-in object registered under the ``tf_keras`` module names so that
    transformers' TensorFlow availability probing never imports the real
    (potentially broken) tf-keras package."""
# Install the mock under every tf_keras module path before transformers is
# imported, so any "import tf_keras..." inside transformers resolves to it.
# NOTE(review): these entries are plain objects, not types.ModuleType —
# assumes transformers only checks presence/importability, not module
# attributes like __spec__; confirm against the installed transformers version.
sys.modules['tf_keras'] = TfKerasMock()
sys.modules['tf_keras.src'] = TfKerasMock()
sys.modules['tf_keras.src.utils'] = TfKerasMock()
# transformers is an optional dependency: record availability so every entry
# point below can degrade to a neutral fallback instead of crashing.
try:
    from transformers import pipeline
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
logger = logging.getLogger(__name__)
settings = get_settings()

# Runtime mode for Hugging Face access:
#   "off"    — never load models;
#   "public" — load anonymously (no token sent);
#   "auth"   — load with HF_TOKEN (required for gated repos).
HF_MODE = os.getenv("HF_MODE", "off").lower()
HF_TOKEN_ENV = os.getenv("HF_TOKEN")
if HF_MODE not in ("off", "public", "auth"):
    # Log the rejected value BEFORE clobbering it so the misconfiguration is
    # diagnosable (the original logged after reset, and used a placeholder-less
    # f-string); lazy %-args avoid eager formatting.
    logger.warning("Invalid HF_MODE %r, defaulting to 'off'", HF_MODE)
    HF_MODE = "off"
if HF_MODE == "auth" and not HF_TOKEN_ENV:
    # Auth mode is unusable without a token; fall back to fully disabled.
    HF_MODE = "off"
    logger.warning("HF_MODE='auth' but HF_TOKEN not set, defaulting to 'off'")
# Models eagerly loaded by ModelRegistry.initialize_models().
ACTIVE_MODELS = [
    "ElKulako/cryptobert",
    "kk08/CryptoBERT",
    "ProsusAI/finbert"
]
# Older models kept for reference; only referenced through the slices below.
LEGACY_MODELS = [
    "burakutf/finetuned-finbert-crypto",
    "mathugo/crypto_news_bert",
    "svalabs/twitter-xlm-roberta-bitcoin-sentiment",
    "mayurjadhav/crypto-sentiment-model",
    "cardiffnlp/twitter-roberta-base-sentiment",
    "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis",
    "agarkovv/CryptoTrader-LM"
]
# Category groupings are built POSITIONALLY from the two lists above —
# reordering either list silently changes every ensemble below.
CRYPTO_SENTIMENT_MODELS = ACTIVE_MODELS[:2] + LEGACY_MODELS[:2]
SOCIAL_SENTIMENT_MODELS = LEGACY_MODELS[2:4]
FINANCIAL_SENTIMENT_MODELS = [ACTIVE_MODELS[2]] + [LEGACY_MODELS[4]]
NEWS_SENTIMENT_MODELS = [LEGACY_MODELS[5]]
DECISION_MODELS = [LEGACY_MODELS[6]]
@dataclass
class PipelineSpec:
    """Declarative description of one Hugging Face pipeline.

    Bug fix: the original declared dataclass-style field annotations without
    the ``@dataclass`` decorator, so every ``PipelineSpec(key=..., ...)``
    construction below raised ``TypeError: PipelineSpec() takes no arguments``.

    Attributes:
        key: Registry lookup key for this pipeline.
        task: transformers pipeline task name (e.g. "sentiment-analysis").
        model_id: Hugging Face Hub repository id.
        requires_auth: True when the Hub repo needs an access token.
        category: Grouping label used to select ensembles.
    """
    key: str
    task: str
    model_id: str
    requires_auth: bool = False
    category: str = "sentiment"
# Registry of pipeline specs, keyed by a stable string id used by callers.
MODEL_SPECS: Dict[str, PipelineSpec] = {}
# Legacy specs: registered only when present in the project-level
# HUGGINGFACE_MODELS mapping from config.
for lk in ["sentiment_twitter", "sentiment_financial", "summarization", "crypto_sentiment"]:
    if lk in HUGGINGFACE_MODELS:
        MODEL_SPECS[lk] = PipelineSpec(
            key=lk,
            # Any key containing "sentiment" is a classifier; the rest summarize.
            task="sentiment-analysis" if "sentiment" in lk else "summarization",
            model_id=HUGGINGFACE_MODELS[lk],
            category="legacy"
        )
# Eagerly-initialized models: positions 0-1 are crypto, position 2 financial.
# NOTE(review): only the ElKulako repo is flagged as requiring auth — confirm.
for i, mid in enumerate(ACTIVE_MODELS):
    MODEL_SPECS[f"active_{i}"] = PipelineSpec(
        key=f"active_{i}", task="sentiment-analysis", model_id=mid,
        category="crypto_sentiment" if i < 2 else "financial_sentiment",
        requires_auth=("ElKulako" in mid)
    )
# Category-specific keys. NOTE(review): these overlap with the active_* keys
# (the same model id appears under multiple keys), so a model may be loaded
# and counted more than once by ensemble consumers — verify this is intended.
for i, mid in enumerate(CRYPTO_SENTIMENT_MODELS):
    MODEL_SPECS[f"crypto_sent_{i}"] = PipelineSpec(
        key=f"crypto_sent_{i}", task="sentiment-analysis", model_id=mid,
        category="crypto_sentiment", requires_auth=("ElKulako" in mid)
    )
for i, mid in enumerate(SOCIAL_SENTIMENT_MODELS):
    MODEL_SPECS[f"social_sent_{i}"] = PipelineSpec(
        key=f"social_sent_{i}", task="sentiment-analysis", model_id=mid, category="social_sentiment"
    )
for i, mid in enumerate(FINANCIAL_SENTIMENT_MODELS):
    MODEL_SPECS[f"financial_sent_{i}"] = PipelineSpec(
        key=f"financial_sent_{i}", task="sentiment-analysis", model_id=mid, category="financial_sentiment"
    )
for i, mid in enumerate(NEWS_SENTIMENT_MODELS):
    MODEL_SPECS[f"news_sent_{i}"] = PipelineSpec(
        key=f"news_sent_{i}", task="sentiment-analysis", model_id=mid, category="news_sentiment"
    )
class ModelNotAvailable(RuntimeError):
    """Raised when a requested Hugging Face pipeline cannot be provided."""
class ModelRegistry:
    """Lazy, thread-safe loader and cache of transformers pipelines.

    Pipelines are created on first request (keyed by MODEL_SPECS keys) and
    cached for the life of the process.
    """

    def __init__(self) -> None:
        # key -> loaded transformers pipeline object
        self._pipelines = {}
        # Guards pipeline creation so two threads don't load the same model.
        self._lock = threading.Lock()
        self._initialized = False

    def get_pipeline(self, key: str):
        """Return the cached pipeline for *key*, loading it on first use.

        Raises:
            ModelNotAvailable: if transformers is missing, the key is unknown,
                HF_MODE is "off", a required token is absent, or loading fails.
        """
        if not TRANSFORMERS_AVAILABLE:
            raise ModelNotAvailable("transformers not installed")
        if key not in MODEL_SPECS:
            raise ModelNotAvailable(f"Unknown key: {key}")
        spec = MODEL_SPECS[key]
        # Fast path: lock-free cache hit.
        if key in self._pipelines:
            return self._pipelines[key]
        with self._lock:
            # Double-checked: another thread may have loaded it while we waited.
            if key in self._pipelines:
                return self._pipelines[key]
            if HF_MODE == "off":
                raise ModelNotAvailable("HF_MODE=off")
            token_value = None
            if HF_MODE == "auth":
                # Prefer the environment token, fall back to project settings.
                token_value = HF_TOKEN_ENV or settings.hf_token
            elif HF_MODE == "public":
                token_value = None
            if spec.requires_auth and not token_value:
                raise ModelNotAvailable("Model requires auth but no token available")
            logger.info(f"Loading model: {spec.model_id} (mode: {HF_MODE})")
            try:
                pipeline_kwargs = {
                    'task': spec.task,
                    'model': spec.model_id,
                    'tokenizer': spec.model_id,
                    'framework': 'pt',  # force PyTorch (TF is mocked out above)
                    'device': -1,       # CPU
                }
                pipeline_kwargs['token'] = token_value
                self._pipelines[key] = pipeline(**pipeline_kwargs)
            except Exception as e:
                error_msg = str(e)
                error_lower = error_msg.lower()
                # huggingface_hub error classes, if that package is importable.
                try:
                    from huggingface_hub.errors import RepositoryNotFoundError, HfHubHTTPError
                    hf_errors = (RepositoryNotFoundError, HfHubHTTPError)
                except ImportError:
                    hf_errors = ()
                # Heuristic: auth/availability wording implies a Hub-side failure.
                # NOTE(review): the bare keyword 'token' can also match unrelated
                # tokenizer errors — confirm this classification is acceptable.
                is_auth_error = any(kw in error_lower for kw in ['401', 'unauthorized', 'repository not found', 'expired', 'token'])
                is_hf_error = isinstance(e, hf_errors) or is_auth_error
                if is_hf_error:
                    logger.warning(f"HF error for {spec.model_id}: {type(e).__name__}")
                    raise ModelNotAvailable(f"HF error: {spec.model_id}") from e
                if any(kw in error_lower for kw in ['keras', 'tensorflow', 'tf_keras', 'framework']):
                    # Framework-detection failure: retry once with an explicit dtype.
                    # NOTE(review): torch_dtype passed as the string 'float32' —
                    # confirm the installed transformers version accepts string dtypes.
                    try:
                        pipeline_kwargs['torch_dtype'] = 'float32'
                        self._pipelines[key] = pipeline(**pipeline_kwargs)
                        return self._pipelines[key]
                    except Exception:
                        raise ModelNotAvailable(f"Framework error: {spec.model_id}") from e
                raise ModelNotAvailable(f"Load failed: {spec.model_id}") from e
        return self._pipelines[key]

    def get_loaded_models(self):
        """Get list of all loaded model keys"""
        return list(self._pipelines.keys())

    def get_available_sentiment_models(self):
        """Get list of all available sentiment model keys"""
        return [key for key in MODEL_SPECS.keys() if "sent" in key or "sentiment" in key]

    def initialize_models(self):
        """Eagerly load the active_* pipelines; return a status-summary dict.

        Idempotent: after the first call (even if every load failed) it only
        reports the cached state and never retries.
        """
        if self._initialized:
            return {"status": "already_initialized", "mode": HF_MODE, "models_loaded": len(self._pipelines)}
        if HF_MODE == "off":
            self._initialized = True
            return {"status": "disabled", "mode": "off", "models_loaded": 0, "loaded": [], "failed": []}
        if not TRANSFORMERS_AVAILABLE:
            return {"status": "transformers_not_available", "mode": HF_MODE, "models_loaded": 0}
        loaded, failed = [], []
        active_keys = [f"active_{i}" for i in range(len(ACTIVE_MODELS))]
        for key in active_keys:
            try:
                self.get_pipeline(key)
                loaded.append(key)
            except ModelNotAvailable as e:
                # Error text truncated to keep the summary compact.
                failed.append((key, str(e)[:100]))
            except Exception as e:
                error_msg = str(e)[:100]
                failed.append((key, error_msg))
        self._initialized = True
        # "partial" here means NO model loaded; any success reports "initialized".
        status = "initialized" if loaded else "partial"
        return {"status": status, "mode": HF_MODE, "models_loaded": len(loaded), "loaded": loaded, "failed": failed}
# Module-level singleton registry shared by every helper function below.
_registry = ModelRegistry()
# Last initialization summary; overwritten by initialize_models() below.
AI_MODELS_SUMMARY = {"status": "not_initialized", "mode": "off", "models_loaded": 0, "loaded": [], "failed": []}
def initialize_models():
    """Initialize the shared registry and record the outcome in AI_MODELS_SUMMARY."""
    global AI_MODELS_SUMMARY
    summary = _registry.initialize_models()
    AI_MODELS_SUMMARY = summary
    return summary
def ensemble_crypto_sentiment(text: str) -> Dict[str, Any]:
    """Run *text* through every already-loaded sentiment pipeline and vote.

    Returns a dict with the majority "label" (bullish/bearish/neutral), the
    mean "confidence" over models that answered, per-model "scores", and
    "model_count"; degraded results additionally carry an "error" key.
    """
    if not TRANSFORMERS_AVAILABLE or HF_MODE == "off":
        return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "HF disabled" if HF_MODE == "off" else "transformers N/A"}
    results, labels_count, total_conf = {}, {"bullish": 0, "bearish": 0, "neutral": 0}, 0.0
    # Only consult pipelines already in the registry cache — this function
    # never triggers a cold model load by itself.
    loaded_keys = _registry.get_loaded_models()
    available_keys = [key for key in loaded_keys if "sent" in key or "sentiment" in key or key.startswith("active_")]
    if not available_keys:
        return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "No models loaded"}
    for key in available_keys:
        try:
            pipe = _registry.get_pipeline(key)
            # Truncate to 512 characters as a rough input-length guard.
            res = pipe(text[:512])
            if isinstance(res, list) and res: res = res[0]
            label = res.get("label", "NEUTRAL").upper()
            score = res.get("score", 0.5)
            # Map heterogeneous model labels onto a common vocabulary.
            # NOTE(review): generic labels like "LABEL_0"/"LABEL_1" fall through
            # to "neutral" — confirm each configured model emits
            # POSITIVE/NEGATIVE/BULLISH/BEARISH-style labels.
            mapped = "bullish" if "POSITIVE" in label or "BULLISH" in label else ("bearish" if "NEGATIVE" in label or "BEARISH" in label else "neutral")
            spec = MODEL_SPECS.get(key)
            # NOTE(review): several registry keys can share one model_id, in
            # which case results[] entries overwrite while labels_count and
            # total_conf still count each key — vote and average can diverge
            # from model_count; verify this is intended.
            if spec:
                results[spec.model_id] = {"label": mapped, "score": score}
            else:
                results[key] = {"label": mapped, "score": score}
            labels_count[mapped] += 1
            total_conf += score
        except ModelNotAvailable:
            continue
        except Exception as e:
            logger.warning(f"Ensemble failed for {key}: {e}")
    if not results:
        return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "All models failed"}
    # Majority vote; ties resolve to the first maximum in insertion order
    # (bullish, then bearish, then neutral).
    final = max(labels_count, key=labels_count.get)
    avg_conf = total_conf / len(results)
    return {"label": final, "confidence": avg_conf, "scores": results, "model_count": len(results)}
def analyze_crypto_sentiment(text: str):
    """Backward-compatible alias for ensemble_crypto_sentiment()."""
    return ensemble_crypto_sentiment(text)
def analyze_financial_sentiment(text: str):
    """Score *text* with the primary financial-sentiment pipeline.

    Returns a dict with "label" and "score"; on any failure, a neutral
    fallback dict that also carries an "error" entry.
    """
    if not TRANSFORMERS_AVAILABLE:
        return {"label": "neutral", "score": 0.5, "error": "transformers N/A"}
    try:
        classifier = _registry.get_pipeline("financial_sent_0")
        output = classifier(text[:512])
        first = output[0] if isinstance(output, list) and output else output
        return {
            "label": first.get("label", "neutral").lower(),
            "score": first.get("score", 0.5),
        }
    except Exception as exc:
        logger.error(f"Financial sentiment failed: {exc}")
        return {"label": "neutral", "score": 0.5, "error": str(exc)}
def analyze_social_sentiment(text: str):
    """Score *text* with the primary social-media sentiment pipeline.

    Returns a dict with "label" and "score"; on any failure, a neutral
    fallback dict that also carries an "error" entry.
    """
    if not TRANSFORMERS_AVAILABLE:
        return {"label": "neutral", "score": 0.5, "error": "transformers N/A"}
    try:
        classifier = _registry.get_pipeline("social_sent_0")
        output = classifier(text[:512])
        first = output[0] if isinstance(output, list) and output else output
        return {
            "label": first.get("label", "neutral").lower(),
            "score": first.get("score", 0.5),
        }
    except Exception as exc:
        logger.error(f"Social sentiment failed: {exc}")
        return {"label": "neutral", "score": 0.5, "error": str(exc)}
def analyze_market_text(text: str):
    """Delegate generic market text to the crypto sentiment ensemble."""
    return ensemble_crypto_sentiment(text)
def analyze_chart_points(data: Sequence[Mapping[str, Any]], indicators: Optional[List[str]] = None):
    """Derive a simple trend summary from a series of {"price": ...} points.

    Points without a truthy "price" value are ignored. *indicators* is
    accepted for interface compatibility but is not currently used.
    Returns a dict with trend, strength, change_pct, support, resistance
    and a human-readable analysis string.
    """
    if not data:
        return {"trend": "neutral", "strength": 0, "analysis": "No data"}
    prices = [float(point["price"]) for point in data if point.get("price")]
    if not prices:
        return {"trend": "neutral", "strength": 0, "analysis": "No price data"}
    start, end = prices[0], prices[-1]
    change = 0 if start <= 0 else (end - start) / start * 100
    # A move beyond +/-5% counts as a directional trend; strength saturates at 1.0.
    if change > 5:
        trend = "bullish"
        strength = min(abs(change) / 10, 1.0)
    elif change < -5:
        trend = "bearish"
        strength = min(abs(change) / 10, 1.0)
    else:
        trend = "neutral"
        strength = abs(change) / 5
    return {
        "trend": trend,
        "strength": strength,
        "change_pct": change,
        "support": min(prices),
        "resistance": max(prices),
        "analysis": f"Price moved {change:.2f}% showing {trend} trend",
    }
def analyze_news_item(item: Dict[str, Any]):
    """Return a copy of *item* augmented with ensemble sentiment fields."""
    combined = item.get("title", "") + " " + item.get("description", "")
    verdict = ensemble_crypto_sentiment(combined)
    return {
        **item,
        "sentiment": verdict["label"],
        "sentiment_confidence": verdict["confidence"],
        "sentiment_details": verdict,
    }
def get_model_info():
    """Snapshot of the HF configuration and registry state for diagnostics."""
    token_present = bool(HF_TOKEN_ENV or settings.hf_token) if HF_MODE == "auth" else False
    return {
        "transformers_available": TRANSFORMERS_AVAILABLE,
        "hf_mode": HF_MODE,
        "hf_token_configured": token_present,
        "models_initialized": _registry._initialized,
        "models_loaded": len(_registry._pipelines),
        "active_models": ACTIVE_MODELS,
        "total_models": len(MODEL_SPECS),
    }
def registry_status():
    """Lightweight status view of the module-level pipeline registry."""
    return {
        "initialized": _registry._initialized,
        "pipelines_loaded": len(_registry._pipelines),
        "available_models": list(MODEL_SPECS),
        "transformers_available": TRANSFORMERS_AVAILABLE,
    }