# Kairo-Brain / worker.py
# ekjotsingh's picture
# Update worker.py
# f18987e verified
import os
os.environ["PYTHONWARNINGS"] = "ignore"
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import warnings
import logging
import sys
warnings.filterwarnings("ignore")
logging.getLogger("asyncio").setLevel(logging.CRITICAL)
logging.getLogger("curl_cffi").setLevel(logging.CRITICAL) # ADD THIS LINE
import pandas as pd
pd.options.mode.chained_assignment = None
import threading
import time
import yfinance as yf
import numpy as np
from ddgs import DDGS # Updated to the silent package
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
import storage
import json
# Maximum number of approved picks returned by get_live_recommendations().
TOP_N = 5
# Rolling window (rows of the ^NSEI close series) for the regime-filter moving average.
REGIME_FILTER_DAYS = 200
# Upper bound on annualized volatility (rolling std of 1-period returns * sqrt(252));
# stocks at or above this value are filtered out.
VOLATILITY_CAP = 0.60
# Minimum latest close price; stocks at or below this value are filtered out.
PRICE_FLOOR = 10.0
# Number of periods used for both the momentum pct_change and the rolling volatility window.
MOMENTUM_LOOKBACK = 30
# JSON file whose content is loaded and passed to yf.download as the ticker universe.
CACHE_FILE = "fundamental_cache.json"
# Set by stop_worker() to ask the background loop to exit; cleared by start_worker().
stop_event = threading.Event()
# Flag checked by start_worker() to refuse a second start while a worker is active.
is_running = False
class MetanthropicBrain:
    """Wrapper around a local llama.cpp model that scores news for a stock.

    The model file is fetched with ``hf_hub_download`` and loaded with
    ``Llama``. If anything fails during boot, ``self.llm`` is set to ``None``
    and ``analyze_news`` falls back to a neutral score.
    """

    def __init__(self):
        """Download and load the model; leave ``self.llm`` as None on failure."""
        print("๐Ÿง  Booting Live AI (Qwen 1.5B FP16)...")
        try:
            model_path = hf_hub_download(
                repo_id="Qwen/Qwen2.5-1.5B-Instruct-GGUF",
                filename="qwen2.5-1.5b-instruct-fp16.gguf"
            )
            # Fall back to 4 threads when the CPU count cannot be determined.
            cpu_threads = os.cpu_count() or 4
            self.llm = Llama(
                model_path=model_path,
                n_ctx=2048,
                n_threads=cpu_threads,
                verbose=False
            )
            print(f"โœ… Live AI Online. Using {cpu_threads} CPU Cores.")
        except Exception as e:
            # Best-effort boot: the scanner keeps working without the model.
            print(f"โŒ AI Boot Failure: {e}")
            self.llm = None

    @staticmethod
    def _parse_response(response):
        """Extract ``(score, reason)`` from the model's text reply.

        The first line containing ``SCORE:`` supplies the score: the first
        integer found after the marker, clamped to 0..100. This tolerates
        replies such as ``SCORE: 85/100`` or ``SCORE: [85]``. The first line
        containing ``REASON:`` supplies the reason. A missing or unparseable
        score yields 50; a missing reason yields the whole reply.
        """
        import re  # local import: only needed for reply parsing

        lines = response.split('\n')

        score = 50
        for line in lines:
            if 'SCORE:' in line:
                match = re.search(r'-?\d+', line.partition('SCORE:')[2])
                if match:
                    score = max(0, min(100, int(match.group())))
                break

        reason = response
        for line in lines:
            if 'REASON:' in line:
                reason = line.partition('REASON:')[2].strip()
                break

        return score, reason

    def analyze_news(self, ticker, news_context, math_score):
        """Ask the model to rate the news for ``ticker``.

        Args:
            ticker: Stock symbol inserted into the prompt.
            news_context: Headline text inserted into the prompt.
            math_score: Momentum percentage inserted into the prompt.

        Returns:
            tuple: ``(score, reason)``. Returns ``(50, "AI Offline - Relied on
            Math")`` when no model is loaded and ``(50, "AI Parsing Error")``
            when the model call or reply handling raises.
        """
        if not self.llm:
            return 50, "AI Offline - Relied on Math"
        system_msg = """You are a ruthless Quantitative Analyst at a top hedge fund.
Your job is to read the latest news headlines for a stock and determine if there is a fundamental catalyst or a massive risk.
Reply ONLY in this format:
SCORE: [0 to 100]
REASON: [One short sentence explaining why]"""
        user_msg = f"STOCK: {ticker}\nMATH MOMENTUM: {math_score}%\nLATEST GLOBAL NEWS: {news_context}"
        try:
            output = self.llm.create_chat_completion(
                messages=[{"role": "system", "content": system_msg}, {"role": "user", "content": user_msg}],
                max_tokens=60,
                temperature=0.3
            )
            response = output['choices'][0]['message']['content'].strip()
            return self._parse_response(response)
        except Exception:
            # Deliberate best-effort: any model/shape failure becomes a neutral
            # score, but KeyboardInterrupt/SystemExit are no longer swallowed.
            return 50, "AI Parsing Error"
class GlobalIntel:
    """Fetches recent news headlines for a ticker through the ``ddgs`` text search."""

    def get_news(self, ticker):
        """Return up to three search-result titles for ``ticker`` as one string.

        Args:
            ticker: Stock symbol; a ``.NS`` suffix is removed before searching.

        Returns:
            str: Titles joined with ``" | "``, a fixed "no news" sentence when
            the search yields no usable titles, or a fixed failure sentence
            when the search raises.
        """
        try:
            clean_ticker = ticker.replace(".NS", "")
            query = f'"{clean_ticker}" stock market news india recent'
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                # Bounded timeout so one search cannot hang the scan.
                with DDGS(timeout=10) as ddgs:
                    results = list(ddgs.text(query, region='in-en', max_results=3))
            # Skip entries without a title instead of failing the whole lookup.
            titles = [r['title'] for r in results if r.get('title')]
            if not titles:
                return "No major news recently. Operating purely on market mechanics."
            return " | ".join(titles)
        except Exception:
            # Best-effort lookup: search/network errors degrade to a fixed message,
            # while KeyboardInterrupt/SystemExit still propagate.
            return "Internet search blocked or failed."
# Shared MetanthropicBrain instance; created on first use inside get_live_recommendations().
ai_brain = None
def get_live_recommendations():
    """Run one scan and return up to ``TOP_N`` AI-approved picks.

    Steps:
      1. Regime check: if the latest ^NSEI close is below its
         ``REGIME_FILTER_DAYS`` rolling mean, return a single gold hedge pick.
         A failure in this check is reported and the scan continues.
      2. Load the ticker universe from ``CACHE_FILE``.
      3. Rank tickers by ``MOMENTUM_LOOKBACK``-period momentum after filtering
         on positive momentum, ``VOLATILITY_CAP`` and ``PRICE_FLOOR``; keep
         the top 15.
      4. Ask the AI to score the news for each candidate and keep those with
         an AI score of at least 50, sorted by blended score.

    Returns:
        list[dict]: Pick dictionaries with the keys ``Ticker``, ``Score``,
        ``Decision``, ``Trend``, ``Key Metrics`` and ``AI Reasoning``. Empty
        when the cache is missing/unreadable or market data cannot be fetched.
    """
    global ai_brain
    print("\n๐Ÿฆ… Initiating Live Alpha Scanner...")

    # --- 1. Market regime check (best-effort) ---
    try:
        print("๐Ÿ“Š Checking Global Market Regime...")
        nifty = yf.download("^NSEI", period="1y", progress=False)['Close']
        if isinstance(nifty, pd.DataFrame):
            nifty = nifty.squeeze()
        current_nifty = nifty.iloc[-1]
        nifty_200_sma = nifty.rolling(REGIME_FILTER_DAYS).mean().iloc[-1]
        if current_nifty < nifty_200_sma:
            print("๐Ÿšจ CRASH DETECTED: Fleeing to Gold.")
            return [{
                "Ticker": "GOLD (GC=F)", "Score": 100, "Decision": "BUY (HEDGE)",
                "Trend": "BEAR MARKET", "Key Metrics": f"Nifty < {nifty_200_sma:.0f}",
                "AI Reasoning": "Macro regime is bearish. Capital Preservation Active."
            }]
    except Exception as e:
        # The regime filter is optional; report the failure instead of hiding it.
        print(f"Regime check skipped: {e}")

    # --- 2. Ticker universe ---
    if not os.path.exists(CACHE_FILE):
        print("โŒ Error: fundamental_cache.json not found. Rebuild it using the Data Tools in the UI.")
        return []
    try:
        with open(CACHE_FILE, 'r', encoding="utf-8") as f:
            all_tickers = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print(f"โŒ Error reading {CACHE_FILE}: {e}")
        return []
    if not all_tickers:
        print(f"โŒ Error: {CACHE_FILE} contains no tickers.")
        return []

    # --- 3. Momentum / volatility screen ---
    print(f"๐Ÿ“ก Fetching live mathematical data for {len(all_tickers)} Elite stocks...")
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            data = yf.download(all_tickers, period="3mo", progress=False)['Close']
        if isinstance(data, pd.Series):
            # A single-ticker download may come back as a Series; give it a
            # ticker-named column so the per-ticker logic below still works.
            only = all_tickers if isinstance(all_tickers, str) else next(iter(all_tickers))
            data = data.to_frame(name=only)
        if isinstance(data.columns, pd.MultiIndex):
            try:
                data = data['Close']
            except KeyError:
                pass
        data = data.ffill().bfill().infer_objects()
        momentum = data.pct_change(MOMENTUM_LOOKBACK).iloc[-1]
        volatility = (data.pct_change(1).rolling(MOMENTUM_LOOKBACK).std() * np.sqrt(252)).iloc[-1]
        current_prices = data.iloc[-1]
        valid_stocks = momentum[
            (momentum > 0) &
            (volatility < VOLATILITY_CAP) &
            (current_prices > PRICE_FLOOR)
        ]
        math_winners = valid_stocks.sort_values(ascending=False).head(15)
        print(f"๐Ÿงฎ Math Engine found {len(math_winners)} raw candidates. Passing to AI for Macro vetting...")
    except Exception as e:
        print(f"โŒ Error fetching math data: {e}")
        return []

    # --- 4. AI vetting ---
    if ai_brain is None:
        ai_brain = MetanthropicBrain()
    intel = GlobalIntel()
    approved_picks = []
    for ticker, mom in math_winners.items():
        print(f" ๐Ÿ”Ž AI Investigating: {ticker} (+{mom*100:.1f}%)")
        news = intel.get_news(ticker)
        ai_score, ai_reason = ai_brain.analyze_news(ticker, news, round(mom*100, 1))
        if ai_score < 50:
            # Rejected by the AI; only approved picks are ever returned.
            continue
        # Equal-weight blend of momentum (in percent) and AI score.
        blended_score = (mom * 100 * 0.5) + (ai_score * 0.5)
        approved_picks.append({
            "Ticker": ticker,
            "Score": int(blended_score),
            "Decision": "BUY (APPROVED)",
            "Trend": f"+{mom*100:.1f}%",
            "Key Metrics": f"Price: โ‚น{current_prices[ticker]:.1f} | AI Confidence: {ai_score}/100",
            "AI Reasoning": ai_reason
        })
    approved_picks.sort(key=lambda x: x['Score'], reverse=True)
    return approved_picks[:TOP_N]
def background_process(interval_seconds=21600):
    """Worker loop: scan, persist picks, then wait until the next cycle.

    Each cycle calls ``get_live_recommendations()``. When it returns picks,
    the database is reset and every pick is saved. A failure inside a cycle
    is reported and the loop continues, so one bad cycle does not kill the
    thread while ``is_running`` still says the worker is active.

    Args:
        interval_seconds: Pause between cycles; defaults to 21600 (six hours).
            The pause ends early as soon as ``stop_event`` is set.
    """
    global is_running
    is_running = True
    print("๐Ÿš€ LIVE PAPER TRADER STARTED")
    while not stop_event.is_set():
        try:
            picks = get_live_recommendations()
            if picks:
                storage.reset_database()
                for pick in picks:
                    storage.save_result(pick)
                    print(f"๐Ÿ’Ž LIVE PICK: {pick['Ticker']} -> AI Reason: {pick['AI Reasoning']}")
        except Exception as e:
            # Keep the worker alive; the next cycle will retry.
            print(f"Scan cycle failed: {e}")
        # Event.wait returns True when the event is set, i.e. stop was requested.
        if stop_event.wait(interval_seconds):
            break
def start_worker():
    """Start the background worker thread unless one is already active.

    Returns:
        str: A status message saying whether the worker was started or was
        already running.
    """
    global is_running
    if is_running:
        return "โš ๏ธ Already Running"
    # Mark as running before the thread is scheduled, so a second call made
    # right away cannot pass the check above and start a duplicate worker.
    is_running = True
    stop_event.clear()
    t = threading.Thread(target=background_process, daemon=True)
    t.start()
    return "โ–ถ๏ธ AI Paper Trader Active"
def stop_worker():
    """Signal the background loop to stop and mark the worker as not running.

    Returns:
        str: A fixed status message.
    """
    global is_running
    # Signal first, then flip the flag, in the same order as before.
    stop_event.set()
    is_running = False
    status = "๐Ÿ›‘ Stopped"
    return status
def reset_worker():
    """Reset the storage database and report it.

    Returns:
        str: A fixed status message.
    """
    status = "โ™ป๏ธ Database Wiped"
    storage.reset_database()
    return status