Spaces:
Running
Running
Update usage_logging.py
Browse files- usage_logging.py +82 -9
usage_logging.py
CHANGED
|
@@ -276,12 +276,16 @@ def _expand_country_code(code: str) -> str:
|
|
| 276 |
def rebuild_visits_rollup_from_event_files() -> str:
|
| 277 |
"""
|
| 278 |
Rebuilds usage/visits.jsonl from immutable per-event JSON files in usage/events/.
|
| 279 |
-
|
|
|
|
|
|
|
| 280 |
"""
|
| 281 |
api = _hf_api()
|
| 282 |
if not api:
|
| 283 |
return "HF_TOKEN not available. Rollup requires write access."
|
| 284 |
|
|
|
|
|
|
|
| 285 |
# 1) List files
|
| 286 |
try:
|
| 287 |
files = list_repo_files(repo_id=USAGE_DATASET_REPO, repo_type="dataset")
|
|
@@ -317,22 +321,91 @@ def rebuild_visits_rollup_from_event_files() -> str:
|
|
| 317 |
# 3) Sort by ts_utc
|
| 318 |
events.sort(key=lambda e: (e.get("ts_utc") or ""))
|
| 319 |
|
| 320 |
-
# 4)
|
| 321 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 322 |
for evt in events:
|
| 323 |
-
|
| 324 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 325 |
|
| 326 |
-
#
|
| 327 |
try:
|
| 328 |
api.upload_file(
|
| 329 |
repo_id=USAGE_DATASET_REPO,
|
| 330 |
repo_type="dataset",
|
| 331 |
-
path_in_repo=ROLLUP_PATH,
|
| 332 |
-
path_or_fileobj=
|
| 333 |
commit_message=f"rebuild {ROLLUP_PATH} from {USAGE_EVENTS_DIR}",
|
| 334 |
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 335 |
except Exception as e:
|
| 336 |
return f"Rollup upload failed: {repr(e)}"
|
| 337 |
|
| 338 |
-
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 276 |
def rebuild_visits_rollup_from_event_files() -> str:
|
| 277 |
"""
|
| 278 |
Rebuilds usage/visits.jsonl from immutable per-event JSON files in usage/events/.
|
| 279 |
+
ALSO writes an enriched rollup usage/visits_enriched.jsonl where:
|
| 280 |
+
- legacy rows (no session_id) keep their original country as final_country
|
| 281 |
+
- new rows (with session_id) keep their own country as final_country when it is set and not Unknown; otherwise final_country falls back to the session's usage_start row when available
|
| 282 |
"""
|
| 283 |
api = _hf_api()
|
| 284 |
if not api:
|
| 285 |
return "HF_TOKEN not available. Rollup requires write access."
|
| 286 |
|
| 287 |
+
ENRICHED_ROLLUP_PATH = "usage/visits_enriched.jsonl"
|
| 288 |
+
|
| 289 |
# 1) List files
|
| 290 |
try:
|
| 291 |
files = list_repo_files(repo_id=USAGE_DATASET_REPO, repo_type="dataset")
|
|
|
|
| 321 |
# 3) Sort by ts_utc
|
| 322 |
events.sort(key=lambda e: (e.get("ts_utc") or ""))
|
| 323 |
|
| 324 |
+
# 4) Build session → geo map from usage_start events (new schema)
|
| 325 |
+
# NOTE: the session-level event name matched here is "usage_start" (not "session_start")
|
| 326 |
+
session_geo = {}
|
| 327 |
+
for e in events:
|
| 328 |
+
if e.get("event") == "usage_start":
|
| 329 |
+
sid = (e.get("session_id") or "").strip()
|
| 330 |
+
if not sid:
|
| 331 |
+
continue # legacy usage_start rows won’t have session_id; ignore for mapping
|
| 332 |
+
if sid not in session_geo:
|
| 333 |
+
session_geo[sid] = {
|
| 334 |
+
"country_session": e.get("country") or "Unknown",
|
| 335 |
+
"country_code_session": (e.get("country_code") or "").strip().upper(),
|
| 336 |
+
"country_source_session": e.get("country_source") or "unknown",
|
| 337 |
+
}
|
| 338 |
+
|
| 339 |
+
# 5) Write RAW JSONL (same behavior as today)
|
| 340 |
+
buf_raw = BytesIO()
|
| 341 |
+
for evt in events:
|
| 342 |
+
buf_raw.write((json.dumps(evt, ensure_ascii=False) + "\n").encode("utf-8"))
|
| 343 |
+
buf_raw.seek(0)
|
| 344 |
+
|
| 345 |
+
# 6) Write ENRICHED JSONL
|
| 346 |
+
# Rules:
|
| 347 |
+
# - Legacy rows (no session_id): final_country = evt.country (already correct)
|
| 348 |
+
# - New rows with session_id:
|
| 349 |
+
# - if evt.country is Unknown/blank, use country from session usage_start
|
| 350 |
+
# - keep original evt.country as well (don’t overwrite)
|
| 351 |
+
buf_enriched = BytesIO()
|
| 352 |
+
|
| 353 |
for evt in events:
|
| 354 |
+
enriched = dict(evt)
|
| 355 |
+
sid = (evt.get("session_id") or "").strip()
|
| 356 |
+
geo = session_geo.get(sid, {}) if sid else {}
|
| 357 |
+
|
| 358 |
+
# Keep a copy of session geo (useful for debugging/audit)
|
| 359 |
+
if geo:
|
| 360 |
+
enriched.update(geo)
|
| 361 |
+
|
| 362 |
+
evt_country = (evt.get("country") or "").strip()
|
| 363 |
+
evt_cc = (evt.get("country_code") or "").strip().upper()
|
| 364 |
+
|
| 365 |
+
# Determine final_country / final_country_code
|
| 366 |
+
if sid:
|
| 367 |
+
# New schema row: prefer existing event country if it’s meaningful,
|
| 368 |
+
# else fall back to session geo.
|
| 369 |
+
if evt_country and evt_country.lower() != "unknown":
|
| 370 |
+
enriched["final_country"] = evt_country
|
| 371 |
+
enriched["final_country_code"] = evt_cc
|
| 372 |
+
enriched["final_country_source"] = evt.get("country_source") or "unknown"
|
| 373 |
+
else:
|
| 374 |
+
enriched["final_country"] = geo.get("country_session", "Unknown")
|
| 375 |
+
enriched["final_country_code"] = geo.get("country_code_session", "")
|
| 376 |
+
enriched["final_country_source"] = geo.get("country_source_session", "unknown")
|
| 377 |
+
else:
|
| 378 |
+
# Legacy row: preserve the original country fields
|
| 379 |
+
enriched["final_country"] = evt_country or "Unknown"
|
| 380 |
+
enriched["final_country_code"] = evt_cc
|
| 381 |
+
enriched["final_country_source"] = evt.get("country_source") or "unknown"
|
| 382 |
+
|
| 383 |
+
buf_enriched.write((json.dumps(enriched, ensure_ascii=False) + "\n").encode("utf-8"))
|
| 384 |
+
|
| 385 |
+
buf_enriched.seek(0)
|
| 386 |
|
| 387 |
+
# 7) Upload both rollups
|
| 388 |
try:
|
| 389 |
api.upload_file(
|
| 390 |
repo_id=USAGE_DATASET_REPO,
|
| 391 |
repo_type="dataset",
|
| 392 |
+
path_in_repo=ROLLUP_PATH, # your existing usage/visits.jsonl
|
| 393 |
+
path_or_fileobj=buf_raw,
|
| 394 |
commit_message=f"rebuild {ROLLUP_PATH} from {USAGE_EVENTS_DIR}",
|
| 395 |
)
|
| 396 |
+
api.upload_file(
|
| 397 |
+
repo_id=USAGE_DATASET_REPO,
|
| 398 |
+
repo_type="dataset",
|
| 399 |
+
path_in_repo=ENRICHED_ROLLUP_PATH,
|
| 400 |
+
path_or_fileobj=buf_enriched,
|
| 401 |
+
commit_message=f"rebuild {ENRICHED_ROLLUP_PATH} from {USAGE_EVENTS_DIR}",
|
| 402 |
+
)
|
| 403 |
except Exception as e:
|
| 404 |
return f"Rollup upload failed: {repr(e)}"
|
| 405 |
|
| 406 |
+
return (
|
| 407 |
+
f"Rollups rebuilt:\n"
|
| 408 |
+
f"- RAW: {ROLLUP_PATH} rows={len(events)} (bad_files={bad})\n"
|
| 409 |
+
f"- ENRICHED: {ENRICHED_ROLLUP_PATH} rows={len(events)} (bad_files={bad})"
|
| 410 |
+
)
|
| 411 |
+
|