File size: 9,441 Bytes
243ed84
bb13ea9
 
 
d3ce5a6
 
bb13ea9
 
 
 
d3ce5a6
bb13ea9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3ce5a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
243ed84
f17c710
d3ce5a6
 
 
243ed84
 
f17c710
243ed84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3ce5a6
243ed84
 
d3ce5a6
 
243ed84
 
 
 
 
 
 
 
 
 
2451663
243ed84
 
2451663
243ed84
2451663
243ed84
 
d3ce5a6
ec17376
 
 
 
2451663
 
 
243ed84
 
 
 
 
 
 
ec17376
243ed84
 
 
 
 
 
 
 
bb13ea9
 
 
 
 
 
 
 
 
 
 
243ed84
 
 
 
 
bb13ea9
 
 
 
 
 
d3ce5a6
bb13ea9
 
 
d3ce5a6
bb13ea9
d3ce5a6
 
 
 
bb13ea9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3ce5a6
bb13ea9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3ce5a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb13ea9
243ed84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# app.py
import gradio as gr
import asyncio
import os
import subprocess
import sys
os.environ["GRADIO_SSR_MODE"] = "False" # Force disable SSR for Hugging Face Spaces

import pandas as pd
from datetime import datetime, timedelta
from pathlib import Path

from scraper import NewsScraper
from extractor import ContentExtractor
from features import Features, DataPipeline
from model import StockNewsModel
import threading
import time
import traceback
try:
    from zoneinfo import ZoneInfo
except ImportError:
    from backports.zoneinfo import ZoneInfo


def _env_flag(name: str, default: str = "0") -> bool:
    value = os.getenv(name, default).strip().lower()
    return value in {"1", "true", "yes", "on"}


# Model lifecycle configuration and shared state. The trained model is kept in
# the module-level global `model` (assigned by _load_model_from_disk) so it is
# loaded once rather than on each request.
APP_DIR = Path(__file__).resolve().parent  # directory containing this file
MODEL_PATH = APP_DIR / "NSEI_model.pkl"  # serialized model read by StockNewsModel.load
TRAIN_SCRIPT = APP_DIR / "train.py"  # executed in a subprocess to (re)train the model
MODEL_LOCK = threading.Lock()  # guards reads/writes of the global `model`
LAST_RETRAIN_DATE = None  # IST date of the last successful scheduled retrain
BOOTSTRAP_STARTED = False  # True while a first-time (no model) training thread is active


def _load_model_from_disk():
    """Load the serialized model from MODEL_PATH into the global ``model``.

    On failure the global is reset to ``None`` so request handlers report
    "model not loaded" instead of using a stale object. The failure is logged
    together with its cause rather than raised, so importing this module never
    fails just because no model has been trained yet.

    Returns:
        True if the model was loaded, False otherwise.
    """
    global model
    try:
        loaded = StockNewsModel.load(str(MODEL_PATH))
    except Exception as exc:
        # Best-effort: a missing model is expected before the first training
        # run, but include the cause so corrupt/incompatible files are visible.
        print(
            f"Warning: Could not load model from {MODEL_PATH}. Models need to be trained first. "
            f"({type(exc).__name__}: {exc})"
        )
        with MODEL_LOCK:
            model = None
        return False

    with MODEL_LOCK:
        model = loaded
    print(f"Loaded pre-trained model from {MODEL_PATH}")
    return True


def _run_training_job():
    """Run the training script in a subprocess, then reload the resulting model.

    Uses the module-level IST constant (defined further down in this module),
    which exists by the time this function is called.

    Returns:
        True when the freshly trained model was loaded, otherwise False.

    Raises:
        subprocess.CalledProcessError: if the training script exits non-zero
            (``check=True``).
    """
    print(f"Starting scheduled retrain at {datetime.now(IST).isoformat()}")
    command = [sys.executable, str(TRAIN_SCRIPT)]
    subprocess.run(command, cwd=str(APP_DIR), check=True)
    reloaded = _load_model_from_disk()
    if reloaded:
        print("Reloaded retrained model successfully.")
    return reloaded


# Attempt the initial model load at import time. On failure `model` is None,
# and get_predictions can later start a background training bootstrap.
_load_model_from_disk()

def _as_text(value, default=''):
    """Return *value* as a string, mapping None/NaN (as produced by pandas) to *default*."""
    if value is None:
        return default
    if isinstance(value, float) and value != value:  # NaN from DataFrame.to_dict
        return default
    return str(value)


async def fetch_and_predict(ticker="^NSEI", days_back=7):
    """Scrape recent news for *ticker*, rank it with the ML model and format the top items.

    Pipeline: RSS scrape -> quick feature pass on headline/description ->
    model ranking by predicted ``impact`` -> deep extraction (images/branding)
    for the top 12 -> formatting of the top 10.

    Args:
        ticker: Symbol passed to the scraper, feature builder and price pipeline.
        days_back: How many days of news to look back.

    Returns:
        A list of up to 10 article dicts (id, title, source, date, url, image,
        impact_score, sentiment, content) on success, or a dict with an
        ``"error"`` / ``"message"`` key when no model is loaded or no usable
        articles were found.
    """
    import html  # local import: only needed to escape scraped snippets

    with MODEL_LOCK:
        current_model = model
    if not current_model:
        return {"error": "Model not loaded. Please train the model first."}

    scraper = NewsScraper(limit=450)  # Fetch 450+ headlines for the ML model
    extractor = ContentExtractor()
    features = Features(ticker)

    # 1. Scrape latest news (fast pass).
    lookback = datetime.now() - timedelta(days=days_back)
    articles = await scraper.scrape(ticker, lookback)

    if not articles:
        return {"message": f"No recent news found for {ticker}."}

    # 2. Prepare for the initial pass (quick ML).
    df = pd.DataFrame(articles)
    # Map RSS 'description' to 'content' for the initial feature engineering pass.
    df['content'] = df['description'].fillna('')
    df['ts'] = pd.to_datetime(df['timestamp'], errors='coerce', utc=True)
    # reset_index keeps the index contiguous (0..n-1) after dropping rows, so the
    # column-wise concat with the model output below lines up row-for-row even
    # if the feature/prediction steps return a fresh RangeIndex.
    df = df.dropna(subset=['ts']).reset_index(drop=True)
    df['date'] = df['ts'].dt.date

    if df.empty:
        return {"message": "No valid timestamps found in articles."}

    # 3. Initial ML ranking.
    df_init_feats = features.build(df)
    pipeline = DataPipeline(ticker, train_days=0, test_days=0)
    price_df = pipeline.get_prices(datetime.now() - timedelta(days=30))
    df_init_feats = pipeline._add_market_context(df_init_feats, price_df)

    # Ensure every feature the model expects exists, then sanitise NaN/inf.
    for col in current_model.feature_names:
        if col not in df_init_feats.columns:
            df_init_feats[col] = 0.0
    X_init = df_init_feats[current_model.feature_names].fillna(0).replace([float('inf'), float('-inf')], 0)
    init_results = current_model.predict_new(X_init)

    # Merge predictions back onto the articles and rank by predicted impact.
    df_ranked = pd.concat([df, init_results], axis=1)
    df_ranked = df_ranked.sort_values(by='impact', ascending=False)

    # 4. Filtering (survivor selection): top 12 candidates for deep extraction.
    survivors = df_ranked.head(12).to_dict('records')
    print(f"[Pipeline] High-impact filtering complete. Deep extracting {len(survivors)} survivor(s).")

    # 5. Deep extraction (images & branding only).
    survivors = await extractor.extract_all(survivors)

    # 6. Final format. Text fields go through _as_text because to_dict('records')
    # yields float NaN for missing cells, which would otherwise render as "nan"
    # (and a NaN image is truthy, skipping the dynamic-image fallback).
    top_articles = []
    for i, row in enumerate(survivors[:10]):
        title = _as_text(row.get('title'))
        source_name = _as_text(row.get('source'), 'Unknown')
        image_url = _as_text(row.get('image')) or extractor._build_dynamic_image_url(
            title=title,
            source=source_name,
            ticker=ticker,
        )

        # Fallback snippet from the RSS description. It is untrusted scraped
        # text and the 400-char cut can split markup, so escape it before
        # embedding it in HTML.
        snippet = html.escape(_as_text(row.get('description'))[:400])

        top_articles.append({
            "id": i,
            "title": title,
            "source": source_name,
            "date": row.get('pub_date', ''),
            "url": _as_text(row.get('link')),
            "image": image_url,
            "impact_score": round(row.get('impact', 0), 3),
            "sentiment": round(row.get('sent_combined', 0), 3),
            "content": f"<p>{snippet}</p>",
        })

    return top_articles

# --- START CACHING LOGIC ---
# Shared state for the daily insights cache. Writes to the cache fields and to
# the *_STARTED flags below are made while holding CACHE_LOCK.
cached_headlines = None  # last result stored by update_cache; served by get_predictions
last_refresh_date = None  # IST date when update_cache last wrote the cache
CACHE_LOCK = threading.Lock()
IST = ZoneInfo("Asia/Kolkata")  # also read by _run_training_job, which is only called after import
_REFRESH_THREAD_STARTED = False  # set once the scheduler thread has been launched
_INITIAL_REFRESH_STARTED = False  # set when a first-request refresh starts; cleared if it caches nothing

def update_cache(ticker="^NSEI"):
    """Run the news pipeline synchronously and store its result in the cache.

    The async pipeline is driven with ``asyncio.run``, so this must not be
    called from a thread that already has a running event loop.

    A result carrying an ``"error"`` key (e.g. no model loaded) is NOT cached.
    Caching it would make get_predictions return that error for every later
    request instead of retrying or bootstrapping a model. Exceptions are
    logged with a traceback and leave the existing cache untouched.
    """
    global cached_headlines, last_refresh_date
    print(f"Fetching new daily market insights for {ticker}...")
    try:
        data = asyncio.run(fetch_and_predict(ticker, days_back=3))
    except Exception as e:
        print(f"Error fetching insights: {e}")
        traceback.print_exc()
        return

    if isinstance(data, dict) and "error" in data:
        print(f"Skipping cache update: {data['error']}")
        return

    with CACHE_LOCK:
        cached_headlines = data
        last_refresh_date = datetime.now(IST).date()
    print("Market insights cache successfully updated.")


def _refresh_cache_loop():
    """Scheduler loop: retrain and refresh the cache once per weekday after 15:30 IST.

    Polls every 5 minutes and never returns (it is started as a daemon thread
    by _ensure_refresh_thread_started). On Monday-Friday at or after 15:30 IST,
    a retrain is attempted on each poll until one succeeds for that date. Only
    a successful retrain triggers a cache refresh. Any exception is logged with
    its traceback, and the loop keeps running.
    """
    global LAST_RETRAIN_DATE
    poll_seconds = 300  # Check every 5 minutes
    while True:
        try:
            now = datetime.now(IST)
            after_close = now.hour > 15 or (now.hour == 15 and now.minute >= 30)
            if now.weekday() < 5 and after_close and LAST_RETRAIN_DATE != now.date():
                if _run_training_job():
                    LAST_RETRAIN_DATE = now.date()
                    update_cache()
        except Exception as e:
            print(f"Refresh loop error: {e}")
            traceback.print_exc()
        time.sleep(poll_seconds)


def _ensure_refresh_thread_started():
    """Start the background scheduler thread at most once.

    The flag is checked once without the lock (fast path) and again under
    CACHE_LOCK, so concurrent callers cannot launch two scheduler threads.
    """
    global _REFRESH_THREAD_STARTED
    if not _REFRESH_THREAD_STARTED:
        with CACHE_LOCK:
            if not _REFRESH_THREAD_STARTED:
                scheduler = threading.Thread(
                    target=_refresh_cache_loop,
                    daemon=True,
                    name="insights-refresh-scheduler",
                )
                scheduler.start()
                _REFRESH_THREAD_STARTED = True


def _start_initial_refresh(ticker="^NSEI"):
    """Launch a background cache refresh for *ticker* if none has been started yet.

    The worker clears the started flag when its refresh leaves the cache empty,
    so a later call can try again. The flag is set while CACHE_LOCK is held,
    and the worker's cleanup needs that same lock, so the flag is always set
    before the worker can clear it.
    """
    global _INITIAL_REFRESH_STARTED
    if _INITIAL_REFRESH_STARTED:
        return

    def _refresh_once():
        global _INITIAL_REFRESH_STARTED
        try:
            update_cache(ticker)
        finally:
            # Permit a retry only if nothing ended up in the cache.
            with CACHE_LOCK:
                if cached_headlines is None:
                    _INITIAL_REFRESH_STARTED = False

    with CACHE_LOCK:
        if _INITIAL_REFRESH_STARTED:
            return
        threading.Thread(
            target=_refresh_once,
            daemon=True,
            name="insights-initial-refresh",
        ).start()
        _INITIAL_REFRESH_STARTED = True


# Launch the daily retrain/refresh scheduler as soon as this module is imported.
_ensure_refresh_thread_started()


def _refresh_loop():
    """Backward-compatible alias of _refresh_cache_loop, kept for older imports."""
    return _refresh_cache_loop()

def get_predictions(ticker="^NSEI"):
    """Return the cached market insights, starting background work if the cache is empty.

    The cache is global. *ticker* is ignored when a cached result exists and is
    only used by the refresh/bootstrap that fills an empty cache.

    When the cache is empty:
      * with HF_EAGER_FIRST_REFRESH set and a model loaded, refresh
        synchronously and return the result if it got cached;
      * with no model loaded, start (once at a time) a background thread that
        trains a model and then fills the cache;
      * with a model loaded, start a background first refresh.
    Otherwise a placeholder message list is returned.
    """
    global BOOTSTRAP_STARTED

    with CACHE_LOCK:
        cached = cached_headlines
    if cached is not None:
        return cached

    with MODEL_LOCK:
        current_model = model

    # Refresh eagerly only when a model exists. Without one, fetch_and_predict
    # returns an error payload, and caching it would be served to every later
    # request instead of the bootstrap path below.
    if current_model is not None and _env_flag("HF_EAGER_FIRST_REFRESH"):
        update_cache(ticker)
        with CACHE_LOCK:
            cached = cached_headlines
        if cached is not None:
            return cached

    if current_model is None and not BOOTSTRAP_STARTED:
        def _bootstrap_worker():
            global BOOTSTRAP_STARTED
            try:
                # Train, and fill the cache only if the new model loaded.
                if _run_training_job():
                    update_cache(ticker)
            finally:
                # Always clear the flag so a failed bootstrap can be retried.
                BOOTSTRAP_STARTED = False

        # Set the flag BEFORE starting the thread. Otherwise a worker that
        # fails fast could clear it first, leaving it stuck at True and
        # blocking every future bootstrap.
        BOOTSTRAP_STARTED = True
        threading.Thread(
            target=_bootstrap_worker,
            daemon=True,
            name="model-bootstrap",
        ).start()
    elif current_model is not None:
        # First request triggers a background refresh instead of blocking app startup.
        _start_initial_refresh(ticker)

    return [{"message": "Generating insights for the day... Check back in a minute."}]
# --- END CACHING LOGIC ---

def demo():
    """Assemble the Gradio UI.

    The UI has a ticker textbox and a button wired to get_predictions, which
    is exposed under the API name "predict". The returned articles are
    rendered as JSON.
    """
    with gr.Blocks(title="Miscellaneous News Impact Analyzer") as blocks:
        gr.Markdown("# Miscellaneous Model Backend")

        with gr.Row():
            ticker_box = gr.Textbox(label="Ticker Symbol", value="^NSEI")

        fetch_button = gr.Button("Fetch Latest Impactful News")
        articles_view = gr.JSON(label="Top Articles")

        fetch_button.click(
            fn=get_predictions,
            inputs=[ticker_box],
            outputs=[articles_view],
            api_name="predict",
        )

    return blocks

# Build the UI at import time so the Blocks object is available as `app` to
# importers (presumably the Hugging Face Spaces runner -- confirm).
app = demo()

if __name__ == "__main__":
    # Direct-run entry point: enable the request queue and bind on all
    # interfaces, using $PORT when set (default 7860). SSR is disabled here
    # as well as via GRADIO_SSR_MODE near the top of the file.
    app.queue().launch(
        server_name="0.0.0.0",
        server_port=int(os.environ.get("PORT", "7860")),
        ssr_mode=False,
        show_error=True
    )