Syntrex commited on
Commit
a937adb
·
1 Parent(s): 0c51058

Make debug snapshot reads retry-safe

Browse files
Files changed (4) hide show
  1. app.py +62 -2
  2. data/shared_baseline.py +3 -3
  3. database/db.py +93 -1
  4. visualization/debug_page.py +23 -0
app.py CHANGED
@@ -100,6 +100,7 @@ from database.db import (
100
  insert_bet,
101
  next_bet_id,
102
  read_table,
 
103
  read_cached_odds,
104
  read_cached_probable_starters,
105
  read_cached_probable_starters_meta,
@@ -3832,17 +3833,75 @@ def main() -> None:
3832
  _debug_baseline_bundle = load_shared_baseline_bundle_from_snapshots(
3833
  max_age_seconds=max(STATCAST_TTL_SECONDS, 60 * 60)
3834
  )
3835
- _debug_hitter_df = read_table(conn, "shared_hitter_baseline_event_rows").drop(
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3836
  columns=["snapshot_built_at", "snapshot_version", "source_status"],
3837
  errors="ignore",
3838
  )
3839
- _debug_pitcher_df = read_table(conn, "shared_pitcher_baseline_event_rows").drop(
3840
  columns=["snapshot_built_at", "snapshot_version", "source_status"],
3841
  errors="ignore",
3842
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3843
  if _debug_hitter_df.empty:
 
3844
  _debug_hitter_df = load_statcast_recent()
3845
  if _debug_pitcher_df.empty:
 
3846
  _debug_pitcher_df = _debug_hitter_df
3847
  loader["update"]("Rendering debug tables...", 0.75)
3848
  render_debug(
@@ -3857,6 +3916,7 @@ def main() -> None:
3857
  grade_outcomes_fn=grade_final_game_outcomes_from_scores,
3858
  grade_props_fn=grade_batter_prop_outcomes_from_audit,
3859
  fill_realized_fn=fill_batter_prop_realized_outcomes,
 
3860
  )
3861
  loader["clear"]()
3862
 
 
100
  insert_bet,
101
  next_bet_id,
102
  read_table,
103
+ read_table_retryable,
104
  read_cached_odds,
105
  read_cached_probable_starters,
106
  read_cached_probable_starters_meta,
 
3833
  _debug_baseline_bundle = load_shared_baseline_bundle_from_snapshots(
3834
  max_age_seconds=max(STATCAST_TTL_SECONDS, 60 * 60)
3835
  )
3836
+ _debug_read_status = {
3837
+ "hitter_event_rows": {
3838
+ "table_name": "shared_hitter_baseline_event_rows",
3839
+ "read_source": "baseline_bundle",
3840
+ "read_attempts": 1,
3841
+ "retry_used": False,
3842
+ "read_error": "",
3843
+ "snapshot_built_at": None,
3844
+ "source_status": str(_debug_baseline_bundle.get("snapshot_source_status") or ""),
3845
+ },
3846
+ "pitcher_event_rows": {
3847
+ "table_name": "shared_pitcher_baseline_event_rows",
3848
+ "read_source": "baseline_bundle",
3849
+ "read_attempts": 1,
3850
+ "retry_used": False,
3851
+ "read_error": "",
3852
+ "snapshot_built_at": None,
3853
+ "source_status": str(_debug_baseline_bundle.get("snapshot_source_status") or ""),
3854
+ },
3855
+ }
3856
+ _baseline_status_df = _debug_baseline_bundle.get("snapshot_status", pd.DataFrame())
3857
+ if isinstance(_baseline_status_df, pd.DataFrame) and not _baseline_status_df.empty:
3858
+ for _table_key, _status_key in [
3859
+ ("shared_hitter_baseline_event_rows", "hitter_event_rows"),
3860
+ ("shared_pitcher_baseline_event_rows", "pitcher_event_rows"),
3861
+ ]:
3862
+ _row = _baseline_status_df[_baseline_status_df["table_name"] == _table_key]
3863
+ if not _row.empty:
3864
+ _debug_read_status[_status_key]["snapshot_built_at"] = _row.iloc[0].get("snapshot_built_at")
3865
+ _debug_read_status[_status_key]["source_status"] = _row.iloc[0].get("source_status")
3866
+
3867
+ _debug_hitter_df = _debug_baseline_bundle.get("blended_batter_df", pd.DataFrame()).copy()
3868
+ _debug_pitcher_df = _debug_baseline_bundle.get("blended_pitcher_df", pd.DataFrame()).copy()
3869
+ if not isinstance(_debug_hitter_df, pd.DataFrame):
3870
+ _debug_hitter_df = pd.DataFrame()
3871
+ if not isinstance(_debug_pitcher_df, pd.DataFrame):
3872
+ _debug_pitcher_df = pd.DataFrame()
3873
+ _debug_hitter_df = _debug_hitter_df.drop(
3874
  columns=["snapshot_built_at", "snapshot_version", "source_status"],
3875
  errors="ignore",
3876
  )
3877
+ _debug_pitcher_df = _debug_pitcher_df.drop(
3878
  columns=["snapshot_built_at", "snapshot_version", "source_status"],
3879
  errors="ignore",
3880
  )
3881
+
3882
+ if _debug_hitter_df.empty:
3883
+ _debug_hitter_df, _debug_read_status["hitter_event_rows"] = read_table_retryable(
3884
+ conn,
3885
+ "shared_hitter_baseline_event_rows",
3886
+ )
3887
+ _debug_hitter_df = _debug_hitter_df.drop(
3888
+ columns=["snapshot_built_at", "snapshot_version", "source_status"],
3889
+ errors="ignore",
3890
+ )
3891
+ if _debug_pitcher_df.empty:
3892
+ _debug_pitcher_df, _debug_read_status["pitcher_event_rows"] = read_table_retryable(
3893
+ conn,
3894
+ "shared_pitcher_baseline_event_rows",
3895
+ )
3896
+ _debug_pitcher_df = _debug_pitcher_df.drop(
3897
+ columns=["snapshot_built_at", "snapshot_version", "source_status"],
3898
+ errors="ignore",
3899
+ )
3900
  if _debug_hitter_df.empty:
3901
+ _debug_read_status["hitter_event_rows"]["read_source"] = "load_statcast_recent_fallback"
3902
  _debug_hitter_df = load_statcast_recent()
3903
  if _debug_pitcher_df.empty:
3904
+ _debug_read_status["pitcher_event_rows"]["read_source"] = "debug_hitter_fallback"
3905
  _debug_pitcher_df = _debug_hitter_df
3906
  loader["update"]("Rendering debug tables...", 0.75)
3907
  render_debug(
 
3916
  grade_outcomes_fn=grade_final_game_outcomes_from_scores,
3917
  grade_props_fn=grade_batter_prop_outcomes_from_audit,
3918
  fill_realized_fn=fill_batter_prop_realized_outcomes,
3919
+ debug_event_row_status=_debug_read_status,
3920
  )
3921
  loader["clear"]()
3922
 
data/shared_baseline.py CHANGED
@@ -8,7 +8,7 @@ from typing import Any
8
  import pandas as pd
9
  from sqlalchemy import text
10
 
11
- from database.db import get_connection, replace_table_contents, upsert_dataframe
12
  from features.pitch_features import add_pitch_features
13
  from models.rolling_form_model import (
14
  build_batter_rolling_form_row,
@@ -1015,8 +1015,8 @@ def _read_snapshot_table(
1015
  query = text(
1016
  f"SELECT * FROM {table_name} WHERE player_name IN ({', '.join(clauses)}) ORDER BY player_name"
1017
  )
1018
- return pd.read_sql(query, conn, params=params)
1019
- return pd.read_sql(text(f"SELECT * FROM {table_name} ORDER BY player_name"), conn)
1020
 
1021
 
1022
  def _replace_snapshot_scope(
 
8
  import pandas as pd
9
  from sqlalchemy import text
10
 
11
+ from database.db import get_connection, replace_table_contents, safe_read_sql, upsert_dataframe
12
  from features.pitch_features import add_pitch_features
13
  from models.rolling_form_model import (
14
  build_batter_rolling_form_row,
 
1015
  query = text(
1016
  f"SELECT * FROM {table_name} WHERE player_name IN ({', '.join(clauses)}) ORDER BY player_name"
1017
  )
1018
+ return safe_read_sql(query, conn, params=params)
1019
+ return safe_read_sql(text(f"SELECT * FROM {table_name} ORDER BY player_name"), conn)
1020
 
1021
 
1022
  def _replace_snapshot_scope(
database/db.py CHANGED
@@ -23,10 +23,12 @@ so they can be added later without structural changes.
23
  from __future__ import annotations
24
 
25
  import json
 
26
  from typing import Any, Iterable, Mapping
27
 
28
  import pandas as pd
29
  from sqlalchemy import text
 
30
 
31
  from database import remote_db
32
  from utils.helpers import utc_now_iso
@@ -37,6 +39,8 @@ from utils.helpers import utc_now_iso
37
  # ---------------------------------------------------------------------------
38
 
39
  _INSERT_CHUNK_SIZE = 500
 
 
40
 
41
 
42
  # ---------------------------------------------------------------------------
@@ -76,6 +80,94 @@ def _bulk_insert(conn, table: str, df: pd.DataFrame) -> None:
76
  conn.execute(sql, records[i : i + _INSERT_CHUNK_SIZE])
77
 
78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  # ---------------------------------------------------------------------------
80
  # Schema initialization
81
  # ---------------------------------------------------------------------------
@@ -582,7 +674,7 @@ def replace_table_contents(
582
 
583
 
584
  def read_table(conn, table_name: str) -> pd.DataFrame:
585
- return pd.read_sql(text(f"SELECT * FROM {table_name}"), conn)
586
 
587
 
588
  def _safe_json_dump(value: Any) -> str:
 
23
  from __future__ import annotations
24
 
25
  import json
26
+ import time
27
  from typing import Any, Iterable, Mapping
28
 
29
  import pandas as pd
30
  from sqlalchemy import text
31
+ from sqlalchemy.exc import DBAPIError, OperationalError
32
 
33
  from database import remote_db
34
  from utils.helpers import utc_now_iso
 
39
  # ---------------------------------------------------------------------------
40
 
41
  _INSERT_CHUNK_SIZE = 500
42
+ _READ_RETRY_ATTEMPTS = 4
43
+ _READ_RETRY_BASE_DELAY_SECONDS = 0.20
44
 
45
 
46
  # ---------------------------------------------------------------------------
 
80
  conn.execute(sql, records[i : i + _INSERT_CHUNK_SIZE])
81
 
82
 
83
+ def _is_retryable_read_error(exc: Exception) -> bool:
84
+ text_value = str(exc or "").lower()
85
+ retry_markers = (
86
+ "serializationfailure",
87
+ "restart transaction",
88
+ "transactionretrywithprotorefresherror",
89
+ "readwithinuncertaintyintervalerror",
90
+ "retry txn",
91
+ "retry_serializable",
92
+ )
93
+ return any(marker in text_value for marker in retry_markers)
94
+
95
+
96
+ def safe_read_sql(
97
+ sql,
98
+ conn,
99
+ params: Mapping[str, Any] | None = None,
100
+ *,
101
+ max_attempts: int = _READ_RETRY_ATTEMPTS,
102
+ base_delay_seconds: float = _READ_RETRY_BASE_DELAY_SECONDS,
103
+ ) -> pd.DataFrame:
104
+ last_error: Exception | None = None
105
+ for attempt in range(1, max(1, int(max_attempts)) + 1):
106
+ try:
107
+ return pd.read_sql(sql, conn, params=params)
108
+ except (OperationalError, DBAPIError) as exc:
109
+ last_error = exc
110
+ if attempt >= max_attempts or not _is_retryable_read_error(exc):
111
+ raise
112
+ time.sleep(base_delay_seconds * attempt)
113
+ if last_error is not None:
114
+ raise last_error
115
+ return pd.DataFrame()
116
+
117
+
118
+ def read_table_retryable(
119
+ conn,
120
+ table_name: str,
121
+ *,
122
+ where_sql: str | None = None,
123
+ params: Mapping[str, Any] | None = None,
124
+ max_attempts: int = _READ_RETRY_ATTEMPTS,
125
+ ) -> tuple[pd.DataFrame, dict[str, Any]]:
126
+ sql_text = f"SELECT * FROM {table_name}"
127
+ if where_sql:
128
+ sql_text += f" WHERE {where_sql}"
129
+
130
+ attempts = 0
131
+ retry_used = False
132
+ last_error = ""
133
+ for attempt in range(1, max(1, int(max_attempts)) + 1):
134
+ attempts = attempt
135
+ try:
136
+ df = safe_read_sql(
137
+ text(sql_text),
138
+ conn,
139
+ params=params,
140
+ max_attempts=1,
141
+ )
142
+ return df, {
143
+ "table_name": table_name,
144
+ "read_source": "db_retryable",
145
+ "read_attempts": attempts,
146
+ "retry_used": retry_used,
147
+ "read_error": "",
148
+ }
149
+ except (OperationalError, DBAPIError) as exc:
150
+ last_error = str(exc)
151
+ if attempt >= max_attempts or not _is_retryable_read_error(exc):
152
+ return pd.DataFrame(), {
153
+ "table_name": table_name,
154
+ "read_source": "db_retryable_failed",
155
+ "read_attempts": attempts,
156
+ "retry_used": retry_used,
157
+ "read_error": last_error,
158
+ }
159
+ retry_used = True
160
+ time.sleep(_READ_RETRY_BASE_DELAY_SECONDS * attempt)
161
+
162
+ return pd.DataFrame(), {
163
+ "table_name": table_name,
164
+ "read_source": "db_retryable_failed",
165
+ "read_attempts": attempts,
166
+ "retry_used": retry_used,
167
+ "read_error": last_error,
168
+ }
169
+
170
+
171
  # ---------------------------------------------------------------------------
172
  # Schema initialization
173
  # ---------------------------------------------------------------------------
 
674
 
675
 
676
  def read_table(conn, table_name: str) -> pd.DataFrame:
677
+ return safe_read_sql(text(f"SELECT * FROM {table_name}"), conn)
678
 
679
 
680
  def _safe_json_dump(value: Any) -> str:
visualization/debug_page.py CHANGED
@@ -316,6 +316,7 @@ def render_debug(
316
  grade_outcomes_fn: Callable | None = None,
317
  grade_props_fn: Callable | None = None,
318
  fill_realized_fn: Callable | None = None,
 
319
  ) -> None:
320
  """
321
  Full Debug Dashboard page.
@@ -1038,6 +1039,28 @@ def render_debug(
1038
  else:
1039
  st.info("Open the Props page in this session to capture HR health diagnostics.")
1040
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1041
  with st.expander("Cached Source Freshness", expanded=False):
1042
  freshness_rows: list[dict[str, Any]] = []
1043
 
 
316
  grade_outcomes_fn: Callable | None = None,
317
  grade_props_fn: Callable | None = None,
318
  fill_realized_fn: Callable | None = None,
319
+ debug_event_row_status: dict[str, dict[str, Any]] | None = None,
320
  ) -> None:
321
  """
322
  Full Debug Dashboard page.
 
1039
  else:
1040
  st.info("Open the Props page in this session to capture HR health diagnostics.")
1041
 
1042
+ with st.expander("Debug Event Row Read Status", expanded=False):
1043
+ read_status = debug_event_row_status or {}
1044
+ if read_status:
1045
+ status_rows = pd.DataFrame(
1046
+ [
1047
+ {
1048
+ "section": key,
1049
+ "table_name": value.get("table_name"),
1050
+ "read_source": value.get("read_source"),
1051
+ "read_attempts": value.get("read_attempts"),
1052
+ "retry_used": value.get("retry_used"),
1053
+ "snapshot_built_at": value.get("snapshot_built_at"),
1054
+ "source_status": value.get("source_status"),
1055
+ "read_error": value.get("read_error"),
1056
+ }
1057
+ for key, value in read_status.items()
1058
+ ]
1059
+ )
1060
+ st.dataframe(status_rows, use_container_width=True, hide_index=True)
1061
+ else:
1062
+ st.info("No debug event-row read status captured in this session.")
1063
+
1064
  with st.expander("Cached Source Freshness", expanded=False):
1065
  freshness_rows: list[dict[str, Any]] = []
1066