Spaces:
Running
Running
| """ | |
| stock.py — "Did this stock actually change?" on free Yahoo data (no API key). | |
| Finance-native use of the same machinery, honestly matched to how prices behave: | |
| * SHOCK events — the Clutch's MagnitudeGate fed with |daily return| / typical move. | |
| A single outsized day trips it instantly; several stressed days in a | |
| row accumulate and trip it too (which a naive threshold misses). | |
| * VOLATILITY regime changes — a strongest-split ratio test on non-overlapping windows | |
| of daily volatility: "a normal day used to be ±1.1%, now it's ±2.6%". | |
| * DRIFT — total return over the window, compared against what pure luck could | |
| produce (sigma * sqrt(n)), stated plainly. | |
| Data: yfinance (Yahoo Finance scrape — free, keyless). News: yfinance's news feed, | |
| also keyless. Both can rate-limit; failures are reported, never faked. | |
| """ | |
| import time | |
| import numpy as np | |
| from clutch import Clutch, MagnitudeGate | |
| from change import detect | |
| # ------------------------------------------------------------------ data (keyless) | |
# Successful fetches only, keyed by (SYMBOL, period, time bucket).
_CACHE = {}
_TTL = 900  # 15 min


def fetch_prices(ticker, period="1y"):
    """Fetch daily adjusted closes for *ticker* from Yahoo via yfinance.

    Returns ``(dates, close, currency, err)``:

    * on success ``dates`` is a list of ``YYYY-MM-DD`` strings, ``close`` a
      float array of the same length, ``currency`` a string (possibly empty)
      and ``err`` is ``None``;
    * on failure the first two are ``None``, ``currency`` is ``""`` and
      ``err`` is a human-readable message.

    Successful results are cached per ``_TTL``-second time bucket to be polite
    to Yahoo. Failures are deliberately NOT cached: the error text tells the
    user to retry shortly, so a retry must actually reach Yahoo again instead
    of replaying the stored failure until the bucket rolls over.
    """
    symbol = ticker.strip()
    bucket = int(time.time() // _TTL)
    key = (symbol.upper(), period, bucket)
    if key in _CACHE:
        return _CACHE[key]

    # Keys from earlier buckets can never be hit again; drop them so a
    # long-running process does not accumulate dead entries forever.
    for stale in [k for k in _CACHE if k[2] != bucket]:
        _CACHE.pop(stale, None)

    too_short = (None, None, "", f"Couldn't get enough daily data for '{ticker}'. "
                 "Check the symbol (Yahoo format, e.g. AAPL, NOK, BTC-USD, ^GSPC).")
    try:
        import yfinance as yf
        tk = yf.Ticker(symbol)
        hist = tk.history(period=period, interval="1d", auto_adjust=True)
        if hist is None or "Close" not in hist:
            return too_short
        close = hist["Close"].to_numpy(dtype=float)
        # Rows with a missing or non-positive close would turn into NaN/inf
        # once log returns are taken downstream, so drop them here and keep
        # the dates aligned with the surviving prices.
        usable = np.isfinite(close) & (close > 0)
        if int(usable.sum()) < 40:
            return too_short
        dates = [d.strftime("%Y-%m-%d") for d, ok in zip(hist.index, usable) if ok]
        close = close[usable]
        cur = ""
        try:
            cur = tk.fast_info.get("currency") or ""
        except Exception:
            # Currency is a nice-to-have label; prices are still usable without it.
            pass
    except Exception as e:
        return (None, None, "", f"Data fetch failed ({type(e).__name__}). Yahoo sometimes "
                "rate-limits shared servers — wait a minute and try again.")

    out = (dates, close, cur, None)
    _CACHE[key] = out
    return out
def fetch_news(ticker, k=6):
    """Return up to *k* Yahoo headlines for *ticker* (keyless, via yfinance).

    Any failure while importing yfinance or reading the feed yields an empty
    list; the raw items are handed to ``parse_news`` for normalisation.
    """
    try:
        import yfinance as yf
        headlines = yf.Ticker(ticker.strip()).news
        if not headlines:
            headlines = []
    except Exception:
        return []
    return parse_news(headlines, k)
def parse_news(raw, k=6):
    """Normalise raw yfinance news items into at most *k* headline dicts.

    Handles both item layouts seen in SOURCE's lookups: the newer one, where
    the fields live under ``item["content"]`` (``title``, ``canonicalUrl`` /
    ``clickThroughUrl``, ``provider``, ``pubDate`` / ``displayTime``), and the
    older flat one (``title``, ``link``, ``publisher``,
    ``providerPublishTime``).

    Only the first ``2 * k`` raw items are inspected. Malformed entries
    (non-dict items, ``content`` that is not a dict, missing or non-string
    titles) are skipped or fall back to the flat layout instead of raising.

    Returns a list of ``dict(title, url, publisher, when)`` where ``when`` is
    the first 10 characters of the timestamp (``YYYY-MM-DD`` for ISO input)
    or ``""`` when unknown.
    """
    if not isinstance(raw, (list, tuple)):
        return []
    items = []
    for it in raw[: k * 2]:
        if not isinstance(it, dict):
            continue
        # New format nests everything under "content"; otherwise (or when
        # "content" is null/garbage) read the flat item itself.
        c = it.get("content")
        if not isinstance(c, dict):
            c = it
        title = c.get("title") or it.get("title")
        if not isinstance(title, str) or not title.strip():
            continue
        cu = c.get("canonicalUrl") or c.get("clickThroughUrl") or {}
        url = cu.get("url", "") if isinstance(cu, dict) else ""
        url = url or it.get("link", "") or ""
        prov = c.get("provider") or {}
        publisher = (prov.get("displayName") if isinstance(prov, dict) else None) \
            or it.get("publisher") or ""
        when = c.get("pubDate") or c.get("displayTime") or ""
        stamp = it.get("providerPublishTime")
        if not when and stamp:
            try:
                when = time.strftime("%Y-%m-%d", time.gmtime(stamp))
            except (TypeError, ValueError, OverflowError, OSError):
                # An unusable epoch value just means "date unknown".
                when = ""
        when = str(when)[:10]
        items.append(dict(title=title.strip(), url=url, publisher=publisher, when=when))
        if len(items) >= k:
            break
    return items
| # ------------------------------------------------------------------ analysis | |
def robust_sigma(x):
    """Robust spread of *x*: 1.4826 x the median absolute deviation from the
    median, plus a 1e-12 floor so the result is never exactly zero."""
    center = np.median(x)
    mad = np.median(np.abs(x - center))
    return float(1.4826 * mad + 1e-12)
def local_sigma(rets, win=60, warm=20):
    """Per-day typical move, estimated from the preceding days only.

    For each index ``i >= warm`` the value is the robust sigma of the up-to-
    ``win`` returns strictly before ``i``, floored at 40% of the whole-series
    robust sigma. The first ``warm`` entries simply hold the whole-series
    value. The effect: a rough regime stops spamming shock flags, while a
    fresh crash (judged against the calm past) still stands out.
    """
    overall = robust_sigma(rets)
    floor = 0.4 * overall
    n = len(rets)
    sigmas = np.full(n, overall)
    for day in range(warm, n):
        start = max(0, day - win)
        sigmas[day] = max(robust_sigma(rets[start:day]), floor)
    return sigmas
def shock_events(rets, sensitivity=1.0):
    """Find shock clusters by feeding |return| / local typical move to a gate.

    A ``MagnitudeGate`` is updated once per day with the normalised absolute
    return; every time it trips, the day is recorded and the gate is cleared.
    Trip days at most 3 apart are merged into one cluster, and each cluster is
    summarised over a window that starts 2 days before its first trip.

    Returns ``(events, sig_global, calm_days)`` where each event is
    ``dict(i0, i1, cum_ret, peak_z, biggest)``, ``sig_global`` is the
    whole-series robust sigma and ``calm_days`` counts the days outside every
    event window.
    """
    sig_global = robust_sigma(rets)
    typical = local_sigma(rets)
    gate = MagnitudeGate(gain=3.0, leak=3.2, trip=8.0 * sensitivity)

    # Single pass: detect trips and fold them into clusters as they appear.
    clusters = []
    for day, ret in enumerate(rets):
        if not gate.update(abs(ret) / typical[day]):
            continue
        gate.clear()
        if clusters and day - clusters[-1][-1] <= 3:
            clusters[-1].append(day)
        else:
            clusters.append([day])

    last = len(rets) - 1
    summaries = []
    flagged_days = 0
    for cluster in clusters:
        start = max(0, cluster[0] - 2)
        stop = min(last, cluster[-1])
        window = rets[start:stop + 1]
        # Day with the largest absolute return inside the window.
        peak_at = start + int(np.argmax(np.abs(window)))
        summaries.append({
            "i0": start,
            "i1": stop,
            "cum_ret": float(np.sum(window)),
            "peak_z": float(np.abs(rets[peak_at]) / typical[peak_at]),
            "biggest": float(rets[peak_at]),
        })
        flagged_days += stop - start + 1
    return summaries, sig_global, len(rets) - flagged_days
def vol_regimes(rets, sensitivity=1.0, win=15):
    """Detect at most one volatility regime change in *rets*.

    The series is cut into non-overlapping ``win``-day windows (near-
    independent samples, unlike a rolling window) and each window's robust
    sigma is taken. Every admissible split of those windows is scored by the
    ratio of the median sigma after vs. before (in whichever direction is
    larger); only the strongest split is considered, and only if its score
    reaches ``1 + 0.75 * sensitivity``. The change day is then refined at day
    resolution within +/-2 windows of the coarse split.

    Returns ``[]`` (fewer than 8 windows, or nothing strong enough) or
    ``[dict(at, before, after, ratio)]``.
    """
    n = len(rets)
    n_windows = n // win
    if n_windows < 8:
        return []

    window_vol = np.array(
        [robust_sigma(rets[j * win:(j + 1) * win]) for j in range(n_windows)]
    )

    def _split_score(j):
        lo = float(np.median(window_vol[:j]))
        hi = float(np.median(window_vol[j:]))
        r = hi / (lo + 1e-12)
        return max(r, 1.0 / r), j

    # max() keeps the first of equally scored splits, like a strict '>' scan.
    score, split = max(
        (_split_score(j) for j in range(3, n_windows - 2)),
        key=lambda pair: pair[0],
    )

    # sens 1 -> 1.75x; strict 1.5 -> ~2.1x; eager 0.7 -> ~1.5x
    if score < 1.0 + 0.75 * sensitivity:
        return []

    def _sides(c):
        """Robust sigma of up to 60 days before and 60 days from day *c*."""
        return robust_sigma(rets[max(0, c - 60):c]), robust_sigma(rets[c:c + 60])

    # Refine the change day: best day-level split near the coarse one.
    coarse = split * win
    change_day, strongest = coarse, 0.0
    first = max(30, coarse - 2 * win)
    stop = min(n - 30, coarse + 2 * win)
    for c in range(first, stop):
        b, a = _sides(c)
        dev = abs(np.log(a / (b + 1e-12) + 1e-12))
        if dev > strongest:
            strongest, change_day = dev, c

    before, after = _sides(change_day)
    return [dict(at=change_day, before=before, after=after,
                 ratio=after / (before + 1e-12))]
def analyze(dates, close, sensitivity=1.0):
    """Run the shock, volatility-regime and drift analyses on a close series.

    Works on daily log returns of *close*. Returns a dict with: ``rets`` (log
    returns), ``sig_d`` (whole-series robust daily sigma), ``shocks``,
    ``vols``, ``calm`` (days outside shock windows), ``total`` (log return
    over the whole period), ``luck2`` (``2 * sig_d * sqrt(n)``, the 2-sigma
    band of pure-luck drift) and ``n`` (number of returns). *dates* is
    accepted for interface symmetry but not read here.
    """
    log_price = np.log(np.asarray(close, float))
    rets = np.diff(log_price)
    n_days = len(rets)
    shocks, sig_d, calm = shock_events(rets, sensitivity)
    return {
        "rets": rets,
        "sig_d": sig_d,
        "shocks": shocks,
        "vols": vol_regimes(rets, sensitivity),
        "calm": calm,
        "total": float(log_price[-1] - log_price[0]),
        "luck2": 2.0 * sig_d * np.sqrt(n_days),  # 2-sigma of pure-luck drift
        "n": n_days,
    }
| # ------------------------------------------------------------------ language | |
def pct(x):
    """Format a log return *x* as a signed simple-return percentage, e.g. ``+3.2%``."""
    return f"{(np.exp(x) - 1) * 100:+.1f}%"


def verdict_md(ticker, dates, close, a, currency=""):
    """Render the analysis dict *a* (as produced by ``analyze``) as markdown.

    Parameters:
        ticker:   symbol shown in the heading.
        dates:    date strings aligned with the price series; return index
                  ``i`` corresponds to ``dates[i + 1]``.
        close:    unused here; kept for interface compatibility.
        a:        dict with keys ``n``, ``sig_d``, ``shocks``, ``vols``,
                  ``total``, ``luck2`` and ``calm``.
        currency: unused here; kept for interface compatibility.

    Returns the verdict as a single newline-joined markdown string: a heading,
    up to five shock bullets (the strongest by rank, listed chronologically),
    any volatility-regime bullets, a drift-vs-luck paragraph, the calm-day
    count and a disclaimer.
    """
    # NOTE(review): the heading glyph "π" looks like the remains of a
    # mis-decoded emoji; the intended character cannot be recovered from this
    # file, so it is left untouched — confirm against the original source.
    n = a["n"]
    d0, d1 = dates[0], dates[-1]
    sig_pct = (np.exp(a["sig_d"]) - 1) * 100
    lines = []
    if not a["shocks"] and not a["vols"]:
        lines.append(f"## π {ticker}: a quiet stretch — ordinary wobble only")
        lines.append(f"Across **{n} trading days** ({d0} → {d1}), no day or cluster of days "
                     f"broke out of this stock's normal movement (a typical day here is about "
                     f"**±{sig_pct:.1f}%**). Every scary-looking dip in this window was "
                     f"within what dice would produce.")
    else:
        k = len(a["shocks"]) + len(a["vols"])
        lines.append(f"## π {ticker}: {k} real event{'s' if k != 1 else ''} in this window")
        # Rank by whichever is more extreme: the sharpest single day or the
        # cumulative move expressed in typical-day units.
        ranked = sorted(a["shocks"],
                        key=lambda e: max(e["peak_z"], abs(e["cum_ret"]) / (a["sig_d"] + 1e-12)),
                        reverse=True)
        shown, rest = ranked[:5], ranked[5:]
        for e in sorted(shown, key=lambda e: e["i0"]):
            day0, day1 = dates[e["i0"] + 1], dates[e["i1"] + 1]
            span = f"on {day1}" if e["i0"] == e["i1"] else f"over {day0} → {day1}"
            if abs(e["cum_ret"]) < 0.5 * abs(e["biggest"]):
                lines.append(f"- **Violent swings {span}** that largely cancelled out "
                             f"(net {pct(e['cum_ret'])}, sharpest single day {pct(e['biggest'])}, "
                             f"about {e['peak_z']:.0f}× normal). Something happened there even "
                             f"though the price ended near where it started.")
            else:
                direction = "down" if e["cum_ret"] < 0 else "up"
                lines.append(f"- **Shock {span}**: moved **{direction} {pct(e['cum_ret'])}** "
                             f"(sharpest day about {e['peak_z']:.0f}× a normal day). That is a "
                             f"real event, not wobble — worth knowing *why* (headlines below).")
        if rest:
            # The ranking key is not peak_z alone, so the first hidden event
            # need not have the largest peak among the hidden ones; take the
            # true maximum so "nothing above" is actually true.
            ceiling = max(e["peak_z"] for e in rest)
            lines.append(f"- (+ {len(rest)} smaller flare-up{'s' if len(rest) > 1 else ''} not "
                         f"listed — nothing above {ceiling:.0f}× normal.)")
        for v in a["vols"]:
            day = dates[min(v["at"] + 1, len(dates) - 1)]
            b = (np.exp(v["before"]) - 1) * 100
            af = (np.exp(v["after"]) - 1) * 100
            word = "rougher" if af > b else "calmer"
            lines.append(f"- **The ride got {word} around {day}**: a typical day went from "
                         f"about ±{b:.1f}% to ±{af:.1f}%. Same stock, different weather.")
    # drift vs luck — the part people get wrong most
    tot, luck = a["total"], a["luck2"]
    lines.append("")
    if abs(tot) > luck:
        lines.append(f"**The drift is real too:** {pct(tot)} over the period — more than the "
                     f"±{(np.exp(luck)-1)*100:.0f}% that pure day-to-day luck could plausibly "
                     f"produce over {n} days.")
    else:
        lines.append(f"**About the overall {pct(tot)} move:** over {n} days, pure luck at this "
                     f"stock's wobble could produce anything within about "
                     f"±{(np.exp(luck)-1)*100:.0f}%. So the period's drift, by itself, is "
                     f"**not distinguishable from chance** — an honest thing almost no chart "
                     f"commentary will tell you.")
    lines.append("")
    lines.append(f"**Your attention, saved:** {a['calm']} of {n} days were inside the normal "
                 f"band — days when checking the chart could tell you nothing.")
    lines.append("")
    lines.append("<small>This describes what already happened in free Yahoo data; it predicts "
                 "nothing and is not investment advice or a recommendation to buy or sell "
                 "anything. Past shocks say nothing about future ones.</small>")
    return "\n".join(lines)