GodSpeed / ingestion /api.py
AdithyaVardan's picture
feat: channel RBAC through ingest, graph stream, admin endpoints, email invite
e8b3591
from __future__ import annotations
import logging
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException, UploadFile
from fastapi.responses import JSONResponse
from ingestion.models import (
ConfluenceIngestRequest,
GithubIngestRequest,
IngestJobResponse,
IngestJobStatus,
IngestSourcePayload,
)
# Module-level logger; used below to report job-record persistence failures.
logger = logging.getLogger(__name__)
# Every route in this module is mounted under "/ingest" and grouped under the "ingest" tag.
router = APIRouter(prefix="/ingest", tags=["ingest"])
def _dispatch(payload: IngestSourcePayload) -> IngestJobResponse:
    """Record a pending ingest job for *payload* and queue the Celery task.

    The job id is generated here and reused as the Celery task id. The id
    returned to the client, the stored record and the queued task therefore
    always agree.

    The pending record is written *before* the task is queued. If it were
    written afterwards, a fast worker could update the record first and then
    have it overwritten back to ``pending`` by this function.
    NOTE(review): this assumes the worker updates the same record, keyed by
    the task id, as the job progresses (get_job_status reads its status from
    that record). Confirm in ingestion.jobs.ingest_job.

    Persisting the record is best-effort: a failure is logged and the task is
    still queued. If queuing itself fails, the record is marked ``failed``
    (also best-effort) and the original exception is re-raised.

    Args:
        payload: Source description; sent to the worker as ``payload.model_dump()``.

    Returns:
        A response carrying the new job id with status ``pending``.
    """
    from ingestion.jobs.ingest_job import run_ingest
    from ingestion.models import IngestJobRecord
    from ingestion.storage.supabase_store import get_client, upsert_job

    # Same format Celery uses for auto-generated task ids (uuid4 string).
    job_id = str(uuid.uuid4())
    created_at = datetime.utcnow()

    def _record(status: IngestJobStatus) -> IngestJobRecord:
        # Build the job-store row for this job in the given status.
        return IngestJobRecord(
            job_id=job_id,
            celery_task_id=job_id,
            status=status,
            source_type=payload.source_type,
            team_id=payload.team_id,
            created_at=created_at,
        )

    pending_record = _record(IngestJobStatus.pending)
    try:
        upsert_job(pending_record, client=get_client())
    except Exception:
        logger.exception("api: failed to persist job record for task %s", job_id)

    try:
        # Equivalent to run_ingest.delay(...), but with our pre-allocated id.
        run_ingest.apply_async(args=(payload.model_dump(),), task_id=job_id)
    except Exception:
        # Don't leave a record claiming "pending" for a task that was never queued.
        try:
            upsert_job(_record(IngestJobStatus.failed), client=get_client())
        except Exception:
            logger.exception(
                "api: failed to mark job %s as failed after dispatch error", job_id
            )
        raise

    return IngestJobResponse(job_id=job_id, status=IngestJobStatus.pending)
@router.post("/confluence", response_model=IngestJobResponse)
async def ingest_confluence(request: ConfluenceIngestRequest) -> IngestJobResponse:
    """Queue a Confluence ingest job.

    Args:
        request: Body carrying ``team_id``, ``channel_id`` and the Confluence
            selectors ``space_key`` / ``page_ids``. The selectors are forwarded
            to the worker unchanged in ``params``.

    Returns:
        The pending job descriptor produced by ``_dispatch``.
    """
    from fastapi.concurrency import run_in_threadpool

    payload = IngestSourcePayload(
        source_type="confluence",
        team_id=request.team_id,
        channel_id=request.channel_id,
        params={"space_key": request.space_key, "page_ids": request.page_ids},
    )
    # _dispatch makes synchronous network calls (Celery broker publish and the
    # job-store upsert). Running it inline would block the event loop for every
    # other request served by this process.
    return await run_in_threadpool(_dispatch, payload)
@router.post("/github", response_model=IngestJobResponse)
async def ingest_github(request: GithubIngestRequest) -> IngestJobResponse:
    """Queue a GitHub repository ingest job.

    Args:
        request: Body carrying ``team_id``, ``channel_id`` and the repository
            selectors ``repo_url``, ``path_filter`` and ``branch``. These are
            forwarded to the worker unchanged in ``params``.

    Returns:
        The pending job descriptor produced by ``_dispatch``.
    """
    from fastapi.concurrency import run_in_threadpool

    payload = IngestSourcePayload(
        source_type="github",
        team_id=request.team_id,
        channel_id=request.channel_id,
        params={
            "repo_url": request.repo_url,
            "path_filter": request.path_filter,
            "branch": request.branch,
        },
    )
    # _dispatch performs blocking network I/O (broker publish + job-store
    # upsert); offload it so the event loop stays responsive.
    return await run_in_threadpool(_dispatch, payload)
@router.post("/upload", response_model=IngestJobResponse)
async def ingest_pdf(team_id: str, file: UploadFile, channel_id: Optional[str] = None) -> IngestJobResponse:
    """Queue an ingest job for an uploaded PDF.

    The whole upload is read into memory and base64-encoded into the task
    payload.

    Args:
        team_id: Forwarded as ``team_id`` on the ingest payload.
        file: The uploaded file; its name must end in ``.pdf``
            (case-insensitive).
        channel_id: Optional channel scope, forwarded on the payload.

    Returns:
        The pending job descriptor produced by ``_dispatch``.

    Raises:
        HTTPException: 400 if the filename is missing or not ``.pdf``, or if
            the upload is empty.
    """
    import base64

    from fastapi.concurrency import run_in_threadpool

    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF files are accepted")
    content = await file.read()
    if not content:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")
    # PDF bytes are not JSON-serialisable, so they travel base64-encoded in the
    # task payload. For production, upload to object storage (S3/GCS) and pass
    # the URL instead; large files inflate the broker message by ~4/3.
    payload = IngestSourcePayload(
        source_type="pdf",
        team_id=team_id,
        channel_id=channel_id,
        params={
            "filename": file.filename,
            "content": base64.b64encode(content).decode(),
        },
    )
    # _dispatch does blocking network I/O; keep it off the event loop.
    return await run_in_threadpool(_dispatch, payload)
@router.get("/jobs/{job_id}", response_model=IngestJobResponse)
async def get_job_status(job_id: str) -> IngestJobResponse:
    """Report the current status of an ingest job.

    The job-store record (``get_job``) is consulted first. If it is missing,
    for example because it has not been written yet or persisting it failed,
    the Celery task state is used instead.

    Args:
        job_id: Job id as returned by the ingest endpoints (also the Celery task id).

    Returns:
        The job id and its mapped status.

    Raises:
        ValueError: if a stored record carries a status string that is not a
            valid ``IngestJobStatus``.
    """
    from fastapi.concurrency import run_in_threadpool

    def _resolve() -> IngestJobResponse:
        # Both lookups below are synchronous backend round-trips, so this
        # runs in a worker thread rather than on the event loop.
        from ingestion.storage.supabase_store import get_job

        record = get_job(job_id)
        if record is not None:
            return IngestJobResponse(
                job_id=record["job_id"],
                status=IngestJobStatus(record["status"]),
            )

        # Fall back to Celery task state if Supabase record not yet written.
        # NOTE: Celery reports any id it has never seen as PENDING, so an
        # unknown job_id is also answered with "pending" rather than a 404.
        from ingestion.jobs.celery_app import celery_app

        task = celery_app.AsyncResult(job_id)
        state_map = {
            "PENDING": IngestJobStatus.pending,
            "RECEIVED": IngestJobStatus.pending,
            "STARTED": IngestJobStatus.running,
            # Waiting to be retried: the job is in progress, not unstarted.
            "RETRY": IngestJobStatus.running,
            "SUCCESS": IngestJobStatus.completed,
            "FAILURE": IngestJobStatus.failed,
            # A revoked task will never run; without this it reads "pending" forever.
            "REVOKED": IngestJobStatus.failed,
        }
        status = state_map.get(task.state, IngestJobStatus.pending)
        return IngestJobResponse(job_id=job_id, status=status)

    return await run_in_threadpool(_resolve)