# Page metadata from the file viewer: Space status "Sleeping"; file size 3,749 bytes; revision 94e5914.
import logging
from fastapi import FastAPI
from datasets import load_dataset
import pandas as pd
import numpy as np
# -------------------------------------------------
# Logging configuration
# -------------------------------------------------
# Emit INFO-and-above records formatted as "<time> | <level> | <message>".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Application object that the route decorators in this module attach to.
app = FastAPI()
# -------------------------------------------------
# Lazy dataset cache
# -------------------------------------------------
DF = None
MAX_ROWS = 5000  # Safety limit for Spaces: cap on rows materialised in memory


def load_data_once():
    """
    Return the cached transcripts DataFrame, building it on first use.

    The dataset is opened with ``streaming=True`` because Hugging Face Spaces
    cannot download large datasets at startup; streaming avoids timeouts and
    memory issues. Only the first ``MAX_ROWS`` streamed records are read.

    Returns:
        pandas.DataFrame: one row per streamed record. The same object is
        stored in the module-level ``DF`` and returned on every later call.
    """
    global DF
    # Function-scope import so this block does not depend on the file header.
    from itertools import islice

    if DF is None:
        logger.info("Loading dataset (streaming mode): kurry/sp500_earnings_transcripts ...")
        ds = load_dataset(
            "kurry/sp500_earnings_transcripts",
            split="train",
            streaming=True,
        )
        # Convert streaming dataset -> pandas DataFrame.
        # islice stops after exactly MAX_ROWS items; an "append, then break
        # once i > 5000" loop would let 5002 rows through before stopping.
        rows = list(islice(ds, MAX_ROWS))
        DF = pd.DataFrame(rows)
        logger.info(f"Loaded {len(DF)} rows into DataFrame")
    return DF
# -------------------------------------------------
# Utility: convert NumPy → Python
# -------------------------------------------------
def to_python(obj):
    """
    Convert a NumPy/pandas scalar into a plain, JSON-friendly Python value.

    Args:
        obj: Any value, e.g. a cell taken from a DataFrame row.

    Returns:
        ``int``, ``float`` or ``bool`` for the matching NumPy scalar types,
        an ISO-8601 string for ``pd.Timestamp``, ``None`` for missing values
        (float NaN and ``pd.NaT``), and ``obj`` itself for anything else.
    """
    # Missing values first: NaN/NaT have no JSON representation, and strict
    # JSON encoders (allow_nan=False) raise on NaN instead of emitting it.
    if obj is pd.NaT:
        return None
    if isinstance(obj, (float, np.floating)):
        return None if np.isnan(obj) else float(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    return obj
def clean_dict(d):
    """Build a new dict with the keys of ``d`` and each value run through ``to_python``."""
    converted = {}
    for key, value in d.items():
        converted[key] = to_python(value)
    return converted
# -------------------------------------------------
# Routes
# -------------------------------------------------
@app.on_event("startup")
def startup_event() -> None:
    """Write a single INFO log line when the application's startup event fires."""
    # NOTE(review): the first character of the message looks like a
    # mis-encoded emoji; confirm the intended glyph before changing it.
    startup_message = "๐ Earnings Transcript API starting up"
    logger.info(startup_message)
@app.get("/")
def root():
    """Liveness endpoint: log the call and return a fixed status message."""
    logger.info("Root endpoint called")
    status = {"message": "Earnings Transcript API is running"}
    return status
@app.get("/tickers")
def list_tickers():
    """
    Return the distinct values of the 'symbol' column of the cached dataset.

    Returns:
        dict: ``{"tickers": [...]}`` sorted ascending with missing values
        dropped, or ``{"error": ...}`` when the DataFrame has no 'symbol'
        column.
    """
    logger.info("Listing all tickers")
    frame = load_data_once()

    if "symbol" in frame.columns:
        unique_symbols = frame["symbol"].dropna().unique().tolist()
        unique_symbols.sort()
        logger.info(f"Returned {len(unique_symbols)} tickers")
        return {"tickers": unique_symbols}

    return {"error": "Dataset does not contain 'symbol' column"}
@app.get("/transcript/{symbol}")
def get_transcript(symbol: str):
    """
    Return the first dataset row whose 'symbol' equals the upper-cased path value.

    Args:
        symbol: Ticker taken from the URL path; compared after ``upper()``.

    Returns:
        dict: the row's columns passed through ``clean_dict`` plus a
        "segments" key. "segments" holds the dict entries of the row's
        'structured_content' list, each passed through ``clean_dict``, or
        ``[]`` when that value is not a list. When the 'symbol' column is
        missing or no row matches, ``{"error": ...}`` is returned instead.
    """
    logger.info(f"Transcript request received for symbol: {symbol}")
    frame = load_data_once()
    symbol = symbol.upper()

    if "symbol" not in frame.columns:
        return {"error": "Dataset missing 'symbol' column"}

    matches = frame[frame["symbol"] == symbol]
    if matches.empty:
        logger.warning(f"No transcripts found for symbol: {symbol}")
        return {"error": f"No transcripts found for symbol {symbol}"}

    # Only the first matching row is served.
    first_match = matches.iloc[0]
    payload = clean_dict(first_match.to_dict())

    # Extract structured content (correct column name)
    raw_segments = first_match.get("structured_content", None)
    if not isinstance(raw_segments, list):
        logger.info(f"No structured_content found for {symbol}")
        payload["segments"] = []
    else:
        logger.info(f"Cleaning {len(raw_segments)} segments for {symbol}")
        segment_dicts = []
        for entry in raw_segments:
            # Non-dict entries are skipped rather than converted.
            if isinstance(entry, dict):
                segment_dicts.append(clean_dict(entry))
        payload["segments"] = segment_dicts

    logger.info(f"Returning transcript for {symbol}")
    return payload
# (end of file view)