"""Gradio front-end for the news sentiment pipeline.

Scrapes headlines for a ticker, scores them with ``AnalyticsEngine`` and renders
a plain-text report plus a small preview table. Results are cached per
``<ticker>_<YYYY-MM-DD>`` key in memory and mirrored to a JSON file on disk. A
daemon thread pre-computes ``AUTO_REFRESH_TARGETS`` once per weekday after
``MARKET_CLOSE`` (IST).
"""

import asyncio
import json
import os
import threading
import time
import traceback
from datetime import date as dt_date, datetime, time as dt_time, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo

import gradio as gr
import pandas as pd

from data.scraper import NewsScraper
from engine.analytics import AnalyticsEngine

scraper = NewsScraper(limit=600)
engine = AnalyticsEngine()

# In-memory result cache: "<ticker>_<YYYY-MM-DD>" -> (report_text, preview_table).
persistent_cache: dict[str, tuple[str, pd.DataFrame]] = {}
CACHE_LOCK = threading.Lock()  # guards persistent_cache and writes to CACHE_FILE
REFRESH_LOCK = threading.Lock()  # serialises background refresh passes
IST = ZoneInfo("Asia/Kolkata")
MARKET_CLOSE = dt_time(15, 30)
CACHE_DIR = Path(__file__).resolve().parent / ".cache"
CACHE_FILE = CACHE_DIR / "sentiment_cache.json"
AUTO_REFRESH_TARGETS = ["NIFTY 50", "SENSEX", "NIFTY BANK", "NIFTY IT", "NIFTY FIN SERVICE", "NIFTY MIDCAP 50"]
# Prefixes of the report text that run_pipeline returns (instead of raising) on failure.
_ERROR_PREFIXES = ("Error:", "ERROR:")
_scheduler_started = False


def _now_ist() -> datetime:
    """Return the current time as a timezone-aware datetime in IST."""
    return datetime.now(IST)


def _previous_business_day(day: dt_date) -> dt_date:
    """Return *day* if it is Monday-Friday, otherwise the Friday before it.

    Only weekends are skipped; exchange holidays are not considered.
    """
    cursor = day
    while cursor.weekday() >= 5:
        cursor -= timedelta(days=1)
    return cursor


def _session_date_for_now(now: datetime | None = None) -> str:
    """Return the ISO date of the most recent closed trading session.

    On a weekday at or after MARKET_CLOSE this is today's date; before the
    close, or on a weekend, it is the previous weekday.
    """
    now = now or _now_ist()
    session_day = now.date()
    if now.weekday() >= 5 or now.time() < MARKET_CLOSE:
        session_day = _previous_business_day(session_day - timedelta(days=1))
    return session_day.isoformat()


def _cache_key(ticker: str, session_date: str) -> str:
    """Build the key used for both the in-memory cache and the JSON file."""
    return f"{ticker}_{session_date}"


def _serialize_result(result: tuple[str, pd.DataFrame]) -> dict:
    """Convert a pipeline result into a JSON-serialisable dict.

    ``timestamp`` is the time of serialisation. Every save re-serialises all
    entries, so it is re-stamped on each save. It is not read back.
    """
    report_text, table = result
    return {
        "report_text": report_text,
        "preview_rows": table.to_dict(orient="records"),
        "timestamp": _now_ist().isoformat(),
    }


def _deserialize_result(payload: dict) -> tuple[str, pd.DataFrame]:
    """Inverse of ``_serialize_result``; missing fields become empty values."""
    return payload.get("report_text", ""), pd.DataFrame(payload.get("preview_rows", []))


def _load_cache_from_disk() -> None:
    """Replace the in-memory cache with the contents of CACHE_FILE, if it exists.

    Best effort: read/parse failures are logged and leave the in-memory cache
    untouched. Entries whose value is not a dict are skipped.
    """
    if not CACHE_FILE.exists():
        return
    try:
        with CACHE_FILE.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
        if not isinstance(payload, dict):
            return
        # Decode everything before touching the shared dict so a bad entry
        # cannot leave the cache cleared and half-populated.
        loaded = {
            key: _deserialize_result(value)
            for key, value in payload.items()
            if isinstance(value, dict)
        }
    except Exception as exc:
        print(f"Failed to load sentiment cache: {exc}")
        return
    with CACHE_LOCK:
        persistent_cache.clear()
        persistent_cache.update(loaded)


def _save_cache_to_disk_locked() -> None:
    """Write the whole in-memory cache to CACHE_FILE. Caller must hold CACHE_LOCK.

    The JSON goes to a sibling temp file first and is then moved over
    CACHE_FILE. An interrupted write therefore cannot leave a truncated cache
    file behind.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    payload = {key: _serialize_result(value) for key, value in persistent_cache.items()}
    tmp_file = CACHE_FILE.with_name(CACHE_FILE.name + ".tmp")
    tmp_file.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
    # os.replace is an atomic rename on POSIX (same filesystem).
    os.replace(tmp_file, CACHE_FILE)


def _persist_cache(result_key: str, result: tuple[str, pd.DataFrame]) -> None:
    """Store *result* in memory, then flush the cache to disk (best effort)."""
    with CACHE_LOCK:
        persistent_cache[result_key] = result
        try:
            _save_cache_to_disk_locked()
        except Exception as exc:
            print(f"Failed to persist sentiment cache: {exc}")


def _is_error_result(result: object) -> bool:
    """Return True if *result* is one of the error tuples run_pipeline returns."""
    return (
        isinstance(result, tuple)
        and len(result) > 0
        and isinstance(result[0], str)
        and result[0].startswith(_ERROR_PREFIXES)
    )


def _noop_progress(*_args, **_kwargs) -> None:
    """Progress callback for background runs that have no UI to update."""


def _refresh_cached_targets(session_date: str) -> None:
    """Compute and cache each AUTO_REFRESH_TARGETS ticker not yet cached for *session_date*.

    Each pipeline run gets its own event loop via ``asyncio.run``. Failures
    are logged and skipped, so one ticker cannot block the others.
    """
    with REFRESH_LOCK:
        for ticker in AUTO_REFRESH_TARGETS:
            key = _cache_key(ticker, session_date)
            with CACHE_LOCK:
                if key in persistent_cache:
                    continue
            try:
                result = asyncio.run(run_pipeline(ticker, session_date, progress=_noop_progress))
            except Exception as exc:
                print(f"Auto-refresh failed for {ticker} on {session_date}: {exc}")
                continue
            # run_pipeline reports most failures through its return value, not by raising.
            if _is_error_result(result):
                print(f"Auto-refresh failed for {ticker} on {session_date}: {result[0]}")


def _sentiment_refresh_loop() -> None:
    """Scheduler thread body: refresh the targets once per weekday after MARKET_CLOSE.

    Polls every 300 seconds. Once a session date has been processed it is not
    retried that day, even if some tickers failed. The marker is reset on
    weekends.
    """
    last_refreshed_session = None
    while True:
        try:
            now = _now_ist()
            if now.weekday() < 5 and now.time() >= MARKET_CLOSE:
                session_date = now.date().isoformat()
                if session_date != last_refreshed_session:
                    _refresh_cached_targets(session_date)
                    last_refreshed_session = session_date
            elif now.weekday() >= 5:
                last_refreshed_session = None
            time.sleep(300)
        except Exception as exc:
            print(f"Sentiment refresh loop error: {exc}")
            time.sleep(300)


def start_sentiment_refresh_scheduler() -> None:
    """Start the daemon refresh thread on first call; later calls are no-ops."""
    global _scheduler_started
    if _scheduler_started:
        return
    _scheduler_started = True
    thread = threading.Thread(
        target=_sentiment_refresh_loop,
        name="sentiment-refresh-scheduler",
        daemon=True,
    )
    thread.start()


_load_cache_from_disk()


def build_preview_table(frame: pd.DataFrame) -> pd.DataFrame:
    """Return up to 8 rows of the key per-headline columns for display.

    Columns missing from *frame* are skipped. If none are present, an empty
    DataFrame is returned. When ``headline_priority`` is available, rows are
    sorted by it in descending order; the stable sort keeps ties in their
    original order.
    """
    preview_cols = [
        "title",
        "direction_signal",
        "headline_priority",
        "ensemble_pol",
        "agreement",
        "event_strength",
    ]
    available_cols = [col for col in preview_cols if col in frame.columns]
    if not available_cols:
        return pd.DataFrame()
    preview = frame[available_cols]
    if "headline_priority" in preview.columns:
        preview = preview.sort_values(by="headline_priority", ascending=False, kind="stable")
    return preview.head(8).reset_index(drop=True)


def _record_analysis_timing(total_analyzed: int, a_time: float) -> None:
    """Feed the measured analysis wall time back into ``engine`` for ETA calibration.

    The time is split evenly over ``ceil(total_analyzed / 32)`` batches times
    3 models. It is then skewed per model by fixed weights (1.0 / 0.7 / 1.3).
    NOTE(review): the 32-article batch size is assumed here; confirm it
    matches the engine's batching.
    """
    batches_done = max(1, (total_analyzed + 31) // 32)
    per_batch_time = a_time / (batches_done * 3)  # 3 models
    engine.record_timing("finbert_per_batch", per_batch_time * 1.0, 1)
    engine.record_timing("roberta_per_batch", per_batch_time * 0.7, 1)
    engine.record_timing("ranker_per_batch", per_batch_time * 1.3, 1)


def _format_report(
    ticker,
    summary: dict,
    *,
    s_time: float,
    a_time: float,
    total_time: float,
    total_analyzed: int,
) -> str:
    """Render the plain-text report from ``engine.get_summary`` output and phase timings."""
    rule = "=" * 30
    lines = [
        f"{ticker} SENTIMENT REPORT",
        rule,
        "PHASE STATS:",
        f" > Scraping : {s_time:.2f}s",
        f" > ML Analysis : {a_time:.2f}s",
        f" > TOTAL TIME : {total_time:.2f}s",
        f" > ANALYZED : {total_analyzed} articles",
        rule,
        "",
        f"CURRENT STATE: {summary['state_title']}",
        f"STATE SUMMARY: {summary['state_summary']}",
        f"DIRECTION CALL: {summary['direction_call']}",
        f"DIRECTION SCORE: {summary['direction_score']}/100",
        f"DIRECTION CONFIDENCE: {summary['direction_confidence']}/100",
        f"QUICK VIBE: {summary['vibe']}/10",
        f"AVG POLARITY: {summary['avg_polarity']:.3f}",
        f"DIRECTION RATIO: {summary['dir_ratio']:.3f}",
        f"BULLISH PRESSURE: {summary['bullish_pressure']:.3f}",
        f"BEARISH PRESSURE: {summary['bearish_pressure']:.3f}",
        f"RISK BALANCE: {summary['risk_balance']:.3f}",
        f"CONVICTION SCORE: {summary['conviction_weighted']:.3f}",
        f"MODEL AGREEMENT: {summary['agreement_rate']:.1%}",
        f"MOMENTUM TREND: {summary['momentum_delta']:+.3f}",
        f"RECENCY SUPPORT: {summary['recency_support']:.3f}",
        f"EFFECTIVE HEADLINES: {summary['effective_articles']:.2f}",
        f"HEADLINE CONCENTRATION: {summary['headline_concentration']:.3f}",
        f"CONFLICT LOAD: {summary['conflict_load']:.3f}",
        f"EVENT SUPPORT: {summary['event_support']:.3f}",
        f"CALIBRATION FACTOR: {summary['calibration_factor']:.3f}",
        f"NEWS REGIME: {summary['news_regime']}",
        f"COMPOSITE SCORE: {summary['composite_score']:.3f}",
        f"TAIL RISK: {summary['tail_risk']:.1%}",
        "",
        "STATE EXPLANATION:",
        *(f"- {line}" for line in summary["state_explanation"]),
        "",
        "UPSIDE DRIVERS:",
        *(f"- {line}" for line in summary["bullish_drivers"]),
        "",
        "DOWNSIDE RISKS:",
        *(f"- {line}" for line in summary["bearish_risks"]),
        "",
        "HEAVY HITTER SIGNALS:",
        *(
            f"- [{a['direction_label']}] {a['title']} ({a['catalyst_label']})"
            for a in summary["heavy_hitters"]
        ),
    ]
    # Every line, including the last, is newline-terminated.
    return "\n".join(lines) + "\n"


async def run_pipeline(ticker, date_str, progress=gr.Progress()):
    """Scrape, score and summarise news for *ticker*.

    Returns ``(report_text, preview_table)``. ``date_str`` (YYYY-MM-DD) is
    passed to the scraper as the lookback date. It defaults to the latest
    closed session.

    Results are cached under ``_cache_key(ticker, date_str)`` and cached
    results are returned immediately. Failures are not raised: an
    ``("Error: ..." | "ERROR: ...", empty DataFrame)`` tuple is returned and
    nothing is cached.

    NOTE(review): ``scraper`` and ``engine`` are module-level singletons that
    the background refresh thread also uses. Concurrent runs are not
    serialised here.
    """
    start_sentiment_refresh_scheduler()
    date_str = date_str or _session_date_for_now()
    cache_key = _cache_key(ticker, date_str)
    print(f"\n--- Request: {cache_key} ---")
    with CACHE_LOCK:
        cached_result = persistent_cache.get(cache_key)
    if cached_result is not None:
        progress(1.0, "Loaded from cache.")
        return cached_result

    start_total = time.time()
    try:
        if not ticker:
            return "Error: No ticker provided", pd.DataFrame()

        # 0. Send an estimated time immediately.
        estimated_seconds = engine.estimate_time(scraper.limit)
        progress(0.01, f"ETA:{estimated_seconds}s | Initializing...")
        await asyncio.sleep(0.2)

        try:
            dt = datetime.strptime(date_str, "%Y-%m-%d")
        except (TypeError, ValueError) as e:
            return f"Error: Invalid Date Format ({e})", pd.DataFrame()

        scraper.cleanup()

        # 1. Scrape phase (progress 0.02 -> 0.30).
        progress(0.02, f"ETA:{estimated_seconds}s | Collecting headlines...")
        s_start = time.time()
        articles = await scraper.scrape(
            ticker,
            dt,
            progress_cb=lambda val, msg: progress(0.02 + (val * 0.28), f"ETA:{estimated_seconds}s | {msg}"),
        )
        s_time = time.time() - s_start

        if not articles:
            return f"Error: No news found for '{ticker}' since {date_str}. Try an earlier date.", pd.DataFrame()
        # Record scrape timing for self-calibration of future ETAs.
        engine.record_timing("scrape_per_article", s_time, len(articles))

        df = pd.DataFrame(articles)
        total_analyzed = len(df)

        # Recalculate the ETA now that the article count is known.
        estimated_seconds = engine.estimate_time(total_analyzed)
        remaining = max(0, estimated_seconds - int(time.time() - start_total))

        # 2. Analyze phase (progress 0.30 -> 0.90), run off the event loop.
        progress(0.30, f"ETA:{remaining}s | Running sentiment models on {total_analyzed} articles...")
        a_start = time.time()

        def run_analysis():
            return engine.analyze(
                df,
                ticker,
                progress_cb=lambda val, msg: progress(0.30 + (val * 0.60), f"ETA:{remaining}s | {msg}"),
            )

        analyzed_df = await asyncio.to_thread(run_analysis)
        summary = engine.get_summary(analyzed_df)
        a_time = time.time() - a_start
        _record_analysis_timing(total_analyzed, a_time)

        total_time = time.time() - start_total

        # 3. Format the output.
        progress(0.95, "Generating report...")
        report_text = _format_report(
            ticker,
            summary,
            s_time=s_time,
            a_time=a_time,
            total_time=total_time,
            total_analyzed=total_analyzed,
        )
        result = (report_text, build_preview_table(analyzed_df))
        _persist_cache(cache_key, result)
        progress(1.0, "Done.")
        return result
    except Exception as e:
        print(f"Pipeline Error: {str(e)}")
        traceback.print_exc()
        return f"ERROR: {str(e)}", pd.DataFrame()


def _warm_status(phase: str, ticker: str, session_date: str, index: int, total: int, **extra) -> str:
    """Encode one warm_sentiment_cache status message as a JSON string."""
    return json.dumps({
        "phase": phase,
        "ticker": ticker,
        "session_date": session_date,
        "index": index,
        "total": total,
        **extra,
    })


async def warm_sentiment_cache(session_date, progress=gr.Progress()):
    """Ensure every AUTO_REFRESH_TARGETS ticker is cached for *session_date*.

    Async generator that yields JSON status messages. ``phase`` is one of
    ``"cached"``, ``"start"``, ``"done"`` or ``"error"``. The ``"cached"`` and
    ``"done"`` messages carry ``summary_text``; ``"start"`` and ``"error"``
    carry ``message``.
    """
    start_sentiment_refresh_scheduler()
    session_date = session_date or _session_date_for_now()
    total = len(AUTO_REFRESH_TARGETS)
    for index, ticker in enumerate(AUTO_REFRESH_TARGETS, start=1):
        cache_key = _cache_key(ticker, session_date)
        with CACHE_LOCK:
            cached_result = persistent_cache.get(cache_key)
        if cached_result is not None:
            progress(index / total, f"{ticker} loaded from cache.")
            yield _warm_status("cached", ticker, session_date, index, total, summary_text=cached_result[0])
            continue

        yield _warm_status("start", ticker, session_date, index, total, message="Connecting...")
        try:
            result = await run_pipeline(ticker, session_date, progress=progress)
        except Exception as exc:
            yield _warm_status("error", ticker, session_date, index, total, message=str(exc))
            continue

        # run_pipeline reports most failures via its return value, not by raising.
        if _is_error_result(result):
            yield _warm_status("error", ticker, session_date, index, total, message=result[0])
            continue

        summary_text = result[0] if isinstance(result, tuple) else ""
        yield _warm_status("done", ticker, session_date, index, total, summary_text=summary_text)


def demo():
    """Build the Gradio Blocks UI.

    Exposes two API endpoints: ``run_pipeline``, and ``warm_sentiment_cache``
    behind a hidden button.
    """
    with gr.Blocks(title="Sentiment Analyzer") as app:
        gr.Markdown("# Sentiment Analyzer")
        with gr.Row():
            ticker = gr.Textbox(label="Ticker Symbol", value="TSLA")
            date = gr.Textbox(label="Lookback Date (YYYY-MM-DD)", value="2024-01-01")
        btn = gr.Button("Analyze Sentiment")
        output = gr.Textbox(label="Report")
        table = gr.Dataframe(label="Dataset")
        btn.click(
            fn=run_pipeline,
            inputs=[ticker, date],
            outputs=[output, table],
            api_name="run_pipeline",
        )
        warm_btn = gr.Button("Warm Sentiment Cache", visible=False)
        warm_output = gr.Textbox(visible=False)
        warm_btn.click(
            fn=warm_sentiment_cache,
            inputs=[date],
            outputs=[warm_output],
            api_name="warm_sentiment_cache",
        )
    return app


if __name__ == "__main__":
    demo().queue().launch(
        server_name="0.0.0.0",
        server_port=int(os.environ.get("PORT", "7860")),
        show_error=True,
    )