|
|
""" |
|
|
Patent upload and management endpoints |
|
|
""" |
|
|
|
|
|
from fastapi import APIRouter, UploadFile, File, HTTPException |
|
|
from fastapi.responses import FileResponse |
|
|
from pathlib import Path |
|
|
import uuid |
|
|
import shutil |
|
|
from datetime import datetime |
|
|
from typing import List, Dict |
|
|
from loguru import logger |
|
|
|
|
|
router = APIRouter() |
|
|
|
|
|
UPLOAD_DIR = Path("uploads/patents") |
|
|
UPLOAD_DIR.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
@router.post("/upload", response_model=Dict) |
|
|
async def upload_patent(file: UploadFile = File(...)): |
|
|
""" |
|
|
Upload a patent PDF for analysis. |
|
|
|
|
|
Args: |
|
|
file: PDF file to upload |
|
|
|
|
|
Returns: |
|
|
Patent metadata including unique ID |
|
|
""" |
|
|
logger.info(f"Received upload request for: {file.filename}") |
|
|
|
|
|
|
|
|
if not file.filename.endswith('.pdf'): |
|
|
raise HTTPException( |
|
|
status_code=400, |
|
|
detail="Only PDF files are supported. Please upload a .pdf file." |
|
|
) |
|
|
|
|
|
|
|
|
file.file.seek(0, 2) |
|
|
file_size = file.file.tell() |
|
|
file.file.seek(0) |
|
|
|
|
|
if file_size > 50 * 1024 * 1024: |
|
|
raise HTTPException( |
|
|
status_code=400, |
|
|
detail="File too large. Maximum size is 50MB." |
|
|
) |
|
|
|
|
|
try: |
|
|
|
|
|
patent_id = str(uuid.uuid4()) |
|
|
|
|
|
|
|
|
file_path = UPLOAD_DIR / f"{patent_id}.pdf" |
|
|
with file_path.open("wb") as buffer: |
|
|
shutil.copyfileobj(file.file, buffer) |
|
|
|
|
|
|
|
|
from api.main import app_state |
|
|
|
|
|
metadata = { |
|
|
"id": patent_id, |
|
|
"filename": file.filename, |
|
|
"path": str(file_path), |
|
|
"size": file_size, |
|
|
"uploaded_at": datetime.utcnow().isoformat(), |
|
|
"status": "uploaded", |
|
|
"workflow_id": None |
|
|
} |
|
|
|
|
|
app_state["patents"][patent_id] = metadata |
|
|
|
|
|
logger.success(f"✅ Patent uploaded: {patent_id} ({file.filename})") |
|
|
|
|
|
return { |
|
|
"patent_id": patent_id, |
|
|
"filename": file.filename, |
|
|
"size": file_size, |
|
|
"uploaded_at": metadata["uploaded_at"], |
|
|
"message": "Patent uploaded successfully" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"❌ Upload failed: {e}") |
|
|
raise HTTPException( |
|
|
status_code=500, |
|
|
detail=f"Upload failed: {str(e)}" |
|
|
) |
|
|
|
|
|
@router.get("/{patent_id}", response_model=Dict) |
|
|
async def get_patent(patent_id: str): |
|
|
""" |
|
|
Get patent metadata by ID. |
|
|
|
|
|
Args: |
|
|
patent_id: Unique patent identifier |
|
|
|
|
|
Returns: |
|
|
Patent metadata |
|
|
""" |
|
|
from api.main import app_state |
|
|
|
|
|
if patent_id not in app_state["patents"]: |
|
|
raise HTTPException( |
|
|
status_code=404, |
|
|
detail=f"Patent not found: {patent_id}" |
|
|
) |
|
|
|
|
|
return app_state["patents"][patent_id] |
|
|
|
|
|
@router.get("/", response_model=List[Dict]) |
|
|
async def list_patents( |
|
|
status: str = None, |
|
|
limit: int = 100, |
|
|
offset: int = 0 |
|
|
): |
|
|
""" |
|
|
List all uploaded patents. |
|
|
|
|
|
Args: |
|
|
status: Filter by status (uploaded, analyzing, analyzed, failed) |
|
|
limit: Maximum number of results |
|
|
offset: Pagination offset |
|
|
|
|
|
Returns: |
|
|
List of patent metadata |
|
|
""" |
|
|
from api.main import app_state |
|
|
|
|
|
patents = list(app_state["patents"].values()) |
|
|
|
|
|
|
|
|
if status: |
|
|
patents = [p for p in patents if p["status"] == status] |
|
|
|
|
|
|
|
|
patents.sort(key=lambda x: x["uploaded_at"], reverse=True) |
|
|
|
|
|
|
|
|
patents = patents[offset:offset + limit] |
|
|
|
|
|
return patents |
|
|
|
|
|
@router.delete("/{patent_id}") |
|
|
async def delete_patent(patent_id: str): |
|
|
""" |
|
|
Delete a patent and its associated files. |
|
|
|
|
|
Args: |
|
|
patent_id: Unique patent identifier |
|
|
|
|
|
Returns: |
|
|
Success message |
|
|
""" |
|
|
from api.main import app_state |
|
|
|
|
|
if patent_id not in app_state["patents"]: |
|
|
raise HTTPException( |
|
|
status_code=404, |
|
|
detail=f"Patent not found: {patent_id}" |
|
|
) |
|
|
|
|
|
try: |
|
|
patent = app_state["patents"][patent_id] |
|
|
|
|
|
|
|
|
file_path = Path(patent["path"]) |
|
|
if file_path.exists(): |
|
|
file_path.unlink() |
|
|
|
|
|
|
|
|
del app_state["patents"][patent_id] |
|
|
|
|
|
logger.info(f"Deleted patent: {patent_id}") |
|
|
|
|
|
return {"message": "Patent deleted successfully"} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Delete failed: {e}") |
|
|
raise HTTPException( |
|
|
status_code=500, |
|
|
detail=f"Delete failed: {str(e)}" |
|
|
) |
|
|
|
|
|
@router.get("/{patent_id}/download") |
|
|
async def download_patent(patent_id: str): |
|
|
""" |
|
|
Download the original patent PDF. |
|
|
|
|
|
Args: |
|
|
patent_id: Unique patent identifier |
|
|
|
|
|
Returns: |
|
|
PDF file |
|
|
""" |
|
|
from api.main import app_state |
|
|
|
|
|
if patent_id not in app_state["patents"]: |
|
|
raise HTTPException( |
|
|
status_code=404, |
|
|
detail=f"Patent not found: {patent_id}" |
|
|
) |
|
|
|
|
|
patent = app_state["patents"][patent_id] |
|
|
file_path = Path(patent["path"]) |
|
|
|
|
|
if not file_path.exists(): |
|
|
raise HTTPException( |
|
|
status_code=404, |
|
|
detail="Patent file not found on disk" |
|
|
) |
|
|
|
|
|
return FileResponse( |
|
|
path=file_path, |
|
|
media_type="application/pdf", |
|
|
filename=patent["filename"] |
|
|
) |
|
|
|