# app/api/v1/maps.py
# Initial commit: Add research assistant application (b708f13)
import logging
import time
from enum import Enum
from typing import List
from fastapi import APIRouter, Depends, Query, HTTPException, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.api import deps
from app.models.user import User
from app.models.paper import Paper
from app.services.discovery.maps import discovery_map_service
from app.utils.converters import export_service
# Module logger, namespaced under the application's "rm_research" hierarchy
# so handlers/levels configured for the app propagate here.
logger = logging.getLogger("rm_research.api.maps")
# Router instance; URL prefix is applied where the parent package includes it.
router = APIRouter()
class ExportFormat(str, Enum):
    """Citation formats accepted by the institutional export endpoint.

    Subclasses ``str`` so the enum value can be used directly as the
    path-parameter string and in filename construction.
    """

    BIBTEX = "bibtex"  # BibTeX bibliography entries
    RIS = "ris"        # Research Information Systems format
    CSV = "csv"        # Plain tabular export
class ExportRequest(BaseModel):
    """Payload for bulk exporting papers from a map view."""
    # OpenAlex IDs of the papers to export. Bounded to 1..5000 entries:
    # the export endpoint materializes all converted content in memory
    # before streaming, so the upper bound caps that allocation.
    paper_ids: List[str] = Field(..., min_length=1, max_length=5000)
# --- 1. The Visualization Endpoint (WebGL Optimized) ---
@router.get("/generate", summary="Generate WebGL-ready graph data for large-scale discovery")
async def generate_discovery_map(
    seed_id: str = Query(..., description="The OpenAlex ID used as the map anchor"),
    limit: int = Query(1000, ge=1, le=50000, description="Max node count"),
    db: AsyncSession = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user)
):
    """
    Build a WebGL-ready node/edge graph payload anchored on ``seed_id``.

    Fulfills Requirement 3.3: High-scale WebGL payloads for >10,000 nodes.

    💰 Subscription Gating:
        - Free: 1,000 nodes max.
        - Premium: Up to 50,000 nodes.

    Raises:
        HTTPException: 500 when the map service fails unexpectedly;
            HTTP errors raised by the service layer pass through unchanged.
    """
    # Non-premium users are silently clamped to 1,000 nodes rather than
    # rejected, so the free tier still receives a (smaller) usable map.
    effective_limit = limit if current_user.is_premium else min(limit, 1000)
    try:
        # Build WebGL payload (nodes/edges/metadata)
        # RESOLUTION: Stateless service call (Reviewer 1 #57)
        return await discovery_map_service.build_webgl_graph(db, seed_id, effective_limit)
    except HTTPException:
        # Don't mask deliberate HTTP errors (404s, rate limits, ...) from
        # the service layer as generic 500s.
        raise
    except Exception:
        # logger.exception already records the traceback, so the exception
        # text is not repeated; lazy %-args avoid eager f-string formatting.
        logger.exception("WebGL map generation failed for seed %s", seed_id)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Strategic Discovery Map engine failed to generate the network graph."
        )
# --- 2. The Institutional Export Endpoint ---
@router.post("/export/{format}", summary="Institutional metadata export")
async def export_discovery_map(
    format: ExportFormat,
    request: ExportRequest,
    db: AsyncSession = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user)
):
    """
    Export paper metadata for a map selection as BibTeX, RIS, or CSV.

    Fulfills Phase 6: BibTeX, RIS, and CSV export for institutional use.
    RESOLUTION: Materialized Content Pattern (Reviewer 1 #71).
    Fetches and resolves all data before streaming to prevent DB connection leaks.

    Raises:
        HTTPException: 404 when none of the requested IDs exist locally.
    """
    # 1. Fetch metadata; the session is not touched again below, so it is
    # released back to the pool before the response begins streaming.
    stmt = select(Paper).where(Paper.openalex_id.in_(request.paper_ids))
    result = await db.execute(stmt)
    papers = result.scalars().all()
    if not papers:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Specified papers were not found in the local repository."
        )
    # 2. Convert and Materialize (safe up to the request model's 5k-item cap
    # in memory). Dispatch table replaces the previous if/elif chain; every
    # ExportFormat member has an entry, so the lookup cannot KeyError.
    converters = {
        ExportFormat.BIBTEX: (export_service.to_bibtex, "application/x-bibtex"),
        ExportFormat.RIS: (export_service.to_ris, "application/x-research-info-systems"),
        ExportFormat.CSV: (export_service.to_csv, "text/csv; charset=utf-8"),
    }
    convert, media_type = converters[format]
    content = convert(papers)
    # 3. Stream pre-generated content.
    # BUG FIX: the Content-Disposition header previously hard-coded a
    # placeholder instead of interpolating the generated filename.
    filename = f"rm_export_{int(time.time())}.{format.value}"
    headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
    return StreamingResponse(
        iter([content]),  # Pass as iterator to ensure compliance with StreamingResponse
        media_type=media_type,
        headers=headers
    )