import os
import tempfile
import time
from pathlib import Path
from datetime import datetime
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
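
# Load environment variables before importing the agents, which presumably
# read API keys and other settings at import time.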
load_dotenv(".env")
# Agents
from backend.agents.file_discovery import FileDiscoveryAgent, FileDiscoveryInput
from backend.agents.document_parsing import DocumentParsingAgent, DocumentParsingInput
from backend.agents.table_extraction import TableExtractionAgent, TableExtractionInput
from backend.agents.media_extraction import MediaExtractionAgent, MediaExtractionInput
from backend.agents.indexing import IndexingAgent, IndexingInput
from backend.agents.schema_mapping_simple import SchemaMappingAgent
from backend.agents.validation_agent import ValidationAgent
from backend.models.schemas import PageIndex, SchemaMappingInput
from backend.models.schemas import ValidationInput as ValidationInputSchema
from backend.utils.storage_manager import StorageManager

app = FastAPI(title="Digi-Biz API")

# Allow CORS for Next.js
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
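
# NOTE: wildcard origins combined with allow_credentials=True is very
# permissive; in production you would typically pin the Next.js origin
# (e.g. http://localhost:3000 — the exact origin is an assumption here).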


def generate_job_id() -> str:
    """Return a timestamp-based job identifier, e.g. job_20250101_120000."""
    return f"job_{datetime.now().strftime('%Y%m%d_%H%M%S')}"


@app.post("/upload")
async def process_zip(file: UploadFile = File(...)):
    """Run the full extraction pipeline on an uploaded ZIP archive."""
    if not file.filename or not file.filename.endswith(".zip"):
        raise HTTPException(status_code=400, detail="Must be a ZIP file")

    # Each upload gets its own scratch directory keyed by job ID
    job_id = generate_job_id()
    temp_dir = Path(tempfile.gettempdir()) / "digi_biz" / job_id
    temp_dir.mkdir(parents=True, exist_ok=True)

    # Persist the upload to disk so the agents can work from a file path
    zip_path = temp_dir / file.filename
    with open(zip_path, "wb") as f:
        f.write(await file.read())

    storage_manager = StorageManager(storage_base=str(temp_dir))
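
    # The pipeline below chains seven agents: discovery -> parsing ->
    # table extraction -> media extraction -> indexing -> schema mapping ->
    # validation. Each step feeds its output into the next.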
print("Step 1: File Discovery")
discovery_agent = FileDiscoveryAgent(storage_manager=storage_manager)
discovery_output = discovery_agent.discover(
FileDiscoveryInput(zip_file_path=str(zip_path), job_id=job_id)
)
if not discovery_output.success:
raise HTTPException(status_code=500, detail="File discovery failed")
print("Step 2: Document Parsing")
parsing_agent = DocumentParsingAgent(enable_ocr=False)
parsing_output = parsing_agent.parse(
DocumentParsingInput(documents=discovery_output.documents, job_id=job_id, enable_ocr=False)
)
print("Step 3: Table Extraction")
table_agent = TableExtractionAgent()
tables_output = table_agent.extract(
TableExtractionInput(parsed_documents=parsing_output.parsed_documents, job_id=job_id)
)
print("Step 4: Media Extraction")
media_agent = MediaExtractionAgent(enable_deduplication=False)
media_output = media_agent.extract_all(
MediaExtractionInput(
parsed_documents=parsing_output.parsed_documents,
standalone_files=[img.file_path for img in discovery_output.images],
job_id=job_id
)
)
print("Step 5: Indexing")
indexing_agent = IndexingAgent()
page_index = indexing_agent.build_index(
IndexingInput(
parsed_documents=parsing_output.parsed_documents,
tables=tables_output.tables,
images=media_output.media.images if media_output.success else [],
job_id=job_id
)
)
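
    # page_index (presumably a PageIndex, per the import above) is the
    # consolidated view of text, tables, and images that schema mapping consumes.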
print("Step 6: Schema Mapping")
schema_agent = SchemaMappingAgent()
mapping_output = schema_agent.map_to_schema(
SchemaMappingInput(page_index=page_index, job_id=job_id)
)
if not mapping_output.success:
raise HTTPException(status_code=500, detail=f"Schema mapping failed: {mapping_output.errors}")
print("Step 7: Validation")
val_agent = ValidationAgent()
val_out = val_agent.validate(
ValidationInputSchema(profile=mapping_output.profile, job_id=job_id)
)
return val_out.model_dump(mode="json")
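
# A minimal sketch of exercising the endpoint locally. The module path
# "backend.main" and the sample archive name are assumptions:
#
#   uvicorn backend.main:app --reload
#   curl -F "file=@sample.zip" http://localhost:8000/upload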