Jitendra12421's picture
Upload 2 files
f130439 verified
import gradio as gr
import asyncio
import json
import os
import pandas as pd
import time
import threading
from datetime import date as dt_date, datetime, timedelta, time as dt_time
from pathlib import Path
from zoneinfo import ZoneInfo
from data.scraper import NewsScraper
from engine.analytics import AnalyticsEngine
# Shared scraper and analytics engine instances used by every pipeline run.
scraper = NewsScraper(limit=600)
engine = AnalyticsEngine()
# In-memory result cache: cache key -> (report_text, preview DataFrame).
persistent_cache = {}
# Guards reads and writes of persistent_cache (the disk write also happens while it is held).
CACHE_LOCK = threading.Lock()
# Held for the whole of _refresh_cached_targets so background refreshes do not overlap.
REFRESH_LOCK = threading.Lock()
# Timezone used for every "now" computation in this module.
IST = ZoneInfo("Asia/Kolkata")
# Time-of-day cutoff (15:30): from this time on a weekday, that day is used as the session date.
MARKET_CLOSE = dt_time(15, 30)
# On-disk JSON copy of persistent_cache, kept in a ".cache" directory next to this file.
CACHE_DIR = Path(__file__).resolve().parent / ".cache"
CACHE_FILE = CACHE_DIR / "sentiment_cache.json"
# Tickers processed by the background refresh loop and by warm_sentiment_cache.
AUTO_REFRESH_TARGETS = ["NIFTY 50", "SENSEX", "NIFTY BANK", "NIFTY IT", "NIFTY FIN SERVICE", "NIFTY MIDCAP 50"]
# Set to True once start_sentiment_refresh_scheduler has launched the background thread.
_scheduler_started = False
def _now_ist() -> datetime:
    """Return the current time as a timezone-aware datetime in the IST zone."""
    current = datetime.now(tz=IST)
    return current
def _previous_business_day(day: dt_date) -> dt_date:
    """Return ``day`` if it is Monday-Friday, otherwise the Friday before it.

    Despite the name, a weekday input is returned unchanged; only Saturday
    and Sunday are rolled back.
    """
    # weekday(): Monday == 0 ... Friday == 4, Saturday == 5, Sunday == 6.
    days_past_friday = day.weekday() - 4
    if days_past_friday <= 0:
        return day
    return day - timedelta(days=days_past_friday)
def _session_date_for_now(now: datetime | None = None) -> str:
    """Return the ISO date (``YYYY-MM-DD``) of the session to report on.

    Args:
        now: Moment to evaluate; defaults to the current IST time.

    Returns:
        The date of ``now`` when it falls on a weekday at or after
        ``MARKET_CLOSE``. Otherwise the most recent weekday strictly before
        that date.
    """
    moment = now if now else _now_ist()
    today = moment.date()

    closed_today = moment.weekday() < 5 and moment.time() >= MARKET_CLOSE
    if closed_today:
        return today.isoformat()

    # Weekend, or a weekday before the cutoff: step back one day and then
    # roll any weekend back to Friday.
    yesterday = today - timedelta(days=1)
    return _previous_business_day(yesterday).isoformat()
def _cache_key(ticker: str, session_date: str) -> str:
    """Build the cache lookup key in the form ``<ticker>_<session_date>``."""
    return "{}_{}".format(ticker, session_date)
def _serialize_result(result: tuple[str, pd.DataFrame]) -> dict:
    """Convert a ``(report_text, preview_table)`` pair into a JSON-ready dict.

    The returned dict has the keys ``report_text``, ``preview_rows`` (the
    table as a list of row dicts) and ``timestamp`` (the IST time at which
    this function ran, in ISO format).
    """
    text, frame = result
    rows = frame.to_dict(orient="records")
    stamped_at = _now_ist().isoformat()
    return dict(report_text=text, preview_rows=rows, timestamp=stamped_at)
def _deserialize_result(payload: dict) -> tuple[str, pd.DataFrame]:
    """Rebuild a ``(report_text, preview_table)`` pair from a stored dict.

    Missing keys fall back to an empty report string and an empty table.
    """
    text = payload.get("report_text", "")
    rows = payload.get("preview_rows", [])
    return text, pd.DataFrame(rows)
def _load_cache_from_disk() -> None:
    """Replace the in-memory cache with the entries stored in ``CACHE_FILE``.

    Does nothing when the file is missing or its top-level JSON value is not
    an object. Entries whose value is not a dict are skipped. Any failure is
    reported on stdout and swallowed (best-effort load); in that case the
    in-memory cache is left exactly as it was.
    """
    if not CACHE_FILE.exists():
        return
    try:
        with CACHE_FILE.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
        if not isinstance(payload, dict):
            return
        # Deserialize everything BEFORE touching the live cache: if one entry
        # is malformed we bail out with the cache intact instead of leaving it
        # cleared and half-populated, and the lock is only held for the swap.
        loaded = {
            key: _deserialize_result(value)
            for key, value in payload.items()
            if isinstance(value, dict)
        }
        with CACHE_LOCK:
            persistent_cache.clear()
            persistent_cache.update(loaded)
    except Exception as exc:
        print(f"Failed to load sentiment cache: {exc}")
def _save_cache_to_disk_locked() -> None:
    """Write the whole in-memory cache to ``CACHE_FILE`` as JSON.

    The caller is expected to hold ``CACHE_LOCK`` (``_persist_cache`` calls
    this from inside its ``with CACHE_LOCK`` block). Creates ``CACHE_DIR`` if
    needed.

    Raises:
        OSError, TypeError, ValueError: propagated from directory creation,
        JSON encoding or the file write; the caller decides how to report them.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    payload = {key: _serialize_result(value) for key, value in persistent_cache.items()}
    # Write to a sibling temp file and rename it over the target, so a crash
    # or full disk mid-write cannot leave a truncated cache file behind.
    staging_file = CACHE_FILE.with_name(CACHE_FILE.name + ".tmp")
    staging_file.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
    staging_file.replace(CACHE_FILE)
def _persist_cache(result_key: str, result: tuple[str, pd.DataFrame]) -> None:
    """Store ``result`` under ``result_key`` in memory and flush the cache to disk.

    The in-memory update always happens. A failure while writing the file is
    printed and otherwise ignored.
    """
    with CACHE_LOCK:
        persistent_cache.update({result_key: result})
        try:
            _save_cache_to_disk_locked()
        except Exception as write_error:
            print(f"Failed to persist sentiment cache: {write_error}")
def _refresh_cached_targets(session_date: str) -> None:
    """Run the pipeline for every auto-refresh ticker not yet cached for ``session_date``.

    Holds ``REFRESH_LOCK`` for the whole pass. Each ticker is run in its own
    event loop via ``asyncio.run``; an exception from one ticker is printed
    and the loop moves on to the next.
    """

    def _discard_progress(*args, **kwargs):
        # Stand-in for the Gradio progress callback: accepts anything, does nothing.
        return None

    with REFRESH_LOCK:
        for ticker in AUTO_REFRESH_TARGETS:
            key = _cache_key(ticker, session_date)
            with CACHE_LOCK:
                already_cached = key in persistent_cache
            if already_cached:
                continue
            # CACHE_LOCK is released before this point: run_pipeline takes it itself.
            try:
                asyncio.run(run_pipeline(ticker, session_date, progress=_discard_progress))
            except Exception as exc:
                print(f"Auto-refresh failed for {ticker} on {session_date}: {exc}")
def _sentiment_refresh_loop() -> None:
    """Background loop: once per weekday, at or after ``MARKET_CLOSE``, refresh the cache.

    Wakes every 300 seconds and never returns. The date of the last refresh
    is remembered so each session date triggers at most one refresh pass; it
    is reset on weekends. Any exception inside an iteration is printed and
    the loop continues.
    """
    last_refreshed_session = None
    while True:
        try:
            now = _now_ist()
            is_weekday = now.weekday() < 5
            if is_weekday and now.time() >= MARKET_CLOSE:
                today = now.date().isoformat()
                if last_refreshed_session != today:
                    _refresh_cached_targets(today)
                    last_refreshed_session = today
            elif not is_weekday:
                last_refreshed_session = None
        except Exception as exc:
            print(f"Sentiment refresh loop error: {exc}")
        # One pause per iteration, whether or not the iteration failed.
        time.sleep(300)
def start_sentiment_refresh_scheduler() -> None:
    """Start the background refresh thread the first time this is called.

    Later calls are no-ops: the module-level ``_scheduler_started`` flag
    records that the daemon thread has already been launched.
    """
    global _scheduler_started
    if not _scheduler_started:
        _scheduler_started = True
        threading.Thread(
            target=_sentiment_refresh_loop,
            name="sentiment-refresh-scheduler",
            daemon=True,
        ).start()
# Populate persistent_cache from CACHE_FILE, if that file exists.
_load_cache_from_disk()
def build_preview_table(frame: pd.DataFrame) -> pd.DataFrame:
    """Return a small preview of ``frame`` for display.

    Keeps only the known preview columns that are present (in a fixed order),
    sorts by ``headline_priority`` descending when that column exists (stable,
    so ties keep their input order), and returns at most the first 8 rows with
    a fresh 0-based index. If none of the preview columns are present, an
    empty DataFrame is returned.
    """
    wanted = (
        "title",
        "direction_signal",
        "headline_priority",
        "ensemble_pol",
        "agreement",
        "event_strength",
    )
    present = [name for name in wanted if name in frame.columns]
    if len(present) == 0:
        return pd.DataFrame()

    subset = frame[present]
    if "headline_priority" in subset.columns:
        subset = subset.sort_values(by="headline_priority", ascending=False, kind="stable")
    top_rows = subset.head(8)
    return top_rows.reset_index(drop=True)
def _format_report(ticker, summary, s_time, a_time, total_time, total_analyzed) -> str:
    """Render the plain-text sentiment report for one pipeline run.

    ``summary`` is the mapping returned by ``engine.get_summary``; a missing
    key raises ``KeyError`` to the caller.
    """
    rule = "=" * 30
    lines = [
        f"{ticker} SENTIMENT REPORT",
        rule,
        "PHASE STATS:",
        f" > Scraping : {s_time:.2f}s",
        f" > ML Analysis : {a_time:.2f}s",
        f" > TOTAL TIME : {total_time:.2f}s",
        f" > ANALYZED : {total_analyzed} articles",
        rule,
        "",
        f"CURRENT STATE: {summary['state_title']}",
        f"STATE SUMMARY: {summary['state_summary']}",
        f"DIRECTION CALL: {summary['direction_call']}",
        f"DIRECTION SCORE: {summary['direction_score']}/100",
        f"DIRECTION CONFIDENCE: {summary['direction_confidence']}/100",
        f"QUICK VIBE: {summary['vibe']}/10",
        f"AVG POLARITY: {summary['avg_polarity']:.3f}",
        f"DIRECTION RATIO: {summary['dir_ratio']:.3f}",
        f"BULLISH PRESSURE: {summary['bullish_pressure']:.3f}",
        f"BEARISH PRESSURE: {summary['bearish_pressure']:.3f}",
        f"RISK BALANCE: {summary['risk_balance']:.3f}",
        f"CONVICTION SCORE: {summary['conviction_weighted']:.3f}",
        f"MODEL AGREEMENT: {summary['agreement_rate']:.1%}",
        f"MOMENTUM TREND: {summary['momentum_delta']:+.3f}",
        f"RECENCY SUPPORT: {summary['recency_support']:.3f}",
        f"EFFECTIVE HEADLINES: {summary['effective_articles']:.2f}",
        f"HEADLINE CONCENTRATION: {summary['headline_concentration']:.3f}",
        f"CONFLICT LOAD: {summary['conflict_load']:.3f}",
        f"EVENT SUPPORT: {summary['event_support']:.3f}",
        f"CALIBRATION FACTOR: {summary['calibration_factor']:.3f}",
        f"NEWS REGIME: {summary['news_regime']}",
        f"COMPOSITE SCORE: {summary['composite_score']:.3f}",
        f"TAIL RISK: {summary['tail_risk']:.1%}",
        "",
        "STATE EXPLANATION:",
    ]
    lines.extend(f"- {line}" for line in summary["state_explanation"])
    lines += ["", "UPSIDE DRIVERS:"]
    lines.extend(f"- {line}" for line in summary["bullish_drivers"])
    lines += ["", "DOWNSIDE RISKS:"]
    lines.extend(f"- {line}" for line in summary["bearish_risks"])
    lines += ["", "HEAVY HITTER SIGNALS:"]
    lines.extend(
        f"- [{a['direction_label']}] {a['title']} ({a['catalyst_label']})"
        for a in summary["heavy_hitters"]
    )
    # Every line, including the last, is newline-terminated.
    return "\n".join(lines) + "\n"


async def run_pipeline(ticker, date_str, progress=gr.Progress()):
    """Scrape headlines for ``ticker``, run the sentiment models and build a report.

    Args:
        ticker: Symbol or index name to analyze.
        date_str: Lookback date as ``YYYY-MM-DD``; when empty, the value of
            ``_session_date_for_now()`` is used.
        progress: Callable ``progress(fraction, message)`` used to report
            progress (a Gradio ``Progress`` by default).

    Returns:
        ``(report_text, preview_table)``. A cached result is returned as-is
        when one exists for this ticker/date. On any failure the first element
        is an ``"Error: ..."`` / ``"ERROR: ..."`` message and the second an
        empty DataFrame; exceptions are caught here, not raised. Only
        successful results are written to the cache.
    """
    start_sentiment_refresh_scheduler()
    date_str = date_str or _session_date_for_now()
    cache_key = _cache_key(ticker, date_str)
    print(f"\n--- Request: {cache_key} ---")

    with CACHE_LOCK:
        cached_result = persistent_cache.get(cache_key)
    if cached_result is not None:
        progress(1.0, "Loaded from cache.")
        return cached_result

    start_total = time.time()
    try:
        if not ticker:
            return "Error: No ticker provided", pd.DataFrame()

        # 0. Send estimated time immediately
        estimated_seconds = engine.estimate_time(scraper.limit)
        progress(0.01, f"ETA:{estimated_seconds}s | Initializing...")
        await asyncio.sleep(0.2)

        try:
            dt = datetime.strptime(date_str, "%Y-%m-%d")
        except (TypeError, ValueError) as e:
            return f"Error: Invalid Date Format ({str(e)})", pd.DataFrame()

        scraper.cleanup()

        # 1. Scrape Phase (0.02 → 0.30)
        progress(0.02, f"ETA:{estimated_seconds}s | Collecting headlines...")
        s_start = time.time()
        articles = await scraper.scrape(
            ticker, dt,
            progress_cb=lambda val, msg: progress(0.02 + (val * 0.28), f"ETA:{estimated_seconds}s | {msg}")
        )
        s_time = time.time() - s_start

        if not articles:
            return f"Error: No news found for '{ticker}' since {date_str}. Try an earlier date.", pd.DataFrame()

        # Record scrape timing for self-calibration
        engine.record_timing("scrape_per_article", s_time, len(articles))

        df = pd.DataFrame(articles)
        total_analyzed = len(df)

        # Recalculate ETA now that we know article count. `remaining` is a
        # snapshot taken here; the analysis callbacks below reuse it as-is.
        estimated_seconds = engine.estimate_time(total_analyzed)
        remaining = max(0, estimated_seconds - int(time.time() - start_total))

        # 2. Analyze Phase (0.30 → 0.90)
        progress(0.30, f"ETA:{remaining}s | Running sentiment models on {total_analyzed} articles...")
        a_start = time.time()

        def run_analysis():
            return engine.analyze(
                df, ticker,
                progress_cb=lambda val, msg: progress(0.30 + (val * 0.60), f"ETA:{remaining}s | {msg}")
            )

        # The analysis call is synchronous; run it in a worker thread so the
        # event loop is not blocked.
        analyzed_df = await asyncio.to_thread(run_analysis)
        summary = engine.get_summary(analyzed_df)
        a_time = time.time() - a_start

        # Record analysis timing for self-calibration
        batches_done = max(1, (total_analyzed + 31) // 32)
        per_batch_time = a_time / (batches_done * 3)  # 3 models
        engine.record_timing("finbert_per_batch", per_batch_time * 1.0, 1)
        engine.record_timing("roberta_per_batch", per_batch_time * 0.7, 1)
        engine.record_timing("ranker_per_batch", per_batch_time * 1.3, 1)

        total_time = time.time() - start_total

        # 3. Format Output
        progress(0.95, "Generating report...")
        report_text = _format_report(ticker, summary, s_time, a_time, total_time, total_analyzed)

        result = (report_text, build_preview_table(analyzed_df))
        _persist_cache(cache_key, result)
        progress(1.0, "Done.")
        return result
    except Exception as e:
        # Top-level boundary: log the failure and hand the UI an error tuple.
        print(f"Pipeline Error: {str(e)}")
        import traceback
        traceback.print_exc()
        return f"ERROR: {str(e)}", pd.DataFrame()
async def warm_sentiment_cache(session_date, progress=gr.Progress()):
    """Ensure every auto-refresh ticker has a result for ``session_date``, streaming status.

    Async generator. For each ticker in ``AUTO_REFRESH_TARGETS`` it yields
    JSON strings with the keys ``phase``, ``ticker``, ``session_date``,
    ``index`` (1-based) and ``total``, plus either ``summary_text`` (phases
    ``cached`` and ``done``) or ``message`` (phases ``start`` and ``error``).

    Args:
        session_date: ``YYYY-MM-DD`` string; when empty, the value of
            ``_session_date_for_now()`` is used.
        progress: Progress callback forwarded to ``run_pipeline``.
    """
    start_sentiment_refresh_scheduler()
    session_date = session_date or _session_date_for_now()
    total = len(AUTO_REFRESH_TARGETS)

    def _event(phase, ticker, index, **extra):
        # Common envelope first, phase-specific fields last (key order is kept).
        return json.dumps({
            "phase": phase,
            "ticker": ticker,
            "session_date": session_date,
            "index": index,
            "total": total,
            **extra,
        })

    for index, ticker in enumerate(AUTO_REFRESH_TARGETS, start=1):
        lookup_key = _cache_key(ticker, session_date)
        with CACHE_LOCK:
            cached_result = persistent_cache.get(lookup_key)

        if cached_result is not None:
            progress(index / total, f"{ticker} loaded from cache.")
            yield _event("cached", ticker, index, summary_text=cached_result[0])
            continue

        yield _event("start", ticker, index, message="Connecting...")
        try:
            result = await run_pipeline(ticker, session_date, progress=progress)
            # NOTE(review): run_pipeline returns its failures as an error string
            # rather than raising, so those are reported here as "done" too.
            summary_text = result[0] if isinstance(result, tuple) else ""
            yield _event("done", ticker, index, summary_text=summary_text)
        except Exception as exc:
            yield _event("error", ticker, index, message=str(exc))
# Build and return the Gradio Blocks app: ticker/date inputs, an "Analyze
# Sentiment" button wired to run_pipeline, and a hidden button that exposes
# warm_sentiment_cache as a named API endpoint.
def demo():
with gr.Blocks(title="Sentiment Analyzer") as app:
gr.Markdown("# Sentiment Analyzer")
with gr.Row():
# User inputs, pre-filled with example values.
ticker = gr.Textbox(label="Ticker Symbol", value="TSLA")
date = gr.Textbox(label="Lookback Date (YYYY-MM-DD)", value="2024-01-01")
btn = gr.Button("Analyze Sentiment")
# Outputs: the text report and the preview table returned by run_pipeline.
output = gr.Textbox(label="Report")
table = gr.Dataframe(label="Dataset")
btn.click(
fn=run_pipeline,
inputs=[ticker, date],
outputs=[output, table],
api_name="run_pipeline"
)
# Components created with visible=False; the click handler still registers
# warm_sentiment_cache under its own api_name.
warm_btn = gr.Button("Warm Sentiment Cache", visible=False)
warm_output = gr.Textbox(visible=False)
warm_btn.click(
fn=warm_sentiment_cache,
inputs=[date],
outputs=[warm_output],
api_name="warm_sentiment_cache"
)
return app
if __name__ == "__main__":
    # Script entry point: build the app, enable queuing, and serve it on all
    # interfaces. The port comes from $PORT and falls back to 7860.
    queued_app = demo().queue()
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": int(os.environ.get("PORT", "7860")),
        "show_error": True,
    }
    queued_app.launch(**launch_options)