File size: 3,749 Bytes
94e5914
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import logging
from fastapi import FastAPI
from datasets import load_dataset
import pandas as pd
import numpy as np

# -------------------------------------------------
# Logging configuration
# -------------------------------------------------
# Configure the root logger at import time: INFO level and above, with each
# record rendered as "timestamp | level | message".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
# Module-scoped logger used by the loader and route handlers below.
logger = logging.getLogger(__name__)

# FastAPI application instance; the route decorators below register on it.
app = FastAPI()

# -------------------------------------------------
# Lazy dataset cache
# -------------------------------------------------
# Module-level cache for the transcripts DataFrame. It stays None until
# load_data_once() builds the DataFrame on first use and stores it here.
DF = None


def load_data_once():
    """
    Return the cached transcripts DataFrame, building it on the first call.

    The dataset is opened in streaming mode because, per the original
    author's note, Hugging Face Spaces cannot download large datasets at
    startup; streaming avoids timeouts and memory issues. Only the first
    ``max_rows`` records of the stream are materialized.

    Returns:
        pandas.DataFrame: one row per streamed record. The same object is
        returned on every subsequent call (it is stored in the module-level
        ``DF`` global).
    """
    # Local import keeps this block self-contained without touching the
    # file-level import section.
    from itertools import islice

    global DF

    # Fast path: the dataset has already been loaded by an earlier call.
    if DF is not None:
        return DF

    # Safety limit for Spaces: hard cap on the number of rows kept in memory.
    max_rows = 5000

    logger.info("Loading dataset (streaming mode): kurry/sp500_earnings_transcripts ...")

    ds = load_dataset(
        "kurry/sp500_earnings_transcripts",
        split="train",
        streaming=True
    )

    # Convert streaming dataset → pandas DataFrame.
    # islice stops after exactly max_rows items. The previous
    # append-then-check loop (`if i > 5000: break`) let 5002 rows through.
    rows = list(islice(ds, max_rows))

    DF = pd.DataFrame(rows)
    logger.info(f"Loaded {len(DF)} rows into DataFrame")

    return DF


# -------------------------------------------------
# Utility: convert NumPy → Python
# -------------------------------------------------
def to_python(obj):
    """
    Convert NumPy / pandas values into plain, JSON-friendly Python objects.

    Conversions performed:
      * ``np.bool_``                    -> ``bool``
      * ``np.integer``                  -> ``int``
      * ``float`` / ``np.floating``     -> ``float``, or ``None`` when the
        value is NaN or infinite (strict JSON cannot represent those)
      * ``pd.NaT``                      -> ``None``
      * ``pd.Timestamp``                -> ISO-8601 string
      * ``np.ndarray``                  -> (nested) ``list``
      * ``dict`` / ``list`` / ``tuple`` -> converted recursively; tuples
        come back as lists, which is how JSON would render them anyway

    Args:
        obj: any value, typically a cell taken from a DataFrame row.

    Returns:
        The converted value; objects of any other type are returned unchanged.
    """
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, (float, np.floating)):
        number = float(obj)
        # NaN / +-inf have no strict-JSON representation; report as missing.
        return number if np.isfinite(number) else None
    # NaT is checked by identity before the Timestamp branch: it marks a
    # missing timestamp and has no meaningful ISO representation.
    if obj is pd.NaT:
        return None
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    if isinstance(obj, np.ndarray):
        # tolist() yields a nested list (or a bare scalar for 0-d arrays);
        # recurse so NaN and object-dtype elements are cleaned as well.
        return to_python(obj.tolist())
    if isinstance(obj, dict):
        return {key: to_python(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [to_python(item) for item in obj]
    return obj


def clean_dict(d):
    """
    Return a new dict with every value of *d* passed through ``to_python``.

    Keys are kept as they are; the input mapping is not modified.
    """
    cleaned = {}
    for key, value in d.items():
        cleaned[key] = to_python(value)
    return cleaned


# -------------------------------------------------
# Routes
# -------------------------------------------------

# NOTE(review): recent FastAPI releases deprecate on_event in favor of
# lifespan handlers; kept here so the app's construction stays unchanged.
@app.on_event("startup")
def startup_event():
    """Log a one-line banner when the application starts."""
    # The emoji was stored as mis-decoded UTF-8 bytes (mojibake); restored to
    # the intended rocket character.
    logger.info("🚀 Earnings Transcript API starting up")


@app.get("/")
def root():
    """Liveness endpoint: log the call and return a fixed status message."""
    status = {"message": "Earnings Transcript API is running"}
    logger.info("Root endpoint called")
    return status


@app.get("/tickers")
def list_tickers():
    """
    Return the sorted, de-duplicated ticker symbols found in the dataset.

    Responds with ``{"tickers": [...]}``, or with an ``{"error": ...}``
    payload when the loaded DataFrame has no ``symbol`` column.
    """
    logger.info("Listing all tickers")
    frame = load_data_once()

    if "symbol" in frame.columns:
        # Drop missing values, de-duplicate, then sort in place.
        symbols = frame["symbol"].dropna().unique().tolist()
        symbols.sort()
        logger.info(f"Returned {len(symbols)} tickers")
        return {"tickers": symbols}

    return {"error": "Dataset does not contain 'symbol' column"}


@app.get("/transcript/{symbol}")
def get_transcript(symbol: str):
    """
    Return the first transcript row stored for *symbol*.

    The symbol is upper-cased before matching. The response is the matching
    row as a cleaned dict plus a ``segments`` list built from the row's
    ``structured_content`` value (empty when that value is not a list).
    An ``{"error": ...}`` payload is returned when the ``symbol`` column is
    missing or when no row matches.
    """
    logger.info(f"Transcript request received for symbol: {symbol}")

    frame = load_data_once()
    symbol = symbol.upper()

    if "symbol" not in frame.columns:
        return {"error": "Dataset missing 'symbol' column"}

    matches = frame[frame["symbol"] == symbol]
    if matches.empty:
        logger.warning(f"No transcripts found for symbol: {symbol}")
        return {"error": f"No transcripts found for symbol {symbol}"}

    # Only the first matching row is served.
    first = matches.iloc[0]
    payload = clean_dict(first.to_dict())

    # Extract structured content (correct column name)
    structured = first.get("structured_content", None)

    if not isinstance(structured, list):
        logger.info(f"No structured_content found for {symbol}")
        payload["segments"] = []
    else:
        logger.info(f"Cleaning {len(structured)} segments for {symbol}")
        # Entries that are not dicts are dropped.
        payload["segments"] = [
            clean_dict(part) for part in structured if isinstance(part, dict)
        ]

    logger.info(f"Returning transcript for {symbol}")
    return payload