Spaces:
Running
Running
| """ | |
| AI Commentary Generator using OpenRouter API. | |
| Generates market commentary and stance in a single structured LLM call. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from datetime import datetime, timezone | |
| from typing import Optional | |
| from .openrouter_client import OpenRouterError, create_chat_completion | |
| from .settings import get_settings | |
| logger = logging.getLogger(__name__) | |
| VALID_STANCES = {"BULLISH", "NEUTRAL", "BEARISH"} | |
| COMMENTARY_RESPONSE_FORMAT = { | |
| "type": "json_object", | |
| } | |
| def _extract_chat_message_content(data: dict) -> str: | |
| """Extract text content from OpenRouter chat completion response.""" | |
| message = data.get("choices", [{}])[0].get("message", {}) | |
| content = message.get("content", "") | |
| if isinstance(content, str): | |
| return content.strip() | |
| if isinstance(content, list): | |
| text_parts: list[str] = [] | |
| for item in content: | |
| if isinstance(item, dict) and item.get("type") == "text": | |
| text = item.get("text") | |
| if isinstance(text, str): | |
| text_parts.append(text) | |
| return "\n".join(text_parts).strip() | |
| return "" | |
| def _clean_json_content(content: str) -> str: | |
| """Normalize model text into parseable JSON content.""" | |
| normalized = content.strip() | |
| if normalized.startswith("```"): | |
| lines = normalized.splitlines() | |
| if lines and lines[0].startswith("```"): | |
| lines = lines[1:] | |
| if lines and lines[-1].strip() == "```": | |
| lines = lines[:-1] | |
| normalized = "\n".join(lines).strip() | |
| if normalized.startswith("json"): | |
| normalized = normalized[4:].strip() | |
| if not normalized.startswith("{"): | |
| first = normalized.find("{") | |
| last = normalized.rfind("}") | |
| if first != -1 and last != -1 and last > first: | |
| normalized = normalized[first : last + 1] | |
| return normalized | |
| def _normalize_stance(value: str) -> str: | |
| stance = str(value or "").strip().upper() | |
| if stance not in VALID_STANCES: | |
| raise ValueError(f"Invalid stance: {value!r}") | |
| return stance | |
| def _deterministic_stance_from_inputs(predicted_return: float, sentiment_index: float) -> str: | |
| combined = float(predicted_return) + float(sentiment_index) | |
| if combined > 0: | |
| return "BULLISH" | |
| if combined < 0: | |
| return "BEARISH" | |
| return "NEUTRAL" | |
| def _build_commentary_template_fallback( | |
| current_price: float, | |
| predicted_price: float, | |
| predicted_return: float, | |
| sentiment_index: float, | |
| sentiment_label: str, | |
| top_influencers: list[dict], | |
| news_count: int, | |
| ) -> str: | |
| """Deterministic fallback commentary used when LLM is unavailable.""" | |
| direction = "upside" if predicted_return >= 0 else "downside" | |
| top_driver_names = [inf.get("feature", "unknown_driver") for inf in top_influencers[:3]] | |
| while len(top_driver_names) < 3: | |
| top_driver_names.append("unknown_driver") | |
| return "\n".join( | |
| [ | |
| "Risks:", | |
| f"1. Model indicates {direction} uncertainty around the next-day move ({predicted_return * 100:.2f}%).", | |
| f"2. Sentiment regime is {sentiment_label} with score {sentiment_index:.3f}, which can reverse quickly.", | |
| f"3. News sample size ({news_count}) may be insufficient for stable short-horizon inference.", | |
| "Opportunities:", | |
| f"1. Predicted price path implies a move from ${current_price:.4f} to ${predicted_price:.4f}.", | |
| f"2. Feature signal concentration around `{top_driver_names[0]}` can support tactical monitoring.", | |
| f"3. Secondary drivers `{top_driver_names[1]}` and `{top_driver_names[2]}` provide confirmation checkpoints.", | |
| f"Summary: Current model inputs suggest a cautious {direction} bias with elevated uncertainty.", | |
| "Bias warning: This view is model-driven and sensitive to news mix, data latency, and feature drift.", | |
| "This is NOT financial advice.", | |
| ] | |
| ) | |
| def _detect_stance_from_keywords(text: str) -> str: | |
| """Fallback stance detector from commentary keywords.""" | |
| text_lower = (text or "").lower() | |
| bullish_keywords = [ | |
| "bullish", | |
| "upside", | |
| "upward", | |
| "positive", | |
| "gain", | |
| "rise", | |
| "rising", | |
| "higher", | |
| "growth", | |
| "optimistic", | |
| "rally", | |
| "surge", | |
| "strength", | |
| ] | |
| bearish_keywords = [ | |
| "bearish", | |
| "downside", | |
| "downward", | |
| "negative", | |
| "decline", | |
| "fall", | |
| "falling", | |
| "lower", | |
| "weakness", | |
| "pessimistic", | |
| "drop", | |
| "slump", | |
| "pressure", | |
| ] | |
| bullish_count = sum(1 for kw in bullish_keywords if kw in text_lower) | |
| bearish_count = sum(1 for kw in bearish_keywords if kw in text_lower) | |
| if bullish_count > bearish_count + 1: | |
| stance = "BULLISH" | |
| elif bearish_count > bullish_count + 1: | |
| stance = "BEARISH" | |
| else: | |
| stance = "NEUTRAL" | |
| logger.info( | |
| "Keyword stance detection: bullish=%s, bearish=%s -> %s", | |
| bullish_count, | |
| bearish_count, | |
| stance, | |
| ) | |
| return stance | |
| async def determine_ai_stance(commentary: str) -> str: | |
| """ | |
| Backward-compatible stance helper. | |
| Dedicated stance LLM call is disabled; this fallback is deterministic and local. | |
| """ | |
| if not commentary: | |
| return "NEUTRAL" | |
| return _detect_stance_from_keywords(commentary) | |
| def _parse_commentary_payload(content: str) -> tuple[str, str]: | |
| payload = json.loads(_clean_json_content(content)) | |
| if not isinstance(payload, dict): | |
| raise ValueError("Commentary payload must be a JSON object") | |
| stance = _normalize_stance(payload.get("stance", "")) | |
| commentary = str(payload.get("commentary", "")).strip() | |
| if not commentary: | |
| raise ValueError("Commentary text is empty") | |
| if "This is NOT financial advice." not in commentary: | |
| commentary = f"{commentary}\nThis is NOT financial advice." | |
| return stance, commentary | |
| async def _generate_commentary_and_stance( | |
| *, | |
| current_price: float, | |
| predicted_price: float, | |
| predicted_return: float, | |
| sentiment_index: float, | |
| sentiment_label: str, | |
| top_influencers: list[dict], | |
| news_count: int, | |
| ) -> tuple[str, str]: | |
| settings = get_settings() | |
| deterministic_stance = _deterministic_stance_from_inputs(predicted_return, sentiment_index) | |
| fallback_commentary = _build_commentary_template_fallback( | |
| current_price=current_price, | |
| predicted_price=predicted_price, | |
| predicted_return=predicted_return, | |
| sentiment_index=sentiment_index, | |
| sentiment_label=sentiment_label, | |
| top_influencers=top_influencers, | |
| news_count=news_count, | |
| ) | |
| if not settings.openrouter_api_key: | |
| logger.warning("OpenRouter API key not configured, using template commentary fallback") | |
| return fallback_commentary, deterministic_stance | |
| influencers_text = "\n".join( | |
| [ | |
| f"- {inf.get('feature', 'Unknown')}: {inf.get('importance', 0) * 100:.1f}%" | |
| for inf in top_influencers[:5] | |
| ] | |
| ) | |
| user_prompt = f"""Generate commentary and stance using only the provided data. | |
| Return strict JSON with keys: stance, commentary. | |
| Rules: | |
| - stance must be one of: BULLISH, BEARISH, NEUTRAL | |
| - commentary must include exactly: | |
| 1) 3 risk bullets | |
| 2) 3 opportunity bullets | |
| 3) 1 summary sentence | |
| 4) 1 bias warning sentence | |
| 5) final line: This is NOT financial advice. | |
| Data: | |
| - Current Price: {current_price:.4f} | |
| - Predicted Price: {predicted_price:.4f} | |
| - Predicted Return: {predicted_return:.6f} | |
| - Sentiment Index: {sentiment_index:.6f} | |
| - Sentiment Label: {sentiment_label} | |
| - News Count: {news_count} | |
| - Top Influencers: | |
| {influencers_text} | |
| """ | |
| base_request_kwargs = { | |
| "api_key": settings.openrouter_api_key, | |
| "model": settings.resolved_commentary_model, | |
| "messages": [ | |
| { | |
| "role": "system", | |
| "content": ( | |
| "You are a copper market analyst. " | |
| "Use only provided inputs. Return concise, structured output." | |
| ), | |
| }, | |
| {"role": "user", "content": user_prompt}, | |
| ], | |
| "max_tokens": 2500, | |
| "temperature": 0.0, | |
| "timeout_seconds": 60.0, | |
| "max_retries": settings.openrouter_max_retries, | |
| "rpm": settings.openrouter_rpm, | |
| "fallback_models": settings.openrouter_fallback_models_list, | |
| "referer": "https://copper-mind.vercel.app", | |
| "title": "CopperMind Commentary", | |
| } | |
| async def _request_commentary() -> str: | |
| kwargs = dict(base_request_kwargs) | |
| kwargs["response_format"] = COMMENTARY_RESPONSE_FORMAT | |
| data = await create_chat_completion(**kwargs) | |
| content = _extract_chat_message_content(data) | |
| if not content: | |
| raise ValueError("Empty OpenRouter response content") | |
| return content | |
| async def _repair_commentary(malformed_content: str) -> str: | |
| repair_prompt = ( | |
| "Fix this malformed output into valid JSON object with keys stance and commentary. " | |
| "Do not change meaning. Output JSON only.\n\n" | |
| f"{malformed_content}" | |
| ) | |
| repair_data = await create_chat_completion( | |
| api_key=settings.openrouter_api_key, | |
| model=settings.resolved_commentary_model, | |
| messages=[ | |
| { | |
| "role": "system", | |
| "content": "You repair JSON only. Output valid JSON and nothing else.", | |
| }, | |
| {"role": "user", "content": repair_prompt}, | |
| ], | |
| max_tokens=2500, | |
| temperature=0.0, | |
| timeout_seconds=60.0, | |
| max_retries=settings.openrouter_max_retries, | |
| rpm=settings.openrouter_rpm, | |
| fallback_models=settings.openrouter_fallback_models_list, | |
| referer="https://copper-mind.vercel.app", | |
| title="CopperMind Commentary JSON Repair", | |
| ) | |
| repaired = _extract_chat_message_content(repair_data) | |
| if not repaired: | |
| raise ValueError("Empty commentary repair response") | |
| return repaired | |
| try: | |
| content = await _request_commentary() | |
| try: | |
| stance, commentary = _parse_commentary_payload(content) | |
| logger.info("AI commentary generated successfully (%s chars)", len(commentary)) | |
| return commentary, stance | |
| except Exception as parse_exc: | |
| logger.warning("Commentary JSON parse failed, attempting repair: %s", parse_exc) | |
| repaired = await _repair_commentary(content) | |
| stance, commentary = _parse_commentary_payload(repaired) | |
| logger.info("AI commentary generated via JSON repair (%s chars)", len(commentary)) | |
| return commentary, stance | |
| except Exception as exc: | |
| logger.warning("Commentary generation failed, using deterministic fallback: %s", exc) | |
| return fallback_commentary, deterministic_stance | |
| async def generate_commentary( | |
| current_price: float, | |
| predicted_price: float, | |
| predicted_return: float, | |
| sentiment_index: float, | |
| sentiment_label: str, | |
| top_influencers: list[dict], | |
| news_count: int = 0, | |
| ) -> Optional[str]: | |
| """ | |
| Generate AI commentary text. | |
| """ | |
| commentary, _stance = await _generate_commentary_and_stance( | |
| current_price=current_price, | |
| predicted_price=predicted_price, | |
| predicted_return=predicted_return, | |
| sentiment_index=sentiment_index, | |
| sentiment_label=sentiment_label, | |
| top_influencers=top_influencers, | |
| news_count=news_count, | |
| ) | |
| return commentary | |
| def save_commentary_to_db( | |
| session, | |
| symbol: str, | |
| commentary: str, | |
| current_price: float, | |
| predicted_price: float, | |
| predicted_return: float, | |
| sentiment_label: str, | |
| ai_stance: str = "NEUTRAL", | |
| ) -> None: | |
| """ | |
| Save generated commentary to database (upsert). | |
| Called after pipeline completion. | |
| """ | |
| from .models import AICommentary | |
| settings = get_settings() | |
| existing = session.query(AICommentary).filter(AICommentary.symbol == symbol).first() | |
| if existing: | |
| existing.commentary = commentary | |
| existing.current_price = current_price | |
| existing.predicted_price = predicted_price | |
| existing.predicted_return = predicted_return | |
| existing.sentiment_label = sentiment_label | |
| existing.ai_stance = ai_stance | |
| existing.generated_at = datetime.now(timezone.utc) | |
| existing.model_name = settings.resolved_commentary_model | |
| logger.info("Updated AI commentary for %s (stance: %s)", symbol, ai_stance) | |
| else: | |
| new_commentary = AICommentary( | |
| symbol=symbol, | |
| commentary=commentary, | |
| current_price=current_price, | |
| predicted_price=predicted_price, | |
| predicted_return=predicted_return, | |
| sentiment_label=sentiment_label, | |
| ai_stance=ai_stance, | |
| model_name=settings.resolved_commentary_model, | |
| ) | |
| session.add(new_commentary) | |
| logger.info("Created new AI commentary for %s (stance: %s)", symbol, ai_stance) | |
| session.commit() | |
| def get_commentary_from_db(session, symbol: str) -> Optional[dict]: | |
| """ | |
| Get stored commentary from database. | |
| Returns dict with commentary and metadata, or None if not found. | |
| """ | |
| from .models import AICommentary | |
| record = session.query(AICommentary).filter(AICommentary.symbol == symbol).first() | |
| if record: | |
| return { | |
| "commentary": record.commentary, | |
| "generated_at": record.generated_at.isoformat() if record.generated_at else None, | |
| "current_price": record.current_price, | |
| "predicted_price": record.predicted_price, | |
| "predicted_return": record.predicted_return, | |
| "sentiment_label": record.sentiment_label, | |
| "ai_stance": record.ai_stance or "NEUTRAL", | |
| "model_name": record.model_name, | |
| } | |
| return None | |
| async def generate_and_save_commentary( | |
| session, | |
| symbol: str, | |
| current_price: float, | |
| predicted_price: float, | |
| predicted_return: float, | |
| sentiment_index: float, | |
| sentiment_label: str, | |
| top_influencers: list[dict], | |
| news_count: int = 0, | |
| ) -> Optional[str]: | |
| """ | |
| Generate commentary and save to database. | |
| Called after pipeline completion. | |
| """ | |
| commentary, ai_stance = await _generate_commentary_and_stance( | |
| current_price=current_price, | |
| predicted_price=predicted_price, | |
| predicted_return=predicted_return, | |
| sentiment_index=sentiment_index, | |
| sentiment_label=sentiment_label, | |
| top_influencers=top_influencers, | |
| news_count=news_count, | |
| ) | |
| if commentary: | |
| save_commentary_to_db( | |
| session=session, | |
| symbol=symbol, | |
| commentary=commentary, | |
| current_price=current_price, | |
| predicted_price=predicted_price, | |
| predicted_return=predicted_return, | |
| sentiment_label=sentiment_label, | |
| ai_stance=ai_stance, | |
| ) | |
| return commentary | |