feat: implement backend ingestion pipeline with Celery workers, Qdrant integration, and session management
Browse files- backend/clean_db.py +18 -12
- backend/main.py +35 -34
- backend/requirements.txt +2 -0
- backend/src/config.py +3 -2
- backend/src/matching/stage1.py +20 -18
- backend/src/routers/matching.py +6 -6
- backend/src/routers/sessions.py +14 -7
- backend/src/workers/celery_app.py +0 -2
- backend/src/workers/ingest.py +28 -26
- backend/test_pinecone.py +0 -36
backend/clean_db.py
CHANGED
|
@@ -2,7 +2,8 @@ import asyncio
|
|
| 2 |
from src.database import engine
|
| 3 |
from sqlalchemy import text
|
| 4 |
from src.config import get_settings
|
| 5 |
-
from
|
|
|
|
| 6 |
|
| 7 |
async def main():
|
| 8 |
async with engine.begin() as conn:
|
|
@@ -11,22 +12,27 @@ async def main():
|
|
| 11 |
await conn.execute(text('CREATE SCHEMA public'))
|
| 12 |
print('Postgres schema wiped.')
|
| 13 |
|
|
|
|
| 14 |
settings = get_settings()
|
| 15 |
try:
|
| 16 |
-
|
| 17 |
-
|
| 18 |
-
|
| 19 |
-
pc.delete_index(settings.index_name)
|
| 20 |
|
| 21 |
-
|
| 22 |
-
|
| 23 |
-
|
| 24 |
-
|
| 25 |
-
|
| 26 |
)
|
| 27 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 28 |
except Exception as e:
|
| 29 |
-
print('
|
| 30 |
|
| 31 |
print("\n------------------------------")
|
| 32 |
print("Database is completely purged but empty.")
|
|
|
|
| 2 |
from src.database import engine
|
| 3 |
from sqlalchemy import text
|
| 4 |
from src.config import get_settings
|
| 5 |
+
from qdrant_client import QdrantClient
|
| 6 |
+
from qdrant_client.models import Distance, VectorParams, PayloadSchemaType
|
| 7 |
|
| 8 |
async def main():
|
| 9 |
async with engine.begin() as conn:
|
|
|
|
| 12 |
await conn.execute(text('CREATE SCHEMA public'))
|
| 13 |
print('Postgres schema wiped.')
|
| 14 |
|
| 15 |
+
import qdrant_client
|
| 16 |
settings = get_settings()
|
| 17 |
try:
|
| 18 |
+
q = QdrantClient(url=settings.qdrant_url, api_key=settings.qdrant_api_key)
|
| 19 |
+
q.delete_collection(settings.collection_name)
|
| 20 |
+
q.create_collection(settings.collection_name, vectors_config=VectorParams(size=384, distance=Distance.COSINE))
|
|
|
|
| 21 |
|
| 22 |
+
# Reinject indices required natively by the pipeline
|
| 23 |
+
q.create_payload_index(
|
| 24 |
+
collection_name=settings.collection_name,
|
| 25 |
+
field_name="session_id",
|
| 26 |
+
field_schema=PayloadSchemaType.KEYWORD
|
| 27 |
)
|
| 28 |
+
q.create_payload_index(
|
| 29 |
+
collection_name=settings.collection_name,
|
| 30 |
+
field_name="years_of_experience",
|
| 31 |
+
field_schema=PayloadSchemaType.FLOAT
|
| 32 |
+
)
|
| 33 |
+
print('Qdrant collection wiped and re-indexed.')
|
| 34 |
except Exception as e:
|
| 35 |
+
print('Qdrant error:', e)
|
| 36 |
|
| 37 |
print("\n------------------------------")
|
| 38 |
print("Database is completely purged but empty.")
|
backend/main.py
CHANGED
|
@@ -1,12 +1,11 @@
|
|
| 1 |
import os
|
| 2 |
import logging
|
| 3 |
-
import json
|
| 4 |
-
import redis.asyncio as redis
|
| 5 |
from contextlib import asynccontextmanager
|
| 6 |
from fastapi import FastAPI, Request
|
| 7 |
from fastapi.middleware.cors import CORSMiddleware
|
| 8 |
from fastapi.staticfiles import StaticFiles
|
| 9 |
-
from
|
|
|
|
| 10 |
|
| 11 |
from src.config import get_settings
|
| 12 |
from src.models import JobDescription, Candidate, MatchResult, Session
|
|
@@ -15,48 +14,52 @@ from src.routers import jds, candidates, matching, sessions
|
|
| 15 |
logger = logging.getLogger(__name__)
|
| 16 |
settings = get_settings()
|
| 17 |
|
| 18 |
-
|
| 19 |
-
|
| 20 |
-
_redis_client: redis.Redis | None = None
|
| 21 |
|
| 22 |
|
| 23 |
-
def
|
| 24 |
-
return
|
| 25 |
-
|
| 26 |
-
|
| 27 |
-
def get_redis() -> redis.Redis:
|
| 28 |
-
return _redis_client
|
| 29 |
|
| 30 |
|
| 31 |
@asynccontextmanager
|
| 32 |
async def lifespan(app: FastAPI):
|
| 33 |
-
global
|
| 34 |
-
|
| 35 |
-
_redis_client = redis.from_url(settings.redis_url, decode_responses=False)
|
| 36 |
|
| 37 |
try:
|
| 38 |
-
existing = [
|
| 39 |
-
if settings.
|
| 40 |
-
|
| 41 |
-
|
| 42 |
-
|
| 43 |
-
|
| 44 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 45 |
)
|
| 46 |
-
|
| 47 |
-
logger.info("
|
| 48 |
except Exception as exc:
|
| 49 |
-
|
| 50 |
logger.warning(
|
| 51 |
-
"
|
|
|
|
| 52 |
exc,
|
| 53 |
)
|
| 54 |
|
| 55 |
-
app.state.
|
| 56 |
-
app.state.
|
| 57 |
-
app.state.redis = _redis_client
|
| 58 |
yield
|
| 59 |
-
|
| 60 |
|
| 61 |
|
| 62 |
app = FastAPI(
|
|
@@ -82,13 +85,11 @@ app.include_router(matching.router, prefix="/api/match", tags=["Matching"])
|
|
| 82 |
|
| 83 |
@app.get("/health")
|
| 84 |
async def health(request: "Request"):
|
| 85 |
-
|
| 86 |
-
redis_conn = getattr(request.app.state, "redis", None)
|
| 87 |
return {
|
| 88 |
"status": "ok",
|
| 89 |
"version": "1.0.0",
|
| 90 |
-
"
|
| 91 |
-
"redis": "ok" if redis_conn else "missing",
|
| 92 |
}
|
| 93 |
|
| 94 |
|
|
|
|
| 1 |
import os
|
| 2 |
import logging
|
|
|
|
|
|
|
| 3 |
from contextlib import asynccontextmanager
|
| 4 |
from fastapi import FastAPI, Request
|
| 5 |
from fastapi.middleware.cors import CORSMiddleware
|
| 6 |
from fastapi.staticfiles import StaticFiles
|
| 7 |
+
from qdrant_client import QdrantClient
|
| 8 |
+
from qdrant_client.models import Distance, VectorParams, PayloadSchemaType
|
| 9 |
|
| 10 |
from src.config import get_settings
|
| 11 |
from src.models import JobDescription, Candidate, MatchResult, Session
|
|
|
|
| 14 |
logger = logging.getLogger(__name__)
|
| 15 |
settings = get_settings()
|
| 16 |
|
| 17 |
+
_qdrant_client: QdrantClient | None = None
|
| 18 |
+
_qdrant_ready: bool = False
|
|
|
|
| 19 |
|
| 20 |
|
| 21 |
+
def get_qdrant() -> QdrantClient | None:
    """Return the module-level Qdrant client.

    The client is assigned by ``lifespan`` during application startup, so
    this returns ``None`` when it is called before that handler has run.
    """
    return _qdrant_client
|
|
|
|
|
|
|
|
|
|
|
|
|
| 23 |
|
| 24 |
|
| 25 |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the shared Qdrant client for the lifetime of the application.

    Startup: build the client from ``settings.qdrant_url`` and
    ``settings.qdrant_api_key``; if the configured collection is not listed
    yet, create it together with payload indexes on ``session_id`` and
    ``years_of_experience``.  Any exception raised during that bootstrap is
    logged as a warning and recorded as ``qdrant_ready = False`` instead of
    aborting startup.

    The client and the readiness flag are exposed on ``app.state`` as
    ``qdrant`` and ``qdrant_ready``.

    Shutdown: the client is closed, even if an exception is propagated into
    this context manager while the application is running.
    """
    global _qdrant_client, _qdrant_ready
    _qdrant_client = QdrantClient(url=settings.qdrant_url, api_key=settings.qdrant_api_key)

    try:
        existing = [c.name for c in _qdrant_client.get_collections().collections]
        # NOTE(review): nesting of the index creation under this `if` was
        # reconstructed from a flattened diff view -- confirm against the repo.
        if settings.collection_name not in existing:
            _qdrant_client.create_collection(
                collection_name=settings.collection_name,
                vectors_config=VectorParams(size=settings.vector_size, distance=Distance.COSINE),
            )
            # Create indexing for the session_id to allow fast filtering
            _qdrant_client.create_payload_index(
                collection_name=settings.collection_name,
                field_name="session_id",
                field_schema=PayloadSchemaType.UUID,
            )
            # Create indexing for years_of_experience for range filtering
            _qdrant_client.create_payload_index(
                collection_name=settings.collection_name,
                field_name="years_of_experience",
                field_schema=PayloadSchemaType.FLOAT,
            )
        _qdrant_ready = True
        logger.info("Qdrant connected — collection '%s' ready", settings.collection_name)
    except Exception as exc:
        # Best effort: the API still starts when Qdrant cannot be reached.
        _qdrant_ready = False
        logger.warning(
            "Qdrant unavailable at startup (%s). "
            "The API will start but vector search will fail until Qdrant is reachable.",
            exc,
        )

    app.state.qdrant = _qdrant_client
    app.state.qdrant_ready = _qdrant_ready
    try:
        yield
    finally:
        # Release the client's connections on every exit path, not only on
        # a clean shutdown.
        _qdrant_client.close()
|
| 63 |
|
| 64 |
|
| 65 |
app = FastAPI(
|
|
|
|
| 85 |
|
| 86 |
@app.get("/health")
async def health(request: "Request"):
    """Report liveness, the API version, and the Qdrant readiness flag.

    The flag is read from ``request.app.state.qdrant_ready`` and is treated
    as ``False`` when the attribute is absent.
    """
    state = request.app.state
    qdrant_status = "connected" if getattr(state, "qdrant_ready", False) else "unavailable"
    return {
        "status": "ok",
        "version": "1.0.0",
        "qdrant": qdrant_status,
    }
|
| 94 |
|
| 95 |
|
backend/requirements.txt
CHANGED
|
@@ -4,6 +4,7 @@ sqlalchemy[asyncio]==2.0.36
|
|
| 4 |
asyncpg==0.30.0
|
| 5 |
psycopg[binary]==3.2.3
|
| 6 |
alembic==1.13.3
|
|
|
|
| 7 |
celery[redis]==5.4.0
|
| 8 |
redis==5.1.0
|
| 9 |
sentence-transformers==3.3.0
|
|
@@ -17,4 +18,5 @@ numpy==1.26.4
|
|
| 17 |
python-dotenv==1.0.1
|
| 18 |
httpx==0.27.2
|
| 19 |
aiofiles==24.1.0
|
|
|
|
| 20 |
pinecone
|
|
|
|
| 4 |
asyncpg==0.30.0
|
| 5 |
psycopg[binary]==3.2.3
|
| 6 |
alembic==1.13.3
|
| 7 |
+
qdrant-client==1.11.0
|
| 8 |
celery[redis]==5.4.0
|
| 9 |
redis==5.1.0
|
| 10 |
sentence-transformers==3.3.0
|
|
|
|
| 18 |
python-dotenv==1.0.1
|
| 19 |
httpx==0.27.2
|
| 20 |
aiofiles==24.1.0
|
| 21 |
+
pinecone-client
|
| 22 |
pinecone
|
backend/src/config.py
CHANGED
|
@@ -7,13 +7,14 @@ class Settings(BaseSettings):
|
|
| 7 |
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
|
| 8 |
|
| 9 |
database_url: str
|
| 10 |
-
|
|
|
|
| 11 |
redis_url: str
|
| 12 |
groq_api_key: str
|
| 13 |
groq_model: str = "llama-3.3-70b-versatile"
|
| 14 |
embedding_model: str = "BAAI/bge-small-en-v1.5"
|
| 15 |
reranker_model: str = "BAAI/bge-reranker-v2-m3"
|
| 16 |
-
|
| 17 |
vector_size: int = 384
|
| 18 |
|
| 19 |
@property
|
|
|
|
| 7 |
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
|
| 8 |
|
| 9 |
database_url: str
|
| 10 |
+
qdrant_url: str
|
| 11 |
+
qdrant_api_key: str
|
| 12 |
redis_url: str
|
| 13 |
groq_api_key: str
|
| 14 |
groq_model: str = "llama-3.3-70b-versatile"
|
| 15 |
embedding_model: str = "BAAI/bge-small-en-v1.5"
|
| 16 |
reranker_model: str = "BAAI/bge-reranker-v2-m3"
|
| 17 |
+
collection_name: str = "candidates_v1"
|
| 18 |
vector_size: int = 384
|
| 19 |
|
| 20 |
@property
|
backend/src/matching/stage1.py
CHANGED
|
@@ -1,5 +1,6 @@
|
|
| 1 |
from typing import Any
|
| 2 |
-
from
|
|
|
|
| 3 |
from sqlalchemy.ext.asyncio import AsyncSession
|
| 4 |
from sqlalchemy import select
|
| 5 |
|
|
@@ -15,19 +16,21 @@ DEFAULT_WEIGHTS = {
|
|
| 15 |
}
|
| 16 |
|
| 17 |
|
| 18 |
-
def
|
| 19 |
-
conditions =
|
| 20 |
if session_id:
|
| 21 |
-
conditions
|
| 22 |
if jd.get("min_yoe") is not None:
|
| 23 |
-
conditions
|
| 24 |
-
|
|
|
|
|
|
|
| 25 |
|
| 26 |
|
| 27 |
async def stage1_retrieve(
|
| 28 |
jd: dict,
|
| 29 |
db: AsyncSession,
|
| 30 |
-
|
| 31 |
session_id: str | None = None,
|
| 32 |
top_k: int = 500,
|
| 33 |
weights: dict | None = None,
|
|
@@ -38,21 +41,20 @@ async def stage1_retrieve(
|
|
| 38 |
jd_text = f"{jd.get('title', '')} {jd.get('raw_text', '')}"
|
| 39 |
query_vector = embed_query(jd_text)
|
| 40 |
|
| 41 |
-
|
| 42 |
-
|
| 43 |
-
|
| 44 |
-
|
| 45 |
-
|
| 46 |
-
|
| 47 |
-
|
| 48 |
)
|
| 49 |
|
| 50 |
-
if not search_results
|
| 51 |
return []
|
| 52 |
|
| 53 |
-
|
| 54 |
-
|
| 55 |
-
score_by_qdrant_id = {r["id"]: float(r["score"]) for r in matches}
|
| 56 |
|
| 57 |
result = await db.execute(select(Candidate).where(Candidate.qdrant_id.in_(qdrant_ids)))
|
| 58 |
candidates = {c.qdrant_id: c for c in result.scalars().all()}
|
|
|
|
| 1 |
from typing import Any
|
| 2 |
+
from qdrant_client import QdrantClient
|
| 3 |
+
from qdrant_client.models import Filter, FieldCondition, MatchValue, Range
|
| 4 |
from sqlalchemy.ext.asyncio import AsyncSession
|
| 5 |
from sqlalchemy import select
|
| 6 |
|
|
|
|
| 16 |
}
|
| 17 |
|
| 18 |
|
| 19 |
+
def _build_qdrant_filter(jd: dict, session_id: str | None) -> Filter | None:
|
| 20 |
+
conditions = []
|
| 21 |
if session_id:
|
| 22 |
+
conditions.append(FieldCondition(key="session_id", match=MatchValue(value=session_id)))
|
| 23 |
if jd.get("min_yoe") is not None:
|
| 24 |
+
conditions.append(FieldCondition(key="years_of_experience", range=Range(gte=max(0, jd["min_yoe"] - 2))))
|
| 25 |
+
if not conditions:
|
| 26 |
+
return None
|
| 27 |
+
return Filter(must=conditions)
|
| 28 |
|
| 29 |
|
| 30 |
async def stage1_retrieve(
|
| 31 |
jd: dict,
|
| 32 |
db: AsyncSession,
|
| 33 |
+
qdrant: QdrantClient,
|
| 34 |
session_id: str | None = None,
|
| 35 |
top_k: int = 500,
|
| 36 |
weights: dict | None = None,
|
|
|
|
| 41 |
jd_text = f"{jd.get('title', '')} {jd.get('raw_text', '')}"
|
| 42 |
query_vector = embed_query(jd_text)
|
| 43 |
|
| 44 |
+
qdrant_filter = _build_qdrant_filter(jd, session_id)
|
| 45 |
+
search_results = qdrant.search(
|
| 46 |
+
collection_name=settings.collection_name,
|
| 47 |
+
query_vector=query_vector.tolist(),
|
| 48 |
+
query_filter=qdrant_filter,
|
| 49 |
+
limit=top_k,
|
| 50 |
+
with_payload=True,
|
| 51 |
)
|
| 52 |
|
| 53 |
+
if not search_results:
|
| 54 |
return []
|
| 55 |
|
| 56 |
+
qdrant_ids = [r.id for r in search_results]
|
| 57 |
+
score_by_qdrant_id = {r.id: float(r.score) for r in search_results}
|
|
|
|
| 58 |
|
| 59 |
result = await db.execute(select(Candidate).where(Candidate.qdrant_id.in_(qdrant_ids)))
|
| 60 |
candidates = {c.qdrant_id: c for c in result.scalars().all()}
|
backend/src/routers/matching.py
CHANGED
|
@@ -23,8 +23,8 @@ from ..matching.scorer import rerank_with_weights
|
|
| 23 |
router = APIRouter()
|
| 24 |
|
| 25 |
|
| 26 |
-
def
|
| 27 |
-
return request.app.state.
|
| 28 |
|
| 29 |
|
| 30 |
async def _load_jd(jd_id: uuid.UUID, db: AsyncSession) -> JobDescription:
|
|
@@ -76,11 +76,11 @@ async def trigger_match(
|
|
| 76 |
db: AsyncSession = Depends(get_db),
|
| 77 |
):
|
| 78 |
jd = await _load_jd(jd_id, db)
|
| 79 |
-
|
| 80 |
jd_dict = _build_jd_dict(jd)
|
| 81 |
sid_str = str(session_id) if session_id else None
|
| 82 |
|
| 83 |
-
shortlist = await stage1_retrieve(jd_dict, db,
|
| 84 |
final_ranked = await stage2_rerank(jd_dict, shortlist)
|
| 85 |
|
| 86 |
await db.execute(
|
|
@@ -128,11 +128,11 @@ async def trigger_match(
|
|
| 128 |
@router.get("/{jd_id}", response_model=MatchResponse)
|
| 129 |
async def get_match_results(
|
| 130 |
jd_id: uuid.UUID,
|
| 131 |
-
request: Request,
|
| 132 |
session_id: uuid.UUID | None = Query(None),
|
| 133 |
db: AsyncSession = Depends(get_db),
|
| 134 |
):
|
| 135 |
-
|
|
|
|
| 136 |
cache_key = f"match_v2:{jd_id}:{session_id or 'none'}"
|
| 137 |
|
| 138 |
try:
|
|
|
|
| 23 |
router = APIRouter()
|
| 24 |
|
| 25 |
|
| 26 |
+
def _get_qdrant(request: Request):
|
| 27 |
+
return request.app.state.qdrant
|
| 28 |
|
| 29 |
|
| 30 |
async def _load_jd(jd_id: uuid.UUID, db: AsyncSession) -> JobDescription:
|
|
|
|
| 76 |
db: AsyncSession = Depends(get_db),
|
| 77 |
):
|
| 78 |
jd = await _load_jd(jd_id, db)
|
| 79 |
+
qdrant = _get_qdrant(request)
|
| 80 |
jd_dict = _build_jd_dict(jd)
|
| 81 |
sid_str = str(session_id) if session_id else None
|
| 82 |
|
| 83 |
+
shortlist = await stage1_retrieve(jd_dict, db, qdrant, session_id=sid_str)
|
| 84 |
final_ranked = await stage2_rerank(jd_dict, shortlist)
|
| 85 |
|
| 86 |
await db.execute(
|
|
|
|
| 128 |
@router.get("/{jd_id}", response_model=MatchResponse)
|
| 129 |
async def get_match_results(
|
| 130 |
jd_id: uuid.UUID,
|
|
|
|
| 131 |
session_id: uuid.UUID | None = Query(None),
|
| 132 |
db: AsyncSession = Depends(get_db),
|
| 133 |
):
|
| 134 |
+
settings = get_settings()
|
| 135 |
+
r = redis.Redis.from_url(settings.redis_url)
|
| 136 |
cache_key = f"match_v2:{jd_id}:{session_id or 'none'}"
|
| 137 |
|
| 138 |
try:
|
backend/src/routers/sessions.py
CHANGED
|
@@ -34,13 +34,14 @@ async def get_session(session_id: uuid.UUID, db: AsyncSession = Depends(get_db))
|
|
| 34 |
return sess
|
| 35 |
|
| 36 |
|
| 37 |
-
from
|
|
|
|
| 38 |
from ..config import get_settings
|
| 39 |
from ..models.match_result import MatchResult
|
| 40 |
|
| 41 |
-
def
|
| 42 |
settings = get_settings()
|
| 43 |
-
return
|
| 44 |
|
| 45 |
@router.delete("/{session_id}", status_code=204)
|
| 46 |
async def delete_session(session_id: uuid.UUID, db: AsyncSession = Depends(get_db)):
|
|
@@ -53,11 +54,17 @@ async def delete_session(session_id: uuid.UUID, db: AsyncSession = Depends(get_d
|
|
| 53 |
settings = get_settings()
|
| 54 |
|
| 55 |
try:
|
| 56 |
-
|
| 57 |
-
|
| 58 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 59 |
except Exception as e:
|
| 60 |
-
print(f"Warning: Failed deleting
|
| 61 |
|
| 62 |
await db.execute(MatchResult.__table__.delete().where(MatchResult.session_id == session_id))
|
| 63 |
await db.execute(Candidate.__table__.delete().where(Candidate.session_id == session_id))
|
|
|
|
| 34 |
return sess
|
| 35 |
|
| 36 |
|
| 37 |
+
from qdrant_client import QdrantClient
|
| 38 |
+
from qdrant_client.models import Filter, FieldCondition, MatchValue
|
| 39 |
from ..config import get_settings
|
| 40 |
from ..models.match_result import MatchResult
|
| 41 |
|
| 42 |
+
def _get_qdrant() -> QdrantClient:
    """Build a Qdrant client from the configured URL and API key.

    A new client object is constructed on every call.
    """
    cfg = get_settings()
    client = QdrantClient(url=cfg.qdrant_url, api_key=cfg.qdrant_api_key)
    return client
|
| 45 |
|
| 46 |
@router.delete("/{session_id}", status_code=204)
|
| 47 |
async def delete_session(session_id: uuid.UUID, db: AsyncSession = Depends(get_db)):
|
|
|
|
| 54 |
settings = get_settings()
|
| 55 |
|
| 56 |
try:
|
| 57 |
+
qdrant = _get_qdrant()
|
| 58 |
+
qdrant.delete(
|
| 59 |
+
collection_name=settings.collection_name,
|
| 60 |
+
points_selector=Filter(
|
| 61 |
+
must=[
|
| 62 |
+
FieldCondition(key="session_id", match=MatchValue(value=session_str))
|
| 63 |
+
]
|
| 64 |
+
)
|
| 65 |
+
)
|
| 66 |
except Exception as e:
|
| 67 |
+
print(f"Warning: Failed deleting Qdrant targets for session {session_str}: {e}")
|
| 68 |
|
| 69 |
await db.execute(MatchResult.__table__.delete().where(MatchResult.session_id == session_id))
|
| 70 |
await db.execute(Candidate.__table__.delete().where(Candidate.session_id == session_id))
|
backend/src/workers/celery_app.py
CHANGED
|
@@ -21,6 +21,4 @@ celery_app.conf.update(
|
|
| 21 |
result_expires=3600,
|
| 22 |
worker_prefetch_multiplier=1,
|
| 23 |
task_acks_late=True,
|
| 24 |
-
broker_pool_limit=1,
|
| 25 |
-
redis_max_connections=5,
|
| 26 |
)
|
|
|
|
| 21 |
result_expires=3600,
|
| 22 |
worker_prefetch_multiplier=1,
|
| 23 |
task_acks_late=True,
|
|
|
|
|
|
|
| 24 |
)
|
backend/src/workers/ingest.py
CHANGED
|
@@ -4,7 +4,8 @@ import re
|
|
| 4 |
import json
|
| 5 |
from typing import Any
|
| 6 |
from sqlalchemy import select, func
|
| 7 |
-
from
|
|
|
|
| 8 |
|
| 9 |
from .celery_app import celery_app
|
| 10 |
from ..config import get_settings
|
|
@@ -21,9 +22,9 @@ from ..ml.feature_builder import (
|
|
| 21 |
)
|
| 22 |
|
| 23 |
|
| 24 |
-
def
|
| 25 |
settings = get_settings()
|
| 26 |
-
return
|
| 27 |
|
| 28 |
|
| 29 |
def _normalize_bool(val: Any) -> bool | None:
|
|
@@ -73,8 +74,7 @@ def ingest_candidates_batch(self, rows: list[dict], session_id: str | None = Non
|
|
| 73 |
|
| 74 |
async def _ingest_candidates_async(rows: list[dict], session_id: str | None) -> dict:
|
| 75 |
settings = get_settings()
|
| 76 |
-
|
| 77 |
-
index = pc.Index(settings.index_name)
|
| 78 |
texts = []
|
| 79 |
processed = []
|
| 80 |
sess_uuid = uuid.UUID(session_id) if session_id else None
|
|
@@ -101,7 +101,7 @@ async def _ingest_candidates_async(rows: list[dict], session_id: str | None) ->
|
|
| 101 |
embeddings = embed_texts(texts)
|
| 102 |
|
| 103 |
async with AsyncSessionLocal() as session:
|
| 104 |
-
|
| 105 |
inserted = 0
|
| 106 |
|
| 107 |
for i, p in enumerate(processed):
|
|
@@ -154,30 +154,32 @@ async def _ingest_candidates_async(rows: list[dict], session_id: str | None) ->
|
|
| 154 |
)
|
| 155 |
session.add(cand)
|
| 156 |
|
| 157 |
-
|
| 158 |
-
|
| 159 |
-
|
| 160 |
-
|
| 161 |
-
|
| 162 |
-
|
| 163 |
-
|
| 164 |
-
|
| 165 |
-
|
| 166 |
-
|
| 167 |
-
|
| 168 |
-
|
| 169 |
-
|
| 170 |
-
|
| 171 |
-
|
| 172 |
-
|
| 173 |
-
|
| 174 |
-
|
|
|
|
|
|
|
| 175 |
inserted += 1
|
| 176 |
|
| 177 |
await session.commit()
|
| 178 |
|
| 179 |
-
if
|
| 180 |
-
|
| 181 |
|
| 182 |
if sess_uuid:
|
| 183 |
result = await session.execute(
|
|
|
|
| 4 |
import json
|
| 5 |
from typing import Any
|
| 6 |
from sqlalchemy import select, func
|
| 7 |
+
from qdrant_client import QdrantClient
|
| 8 |
+
from qdrant_client.models import PointStruct
|
| 9 |
|
| 10 |
from .celery_app import celery_app
|
| 11 |
from ..config import get_settings
|
|
|
|
| 22 |
)
|
| 23 |
|
| 24 |
|
| 25 |
+
def _get_qdrant() -> QdrantClient:
    """Create a Qdrant client for the ingestion worker.

    The URL and API key are read from ``get_settings()``; every call
    constructs a fresh client.
    """
    worker_settings = get_settings()
    connection_kwargs = {
        "url": worker_settings.qdrant_url,
        "api_key": worker_settings.qdrant_api_key,
    }
    return QdrantClient(**connection_kwargs)
|
| 28 |
|
| 29 |
|
| 30 |
def _normalize_bool(val: Any) -> bool | None:
|
|
|
|
| 74 |
|
| 75 |
async def _ingest_candidates_async(rows: list[dict], session_id: str | None) -> dict:
|
| 76 |
settings = get_settings()
|
| 77 |
+
qdrant = _get_qdrant()
|
|
|
|
| 78 |
texts = []
|
| 79 |
processed = []
|
| 80 |
sess_uuid = uuid.UUID(session_id) if session_id else None
|
|
|
|
| 101 |
embeddings = embed_texts(texts)
|
| 102 |
|
| 103 |
async with AsyncSessionLocal() as session:
|
| 104 |
+
qdrant_points = []
|
| 105 |
inserted = 0
|
| 106 |
|
| 107 |
for i, p in enumerate(processed):
|
|
|
|
| 154 |
)
|
| 155 |
session.add(cand)
|
| 156 |
|
| 157 |
+
qdrant_points.append(
|
| 158 |
+
PointStruct(
|
| 159 |
+
id=qdrant_id,
|
| 160 |
+
vector=embeddings[i].tolist(),
|
| 161 |
+
payload={
|
| 162 |
+
"candidate_id": str(candidate_id),
|
| 163 |
+
"session_id": session_id or "",
|
| 164 |
+
"role_type": row.get("role_type"),
|
| 165 |
+
"engineer_type": row.get("engineer_type"),
|
| 166 |
+
"years_of_experience": _normalize_float(row.get("years_of_experience")),
|
| 167 |
+
"looking_for": row.get("looking_for"),
|
| 168 |
+
"open_to_working_at": row.get("open_to_working_at"),
|
| 169 |
+
"skills": all_skills[:50],
|
| 170 |
+
"gen_ai_experience": _normalize_bool(row.get("gen_ai_experience")),
|
| 171 |
+
"growth_velocity": p["velocity"],
|
| 172 |
+
"is_funded": _normalize_bool(row.get("most_recent_company_is_funded")),
|
| 173 |
+
"currently_employed": _normalize_bool(row.get("currently_employed")),
|
| 174 |
+
},
|
| 175 |
+
)
|
| 176 |
+
)
|
| 177 |
inserted += 1
|
| 178 |
|
| 179 |
await session.commit()
|
| 180 |
|
| 181 |
+
if qdrant_points:
|
| 182 |
+
qdrant.upsert(collection_name=settings.collection_name, points=qdrant_points)
|
| 183 |
|
| 184 |
if sess_uuid:
|
| 185 |
result = await session.execute(
|
backend/test_pinecone.py
DELETED
|
@@ -1,36 +0,0 @@
|
|
| 1 |
-
import os
|
| 2 |
-
from pinecone import Pinecone
|
| 3 |
-
from dotenv import load_dotenv
|
| 4 |
-
|
| 5 |
-
# Load .env file
|
| 6 |
-
load_dotenv()
|
| 7 |
-
|
| 8 |
-
api_key = os.getenv("PINECONE_API_KEY")
|
| 9 |
-
|
| 10 |
-
if not api_key:
|
| 11 |
-
print("Error: PINECONE_API_KEY not found in .env file")
|
| 12 |
-
exit(1)
|
| 13 |
-
|
| 14 |
-
print(f"Connecting to Pinecone with key: {api_key[:10]}...")
|
| 15 |
-
|
| 16 |
-
try:
|
| 17 |
-
pc = Pinecone(api_key=api_key)
|
| 18 |
-
|
| 19 |
-
# Testing general connectivity by listing indexes
|
| 20 |
-
print("Listing indexes...")
|
| 21 |
-
indexes = pc.list_indexes()
|
| 22 |
-
print(f"Successfully connected! Found {len(indexes)} indexes.")
|
| 23 |
-
for idx in indexes:
|
| 24 |
-
print(f" - {idx.name}")
|
| 25 |
-
|
| 26 |
-
# Testing Assistant API connectivity
|
| 27 |
-
print("\nTesting Assistant API...")
|
| 28 |
-
assistants = pc.assistant.list_assistants()
|
| 29 |
-
print(f"Successfully reached Assistant API! Found {len(assistants)} assistants.")
|
| 30 |
-
for a in assistants:
|
| 31 |
-
print(f" - {a.name} ({a.status})")
|
| 32 |
-
|
| 33 |
-
print("\nConnection test passed successfully!")
|
| 34 |
-
|
| 35 |
-
except Exception as e:
|
| 36 |
-
print(f"\nConnection failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|