Spaces:
Runtime error
Runtime error
| """ | |
| Router de processamento de acórdãos | |
| ENDPOINT PRINCIPAL: Upload JSONL → Processamento → Download TAR.GZ | |
| """ | |
| from fastapi import APIRouter, UploadFile, File, BackgroundTasks, HTTPException, Query | |
| from fastapi.responses import FileResponse | |
| import uuid | |
| import json | |
| from pathlib import Path | |
| from datetime import datetime | |
| import hashlib | |
| from api.models.requests import ProcessingOptionsRequest | |
| from api.models.responses import ProcessingResponse, ProcessingStatus, FileInfoResponse | |
| from api.services.processing_service import ProcessingService | |
| from api.utils.logger import setup_logger | |
| from api.config import get_settings | |
| router = APIRouter() | |
| logger = setup_logger(__name__) | |
| settings = get_settings() | |
| # Storage de tasks (em produção usar Redis ou Database) | |
| processing_tasks = {} | |
| async def upload_and_process( | |
| background_tasks: BackgroundTasks, | |
| file: UploadFile = File(..., description="Arquivo JSONL com acórdãos"), | |
| llm_provider: str = Query("groq", description="Provedor LLM (groq/openai/anthropic)"), | |
| model_type: str = Query("balanced", description="Tipo de modelo (fast/balanced/quality)"), | |
| enable_parallel: bool = Query(True, description="Processar em paralelo"), | |
| max_workers: int = Query(3, ge=1, le=10, description="Workers paralelos"), | |
| save_to_db: bool = Query(False, description="Salvar resultados no banco") | |
| ): | |
| """ | |
| **Upload de arquivo JSONL e início do processamento em background.** | |
| ## Fluxo: | |
| 1. Upload do arquivo JSONL | |
| 2. Validação do formato | |
| 3. Criação de task de processamento | |
| 4. Processamento em background (9 especialistas) | |
| 5. Geração de arquivo TAR.GZ com resultados | |
| ## Formato JSONL esperado: | |
| ```json | |
| {"acordao_id": "001", "tribunal": "TJPR", "ementa": "...", "integra": "..."} | |
| {"acordao_id": "002", "tribunal": "TJSP", "ementa": "...", "integra": "..."} | |
| ``` | |
| ## Response: | |
| - **task_id**: ID único para consultar status | |
| - **status**: Status inicial (pending) | |
| - Use `/process/status/{task_id}` para acompanhar | |
| - Use `/process/download/{task_id}` para baixar resultados | |
| """ | |
| # Validar extensão | |
| if not file.filename.endswith(('.jsonl', '.json')): | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Arquivo deve ser .jsonl ou .json" | |
| ) | |
| # Validar tamanho | |
| content = await file.read() | |
| size_mb = len(content) / (1024 * 1024) | |
| if size_mb > settings.MAX_UPLOAD_SIZE_MB: | |
| raise HTTPException( | |
| status_code=413, | |
| detail=f"Arquivo muito grande: {size_mb:.2f}MB (máx: {settings.MAX_UPLOAD_SIZE_MB}MB)" | |
| ) | |
| # Criar task ID | |
| task_id = f"task-{uuid.uuid4()}" | |
| # Criar diretórios | |
| upload_dir = Path(settings.UPLOAD_PATH) | |
| upload_dir.mkdir(parents=True, exist_ok=True) | |
| # Salvar arquivo | |
| file_path = upload_dir / f"{task_id}_{file.filename}" | |
| file_path.write_bytes(content) | |
| # Calcular hash | |
| file_hash = hashlib.sha256(content).hexdigest() | |
| # Validar e contar registros | |
| total_records = 0 | |
| try: | |
| for line_num, line in enumerate(content.decode('utf-8').strip().split('\n'), 1): | |
| if not line.strip(): | |
| continue | |
| try: | |
| record = json.loads(line) | |
| # Validar campos obrigatórios | |
| if 'ementa' not in record or 'integra' not in record: | |
| raise HTTPException( | |
| status_code=422, | |
| detail=f"Linha {line_num}: campos obrigatórios ausentes (ementa, integra)" | |
| ) | |
| total_records += 1 | |
| except json.JSONDecodeError as e: | |
| raise HTTPException( | |
| status_code=422, | |
| detail=f"JSONL inválido na linha {line_num}: {str(e)}" | |
| ) | |
| except UnicodeDecodeError: | |
| raise HTTPException( | |
| status_code=422, | |
| detail="Arquivo deve estar em UTF-8" | |
| ) | |
| # Criar response inicial | |
| response = ProcessingResponse( | |
| task_id=task_id, | |
| status=ProcessingStatus.PENDING, | |
| message=f"Processamento agendado para {total_records} registros", | |
| total_records=total_records, | |
| processed_records=0, | |
| failed_records=0, | |
| started_at=datetime.now() | |
| ) | |
| # Armazenar task | |
| processing_tasks[task_id] = response.dict() | |
| # Adicionar metadados | |
| processing_tasks[task_id]['metadata'] = { | |
| 'filename': file.filename, | |
| 'size_bytes': len(content), | |
| 'hash_sha256': file_hash, | |
| 'llm_provider': llm_provider, | |
| 'model_type': model_type, | |
| 'enable_parallel': enable_parallel, | |
| 'max_workers': max_workers, | |
| 'save_to_db': save_to_db | |
| } | |
| # Iniciar processamento em background | |
| background_tasks.add_task( | |
| process_acordaos_background, | |
| task_id=task_id, | |
| file_path=str(file_path), | |
| llm_provider=llm_provider, | |
| model_type=model_type, | |
| enable_parallel=enable_parallel, | |
| max_workers=max_workers, | |
| save_to_db=save_to_db | |
| ) | |
| logger.info(f"✅ Task {task_id} criada - {total_records} registros - {size_mb:.2f}MB") | |
| return response | |
| async def get_processing_status(task_id: str): | |
| """ | |
| **Consulta status de processamento.** | |
| Retorna informações atualizadas sobre a task: | |
| - Status atual (pending/processing/completed/error) | |
| - Progresso (processados/total) | |
| - Tempo estimado restante | |
| - URL de download (quando concluído) | |
| """ | |
| if task_id not in processing_tasks: | |
| raise HTTPException( | |
| status_code=404, | |
| detail=f"Task '{task_id}' não encontrada" | |
| ) | |
| return ProcessingResponse(**processing_tasks[task_id]) | |
| async def list_all_tasks(): | |
| """ | |
| **Lista todas as tasks de processamento.** | |
| Útil para debug e monitoramento. | |
| """ | |
| return { | |
| "total": len(processing_tasks), | |
| "tasks": [ | |
| { | |
| "task_id": task_id, | |
| "status": data["status"], | |
| "progress": f"{data['processed_records']}/{data['total_records']}", | |
| "started_at": data.get("started_at") | |
| } | |
| for task_id, data in processing_tasks.items() | |
| ] | |
| } | |
| async def download_result(task_id: str): | |
| """ | |
| **Download do arquivo TAR.GZ com resultados.** | |
| Disponível apenas quando status = "completed". | |
| ## Conteúdo do arquivo: | |
| - `{task_id}_results.json`: Resultados completos | |
| - Análises de cada especialista | |
| - Metadados do processamento | |
| - Logs e métricas | |
| """ | |
| if task_id not in processing_tasks: | |
| raise HTTPException( | |
| status_code=404, | |
| detail=f"Task '{task_id}' não encontrada" | |
| ) | |
| task = processing_tasks[task_id] | |
| if task['status'] != ProcessingStatus.COMPLETED: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Processamento ainda não concluído. Status atual: {task['status']}" | |
| ) | |
| # Procurar arquivo | |
| output_file = Path(settings.OUTPUT_PATH) / "archives" / f"{task_id}.tar.gz" | |
| if not output_file.exists(): | |
| raise HTTPException( | |
| status_code=404, | |
| detail="Arquivo de resultado não encontrado" | |
| ) | |
| logger.info(f"📦 Download iniciado: {task_id}") | |
| return FileResponse( | |
| path=str(output_file), | |
| filename=f"para_ai_resultado_{task_id}.tar.gz", | |
| media_type="application/gzip", | |
| headers={ | |
| "Content-Disposition": f"attachment; filename=para_ai_resultado_{task_id}.tar.gz" | |
| } | |
| ) | |
| async def delete_task(task_id: str): | |
| """ | |
| **Deleta uma task e seus arquivos.** | |
| Útil para limpeza de tasks antigas. | |
| """ | |
| if task_id not in processing_tasks: | |
| raise HTTPException( | |
| status_code=404, | |
| detail=f"Task '{task_id}' não encontrada" | |
| ) | |
| # Remover arquivos | |
| upload_dir = Path(settings.UPLOAD_PATH) | |
| output_dir = Path(settings.OUTPUT_PATH) | |
| for file in upload_dir.glob(f"{task_id}_*"): | |
| file.unlink() | |
| for file in output_dir.glob(f"{task_id}*"): | |
| file.unlink() | |
| # Remover da memória | |
| del processing_tasks[task_id] | |
| logger.info(f"🗑️ Task deletada: {task_id}") | |
| return {"message": f"Task {task_id} deletada com sucesso"} | |
| # ============================================================================ | |
| # FUNÇÃO DE BACKGROUND | |
| # ============================================================================ | |
| async def process_acordaos_background( | |
| task_id: str, | |
| file_path: str, | |
| llm_provider: str, | |
| model_type: str, | |
| enable_parallel: bool, | |
| max_workers: int, | |
| save_to_db: bool | |
| ): | |
| """ | |
| Função executada em background para processar acórdãos. | |
| Atualiza o status da task conforme progresso. | |
| """ | |
| try: | |
| # Atualizar status | |
| processing_tasks[task_id]['status'] = ProcessingStatus.PROCESSING | |
| processing_tasks[task_id]['message'] = "Processamento em andamento..." | |
| logger.info(f"🚀 Iniciando processamento background: {task_id}") | |
| # Inicializar serviço | |
| service = ProcessingService() | |
| # Processar | |
| result = await service.process_jsonl_file( | |
| file_path=file_path, | |
| task_id=task_id, | |
| llm_provider=llm_provider, | |
| model_type=model_type, | |
| enable_parallel=enable_parallel, | |
| max_workers=max_workers | |
| ) | |
| # Atualizar task com sucesso | |
| processing_tasks[task_id].update({ | |
| 'status': ProcessingStatus.COMPLETED, | |
| 'message': f"Processamento concluído com sucesso em {result['elapsed_seconds']:.2f}s", | |
| 'processed_records': result['processed'], | |
| 'failed_records': result['failed'], | |
| 'completed_at': datetime.now(), | |
| 'download_url': f"/api/v1/process/download/{task_id}", | |
| 'result_metadata': { | |
| 'archive_path': result['archive_path'], | |
| 'hash_sha256': result['hash'], | |
| 'elapsed_seconds': result['elapsed_seconds'] | |
| } | |
| }) | |
| logger.info(f"✅ Task {task_id} concluída - {result['processed']} processados, {result['failed']} falhas") | |
| except Exception as e: | |
| # Atualizar task com erro | |
| logger.error(f"❌ Erro na task {task_id}: {str(e)}", exc_info=True) | |
| processing_tasks[task_id].update({ | |
| 'status': ProcessingStatus.ERROR, | |
| 'message': f"Erro no processamento: {str(e)}", | |
| 'completed_at': datetime.now(), | |
| 'errors': [str(e)] | |
| }) |