ketannnn commited on
Commit
5655f74
·
1 Parent(s): 6106435

feat: implement session-based candidate matching architecture

Browse files
backend/alembic/versions/002_add_sessions.py ADDED
@@ -0,0 +1,42 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """add sessions table and session_id columns
2
+
3
+ Revision ID: 002
4
+ Revises: 001
5
+ Create Date: 2026-04-12 10:00:00.000000
6
+ """
7
+ from typing import Sequence, Union
8
+ from alembic import op
9
+ import sqlalchemy as sa
10
+ from sqlalchemy.dialects.postgresql import UUID
11
+
12
+ revision: str = "002"
13
+ down_revision: Union[str, None] = "001"
14
+ branch_labels: Union[str, Sequence[str], None] = None
15
+ depends_on: Union[str, Sequence[str], None] = None
16
+
17
+
18
+ def upgrade() -> None:
19
+ op.create_table(
20
+ "sessions",
21
+ sa.Column("id", UUID(as_uuid=True), primary_key=True),
22
+ sa.Column("name", sa.String(255), nullable=False),
23
+ sa.Column("description", sa.Text, nullable=True),
24
+ sa.Column("candidate_count", sa.Integer, nullable=False, server_default="0"),
25
+ sa.Column("status", sa.String(32), nullable=False, server_default="active"),
26
+ sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
27
+ sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
28
+ )
29
+
30
+ op.add_column("candidates", sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("sessions.id", ondelete="SET NULL"), nullable=True))
31
+ op.create_index("ix_candidates_session_id", "candidates", ["session_id"])
32
+
33
+ op.add_column("match_results", sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("sessions.id", ondelete="CASCADE"), nullable=True))
34
+ op.create_index("ix_match_results_session_id", "match_results", ["session_id"])
35
+
36
+
37
+ def downgrade() -> None:
38
+ op.drop_index("ix_match_results_session_id", "match_results")
39
+ op.drop_column("match_results", "session_id")
40
+ op.drop_index("ix_candidates_session_id", "candidates")
41
+ op.drop_column("candidates", "session_id")
42
+ op.drop_table("sessions")
backend/requirements.txt CHANGED
@@ -8,7 +8,7 @@ qdrant-client==1.11.0
8
  celery[redis]==5.4.0
9
  redis==5.1.0
10
  sentence-transformers==3.3.0
11
- FlagEmbedding==1.2.15
12
  groq==0.12.0
13
  python-multipart==0.0.12
14
  pydantic==2.9.2
 
8
  celery[redis]==5.4.0
9
  redis==5.1.0
10
  sentence-transformers==3.3.0
11
+ FlagEmbedding==1.3.5
12
  groq==0.12.0
13
  python-multipart==0.0.12
14
  pydantic==2.9.2
backend/src/models/__init__.py CHANGED
@@ -1,5 +1,6 @@
1
  from .jd import JobDescription
2
  from .candidate import Candidate
3
  from .match_result import MatchResult
 
4
 
5
- __all__ = ["JobDescription", "Candidate", "MatchResult"]
 
1
  from .jd import JobDescription
2
  from .candidate import Candidate
3
  from .match_result import MatchResult
4
+ from .session import Session
5
 
6
+ __all__ = ["JobDescription", "Candidate", "MatchResult", "Session"]
backend/src/models/candidate.py CHANGED
@@ -1,6 +1,6 @@
1
  import uuid
2
  from datetime import datetime
3
- from sqlalchemy import String, Text, DateTime, JSON, Float, Integer, Boolean, func
4
  from sqlalchemy.orm import Mapped, mapped_column
5
  from sqlalchemy.dialects.postgresql import UUID
6
  from ..database import Base
@@ -45,6 +45,7 @@ class Candidate(Base):
45
  time_in_current_company: Mapped[float | None] = mapped_column(Float, nullable=True)
46
  is_actively_or_passively_looking: Mapped[str | None] = mapped_column(String(100), nullable=True)
47
 
 
48
  growth_velocity: Mapped[float] = mapped_column(Float, default=0.5)
49
  embedding_hash: Mapped[str | None] = mapped_column(String(64), nullable=True)
50
  qdrant_id: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True)
 
1
  import uuid
2
  from datetime import datetime
3
+ from sqlalchemy import String, Text, DateTime, JSON, Float, Integer, Boolean, func, ForeignKey
4
  from sqlalchemy.orm import Mapped, mapped_column
5
  from sqlalchemy.dialects.postgresql import UUID
6
  from ..database import Base
 
45
  time_in_current_company: Mapped[float | None] = mapped_column(Float, nullable=True)
46
  is_actively_or_passively_looking: Mapped[str | None] = mapped_column(String(100), nullable=True)
47
 
48
+ session_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("sessions.id", ondelete="SET NULL"), nullable=True, index=True)
49
  growth_velocity: Mapped[float] = mapped_column(Float, default=0.5)
50
  embedding_hash: Mapped[str | None] = mapped_column(String(64), nullable=True)
51
  qdrant_id: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True)
backend/src/models/match_result.py CHANGED
@@ -1,6 +1,6 @@
1
  import uuid
2
  from datetime import datetime
3
- from sqlalchemy import String, DateTime, JSON, Float, ForeignKey, func
4
  from sqlalchemy.orm import Mapped, mapped_column
5
  from sqlalchemy.dialects.postgresql import UUID
6
  from ..database import Base
@@ -12,6 +12,7 @@ class MatchResult(Base):
12
  id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
13
  jd_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("job_descriptions.id", ondelete="CASCADE"), index=True)
14
  candidate_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("candidates.id", ondelete="CASCADE"), index=True)
 
15
 
16
  rank: Mapped[int | None] = mapped_column(nullable=True)
17
  stage1_score: Mapped[float] = mapped_column(Float, default=0.0)
 
1
  import uuid
2
  from datetime import datetime
3
+ from sqlalchemy import String, DateTime, JSON, Float, ForeignKey, func, Integer
4
  from sqlalchemy.orm import Mapped, mapped_column
5
  from sqlalchemy.dialects.postgresql import UUID
6
  from ..database import Base
 
12
  id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
13
  jd_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("job_descriptions.id", ondelete="CASCADE"), index=True)
14
  candidate_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("candidates.id", ondelete="CASCADE"), index=True)
15
+ session_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("sessions.id", ondelete="CASCADE"), nullable=True, index=True)
16
 
17
  rank: Mapped[int | None] = mapped_column(nullable=True)
18
  stage1_score: Mapped[float] = mapped_column(Float, default=0.0)
backend/src/models/session.py ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import uuid
2
+ from datetime import datetime
3
+ from sqlalchemy import String, Text, DateTime, Integer, func
4
+ from sqlalchemy.orm import Mapped, mapped_column
5
+ from sqlalchemy.dialects.postgresql import UUID
6
+ from ..database import Base
7
+
8
+
9
+ class Session(Base):
10
+ __tablename__ = "sessions"
11
+
12
+ id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
13
+ name: Mapped[str] = mapped_column(String(255))
14
+ description: Mapped[str | None] = mapped_column(Text, nullable=True)
15
+ candidate_count: Mapped[int] = mapped_column(Integer, default=0)
16
+ status: Mapped[str] = mapped_column(String(32), default="active")
17
+ created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
18
+ updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
backend/src/routers/__init__.py CHANGED
@@ -1,5 +1,6 @@
1
  from .jds import router as jds_router
2
  from .candidates import router as candidates_router
3
  from .matching import router as matching_router
 
4
 
5
- __all__ = ["jds_router", "candidates_router", "matching_router"]
 
1
  from .jds import router as jds_router
2
  from .candidates import router as candidates_router
3
  from .matching import router as matching_router
4
+ from .sessions import router as sessions_router
5
 
6
+ __all__ = ["jds_router", "candidates_router", "matching_router", "sessions_router"]
backend/src/routers/candidates.py CHANGED
@@ -1,12 +1,14 @@
1
  import io
2
  import json
 
3
  import pandas as pd
4
- from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
5
  from sqlalchemy.ext.asyncio import AsyncSession
6
  from sqlalchemy import select, func
7
 
8
  from ..database import get_db
9
  from ..models.candidate import Candidate
 
10
  from ..schemas.candidate import CandidateResponse, UploadResponse, TaskStatusResponse
11
  from ..workers.celery_app import celery_app
12
  from ..workers.ingest import ingest_candidates_batch
@@ -17,7 +19,17 @@ BATCH_SIZE = 100
17
 
18
 
19
  @router.post("/upload", response_model=UploadResponse)
20
- async def upload_candidates(file: UploadFile = File(...)):
 
 
 
 
 
 
 
 
 
 
21
  content = await file.read()
22
  filename = file.filename or ""
23
 
@@ -48,7 +60,7 @@ async def upload_candidates(file: UploadFile = File(...)):
48
  else:
49
  clean[k] = v
50
  serializable.append(clean)
51
- task = ingest_candidates_batch.delay(serializable)
52
  task_ids.append(task.id)
53
 
54
  return UploadResponse(
@@ -69,17 +81,21 @@ async def task_status(task_id: str):
69
 
70
 
71
  @router.get("/count")
72
- async def candidate_count(db: AsyncSession = Depends(get_db)):
73
- result = await db.execute(select(func.count()).select_from(Candidate))
74
- count = result.scalar()
75
- return {"count": count}
 
 
 
 
 
76
 
77
 
78
  @router.get("/{candidate_id}", response_model=CandidateResponse)
79
  async def get_candidate(candidate_id: str, db: AsyncSession = Depends(get_db)):
80
- import uuid as _uuid
81
  try:
82
- cid = _uuid.UUID(candidate_id)
83
  except ValueError:
84
  raise HTTPException(status_code=400, detail="Invalid candidate ID")
85
  result = await db.execute(select(Candidate).where(Candidate.id == cid))
 
1
  import io
2
  import json
3
+ import uuid
4
  import pandas as pd
5
+ from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query
6
  from sqlalchemy.ext.asyncio import AsyncSession
7
  from sqlalchemy import select, func
8
 
9
  from ..database import get_db
10
  from ..models.candidate import Candidate
11
+ from ..models.session import Session
12
  from ..schemas.candidate import CandidateResponse, UploadResponse, TaskStatusResponse
13
  from ..workers.celery_app import celery_app
14
  from ..workers.ingest import ingest_candidates_batch
 
19
 
20
 
21
  @router.post("/upload", response_model=UploadResponse)
22
+ async def upload_candidates(
23
+ file: UploadFile = File(...),
24
+ session_id: uuid.UUID | None = Query(None, description="Session to attach candidates to"),
25
+ db: AsyncSession = Depends(get_db),
26
+ ):
27
+ if session_id:
28
+ result = await db.execute(select(Session).where(Session.id == session_id))
29
+ sess = result.scalar_one_or_none()
30
+ if not sess:
31
+ raise HTTPException(status_code=404, detail="Session not found")
32
+
33
  content = await file.read()
34
  filename = file.filename or ""
35
 
 
60
  else:
61
  clean[k] = v
62
  serializable.append(clean)
63
+ task = ingest_candidates_batch.delay(serializable, str(session_id) if session_id else None)
64
  task_ids.append(task.id)
65
 
66
  return UploadResponse(
 
81
 
82
 
83
  @router.get("/count")
84
+ async def candidate_count(
85
+ session_id: uuid.UUID | None = Query(None),
86
+ db: AsyncSession = Depends(get_db),
87
+ ):
88
+ q = select(func.count()).select_from(Candidate)
89
+ if session_id:
90
+ q = q.where(Candidate.session_id == session_id)
91
+ result = await db.execute(q)
92
+ return {"count": result.scalar()}
93
 
94
 
95
  @router.get("/{candidate_id}", response_model=CandidateResponse)
96
  async def get_candidate(candidate_id: str, db: AsyncSession = Depends(get_db)):
 
97
  try:
98
+ cid = uuid.UUID(candidate_id)
99
  except ValueError:
100
  raise HTTPException(status_code=400, detail="Invalid candidate ID")
101
  result = await db.execute(select(Candidate).where(Candidate.id == cid))
backend/src/routers/sessions.py ADDED
@@ -0,0 +1,45 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import uuid
2
+ from fastapi import APIRouter, Depends, HTTPException
3
+ from sqlalchemy.ext.asyncio import AsyncSession
4
+ from sqlalchemy import select, func
5
+
6
+ from ..database import get_db
7
+ from ..models.session import Session
8
+ from ..models.candidate import Candidate
9
+ from ..schemas.session import SessionCreate, SessionResponse
10
+
11
+ router = APIRouter()
12
+
13
+
14
+ @router.post("", response_model=SessionResponse, status_code=201)
15
+ async def create_session(payload: SessionCreate, db: AsyncSession = Depends(get_db)):
16
+ session = Session(id=uuid.uuid4(), name=payload.name, description=payload.description)
17
+ db.add(session)
18
+ await db.commit()
19
+ await db.refresh(session)
20
+ return session
21
+
22
+
23
+ @router.get("", response_model=list[SessionResponse])
24
+ async def list_sessions(db: AsyncSession = Depends(get_db)):
25
+ result = await db.execute(select(Session).order_by(Session.created_at.desc()).limit(50))
26
+ return result.scalars().all()
27
+
28
+
29
+ @router.get("/{session_id}", response_model=SessionResponse)
30
+ async def get_session(session_id: uuid.UUID, db: AsyncSession = Depends(get_db)):
31
+ result = await db.execute(select(Session).where(Session.id == session_id))
32
+ sess = result.scalar_one_or_none()
33
+ if not sess:
34
+ raise HTTPException(status_code=404, detail="Session not found")
35
+ return sess
36
+
37
+
38
+ @router.delete("/{session_id}", status_code=204)
39
+ async def delete_session(session_id: uuid.UUID, db: AsyncSession = Depends(get_db)):
40
+ result = await db.execute(select(Session).where(Session.id == session_id))
41
+ sess = result.scalar_one_or_none()
42
+ if not sess:
43
+ raise HTTPException(status_code=404, detail="Session not found")
44
+ await db.delete(sess)
45
+ await db.commit()
backend/src/schemas/match.py CHANGED
@@ -44,6 +44,7 @@ class MatchResponse(BaseModel):
44
  total_matched: int
45
  results: list[MatchedCandidate]
46
  weights_used: dict[str, float] = {}
 
47
 
48
 
49
  class CandidateDetailResponse(BaseModel):
 
44
  total_matched: int
45
  results: list[MatchedCandidate]
46
  weights_used: dict[str, float] = {}
47
+ session_id: UUID | None = None
48
 
49
 
50
  class CandidateDetailResponse(BaseModel):
backend/src/schemas/session.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import uuid
2
+ from datetime import datetime
3
+ from pydantic import BaseModel
4
+
5
+
6
+ class SessionCreate(BaseModel):
7
+ name: str
8
+ description: str | None = None
9
+
10
+
11
+ class SessionResponse(BaseModel):
12
+ id: uuid.UUID
13
+ name: str
14
+ description: str | None = None
15
+ candidate_count: int
16
+ status: str
17
+ created_at: datetime
18
+
19
+ model_config = {"from_attributes": True}
backend/src/workers/ingest.py CHANGED
@@ -1,9 +1,9 @@
1
  import uuid
2
  import asyncio
3
  import re
 
4
  from typing import Any
5
- from sqlalchemy import text, select
6
- from sqlalchemy.dialects.postgresql import insert as pg_insert
7
  from qdrant_client import QdrantClient
8
  from qdrant_client.models import PointStruct
9
 
@@ -11,8 +11,9 @@ from .celery_app import celery_app
11
  from ..config import get_settings
12
  from ..database import AsyncSessionLocal
13
  from ..models.candidate import Candidate
 
14
  from ..models.jd import JobDescription
15
- from ..ml.embedder import embed_texts, embed_query, compute_text_hash
16
  from ..ml.feature_builder import (
17
  build_candidate_text,
18
  compute_growth_velocity,
@@ -52,40 +53,36 @@ def _parse_list(val: Any) -> list:
52
 
53
 
54
  @celery_app.task(bind=True, name="ingest_candidates_batch", max_retries=3)
55
- def ingest_candidates_batch(self, rows: list[dict]) -> dict:
56
  try:
57
- return asyncio.run(_ingest_candidates_async(rows))
58
  except Exception as exc:
59
  raise self.retry(exc=exc, countdown=30)
60
 
61
 
62
- async def _ingest_candidates_async(rows: list[dict]) -> dict:
63
  settings = get_settings()
64
  qdrant = _get_qdrant()
65
  texts = []
66
  processed = []
 
67
 
68
  for row in rows:
69
  work_exp = row.get("parsed_work_experience") or []
70
  if isinstance(work_exp, str):
71
  try:
72
- import json
73
  work_exp = json.loads(work_exp)
74
  except Exception:
75
  work_exp = []
76
 
77
  is_funded = _normalize_bool(row.get("most_recent_company_is_funded"))
78
  velocity = compute_growth_velocity(work_exp, is_funded=bool(is_funded))
79
-
80
  candidate_text = build_candidate_text({**row, "parsed_work_experience": work_exp})
81
  text_hash = compute_text_hash(candidate_text)
82
 
83
  processed.append({
84
- "row": row,
85
- "work_exp": work_exp,
86
- "velocity": velocity,
87
- "text": candidate_text,
88
- "hash": text_hash,
89
  })
90
  texts.append(candidate_text)
91
 
@@ -100,8 +97,17 @@ async def _ingest_candidates_async(rows: list[dict]) -> dict:
100
  candidate_id = uuid.uuid4()
101
  qdrant_id = str(candidate_id)
102
 
 
 
 
 
 
 
 
 
103
  cand = Candidate(
104
  id=candidate_id,
 
105
  external_id=str(row.get("id") or row.get("candidate_id") or ""),
106
  name=row.get("name"),
107
  email=row.get("email"),
@@ -136,20 +142,13 @@ async def _ingest_candidates_async(rows: list[dict]) -> dict:
136
  )
137
  session.add(cand)
138
 
139
- all_skills = (
140
- _parse_list(row.get("programming_languages"))
141
- + _parse_list(row.get("backend_frameworks"))
142
- + _parse_list(row.get("frontend_technologies"))
143
- )
144
- if row.get("parsed_skills"):
145
- all_skills.extend(_parse_list(row["parsed_skills"]))
146
-
147
  qdrant_points.append(
148
  PointStruct(
149
  id=qdrant_id,
150
  vector=embeddings[i].tolist(),
151
  payload={
152
  "candidate_id": str(candidate_id),
 
153
  "role_type": row.get("role_type"),
154
  "engineer_type": row.get("engineer_type"),
155
  "years_of_experience": _normalize_float(row.get("years_of_experience")),
@@ -170,7 +169,18 @@ async def _ingest_candidates_async(rows: list[dict]) -> dict:
170
  if qdrant_points:
171
  qdrant.upsert(collection_name=settings.collection_name, points=qdrant_points)
172
 
173
- return {"inserted": inserted, "total": len(rows)}
 
 
 
 
 
 
 
 
 
 
 
174
 
175
 
176
  @celery_app.task(bind=True, name="ingest_jd", max_retries=3)
@@ -186,9 +196,7 @@ async def _ingest_jd_async(jd_id: str, raw_text: str, title: str) -> dict:
186
  jd_quality = compute_jd_quality(raw_text, parsed)
187
 
188
  async with AsyncSessionLocal() as session:
189
- result = await session.execute(
190
- select(JobDescription).where(JobDescription.id == uuid.UUID(jd_id))
191
- )
192
  jd = result.scalar_one_or_none()
193
  if jd:
194
  jd.parsed_requirements = parsed
 
1
  import uuid
2
  import asyncio
3
  import re
4
+ import json
5
  from typing import Any
6
+ from sqlalchemy import select, func
 
7
  from qdrant_client import QdrantClient
8
  from qdrant_client.models import PointStruct
9
 
 
11
  from ..config import get_settings
12
  from ..database import AsyncSessionLocal
13
  from ..models.candidate import Candidate
14
+ from ..models.session import Session
15
  from ..models.jd import JobDescription
16
+ from ..ml.embedder import embed_texts, compute_text_hash
17
  from ..ml.feature_builder import (
18
  build_candidate_text,
19
  compute_growth_velocity,
 
53
 
54
 
55
  @celery_app.task(bind=True, name="ingest_candidates_batch", max_retries=3)
56
+ def ingest_candidates_batch(self, rows: list[dict], session_id: str | None = None) -> dict:
57
  try:
58
+ return asyncio.run(_ingest_candidates_async(rows, session_id))
59
  except Exception as exc:
60
  raise self.retry(exc=exc, countdown=30)
61
 
62
 
63
+ async def _ingest_candidates_async(rows: list[dict], session_id: str | None) -> dict:
64
  settings = get_settings()
65
  qdrant = _get_qdrant()
66
  texts = []
67
  processed = []
68
+ sess_uuid = uuid.UUID(session_id) if session_id else None
69
 
70
  for row in rows:
71
  work_exp = row.get("parsed_work_experience") or []
72
  if isinstance(work_exp, str):
73
  try:
 
74
  work_exp = json.loads(work_exp)
75
  except Exception:
76
  work_exp = []
77
 
78
  is_funded = _normalize_bool(row.get("most_recent_company_is_funded"))
79
  velocity = compute_growth_velocity(work_exp, is_funded=bool(is_funded))
 
80
  candidate_text = build_candidate_text({**row, "parsed_work_experience": work_exp})
81
  text_hash = compute_text_hash(candidate_text)
82
 
83
  processed.append({
84
+ "row": row, "work_exp": work_exp, "velocity": velocity,
85
+ "text": candidate_text, "hash": text_hash,
 
 
 
86
  })
87
  texts.append(candidate_text)
88
 
 
97
  candidate_id = uuid.uuid4()
98
  qdrant_id = str(candidate_id)
99
 
100
+ all_skills = (
101
+ _parse_list(row.get("programming_languages"))
102
+ + _parse_list(row.get("backend_frameworks"))
103
+ + _parse_list(row.get("frontend_technologies"))
104
+ )
105
+ if row.get("parsed_skills"):
106
+ all_skills.extend(_parse_list(row["parsed_skills"]))
107
+
108
  cand = Candidate(
109
  id=candidate_id,
110
+ session_id=sess_uuid,
111
  external_id=str(row.get("id") or row.get("candidate_id") or ""),
112
  name=row.get("name"),
113
  email=row.get("email"),
 
142
  )
143
  session.add(cand)
144
 
 
 
 
 
 
 
 
 
145
  qdrant_points.append(
146
  PointStruct(
147
  id=qdrant_id,
148
  vector=embeddings[i].tolist(),
149
  payload={
150
  "candidate_id": str(candidate_id),
151
+ "session_id": session_id or "",
152
  "role_type": row.get("role_type"),
153
  "engineer_type": row.get("engineer_type"),
154
  "years_of_experience": _normalize_float(row.get("years_of_experience")),
 
169
  if qdrant_points:
170
  qdrant.upsert(collection_name=settings.collection_name, points=qdrant_points)
171
 
172
+ if sess_uuid:
173
+ result = await session.execute(
174
+ select(func.count()).select_from(Candidate).where(Candidate.session_id == sess_uuid)
175
+ )
176
+ count = result.scalar() or 0
177
+ sess_result = await session.execute(select(Session).where(Session.id == sess_uuid))
178
+ sess_obj = sess_result.scalar_one_or_none()
179
+ if sess_obj:
180
+ sess_obj.candidate_count = count
181
+ await session.commit()
182
+
183
+ return {"inserted": inserted, "total": len(rows), "session_id": session_id}
184
 
185
 
186
  @celery_app.task(bind=True, name="ingest_jd", max_retries=3)
 
196
  jd_quality = compute_jd_quality(raw_text, parsed)
197
 
198
  async with AsyncSessionLocal() as session:
199
+ result = await session.execute(select(JobDescription).where(JobDescription.id == uuid.UUID(jd_id)))
 
 
200
  jd = result.scalar_one_or_none()
201
  if jd:
202
  jd.parsed_requirements = parsed