Oviya committed on
Commit
d2e4e52
·
1 Parent(s): a056eaa
Files changed (1) hide show
  1. ragg/ingest_trigger.py +47 -21
ragg/ingest_trigger.py CHANGED
@@ -23,6 +23,21 @@ def _list_dir_sample(path: str, patterns=("*.pdf", "*.PDF"), limit: int = 10):
23
  except Exception as e:
24
  return {"exists": False, "error": str(e)}
25
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  @ingest_trigger_bp.route("/ingest/ping", methods=["GET"])
27
  def ingest_ping():
28
  info = {
@@ -41,7 +56,6 @@ def ingest_ping():
41
  print(info, flush=True)
42
  return jsonify(info), 200
43
 
44
- # ---------- LIGHTWEIGHT DEBUG (no ingestion run) ----------
45
  @ingest_trigger_bp.route("/ingest/debug", methods=["GET"])
46
  def ingest_debug():
47
  import importlib
@@ -78,7 +92,18 @@ def ingest_debug():
78
  print(resp, flush=True)
79
  return jsonify(resp), 200
80
 
81
- # ---------- INLINE RUN (use only for debugging) ----------
 
 
 
 
 
 
 
 
 
 
 
82
  @ingest_trigger_bp.route("/ingest/run-inline", methods=["POST"])
83
  def ingest_run_inline():
84
  import importlib
@@ -90,8 +115,8 @@ def ingest_run_inline():
90
  if not hasattr(mod, "ingest_all_levels"):
91
  return jsonify({"status": "error", "message": "ingest_all_levels() not found"}), 500
92
 
93
- # Optional: quick preflight
94
- for p in ["ragg/pdfs/low", "ragg/pdfs/mid", "ragg/pdfs/high"]:
95
  print(f"[INLINE] {p} -> {_list_dir_sample(p)}", flush=True)
96
 
97
  mod.ingest_all_levels()
@@ -102,6 +127,7 @@ def ingest_run_inline():
102
  return jsonify({"status": "error", "error": f"{type(e).__name__}: {e}",
103
  "traceback": traceback.format_exc()}), 500
104
 
 
105
  @ingest_trigger_bp.route("/ingest", methods=["POST"])
106
  def run_ingest():
107
  if not _ingest_lock.acquire(blocking=False):
@@ -109,40 +135,40 @@ def run_ingest():
109
  return jsonify({"status": "busy", "message": "Ingestion already in progress"}), 409
110
 
111
  try:
 
112
  module_name = os.getenv("INGEST_MODULE", "ragg.ingest_all")
113
  timeout_sec = int(os.getenv("INGEST_TIMEOUT_SEC", "1800"))
114
 
115
  print("\n=== [INGEST TRIGGER] ===", flush=True)
116
  print("Trigger called", flush=True)
 
117
  print("Module:", module_name, flush=True)
118
  print("CWD:", os.getcwd(), flush=True)
119
  print("PYTHON:", sys.executable, flush=True)
120
  print("PYTHONPATH (len={}):".format(len(sys.path)), flush=True)
121
  print(sys.path, flush=True)
122
-
123
- # Log key env vars the ingestion relies on
124
  print("ENV:", os.getenv("ENV"), flush=True)
125
  print("CHROMA_DIR:", os.getenv("CHROMA_DIR"), flush=True)
126
  print("CHROMA_ROOT:", os.getenv("CHROMA_ROOT"), flush=True)
127
- print("HF_HOME:", os.getenv("HF_HOME"), flush=True)
128
-
129
- # Quick preflight: show whether expected PDF folders exist and sample files
130
- bases = [
131
- "ragg/pdfs/low",
132
- "ragg/pdfs/mid",
133
- "ragg/pdfs/high",
134
- "pdfs/low",
135
- "pdfs/mid",
136
- "pdfs/high",
137
- ]
138
- print("\n[DEBUG] PDF folders preflight:", flush=True)
139
- for b in bases:
140
  info = _list_dir_sample(b)
141
  print(f" - {b}: exists={info.get('exists')} total={info.get('total', 0)} sample={info.get('sample', [])}", flush=True)
142
 
 
 
 
 
 
 
 
 
 
 
 
143
  cmd = [sys.executable, "-m", module_name]
144
  print("\n[DEBUG] Running subprocess:", cmd, flush=True)
145
-
146
  result = subprocess.run(
147
  cmd,
148
  capture_output=True,
@@ -150,7 +176,6 @@ def run_ingest():
150
  cwd=None,
151
  timeout=timeout_sec
152
  )
153
-
154
  print("\n[DEBUG] Subprocess completed.", flush=True)
155
  print("Return code:", result.returncode, flush=True)
156
 
@@ -173,6 +198,7 @@ def run_ingest():
173
  "returncode": result.returncode,
174
  "stdout": (result.stdout or "")[-4000:],
175
  "stderr": (result.stderr or "")[-4000:],
 
176
  }
177
  status_code = 200 if result.returncode == 0 else 500
178
  return jsonify(payload), status_code
 
23
  except Exception as e:
24
  return {"exists": False, "error": str(e)}
25
 
26
def _count_files(path: str, exts=(".json", ".sqlite3", ".parquet", ".bin", ".txt", ".lock")):
    """Summarise the files found under *path*, grouped by extension.

    The directory tree is walked recursively with ``os.walk`` and every file
    is counted. Extensions are lower-cased before grouping; files without an
    extension are grouped under the empty string ``""``.

    Args:
        path: Directory to inspect.
        exts: Accepted for backward compatibility. It is not used to filter
            the result -- every file is counted regardless of extension.

    Returns:
        A dict that always contains ``exists`` (bool), ``total`` (int) and
        ``by_ext`` (dict mapping extension -> count). When inspection fails
        an additional ``error`` key carries the exception text.
    """
    # Local import keeps this helper self-contained without touching the
    # module-level import block.
    from collections import Counter

    try:
        if not os.path.isdir(path):
            return {"exists": False, "total": 0, "by_ext": {}}

        by_ext = Counter(
            os.path.splitext(filename)[1].lower()
            for _root, _dirs, filenames in os.walk(path)
            for filename in filenames
        )
        return {"exists": True, "total": sum(by_ext.values()), "by_ext": dict(by_ext)}
    except Exception as e:
        # Best-effort diagnostic helper: never raise into the request handler.
        # ``by_ext`` is included so every return path exposes the same keys.
        return {"exists": False, "total": 0, "by_ext": {}, "error": str(e)}
40
+
41
  @ingest_trigger_bp.route("/ingest/ping", methods=["GET"])
42
  def ingest_ping():
43
  info = {
 
56
  print(info, flush=True)
57
  return jsonify(info), 200
58
 
 
59
  @ingest_trigger_bp.route("/ingest/debug", methods=["GET"])
60
  def ingest_debug():
61
  import importlib
 
92
  print(resp, flush=True)
93
  return jsonify(resp), 200
94
 
95
@ingest_trigger_bp.route("/ingest/status", methods=["GET"])
def ingest_status():
    """Report file counts for each level directory under the Chroma root.

    The root comes from the ``CHROMA_ROOT`` environment variable, falling
    back to ``/data/chroma`` when it is unset or empty. For each of the
    ``low``, ``mid`` and ``high`` sub-directories the result of
    ``_count_files`` is collected. The assembled report is printed to stdout
    and returned as JSON with HTTP status 200.
    """
    base_dir = os.getenv("CHROMA_ROOT") or "/data/chroma"
    per_level = {
        level: _count_files(os.path.join(base_dir, level))
        for level in ("low", "mid", "high")
    }
    report = {"chroma_root": base_dir, "levels": per_level}

    print("\n=== [INGEST STATUS] ===", flush=True)
    print(report, flush=True)
    return jsonify(report), 200
105
+
106
+ # Inline run (explicit)
107
  @ingest_trigger_bp.route("/ingest/run-inline", methods=["POST"])
108
  def ingest_run_inline():
109
  import importlib
 
115
  if not hasattr(mod, "ingest_all_levels"):
116
  return jsonify({"status": "error", "message": "ingest_all_levels() not found"}), 500
117
 
118
+ # Quick preflight
119
+ for p in ["ragg/pdfs/low", "ragg/pdfs/mid", "ragg/pdfs/high", "pdfs/low", "pdfs/mid", "pdfs/high"]:
120
  print(f"[INLINE] {p} -> {_list_dir_sample(p)}", flush=True)
121
 
122
  mod.ingest_all_levels()
 
127
  return jsonify({"status": "error", "error": f"{type(e).__name__}: {e}",
128
  "traceback": traceback.format_exc()}), 500
129
 
130
+ # /ingest – inline by default; subprocess kept as optional fallback
131
  @ingest_trigger_bp.route("/ingest", methods=["POST"])
132
  def run_ingest():
133
  if not _ingest_lock.acquire(blocking=False):
 
135
  return jsonify({"status": "busy", "message": "Ingestion already in progress"}), 409
136
 
137
  try:
138
+ use_subprocess = os.getenv("INGEST_USE_SUBPROCESS", "0") == "1"
139
  module_name = os.getenv("INGEST_MODULE", "ragg.ingest_all")
140
  timeout_sec = int(os.getenv("INGEST_TIMEOUT_SEC", "1800"))
141
 
142
  print("\n=== [INGEST TRIGGER] ===", flush=True)
143
  print("Trigger called", flush=True)
144
+ print("Use subprocess:", use_subprocess, flush=True)
145
  print("Module:", module_name, flush=True)
146
  print("CWD:", os.getcwd(), flush=True)
147
  print("PYTHON:", sys.executable, flush=True)
148
  print("PYTHONPATH (len={}):".format(len(sys.path)), flush=True)
149
  print(sys.path, flush=True)
 
 
150
  print("ENV:", os.getenv("ENV"), flush=True)
151
  print("CHROMA_DIR:", os.getenv("CHROMA_DIR"), flush=True)
152
  print("CHROMA_ROOT:", os.getenv("CHROMA_ROOT"), flush=True)
153
+
154
+ # Preflight: show expected PDF folders
155
+ for b in ["ragg/pdfs/low", "ragg/pdfs/mid", "ragg/pdfs/high", "pdfs/low", "pdfs/mid", "pdfs/high"]:
 
 
 
 
 
 
 
 
 
 
156
  info = _list_dir_sample(b)
157
  print(f" - {b}: exists={info.get('exists')} total={info.get('total', 0)} sample={info.get('sample', [])}", flush=True)
158
 
159
+ if not use_subprocess:
160
+ # Inline mode (default)
161
+ import importlib
162
+ print("[DEBUG] Running inline ingestion...", flush=True)
163
+ mod = importlib.import_module(module_name)
164
+ if not hasattr(mod, "ingest_all_levels"):
165
+ return jsonify({"status": "error", "message": "ingest_all_levels() not found"}), 500
166
+ mod.ingest_all_levels()
167
+ return jsonify({"status": "success", "mode": "inline"}), 200
168
+
169
+ # Subprocess fallback (set INGEST_USE_SUBPROCESS=1 to enable)
170
  cmd = [sys.executable, "-m", module_name]
171
  print("\n[DEBUG] Running subprocess:", cmd, flush=True)
 
172
  result = subprocess.run(
173
  cmd,
174
  capture_output=True,
 
176
  cwd=None,
177
  timeout=timeout_sec
178
  )
 
179
  print("\n[DEBUG] Subprocess completed.", flush=True)
180
  print("Return code:", result.returncode, flush=True)
181
 
 
198
  "returncode": result.returncode,
199
  "stdout": (result.stdout or "")[-4000:],
200
  "stderr": (result.stderr or "")[-4000:],
201
+ "mode": "subprocess"
202
  }
203
  status_code = 200 if result.returncode == 0 else 500
204
  return jsonify(payload), status_code