Spaces:
Running
Running
File size: 8,663 Bytes
6665ae7 34efa08 a1f6dcf 34efa08 6665ae7 34efa08 a1f6dcf d2e4e52 34efa08 a1f6dcf 34efa08 a056eaa d2e4e52 a056eaa d2e4e52 a056eaa d2e4e52 6665ae7 34efa08 a1f6dcf 34efa08 6665ae7 d2e4e52 34efa08 a1f6dcf 34efa08 a1f6dcf 34efa08 d2e4e52 34efa08 a1f6dcf d2e4e52 a1f6dcf d2e4e52 a1f6dcf 6665ae7 a1f6dcf 6665ae7 0dc5a09 a1f6dcf 6665ae7 a1f6dcf a056eaa a1f6dcf 34efa08 a1f6dcf d2e4e52 34efa08 a1f6dcf 34efa08 a1f6dcf 6665ae7 34efa08 a1f6dcf 34efa08 a1f6dcf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
import os
import sys
import subprocess
import threading
import traceback
from glob import glob
from flask import Blueprint, jsonify, request
# Blueprint that carries the /ingest* routes defined in this module.
ingest_trigger_bp = Blueprint("ingest_trigger_bp", __name__)
# Prevent concurrent executions: the /ingest route takes this lock without
# blocking and answers 409 while another run still holds it. A threading.Lock
# only coordinates threads inside this process, not separate worker processes.
_ingest_lock = threading.Lock()
def _list_dir_sample(path: str, patterns=("*.pdf", "*.PDF"), limit: int = 10):
    """Summarize the entries directly inside *path* that match *patterns*.

    Args:
        path: Directory to inspect. Only its immediate entries are matched;
            sub-directories are not searched.
        patterns: Glob patterns applied inside *path*.
        limit: Maximum number of matching paths returned in ``sample``.

    Returns:
        A dict with ``exists`` (whether *path* is a directory), ``total``
        (number of distinct matching paths) and ``sample`` (the first *limit*
        matches in sorted order). If anything raises, ``exists`` is False,
        ``total`` is 0, ``sample`` is empty and ``error`` holds the exception
        text, so callers always see the same keys.
    """
    # Local import: the module only binds the ``glob`` function at file level.
    from glob import escape as _escape_glob

    try:
        # Treat the directory name literally (as os.path.isdir() below does),
        # so characters such as "[" in a folder name are not read as wildcards.
        base = _escape_glob(path)
        # A set keeps each path once even when several patterns match it,
        # e.g. "*.pdf" and "*.PDF" where matching is case-insensitive.
        matches = set()
        for pat in patterns:
            matches.update(glob(os.path.join(base, pat)))
        files = sorted(matches)
        return {
            "exists": os.path.isdir(path),
            "total": len(files),
            "sample": files[:limit],
        }
    except Exception as e:
        # Best-effort diagnostic helper: report the failure instead of raising.
        return {"exists": False, "total": 0, "sample": [], "error": str(e)}
def _count_files(path: str, exts=(".json", ".sqlite3", ".parquet", ".bin", ".txt", ".lock")):
    """Count every file below *path*, grouped by lower-cased extension.

    Args:
        path: Root directory; the whole tree underneath it is walked.
        exts: Accepted for interface compatibility. The current implementation
            does not filter by it: every file is counted.

    Returns:
        ``{"exists": True, "total": n, "by_ext": {...}}`` for a directory,
        ``{"exists": False, "total": 0, "by_ext": {}}`` when *path* is not a
        directory, and ``{"exists": False, "total": 0, "error": msg}`` if the
        walk raises.
    """
    try:
        if os.path.isdir(path):
            by_ext = {}
            for _dirpath, _dirnames, filenames in os.walk(path):
                for filename in filenames:
                    _stem, suffix = os.path.splitext(filename)
                    key = suffix.lower()
                    by_ext[key] = by_ext.get(key, 0) + 1
            # Each file adds exactly one to its bucket, so the bucket sum is
            # the overall file count.
            return {"exists": True, "total": sum(by_ext.values()), "by_ext": by_ext}
        return {"exists": False, "total": 0, "by_ext": {}}
    except Exception as exc:
        return {"exists": False, "total": 0, "error": str(exc)}
@ingest_trigger_bp.route("/ingest/ping", methods=["GET"])
def ingest_ping():
    """Answer a GET on /ingest/ping with basic runtime information.

    The JSON payload holds ``ok``, the current working directory and a
    snapshot of the ingest-related environment variables (with the same
    defaults the other routes use). The payload is also printed to stdout.

    Returns:
        A ``(response, 200)`` tuple.
    """
    # (variable name, default reported when it is unset)
    env_defaults = (
        ("ENV", None),
        ("INGEST_MODULE", "ragg.ingest_all"),
        ("INGEST_TIMEOUT_SEC", "1800"),
        ("CHROMA_DIR", None),
        ("CHROMA_ROOT", None),
        ("HF_HOME", None),
    )
    info = {
        "ok": True,
        "cwd": os.getcwd(),
        "env": {name: os.getenv(name, default) for name, default in env_defaults},
    }

    print("\n=== [PING] ===", flush=True)
    print(info, flush=True)
    return jsonify(info), 200
@ingest_trigger_bp.route("/ingest/debug", methods=["GET"])
def ingest_debug():
    """Answer a GET on /ingest/debug with ingestion diagnostics.

    Reports, for each candidate PDF folder, what ``_list_dir_sample`` finds,
    and whether the module named by ``INGEST_MODULE`` can be imported and
    exposes an ``ingest_all_levels`` attribute. The payload is also printed
    to stdout.

    Returns:
        A ``(response, 200)`` tuple; import failures are reported in the
        payload (``import_error``) rather than raised.
    """
    import importlib

    candidate_dirs = (
        "ragg/pdfs/low", "ragg/pdfs/mid", "ragg/pdfs/high",
        "pdfs/low", "pdfs/mid", "pdfs/high",
    )
    paths_info = [
        {"path": folder, **_list_dir_sample(folder)} for folder in candidate_dirs
    ]

    mod_name = os.getenv("INGEST_MODULE", "ragg.ingest_all")
    import_ok = False
    callable_ok = False
    import_error = None
    try:
        module = importlib.import_module(mod_name)
        import_ok = True
        # Only checks that the attribute exists, not that it is callable.
        callable_ok = hasattr(module, "ingest_all_levels")
    except Exception as exc:
        import_error = f"{type(exc).__name__}: {exc}"

    env_info = {
        "ENV": os.getenv("ENV"),
        "INGEST_MODULE": mod_name,
        "CHROMA_DIR": os.getenv("CHROMA_DIR"),
        "CHROMA_ROOT": os.getenv("CHROMA_ROOT"),
    }
    resp = {
        "cwd": os.getcwd(),
        "env": env_info,
        "paths": paths_info,
        "import_ok": import_ok,
        "callable_ok": callable_ok,
        "import_error": import_error,
    }

    print("\n=== [INGEST DEBUG] ===", flush=True)
    print(resp, flush=True)
    return jsonify(resp), 200
@ingest_trigger_bp.route("/ingest/status", methods=["GET"])
def ingest_status():
    """Answer a GET on /ingest/status with file counts per level directory.

    The root comes from ``CHROMA_ROOT`` (falling back to ``/data/chroma`` when
    unset or empty); the ``low``, ``mid`` and ``high`` sub-directories are each
    summarized with ``_count_files``. The payload is also printed to stdout.

    Returns:
        A ``(response, 200)`` tuple.
    """
    chroma_root = os.getenv("CHROMA_ROOT") or "/data/chroma"
    per_level = {
        level: _count_files(os.path.join(chroma_root, level))
        for level in ("low", "mid", "high")
    }
    out = {"chroma_root": chroma_root, "levels": per_level}

    print("\n=== [INGEST STATUS] ===", flush=True)
    print(out, flush=True)
    return jsonify(out), 200
# Inline run (explicit)
@ingest_trigger_bp.route("/ingest/run-inline", methods=["POST"])
def ingest_run_inline():
    """Run the configured ingest module in-process and report the outcome.

    Imports the module named by ``INGEST_MODULE`` (default
    ``ragg.ingest_all``), logs a preflight listing of the candidate PDF
    folders and calls its ``ingest_all_levels()``.

    Returns:
        ``(response, status)`` where status is
        * 409 when another ingestion already holds the module lock,
        * 200 when ``ingest_all_levels()`` returned,
        * 500 when the entry point is missing or anything raised (the payload
          then carries the error text and traceback).
    """
    import importlib

    # Share the module-wide lock with the /ingest route so this endpoint
    # cannot start a second ingestion while one is already running.
    if not _ingest_lock.acquire(blocking=False):
        print("[DEBUG] Ingest lock already held → busy", flush=True)
        return jsonify({"status": "busy", "message": "Ingestion already in progress"}), 409

    try:
        print("\n=== [INGEST INLINE] ===", flush=True)
        mod_name = os.getenv("INGEST_MODULE", "ragg.ingest_all")
        print("Importing module:", mod_name, flush=True)
        mod = importlib.import_module(mod_name)
        if not hasattr(mod, "ingest_all_levels"):
            return jsonify({"status": "error", "message": "ingest_all_levels() not found"}), 500

        # Quick preflight
        for p in ["ragg/pdfs/low", "ragg/pdfs/mid", "ragg/pdfs/high", "pdfs/low", "pdfs/mid", "pdfs/high"]:
            print(f"[INLINE] {p} -> {_list_dir_sample(p)}", flush=True)

        mod.ingest_all_levels()
        return jsonify({"status": "success", "message": "Ingest completed inline"}), 200
    except Exception as e:
        # Format once so the log and the response carry the same traceback.
        tb = traceback.format_exc()
        print("[ERROR][INLINE]", e, flush=True)
        print(tb, flush=True)
        return jsonify({"status": "error", "error": f"{type(e).__name__}: {e}",
                        "traceback": tb}), 500
    finally:
        # Acquired above, so releasing here is always valid.
        _ingest_lock.release()
# /ingest – inline by default; subprocess kept as optional fallback
def _print_output_preview(label, text, head=30, tail=30):
    """Print the first *head* and last *tail* lines of *text* under *label*.

    Output of at most ``head + tail`` lines is printed in full; longer output
    has its middle replaced by a single truncation marker.
    """
    lines = (text or "").splitlines()
    print(f"\n----- {label} (total lines: {len(lines)}) -----", flush=True)
    if len(lines) <= head + tail:
        # Short enough: show everything so no line is silently dropped.
        for line in lines:
            print(line, flush=True)
    else:
        for line in lines[:head]:
            print(line, flush=True)
        print("... [truncated] ...", flush=True)
        # Index from the front so tail=0 prints nothing rather than everything.
        for line in lines[len(lines) - tail:]:
            print(line, flush=True)
    print("----- END", label, "-----\n", flush=True)


@ingest_trigger_bp.route("/ingest", methods=["POST"])
def run_ingest():
    """Trigger an ingestion run, inline by default or in a subprocess.

    Environment:
        INGEST_USE_SUBPROCESS: ``"1"`` runs ``python -m <module>`` in a child
            process; any other value runs ``ingest_all_levels()`` in-process.
        INGEST_MODULE: Module to import or execute (default ``ragg.ingest_all``).
        INGEST_TIMEOUT_SEC: Subprocess timeout in seconds (default 1800). Only
            read in subprocess mode; an unparsable value falls back to 1800.

    Returns:
        ``(response, status)`` where status is
        * 409 when another run already holds the module lock,
        * 200 on success (inline, or subprocess exit code 0),
        * 500 when the entry point is missing, the subprocess exits non-zero,
          or anything raises,
        * 504 when the subprocess exceeds the timeout.
    """
    if not _ingest_lock.acquire(blocking=False):
        print("[DEBUG] Ingest lock already held → busy", flush=True)
        return jsonify({"status": "busy", "message": "Ingestion already in progress"}), 409

    try:
        use_subprocess = os.getenv("INGEST_USE_SUBPROCESS", "0") == "1"
        module_name = os.getenv("INGEST_MODULE", "ragg.ingest_all")

        print("\n=== [INGEST TRIGGER] ===", flush=True)
        print("Trigger called", flush=True)
        print("Use subprocess:", use_subprocess, flush=True)
        print("Module:", module_name, flush=True)
        print("CWD:", os.getcwd(), flush=True)
        print("PYTHON:", sys.executable, flush=True)
        print("PYTHONPATH (len={}):".format(len(sys.path)), flush=True)
        print(sys.path, flush=True)
        print("ENV:", os.getenv("ENV"), flush=True)
        print("CHROMA_DIR:", os.getenv("CHROMA_DIR"), flush=True)
        print("CHROMA_ROOT:", os.getenv("CHROMA_ROOT"), flush=True)

        # Preflight: show expected PDF folders
        for b in ["ragg/pdfs/low", "ragg/pdfs/mid", "ragg/pdfs/high", "pdfs/low", "pdfs/mid", "pdfs/high"]:
            info = _list_dir_sample(b)
            print(f" - {b}: exists={info.get('exists')} total={info.get('total', 0)} sample={info.get('sample', [])}", flush=True)

        if not use_subprocess:
            # Inline mode (default)
            import importlib
            print("[DEBUG] Running inline ingestion...", flush=True)
            mod = importlib.import_module(module_name)
            if not hasattr(mod, "ingest_all_levels"):
                return jsonify({"status": "error", "message": "ingest_all_levels() not found"}), 500
            mod.ingest_all_levels()
            return jsonify({"status": "success", "mode": "inline"}), 200

        # Subprocess fallback (set INGEST_USE_SUBPROCESS=1 to enable).
        # The timeout is parsed only here so a malformed value cannot fail an
        # inline run that never uses it.
        raw_timeout = os.getenv("INGEST_TIMEOUT_SEC", "1800")
        try:
            timeout_sec = int(raw_timeout)
        except ValueError:
            print(f"[WARN] Invalid INGEST_TIMEOUT_SEC={raw_timeout!r}; using 1800", flush=True)
            timeout_sec = 1800

        cmd = [sys.executable, "-m", module_name]
        print("\n[DEBUG] Running subprocess:", cmd, flush=True)

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=None,
            timeout=timeout_sec
        )

        print("\n[DEBUG] Subprocess completed.", flush=True)
        print("Return code:", result.returncode, flush=True)

        _print_output_preview("STDOUT", result.stdout)
        _print_output_preview("STDERR", result.stderr)

        # Only the trailing 4000 characters of each stream go into the response.
        payload = {
            "status": "success" if result.returncode == 0 else "error",
            "returncode": result.returncode,
            "stdout": (result.stdout or "")[-4000:],
            "stderr": (result.stderr or "")[-4000:],
            "mode": "subprocess"
        }
        status_code = 200 if result.returncode == 0 else 500
        return jsonify(payload), status_code

    except subprocess.TimeoutExpired:
        print("[ERROR] Ingestion timed out.", flush=True)
        return jsonify({"status": "timeout"}), 504
    except Exception as e:
        # Format once so the log and the response carry the same traceback.
        tb = traceback.format_exc()
        print("[ERROR] Exception during ingestion:", e, flush=True)
        print(tb, flush=True)
        return jsonify({
            "status": "error",
            "message": "trigger crashed",
            "traceback": tb
        }), 500
    finally:
        # The lock was acquired before entering the try block, so it is always
        # held here; release it unconditionally instead of masking errors.
        _ingest_lock.release()
|