copper-mind / app /commentary.py
ifieryarrows's picture
Sync from GitHub (tests passed)
9b3cef4 verified
"""
AI Commentary Generator using OpenRouter API.
Generates market commentary and stance in a single structured LLM call.
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from typing import Optional
from .openrouter_client import OpenRouterError, create_chat_completion
from .settings import get_settings
logger = logging.getLogger(__name__)
VALID_STANCES = {"BULLISH", "NEUTRAL", "BEARISH"}
COMMENTARY_RESPONSE_FORMAT = {
"type": "json_object",
}
def _extract_chat_message_content(data: dict) -> str:
"""Extract text content from OpenRouter chat completion response."""
message = data.get("choices", [{}])[0].get("message", {})
content = message.get("content", "")
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
text_parts: list[str] = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
text = item.get("text")
if isinstance(text, str):
text_parts.append(text)
return "\n".join(text_parts).strip()
return ""
def _clean_json_content(content: str) -> str:
"""Normalize model text into parseable JSON content."""
normalized = content.strip()
if normalized.startswith("```"):
lines = normalized.splitlines()
if lines and lines[0].startswith("```"):
lines = lines[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
normalized = "\n".join(lines).strip()
if normalized.startswith("json"):
normalized = normalized[4:].strip()
if not normalized.startswith("{"):
first = normalized.find("{")
last = normalized.rfind("}")
if first != -1 and last != -1 and last > first:
normalized = normalized[first : last + 1]
return normalized
def _normalize_stance(value: str) -> str:
stance = str(value or "").strip().upper()
if stance not in VALID_STANCES:
raise ValueError(f"Invalid stance: {value!r}")
return stance
def _deterministic_stance_from_inputs(predicted_return: float, sentiment_index: float) -> str:
combined = float(predicted_return) + float(sentiment_index)
if combined > 0:
return "BULLISH"
if combined < 0:
return "BEARISH"
return "NEUTRAL"
def _build_commentary_template_fallback(
current_price: float,
predicted_price: float,
predicted_return: float,
sentiment_index: float,
sentiment_label: str,
top_influencers: list[dict],
news_count: int,
) -> str:
"""Deterministic fallback commentary used when LLM is unavailable."""
direction = "upside" if predicted_return >= 0 else "downside"
top_driver_names = [inf.get("feature", "unknown_driver") for inf in top_influencers[:3]]
while len(top_driver_names) < 3:
top_driver_names.append("unknown_driver")
return "\n".join(
[
"Risks:",
f"1. Model indicates {direction} uncertainty around the next-day move ({predicted_return * 100:.2f}%).",
f"2. Sentiment regime is {sentiment_label} with score {sentiment_index:.3f}, which can reverse quickly.",
f"3. News sample size ({news_count}) may be insufficient for stable short-horizon inference.",
"Opportunities:",
f"1. Predicted price path implies a move from ${current_price:.4f} to ${predicted_price:.4f}.",
f"2. Feature signal concentration around `{top_driver_names[0]}` can support tactical monitoring.",
f"3. Secondary drivers `{top_driver_names[1]}` and `{top_driver_names[2]}` provide confirmation checkpoints.",
f"Summary: Current model inputs suggest a cautious {direction} bias with elevated uncertainty.",
"Bias warning: This view is model-driven and sensitive to news mix, data latency, and feature drift.",
"This is NOT financial advice.",
]
)
def _detect_stance_from_keywords(text: str) -> str:
"""Fallback stance detector from commentary keywords."""
text_lower = (text or "").lower()
bullish_keywords = [
"bullish",
"upside",
"upward",
"positive",
"gain",
"rise",
"rising",
"higher",
"growth",
"optimistic",
"rally",
"surge",
"strength",
]
bearish_keywords = [
"bearish",
"downside",
"downward",
"negative",
"decline",
"fall",
"falling",
"lower",
"weakness",
"pessimistic",
"drop",
"slump",
"pressure",
]
bullish_count = sum(1 for kw in bullish_keywords if kw in text_lower)
bearish_count = sum(1 for kw in bearish_keywords if kw in text_lower)
if bullish_count > bearish_count + 1:
stance = "BULLISH"
elif bearish_count > bullish_count + 1:
stance = "BEARISH"
else:
stance = "NEUTRAL"
logger.info(
"Keyword stance detection: bullish=%s, bearish=%s -> %s",
bullish_count,
bearish_count,
stance,
)
return stance
async def determine_ai_stance(commentary: str) -> str:
"""
Backward-compatible stance helper.
Dedicated stance LLM call is disabled; this fallback is deterministic and local.
"""
if not commentary:
return "NEUTRAL"
return _detect_stance_from_keywords(commentary)
def _parse_commentary_payload(content: str) -> tuple[str, str]:
payload = json.loads(_clean_json_content(content))
if not isinstance(payload, dict):
raise ValueError("Commentary payload must be a JSON object")
stance = _normalize_stance(payload.get("stance", ""))
commentary = str(payload.get("commentary", "")).strip()
if not commentary:
raise ValueError("Commentary text is empty")
if "This is NOT financial advice." not in commentary:
commentary = f"{commentary}\nThis is NOT financial advice."
return stance, commentary
async def _generate_commentary_and_stance(
*,
current_price: float,
predicted_price: float,
predicted_return: float,
sentiment_index: float,
sentiment_label: str,
top_influencers: list[dict],
news_count: int,
) -> tuple[str, str]:
settings = get_settings()
deterministic_stance = _deterministic_stance_from_inputs(predicted_return, sentiment_index)
fallback_commentary = _build_commentary_template_fallback(
current_price=current_price,
predicted_price=predicted_price,
predicted_return=predicted_return,
sentiment_index=sentiment_index,
sentiment_label=sentiment_label,
top_influencers=top_influencers,
news_count=news_count,
)
if not settings.openrouter_api_key:
logger.warning("OpenRouter API key not configured, using template commentary fallback")
return fallback_commentary, deterministic_stance
influencers_text = "\n".join(
[
f"- {inf.get('feature', 'Unknown')}: {inf.get('importance', 0) * 100:.1f}%"
for inf in top_influencers[:5]
]
)
user_prompt = f"""Generate commentary and stance using only the provided data.
Return strict JSON with keys: stance, commentary.
Rules:
- stance must be one of: BULLISH, BEARISH, NEUTRAL
- commentary must include exactly:
1) 3 risk bullets
2) 3 opportunity bullets
3) 1 summary sentence
4) 1 bias warning sentence
5) final line: This is NOT financial advice.
Data:
- Current Price: {current_price:.4f}
- Predicted Price: {predicted_price:.4f}
- Predicted Return: {predicted_return:.6f}
- Sentiment Index: {sentiment_index:.6f}
- Sentiment Label: {sentiment_label}
- News Count: {news_count}
- Top Influencers:
{influencers_text}
"""
base_request_kwargs = {
"api_key": settings.openrouter_api_key,
"model": settings.resolved_commentary_model,
"messages": [
{
"role": "system",
"content": (
"You are a copper market analyst. "
"Use only provided inputs. Return concise, structured output."
),
},
{"role": "user", "content": user_prompt},
],
"max_tokens": 2500,
"temperature": 0.0,
"timeout_seconds": 60.0,
"max_retries": settings.openrouter_max_retries,
"rpm": settings.openrouter_rpm,
"fallback_models": settings.openrouter_fallback_models_list,
"referer": "https://copper-mind.vercel.app",
"title": "CopperMind Commentary",
}
async def _request_commentary() -> str:
kwargs = dict(base_request_kwargs)
kwargs["response_format"] = COMMENTARY_RESPONSE_FORMAT
data = await create_chat_completion(**kwargs)
content = _extract_chat_message_content(data)
if not content:
raise ValueError("Empty OpenRouter response content")
return content
async def _repair_commentary(malformed_content: str) -> str:
repair_prompt = (
"Fix this malformed output into valid JSON object with keys stance and commentary. "
"Do not change meaning. Output JSON only.\n\n"
f"{malformed_content}"
)
repair_data = await create_chat_completion(
api_key=settings.openrouter_api_key,
model=settings.resolved_commentary_model,
messages=[
{
"role": "system",
"content": "You repair JSON only. Output valid JSON and nothing else.",
},
{"role": "user", "content": repair_prompt},
],
max_tokens=2500,
temperature=0.0,
timeout_seconds=60.0,
max_retries=settings.openrouter_max_retries,
rpm=settings.openrouter_rpm,
fallback_models=settings.openrouter_fallback_models_list,
referer="https://copper-mind.vercel.app",
title="CopperMind Commentary JSON Repair",
)
repaired = _extract_chat_message_content(repair_data)
if not repaired:
raise ValueError("Empty commentary repair response")
return repaired
try:
content = await _request_commentary()
try:
stance, commentary = _parse_commentary_payload(content)
logger.info("AI commentary generated successfully (%s chars)", len(commentary))
return commentary, stance
except Exception as parse_exc:
logger.warning("Commentary JSON parse failed, attempting repair: %s", parse_exc)
repaired = await _repair_commentary(content)
stance, commentary = _parse_commentary_payload(repaired)
logger.info("AI commentary generated via JSON repair (%s chars)", len(commentary))
return commentary, stance
except Exception as exc:
logger.warning("Commentary generation failed, using deterministic fallback: %s", exc)
return fallback_commentary, deterministic_stance
async def generate_commentary(
current_price: float,
predicted_price: float,
predicted_return: float,
sentiment_index: float,
sentiment_label: str,
top_influencers: list[dict],
news_count: int = 0,
) -> Optional[str]:
"""
Generate AI commentary text.
"""
commentary, _stance = await _generate_commentary_and_stance(
current_price=current_price,
predicted_price=predicted_price,
predicted_return=predicted_return,
sentiment_index=sentiment_index,
sentiment_label=sentiment_label,
top_influencers=top_influencers,
news_count=news_count,
)
return commentary
def save_commentary_to_db(
session,
symbol: str,
commentary: str,
current_price: float,
predicted_price: float,
predicted_return: float,
sentiment_label: str,
ai_stance: str = "NEUTRAL",
) -> None:
"""
Save generated commentary to database (upsert).
Called after pipeline completion.
"""
from .models import AICommentary
settings = get_settings()
existing = session.query(AICommentary).filter(AICommentary.symbol == symbol).first()
if existing:
existing.commentary = commentary
existing.current_price = current_price
existing.predicted_price = predicted_price
existing.predicted_return = predicted_return
existing.sentiment_label = sentiment_label
existing.ai_stance = ai_stance
existing.generated_at = datetime.now(timezone.utc)
existing.model_name = settings.resolved_commentary_model
logger.info("Updated AI commentary for %s (stance: %s)", symbol, ai_stance)
else:
new_commentary = AICommentary(
symbol=symbol,
commentary=commentary,
current_price=current_price,
predicted_price=predicted_price,
predicted_return=predicted_return,
sentiment_label=sentiment_label,
ai_stance=ai_stance,
model_name=settings.resolved_commentary_model,
)
session.add(new_commentary)
logger.info("Created new AI commentary for %s (stance: %s)", symbol, ai_stance)
session.commit()
def get_commentary_from_db(session, symbol: str) -> Optional[dict]:
"""
Get stored commentary from database.
Returns dict with commentary and metadata, or None if not found.
"""
from .models import AICommentary
record = session.query(AICommentary).filter(AICommentary.symbol == symbol).first()
if record:
return {
"commentary": record.commentary,
"generated_at": record.generated_at.isoformat() if record.generated_at else None,
"current_price": record.current_price,
"predicted_price": record.predicted_price,
"predicted_return": record.predicted_return,
"sentiment_label": record.sentiment_label,
"ai_stance": record.ai_stance or "NEUTRAL",
"model_name": record.model_name,
}
return None
async def generate_and_save_commentary(
session,
symbol: str,
current_price: float,
predicted_price: float,
predicted_return: float,
sentiment_index: float,
sentiment_label: str,
top_influencers: list[dict],
news_count: int = 0,
) -> Optional[str]:
"""
Generate commentary and save to database.
Called after pipeline completion.
"""
commentary, ai_stance = await _generate_commentary_and_stance(
current_price=current_price,
predicted_price=predicted_price,
predicted_return=predicted_return,
sentiment_index=sentiment_index,
sentiment_label=sentiment_label,
top_influencers=top_influencers,
news_count=news_count,
)
if commentary:
save_commentary_to_db(
session=session,
symbol=symbol,
commentary=commentary,
current_price=current_price,
predicted_price=predicted_price,
predicted_return=predicted_return,
sentiment_label=sentiment_label,
ai_stance=ai_stance,
)
return commentary