RM / app /api /v1 /veritas.py
trretretret's picture
Initial commit: Add research assistant application
b708f13
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Optional, Dict, Any # 🔥 Added Dict, Any
from app.api import deps
from app.schemas.veritas import (
VeritasScanRequest,
IntegrityReport,
VeritasQuickSummary,
VeritasScanResponse
)
# 🔥 Import the service classes needed for initialization
from app.services.veritas.engine import VeritasEngine
from app.services.veritas.shield_one import SemanticFingerprinterAsync
from app.services.veritas.shield_two import ParaphraseDetector
from app.services.veritas.shield_three import ClaimVerifier
from app.tasks.veritas_scan import run_veritas_task
from app.models.audit import AuditRecord
from app.core.config import settings
router = APIRouter()
# 🔥 FIXED: Initialize sub-services first, then pass to VeritasEngine
semantic_svc = SemanticFingerprinterAsync(index_path=settings.VERITAS_LOCAL_INDEX_PATH)
structural_svc = ParaphraseDetector()
fact_svc = ClaimVerifier()
veritas_engine = VeritasEngine(
semantic_service=semantic_svc,
structural_service=structural_svc,
fact_service=fact_svc
)
@router.post("/check", response_model=Dict[str, Any]) # 🔥 Changed to Dict since run_quick_check returns dict
async def check_originality(
request: VeritasScanRequest,
current_user = Depends(deps.get_current_active_user)
):
"""
Real-time 'Adaptive' integrity check.
Triggered during writing (Mode A/B). Returns a high-level summary
of originality and semantic matches without full structural analysis.
"""
# 🔥 FIXED: Changed from .check_integrity() to .run_quick_check()
# 🔥 REMOVED: mode parameter (not supported by run_quick_check)
result = await veritas_engine.run_quick_check(
text=request.text,
user_prior_work=request.user_prior_work
)
return result
@router.post("/deep-scan", status_code=status.HTTP_202_ACCEPTED)
async def trigger_deep_scan(
request: VeritasScanRequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(deps.get_db),
current_user = Depends(deps.get_current_active_user)
):
"""
Triggers a 'Doctoral-Grade' deep integrity audit.
Since this process involves cross-encoding and NLI claim verification
(10-30 seconds), it is executed as a background task.
"""
# 1. Create initial audit record
new_audit = AuditRecord(
user_id=current_user.id,
status="pending",
mode="deep"
)
db.add(new_audit)
await db.commit()
await db.refresh(new_audit)
# 2. Enqueue background task
background_tasks.add_task(
run_veritas_task,
document_id=new_audit.document_id,
text=request.text,
prior_work=request.user_prior_work
)
return {"document_id": new_audit.document_id, "status": "queued"}
@router.get("/report/{document_id}", response_model=IntegrityReport)
async def get_integrity_report(
document_id: str,
db: AsyncSession = Depends(deps.get_db),
current_user = Depends(deps.get_current_active_user)
):
"""
Retrieves the completed 'Doctoral-Grade' integrity report.
"""
result = await db.execute(
select(AuditRecord).where(
AuditRecord.document_id == document_id,
AuditRecord.user_id == current_user.id
)
)
audit = result.scalar_one_or_none()
if not audit:
raise HTTPException(status_code=404, detail="Report not found")
if audit.status != "completed":
raise HTTPException(
status_code=400,
detail=f"Report is not ready. Current status: {audit.status}"
)
return audit.report_json
@router.get("/status/{document_id}")
async def get_scan_status(
document_id: str,
db: AsyncSession = Depends(deps.get_db),
current_user = Depends(deps.get_current_active_user)
):
"""
Pollable endpoint for checking the progress of a deep scan.
"""
result = await db.execute(
select(AuditRecord.status, AuditRecord.overall_score).where(
AuditRecord.document_id == document_id,
AuditRecord.user_id == current_user.id
)
)
row = result.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Audit not found")
return {"status": row.status, "score": row.overall_score}