Oviya commited on
Commit
a1f6dcf
·
1 Parent(s): 13c4b11
Files changed (1) hide show
  1. ragg/ingest_trigger.py +94 -10
ragg/ingest_trigger.py CHANGED
@@ -3,6 +3,7 @@ import sys
3
  import subprocess
4
  import threading
5
  import traceback
 
6
  from flask import Blueprint, jsonify, request
7
 
8
  ingest_trigger_bp = Blueprint("ingest_trigger_bp", __name__)
@@ -10,45 +11,128 @@ ingest_trigger_bp = Blueprint("ingest_trigger_bp", __name__)
10
  # Prevent concurrent executions
11
  _ingest_lock = threading.Lock()
12
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  @ingest_trigger_bp.route("/ingest/ping", methods=["GET"])
14
  def ingest_ping():
15
- return jsonify({"ok": True, "cwd": os.getcwd()}), 200
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
 
17
  @ingest_trigger_bp.route("/ingest", methods=["POST"])
18
  def run_ingest():
19
  if not _ingest_lock.acquire(blocking=False):
 
20
  return jsonify({"status": "busy", "message": "Ingestion already in progress"}), 409
21
 
22
  try:
23
- # Run as module to ensure package imports work
24
  module_name = os.getenv("INGEST_MODULE", "ragg.ingest_all")
 
25
 
 
26
  print("Trigger called", flush=True)
27
  print("Module:", module_name, flush=True)
28
  print("CWD:", os.getcwd(), flush=True)
29
- print("PYTHONPATH:", sys.path, flush=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
 
31
  result = subprocess.run(
32
- [sys.executable, "-m", module_name],
33
  capture_output=True,
34
  text=True,
35
  cwd=None,
36
- timeout=int(os.getenv("INGEST_TIMEOUT_SEC", "1800")) # default 30 min
37
  )
38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  payload = {
40
  "status": "success" if result.returncode == 0 else "error",
41
  "returncode": result.returncode,
42
- "stdout": result.stdout[-20000:], # trim if very long
43
- "stderr": result.stderr[-20000:],
 
44
  }
45
- return jsonify(payload), 200 if result.returncode == 0 else 500
 
46
 
47
- except Exception:
 
 
 
 
 
 
48
  return jsonify({
49
  "status": "error",
50
  "message": "trigger crashed",
51
  "traceback": traceback.format_exc()
52
  }), 500
 
53
  finally:
54
- _ingest_lock.release()
 
 
 
 
 
3
  import subprocess
4
  import threading
5
  import traceback
6
+ from glob import glob
7
  from flask import Blueprint, jsonify, request
8
 
9
  ingest_trigger_bp = Blueprint("ingest_trigger_bp", __name__)
 
11
  # Prevent concurrent executions
12
  _ingest_lock = threading.Lock()
13
 
14
+ def _list_dir_sample(path: str, patterns=("*.pdf", "*.PDF"), limit: int = 10):
15
+ try:
16
+ files = []
17
+ for pat in patterns:
18
+ files.extend(glob(os.path.join(path, pat)))
19
+ files.sort()
20
+ total = len(files)
21
+ sample = files[:limit]
22
+ return {"exists": os.path.isdir(path), "total": total, "sample": sample}
23
+ except Exception as e:
24
+ return {"exists": False, "error": str(e)}
25
+
26
  @ingest_trigger_bp.route("/ingest/ping", methods=["GET"])
27
  def ingest_ping():
28
+ info = {
29
+ "ok": True,
30
+ "cwd": os.getcwd(),
31
+ "env": {
32
+ "ENV": os.getenv("ENV"),
33
+ "INGEST_MODULE": os.getenv("INGEST_MODULE", "ragg.ingest_all"),
34
+ "INGEST_TIMEOUT_SEC": os.getenv("INGEST_TIMEOUT_SEC", "1800"),
35
+ "CHROMA_DIR": os.getenv("CHROMA_DIR"),
36
+ "CHROMA_ROOT": os.getenv("CHROMA_ROOT"),
37
+ "HF_HOME": os.getenv("HF_HOME"),
38
+ }
39
+ }
40
+ print("\n=== [PING] ===", flush=True)
41
+ print(info, flush=True)
42
+ return jsonify(info), 200
43
 
44
  @ingest_trigger_bp.route("/ingest", methods=["POST"])
45
  def run_ingest():
46
  if not _ingest_lock.acquire(blocking=False):
47
+ print("[DEBUG] Ingest lock already held → busy", flush=True)
48
  return jsonify({"status": "busy", "message": "Ingestion already in progress"}), 409
49
 
50
  try:
 
51
  module_name = os.getenv("INGEST_MODULE", "ragg.ingest_all")
52
+ timeout_sec = int(os.getenv("INGEST_TIMEOUT_SEC", "1800"))
53
 
54
+ print("\n=== [INGEST TRIGGER] ===", flush=True)
55
  print("Trigger called", flush=True)
56
  print("Module:", module_name, flush=True)
57
  print("CWD:", os.getcwd(), flush=True)
58
+ print("PYTHON:", sys.executable, flush=True)
59
+ print("PYTHONPATH (len={}):".format(len(sys.path)), flush=True)
60
+ print(sys.path, flush=True)
61
+
62
+ # Log key env vars the ingestion relies on
63
+ print("ENV:", os.getenv("ENV"), flush=True)
64
+ print("CHROMA_DIR:", os.getenv("CHROMA_DIR"), flush=True)
65
+ print("CHROMA_ROOT:", os.getenv("CHROMA_ROOT"), flush=True)
66
+ print("HF_HOME:", os.getenv("HF_HOME"), flush=True)
67
+
68
+ # Quick preflight: show whether expected PDF folders exist and sample files
69
+ bases = [
70
+ "ragg/pdfs/low",
71
+ "ragg/pdfs/mid",
72
+ "ragg/pdfs/high",
73
+ "pdfs/low",
74
+ "pdfs/mid",
75
+ "pdfs/high",
76
+ ]
77
+ print("\n[DEBUG] PDF folders preflight:", flush=True)
78
+ for b in bases:
79
+ info = _list_dir_sample(b)
80
+ print(f" - {b}: exists={info.get('exists')} total={info.get('total', 0)} sample={info.get('sample', [])}", flush=True)
81
+
82
+ cmd = [sys.executable, "-m", module_name]
83
+ print("\n[DEBUG] Running subprocess:", cmd, flush=True)
84
 
85
  result = subprocess.run(
86
+ cmd,
87
  capture_output=True,
88
  text=True,
89
  cwd=None,
90
+ timeout=timeout_sec
91
  )
92
 
93
+ print("\n[DEBUG] Subprocess completed.", flush=True)
94
+ print("Return code:", result.returncode, flush=True)
95
+ # Print first and last few lines to avoid massive logs
96
+ def _preview(label, text, head=30, tail=30):
97
+ lines = (text or "").splitlines()
98
+ print(f"\n----- {label} (total lines: {len(lines)}) -----", flush=True)
99
+ for line in lines[:head]:
100
+ print(line, flush=True)
101
+ if len(lines) > head + tail:
102
+ print("... [truncated] ...", flush=True)
103
+ for line in lines[-tail:]:
104
+ print(line, flush=True)
105
+ print("----- END", label, "-----\n", flush=True)
106
+
107
+ _preview("STDOUT", result.stdout)
108
+ _preview("STDERR", result.stderr)
109
+
110
  payload = {
111
  "status": "success" if result.returncode == 0 else "error",
112
  "returncode": result.returncode,
113
+ # Return last ~4000 chars so the client can also see logs in Postman
114
+ "stdout": (result.stdout or "")[-4000:],
115
+ "stderr": (result.stderr or "")[-4000:],
116
  }
117
+ status_code = 200 if result.returncode == 0 else 500
118
+ return jsonify(payload), status_code
119
 
120
+ except subprocess.TimeoutExpired:
121
+ print("[ERROR] Ingestion timed out.", flush=True)
122
+ return jsonify({"status": "timeout"}), 504
123
+
124
+ except Exception as e:
125
+ print("[ERROR] Exception during ingestion:", e, flush=True)
126
+ print(traceback.format_exc(), flush=True)
127
  return jsonify({
128
  "status": "error",
129
  "message": "trigger crashed",
130
  "traceback": traceback.format_exc()
131
  }), 500
132
+
133
  finally:
134
+ try:
135
+ _ingest_lock.release()
136
+ except Exception:
137
+ # Avoid rare "release unlocked lock" noise
138
+ pass