Peter Mutwiri committed on
Commit
f77f60f
·
1 Parent(s): c695b18

refactored background jobs

Browse files
app/main.py CHANGED
@@ -46,7 +46,20 @@ async def lifespan(app: FastAPI):
46
 
47
  app.state.instance_id = f"engine-{uuid.uuid4().hex[:8]}"
48
  logger.info(f"Instance ID: {app.state.instance_id}")
 
49
 
 
 
 
 
 
 
 
 
 
 
 
 
50
  # Validate service health on boot
51
  try:
52
  services = check_all_services()
@@ -93,6 +106,10 @@ async def lifespan(app: FastAPI):
93
  # Start background KPI scheduler
94
  logger.info("⏰ Starting KPI refresh scheduler...")
95
  asyncio.create_task(continuous_kpi_refresh(), name="kpi_scheduler")
 
 
 
 
96
  yield
97
 
98
  # ─── Shutdown ──────────────────────────────────────────────────────────────
 
46
 
47
  app.state.instance_id = f"engine-{uuid.uuid4().hex[:8]}"
48
  logger.info(f"Instance ID: {app.state.instance_id}")
49
+ logger.info("πŸš€ STARTUP SEQUENCE")
50
 
51
+ # βœ… CRITICAL: Set persistent cache dir (survives restarts)
52
+ os.makedirs("/data/hf_cache", exist_ok=True)
53
+ os.environ["HF_HOME"] = "/data/hf_cache"
54
+ os.environ["TRANSFORMERS_CACHE"] = "/data/hf_cache"
55
+ os.environ["HF_HUB_CACHE"] = "/data/hf_cache"
56
+
57
+ # Set Hugging Face cache symlink (if needed)
58
+ cache_dir = pathlib.Path("/data/hf_cache")
59
+ home_cache = pathlib.Path.home() / ".cache" / "huggingface"
60
+ if not home_cache.exists():
61
+ home_cache.parent.mkdir(parents=True, exist_ok=True)
62
+ home_cache.symlink_to(cache_dir)
63
  # Validate service health on boot
64
  try:
65
  services = check_all_services()
 
106
  # Start background KPI scheduler
107
  logger.info("⏰ Starting KPI refresh scheduler...")
108
  asyncio.create_task(continuous_kpi_refresh(), name="kpi_scheduler")
109
+ # Now load LLM service - it will use persistent cache
110
+ from app.service.llm_service import LocalLLMService
111
+ logger.info("πŸ€– LLM service initialized (will use persistent cache)")
112
+
113
  yield
114
 
115
  # ─── Shutdown ──────────────────────────────────────────────────────────────
app/qstash_client.py CHANGED
@@ -1 +1,69 @@
1
- qstash_client.py
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# app/qstash_client.py
"""Singleton wrapper around the Upstash QStash client.

The client is created exactly once via init_qstash_client() during
application startup (after secrets are available) and retrieved
everywhere else via get_qstash_client().
"""
import os
import logging
from typing import Optional

logger = logging.getLogger(__name__)

# Module-level singleton (survives repeated imports of this module).
_qstash_client: Optional["Client"] = None


def init_qstash_client() -> bool:
    """
    EXPLICITLY initialize QStash client on startup.
    Call this AFTER HF secrets are loaded, so QSTASH_TOKEN is visible.

    Returns:
        bool: True if the client is ready, False if QStash is not
        configured (missing token, missing package, or init failure).
    """
    global _qstash_client

    # Idempotent: a repeated startup call is harmless.
    if _qstash_client is not None:
        logger.info("✅ QStash client already initialized")
        return True

    token = os.getenv("QSTASH_TOKEN")
    if not token:
        logger.warning("⚠️ QSTASH_TOKEN not found - QStash features disabled")
        return False

    try:
        # Imported lazily so the app still boots without the package installed.
        from upstash_qstash import Client

        qstash_url = os.getenv("QSTASH_URL")
        if qstash_url:
            _qstash_client = Client(token=token, url=qstash_url)
            logger.info(f"✅ QStash client initialized (custom URL: {qstash_url})")
        else:
            _qstash_client = Client(token=token)
            logger.info("✅ QStash client initialized")

        return True

    except ImportError:
        logger.error("❌ upstash_qstash not installed. pip install upstash-qstash")
        return False
    except Exception as e:
        # Broad catch is deliberate: QStash is optional and must not
        # abort the startup sequence.
        logger.error(f"❌ QStash initialization failed: {e}")
        return False


def get_qstash_client() -> "Client":
    """
    Get the singleton QStash client.

    Returns:
        The initialized Client instance (never None: a missing client
        raises instead of returning).

    Raises:
        RuntimeError: If called before a successful init_qstash_client().
    """
    if _qstash_client is None:
        raise RuntimeError(
            "QStash client not initialized. "
            "Call init_qstash_client() in startup sequence first."
        )
    return _qstash_client


def is_qstash_available() -> bool:
    """Check if QStash is configured and ready"""
    return _qstash_client is not None
app/routers/datasources.py CHANGED
@@ -32,7 +32,6 @@ async def create_source_json(
32
  orgId: str = Query(...), # βœ… From Vercel
33
  sourceId: str = Query(...), # βœ… From Vercel
34
  type: str = Query(...), # βœ… From Vercel
35
- background_tasks: BackgroundTasks,
36
  current_user: dict = Depends(get_current_user), # βœ… For auth
37
  _: str = Depends(verify_api_key),
38
  ):
 
32
  orgId: str = Query(...), # βœ… From Vercel
33
  sourceId: str = Query(...), # βœ… From Vercel
34
  type: str = Query(...), # βœ… From Vercel
 
35
  current_user: dict = Depends(get_current_user), # βœ… For auth
36
  _: str = Depends(verify_api_key),
37
  ):
app/service/llm_service.py CHANGED
@@ -20,11 +20,15 @@ class LocalLLMService:
20
  self._load_error = None
21
  self._lock = Lock()
22
 
 
 
 
 
23
  logger.info("πŸš€ Starting background LLM load...")
24
  Thread(target=self._load_model_background, daemon=True).start()
25
 
26
  def _load_model_background(self):
27
- """Load model in background thread"""
28
  with self._lock:
29
  if self._is_loading or self._is_loaded:
30
  return
@@ -33,12 +37,17 @@ class LocalLLMService:
33
  try:
34
  logger.info(f"πŸ€– [BACKGROUND] Loading LLM: {self.model_id}...")
35
 
 
 
 
36
  # Phi-3 tokenizer
37
  self._tokenizer = AutoTokenizer.from_pretrained(
38
  self.model_id,
39
  token=HF_API_TOKEN,
40
- trust_remote_code=True
 
41
  )
 
42
  self._tokenizer.pad_token = self._tokenizer.eos_token
43
 
44
  # Phi-3 model - OPTIMIZED for speed
 
20
  self._load_error = None
21
  self._lock = Lock()
22
 
23
+ # βœ… Use persistent cache
24
+ cache_dir = "/data/hf_cache"
25
+ os.makedirs(cache_dir, exist_ok=True)
26
+
27
  logger.info("πŸš€ Starting background LLM load...")
28
  Thread(target=self._load_model_background, daemon=True).start()
29
 
30
  def _load_model_background(self):
31
+ """Load model in background thread with persistent cache"""
32
  with self._lock:
33
  if self._is_loading or self._is_loaded:
34
  return
 
37
  try:
38
  logger.info(f"πŸ€– [BACKGROUND] Loading LLM: {self.model_id}...")
39
 
40
+ # βœ… Use persistent cache directory
41
+ cache_dir = "/data/hf_cache"
42
+
43
  # Phi-3 tokenizer
44
  self._tokenizer = AutoTokenizer.from_pretrained(
45
  self.model_id,
46
  token=HF_API_TOKEN,
47
+ trust_remote_code=True,
48
+ cache_dir=cache_dir # βœ… Persistent cache
49
  )
50
+ # .
51
  self._tokenizer.pad_token = self._tokenizer.eos_token
52
 
53
  # Phi-3 model - OPTIMIZED for speed