researchradar / scripts /grobid_watchdog.py
unknown
Add full-text extraction pipeline scripts (Phase 8 WIP)
202bf64
"""Watchdog for GROBID extraction pipeline.
Monitors and auto-restarts the GROBID pipeline if it crashes.
Also ensures the GROBID Docker container stays running.
Usage:
pythonw scripts/grobid_watchdog.py (silent background)
python scripts/grobid_watchdog.py (with console output)
"""
import json
import logging
import os
import subprocess
import sys
import time
import urllib.request
from pathlib import Path
# Resolve to an absolute path so the project root is correct regardless of the
# directory the script was launched from.
PROJECT_DIR = Path(__file__).resolve().parent.parent
# Work from the project root so relative paths (including the child pipeline
# script launched by main()) resolve consistently.
os.chdir(PROJECT_DIR)
LOG_FILE = PROJECT_DIR / "data" / "raw" / "grobid_watchdog.log"
PROGRESS_FILE = PROJECT_DIR / "data" / "raw" / "pipeline_progress.json"
# Windows-specific absolute path to the Docker Desktop CLI.
DOCKER_PATH = r"C:\Program Files\Docker\Docker\resources\bin\docker.exe"
# Base URL of the GROBID service; health probe is <GROBID_URL>/api/isalive.
GROBID_URL = "http://localhost:8070"

# logging.FileHandler raises FileNotFoundError if the directory does not exist
# (e.g. on a fresh checkout), so create it before configuring logging.
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
_log_handlers = [logging.FileHandler(str(LOG_FILE), encoding="utf-8")]
# sys.stdout can be None when there is no console (e.g. under pythonw.exe);
# only attach a console handler when there is a stream to write to.
if sys.stdout is not None:
    _log_handlers.append(logging.StreamHandler(sys.stdout))
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=_log_handlers,
)
log = logging.getLogger(__name__)

# Maximum number of pipeline attempts before giving up.
MAX_RETRIES = 20
# Seconds to wait between attempts.
RETRY_DELAY = 60
def is_grobid_alive(timeout=5):
    """Return True if the GROBID health endpoint answers with HTTP 200.

    Probes ``f"{GROBID_URL}/api/isalive"``. Any failure (connection refused,
    timeout, HTTP error status, ...) is reported as "not alive" rather than
    raised, because callers use this as a polling health check.

    Args:
        timeout: Socket timeout in seconds for the probe (default 5).

    Returns:
        True when the endpoint responds with status 200, otherwise False.
    """
    try:
        # The context manager closes the response and its socket
        # deterministically; ensure_grobid() may probe dozens of times.
        with urllib.request.urlopen(f"{GROBID_URL}/api/isalive", timeout=timeout) as resp:
            return resp.status == 200
    except Exception:
        # Deliberate best-effort: any probe failure simply means "not alive".
        return False
def ensure_grobid(timeout_s=300, poll_interval_s=5):
    """Make sure the GROBID service is reachable, starting its container if needed.

    If the health check already passes, return immediately. Otherwise run
    ``docker start grobid`` and poll the health endpoint every
    *poll_interval_s* seconds for up to *timeout_s* seconds.

    Args:
        timeout_s: Total seconds to wait for GROBID to respond (default 300,
            i.e. 60 polls of 5 s).
        poll_interval_s: Seconds between health checks; must be > 0
            (default 5).

    Returns:
        True if GROBID responded. False if the docker CLI could not be run or
        GROBID did not recover in time.
    """
    if is_grobid_alive():
        return True
    log.info("GROBID not responding, restarting container...")
    try:
        result = subprocess.run([DOCKER_PATH, "start", "grobid"], capture_output=True)
    except OSError as exc:
        # e.g. docker.exe missing at DOCKER_PATH. Report failure instead of
        # letting the exception kill the watchdog loop.
        log.error("Could not run docker (%s): %s", DOCKER_PATH, exc)
        return False
    if result.returncode != 0:
        # Keep polling anyway, but record why the start failed (for example
        # the container no longer exists or the Docker daemon is not up).
        log.warning(
            "docker start grobid exited with code %d: %s",
            result.returncode,
            result.stderr.decode("utf-8", errors="replace").strip(),
        )
    polls = max(1, int(timeout_s // poll_interval_s))
    for i in range(polls):
        if is_grobid_alive():
            log.info("GROBID recovered after %ds", i * poll_interval_s)
            return True
        time.sleep(poll_interval_s)
    log.error("GROBID failed to recover")
    return False
def is_pipeline_done(progress_file=None, total=26543, threshold=0.99):
    """Report whether the GROBID stage has processed (almost) every document.

    Reads the JSON progress file and counts the entries in its
    ``grobid_done`` and ``grobid_failed`` lists. The run counts as finished
    once their combined length reaches ``total * threshold``.

    Args:
        progress_file: Path (or str) to the progress JSON; defaults to
            PROGRESS_FILE.
        total: Expected number of documents (default 26543, the known total).
        threshold: Fraction of *total* that must be done or failed
            (default 0.99).

    Returns:
        True if the threshold is met. False otherwise, including when the file
        is missing, unreadable, not valid JSON, or not the expected shape.
    """
    path = PROGRESS_FILE if progress_file is None else Path(progress_file)
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
        processed = len(data.get("grobid_done", [])) + len(data.get("grobid_failed", []))
    except (OSError, ValueError, TypeError, AttributeError):
        # OSError: file missing/unreadable. ValueError: bad or partially
        # written JSON, or undecodable bytes. TypeError/AttributeError:
        # unexpected structure. In every case, treat the run as "not done yet"
        # so the watchdog keeps going.
        return False
    return processed >= total * threshold
def _run_logged(cmd):
    """Run *cmd* (no shell); log, never raise, a launch error or non-zero exit.

    Returns True only if the command started and exited with code 0.
    """
    try:
        result = subprocess.run(cmd, capture_output=True)
    except OSError as exc:
        log.error("Could not run %s: %s", cmd[0], exc)
        return False
    if result.returncode != 0:
        log.warning(
            "%s exited with code %d: %s",
            " ".join(cmd),
            result.returncode,
            result.stderr.decode("utf-8", errors="replace").strip(),
        )
        return False
    return True


def main():
    """Supervise the full-text pipeline, then clean up and power off.

    Each attempt does the following:
    - Stop early if is_pipeline_done() reports the run is complete.
    - Otherwise ensure GROBID is up.
    - Launch ``scripts/fulltext_pipeline.py --skip-download --workers 1`` and
      wait for it to exit.

    Exit code 0 ends the loop. Any other exit code is retried after
    RETRY_DELAY seconds, up to MAX_RETRIES attempts.

    After the loop ends (for any of those reasons), the GROBID container is
    stopped and removed and a machine shutdown is scheduled in 60 s.
    """
    log.info("GROBID watchdog started")
    for attempt in range(1, MAX_RETRIES + 1):
        if is_pipeline_done():
            log.info("Pipeline already complete!")
            break
        log.info("=== Attempt %d/%d ===", attempt, MAX_RETRIES)
        if not ensure_grobid():
            log.error("Cannot start GROBID, retrying in %ds...", RETRY_DELAY)
            time.sleep(RETRY_DELAY)
            continue
        log.info("Starting fulltext_pipeline.py --skip-download")
        proc = subprocess.Popen(
            [sys.executable, "scripts/fulltext_pipeline.py", "--skip-download", "--workers", "1"],
            cwd=str(PROJECT_DIR),
        )
        log.info("Pipeline PID=%d", proc.pid)
        proc.wait()
        if proc.returncode == 0:
            log.info("Pipeline completed successfully!")
            break
        log.warning("Pipeline exited with code %d", proc.returncode)
        if attempt < MAX_RETRIES:
            log.info("Restarting in %ds...", RETRY_DELAY)
            time.sleep(RETRY_DELAY)
    else:
        # Reached only when no attempt hit `break`: every attempt failed.
        log.error("Giving up after %d attempts; pipeline did not complete", MAX_RETRIES)

    # Cleanup: stop GROBID container.
    # NOTE(review): `docker rm` deletes the container, but ensure_grobid() only
    # ever runs `docker start grobid`. A later watchdog run therefore cannot
    # bring GROBID back until the container is recreated. Confirm this is
    # intended.
    log.info("Stopping GROBID container...")
    _run_logged([DOCKER_PATH, "stop", "grobid"])
    _run_logged([DOCKER_PATH, "rm", "grobid"])
    log.info("Watchdog finished. Shutting down machine in 60s...")
    # Windows `shutdown`: /s powers the machine off, /t 60 delays it 60 s.
    _run_logged(["shutdown", "/s", "/t", "60"])
if __name__ == "__main__":
main()