""" AI Engine: Hybrid LLM + FinBERT sentiment scoring + XGBoost training. Sentiment Analysis: Direction: OpenRouter LLM (BULLISH/BEARISH/NEUTRAL) Intensity: FinBERT probabilities for each article Reliability: strict JSON + repair + deterministic fallback Usage: python -m app.ai_engine --run-all --target-symbol HG=F python -m app.ai_engine --score-only python -m app.ai_engine --refresh-sentiment python -m app.ai_engine --train-only --target-symbol HG=F """ import argparse import json import logging import os from functools import lru_cache from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Optional import numpy as np import pandas as pd from sqlalchemy import func from sqlalchemy.orm import Session import xgboost as xgb from sklearn.metrics import mean_absolute_error, mean_squared_error from app.db import SessionLocal, init_db from app.models import ( NewsArticle, NewsSentiment, DailySentiment, PriceBar, NewsProcessed, NewsRaw, NewsSentimentV2, DailySentimentV2, ) from app.settings import get_settings from app.features import build_feature_matrix, get_feature_descriptions from app.lock import pipeline_lock from app.async_bridge import run_async_from_sync from app.openrouter_client import OpenRouterError, OpenRouterRateLimitError, create_chat_completion logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) _FINBERT_OUTPUT_LOGGED = False _FINBERT_MISSING_LABELS_WARNED = False HYBRID_SCORING_VERSION = "hybrid_v2" HYBRID_FALLBACK_429_MODEL_NAME = "hybrid_fallback_429" HYBRID_FALLBACK_PARSE_MODEL_NAME = "hybrid_fallback_parse" LLM_LABELS = {"BULLISH", "BEARISH", "NEUTRAL"} LLM_SCORING_MAX_TOKENS_PRIMARY = 3500 LLM_SCORING_MAX_TOKENS_RETRY = 8000 LLM_V2_LABEL_THRESHOLD = 0.15 LLM_V2_EVENT_TYPES = { "supply_disruption", "supply_expansion", "demand_increase", "demand_decrease", "inventory_draw", "inventory_build", "policy_support", "policy_drag", "macro_usd_up", "macro_usd_down", "cost_push", "mixed_unclear", "non_copper", } LLM_V2_EVENT_SIGN = { "supply_disruption": 1, "inventory_draw": 1, "demand_increase": 1, "policy_support": 1, "macro_usd_down": 1, "cost_push": 1, "supply_expansion": -1, "inventory_build": -1, "demand_decrease": -1, "policy_drag": -1, "macro_usd_up": -1, "mixed_unclear": 0, "non_copper": 0, } LLM_V2_EVENT_STRENGTH = { "supply_disruption": 1.0, "inventory_draw": 0.9, "demand_increase": 0.95, "policy_support": 0.8, "macro_usd_down": 0.7, "cost_push": 0.75, "supply_expansion": 1.0, "inventory_build": 0.9, "demand_decrease": 0.95, "policy_drag": 0.8, "macro_usd_up": 0.7, "mixed_unclear": 0.25, "non_copper": 0.0, } LLM_V2_SYSTEM_PROMPT = """You are a Senior Copper Futures Analyst focused on COMEX HG=F front-month contract. Your job is to estimate 1-5 trading day copper price impact from each article. Core principle: Classify by expected HG=F price reaction, NOT by whether the news is "good" or "bad" for the economy/company. Output requirements: Return ONLY a JSON array. One object per input id. 
Each object must contain exactly: - id (integer) - label ("BULLISH" | "BEARISH" | "NEUTRAL") - impact_score (number, -1.00 to 1.00, two decimals) - confidence (number, 0.00 to 1.00, two decimals) - relevance (number, 0.00 to 1.00, two decimals) - event_type (one of: supply_disruption, supply_expansion, demand_increase, demand_decrease, inventory_draw, inventory_build, policy_support, policy_drag, macro_usd_up, macro_usd_down, cost_push, mixed_unclear, non_copper) - reasoning (single line, <= 160 chars) Copper-specific reasoning rules: 1) Supply tightening is typically BULLISH for copper price. 2) Supply expansion is typically BEARISH. 3) Demand increase is typically BULLISH. 4) Demand decrease is typically BEARISH. 5) USD stronger is usually BEARISH for dollar-denominated copper; USD weaker is usually BULLISH. 6) If article is not materially related to copper supply/demand/pricing, use non_copper + NEUTRAL with low relevance/confidence. 7) Use NEUTRAL only when net effect is truly mixed/unclear within 1-5 day horizon. Label mapping: - impact_score >= 0.15 => BULLISH - impact_score <= -0.15 => BEARISH - otherwise => NEUTRAL """ LLM_SCORING_RESPONSE_FORMAT_V2 = { "type": "json_object", } SCORING_V2_VERSION = "commodity_v2" # ============================================================================= # FinBERT Sentiment Scoring # ============================================================================= def _neutral_finbert_score() -> dict: """Neutral fallback score used when FinBERT output is invalid or unavailable.""" return { "prob_positive": 0.33, "prob_neutral": 0.34, "prob_negative": 0.33, "score": 0.0, } def _normalize_finbert_output(raw_output: Any) -> list[dict]: """ Normalize FinBERT output into a flat ``list[dict]``. Supported raw formats: - list[dict] - list[list[dict]] - dict - JSON string of any of the above """ output = raw_output if isinstance(output, str): try: output = json.loads(output) except json.JSONDecodeError as exc: raise ValueError("FinBERT output is not valid JSON") from exc if isinstance(output, dict): output = [output] if not isinstance(output, list): raise TypeError(f"Unsupported FinBERT output type: {type(output).__name__}") normalized: list[dict] = [] for item in output: if isinstance(item, dict): normalized.append(item) continue if isinstance(item, list): normalized.extend([child for child in item if isinstance(child, dict)]) continue logger.debug("Skipping unsupported FinBERT output item type: %s", type(item).__name__) return normalized def _log_finbert_output_once(raw_output: Any) -> None: """Log one representative FinBERT output shape for debugging parser mismatches.""" global _FINBERT_OUTPUT_LOGGED if _FINBERT_OUTPUT_LOGGED: return first_item = raw_output if isinstance(raw_output, list) and raw_output: first_item = raw_output[0] preview = repr(first_item) if len(preview) > 220: preview = f"{preview[:220]}..." logger.info( "FinBERT output sample: type=%s first_item_type=%s first_item=%s", type(raw_output).__name__, type(first_item).__name__, preview, ) _FINBERT_OUTPUT_LOGGED = True @lru_cache(maxsize=1) def get_finbert_pipeline(): """ Load FinBERT model pipeline. Lazy loading to avoid import overhead when not needed. 
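    Downstream parsing does not assume a fixed output shape: depending on the
    installed ``transformers`` version, a single-text call may return either a
    flat list of label dicts or a nested list, e.g. (probabilities illustrative):

        pipe("Copper rallies on smelter outage", top_k=None)
        # -> [{"label": "positive", "score": 0.91},
        #     {"label": "negative", "score": 0.05},
        #     {"label": "neutral",  "score": 0.04}]
        # or, on older versions:
        # -> [[{"label": "positive", "score": 0.91}, ...]]

    Both forms are flattened by ``_normalize_finbert_output`` before
    ``score_text_with_finbert`` reads the per-label probabilities.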
""" settings = get_settings() os.environ.setdefault("HF_HUB_DISABLE_PROGRESS_BARS", "1") os.environ.setdefault("TRANSFORMERS_VERBOSITY", "error") os.environ.setdefault("TRANSFORMERS_NO_ADVISORY_WARNINGS", "1") os.environ.setdefault("TOKENIZERS_PARALLELISM", settings.tokenizers_parallelism) from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer model_name = "ProsusAI/finbert" logger.info(f"Loading FinBERT model: {model_name}") tokenizer = AutoTokenizer.from_pretrained(model_name) model = AutoModelForSequenceClassification.from_pretrained(model_name) pipe = pipeline( "sentiment-analysis", model=model, tokenizer=tokenizer, return_all_scores=True, max_length=512, truncation=True ) logger.info("FinBERT model loaded") return pipe def score_text_with_finbert( pipe, text: str ) -> dict: """ Score a single text with FinBERT. Returns: Dict with prob_positive, prob_neutral, prob_negative, score """ if not text or len(text.strip()) < 10: return _neutral_finbert_score() # Truncate long text text = text[:1000] try: try: raw_output = pipe(text, top_k=None) except TypeError: # Older transformers pipeline signature may not support top_k argument. raw_output = pipe(text) _log_finbert_output_once(raw_output) results = _normalize_finbert_output(raw_output) if not results: logger.warning("FinBERT output normalized to empty list, using neutral fallback") return _neutral_finbert_score() probs: dict[str, float] = {} for row in results: label = row.get("label") score = row.get("score") if not isinstance(label, str): continue try: probs[label.lower()] = float(score) except (TypeError, ValueError): continue required_labels = {"positive", "neutral", "negative"} if not required_labels.issubset(probs): global _FINBERT_MISSING_LABELS_WARNED if not _FINBERT_MISSING_LABELS_WARNED: logger.warning( "FinBERT output missing labels. found=%s expected=%s (further repeats suppressed)", sorted(probs.keys()), sorted(required_labels), ) _FINBERT_MISSING_LABELS_WARNED = True return _neutral_finbert_score() prob_pos = probs["positive"] prob_neu = probs["neutral"] prob_neg = probs["negative"] # Derived score: positive - negative (range: -1 to 1) score = prob_pos - prob_neg return { "prob_positive": prob_pos, "prob_neutral": prob_neu, "prob_negative": prob_neg, "score": score } except Exception as e: logger.warning(f"FinBERT scoring error: {e}") return _neutral_finbert_score() # ============================================================================= # LLM Sentiment Scoring (Primary - OpenRouter) # ============================================================================= # Copper-specific system prompt for LLM direction classification. LLM_SENTIMENT_SYSTEM_PROMPT = """You are a neutral copper market classifier. Task: For each input article, classify immediate 1-5 day HG=F price impact direction. Allowed labels: - BULLISH - BEARISH - NEUTRAL Rules: - Use only provided title/description text. - Return one output item per input id. - Keep reasoning one line, no newline/tab characters. - Do not add extra keys. - If uncertain or non-copper-relevant, use NEUTRAL with low confidence. 
""" LLM_SCORING_RESPONSE_FORMAT = { "type": "json_object", } LLM_SCORING_PROVIDER_OPTIONS = {"require_parameters": True} class LLMStructuredOutputError(RuntimeError): """Raised when LLM structured output remains invalid after repair attempts.""" def _hybrid_model_name(llm_model: str) -> str: """Stable model identifier for hybrid scoring rows.""" return f"hybrid({llm_model}+ProsusAI/finbert)" def _sanitize_reasoning_text(value: Any) -> str: """Normalize reasoning to a single line without tabs/newlines.""" if value is None: return "" text = str(value).replace("\r", " ").replace("\n", " ").replace("\t", " ") return " ".join(text.split()).strip() def _neutral_llm_result( *, article_id: int, llm_model: str, reason: str, model_name_override: Optional[str] = None, ) -> dict: return { "id": int(article_id), "label": "NEUTRAL", "llm_confidence": 0.0, "llm_reasoning": _sanitize_reasoning_text(reason), "llm_model": llm_model, "model_name": model_name_override or _hybrid_model_name(llm_model), "json_repair_used": False, } def _is_rate_limit_error(exc: Exception) -> bool: """Classify OpenRouter 429 errors for deterministic neutral fallback semantics.""" if isinstance(exc, OpenRouterRateLimitError): return True if isinstance(exc, OpenRouterError) and exc.status_code == 429: return True message = str(exc).lower() return "429" in message and "rate" in message def _build_hybrid_reasoning_payload( *, label: str, llm_confidence: float, finbert_strength: float, finbert_polarity: float, llm_reasoning: str, llm_model: str, soft_neutral_applied: bool = False, ) -> str: payload = { "label": label, "llm_confidence": round(max(0.0, min(1.0, llm_confidence)), 4), "finbert_strength": round(max(0.0, min(1.0, finbert_strength)), 4), "finbert_polarity": round(max(-1.0, min(1.0, finbert_polarity)), 4), "llm_reasoning": _sanitize_reasoning_text(llm_reasoning), "llm_model": llm_model, "soft_neutral_applied": bool(soft_neutral_applied), "scoring_version": HYBRID_SCORING_VERSION, } return json.dumps(payload, ensure_ascii=True) def _compute_hybrid_score( *, label: str, llm_confidence: float, finbert_strength: float, finbert_polarity: Optional[float] = None, non_neutral_boost: float = 1.35, soft_neutral_polarity_threshold: float = 0.12, soft_neutral_max_mag: float = 0.25, soft_neutral_scale: float = 0.8, return_metadata: bool = False, ) -> float | tuple[float, bool]: """Compute final hybrid impact score in [-1, 1] with boosted non-neutral and soft-neutral rules.""" normalized_label = str(label).upper().strip() if normalized_label not in LLM_LABELS: normalized_label = "NEUTRAL" confidence = max(0.0, min(1.0, float(llm_confidence))) strength = max(0.0, min(1.0, float(finbert_strength))) polarity_value = float(finbert_polarity) if finbert_polarity is not None else 0.0 polarity = max(-1.0, min(1.0, polarity_value)) soft_neutral_applied = False if normalized_label == "NEUTRAL": abs_polarity = abs(polarity) if abs_polarity < max(0.0, float(soft_neutral_polarity_threshold)): final_score = 0.0 else: neutral_core = (0.6 * abs_polarity) + (0.4 * strength) neutral_mag = min( max(0.0, float(soft_neutral_max_mag)), neutral_core * max(0.0, float(soft_neutral_scale)), ) sign = 1.0 if polarity > 0 else -1.0 final_score = sign * neutral_mag soft_neutral_applied = True if return_metadata: return final_score, soft_neutral_applied return final_score sign = 1.0 if normalized_label == "BULLISH" else -1.0 base_mag = max(0.0, min(1.0, (0.7 * confidence) + (0.3 * strength))) boosted_mag = min(1.0, base_mag * max(0.0, float(non_neutral_boost))) 
final_score = sign * boosted_mag if return_metadata: return final_score, soft_neutral_applied return final_score def _extract_chat_message_content(data: dict[str, Any]) -> str: """Extract text content from OpenRouter chat completion response.""" message = data.get("choices", [{}])[0].get("message", {}) content = message.get("content", "") if isinstance(content, str): return content.strip() if isinstance(content, list): text_parts: list[str] = [] for item in content: if isinstance(item, dict) and item.get("type") == "text": text = item.get("text") if isinstance(text, str): text_parts.append(text) return "\n".join(text_parts).strip() return "" def _extract_finish_reason(data: dict[str, Any]) -> str: """Extract finish reason from first choice for empty-content diagnostics.""" reason = data.get("choices", [{}])[0].get("finish_reason", "") if not isinstance(reason, str): return "" return reason.strip().lower() def _clean_json_content(content: str) -> str: """ Normalize model text into parseable JSON content. Handles common LLM output quirks: - Markdown fenced code blocks (```json ... ```) - Wrapped objects like {"results": [...]} or {"scores": [...]} - Thinking/reasoning preamble before JSON - Raw JSON arrays """ normalized = content.strip() # Strip markdown code fences if normalized.startswith("```"): lines = normalized.splitlines() if lines and lines[0].startswith("```"): lines = lines[1:] if lines and lines[-1].strip() == "```": lines = lines[:-1] normalized = "\n".join(lines).strip() if normalized.startswith("json"): normalized = normalized[4:].strip() # Already a JSON array — return as-is if normalized.startswith("["): return normalized # Wrapped object: {"results": [...], ...} or {"scores": [...], ...} if normalized.startswith("{"): try: import json as _json obj = _json.loads(normalized) if isinstance(obj, dict): # Find the first list value for v in obj.values(): if isinstance(v, list): return _json.dumps(v) # Single object — wrap in array return _json.dumps([obj]) except Exception: pass # Preamble text before JSON — find the array first = normalized.find("[") last = normalized.rfind("]") if first != -1 and last != -1 and last > first: return normalized[first:last + 1] # Last resort — try to find an object first_obj = normalized.find("{") last_obj = normalized.rfind("}") if first_obj != -1 and last_obj != -1 and last_obj > first_obj: return normalized[first_obj:last_obj + 1] return normalized def _validate_llm_results( *, raw_results: Any, expected_ids: list[int], model_name: str, json_repair_used: bool = False, ) -> list[dict]: """Validate/normalize LLM output for direction+confidence labels.""" if not isinstance(raw_results, list): raise ValueError(f"Structured result must be a list, got {type(raw_results).__name__}") results_by_id: dict[int, dict] = {} for item in raw_results: if not isinstance(item, dict): raise ValueError(f"Structured result item must be object, got {type(item).__name__}") if "id" not in item: raise ValueError("Structured result missing required field: id") article_id = int(item["id"]) if article_id in results_by_id: raise ValueError(f"Duplicate article id in structured output: {article_id}") raw_label = item.get("label") raw_confidence = item.get("confidence") raw_score = item.get("score") score_value: Optional[float] = None if raw_score is not None: score_value = max(-1.0, min(1.0, float(raw_score))) if raw_label is not None: label = str(raw_label).upper().strip() if label not in LLM_LABELS: raise ValueError(f"Unsupported label in structured output: {label}") elif 
score_value is not None: if score_value > 0.05: label = "BULLISH" elif score_value < -0.05: label = "BEARISH" else: label = "NEUTRAL" else: raise ValueError("Structured result missing label/score fields") if raw_confidence is not None: confidence = max(0.0, min(1.0, float(raw_confidence))) elif score_value is not None: confidence = abs(score_value) else: confidence = 0.0 if label == "NEUTRAL" else 0.5 reasoning = _sanitize_reasoning_text(item.get("reasoning", "")) results_by_id[article_id] = { "id": article_id, "label": label, "llm_confidence": confidence, "llm_reasoning": reasoning, "llm_model": model_name, "model_name": _hybrid_model_name(model_name), "json_repair_used": json_repair_used, } expected = set(expected_ids) got = set(results_by_id.keys()) missing = sorted(expected - got) extra = sorted(got - expected) if missing or extra: raise ValueError(f"Structured result ID mismatch. missing={missing} extra={extra}") return [results_by_id[article_id] for article_id in expected_ids] async def score_batch_with_llm( articles: list[dict], ) -> list[dict]: """ Classify a batch with LLM direction + confidence using strict JSON schema. """ settings = get_settings() if not settings.openrouter_api_key: raise RuntimeError("OpenRouter API key not configured") normalized_articles = [ { "id": int(article["id"]), "title": str(article.get("title") or ""), "description": str(article.get("description") or "")[:600], } for article in articles ] expected_ids = [item["id"] for item in normalized_articles] user_prompt = ( "Classify each article into BULLISH, BEARISH, or NEUTRAL for short-term HG=F price impact.\n" "Return one output object per input id and keep reasoning single-line.\n\n" f"Input articles JSON:\n{json.dumps(normalized_articles, ensure_ascii=True)}" ) model_name = settings.resolved_scoring_model async def _request_scoring(*, max_tokens: int) -> dict[str, Any]: request_kwargs: dict[str, Any] = { "api_key": settings.openrouter_api_key, "model": model_name, "messages": [ {"role": "system", "content": LLM_SENTIMENT_SYSTEM_PROMPT}, {"role": "user", "content": user_prompt}, ], "max_tokens": max_tokens, "temperature": 0.0, "timeout_seconds": 60.0, "max_retries": settings.openrouter_max_retries, "rpm": settings.openrouter_rpm, "fallback_models": settings.openrouter_fallback_models_list, "referer": "https://copper-mind.vercel.app", "title": "CopperMind Sentiment Analysis", "response_format": LLM_SCORING_RESPONSE_FORMAT, "extra_payload": {"reasoning": {"exclude": True}}, } return await create_chat_completion(**request_kwargs) async def _repair_json_response(malformed_content: str) -> str: repair_prompt = ( "Convert the following malformed model output into valid JSON WITHOUT changing meaning.\n" f"Expected ids: {expected_ids}\n" "Output only JSON array and keep keys: id,label,confidence,reasoning.\n\n" f"MALFORMED:\n{malformed_content}" ) repair_data = await create_chat_completion( api_key=settings.openrouter_api_key, model=model_name, messages=[ { "role": "system", "content": ( "You fix JSON formatting only. Return valid JSON. " "No markdown and no extra text." 
), }, {"role": "user", "content": repair_prompt}, ], max_tokens=4000, temperature=0.0, timeout_seconds=60.0, max_retries=settings.openrouter_max_retries, rpm=settings.openrouter_rpm, fallback_models=settings.openrouter_fallback_models_list, referer="https://copper-mind.vercel.app", title="CopperMind JSON Repair", extra_payload={"reasoning": {"exclude": True}}, ) repaired_content = _extract_chat_message_content(repair_data) if not repaired_content: raise LLMStructuredOutputError("JSON repair call returned empty content") return repaired_content def _parse_and_validate(content: str, *, repair_used: bool) -> list[dict]: raw_results = json.loads(_clean_json_content(content)) return _validate_llm_results( raw_results=raw_results, expected_ids=expected_ids, model_name=model_name, json_repair_used=repair_used, ) data = await _request_scoring(max_tokens=LLM_SCORING_MAX_TOKENS_PRIMARY) content = _extract_chat_message_content(data) if not content: finish_reason = _extract_finish_reason(data) if finish_reason == "length": logger.warning( "LLM response ended with finish_reason=length and empty content; " "retrying with max_tokens=%s", LLM_SCORING_MAX_TOKENS_RETRY, ) data = await _request_scoring(max_tokens=LLM_SCORING_MAX_TOKENS_RETRY) content = _extract_chat_message_content(data) if not content: raise OpenRouterError( f"Empty response content from LLM scoring (finish_reason={finish_reason or 'unknown'})" ) try: return _parse_and_validate(content, repair_used=False) except (ValueError, TypeError, json.JSONDecodeError) as exc: logger.warning("LLM structured parse failed, attempting JSON repair: %s", exc) try: repaired_content = await _repair_json_response(content) return _parse_and_validate(repaired_content, repair_used=True) except Exception as repair_exc: raise LLMStructuredOutputError( f"LLM structured output invalid after repair: {repair_exc}" ) from repair_exc def score_batch_with_finbert(articles: list) -> list[dict]: """ Score articles with FinBERT to provide sentiment intensity probabilities. 
Args: articles: List of NewsArticle ORM objects Returns: List of dicts with FinBERT probabilities """ pipe = get_finbert_pipeline() results = [] for article in articles: text = f"{article.title} {article.description or ''}" scores = score_text_with_finbert(pipe, text) finbert_strength = abs(scores["prob_positive"] - scores["prob_negative"]) results.append({ "id": article.id, "score": scores["score"], "prob_positive": scores["prob_positive"], "prob_neutral": scores["prob_neutral"], "prob_negative": scores["prob_negative"], "finbert_strength": finbert_strength, }) return results def _clip(value: float, lower: float, upper: float) -> float: """Clamp numeric value.""" return max(lower, min(upper, float(value))) def _label_from_impact_score(impact_score: float) -> str: """Map impact score to discrete label.""" if impact_score >= LLM_V2_LABEL_THRESHOLD: return "BULLISH" if impact_score <= -LLM_V2_LABEL_THRESHOLD: return "BEARISH" return "NEUTRAL" def _sign(value: float, eps: float = 1e-9) -> int: """Return numeric sign with epsilon deadzone.""" if value > eps: return 1 if value < -eps: return -1 return 0 def _normalize_event_type(value: Any) -> str: """Normalize event type to allowed vocabulary.""" normalized = str(value or "").strip().lower() if normalized in LLM_V2_EVENT_TYPES: return normalized return "mixed_unclear" def _infer_event_type_from_text(text: str) -> str: """Heuristic event inference used only for deterministic fallback.""" lower = (text or "").lower() if not lower or "copper" not in lower: return "non_copper" supply_disruption_keywords = [ "outage", "strike", "disruption", "shutdown", "halt", "sanction", "inventory draw", "stocks fell", "warehouse draw", ] supply_expansion_keywords = [ "ramp-up", "ramp up", "increase output", "production increase", "new mine", "inventory build", "stocks rose", ] demand_increase_keywords = [ "stimulus", "grid investment", "ev demand", "demand rise", "stockpile purchase", "import growth", ] demand_decrease_keywords = [ "slowdown", "weak demand", "demand decline", "construction slump", "pmi contraction", "import decline", ] if any(token in lower for token in supply_disruption_keywords): return "supply_disruption" if any(token in lower for token in supply_expansion_keywords): return "supply_expansion" if any(token in lower for token in demand_increase_keywords): return "demand_increase" if any(token in lower for token in demand_decrease_keywords): return "demand_decrease" if "dollar strengthens" in lower or "usd stronger" in lower: return "macro_usd_up" if "dollar weakens" in lower or "usd weaker" in lower: return "macro_usd_down" return "mixed_unclear" def _build_llm_v2_user_prompt(articles: list[dict], horizon_days: int) -> str: """Build compact JSON prompt for batch scoring.""" normalized_articles = [ { "id": int(article["id"]), "title": str(article.get("title") or "")[:500], "description": str(article.get("description") or "")[:800], } for article in articles ] return ( f"Classify each article for {horizon_days}-day HG=F copper futures impact.\n" "Return one object per id.\n\n" f"Input articles JSON:\n{json.dumps(normalized_articles, ensure_ascii=True)}" ) def _parse_llm_v2_items( *, raw_results: Any, expected_ids: list[int], model_name: str, ) -> tuple[dict[int, dict], list[int]]: """ Parse/validate V2 LLM outputs. 
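    A well-formed element of ``raw_results`` follows the contract in
    ``LLM_V2_SYSTEM_PROMPT`` (values illustrative):

        {"id": 17, "label": "BULLISH", "impact_score": 0.45, "confidence": 0.80,
         "relevance": 0.90, "event_type": "supply_disruption",
         "reasoning": "Mine strike tightens concentrate supply"}

    A missing ``impact_score`` is inferred from ``label`` when possible, missing
    ``confidence``/``relevance`` default to 0.5, and the label is re-derived from
    ``impact_score`` so score and class stay consistent.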
Returns: (valid_results_by_id, failed_ids) """ if not isinstance(raw_results, list): raise ValueError(f"Structured result must be a list, got {type(raw_results).__name__}") expected = set(expected_ids) valid: dict[int, dict] = {} failed_ids: set[int] = set() for item in raw_results: if not isinstance(item, dict): logger.debug("V2 parse: skipping non-dict item: %s", type(item).__name__) continue if "id" not in item: logger.debug("V2 parse: item missing 'id' key, keys=%s", list(item.keys())) continue try: article_id = int(item["id"]) except (TypeError, ValueError): logger.debug("V2 parse: invalid id value: %r", item.get("id")) continue if article_id not in expected: continue if article_id in valid: failed_ids.add(article_id) continue raw_label = item.get("label", item.get("classification")) raw_impact = item.get("impact_score", item.get("score")) raw_confidence = item.get("confidence") raw_relevance = item.get("relevance", item.get("relevance_score")) raw_event_type = item.get("event_type") raw_reasoning = item.get("reasoning", "") try: # impact_score is required; try label-based inference if missing if raw_impact is None: if raw_label and str(raw_label).upper().strip() in LLM_LABELS: lbl = str(raw_label).upper().strip() raw_impact = {"BULLISH": 0.3, "BEARISH": -0.3, "NEUTRAL": 0.0}.get(lbl, 0.0) logger.debug("V2 parse: inferred impact_score=%.1f from label=%s for id=%d", raw_impact, lbl, article_id) else: raise ValueError("missing impact_score and no valid label") impact_score = _clip(float(raw_impact), -1.0, 1.0) # confidence and relevance: default to 0.5 if missing confidence = _clip(float(raw_confidence), 0.0, 1.0) if raw_confidence is not None else 0.5 relevance = _clip(float(raw_relevance), 0.0, 1.0) if raw_relevance is not None else 0.5 except (TypeError, ValueError) as exc: logger.debug("V2 parse: field error for id=%d: %s (keys=%s)", article_id, exc, list(item.keys())) failed_ids.add(article_id) continue event_type = _normalize_event_type(raw_event_type) label_from_impact = _label_from_impact_score(impact_score) if raw_label is None: label = label_from_impact else: label = str(raw_label).upper().strip() if label not in LLM_LABELS: label = label_from_impact if label != label_from_impact: # Keep deterministic consistency between score and class. label = label_from_impact reasoning = _sanitize_reasoning_text(raw_reasoning)[:160] valid[article_id] = { "id": article_id, "label": label, "impact_score": impact_score, "confidence": confidence, "relevance": relevance, "event_type": event_type, "reasoning": reasoning, "llm_model": model_name, } # Mark missing ids as failed. 
    missing_ids = []
    for article_id in expected_ids:
        if article_id not in valid:
            failed_ids.add(article_id)
            missing_ids.append(article_id)
    if missing_ids:
        logger.warning(
            "V2 parse: %d/%d articles missing from LLM response (model=%s, returned=%d items, missing_ids=%s)",
            len(missing_ids),
            len(expected_ids),
            model_name,
            len(raw_results),
            missing_ids[:10],
        )
    return valid, sorted(failed_ids)


async def _repair_json_response_v2(
    *,
    settings: Any,
    model_name: str,
    malformed_content: str,
    expected_ids: list[int],
) -> str:
    """Repair malformed JSON into V2 contract with no semantic rewrite."""
    repair_prompt = (
        "Convert the malformed output into valid JSON array.\n"
        f"Expected ids: {expected_ids}\n"
        "Keep keys: id,label,impact_score,confidence,relevance,event_type,reasoning.\n"
        "Do not add explanations.\n\n"
        f"MALFORMED:\n{malformed_content}"
    )
    repair_data = await create_chat_completion(
        api_key=settings.openrouter_api_key,
        model=model_name,
        messages=[
            {
                "role": "system",
                "content": "You repair JSON only. Return valid JSON array without markdown.",
            },
            {"role": "user", "content": repair_prompt},
        ],
        max_tokens=4000,
        temperature=0.0,
        timeout_seconds=60.0,
        max_retries=settings.openrouter_max_retries,
        rpm=settings.openrouter_rpm,
        fallback_models=settings.openrouter_fallback_models_list,
        referer="https://copper-mind.vercel.app",
        title="CopperMind V2 JSON Repair",
        extra_payload={"reasoning": {"exclude": True}},
    )
    repaired_content = _extract_chat_message_content(repair_data)
    if not repaired_content:
        raise LLMStructuredOutputError("V2 JSON repair returned empty content")
    return repaired_content


async def _score_subset_with_model_v2(
    *,
    settings: Any,
    model_name: str,
    articles: list[dict],
    horizon_days: int,
) -> tuple[dict[int, dict], list[int], int, bool]:
    """
    Score subset with one model.

    Returns:
        (valid_results_by_id, failed_ids, parse_fail_count, rate_limited)
    """
    if not articles:
        return {}, [], 0, False
    expected_ids = [int(article["id"]) for article in articles]
    user_prompt = _build_llm_v2_user_prompt(articles, horizon_days=horizon_days)

    async def _request(*, max_tokens: int) -> dict[str, Any]:
        request_kwargs: dict[str, Any] = {
            "api_key": settings.openrouter_api_key,
            "model": model_name,
            "messages": [
                {"role": "system", "content": LLM_V2_SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            "max_tokens": max_tokens,
            "temperature": 0.0,
            "timeout_seconds": 60.0,
            "max_retries": settings.openrouter_max_retries,
            "rpm": settings.openrouter_rpm,
            "fallback_models": settings.openrouter_fallback_models_list,
            "referer": "https://copper-mind.vercel.app",
            "title": "CopperMind Sentiment Analysis V2",
            "response_format": LLM_SCORING_RESPONSE_FORMAT_V2,
            "extra_payload": {"reasoning": {"exclude": True}},
        }
        return await create_chat_completion(**request_kwargs)

    parse_fail_count = 0
    rate_limited = False
    try:
        data = await _request(max_tokens=LLM_SCORING_MAX_TOKENS_PRIMARY)
    except OpenRouterRateLimitError:
        logger.warning("V2 scoring rate-limited for model=%s, skipping batch (%d articles)", model_name, len(articles))
        rate_limited = True
        return {}, expected_ids, len(expected_ids), rate_limited
    except Exception as exc:
        logger.warning("V2 scoring failed for model=%s: %s", model_name, exc)
        return {}, expected_ids, len(expected_ids), False

    content = _extract_chat_message_content(data)
    if not content:
        finish_reason = _extract_finish_reason(data)
        if finish_reason == "length":
            data = await _request(max_tokens=LLM_SCORING_MAX_TOKENS_RETRY)
            content = _extract_chat_message_content(data)
    if not content:
        return {}, expected_ids, len(expected_ids), False

    try:
        raw_results = json.loads(_clean_json_content(content))
        valid, failed = _parse_llm_v2_items(
            raw_results=raw_results,
            expected_ids=expected_ids,
            model_name=model_name,
        )
        parse_fail_count += len(failed)
        return valid, failed, parse_fail_count, False
    except Exception as exc:
        logger.warning(
            "V2 JSON parse failed for model=%s: %s | response_preview=%.500s",
            model_name,
            exc,
            content,
        )
        parse_fail_count += len(expected_ids)
        try:
            repaired = await _repair_json_response_v2(
                settings=settings,
                model_name=model_name,
                malformed_content=content,
                expected_ids=expected_ids,
            )
            raw_results = json.loads(_clean_json_content(repaired))
            valid, failed = _parse_llm_v2_items(
                raw_results=raw_results,
                expected_ids=expected_ids,
                model_name=model_name,
            )
            return valid, failed, parse_fail_count, False
        except Exception:
            return {}, expected_ids, parse_fail_count, False


async def score_batch_with_llm_v2(
    articles: list[dict],
    *,
    horizon_days: int = 5,
) -> dict[str, Any]:
    """
    Commodity-aware sentiment scoring with fast+reliable model escalation.
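    Escalation rule (a condensed view of the checks performed below): an article
    is re-scored with the reliable model when its fast-model item failed to
    parse, or when the event-type rule sign contradicts the LLM impact sign with
    enough weight behind it:

        rule_sign = LLM_V2_EVENT_SIGN.get(event_type, 0)   # e.g. +1 for supply_disruption
        llm_sign = _sign(impact_score)
        conflict_strength = confidence * relevance
        escalate = (rule_sign != 0 and llm_sign != 0
                    and llm_sign != rule_sign
                    and conflict_strength >= conflict_threshold)

    If the fast model was rate-limited, escalation is skipped entirely and the
    unresolved ids fall through to the deterministic FinBERT fallback.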
""" settings = get_settings() if not settings.openrouter_api_key: raise RuntimeError("OpenRouter API key not configured") fast_model = settings.resolved_scoring_fast_model reliable_model = settings.resolved_scoring_reliable_model conflict_threshold = _clip( float(getattr(settings, "sentiment_escalate_conflict_threshold", 0.55)), 0.0, 1.0, ) normalized_articles = [ { "id": int(article["id"]), "title": str(article.get("title") or "")[:500], "description": str(article.get("description") or "")[:800], "text": str(article.get("text") or "")[:1800], } for article in articles ] expected_ids = [item["id"] for item in normalized_articles] article_by_id = {item["id"]: item for item in normalized_articles} fast_valid, fast_failed, parse_fail_fast, fast_rate_limited = await _score_subset_with_model_v2( settings=settings, model_name=fast_model, articles=normalized_articles, horizon_days=horizon_days, ) results_by_id = dict(fast_valid) parse_fail_total = int(parse_fail_fast) conflict_ids: list[int] = [] for article_id, item in fast_valid.items(): event_type = _normalize_event_type(item.get("event_type")) rule_sign = int(LLM_V2_EVENT_SIGN.get(event_type, 0)) llm_sign = _sign(float(item.get("impact_score", 0.0))) conflict_strength = _clip( float(item.get("confidence", 0.0)) * float(item.get("relevance", 0.0)), 0.0, 1.0, ) if rule_sign != 0 and llm_sign != 0 and llm_sign != rule_sign and conflict_strength >= conflict_threshold: conflict_ids.append(article_id) results_by_id.pop(article_id, None) escalation_ids = sorted(set(fast_failed).union(conflict_ids)) escalation_count = len(escalation_ids) if escalation_ids and not fast_rate_limited: reliable_subset = [ article_by_id[article_id] for article_id in escalation_ids if article_id in article_by_id ] reliable_valid, _reliable_failed, parse_fail_reliable, _rl = await _score_subset_with_model_v2( settings=settings, model_name=reliable_model, articles=reliable_subset, horizon_days=horizon_days, ) results_by_id.update(reliable_valid) parse_fail_total += int(parse_fail_reliable) elif fast_rate_limited and escalation_ids: logger.info( "Skipping escalation to %s: fast model was rate-limited (%d articles → direct FinBERT fallback)", reliable_model, len(escalation_ids), ) results = [results_by_id[article_id] for article_id in expected_ids if article_id in results_by_id] failed_ids = [article_id for article_id in expected_ids if article_id not in results_by_id] fallback_count = len(failed_ids) return { "results": results, "failed_ids": failed_ids, "parse_fail_count": parse_fail_total, "escalation_count": escalation_count, "fallback_count": fallback_count, "model_fast": fast_model, "model_reliable": reliable_model, } def score_batch_with_finbert_v2(articles: list[dict]) -> dict[int, dict]: """Score text with FinBERT for tone/intensity features.""" pipe = get_finbert_pipeline() results: dict[int, dict] = {} for article in articles: article_id = int(article["id"]) text = str( article.get("text") or f"{article.get('title', '')} {article.get('description', '')}" )[:1200] scores = score_text_with_finbert(pipe, text) results[article_id] = { "prob_positive": float(scores["prob_positive"]), "prob_neutral": float(scores["prob_neutral"]), "prob_negative": float(scores["prob_negative"]), "tone": float(scores["score"]), "magnitude": abs(float(scores["prob_positive"]) - float(scores["prob_negative"])), } return results def compute_final_score_v2( *, impact_score_llm: float, confidence_llm: float, relevance_score: float, event_type: str, prob_positive: float, prob_negative: float, ) 
-> dict[str, float | int]: """Compute deterministic ensemble score for V2.""" llm_impact = _clip(float(impact_score_llm), -1.0, 1.0) llm_conf = _clip(float(confidence_llm), 0.0, 1.0) relevance = _clip(float(relevance_score), 0.0, 1.0) tone = _clip(float(prob_positive) - float(prob_negative), -1.0, 1.0) tone_mag = abs(tone) normalized_event = _normalize_event_type(event_type) rule_sign = int(LLM_V2_EVENT_SIGN.get(normalized_event, 0)) rule_strength = float(LLM_V2_EVENT_STRENGTH.get(normalized_event, 0.25)) llm_sign = _sign(llm_impact) final_sign = llm_sign if llm_sign != 0 else rule_sign if final_sign == 0 and tone_mag >= 0.2: final_sign = _sign(tone) impact_mag = _clip( (0.55 * abs(llm_impact)) + (0.25 * tone_mag) + (0.20 * _clip(rule_strength, 0.0, 1.0)), 0.0, 1.0, ) if final_sign == 0: impact_mag = min(impact_mag, 0.12) final_score = float(final_sign) * impact_mag agreement = 1.0 if (rule_sign == 0 or llm_sign == 0 or llm_sign == rule_sign) else 0.4 confidence_cal = _clip((0.50 * llm_conf) + (0.30 * agreement) + (0.20 * relevance), 0.01, 0.99) return { "rule_sign": rule_sign, "rule_strength": rule_strength, "final_score": final_score, "confidence_calibrated": confidence_cal, } def _build_article_fallback_v2( *, article: dict, finbert: dict, model_fast: str, model_reliable: str, ) -> dict: """Deterministic article-level fallback without zero-only outputs.""" text = str(article.get("text") or f"{article.get('title', '')} {article.get('description', '')}") event_type = _infer_event_type_from_text(text) rule_sign = int(LLM_V2_EVENT_SIGN.get(event_type, 0)) tone = float(finbert.get("tone", 0.0)) tone_sign = _sign(tone) direction = rule_sign if rule_sign != 0 else tone_sign if direction == 0: impact_score = 0.0 else: impact_score = float(direction) * _clip((abs(tone) * 0.35) + 0.08, 0.08, 0.25) relevance = 0.10 if event_type == "non_copper" else 0.45 confidence = 0.18 if direction == 0 else _clip(0.22 + (abs(tone) * 0.22), 0.22, 0.45) return { "id": int(article["id"]), "label": _label_from_impact_score(impact_score), "impact_score": impact_score, "confidence": confidence, "relevance": relevance, "event_type": event_type, "reasoning": "deterministic_fallback", "llm_model": model_fast, "model_fast": model_fast, "model_reliable": model_reliable, "fallback_used": True, } def score_unscored_processed_articles( session: Session, *, chunk_size: int = 12, backfill_days: Optional[int] = None, ) -> dict[str, int]: """ Score unscored `news_processed` articles into `news_sentiments_v2`. 
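    Each article gets an LLM result (or deterministic fallback), FinBERT
    probabilities, and an event-type rule sign, which ``compute_final_score_v2``
    combines deterministically. Worked example with illustrative inputs
    (supply_disruption, impact_score_llm=0.6, confidence=0.8, relevance=0.9,
    finbert pos/neg = 0.7/0.1):

        tone       = 0.7 - 0.1                      = 0.60
        magnitude  = 0.55*0.6 + 0.25*0.6 + 0.20*1.0 = 0.68
        final      = +0.68   (LLM sign, agreeing with the rule sign)
        confidence = 0.50*0.8 + 0.30*1.0 + 0.20*0.9 = 0.88  (calibrated)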
""" settings = get_settings() horizon_days = max(1, int(getattr(settings, "sentiment_horizon_days", 5))) relevance_min = _clip(float(getattr(settings, "sentiment_relevance_min", 0.35)), 0.0, 1.0) query = ( session.query( NewsProcessed.id.label("processed_id"), NewsProcessed.canonical_title, NewsProcessed.cleaned_text, NewsRaw.title.label("raw_title"), NewsRaw.description.label("raw_description"), NewsRaw.published_at, ) .join(NewsRaw, NewsProcessed.raw_id == NewsRaw.id) .outerjoin( NewsSentimentV2, (NewsProcessed.id == NewsSentimentV2.news_processed_id) & (NewsSentimentV2.horizon_days == horizon_days), ) .filter(NewsSentimentV2.id.is_(None)) .order_by(NewsRaw.published_at.asc(), NewsProcessed.id.asc()) ) if backfill_days is not None: cutoff = datetime.now(timezone.utc) - timedelta(days=max(1, int(backfill_days))) query = query.filter(NewsRaw.published_at >= cutoff) rows = query.all() if not rows: logger.info("No unscored processed articles found") return { "scored_count": 0, "parse_fail_count": 0, "escalation_count": 0, "fallback_count": 0, "finbert_used": 0, } logger.info("Found %s unscored processed articles for V2 scoring", len(rows)) scored_count = 0 parse_fail_count = 0 escalation_count = 0 fallback_count = 0 finbert_used = 0 llm_budget_remaining = max(0, int(settings.max_llm_articles_per_run)) fast_model = settings.resolved_scoring_fast_model reliable_model = settings.resolved_scoring_reliable_model for chunk_idx in range(0, len(rows), chunk_size): chunk_rows = rows[chunk_idx:chunk_idx + chunk_size] chunk_items: list[dict] = [] for row in chunk_rows: title = str(row.raw_title or row.canonical_title or "")[:500] description = str(row.raw_description or "")[:1000] text = str(row.cleaned_text or f"{title} {description}")[:2000] chunk_items.append( { "id": int(row.processed_id), "title": title, "description": description, "text": text, "published_at": row.published_at, } ) finbert_by_id = score_batch_with_finbert_v2(chunk_items) finbert_used += len(finbert_by_id) llm_results_by_id: dict[int, dict] = {} llm_candidates: list[dict] = [] # Rate-limit flag is keyed to today's UTC date so it resets automatically at midnight. today_utc = datetime.now(timezone.utc).date().isoformat() rate_limited_date = getattr(score_unscored_processed_articles, "_rate_limited_date", None) global_rate_limited = rate_limited_date == today_utc if settings.openrouter_api_key and llm_budget_remaining > 0 and not global_rate_limited: llm_take = min(len(chunk_items), llm_budget_remaining) llm_candidates = chunk_items[:llm_take] llm_budget_remaining -= llm_take if llm_candidates: try: llm_bundle = run_async_from_sync( score_batch_with_llm_v2, llm_candidates, horizon_days=horizon_days, ) for item in llm_bundle.get("results", []): llm_results_by_id[int(item["id"])] = item parse_fail_count += int(llm_bundle.get("parse_fail_count", 0)) escalation_count += int(llm_bundle.get("escalation_count", 0)) fast_model = str(llm_bundle.get("model_fast", fast_model)) reliable_model = str(llm_bundle.get("model_reliable", reliable_model)) # If LLM returned 100% fail and flagged rate limit, mark for today's UTC date. # Flag resets automatically the next UTC day when the daily limit refreshes. 
if llm_bundle.get("rate_limited", False): score_unscored_processed_articles._rate_limited_date = datetime.now(timezone.utc).date().isoformat() logger.warning( "V2 batch hit OpenRouter daily rate limit - LLM scoring disabled for the rest of UTC day %s.", score_unscored_processed_articles._rate_limited_date, ) except Exception as exc: logger.warning("V2 LLM scoring failed for chunk starting at %s: %s", chunk_idx, exc) parse_fail_count += len(llm_candidates) for article in chunk_items: article_id = int(article["id"]) finbert = finbert_by_id.get(article_id, _neutral_finbert_score()) llm = llm_results_by_id.get(article_id) if llm is None: llm = _build_article_fallback_v2( article=article, finbert=finbert if isinstance(finbert, dict) else {}, model_fast=fast_model, model_reliable=reliable_model, ) else: llm["model_fast"] = fast_model llm["model_reliable"] = reliable_model llm["fallback_used"] = False if bool(llm.get("fallback_used", False)): fallback_count += 1 if float(llm.get("relevance", 0.0)) < relevance_min and llm.get("event_type") != "non_copper": llm["event_type"] = "non_copper" llm["label"] = "NEUTRAL" llm["impact_score"] = 0.0 metrics = compute_final_score_v2( impact_score_llm=float(llm.get("impact_score", 0.0)), confidence_llm=float(llm.get("confidence", 0.01)), relevance_score=float(llm.get("relevance", 0.01)), event_type=str(llm.get("event_type", "mixed_unclear")), prob_positive=float(finbert.get("prob_positive", 0.33)), prob_negative=float(finbert.get("prob_negative", 0.33)), ) payload = { "label": llm.get("label", "NEUTRAL"), "impact_score": round(float(llm.get("impact_score", 0.0)), 4), "confidence": round(float(llm.get("confidence", 0.01)), 4), "relevance": round(float(llm.get("relevance", 0.01)), 4), "event_type": llm.get("event_type", "mixed_unclear"), "reasoning": llm.get("reasoning", ""), "rule_sign": metrics["rule_sign"], "rule_strength": round(float(metrics["rule_strength"]), 4), "confidence_calibrated": round(float(metrics["confidence_calibrated"]), 4), "fallback_used": bool(llm.get("fallback_used", False)), "llm_model": llm.get("llm_model", fast_model), "scoring_version": SCORING_V2_VERSION, } sentiment_v2 = NewsSentimentV2( news_processed_id=article_id, horizon_days=horizon_days, label=str(llm.get("label", "NEUTRAL")), impact_score_llm=float(llm.get("impact_score", 0.0)), confidence_llm=float(llm.get("confidence", 0.01)), confidence_calibrated=float(metrics["confidence_calibrated"]), relevance_score=float(llm.get("relevance", 0.01)), event_type=str(llm.get("event_type", "mixed_unclear")), rule_sign=int(metrics["rule_sign"]), final_score=float(metrics["final_score"]), finbert_pos=float(finbert.get("prob_positive", 0.33)), finbert_neu=float(finbert.get("prob_neutral", 0.34)), finbert_neg=float(finbert.get("prob_negative", 0.33)), reasoning_json=json.dumps(payload, ensure_ascii=True), model_fast=fast_model, model_reliable=reliable_model, scored_at=datetime.now(timezone.utc), ) session.add(sentiment_v2) scored_count += 1 session.commit() logger.info( "V2 scoring summary: scored=%s parse_fail=%s escalations=%s fallback=%s finbert_used=%s", scored_count, parse_fail_count, escalation_count, fallback_count, finbert_used, ) return { "scored_count": scored_count, "parse_fail_count": parse_fail_count, "escalation_count": escalation_count, "fallback_count": fallback_count, "finbert_used": finbert_used, } def aggregate_daily_sentiment_v2( session: Session, *, tau_hours: float = 12.0, ) -> int: """Aggregate V2 article scores into daily_sentiments_v2.""" settings = get_settings() 
tau_hours = tau_hours or settings.sentiment_tau_hours horizon_days = max(1, int(getattr(settings, "sentiment_horizon_days", 5))) relevance_min = _clip(float(getattr(settings, "sentiment_relevance_min", 0.35)), 0.0, 1.0) rows = ( session.query( NewsRaw.published_at, NewsSentimentV2.final_score, NewsSentimentV2.confidence_calibrated, NewsSentimentV2.relevance_score, ) .join(NewsProcessed, NewsProcessed.raw_id == NewsRaw.id) .join( NewsSentimentV2, (NewsSentimentV2.news_processed_id == NewsProcessed.id) & (NewsSentimentV2.horizon_days == horizon_days), ) .filter(NewsSentimentV2.relevance_score >= relevance_min) .all() ) if not rows: logger.info("No V2 scored articles available for daily aggregation") return 0 df = pd.DataFrame( rows, columns=["published_at", "final_score", "confidence_calibrated", "relevance_score"], ) df["date"] = pd.to_datetime(df["published_at"]).dt.normalize() def calc_weights(group): hours = (group["published_at"] - group["date"]).dt.total_seconds() / 3600.0 weights = np.exp(hours / tau_hours) return weights / weights.sum() daily_rows = [] for date, group in df.groupby("date"): weights = calc_weights(group) daily_rows.append( { "date": date, "sentiment_index": float((group["final_score"] * weights).sum()), "news_count": int(len(group)), "avg_confidence": float(group["confidence_calibrated"].mean()), "avg_relevance": float(group["relevance_score"].mean()), } ) count = 0 for row in daily_rows: date_dt = row["date"].to_pydatetime() if date_dt.tzinfo is None: date_dt = date_dt.replace(tzinfo=timezone.utc) existing = session.query(DailySentimentV2).filter( func.date(DailySentimentV2.date) == func.date(date_dt) ).first() if existing: existing.sentiment_index = row["sentiment_index"] existing.news_count = row["news_count"] existing.avg_confidence = row["avg_confidence"] existing.avg_relevance = row["avg_relevance"] existing.source_version = "v2" existing.aggregated_at = datetime.now(timezone.utc) else: session.add( DailySentimentV2( date=date_dt, sentiment_index=row["sentiment_index"], news_count=row["news_count"], avg_confidence=row["avg_confidence"], avg_relevance=row["avg_relevance"], source_version="v2", aggregated_at=datetime.now(timezone.utc), ) ) count += 1 session.commit() logger.info("Aggregated V2 sentiment for %s days", count) return count def backfill_sentiment_v2( session: Session, *, days: int = 180, batch_size: int = 50, ) -> dict[str, int]: """Idempotent V2 backfill helper for last N days.""" logger.info("Starting V2 backfill for last %s days (batch_size=%s)", days, batch_size) return score_unscored_processed_articles( session=session, chunk_size=batch_size, backfill_days=days, ) def score_unscored_articles( session: Session, chunk_size: int = 12 ) -> int: """ Score all articles that don't have sentiment scores yet. 
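    Worked example of the hybrid score described under Strategy below, using the
    default settings (boost=1.35, soft-neutral threshold=0.12, cap=0.25, scale=0.8);
    numbers are illustrative:

        BULLISH, llm_conf=0.8, finbert_strength=0.5:
            base  = 0.7*0.8 + 0.3*0.5       = 0.71
            score = +min(1.0, 0.71 * 1.35)  ≈ +0.96

        NEUTRAL, finbert_polarity=+0.30, finbert_strength=0.40 (soft neutral):
            core  = 0.6*0.30 + 0.4*0.40     = 0.34
            score = +min(0.25, 0.34 * 0.8)  = +0.25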
Strategy: - Primary direction: OpenRouter LLM label + confidence - Intensity: FinBERT probabilities for every article - Non-neutral boost: (0.7*llm_conf + 0.3*finbert_strength) * boost - Soft neutral: NEUTRAL labels can emit small directional score from FinBERT polarity - Chunk size: 12 articles for lower free-tier rate-limit pressure - Run budget: cap LLM-scored articles per run, overflow uses FinBERT Returns: Number of articles scored """ settings = get_settings() # Find unscored articles unscored = session.query(NewsArticle).outerjoin( NewsSentiment, NewsArticle.id == NewsSentiment.news_article_id ).filter(NewsSentiment.id.is_(None)).all() if not unscored: logger.info("No unscored articles found") return 0 logger.info(f"Found {len(unscored)} unscored articles") scored_count = 0 total_chunks = (len(unscored) + chunk_size - 1) // chunk_size llm_model = settings.resolved_scoring_model llm_budget_remaining = max(0, settings.max_llm_articles_per_run) non_neutral_boost = float(getattr(settings, "sentiment_non_neutral_boost", 1.35)) soft_neutral_polarity_threshold = float( getattr(settings, "sentiment_soft_neutral_polarity_threshold", 0.12) ) soft_neutral_max_mag = float(getattr(settings, "sentiment_soft_neutral_max_mag", 0.25)) soft_neutral_scale = float(getattr(settings, "sentiment_soft_neutral_scale", 0.8)) budget_exhausted_logged = False logger.info("LLM scoring budget for this run: %s articles", llm_budget_remaining) llm_success = 0 json_repair_success = 0 fallback_429 = 0 fallback_parse = 0 finbert_used = 0 # Process in chunks for chunk_idx in range(0, len(unscored), chunk_size): chunk = unscored[chunk_idx:chunk_idx + chunk_size] chunk_num = chunk_idx // chunk_size + 1 logger.info(f"Processing chunk {chunk_num}/{total_chunks} ({len(chunk)} articles)") llm_candidates: list[Any] = [] non_llm_candidates: list[Any] = [] if settings.openrouter_api_key and llm_budget_remaining > 0: llm_take = min(len(chunk), llm_budget_remaining) llm_candidates = chunk[:llm_take] non_llm_candidates = chunk[llm_take:] else: non_llm_candidates = chunk if settings.openrouter_api_key and llm_budget_remaining <= 0 and not budget_exhausted_logged: logger.info( "LLM budget exhausted (%s articles). Remaining chunks use soft-neutral FinBERT fallback.", settings.max_llm_articles_per_run, ) budget_exhausted_logged = True finbert_results = score_batch_with_finbert(chunk) finbert_used += len(finbert_results) finbert_by_id = {result["id"]: result for result in finbert_results} llm_results_by_id: dict[int, dict] = {} if llm_candidates: articles_data = [ {"id": a.id, "title": a.title, "description": a.description} for a in llm_candidates ] try: llm_results = run_async_from_sync(score_batch_with_llm, articles_data) llm_success += len(llm_results) json_repair_success += sum(1 for result in llm_results if result.get("json_repair_used")) for llm_result in llm_results: llm_results_by_id[int(llm_result["id"])] = llm_result llm_budget_remaining -= len(llm_candidates) logger.info( "LLM scored %s article(s) in chunk %s. 
Budget remaining: %s", len(llm_candidates), chunk_num, llm_budget_remaining, ) except Exception as e: if _is_rate_limit_error(e): fallback_model_name = HYBRID_FALLBACK_429_MODEL_NAME fallback_reason = "429 rate-limit fallback" fallback_429 += len(llm_candidates) else: fallback_model_name = HYBRID_FALLBACK_PARSE_MODEL_NAME fallback_reason = "structured parse/repair fallback" fallback_parse += len(llm_candidates) logger.warning("LLM scoring failed for chunk %s: %s", chunk_num, e) for article in llm_candidates: llm_results_by_id[article.id] = _neutral_llm_result( article_id=article.id, llm_model=llm_model, reason=fallback_reason, model_name_override=fallback_model_name, ) for article in non_llm_candidates: reason = ( "llm_skipped_budget" if settings.openrouter_api_key and llm_budget_remaining <= 0 else "llm_unavailable_no_api_key" ) llm_results_by_id[article.id] = _neutral_llm_result( article_id=article.id, llm_model=llm_model, reason=reason, model_name_override=_hybrid_model_name(llm_model), ) # Save to database for article in chunk: finbert = finbert_by_id.get(article.id) if not finbert: neutral_finbert = _neutral_finbert_score() finbert = { "prob_positive": neutral_finbert["prob_positive"], "prob_neutral": neutral_finbert["prob_neutral"], "prob_negative": neutral_finbert["prob_negative"], "finbert_strength": abs( neutral_finbert["prob_positive"] - neutral_finbert["prob_negative"] ), } llm_result = llm_results_by_id.get(article.id) if not llm_result: llm_result = _neutral_llm_result( article_id=article.id, llm_model=llm_model, reason="llm_result_missing", model_name_override=HYBRID_FALLBACK_PARSE_MODEL_NAME, ) fallback_parse += 1 label = str(llm_result.get("label", "NEUTRAL")).upper().strip() if label not in LLM_LABELS: label = "NEUTRAL" llm_confidence = float(llm_result.get("llm_confidence", 0.0)) finbert_polarity = float(finbert["prob_positive"]) - float(finbert["prob_negative"]) finbert_strength = float( finbert.get( "finbert_strength", abs(finbert_polarity), ) ) final_score, soft_neutral_applied = _compute_hybrid_score( label=label, llm_confidence=llm_confidence, finbert_strength=finbert_strength, finbert_polarity=finbert_polarity, non_neutral_boost=non_neutral_boost, soft_neutral_polarity_threshold=soft_neutral_polarity_threshold, soft_neutral_max_mag=soft_neutral_max_mag, soft_neutral_scale=soft_neutral_scale, return_metadata=True, ) reasoning_payload = _build_hybrid_reasoning_payload( label=label, llm_confidence=llm_confidence, finbert_strength=finbert_strength, finbert_polarity=finbert_polarity, llm_reasoning=llm_result.get("llm_reasoning", ""), llm_model=llm_result.get("llm_model", llm_model), soft_neutral_applied=soft_neutral_applied, ) sentiment = NewsSentiment( news_article_id=article.id, prob_positive=float(finbert["prob_positive"]), prob_neutral=float(finbert["prob_neutral"]), prob_negative=float(finbert["prob_negative"]), score=float(final_score), reasoning=reasoning_payload, model_name=str(llm_result.get("model_name", _hybrid_model_name(llm_model))), scored_at=datetime.now(timezone.utc) ) session.add(sentiment) scored_count += 1 # Commit after each chunk session.commit() logger.info(f"Committed chunk {chunk_num}: {len(chunk)} articles") logger.info( "Hybrid scoring summary: llm_success=%s json_repair_success=%s fallback_429=%s " "fallback_parse=%s finbert_used=%s", llm_success, json_repair_success, fallback_429, fallback_parse, finbert_used, ) logger.info(f"Total articles scored: {scored_count}") return scored_count # 
============================================================================= # Daily Sentiment Aggregation # ============================================================================= def aggregate_daily_sentiment( session: Session, tau_hours: float = 12.0 ) -> int: """ Aggregate sentiment scores by day with recency weighting. Weighting formula: w = exp(-(hours_since_publish) / tau) Returns: Number of days aggregated """ settings = get_settings() tau_hours = tau_hours or settings.sentiment_tau_hours # Get scored articles with copper keyword filter to reduce symbol-unrelated noise. copper_filter = ( func.lower(NewsArticle.title).like("%copper%") | func.lower(func.coalesce(NewsArticle.description, "")).like("%copper%") ) scored_articles = session.query( NewsArticle.published_at, NewsSentiment.score, NewsSentiment.prob_positive, NewsSentiment.prob_neutral, NewsSentiment.prob_negative ).join( NewsSentiment, NewsArticle.id == NewsSentiment.news_article_id ).filter( copper_filter ).all() if not scored_articles: logger.info("No copper-filtered scored articles for aggregation") return 0 # Convert to DataFrame df = pd.DataFrame(scored_articles, columns=[ "published_at", "score", "prob_positive", "prob_neutral", "prob_negative" ]) # Extract date df["date"] = pd.to_datetime(df["published_at"]).dt.normalize() # Calculate recency weight within each day # Higher weight for articles later in the day (closer to market close) def calc_weights(group): # Hours since start of day hours = (group["published_at"] - group["date"]).dt.total_seconds() / 3600 # Exponential weighting: later = higher weight weights = np.exp(hours / tau_hours) return weights / weights.sum() # Normalize # Group by date and aggregate daily_data = [] for date, group in df.groupby("date"): weights = calc_weights(group) # Convert numpy types to native Python types for database compatibility daily_data.append({ "date": date, "sentiment_index": float((group["score"] * weights).sum()), "news_count": int(len(group)), "avg_positive": float(group["prob_positive"].mean()), "avg_neutral": float(group["prob_neutral"].mean()), "avg_negative": float(group["prob_negative"].mean()), }) # Upsert daily sentiments count = 0 for row in daily_data: date_dt = row["date"].to_pydatetime() if date_dt.tzinfo is None: date_dt = date_dt.replace(tzinfo=timezone.utc) # Check if exists existing = session.query(DailySentiment).filter( func.date(DailySentiment.date) == func.date(date_dt) ).first() if existing: # Update existing.sentiment_index = row["sentiment_index"] existing.news_count = row["news_count"] existing.avg_positive = row["avg_positive"] existing.avg_neutral = row["avg_neutral"] existing.avg_negative = row["avg_negative"] existing.aggregated_at = datetime.now(timezone.utc) else: # Insert daily = DailySentiment( date=date_dt, sentiment_index=row["sentiment_index"], news_count=row["news_count"], avg_positive=row["avg_positive"], avg_neutral=row["avg_neutral"], avg_negative=row["avg_negative"], weighting_method="recency_exponential", aggregated_at=datetime.now(timezone.utc) ) session.add(daily) count += 1 session.commit() logger.info(f"Aggregated sentiment for {count} days") return count # ============================================================================= # XGBoost Model Training # ============================================================================= def train_xgboost_model( session: Session, target_symbol: str = "HG=F", lookback_days: int = 365, validation_days: int = 30, early_stopping_rounds: int = 10 ) -> Optional[dict]: """ Train 
# =============================================================================
# XGBoost Model Training
# =============================================================================

def train_xgboost_model(
    session: Session,
    target_symbol: str = "HG=F",
    lookback_days: int = 365,
    validation_days: int = 30,
    early_stopping_rounds: int = 10
) -> Optional[dict]:
    """
    Train XGBoost model for price prediction.

    Target: Next-day return (more stationary, avoids direct price level issues)

    Returns:
        Dict with model path, metrics, and feature importance
    """
    settings = get_settings()
    model_dir = Path(settings.model_dir)
    model_dir.mkdir(parents=True, exist_ok=True)

    # Build feature matrix
    X, y = build_feature_matrix(
        session,
        target_symbol=target_symbol,
        lookback_days=lookback_days
    )

    if X.empty or len(X) < 50:
        logger.error(f"Insufficient data for training: {len(X)} samples")
        return None

    # Time-series split: last N days for validation
    split_date = X.index.max() - timedelta(days=validation_days)
    train_mask = X.index <= split_date
    val_mask = X.index > split_date

    X_train, y_train = X[train_mask], y[train_mask]
    X_val, y_val = X[val_mask], y[val_mask]

    logger.info(f"Training set: {len(X_train)} samples")
    logger.info(f"Validation set: {len(X_val)} samples")

    if len(X_train) < 30 or len(X_val) < 5:
        logger.error("Not enough samples for train/val split")
        return None

    # Store feature names
    feature_names = X.columns.tolist()

    # XGBoost parameters - tuned for overfitting prevention
    # With 250 samples / 198 features, we need strong regularization
    params = {
        "objective": "reg:squarederror",
        "eval_metric": "rmse",
        "max_depth": 4,            # Shallower trees = less memorization
        "learning_rate": 0.05,     # Slower learning = better generalization
        "subsample": 0.8,
        "colsample_bytree": 0.6,   # Use fewer features per tree
        "min_child_weight": 5,     # Require more samples per leaf
        "reg_alpha": 0.5,          # L1 regularization (sparsity)
        "reg_lambda": 2.0,         # L2 regularization (smoothness)
        "seed": 42,
    }

    # Create DMatrix
    dtrain = xgb.DMatrix(X_train, label=y_train, feature_names=feature_names)
    dval = xgb.DMatrix(X_val, label=y_val, feature_names=feature_names)

    # Train with early stopping
    evals = [(dtrain, "train"), (dval, "val")]

    logger.info("Training XGBoost model...")
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=500,
        evals=evals,
        early_stopping_rounds=early_stopping_rounds,
        verbose_eval=10  # More frequent logging
    )

    # Evaluate (cast to built-in floats so JSON/DB writes stay clean)
    y_pred_train = model.predict(dtrain)
    y_pred_val = model.predict(dval)

    train_mae = float(mean_absolute_error(y_train, y_pred_train))
    train_rmse = float(np.sqrt(mean_squared_error(y_train, y_pred_train)))
    val_mae = float(mean_absolute_error(y_val, y_pred_val))
    val_rmse = float(np.sqrt(mean_squared_error(y_val, y_pred_val)))

    logger.info(f"Training MAE: {train_mae:.6f}, RMSE: {train_rmse:.6f}")
    logger.info(f"Validation MAE: {val_mae:.6f}, RMSE: {val_rmse:.6f}")

    # Feature importance
    importance = model.get_score(importance_type="gain")

    # Sort by importance
    sorted_importance = sorted(
        importance.items(),
        key=lambda x: x[1],
        reverse=True
    )

    # Normalize importance
    total_importance = sum(v for _, v in sorted_importance)
    normalized_importance = [
        {"feature": k, "importance": v / total_importance}
        for k, v in sorted_importance
    ]

    # Save model
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    model_path = model_dir / f"xgb_{target_symbol.replace('=', '_')}_{timestamp}.json"
    model.save_model(str(model_path))

    # Save latest symlink/copy
    latest_path = model_dir / f"xgb_{target_symbol.replace('=', '_')}_latest.json"
    model.save_model(str(latest_path))

    # Save metrics (including training symbols audit)
    # TARGET_TYPE: "simple_return" means the model predicts next-day return, not price
    # This MUST be read by inference to correctly compute predicted_price
    metrics = {
        "target_symbol": target_symbol,
        # Target definition audit (prevents semantic confusion)
        "target_type": "simple_return",  # Model predicts: close(t+1)/close(t) - 1
        "target_shift_days": 1,  # Predict 1 day ahead
        "target_definition": "simple_return(close,1).shift(-1)",  # Exact pandas formula
        "baseline_price_source": "yfinance_close",  # Which close normalizes returns
        "trained_at": datetime.now(timezone.utc).isoformat(),
        "train_samples": len(X_train),
        "val_samples": len(X_val),
        "train_mae": train_mae,
        "train_rmse": train_rmse,
        "val_mae": val_mae,
        "val_rmse": val_rmse,
        "best_iteration": model.best_iteration,
        "feature_count": len(feature_names),
        # Audit: which symbols were used for training
        "symbol_set_name": settings.symbol_set,
        "training_symbols": settings.training_symbols,
        "training_symbols_hash": settings.training_symbols_hash,
        "training_symbols_source": settings.training_symbols_source,
    }

    metrics_path = model_dir / f"xgb_{target_symbol.replace('=', '_')}_latest.metrics.json"
    with open(metrics_path, "w") as f:
        json.dump(metrics, f, indent=2)

    # Save feature names
    features_path = model_dir / f"xgb_{target_symbol.replace('=', '_')}_latest.features.json"
    with open(features_path, "w") as f:
        json.dump(feature_names, f, indent=2)

    # Save importance (overwrite to reflect the latest model training)
    importance_path = model_dir / f"xgb_{target_symbol.replace('=', '_')}_latest.importance.json"
    with open(importance_path, "w") as f:
        json.dump(normalized_importance, f, indent=2)

    logger.info(f"Model saved to: {model_path}")

    # Log top influencers
    logger.info("Top 10 feature influencers:")
    descriptions = get_feature_descriptions()
    for item in normalized_importance[:10]:
        feat = item["feature"]
        imp = item["importance"]
        desc = descriptions.get(feat, feat)
        logger.info(f"  {feat}: {imp:.4f} ({desc})")

    # Save metadata to database for persistence across HF Space restarts
    # (dedicated session so we do not shadow the caller's `session` argument)
    try:
        with SessionLocal() as metadata_session:
            save_model_metadata_to_db(
                session=metadata_session,
                symbol=target_symbol,
                importance=normalized_importance,
                features=feature_names,
                metrics=metrics,
            )
    except Exception as e:
        logger.warning(f"Could not save model metadata to DB: {e}")

    return {
        "model_path": str(model_path),
        "metrics": metrics,
        "top_influencers": normalized_importance[:10],
        "all_features": feature_names,
    }


def load_model(target_symbol: str = "HG=F") -> Optional[xgb.Booster]:
    """Load the latest trained model for a symbol."""
    settings = get_settings()
    model_dir = Path(settings.model_dir)

    model_path = model_dir / f"xgb_{target_symbol.replace('=', '_')}_latest.json"

    if not model_path.exists():
        logger.warning(f"Model not found: {model_path}")
        return None

    model = xgb.Booster()
    model.load_model(str(model_path))
    return model
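
# ---------------------------------------------------------------------------
# Sketch of how inference is expected to consume the "simple_return" target
# (a hypothetical helper, not the project's actual inference code): the model
# outputs a next-day return, so the predicted price must be rebuilt from the
# latest close. Assumes `features_row` is a single-row DataFrame whose columns
# match the saved feature list.
# ---------------------------------------------------------------------------
def _example_predict_next_close(
    features_row: pd.DataFrame,
    last_close: float,
    target_symbol: str = "HG=F",
) -> Optional[float]:
    """Toy conversion from predicted next-day return to a price level."""
    model = load_model(target_symbol)
    if model is None:
        return None
    predicted_return = float(model.predict(xgb.DMatrix(features_row))[0])
    # target_type == "simple_return": close(t+1) = close(t) * (1 + r_hat)
    return last_close * (1.0 + predicted_return)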
""" from .models import ModelMetadata from datetime import datetime # Try to find existing record existing = session.query(ModelMetadata).filter(ModelMetadata.symbol == symbol).first() if existing: existing.importance_json = json.dumps(importance) existing.features_json = json.dumps(features) existing.metrics_json = json.dumps(metrics) existing.trained_at = datetime.now(timezone.utc) logger.info(f"Updated model metadata in DB for {symbol}") else: new_record = ModelMetadata( symbol=symbol, importance_json=json.dumps(importance), features_json=json.dumps(features), metrics_json=json.dumps(metrics), ) session.add(new_record) logger.info(f"Saved new model metadata to DB for {symbol}") session.commit() def load_model_metadata_from_db(session, symbol: str) -> dict: """ Load model metadata from database. Returns dict with importance, features, metrics or None values if not found. """ from .models import ModelMetadata metadata = { "metrics": None, "features": None, "importance": None, } record = session.query(ModelMetadata).filter(ModelMetadata.symbol == symbol).first() if record: try: if record.importance_json: metadata["importance"] = json.loads(record.importance_json) if record.features_json: metadata["features"] = json.loads(record.features_json) if record.metrics_json: metadata["metrics"] = json.loads(record.metrics_json) logger.info(f"Loaded model metadata from DB for {symbol}") except json.JSONDecodeError as e: logger.warning(f"Failed to parse model metadata from DB: {e}") return metadata def load_model_metadata(target_symbol: str = "HG=F") -> dict: """ Load metrics and feature info for a model. Priority: 1. Database (survives HF Space restarts) 2. Local JSON files (fallback for development) """ from app.db import SessionLocal # Try database first try: with SessionLocal() as session: db_metadata = load_model_metadata_from_db(session, target_symbol) if db_metadata.get("importance") and db_metadata.get("features"): return db_metadata except Exception as e: logger.debug(f"Could not load metadata from DB: {e}") # Fallback to local files settings = get_settings() model_dir = Path(settings.model_dir) prefix = f"xgb_{target_symbol.replace('=', '_')}_latest" metadata = { "metrics": None, "features": None, "importance": None, } # Load metrics metrics_path = model_dir / f"{prefix}.metrics.json" if metrics_path.exists(): with open(metrics_path) as f: metadata["metrics"] = json.load(f) # Load features features_path = model_dir / f"{prefix}.features.json" if features_path.exists(): with open(features_path) as f: metadata["features"] = json.load(f) # Load importance importance_path = model_dir / f"{prefix}.importance.json" if importance_path.exists(): with open(importance_path) as f: metadata["importance"] = json.load(f) return metadata # ============================================================================= # Main Entry Point # ============================================================================= def run_full_pipeline( target_symbol: str = "HG=F", score_sentiment: bool = True, aggregate_sentiment: bool = True, train_model: bool = True ) -> dict: """ Run the full AI pipeline. 
# =============================================================================
# Main Entry Point
# =============================================================================

def run_full_pipeline(
    target_symbol: str = "HG=F",
    score_sentiment: bool = True,
    aggregate_sentiment: bool = True,
    train_model: bool = True
) -> dict:
    """
    Run the full AI pipeline.

    Returns:
        Dict with results from each stage
    """
    settings = get_settings()

    results = {
        "scored_articles": 0,
        "scored_articles_v2": 0,
        "aggregated_days": 0,
        "aggregated_days_v2": 0,
        "model_result": None,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    with SessionLocal() as session:
        if score_sentiment:
            if settings.scoring_source == "news_processed":
                scoring_stats = score_unscored_processed_articles(session)
                results["scored_articles"] = int(scoring_stats.get("scored_count", 0))
                results["scored_articles_v2"] = int(scoring_stats.get("scored_count", 0))
                results["llm_parse_fail_count"] = int(scoring_stats.get("parse_fail_count", 0))
                results["escalation_count"] = int(scoring_stats.get("escalation_count", 0))
                results["fallback_count"] = int(scoring_stats.get("fallback_count", 0))
            else:
                results["scored_articles"] = score_unscored_articles(session)

        if aggregate_sentiment:
            if settings.scoring_source == "news_processed":
                results["aggregated_days_v2"] = aggregate_daily_sentiment_v2(session)
            results["aggregated_days"] = aggregate_daily_sentiment(session)

        if train_model:
            results["model_result"] = train_xgboost_model(
                session,
                target_symbol=target_symbol
            )

    return results
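
# ---------------------------------------------------------------------------
# Sketch of programmatic use (assumed scheduler/cron caller; the wrapper name
# is hypothetical and not part of the CLI below): refresh sentiment without
# retraining, mirroring the --refresh-sentiment flag handled in main().
# A RuntimeError from pipeline_lock() is left to the caller, as in main().
# ---------------------------------------------------------------------------
def _example_refresh_sentiment_job(target_symbol: str = "HG=F") -> dict:
    """Toy wrapper: score + aggregate only, under the pipeline lock."""
    init_db()
    with pipeline_lock():
        return run_full_pipeline(
            target_symbol=target_symbol,
            score_sentiment=True,
            aggregate_sentiment=True,
            train_model=False,
        )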
def main():
    parser = argparse.ArgumentParser(
        description="Run AI pipeline: LLM sentiment scoring (with FinBERT fallback) and XGBoost training"
    )
    parser.add_argument(
        "--run-all", action="store_true",
        help="Run full pipeline (score + aggregate + train)"
    )
    parser.add_argument(
        "--score-only", action="store_true",
        help="Only run sentiment scoring (LLM primary, FinBERT fallback)"
    )
    parser.add_argument(
        "--aggregate-only", action="store_true",
        help="Only run sentiment aggregation"
    )
    parser.add_argument(
        "--train-only", action="store_true",
        help="Only run XGBoost training"
    )
    parser.add_argument(
        "--refresh-sentiment", action="store_true",
        help="Run sentiment scoring + daily aggregation (no training)"
    )
    parser.add_argument(
        "--backfill-v2-days", type=int, default=0,
        help="Backfill unscored V2 sentiment for last N days (idempotent)"
    )
    parser.add_argument(
        "--backfill-v2-batch-size", type=int, default=50,
        help="Batch size for V2 backfill mode"
    )
    parser.add_argument(
        "--target-symbol", type=str, default="HG=F",
        help="Target symbol for training (default: HG=F)"
    )
    parser.add_argument(
        "--no-lock", action="store_true",
        help="Skip pipeline lock (for testing)"
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true",
        help="Verbose logging"
    )

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Determine what to run
    score = args.run_all or args.score_only or args.refresh_sentiment
    aggregate = args.run_all or args.aggregate_only or args.refresh_sentiment
    train = args.run_all or args.train_only
    backfill_v2 = args.backfill_v2_days > 0

    if not (score or aggregate or train or backfill_v2):
        parser.print_help()
        return

    # Initialize database
    logger.info("Initializing database...")
    init_db()

    # Run pipeline
    def do_run():
        if backfill_v2:
            with SessionLocal() as session:
                scoring_stats = backfill_sentiment_v2(
                    session,
                    days=args.backfill_v2_days,
                    batch_size=max(1, int(args.backfill_v2_batch_size)),
                )
                aggregated_days_v2 = aggregate_daily_sentiment_v2(session)
                return {
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "scored_articles": int(scoring_stats.get("scored_count", 0)),
                    "scored_articles_v2": int(scoring_stats.get("scored_count", 0)),
                    "aggregated_days": 0,
                    "aggregated_days_v2": int(aggregated_days_v2),
                    "llm_parse_fail_count": int(scoring_stats.get("parse_fail_count", 0)),
                    "escalation_count": int(scoring_stats.get("escalation_count", 0)),
                    "fallback_count": int(scoring_stats.get("fallback_count", 0)),
                    "model_result": None,
                    "backfill_days": int(args.backfill_v2_days),
                }
        return run_full_pipeline(
            target_symbol=args.target_symbol,
            score_sentiment=score,
            aggregate_sentiment=aggregate,
            train_model=train
        )

    if args.no_lock:
        results = do_run()
    else:
        try:
            with pipeline_lock():
                results = do_run()
        except RuntimeError as e:
            logger.error(f"Could not acquire lock: {e}")
            return

    # Print summary
    print("\n" + "=" * 50)
    print("AI PIPELINE SUMMARY")
    print("=" * 50)

    if score:
        print(f"\nSentiment Scoring: {results['scored_articles']} articles")
        if "scored_articles_v2" in results:
            print(f"V2 Sentiment Scoring: {results.get('scored_articles_v2', 0)} articles")
        if "llm_parse_fail_count" in results:
            print(f"  - LLM parse failures: {results.get('llm_parse_fail_count', 0)}")
            print(f"  - Escalations: {results.get('escalation_count', 0)}")
            print(f"  - Deterministic fallbacks: {results.get('fallback_count', 0)}")

    if aggregate:
        print(f"Daily Aggregation: {results['aggregated_days']} days")
        if results.get("aggregated_days_v2") is not None:
            print(f"Daily Aggregation V2 (shadow): {results.get('aggregated_days_v2', 0)} days")

    if backfill_v2:
        print(f"\nBackfill V2 Days: {results.get('backfill_days', 0)}")
        print(f"V2 Aggregation Days: {results.get('aggregated_days_v2', 0)}")

    if train and results.get("model_result"):
        mr = results["model_result"]
        metrics = mr.get("metrics", {})
        print("\nModel Training:")
        print(f"  - Validation MAE: {metrics.get('val_mae', float('nan')):.6f}")
        print(f"  - Validation RMSE: {metrics.get('val_rmse', float('nan')):.6f}")
        print(f"  - Model saved to: {mr.get('model_path', 'N/A')}")

        print("\nTop Influencers:")
        for item in mr.get("top_influencers", [])[:5]:
            print(f"  - {item['feature']}: {item['importance']:.4f}")

    print(f"\nTimestamp: {results.get('timestamp', 'N/A')}")


if __name__ == "__main__":
    main()