""" features/utils.py — Shared utilities for all Sentinel add-on features. Wraps existing MCP gateway calls, Gemini client, and PDF export. """ import os import time import json import logging import functools import httpx from dotenv import load_dotenv from langchain_google_genai import ChatGoogleGenerativeAI load_dotenv() logger = logging.getLogger("SentinelFeatures") # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- MCP_GATEWAY_URL = os.getenv("MCP_GATEWAY_URL", "http://127.0.0.1:8000/route_agent_request") GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") AV_RATE_LIMIT_DELAY = 12 # seconds between Alpha Vantage calls (free tier) _last_av_call = 0.0 # module-level timestamp for rate-limiting # --------------------------------------------------------------------------- # Retry decorator # --------------------------------------------------------------------------- def retry_with_backoff(max_retries: int = 3, base_delay: float = 2.0): """Decorator: retries a function with exponential back-off.""" def decorator(fn): @functools.wraps(fn) def wrapper(*args, **kwargs): last_exc = None for attempt in range(max_retries): try: return fn(*args, **kwargs) except Exception as exc: last_exc = exc wait = base_delay * (2 ** attempt) logger.warning(f"[retry {attempt+1}/{max_retries}] {fn.__name__} failed: {exc} — retrying in {wait:.1f}s") time.sleep(wait) raise last_exc # type: ignore return wrapper return decorator # --------------------------------------------------------------------------- # MCP Gateway helpers (mirrors tool_calling_agents.py pattern) # --------------------------------------------------------------------------- def _call_gateway(target_service: str, payload: dict, timeout: float = 60.0) -> dict: """Low-level POST to MCP Gateway.""" body = {"target_service": target_service, "payload": payload} with httpx.Client(timeout=timeout) as client: resp = client.post(MCP_GATEWAY_URL, json=body) resp.raise_for_status() return resp.json() @retry_with_backoff(max_retries=3) def fetch_stock_data(ticker: str, time_range: str = "INTRADAY") -> dict: """Fetch stock data via the MCP gateway → Alpha Vantage microservice. Respects rate-limiting (12 s between calls). """ global _last_av_call elapsed = time.time() - _last_av_call if elapsed < AV_RATE_LIMIT_DELAY: time.sleep(AV_RATE_LIMIT_DELAY - elapsed) result = _call_gateway("alpha_vantage_market_data", {"symbol": ticker, "time_range": time_range}) _last_av_call = time.time() return result @retry_with_backoff(max_retries=3) def run_tavily_search(query: str, search_depth: str = "basic", max_results: int = 5) -> dict: """Run a web search via the MCP gateway → Tavily microservice.""" return _call_gateway("tavily_research", {"queries": [query], "search_depth": search_depth}) @retry_with_backoff(max_retries=2) def fetch_company_overview(ticker: str) -> dict: """Fetch company fundamentals (Revenue, EPS, P/E, Market Cap, Margins) via AV OVERVIEW.""" return _call_gateway("alpha_vantage_overview", {"symbol": ticker}, timeout=20.0) @retry_with_backoff(max_retries=2) def fetch_global_quote(ticker: str) -> dict: """Fetch real-time price quote via AV GLOBAL_QUOTE.""" return _call_gateway("alpha_vantage_quote", {"symbol": ticker}, timeout=15.0) # --------------------------------------------------------------------------- # Gemini LLM helper (with automatic model fallback for rate limits) # --------------------------------------------------------------------------- # Ordered fallback chain: each model has its own 20 req/day free-tier limit _GEMINI_MODELS = [ "gemini-2.5-flash", "gemini-2.0-flash", "gemini-2.5-flash-lite", "gemini-1.5-flash", "gemini-1.5-flash-8b" ] def get_gemini_llm(temperature: float = 0.0, model: str = None): """Return a ChatGoogleGenerativeAI instance. Uses the specified model or the first in the chain.""" api_key = GOOGLE_API_KEY if not api_key: raise ValueError("GOOGLE_API_KEY not set. Cannot call Gemini.") model_name = model or _GEMINI_MODELS[0] return ChatGoogleGenerativeAI( model=model_name, google_api_key=api_key, temperature=temperature, max_retries=2, ) def call_gemini(prompt: str, system_prompt: str = "") -> str: """One-shot Gemini call with automatic model fallback on rate limits.""" from langchain_core.messages import SystemMessage, HumanMessage import time as _time messages = [] if system_prompt: messages = [SystemMessage(content=system_prompt), HumanMessage(content=prompt)] else: messages = [HumanMessage(content=prompt)] last_error = None for model_name in _GEMINI_MODELS: try: llm = get_gemini_llm(model=model_name) result = llm.invoke(messages).content.strip() return result except Exception as e: error_str = str(e) last_error = e if "429" in error_str or "quota" in error_str.lower() or "rate" in error_str.lower() or "404" in error_str: logger.warning(f"Model {model_name} failed ({error_str[:50]}), trying next Gemini model...") _time.sleep(2) # Brief pause before trying next model continue else: logger.warning(f"Model {model_name} failed with non-rate-limit error: {error_str[:50]}") continue # Keep trying other Gemini models just in case # All Gemini models exhausted. Try Groq/Llama fallback if available. import os from dotenv import load_dotenv env_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), ".env") load_dotenv(dotenv_path=env_path, override=True) groq_api_key = os.getenv("GROQ_API_KEY") if groq_api_key: logger.info("All Gemini models failed. Falling back to Groq (Llama 3 70B)...") try: from langchain_groq import ChatGroq groq_llm = ChatGroq( model="llama-3.3-70b-versatile", api_key=groq_api_key, temperature=0.0, max_retries=2 ) result = groq_llm.invoke(messages).content.strip() return result except ImportError: logger.error("langchain_groq not installed. Cannot use Groq fallback.") except Exception as e: logger.error(f"Groq fallback also failed: {e}") raise last_error # All models exhausted and no fallback available raise last_error # --------------------------------------------------------------------------- # PDF export (fpdf2) # --------------------------------------------------------------------------- def _sanitize_for_pdf(text: str) -> str: """Replace Unicode characters unsupported by Helvetica (latin-1) with safe equivalents.""" import re replacements = { "\u2014": "--", # em dash — "\u2013": "-", # en dash – "\u2018": "'", # left single quote ' "\u2019": "'", # right single quote ' "\u201c": '"', # left double quote " "\u201d": '"', # right double quote " "\u2026": "...", # ellipsis … "\u2022": "*", # bullet • "\u2023": ">", # triangle bullet ‣ "\u2027": "-", # hyphenation point ‧ "\u00a0": " ", # non-breaking space "\u200b": "", # zero-width space "\u2032": "'", # prime ′ "\u2033": '"', # double prime ″ "\u2212": "-", # minus sign − "\u00b7": "*", # middle dot · "\u25cf": "*", # black circle ● "\u25cb": "o", # white circle ○ "\u2713": "[x]", # check mark ✓ "\u2717": "[ ]", # cross mark ✗ } for char, replacement in replacements.items(): text = text.replace(char, replacement) # Remove markdown bold/italic asterisks (but keep list bullet asterisks at start of lines) text = re.sub(r'(? bytes: """Generate a professional, Wall Street-grade PDF report with logo and styling. Each section dict: {"title": "...", "body": "..."} Returns the PDF as bytes. """ from fpdf import FPDF import re # --- Color palette --- NAVY = (43, 58, 103) # #2B3A67 GOLD = (197, 165, 90) # #C5A55A DARK_TEXT = (30, 30, 30) LIGHT_GRAY = (230, 230, 235) MID_GRAY = (160, 160, 170) WHITE = (255, 255, 255) SECTION_BG = (240, 242, 248) # --- Custom PDF class with header/footer --- class SentinelPDF(FPDF): def __init__(self): super().__init__() self.logo_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "assets", "sentinel_logo.png") def header(self): if self.page_no() == 1: return # Skip header on title page # Logo if os.path.exists(self.logo_path): self.image(self.logo_path, 10, 6, 12) # Company name self.set_font("Times", "B", 9) self.set_text_color(*NAVY) self.set_xy(24, 8) self.cell(0, 5, "SENTINEL AI", align="L") self.set_font("Times", "", 7) self.set_text_color(*MID_GRAY) self.set_xy(24, 13) self.cell(0, 4, "Equity Research Division", align="L") # Right side: CONFIDENTIAL stamp self.set_font("Times", "B", 7) self.set_text_color(*GOLD) self.set_xy(-50, 10) self.cell(40, 5, "CONFIDENTIAL", align="R") # Divider line self.set_draw_color(*NAVY) self.set_line_width(0.5) self.line(10, 20, self.w - 10, 20) self.ln(16) def footer(self): self.set_y(-15) self.set_draw_color(*LIGHT_GRAY) self.set_line_width(0.3) self.line(10, self.get_y(), self.w - 10, self.get_y()) self.ln(2) self.set_font("Times", "", 7) self.set_text_color(*MID_GRAY) self.cell(0, 5, "Generated by Sentinel AI | For Authorized Use Only", align="L") self.set_font("Times", "B", 7) self.cell(0, 5, f"Page {self.page_no()}", align="R") pdf = SentinelPDF() pdf.set_auto_page_break(auto=True, margin=20) # ===== TITLE PAGE ===== pdf.add_page() pdf.ln(30) # Logo centered if os.path.exists(pdf.logo_path): pdf.image(pdf.logo_path, (pdf.w - 40) / 2, pdf.get_y(), 40) pdf.ln(45) # Title pdf.set_font("Times", "B", 28) pdf.set_text_color(*NAVY) pdf.cell(0, 14, _sanitize_for_pdf("SENTINEL AI"), new_x="LMARGIN", new_y="NEXT", align="C") pdf.set_font("Times", "", 12) pdf.set_text_color(*GOLD) pdf.cell(0, 8, _sanitize_for_pdf("Equity Research Report"), new_x="LMARGIN", new_y="NEXT", align="C") pdf.ln(5) # Gold accent line pdf.set_draw_color(*GOLD) pdf.set_line_width(1) pdf.line(70, pdf.get_y(), pdf.w - 70, pdf.get_y()) pdf.ln(10) # Extract ticker from first section first_body = sections[0].get("body", "") if sections else "" ticker_match = re.search(r'\(([A-Z]{1,5})\)', first_body) ticker_display = ticker_match.group(1) if ticker_match else "" if ticker_display: pdf.set_font("Times", "B", 20) pdf.set_text_color(*DARK_TEXT) pdf.cell(0, 12, ticker_display, new_x="LMARGIN", new_y="NEXT", align="C") pdf.ln(2) # Date from datetime import datetime pdf.set_font("Times", "", 10) pdf.set_text_color(*MID_GRAY) pdf.cell(0, 8, f"Generated: {datetime.now().strftime('%B %d, %Y at %H:%M')}", new_x="LMARGIN", new_y="NEXT", align="C") pdf.ln(15) # Disclaimer box pdf.set_fill_color(*SECTION_BG) pdf.rect(20, pdf.get_y(), pdf.w - 40, 18, style="F") pdf.set_font("Times", "I", 7) pdf.set_text_color(*MID_GRAY) pdf.set_xy(25, pdf.get_y() + 3) pdf.multi_cell(pdf.w - 50, 4, _sanitize_for_pdf("This report is generated by Sentinel AI using real-time market data from Alpha Vantage, " "SEC EDGAR filings, and AI-powered analysis. It is for informational purposes only and does not " "constitute financial advice. Past performance is not indicative of future results.")) # ===== CONTENT PAGES ===== def _render_markdown_line(line: str): """Render a single markdown line with professional formatting.""" line = _sanitize_for_pdf(line) stripped = line.strip() if not stripped: pdf.ln(2) return # Headers if stripped.startswith("### "): pdf.ln(2) pdf.set_font("Times", "B", 11) pdf.set_text_color(*NAVY) pdf.multi_cell(0, 6, stripped[4:]) pdf.ln(1) return if stripped.startswith("## "): pdf.ln(3) pdf.set_font("Times", "B", 13) pdf.set_text_color(*NAVY) pdf.multi_cell(0, 7, stripped[3:]) pdf.set_draw_color(*GOLD) pdf.set_line_width(0.3) pdf.line(pdf.l_margin, pdf.get_y(), pdf.l_margin + 40, pdf.get_y()) pdf.ln(2) return if stripped.startswith("# "): pdf.ln(4) pdf.set_font("Times", "B", 15) pdf.set_text_color(*NAVY) pdf.multi_cell(0, 8, stripped[2:]) pdf.set_draw_color(*NAVY) pdf.set_line_width(0.5) pdf.line(pdf.l_margin, pdf.get_y(), pdf.w - pdf.r_margin, pdf.get_y()) pdf.ln(3) return # Horizontal rule if stripped in ("---", "***", "___"): pdf.set_draw_color(*LIGHT_GRAY) pdf.set_line_width(0.3) pdf.line(pdf.l_margin, pdf.get_y(), pdf.w - pdf.r_margin, pdf.get_y()) pdf.ln(3) return # Table rows if "|" in stripped and not stripped.startswith("|--"): # Skip separator lines like |---|---| if re.match(r'^[\|\-\:\s]+$', stripped): return cells = [c.strip() for c in stripped.split("|") if c.strip()] if cells: col_width = (pdf.w - pdf.l_margin - pdf.r_margin) / max(len(cells), 1) is_header = any(c.startswith("**") or c in ("Metric", "Value", "Rank", "Risk Factor", "Implication") for c in cells) if is_header: pdf.set_font("Times", "B", 8) pdf.set_fill_color(*NAVY) pdf.set_text_color(*WHITE) else: pdf.set_font("Times", "", 8) pdf.set_fill_color(*SECTION_BG) pdf.set_text_color(*DARK_TEXT) for cell in cells: cell = re.sub(r'\*\*(.*?)\*\*', r'\1', cell) # strip bold cell = cell[:50] # truncate long cells pdf.cell(col_width, 6, cell, border=1, fill=True) pdf.ln() return # Skip pure table separator lines if re.match(r'^[\|\-\:\s]+$', stripped): return # Bullet points if stripped.startswith(("- ", "* ")): prefix = stripped[:2] text = stripped[2:] # Handle bold text within bullets parts = re.split(r'(\*\*.*?\*\*)', text) pdf.set_text_color(*DARK_TEXT) pdf.cell(6) # indent pdf.set_font("Times", "", 9) pdf.cell(4, 5, chr(8226).encode("latin-1", errors="replace").decode("latin-1")) # bullet char # Build the full text (strip markdown bold markers) full_text = re.sub(r'\*\*(.*?)\*\*', r'\1', text) pdf.multi_cell(pdf.w - pdf.l_margin - pdf.r_margin - 12, 5, full_text) pdf.ln(1) return # Numbered lists num_match = re.match(r'^(\d+)\.\s+', stripped) if num_match: text = stripped[num_match.end():] text = re.sub(r'\*\*(.*?)\*\*', r'\1', text) pdf.set_font("Times", "", 9) pdf.set_text_color(*DARK_TEXT) pdf.cell(4) # indent pdf.multi_cell(pdf.w - pdf.l_margin - pdf.r_margin - 6, 5, f"{num_match.group(1)}. {text}") pdf.ln(1) return # Regular text text = re.sub(r'\*\*(.*?)\*\*', r'\1', stripped) pdf.set_font("Times", "", 9) pdf.set_text_color(*DARK_TEXT) pdf.multi_cell(0, 5, text) pdf.ln(1) for idx, sec in enumerate(sections): pdf.add_page() # Section header with colored left bar title = _sanitize_for_pdf(sec.get("title", "")) # Section number badge section_icons = { "Executive Summary": "01", "Business Overview": "02", "Recent News": "03", "Risk Factors": "04", "Analyst Verdict": "05", } badge_num = section_icons.get(title, f"{idx + 1:02d}") # Navy accent bar on the left y_start = pdf.get_y() pdf.set_fill_color(*NAVY) pdf.rect(pdf.l_margin, y_start, 3, 12, style="F") # Section number pdf.set_font("Times", "B", 8) pdf.set_text_color(*GOLD) pdf.set_xy(pdf.l_margin + 6, y_start) pdf.cell(10, 5, f"SECTION {badge_num}") pdf.set_font("Times", "B", 15) pdf.set_text_color(*NAVY) pdf.set_xy(pdf.l_margin + 6, y_start + 5) pdf.cell(0, 8, title) pdf.ln(8) # Gold underline pdf.set_draw_color(*GOLD) pdf.set_line_width(0.5) pdf.line(pdf.l_margin + 6, pdf.get_y(), pdf.l_margin + 80, pdf.get_y()) pdf.ln(6) # Render body body = sec.get("body", "") for line in body.split("\n"): _render_markdown_line(line) pdf.ln(4) return bytes(pdf.output()) # --------------------------------------------------------------------------- # Watchlist helper # --------------------------------------------------------------------------- WATCHLIST_FILE = "watchlist.json" def load_watchlist() -> list[str]: if not os.path.exists(WATCHLIST_FILE): return [] try: with open(WATCHLIST_FILE, "r") as f: return json.load(f) except Exception: return []