Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import asyncio | |
| import json | |
| import os | |
| import pandas as pd | |
| import time | |
| import threading | |
| from datetime import date as dt_date, datetime, timedelta, time as dt_time | |
| from pathlib import Path | |
| from zoneinfo import ZoneInfo | |
| from data.scraper import NewsScraper | |
| from engine.analytics import AnalyticsEngine | |
# Shared scraper and analytics engine instances used by every pipeline run.
scraper = NewsScraper(limit=600)
engine = AnalyticsEngine()
# In-memory result cache: "<ticker>_<date>" -> (report_text, preview DataFrame).
# Mirrored to CACHE_FILE by _persist_cache and restored by _load_cache_from_disk.
persistent_cache = {}
# Guards every read and write of persistent_cache.
CACHE_LOCK = threading.Lock()
# Held for the whole of one background refresh pass over AUTO_REFRESH_TARGETS.
REFRESH_LOCK = threading.Lock()
IST = ZoneInfo("Asia/Kolkata")
# Time of day compared against the IST clock to decide whether today's
# session is treated as finished.
MARKET_CLOSE = dt_time(15, 30)
# The cache lives in a ".cache" directory next to this source file.
CACHE_DIR = Path(__file__).resolve().parent / ".cache"
CACHE_FILE = CACHE_DIR / "sentiment_cache.json"
# Tickers that the background scheduler and warm_sentiment_cache pre-compute.
AUTO_REFRESH_TARGETS = ["NIFTY 50", "SENSEX", "NIFTY BANK", "NIFTY IT", "NIFTY FIN SERVICE", "NIFTY MIDCAP 50"]
# Set to True by start_sentiment_refresh_scheduler once the thread is launched.
_scheduler_started = False
def _now_ist() -> datetime:
    """Return the current time as a timezone-aware datetime in the IST zone."""
    return datetime.now(tz=IST)
def _previous_business_day(day: dt_date) -> dt_date:
    """Return *day* itself on Mon-Fri, otherwise the Friday just before it.

    Only weekends are skipped; no holiday calendar is consulted.
    """
    # weekday(): Mon=0 ... Fri=4, Sat=5, Sun=6, so Saturday is one day past
    # Friday and Sunday is two.
    days_past_friday = day.weekday() - 4
    if days_past_friday <= 0:
        return day
    return day - timedelta(days=days_past_friday)
def _session_date_for_now(now: datetime | None = None) -> str:
    """Return the ISO date of the most recent session that has already closed.

    Args:
        now: Moment to evaluate; defaults to the current IST time.

    Returns:
        Today's date when *now* falls on a weekday at or after MARKET_CLOSE,
        otherwise the closest earlier weekday, formatted as YYYY-MM-DD.
    """
    moment = now if now else _now_ist()
    today = moment.date()
    closed_today = moment.weekday() < 5 and moment.time() >= MARKET_CLOSE
    if closed_today:
        return today.isoformat()
    # Weekend, or a weekday before the close: step back one calendar day and
    # then roll any weekend back to Friday.
    yesterday = today - timedelta(days=1)
    return _previous_business_day(yesterday).isoformat()
def _cache_key(ticker: str, session_date: str) -> str:
    """Build the cache dictionary key for a ticker/date pair."""
    return "{}_{}".format(ticker, session_date)
def _serialize_result(result: tuple[str, pd.DataFrame]) -> dict:
    """Convert a (report_text, preview table) result into a JSON-ready dict.

    The table is stored as a list of row dicts, and the entry is stamped with
    the IST time at which it was serialized.
    """
    text, frame = result
    rows = frame.to_dict(orient="records")
    stamp = _now_ist().isoformat()
    return {"report_text": text, "preview_rows": rows, "timestamp": stamp}
def _deserialize_result(payload: dict) -> tuple[str, pd.DataFrame]:
    """Rebuild a (report_text, preview table) result from a stored dict.

    Missing keys fall back to an empty report and an empty table.
    """
    text = payload.get("report_text", "")
    rows = payload.get("preview_rows", [])
    return text, pd.DataFrame(rows)
def _load_cache_from_disk() -> None:
    """Replace the in-memory cache with the contents of CACHE_FILE, if any.

    A missing file or a non-dict JSON payload leaves the cache untouched.
    Any failure while reading or decoding is printed rather than raised.
    """
    if not CACHE_FILE.exists():
        return
    try:
        stored = json.loads(CACHE_FILE.read_text(encoding="utf-8"))
        if not isinstance(stored, dict):
            return
        with CACHE_LOCK:
            persistent_cache.clear()
            for cache_key, entry in stored.items():
                # Skip anything that is not a serialized result dict.
                if not isinstance(entry, dict):
                    continue
                persistent_cache[cache_key] = _deserialize_result(entry)
    except Exception as exc:
        print(f"Failed to load sentiment cache: {exc}")
def _save_cache_to_disk_locked() -> None:
    """Write the whole in-memory cache to CACHE_FILE.

    As the name indicates, this is meant to be called with CACHE_LOCK already
    held, so persistent_cache cannot change while it is being serialized.

    The JSON is written to a sibling temporary file and then moved over
    CACHE_FILE in a single rename. An interrupted write (for example the
    daemon refresh thread being killed at interpreter exit) therefore never
    leaves a truncated cache file that would fail to load on the next start.

    Raises:
        OSError: If the directory or file cannot be written.
        TypeError: If a cached value is not JSON serializable.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    payload = {key: _serialize_result(value) for key, value in persistent_cache.items()}
    encoded = json.dumps(payload, ensure_ascii=False)
    scratch_file = CACHE_FILE.with_name(CACHE_FILE.name + ".tmp")
    scratch_file.write_text(encoded, encoding="utf-8")
    # Path.replace overwrites the destination; on the same filesystem this is
    # an atomic rename, so readers see either the old or the new file.
    scratch_file.replace(CACHE_FILE)
def _persist_cache(result_key: str, result: tuple[str, pd.DataFrame]) -> None:
    """Store *result* under *result_key* in memory and flush the cache to disk.

    The in-memory entry is kept even when the disk write fails; the failure
    is printed rather than raised.
    """
    with CACHE_LOCK:
        persistent_cache.update({result_key: result})
        try:
            _save_cache_to_disk_locked()
        except Exception as save_error:
            print(f"Failed to persist sentiment cache: {save_error}")
def _refresh_cached_targets(session_date: str) -> None:
    """Run the pipeline for every auto-refresh ticker not yet cached for the date.

    REFRESH_LOCK is held for the whole pass. A failure for one ticker is
    printed and does not stop the remaining tickers.
    """

    def _discard_progress(*args, **kwargs):
        # Stand-in for the Gradio progress callback when no UI is attached.
        return None

    with REFRESH_LOCK:
        for ticker in AUTO_REFRESH_TARGETS:
            key = _cache_key(ticker, session_date)
            with CACHE_LOCK:
                already_cached = key in persistent_cache
            if already_cached:
                continue
            try:
                # Each run gets its own event loop via asyncio.run.
                asyncio.run(run_pipeline(ticker, session_date, progress=_discard_progress))
            except Exception as exc:
                print(f"Auto-refresh failed for {ticker} on {session_date}: {exc}")
def _sentiment_refresh_loop() -> None:
    """Poll forever and refresh the auto-refresh tickers once per closed session.

    Every 300 seconds the IST clock is checked. On a weekday at or after
    MARKET_CLOSE the targets are refreshed, at most once per calendar date.
    On weekends the "already refreshed" marker is cleared. Errors are printed
    and the loop keeps running.
    """
    poll_seconds = 300
    refreshed_for = None
    while True:
        try:
            current = _now_ist()
            if current.weekday() >= 5:
                refreshed_for = None
            elif current.time() >= MARKET_CLOSE:
                today = current.date().isoformat()
                if today != refreshed_for:
                    _refresh_cached_targets(today)
                    refreshed_for = today
            time.sleep(poll_seconds)
        except Exception as exc:
            print(f"Sentiment refresh loop error: {exc}")
            time.sleep(poll_seconds)
def start_sentiment_refresh_scheduler() -> None:
    """Launch the background refresh thread the first time this is called.

    Later calls are no-ops. The thread is a daemon, so it does not keep the
    process alive on shutdown.
    """
    global _scheduler_started
    if not _scheduler_started:
        _scheduler_started = True
        threading.Thread(
            target=_sentiment_refresh_loop,
            name="sentiment-refresh-scheduler",
            daemon=True,
        ).start()
# Restore any previously persisted results once, when the module is imported.
_load_cache_from_disk()
def build_preview_table(frame: pd.DataFrame) -> pd.DataFrame:
    """Return the top rows of *frame* restricted to the preview columns.

    Only the preview columns that exist in *frame* are kept, in a fixed
    order. When "headline_priority" is present the rows are sorted by it,
    highest first, using a stable sort. At most 8 rows are returned, with a
    fresh 0-based index. An empty DataFrame is returned when none of the
    preview columns exist.
    """
    wanted = (
        "title",
        "direction_signal",
        "headline_priority",
        "ensemble_pol",
        "agreement",
        "event_strength",
    )
    present = [name for name in wanted if name in frame.columns]
    if len(present) == 0:
        return pd.DataFrame()
    subset = frame[present]
    if "headline_priority" in subset.columns:
        subset = subset.sort_values(by="headline_priority", ascending=False, kind="stable")
    top_rows = subset.iloc[:8]
    return top_rows.reset_index(drop=True)
def _format_report(ticker, summary, s_time, a_time, total_time, total_analyzed) -> str:
    """Render the plain-text sentiment report from timings and the summary dict."""
    rule = "=" * 30
    lines = [
        f"{ticker} SENTIMENT REPORT",
        rule,
        "PHASE STATS:",
        f" > Scraping : {s_time:.2f}s",
        f" > ML Analysis : {a_time:.2f}s",
        f" > TOTAL TIME : {total_time:.2f}s",
        f" > ANALYZED : {total_analyzed} articles",
        rule,
        "",
        f"CURRENT STATE: {summary['state_title']}",
        f"STATE SUMMARY: {summary['state_summary']}",
        f"DIRECTION CALL: {summary['direction_call']}",
        f"DIRECTION SCORE: {summary['direction_score']}/100",
        f"DIRECTION CONFIDENCE: {summary['direction_confidence']}/100",
        f"QUICK VIBE: {summary['vibe']}/10",
        f"AVG POLARITY: {summary['avg_polarity']:.3f}",
        f"DIRECTION RATIO: {summary['dir_ratio']:.3f}",
        f"BULLISH PRESSURE: {summary['bullish_pressure']:.3f}",
        f"BEARISH PRESSURE: {summary['bearish_pressure']:.3f}",
        f"RISK BALANCE: {summary['risk_balance']:.3f}",
        f"CONVICTION SCORE: {summary['conviction_weighted']:.3f}",
        f"MODEL AGREEMENT: {summary['agreement_rate']:.1%}",
        f"MOMENTUM TREND: {summary['momentum_delta']:+.3f}",
        f"RECENCY SUPPORT: {summary['recency_support']:.3f}",
        f"EFFECTIVE HEADLINES: {summary['effective_articles']:.2f}",
        f"HEADLINE CONCENTRATION: {summary['headline_concentration']:.3f}",
        f"CONFLICT LOAD: {summary['conflict_load']:.3f}",
        f"EVENT SUPPORT: {summary['event_support']:.3f}",
        f"CALIBRATION FACTOR: {summary['calibration_factor']:.3f}",
        f"NEWS REGIME: {summary['news_regime']}",
        f"COMPOSITE SCORE: {summary['composite_score']:.3f}",
        f"TAIL RISK: {summary['tail_risk']:.1%}",
        "",
        "STATE EXPLANATION:",
    ]
    lines.extend(f"- {line}" for line in summary["state_explanation"])
    lines.extend(("", "UPSIDE DRIVERS:"))
    lines.extend(f"- {line}" for line in summary["bullish_drivers"])
    lines.extend(("", "DOWNSIDE RISKS:"))
    lines.extend(f"- {line}" for line in summary["bearish_risks"])
    lines.extend(("", "HEAVY HITTER SIGNALS:"))
    lines.extend(
        f"- [{a['direction_label']}] {a['title']} ({a['catalyst_label']})"
        for a in summary["heavy_hitters"]
    )
    # Every report line, including the last one, is newline-terminated.
    return "\n".join(lines) + "\n"


async def run_pipeline(ticker, date_str, progress=gr.Progress()):
    """Scrape headlines for *ticker*, score them, and return a text report.

    Results are cached per (ticker, date) in memory and on disk; a cache hit
    returns immediately without scraping.

    Args:
        ticker: Symbol or index name to analyse.
        date_str: Lookback date as YYYY-MM-DD. Falsy values default to the
            most recently closed session date.
        progress: Callable taking (fraction, message); Gradio injects its
            progress tracker through the default.

    Returns:
        A (report_text, preview DataFrame) tuple. Failures are not raised:
        they come back as an error message paired with an empty DataFrame,
        and are never cached.
    """
    start_sentiment_refresh_scheduler()
    date_str = date_str or _session_date_for_now()
    cache_key = _cache_key(ticker, date_str)
    print(f"\n--- Request: {cache_key} ---")
    with CACHE_LOCK:
        cached_result = persistent_cache.get(cache_key)
    if cached_result is not None:
        progress(1.0, "Loaded from cache.")
        return cached_result

    start_total = time.time()
    try:
        if not ticker:
            return "Error: No ticker provided", pd.DataFrame()

        # 0. Send estimated time immediately
        estimated_seconds = engine.estimate_time(scraper.limit)
        progress(0.01, f"ETA:{estimated_seconds}s | Initializing...")
        # Yield to the event loop briefly before the heavy work starts
        # (presumably so the first progress update reaches the client).
        await asyncio.sleep(0.2)

        try:
            dt = datetime.strptime(date_str, "%Y-%m-%d")
        except (TypeError, ValueError) as e:
            return f"Error: Invalid Date Format ({str(e)})", pd.DataFrame()

        scraper.cleanup()

        # 1. Scrape Phase (0.02 -> 0.30)
        progress(0.02, f"ETA:{estimated_seconds}s | Collecting headlines...")
        s_start = time.time()
        articles = await scraper.scrape(
            ticker, dt,
            progress_cb=lambda val, msg: progress(0.02 + (val * 0.28), f"ETA:{estimated_seconds}s | {msg}")
        )
        s_time = time.time() - s_start

        if not articles:
            return f"Error: No news found for '{ticker}' since {date_str}. Try an earlier date.", pd.DataFrame()

        # Record scrape timing for self-calibration
        engine.record_timing("scrape_per_article", s_time, len(articles))

        df = pd.DataFrame(articles)
        total_analyzed = len(df)

        # Recalculate ETA now that we know article count
        estimated_seconds = engine.estimate_time(total_analyzed)
        remaining = max(0, estimated_seconds - int(time.time() - start_total))

        # 2. Analyze Phase (0.30 -> 0.90)
        progress(0.30, f"ETA:{remaining}s | Running sentiment models on {total_analyzed} articles...")
        a_start = time.time()

        def run_analysis():
            return engine.analyze(
                df, ticker,
                progress_cb=lambda val, msg: progress(0.30 + (val * 0.60), f"ETA:{remaining}s | {msg}")
            )

        # The model run is blocking, so it is moved off the event loop.
        analyzed_df = await asyncio.to_thread(run_analysis)
        summary = engine.get_summary(analyzed_df)
        a_time = time.time() - a_start

        # Record analysis timing for self-calibration: the total is split
        # evenly over batches of 32 and 3 models, then weighted per model.
        batches_done = max(1, (total_analyzed + 31) // 32)
        per_batch_time = a_time / (batches_done * 3)  # 3 models
        for timing_key, weight in (
            ("finbert_per_batch", 1.0),
            ("roberta_per_batch", 0.7),
            ("ranker_per_batch", 1.3),
        ):
            engine.record_timing(timing_key, per_batch_time * weight, 1)

        total_time = time.time() - start_total

        # 3. Format Output
        progress(0.95, "Generating report...")
        report_text = _format_report(ticker, summary, s_time, a_time, total_time, total_analyzed)
        result = (report_text, build_preview_table(analyzed_df))
        _persist_cache(cache_key, result)
        progress(1.0, "Done.")
        return result
    except Exception as e:
        # Top-level boundary for the UI/API: log the traceback and hand the
        # message back as text instead of raising.
        print(f"Pipeline Error: {str(e)}")
        import traceback
        traceback.print_exc()
        return f"ERROR: {str(e)}", pd.DataFrame()
async def warm_sentiment_cache(session_date, progress=gr.Progress()):
    """Ensure every auto-refresh ticker is cached, streaming JSON status events.

    Args:
        session_date: Date as YYYY-MM-DD; falsy values default to the most
            recently closed session date.
        progress: Callable taking (fraction, message), forwarded to
            run_pipeline.

    Yields:
        JSON strings carrying "phase" ("cached", "start", "done" or "error"),
        "ticker", "session_date", "index", "total", and then either
        "summary_text" or "message".
    """
    start_sentiment_refresh_scheduler()
    session_date = session_date or _session_date_for_now()
    total = len(AUTO_REFRESH_TARGETS)

    def _event(phase, ticker, index, **extra):
        # Common fields first, then the phase-specific field, so the emitted
        # key order is the same for every event.
        return json.dumps({
            "phase": phase,
            "ticker": ticker,
            "session_date": session_date,
            "index": index,
            "total": total,
            **extra,
        })

    for index, ticker in enumerate(AUTO_REFRESH_TARGETS, start=1):
        lookup_key = _cache_key(ticker, session_date)
        with CACHE_LOCK:
            hit = persistent_cache.get(lookup_key)
        if hit is not None:
            progress(index / total, f"{ticker} loaded from cache.")
            yield _event("cached", ticker, index, summary_text=hit[0])
            continue

        yield _event("start", ticker, index, message="Connecting...")
        try:
            result = await run_pipeline(ticker, session_date, progress=progress)
            summary_text = result[0] if isinstance(result, tuple) else ""
            yield _event("done", ticker, index, summary_text=summary_text)
        except Exception as exc:
            yield _event("error", ticker, index, message=str(exc))
def demo():
    """Build and return the Gradio Blocks app for the sentiment analyzer.

    Two endpoints are wired up: "run_pipeline" (visible button) and
    "warm_sentiment_cache" (hidden button, so it is reachable through the
    API name rather than the visible UI).
    """
    with gr.Blocks(title="Sentiment Analyzer") as app:
        gr.Markdown("# Sentiment Analyzer")
        # NOTE(review): the source was flattened, so which components sit
        # inside the Row is reconstructed here (the two inputs) — confirm.
        with gr.Row():
            ticker = gr.Textbox(label="Ticker Symbol", value="TSLA")
            date = gr.Textbox(label="Lookback Date (YYYY-MM-DD)", value="2024-01-01")
        btn = gr.Button("Analyze Sentiment")
        output = gr.Textbox(label="Report")
        table = gr.Dataframe(label="Dataset")
        # run_pipeline returns (report_text, DataFrame), matching the two outputs.
        btn.click(
            fn=run_pipeline,
            inputs=[ticker, date],
            outputs=[output, table],
            api_name="run_pipeline"
        )
        # Hidden controls: they exist only to register the warm-up endpoint.
        warm_btn = gr.Button("Warm Sentiment Cache", visible=False)
        warm_output = gr.Textbox(visible=False)
        warm_btn.click(
            fn=warm_sentiment_cache,
            inputs=[date],
            outputs=[warm_output],
            api_name="warm_sentiment_cache"
        )
    return app
# Script entry point: serve the app with Gradio's request queue enabled.
if __name__ == "__main__":
    demo().queue().launch(
        # Listen on all interfaces.
        server_name="0.0.0.0",
        # Port comes from the PORT environment variable, defaulting to 7860.
        server_port=int(os.environ.get("PORT", "7860")),
        show_error=True,
    )