USAMA BHATTI committed on
Commit
a833774
·
1 Parent(s): 239dbce

Feat: Implement Secure Multi-tenant SaaS Architecture with API Key Auth, Domain Whitelisting, and Strict AI Grounding

Browse files
backend/src/api/routes/ingestion.py CHANGED
@@ -1,4 +1,3 @@
1
-
2
  import os
3
  import shutil
4
  from fastapi import APIRouter, UploadFile, File, HTTPException, Form, BackgroundTasks, Depends
@@ -6,11 +5,11 @@ from pydantic import BaseModel
6
  from sqlalchemy.ext.asyncio import AsyncSession
7
  from sqlalchemy.future import select
8
 
9
- # --- Security Imports ---
10
  from backend.src.api.routes.deps import get_current_user
11
  from backend.src.models.user import User
12
 
13
- # --- Internal Services & DB Imports ---
14
  from backend.src.services.ingestion.file_processor import process_file
15
  from backend.src.services.ingestion.crawler import SmartCrawler
16
  from backend.src.services.ingestion.zip_processor import SmartZipProcessor
@@ -20,55 +19,67 @@ from backend.src.models.ingestion import IngestionJob, JobStatus, IngestionType
20
  # --- CONFIG ---
21
  MAX_ZIP_SIZE_MB = 100
22
  MAX_ZIP_SIZE_BYTES = MAX_ZIP_SIZE_MB * 1024 * 1024
 
23
 
24
  router = APIRouter()
25
- UPLOAD_DIRECTORY = "./uploaded_files"
26
 
27
  # ==========================================
28
- # FILE UPLOAD (Protected)
29
  # ==========================================
30
  @router.post("/ingest/upload")
31
  async def upload_and_process_file(
32
  session_id: str = Form(...),
33
  file: UploadFile = File(...),
34
- current_user: User = Depends(get_current_user) # <--- 🔒 TALA LAGA DIYA
 
35
  ):
36
- # (Function logic same rahegi, bas ab current_user mil jayega)
37
  if not os.path.exists(UPLOAD_DIRECTORY):
38
  os.makedirs(UPLOAD_DIRECTORY)
39
 
40
  file_path = os.path.join(UPLOAD_DIRECTORY, file.filename)
41
  try:
 
42
  with open(file_path, "wb") as buffer:
43
  shutil.copyfileobj(file.file, buffer)
44
 
45
- chunks_added = await process_file(file_path, session_id)
46
- if chunks_added <= 0:
47
- raise HTTPException(status_code=400, detail="Could not process file.")
 
 
 
 
 
 
 
 
 
48
 
49
  return {
50
- "message": "File processed successfully",
51
  "filename": file.filename,
52
- "chunks_added": chunks_added,
53
- "session_id": session_id
54
  }
 
55
  except Exception as e:
56
  raise HTTPException(status_code=500, detail=str(e))
57
  finally:
58
- if os.path.exists(file_path):
59
- os.remove(file_path)
60
 
61
  # ==========================================
62
- # WEB CRAWLER (Protected)
63
  # ==========================================
64
  class WebIngestRequest(BaseModel):
65
  url: str
66
  session_id: str
67
  crawl_type: str = "single_page"
68
 
69
- async def run_crawler_task(job_id, url, session_id, crawl_type, db_factory):
 
70
  async with db_factory() as db:
71
- crawler = SmartCrawler(job_id, url, session_id, crawl_type, db)
 
72
  await crawler.start()
73
 
74
  @router.post("/ingest/url")
@@ -76,9 +87,8 @@ async def start_web_ingestion(
76
  request: WebIngestRequest,
77
  background_tasks: BackgroundTasks,
78
  db: AsyncSession = Depends(get_db),
79
- current_user: User = Depends(get_current_user) # <--- 🔒 TALA LAGA DIYA
80
  ):
81
- # (Function logic same rahegi)
82
  new_job = IngestionJob(
83
  session_id=request.session_id,
84
  ingestion_type=IngestionType.URL,
@@ -89,28 +99,21 @@ async def start_web_ingestion(
89
  await db.commit()
90
  await db.refresh(new_job)
91
 
92
- background_tasks.add_task(run_crawler_task, new_job.id, request.url, request.session_id, request.crawl_type, AsyncSessionLocal)
93
- return {"message": "Ingestion job started", "job_id": new_job.id}
94
-
95
- @router.get("/ingest/status/{job_id}")
96
- async def check_job_status(
97
- job_id: int,
98
- db: AsyncSession = Depends(get_db),
99
- current_user: User = Depends(get_current_user) # <--- 🔒 TALA LAGA DIYA
100
- ):
101
- # (Function logic same rahegi)
102
- result = await db.execute(select(IngestionJob).where(IngestionJob.id == job_id))
103
- job = result.scalars().first()
104
- if not job:
105
- raise HTTPException(status_code=404, detail="Job not found")
106
- return job
107
 
108
  # ==========================================
109
- # BULK ZIP UPLOAD (Protected)
110
  # ==========================================
111
- async def run_zip_task(job_id, zip_path, session_id, db_factory):
112
  async with db_factory() as db:
113
- processor = SmartZipProcessor(job_id, zip_path, session_id, db)
 
114
  await processor.start()
115
 
116
  @router.post("/ingest/upload-zip")
@@ -119,13 +122,10 @@ async def upload_and_process_zip(
119
  file: UploadFile = File(...),
120
  background_tasks: BackgroundTasks = BackgroundTasks(),
121
  db: AsyncSession = Depends(get_db),
122
- current_user: User = Depends(get_current_user) # <--- 🔒 TALA LAGA DIYA
123
  ):
124
- # (Function logic same rahegi)
125
  if not file.filename.endswith(".zip"):
126
- raise HTTPException(status_code=400, detail="Only .zip files are allowed.")
127
- if file.size > MAX_ZIP_SIZE_BYTES:
128
- raise HTTPException(status_code=413, detail=f"File too large. Max size is {MAX_ZIP_SIZE_MB} MB.")
129
 
130
  zip_dir = os.path.join(UPLOAD_DIRECTORY, "zips")
131
  os.makedirs(zip_dir, exist_ok=True)
@@ -144,5 +144,26 @@ async def upload_and_process_zip(
144
  await db.commit()
145
  await db.refresh(new_job)
146
 
147
- background_tasks.add_task(run_zip_task, new_job.id, file_path, session_id, AsyncSessionLocal)
148
- return {"message": "Zip processing started", "job_id": new_job.id}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
  import shutil
3
  from fastapi import APIRouter, UploadFile, File, HTTPException, Form, BackgroundTasks, Depends
 
5
  from sqlalchemy.ext.asyncio import AsyncSession
6
  from sqlalchemy.future import select
7
 
8
+ # --- Security & User Context ---
9
  from backend.src.api.routes.deps import get_current_user
10
  from backend.src.models.user import User
11
 
12
+ # --- Internal Services ---
13
  from backend.src.services.ingestion.file_processor import process_file
14
  from backend.src.services.ingestion.crawler import SmartCrawler
15
  from backend.src.services.ingestion.zip_processor import SmartZipProcessor
 
19
  # --- CONFIG ---
20
  MAX_ZIP_SIZE_MB = 100
21
  MAX_ZIP_SIZE_BYTES = MAX_ZIP_SIZE_MB * 1024 * 1024
22
+ UPLOAD_DIRECTORY = "./uploaded_files"
23
 
24
  router = APIRouter()
 
25
 
26
  # ==========================================
27
+ # 1. INDIVIDUAL FILE UPLOAD (Secure ✅)
28
  # ==========================================
29
@router.post("/ingest/upload")
async def upload_and_process_file(
    session_id: str = Form(...),
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Upload a single document and ingest it into the caller's vector store.

    The file is saved to a temporary path, handed to ``process_file`` (which
    verifies the user's own Qdrant integration before indexing anything), and
    always removed afterwards.

    Returns:
        Summary dict with the chunk count and the owner's id.
    Raises:
        HTTPException 400 when the user has no connected database, the
        filename is invalid, or no content could be extracted; 500 on
        unexpected failures.
    """
    os.makedirs(UPLOAD_DIRECTORY, exist_ok=True)

    # SECURITY FIX: never trust the client-supplied filename. Joining it
    # directly into UPLOAD_DIRECTORY allows path traversal (e.g.
    # "../../etc/passwd"); strip any directory components first.
    safe_name = os.path.basename(file.filename or "")
    if not safe_name:
        raise HTTPException(status_code=400, detail="Invalid filename.")

    file_path = os.path.join(UPLOAD_DIRECTORY, safe_name)
    try:
        # Persist the upload to disk so the document loaders can read it.
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # process_file is tenant-aware: it resolves this user's Qdrant
        # credentials and refuses to ingest without them.
        chunks_added = await process_file(
            file_path=file_path,
            session_id=session_id,
            user_id=str(current_user.id),
            db=db,
        )

        if chunks_added == -1:  # sentinel: user has no vector DB connected
            raise HTTPException(
                status_code=400,
                detail="Database not connected. Please go to User Settings first.",
            )
        elif chunks_added <= 0:
            raise HTTPException(
                status_code=400,
                detail="Could not extract content from file.",
            )

        return {
            "status": "success",
            "filename": file.filename,
            "chunks": chunks_added,
            "owner_id": current_user.id,
        }
    except HTTPException:
        raise  # re-raise our own 4xx responses untouched
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always clean up the temporary file, even on failure.
        if os.path.exists(file_path):
            os.remove(file_path)
 
69
 
70
  # ==========================================
71
+ # 2. WEB CRAWLER (Secure Background Task ✅)
72
  # ==========================================
73
class WebIngestRequest(BaseModel):
    """Request payload for starting a web-crawl ingestion job."""

    url: str  # root URL to crawl
    session_id: str  # logical session the ingested data belongs to
    crawl_type: str = "single_page"  # crawl strategy selector
77
 
78
# Background helper: runs the crawler with its own DB session and owner id.
async def run_crawler_task(job_id, url, session_id, crawl_type, db_factory, user_id):
    """Run a SmartCrawler as a background task.

    A fresh session is created from ``db_factory`` because background tasks
    outlive the request-scoped session; ``user_id`` scopes the crawled data
    to its owner.
    """
    async with db_factory() as session:
        worker = SmartCrawler(job_id, url, session_id, crawl_type, session, user_id=user_id)
        await worker.start()
84
 
85
  @router.post("/ingest/url")
 
87
  request: WebIngestRequest,
88
  background_tasks: BackgroundTasks,
89
  db: AsyncSession = Depends(get_db),
90
+ current_user: User = Depends(get_current_user)
91
  ):
 
92
  new_job = IngestionJob(
93
  session_id=request.session_id,
94
  ingestion_type=IngestionType.URL,
 
99
  await db.commit()
100
  await db.refresh(new_job)
101
 
102
+ # 🚀 BACKGROUND LINK: Pass user_id to the task
103
+ background_tasks.add_task(
104
+ run_crawler_task,
105
+ new_job.id, request.url, request.session_id, request.crawl_type,
106
+ AsyncSessionLocal, str(current_user.id)
107
+ )
108
+ return {"message": "Crawler started securely", "job_id": new_job.id}
 
 
 
 
 
 
 
 
109
 
110
  # ==========================================
111
+ # 3. BULK ZIP UPLOAD (Secure Background Task ✅)
112
  # ==========================================
113
# Background helper: runs the ZIP processor with its own DB session and owner id.
async def run_zip_task(job_id, zip_path, session_id, db_factory, user_id):
    """Unpack and ingest a ZIP archive in the background.

    Uses a fresh session from ``db_factory`` (the request-scoped session is
    gone by the time this runs); ``user_id`` makes the processor owner-aware.
    """
    async with db_factory() as session:
        worker = SmartZipProcessor(job_id, zip_path, session_id, session, user_id=user_id)
        await worker.start()
118
 
119
  @router.post("/ingest/upload-zip")
 
122
  file: UploadFile = File(...),
123
  background_tasks: BackgroundTasks = BackgroundTasks(),
124
  db: AsyncSession = Depends(get_db),
125
+ current_user: User = Depends(get_current_user)
126
  ):
 
127
  if not file.filename.endswith(".zip"):
128
+ raise HTTPException(status_code=400, detail="Invalid format. ZIP only.")
 
 
129
 
130
  zip_dir = os.path.join(UPLOAD_DIRECTORY, "zips")
131
  os.makedirs(zip_dir, exist_ok=True)
 
144
  await db.commit()
145
  await db.refresh(new_job)
146
 
147
+ # 🚀 BACKGROUND LINK: Pass user_id to the task
148
+ background_tasks.add_task(
149
+ run_zip_task,
150
+ new_job.id, file_path, session_id,
151
+ AsyncSessionLocal, str(current_user.id)
152
+ )
153
+ return {"message": "Secure Zip processing scheduled", "job_id": new_job.id}
154
+
155
+ # ==========================================
156
+ # 4. STATUS CHECKER (Secure ✅)
157
+ # ==========================================
158
@router.get("/ingest/status/{job_id}")
async def check_job_status(
    job_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the ingestion job row for ``job_id``, or 404 if unknown.

    NOTE(review): any authenticated user can read any job here — consider
    filtering by owner once IngestionJob records the owning user; the
    current schema visible here does not show such a column.
    """
    lookup = await db.execute(select(IngestionJob).where(IngestionJob.id == job_id))
    job = lookup.scalars().first()
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return job
backend/src/services/ingestion/crawler.py CHANGED
@@ -1,74 +1,94 @@
1
  import asyncio
2
  import requests
 
3
  import numpy as np
4
  from bs4 import BeautifulSoup
5
  from urllib.parse import urljoin
6
  from sqlalchemy.ext.asyncio import AsyncSession
 
 
7
  from backend.src.models.ingestion import IngestionJob, JobStatus
 
8
  from backend.src.services.vector_store.qdrant_adapter import get_vector_store
9
  from langchain_text_splitters import RecursiveCharacterTextSplitter
10
  from langchain_core.documents import Document
11
  from qdrant_client.http import models
12
 
13
- # --- NEW IMPORT ---
14
  from backend.src.services.ingestion.guardrail_factory import predict_with_model
15
 
16
- # --- CONFIGURATION ---
17
  MAX_PAGES_LIMIT = 50
18
 
19
  class SmartCrawler:
20
- def __init__(self, job_id: int, url: str, session_id: str, crawl_type: str, db: AsyncSession):
 
21
  self.job_id = job_id
22
  self.root_url = url
23
  self.session_id = session_id
24
  self.crawl_type = crawl_type
25
  self.db = db
 
26
  self.visited = set()
27
- self.vector_store = get_vector_store()
28
- # YAHAN SE MODEL LOAD HATA DIYA
29
 
30
  async def log_status(self, status: str, processed=0, total=0, error=None):
31
  try:
32
- job = await self.db.get(IngestionJob, self.job_id)
 
 
33
  if job:
34
  job.status = status
35
- job.pages_processed = processed
36
- job.total_pages_found = total
37
  if error:
38
  job.error_message = str(error)
39
  await self.db.commit()
40
  except Exception as e:
41
  print(f"DB Log Error: {e}")
42
 
43
- async def is_ai_unsafe(self, text: str, url: str) -> bool: # <--- Async bana diya
 
44
  """
45
- Non-blocking AI Check using Factory.
46
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  sample_text = text[:300] + " ... " + text[len(text)//2 : len(text)//2 + 300]
48
  label = "This is an e-commerce product page with price, buy button, or shopping cart."
49
 
50
- # --- FIX: Call Factory Async Function ---
51
- # Ab ye server ko block nahi karega
52
  scores = await predict_with_model(sample_text, label)
53
-
54
- # Softmax Calculation
55
  probs = np.exp(scores) / np.sum(np.exp(scores))
56
  entailment_score = probs[1]
57
 
58
- print("\n" + "="*60)
59
- print(f"🤖 AI ANALYSIS REPORT for: {url}")
60
- print("-" * 60)
61
- print(f"📊 Scores -> Contradiction: {probs[0]:.2f}, Entailment: {probs[1]:.2f}, Neutral: {probs[2]:.2f}")
62
- print(f"🎯 Target Score (Entailment): {entailment_score:.4f} (Threshold: 0.5)")
63
-
64
  if entailment_score > 0.5:
65
- print(f"⛔ DECISION: BLOCKED")
66
- print("="*60 + "\n")
67
  return True
68
- else:
69
- print(f"✅ DECISION: ALLOWED")
70
- print("="*60 + "\n")
71
- return False
72
 
73
  async def fetch_page(self, url: str):
74
  try:
@@ -84,12 +104,7 @@ class SmartCrawler:
84
  collection_name=self.vector_store.collection_name,
85
  points_selector=models.FilterSelector(
86
  filter=models.Filter(
87
- must=[
88
- models.FieldCondition(
89
- key="metadata.source",
90
- match=models.MatchValue(value=self.root_url)
91
- )
92
- ]
93
  )
94
  )
95
  )
@@ -101,15 +116,9 @@ class SmartCrawler:
101
  script.extract()
102
 
103
  text = soup.get_text(separator=" ", strip=True)
104
-
105
- if len(text) < 200:
106
- print(f"⚠️ Skipping {url} (Not enough text: {len(text)} chars)")
107
- return False
108
 
109
- # --- AWAIT HERE ---
110
- # Ab hum 'await' use kar rahe hain taake ye background mein chale
111
- if await self.is_ai_unsafe(text, url):
112
- return False
113
 
114
  splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
115
  docs = [Document(page_content=text, metadata={
@@ -125,6 +134,11 @@ class SmartCrawler:
125
 
126
  async def start(self):
127
  try:
 
 
 
 
 
128
  await self.log_status(JobStatus.PROCESSING)
129
  await self.clean_existing_data()
130
 
@@ -134,13 +148,10 @@ class SmartCrawler:
134
 
135
  while queue and total_processed < MAX_PAGES_LIMIT:
136
  current_url = queue.pop(0)
137
-
138
  response = await self.fetch_page(current_url)
139
- if not response or response.status_code != 200:
140
- continue
141
 
142
  soup = BeautifulSoup(response.content, 'html.parser')
143
-
144
  success = await self.process_page(current_url, soup)
145
 
146
  if not success:
 
1
  import asyncio
2
  import requests
3
+ import json # Credentials decode karne ke liye
4
  import numpy as np
5
  from bs4 import BeautifulSoup
6
  from urllib.parse import urljoin
7
  from sqlalchemy.ext.asyncio import AsyncSession
8
+ from sqlalchemy.future import select # Query karne ke liye
9
+
10
  from backend.src.models.ingestion import IngestionJob, JobStatus
11
+ from backend.src.models.integration import UserIntegration # integration model import kiya
12
  from backend.src.services.vector_store.qdrant_adapter import get_vector_store
13
  from langchain_text_splitters import RecursiveCharacterTextSplitter
14
  from langchain_core.documents import Document
15
  from qdrant_client.http import models
16
 
 
17
  from backend.src.services.ingestion.guardrail_factory import predict_with_model
18
 
 
19
  MAX_PAGES_LIMIT = 50
20
 
21
  class SmartCrawler:
22
+ # 1. Init mein 'user_id' add kiya taake hum uski settings dhoond saken
23
+ def __init__(self, job_id: int, url: str, session_id: str, crawl_type: str, db: AsyncSession, user_id: str):
24
  self.job_id = job_id
25
  self.root_url = url
26
  self.session_id = session_id
27
  self.crawl_type = crawl_type
28
  self.db = db
29
+ self.user_id = user_id # Owner ID
30
  self.visited = set()
31
+ self.vector_store = None # Shuru mein None rakhein, verification ke baad fill hoga
 
32
 
33
  async def log_status(self, status: str, processed=0, total=0, error=None):
34
  try:
35
+ # SQL Alchemy 2.0 style query
36
+ result = await self.db.execute(select(IngestionJob).where(IngestionJob.id == self.job_id))
37
+ job = result.scalars().first()
38
  if job:
39
  job.status = status
40
+ job.items_processed = processed # Column name match karein (items_processed)
41
+ job.total_items = total
42
  if error:
43
  job.error_message = str(error)
44
  await self.db.commit()
45
  except Exception as e:
46
  print(f"DB Log Error: {e}")
47
 
48
+ # --- NEW: STRICT DATABASE VERIFICATION SKILL ---
49
+ async def verify_and_connect_db(self) -> bool:
50
  """
51
+ Check if user has a valid Qdrant Cloud integration.
52
  """
53
+ print(f"🔍 Verifying Database for User ID: {self.user_id}")
54
+ try:
55
+ stmt = select(UserIntegration).where(
56
+ UserIntegration.user_id == str(self.user_id),
57
+ UserIntegration.provider == "qdrant",
58
+ UserIntegration.is_active == True
59
+ )
60
+ result = await self.db.execute(stmt)
61
+ integration = result.scalars().first()
62
+
63
+ if not integration:
64
+ error_msg = "❌ No Qdrant Cloud connected. Please go to 'Settings' and connect your database first."
65
+ print(error_msg)
66
+ await self.log_status(JobStatus.FAILED, error=error_msg)
67
+ return False
68
+
69
+ # User ki encrypted/json credentials nikalen
70
+ creds = json.loads(integration.credentials) if isinstance(integration.credentials, str) else integration.credentials
71
+
72
+ # Smart Adapter ko user ki chabiyan (keys) bhejein
73
+ self.vector_store = get_vector_store(credentials=creds)
74
+ return True
75
+
76
+ except Exception as e:
77
+ await self.log_status(JobStatus.FAILED, error=f"Database Connection Error: {str(e)}")
78
+ return False
79
+
80
+ async def is_ai_unsafe(self, text: str, url: str) -> bool:
81
  sample_text = text[:300] + " ... " + text[len(text)//2 : len(text)//2 + 300]
82
  label = "This is an e-commerce product page with price, buy button, or shopping cart."
83
 
 
 
84
  scores = await predict_with_model(sample_text, label)
 
 
85
  probs = np.exp(scores) / np.sum(np.exp(scores))
86
  entailment_score = probs[1]
87
 
 
 
 
 
 
 
88
  if entailment_score > 0.5:
89
+ print(f"⛔ AI BLOCKED (E-commerce): {url}")
 
90
  return True
91
+ return False
 
 
 
92
 
93
  async def fetch_page(self, url: str):
94
  try:
 
104
  collection_name=self.vector_store.collection_name,
105
  points_selector=models.FilterSelector(
106
  filter=models.Filter(
107
+ must=[models.FieldCondition(key="metadata.source", match=models.MatchValue(value=self.root_url))]
 
 
 
 
 
108
  )
109
  )
110
  )
 
116
  script.extract()
117
 
118
  text = soup.get_text(separator=" ", strip=True)
119
+ if len(text) < 200: return False
 
 
 
120
 
121
+ if await self.is_ai_unsafe(text, url): return False
 
 
 
122
 
123
  splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
124
  docs = [Document(page_content=text, metadata={
 
134
 
135
  async def start(self):
136
  try:
137
+ # 1. PEHLA KAAM: Database check karo
138
+ db_ready = await self.verify_and_connect_db()
139
+ if not db_ready:
140
+ return # Stop process if no DB
141
+
142
  await self.log_status(JobStatus.PROCESSING)
143
  await self.clean_existing_data()
144
 
 
148
 
149
  while queue and total_processed < MAX_PAGES_LIMIT:
150
  current_url = queue.pop(0)
 
151
  response = await self.fetch_page(current_url)
152
+ if not response or response.status_code != 200: continue
 
153
 
154
  soup = BeautifulSoup(response.content, 'html.parser')
 
155
  success = await self.process_page(current_url, soup)
156
 
157
  if not success:
backend/src/services/ingestion/file_processor.py CHANGED
@@ -1,94 +1,97 @@
1
- # backend/src/services/ingestion/file_processor.py
2
  import os
3
  import asyncio
 
 
 
 
4
  # Specific Stable Loaders
5
  from langchain_community.document_loaders import (
6
  TextLoader,
7
  PyPDFLoader,
8
  CSVLoader,
9
  Docx2txtLoader,
10
- UnstructuredMarkdownLoader
 
11
  )
12
- # Fallback loader (agar upar walon mein se koi na ho)
13
- from langchain_community.document_loaders import UnstructuredFileLoader
14
  from langchain_text_splitters import RecursiveCharacterTextSplitter
15
  from backend.src.services.vector_store.qdrant_adapter import get_vector_store
 
16
 
17
  def get_loader(file_path: str):
18
  """
19
  Factory function jo file extension ke hisaab se
20
- sabse stable loader return karta hai.
21
  """
22
  ext = os.path.splitext(file_path)[1].lower()
23
 
24
  if ext == ".txt":
25
- # TextLoader sabse fast aur safe hai
26
  return TextLoader(file_path, encoding="utf-8")
27
-
28
  elif ext == ".pdf":
29
- # PyPDFLoader pure python hai, hang nahi hota
30
  return PyPDFLoader(file_path)
31
-
32
  elif ext == ".csv":
33
  return CSVLoader(file_path, encoding="utf-8")
34
-
35
  elif ext in [".doc", ".docx"]:
36
- # Docx2txtLoader light hai
37
  return Docx2txtLoader(file_path)
38
-
39
  elif ext == ".md":
40
- # Markdown ko hum TextLoader se bhi parh sakte hain agar Unstructured tang kare
41
  return TextLoader(file_path, encoding="utf-8")
42
-
43
  else:
44
- # Agar koi ajeeb format ho, tab hum Heavy 'Unstructured' loader try karenge
45
- print(f"INFO: Unknown format '{ext}', attempting to use UnstructuredFileLoader...")
46
  return UnstructuredFileLoader(file_path)
47
 
48
- async def process_file(file_path: str, session_id: str):
 
49
  """
50
- Processes a single uploaded file and adds it to the Vector DB.
51
- Supports: TXT, PDF, CSV, DOCX, MD and others.
52
  """
53
- print(f"INFO: [Ingestion] Starting processing for file: {file_path}")
54
 
55
  try:
56
- # 1. Sahi Loader select karein
57
- loader = get_loader(file_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
58
 
59
- # 2. File Load karein (Thread mein taake server block na ho)
60
- # Note: 'aload()' har loader ke paas nahi hota, isliye hum standard 'load()' ko async wrap karte hain
 
 
 
61
  docs = await asyncio.to_thread(loader.load)
62
 
63
- except Exception as e:
64
- print(f"ERROR: [Ingestion] Failed to load file {file_path}: {e}")
65
- return 0
66
-
67
- if not docs:
68
- print(f"WARNING: [Ingestion] Could not extract any content from {file_path}")
69
- return 0
70
 
71
- # 3. Document ko chunks mein todein
72
- text_splitter = RecursiveCharacterTextSplitter(
73
- chunk_size=1000,
74
- chunk_overlap=200,
75
- length_function=len
76
- )
77
- split_docs = text_splitter.split_documents(docs)
78
-
79
- # Metadata update (Source tracking ke liye)
80
- for doc in split_docs:
81
- doc.metadata["session_id"] = session_id
82
- doc.metadata["file_name"] = os.path.basename(file_path)
83
- # Extension bhi store kar lete hain filter karne ke liye
84
- doc.metadata["file_type"] = os.path.splitext(file_path)[1].lower()
85
 
86
- # 4. Qdrant mein upload karein
87
- try:
88
- vector_store = get_vector_store()
89
  await vector_store.aadd_documents(split_docs)
90
- print(f"SUCCESS: [Ingestion] Processed {len(split_docs)} chunks from {file_path}")
91
  return len(split_docs)
 
92
  except Exception as e:
93
- print(f"ERROR: [Ingestion] Failed to upload to Qdrant: {e}")
94
  return 0
 
 
1
  import os
2
  import asyncio
3
+ import json
4
+ from sqlalchemy.ext.asyncio import AsyncSession
5
+ from sqlalchemy.future import select
6
+
7
  # Specific Stable Loaders
8
  from langchain_community.document_loaders import (
9
  TextLoader,
10
  PyPDFLoader,
11
  CSVLoader,
12
  Docx2txtLoader,
13
+ UnstructuredMarkdownLoader,
14
+ UnstructuredFileLoader
15
  )
 
 
16
  from langchain_text_splitters import RecursiveCharacterTextSplitter
17
  from backend.src.services.vector_store.qdrant_adapter import get_vector_store
18
+ from backend.src.models.integration import UserIntegration # Integration model zaroori hai
19
 
20
def get_loader(file_path: str):
    """Pick the most stable document loader for a file, keyed on extension.

    Unknown formats fall back to the heavyweight UnstructuredFileLoader.
    """
    ext = os.path.splitext(file_path)[1].lower()

    # Markdown is read as plain text on purpose: TextLoader is lighter and
    # avoids pulling in the optional `unstructured` dependency.
    if ext in (".txt", ".md"):
        return TextLoader(file_path, encoding="utf-8")
    if ext == ".pdf":
        return PyPDFLoader(file_path)
    if ext == ".csv":
        return CSVLoader(file_path, encoding="utf-8")
    if ext in (".doc", ".docx"):
        return Docx2txtLoader(file_path)
    return UnstructuredFileLoader(file_path)
39
 
40
# --- tenant-aware ingestion: requires user_id and an open DB session ---
async def process_file(file_path: str, session_id: str, user_id: str, db: AsyncSession):
    """Ingest one uploaded file strictly into the owning user's Cloud Qdrant.

    Returns:
        Number of chunks stored on success; 0 when nothing could be
        extracted (or on unexpected errors); -1 when the user has no active
        Qdrant integration (callers map that to a "connect your database"
        response).
    """
    print(f"INFO: [Ingestion] Starting secure processing for user {user_id}: {file_path}")

    try:
        # 1) Strict tenancy check — no fallback to a shared/local instance.
        stmt = select(UserIntegration).where(
            UserIntegration.user_id == str(user_id),
            UserIntegration.provider == "qdrant",
            UserIntegration.is_active == True,
        )
        integration = (await db.execute(stmt)).scalars().first()
        if integration is None:
            print(f"❌ ERROR: User {user_id} has no Qdrant connected.")
            return -1  # sentinel: no database connected

        # 2) Credentials may be JSON text or an already decoded mapping.
        raw = integration.credentials
        creds = json.loads(raw) if isinstance(raw, str) else raw

        # 3) Connect to the user's own vector store.
        vector_store = get_vector_store(credentials=creds)

        # 4) Load the document in a worker thread — loaders are blocking
        #    and must not stall the event loop.
        docs = await asyncio.to_thread(get_loader(file_path).load)
        if not docs:
            print(f"WARNING: No content extracted from {file_path}")
            return 0

        # 5) Chunk the document and tag each piece for later filtering.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        chunks = splitter.split_documents(docs)
        base_name = os.path.basename(file_path)
        for chunk in chunks:
            chunk.metadata["session_id"] = session_id
            chunk.metadata["user_id"] = user_id
            chunk.metadata["file_name"] = base_name
            chunk.metadata["source"] = base_name  # search filters key on "source"

        # 6) Upload to the user's vector DB.
        await vector_store.aadd_documents(chunks)
        print(f"SUCCESS: Processed {len(chunks)} chunks to user's Cloud Qdrant.")
        return len(chunks)

    except Exception as e:
        print(f"ERROR: [Ingestion] Critical failure: {e}")
        return 0
backend/src/services/ingestion/guardrail_factory.py CHANGED
@@ -1,28 +1,45 @@
1
  from sentence_transformers import CrossEncoder
2
- from functools import lru_cache
3
  import asyncio
 
4
 
5
- # Global Cache
6
  _model_instance = None
7
 
8
  def get_guardrail_model():
9
  """
10
- Model ko sirf ek baar load karega.
 
11
  """
12
  global _model_instance
13
  if _model_instance is None:
14
- print("⏳ INFO: Loading AI Guardrail Model into RAM (First Time Only)...")
15
- # 'nli-distilroberta-base' thoda heavy hai, agar PC slow hai to 'cross-encoder/ms-marco-TinyBERT-L-2' use karein
16
- _model_instance = CrossEncoder('cross-encoder/nli-distilroberta-base')
17
- print("✅ INFO: AI Guardrail Model Loaded!")
 
 
 
 
 
 
 
 
18
  return _model_instance
19
 
20
- async def predict_with_model(text, label):
21
  """
22
- Prediction ko background thread mein chalata hai taake server hang na ho.
 
23
  """
24
- model = get_guardrail_model()
25
-
26
- # Ye line magic hai: Heavy kaam ko alag thread mein bhej do
27
- scores = await asyncio.to_thread(model.predict, [(text, label)])
28
- return scores[0]
 
 
 
 
 
 
 
 
1
  from sentence_transformers import CrossEncoder
 
2
  import asyncio
3
+ import os
4
 
5
+ # Global Cache for Singleton Pattern
6
  _model_instance = None
7
 
8
  def get_guardrail_model():
9
  """
10
+ Skill: AI Guardrail Loader. Loads model into RAM only once.
11
+ Optimized for SaaS performance.
12
  """
13
  global _model_instance
14
  if _model_instance is None:
15
+ # Railway RAM optimization: Agar heavy model crash kare, toh TinyBERT use karein
16
+ # Default: nli-distilroberta-base
17
+ model_name = os.getenv("GUARDRAIL_MODEL", "cross-encoder/nli-distilroberta-base")
18
+
19
+ print(f"⏳ [AI-Guardrail] Loading Model: {model_name}...")
20
+ try:
21
+ _model_instance = CrossEncoder(model_name)
22
+ print("✅ [AI-Guardrail] Model ready for inference.")
23
+ except Exception as e:
24
+ print(f"❌ [AI-Guardrail] Failed to load model: {e}")
25
+ raise e
26
+
27
  return _model_instance
28
 
29
async def predict_with_model(text: str, label: str):
    """Run one (text, hypothesis) guardrail prediction off the event loop.

    Returns the model's raw score triple for the pair. On any failure an
    all-zero score is returned so ingestion keeps running (the caller's
    softmax then treats the page as allowed/neutral).
    """
    try:
        model = get_guardrail_model()
        # CPU-heavy inference is offloaded to a worker thread so the
        # FastAPI event loop stays responsive.
        result = await asyncio.to_thread(model.predict, [(text, label)])
        return result[0]
    except Exception as e:
        print(f"⚠️ [AI-Guardrail] Prediction Error: {e}")
        # Neutral fallback score — keeps the pipeline alive on model errors.
        return [0.0, 0.0, 0.0]
backend/src/services/ingestion/web_processor.py CHANGED
@@ -1,17 +1,40 @@
1
  import asyncio
 
 
 
 
2
  from langchain_community.document_loaders import WebBaseLoader
3
  from langchain_text_splitters import RecursiveCharacterTextSplitter
4
  from backend.src.services.vector_store.qdrant_adapter import get_vector_store
 
5
 
6
- async def process_url(url: str, session_id: str):
7
  """
8
- Ek URL se data scrape karta hai, chunks banata hai aur Qdrant mein save karta hai.
9
  """
10
- print(f"INFO: [Ingestion] Starting scraping for URL: {url}")
11
 
12
  try:
13
- # 1. Load Data from URL
14
- # Hum loader ko async thread mein chalayenge taake server block na ho
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  def load_data():
16
  loader = WebBaseLoader(url)
17
  return loader.load()
@@ -22,32 +45,28 @@ async def process_url(url: str, session_id: str):
22
  print(f"WARNING: [Ingestion] No content found at {url}")
23
  return 0
24
 
25
- print(f"INFO: [Ingestion] Successfully fetched content. Length: {len(docs[0].page_content)} chars.")
26
 
27
- except Exception as e:
28
- print(f"ERROR: [Ingestion] Failed to scrape URL {url}: {e}")
29
- raise e # Error upar bhejenge taake API user ko bata sake
30
-
31
- # 2. Split Text into Chunks
32
- text_splitter = RecursiveCharacterTextSplitter(
33
- chunk_size=1000,
34
- chunk_overlap=200,
35
- length_function=len
36
- )
37
- split_docs = text_splitter.split_documents(docs)
38
-
39
- # 3. Add Metadata (Bohat Zaroori)
40
- for doc in split_docs:
41
- doc.metadata["session_id"] = session_id
42
- doc.metadata["source"] = url # Taake pata chale ye data kahan se aaya
43
- doc.metadata["type"] = "web_scrape"
44
 
45
- # 4. Save to Qdrant
46
- try:
47
- vector_store = get_vector_store()
48
  await vector_store.aadd_documents(split_docs)
49
- print(f"SUCCESS: [Ingestion] Processed {len(split_docs)} chunks from {url}")
50
  return len(split_docs)
 
51
  except Exception as e:
52
- print(f"ERROR: [Ingestion] Failed to upload to Qdrant: {e}")
53
  return 0
 
1
  import asyncio
2
+ import json
3
+ from sqlalchemy.ext.asyncio import AsyncSession
4
+ from sqlalchemy.future import select
5
+
6
  from langchain_community.document_loaders import WebBaseLoader
7
  from langchain_text_splitters import RecursiveCharacterTextSplitter
8
  from backend.src.services.vector_store.qdrant_adapter import get_vector_store
9
+ from backend.src.models.integration import UserIntegration # SaaS Logic ke liye
10
 
11
async def process_url(url: str, session_id: str, user_id: str, db: AsyncSession):
    """Scrape ``url`` and ingest it strictly into the user's own Qdrant Cloud.

    SaaS flow:
      1. Verify the user has an active Qdrant integration (no key, no scrape).
      2. Build a vector store from the user's stored credentials.
      3. Fetch, chunk, and tag every chunk with tenant/session metadata.

    Returns:
        int: number of chunks ingested; ``-1`` when the user has no Qdrant
        integration connected; ``0`` on any processing failure.
    """
    print(f"INFO: [Ingestion] Verifying Database for User {user_id} before scraping: {url}")

    try:
        # 1. Database verification first — refuse to scrape without an
        #    active, user-owned Qdrant integration.
        stmt = select(UserIntegration).where(
            UserIntegration.user_id == str(user_id),
            UserIntegration.provider == "qdrant",
            UserIntegration.is_active == True,
        )
        result = await db.execute(stmt)
        integration = result.scalars().first()

        if not integration:
            print(f"❌ ERROR: User {user_id} has no Qdrant connected.")
            return -1  # Sentinel the API layer translates into a "connect your DB" error.

        # 2. Credentials may be persisted as a JSON string or already a dict.
        creds = (
            json.loads(integration.credentials)
            if isinstance(integration.credentials, str)
            else integration.credentials
        )

        # 3. Strict per-user connection (no global fallback).
        vector_store = get_vector_store(credentials=creds)

        # 4. Run the blocking loader in a worker thread so the event loop stays free.
        def load_data():
            loader = WebBaseLoader(url)
            return loader.load()

        docs = await asyncio.to_thread(load_data)

        if not docs or not docs[0].page_content:
            print(f"WARNING: [Ingestion] No content found at {url}")
            return 0

        print(f"INFO: [Ingestion] Scrape Success. Content Length: {len(docs[0].page_content)} chars.")

        # 5. Split into overlapping chunks for retrieval.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        split_docs = text_splitter.split_documents(docs)

        # 6. Strict multi-tenancy metadata. BUGFIX: user_id is normalised to
        #    str so it matches the str(user_id) filters used when querying
        #    and deleting (e.g. clean_existing_data) — a raw UUID here would
        #    silently escape those filters.
        for doc in split_docs:
            doc.metadata["session_id"] = session_id
            doc.metadata["user_id"] = str(user_id)
            doc.metadata["source"] = url
            doc.metadata["type"] = "web_scrape"

        # 7. Upload to the user's own vector DB.
        await vector_store.aadd_documents(split_docs)
        print(f"SUCCESS: [Ingestion] {len(split_docs)} chunks synced to User's Cloud Database.")
        return len(split_docs)

    except Exception as e:
        print(f"ERROR: [Ingestion] Processing failed for {url}: {e}")
        return 0
backend/src/services/ingestion/zip_processor.py CHANGED
@@ -2,44 +2,81 @@ import zipfile
2
  import os
3
  import shutil
4
  import asyncio
 
5
  from sqlalchemy.ext.asyncio import AsyncSession
 
 
6
  from backend.src.models.ingestion import IngestionJob, JobStatus
 
7
  from backend.src.services.ingestion.file_processor import process_file
8
  from backend.src.services.vector_store.qdrant_adapter import get_vector_store
9
  from qdrant_client.http import models
10
 
11
- # --- CONFIGURATION ---
12
  SUPPORTED_EXTENSIONS = ['.pdf', '.txt', '.md', '.docx', '.csv']
13
  MAX_FILES_IN_ZIP = 500
14
 
15
  class SmartZipProcessor:
16
- def __init__(self, job_id: int, zip_path: str, session_id: str, db: AsyncSession):
 
17
  self.job_id = job_id
18
  self.zip_path = zip_path
19
  self.session_id = session_id
20
  self.db = db
21
- self.vector_store = get_vector_store()
 
22
  self.temp_dir = f"./temp_unzip_{job_id}"
23
  self.report = []
24
 
25
  async def log_status(self, status: str, processed=0, total=0, error=None):
26
- """Database mein job status update karta hai"""
27
  try:
28
- job = await self.db.get(IngestionJob, self.job_id)
 
 
29
  if job:
30
  job.status = status
31
  job.items_processed = processed
32
  job.total_items = total
33
- job.details = self.report # Report bhi save karo
34
  if error:
35
  job.error_message = str(error)
36
  await self.db.commit()
37
  except Exception as e:
38
  print(f"DB Log Error: {e}")
39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
  async def clean_existing_data(self):
41
- """Update Logic: Is session ka purana data saaf karo"""
42
- print(f"INFO: Cleaning old data for session_id: {self.session_id}")
43
  try:
44
  self.vector_store.client.delete(
45
  collection_name=self.vector_store.collection_name,
@@ -49,69 +86,76 @@ class SmartZipProcessor:
49
  models.FieldCondition(
50
  key="metadata.session_id",
51
  match=models.MatchValue(value=self.session_id)
 
 
 
 
 
52
  )
53
  ]
54
  )
55
  )
56
  )
57
  except Exception as e:
58
- print(f"Warning: Clean data failed (maybe first upload): {e}")
59
 
60
  def inspect_zip(self) -> list:
61
- """Zip ko bina extract kiye check karta hai"""
62
  with zipfile.ZipFile(self.zip_path, 'r') as zf:
63
  file_list = zf.infolist()
64
-
65
- # Guardrail 1: File Count
66
  if len(file_list) > MAX_FILES_IN_ZIP:
67
- raise ValueError(f"Zip contains too many files ({len(file_list)}). Max allowed is {MAX_FILES_IN_ZIP}.")
68
-
69
- # Sirf "Files" return karo, folders nahi
70
  return [f for f in file_list if not f.is_dir()]
71
 
72
  def extract_zip(self):
73
- """Zip ko temp folder mein extract karta hai"""
74
  os.makedirs(self.temp_dir, exist_ok=True)
75
  with zipfile.ZipFile(self.zip_path, 'r') as zf:
76
  zf.extractall(self.temp_dir)
77
 
78
  def cleanup(self):
79
- """Temp files/folders delete karta hai"""
80
  if os.path.exists(self.temp_dir):
81
  shutil.rmtree(self.temp_dir)
82
  if os.path.exists(self.zip_path):
83
  os.remove(self.zip_path)
84
 
85
  async def start(self):
86
- """Main Processing Loop"""
87
  try:
88
- # Step 1: Inspect
 
 
 
89
  files_to_process = self.inspect_zip()
90
  total_files = len(files_to_process)
91
  await self.log_status(JobStatus.PROCESSING, total=total_files)
92
 
93
- # Step 2: Clean old data (Atomic Update)
94
  await self.clean_existing_data()
95
 
96
- # Step 3: Extract
97
  self.extract_zip()
98
 
99
- # Step 4: Process each file
100
  processed_count = 0
101
  for file_info in files_to_process:
102
  file_path = os.path.join(self.temp_dir, file_info.filename)
103
 
104
- # Guardrail 2: Supported Extension
105
  ext = os.path.splitext(file_path)[1].lower()
106
  if ext not in SUPPORTED_EXTENSIONS:
107
  self.report.append({"file": file_info.filename, "status": "skipped", "reason": "unsupported_type"})
108
  continue
109
 
110
- # Process the file
111
  try:
112
- # process_file (jo humne pehle banaya tha) ko call karo
113
- chunks_added = await process_file(file_path, self.session_id)
114
- if chunks_added > 0:
 
 
 
 
 
 
 
 
 
115
  self.report.append({"file": file_info.filename, "status": "success", "chunks": chunks_added})
116
  else:
117
  raise ValueError("No content extracted")
@@ -120,10 +164,10 @@ class SmartZipProcessor:
120
 
121
  processed_count += 1
122
  await self.log_status(JobStatus.PROCESSING, processed=processed_count, total=total_files)
123
- await asyncio.sleep(0.1) # Thoda saans lene do
124
 
125
  await self.log_status(JobStatus.COMPLETED, processed=processed_count, total=total_files)
126
- print(f"SUCCESS: Zip processing finished. Processed {processed_count}/{total_files} files.")
127
 
128
  except Exception as e:
129
  print(f"ERROR: Zip processing failed: {e}")
 
2
  import os
3
  import shutil
4
  import asyncio
5
+ import json
6
  from sqlalchemy.ext.asyncio import AsyncSession
7
+ from sqlalchemy.future import select
8
+
9
  from backend.src.models.ingestion import IngestionJob, JobStatus
10
+ from backend.src.models.integration import UserIntegration # SaaS Logic
11
  from backend.src.services.ingestion.file_processor import process_file
12
  from backend.src.services.vector_store.qdrant_adapter import get_vector_store
13
  from qdrant_client.http import models
14
 
 
15
  SUPPORTED_EXTENSIONS = ['.pdf', '.txt', '.md', '.docx', '.csv']
16
  MAX_FILES_IN_ZIP = 500
17
 
18
  class SmartZipProcessor:
19
+ # 1. Init mein 'user_id' add kiya
20
+ def __init__(self, job_id: int, zip_path: str, session_id: str, db: AsyncSession, user_id: str):
21
  self.job_id = job_id
22
  self.zip_path = zip_path
23
  self.session_id = session_id
24
  self.db = db
25
+ self.user_id = user_id # Owner ID
26
+ self.vector_store = None # Verification ke baad initialize hoga
27
  self.temp_dir = f"./temp_unzip_{job_id}"
28
  self.report = []
29
 
30
  async def log_status(self, status: str, processed=0, total=0, error=None):
 
31
  try:
32
+ # SQL Alchemy 2.0 style query
33
+ result = await self.db.execute(select(IngestionJob).where(IngestionJob.id == self.job_id))
34
+ job = result.scalars().first()
35
  if job:
36
  job.status = status
37
  job.items_processed = processed
38
  job.total_items = total
39
+ job.details = self.report
40
  if error:
41
  job.error_message = str(error)
42
  await self.db.commit()
43
  except Exception as e:
44
  print(f"DB Log Error: {e}")
45
 
46
+ # --- NEW: SaaS DATABASE VERIFICATION ---
47
+ async def verify_and_connect_db(self) -> bool:
48
+ """
49
+ ZIP processing se pehle check karo ke user ka Qdrant Cloud connected hai ya nahi.
50
+ """
51
+ print(f"🔍 Verifying Database for ZIP Processing. User ID: {self.user_id}")
52
+ try:
53
+ stmt = select(UserIntegration).where(
54
+ UserIntegration.user_id == str(self.user_id),
55
+ UserIntegration.provider == "qdrant",
56
+ UserIntegration.is_active == True
57
+ )
58
+ result = await self.db.execute(stmt)
59
+ integration = result.scalars().first()
60
+
61
+ if not integration:
62
+ error_msg = "❌ No Qdrant Cloud connected. Cannot process ZIP."
63
+ await self.log_status(JobStatus.FAILED, error=error_msg)
64
+ return False
65
+
66
+ # Extract Credentials
67
+ creds = json.loads(integration.credentials) if isinstance(integration.credentials, str) else integration.credentials
68
+
69
+ # Smart Adapter ko user ki chabiyan bhejien (No Fallback)
70
+ self.vector_store = get_vector_store(credentials=creds)
71
+ return True
72
+
73
+ except Exception as e:
74
+ print(f"❌ DB Verification Failed: {e}")
75
+ return False
76
+
77
  async def clean_existing_data(self):
78
+ """SaaS Logic: Sirf is session aur is user ka purana data delete karo"""
79
+ print(f"INFO: Cleaning old data for session: {self.session_id}")
80
  try:
81
  self.vector_store.client.delete(
82
  collection_name=self.vector_store.collection_name,
 
86
  models.FieldCondition(
87
  key="metadata.session_id",
88
  match=models.MatchValue(value=self.session_id)
89
+ ),
90
+ # SECURITY: Ensure we only delete THIS user's data
91
+ models.FieldCondition(
92
+ key="metadata.user_id",
93
+ match=models.MatchValue(value=str(self.user_id))
94
  )
95
  ]
96
  )
97
  )
98
  )
99
  except Exception as e:
100
+ print(f"Warning: Clean data failed: {e}")
101
 
102
  def inspect_zip(self) -> list:
 
103
  with zipfile.ZipFile(self.zip_path, 'r') as zf:
104
  file_list = zf.infolist()
 
 
105
  if len(file_list) > MAX_FILES_IN_ZIP:
106
+ raise ValueError(f"Zip too large: {len(file_list)} files.")
 
 
107
  return [f for f in file_list if not f.is_dir()]
108
 
109
  def extract_zip(self):
 
110
  os.makedirs(self.temp_dir, exist_ok=True)
111
  with zipfile.ZipFile(self.zip_path, 'r') as zf:
112
  zf.extractall(self.temp_dir)
113
 
114
  def cleanup(self):
 
115
  if os.path.exists(self.temp_dir):
116
  shutil.rmtree(self.temp_dir)
117
  if os.path.exists(self.zip_path):
118
  os.remove(self.zip_path)
119
 
120
  async def start(self):
 
121
  try:
122
+ # 1. PEHLA KAAM: Database check
123
+ db_ready = await self.verify_and_connect_db()
124
+ if not db_ready: return
125
+
126
  files_to_process = self.inspect_zip()
127
  total_files = len(files_to_process)
128
  await self.log_status(JobStatus.PROCESSING, total=total_files)
129
 
130
+ # 2. Atomic Clean
131
  await self.clean_existing_data()
132
 
133
+ # 3. Extract
134
  self.extract_zip()
135
 
136
+ # 4. Loop through files
137
  processed_count = 0
138
  for file_info in files_to_process:
139
  file_path = os.path.join(self.temp_dir, file_info.filename)
140
 
 
141
  ext = os.path.splitext(file_path)[1].lower()
142
  if ext not in SUPPORTED_EXTENSIONS:
143
  self.report.append({"file": file_info.filename, "status": "skipped", "reason": "unsupported_type"})
144
  continue
145
 
 
146
  try:
147
+ # process_file (jo humne pehle update kiya tha) ko call karo
148
+ # Ab isko 'user_id' aur 'db' session bhi bhej rahe hain 🚀
149
+ chunks_added = await process_file(
150
+ file_path=file_path,
151
+ session_id=self.session_id,
152
+ user_id=self.user_id,
153
+ db=self.db
154
+ )
155
+
156
+ if chunks_added == -1: # No Database error from process_file
157
+ raise ValueError("Database connection lost or not configured.")
158
+ elif chunks_added > 0:
159
  self.report.append({"file": file_info.filename, "status": "success", "chunks": chunks_added})
160
  else:
161
  raise ValueError("No content extracted")
 
164
 
165
  processed_count += 1
166
  await self.log_status(JobStatus.PROCESSING, processed=processed_count, total=total_files)
167
+ await asyncio.sleep(0.05)
168
 
169
  await self.log_status(JobStatus.COMPLETED, processed=processed_count, total=total_files)
170
+ print(f"SUCCESS: Secure Zip ingestion complete.")
171
 
172
  except Exception as e:
173
  print(f"ERROR: Zip processing failed: {e}")
backend/src/services/vector_store/qdrant_adapter.py CHANGED
@@ -1,78 +1,50 @@
1
-
2
- import qdrant_client
3
  from qdrant_client import QdrantClient
4
  from qdrant_client.http import models
5
  from langchain_qdrant import QdrantVectorStore
6
- from backend.src.core.config import settings
7
  from backend.src.services.embeddings.factory import get_embedding_model
8
- from typing import Optional, Dict
9
 
10
- # @lru_cache() HATA DIYA - We can't cache user-specific connections
11
- def get_vector_store(credentials: Optional[Dict[str, str]] = None):
12
  """
13
- Dynamic Vector Store Connector.
14
- 1. Agar 'credentials' hain, to unhein use karega (User's Cloud Qdrant).
15
- 2. Agar nahi, to global settings use karega (Fallback/Admin).
16
  """
17
- embedding_model = get_embedding_model() # Ye local hai, isko keys nahi chahiye
18
-
19
- # --- DYNAMIC CONFIGURATION LOGIC ---
20
- if credentials:
21
- # User-specific Cloud settings
22
- qdrant_url = credentials.get("url")
23
- qdrant_api_key = credentials.get("api_key")
24
- collection_name = credentials.get("collection_name", "user_default_collection")
25
- else:
26
- # Global fallback settings
27
- qdrant_url = settings.QDRANT_URL
28
- qdrant_api_key = settings.QDRANT_API_KEY
29
- collection_name = settings.QDRANT_COLLECTION_NAME
30
 
31
- if not qdrant_url:
32
- raise ValueError("Qdrant URL is not configured for this user or globally.")
 
33
 
34
- print(f"INFO: [VectorDB] Connecting to Qdrant at '{qdrant_url}'...")
35
-
36
- # 1. Qdrant Client banayen (User ki keys ke sath)
37
- client = QdrantClient(
38
- url=qdrant_url,
39
- api_key=qdrant_api_key,
40
- )
41
 
42
- # 2. CHECK: Kya Collection exist karti hai?
43
- # Hum 'try-except' use karenge taake connection errors bhi pakde jayen
44
  try:
45
- # collection_exists is deprecated, use get_collection instead
46
- client.get_collection(collection_name=collection_name)
47
- print(f"INFO: [VectorDB] Collection '{collection_name}' already exists.")
48
- except Exception as e:
49
- # Agar error "Not found" hai, to collection banayenge
50
- if "404" in str(e) or "Not found" in str(e):
51
- print(f"INFO: Collection '{collection_name}' not found. Creating it now...")
52
-
53
- # Embedding size pata karna
54
- dummy_embedding = embedding_model.embed_query("test")
55
- vector_size = len(dummy_embedding)
56
-
57
  client.create_collection(
58
  collection_name=collection_name,
59
- vectors_config=models.VectorParams(
60
- size=vector_size,
61
- distance=models.Distance.COSINE
62
- )
63
  )
64
- print(f"SUCCESS: Created collection '{collection_name}' with vector size {vector_size}.")
65
- else:
66
- # Koi aur error (e.g., connection refused)
67
- raise ConnectionError(f"Failed to connect or access Qdrant: {e}")
68
-
69
- # 3. Vector Store object bana kar return karein
70
- vector_store = QdrantVectorStore(
71
- client=client,
72
- collection_name=collection_name,
73
- embedding=embedding_model,
74
- content_payload_key="page_content",
75
- metadata_payload_key="metadata"
76
- )
77
 
78
- return vector_store
 
 
 
 
 
 
 
 
 
1
+ # backend/src/services/vector_store/qdrant_adapter.py
 
2
  from qdrant_client import QdrantClient
3
  from qdrant_client.http import models
4
  from langchain_qdrant import QdrantVectorStore
 
5
  from backend.src.services.embeddings.factory import get_embedding_model
6
+ from typing import Dict
7
 
8
def get_vector_store(credentials: Dict[str, str]):
    """Strict SaaS vector-store connector — NO global fallback.

    Every tenant must supply their own Qdrant Cloud credentials:
    ``url`` (required), ``api_key`` and ``collection_name`` (optional;
    the collection defaults to ``user_default_collection``).

    Ensures the target collection exists — creating it with the embedding
    model's vector size on first use — and returns a ready QdrantVectorStore.

    Raises:
        ValueError: when no URL is configured for this user (shown to the user).
        ConnectionError: when Qdrant cannot be reached or set up.
    """
    if not credentials or not credentials.get("url"):
        # Surfaced directly to the end user in the UI.
        raise ValueError("Database Connection Missing: Please connect your Qdrant Cloud in 'User Settings' first.")

    qdrant_url = credentials.get("url")
    qdrant_api_key = credentials.get("api_key")
    collection_name = credentials.get("collection_name", "user_default_collection")

    # Qdrant Cloud requires HTTPS. BUGFIX: only prepend a scheme when none
    # is present — the old check (`not startswith("https://")`) would turn
    # an explicit "http://xyz.cloud.qdrant.io" into "https://http://...".
    if "cloud.qdrant.io" in qdrant_url and "://" not in qdrant_url:
        qdrant_url = f"https://{qdrant_url}"

    print(f"📡 [VectorDB] Strictly connecting to User Database: {qdrant_url}")

    try:
        client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key, timeout=30)

        # Single lookup of the (local) embedding model; reused below.
        embedding_model = get_embedding_model()

        # Collection check/create: create on first use for this tenant.
        try:
            client.get_collection(collection_name=collection_name)
        except Exception:
            print(f"Creating new collection: {collection_name}")
            # Probe the embedding dimension with a throwaway query.
            vector_size = len(embedding_model.embed_query("test"))
            client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(size=vector_size, distance=models.Distance.COSINE),
            )

        return QdrantVectorStore(
            client=client,
            collection_name=collection_name,
            embedding=embedding_model,
            content_payload_key="page_content",
            metadata_payload_key="metadata",
        )
    except Exception as e:
        # Chain the original cause for debuggability; callers still catch ConnectionError.
        raise ConnectionError(f"Qdrant Connection Failed: {str(e)}") from e