bluenevus commited on
Commit
06e6609
·
verified ·
1 Parent(s): f5ad955

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +718 -5
app.py CHANGED
@@ -1,7 +1,720 @@
1
- import gradio as gr
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
- def greet(name):
4
- return "Hello " + name + "!!"
 
 
 
 
 
 
 
 
5
 
6
- demo = gr.Interface(fn=greet, inputs="text", outputs="text")
7
- demo.launch()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, BackgroundTasks, Security, status
2
+ from fastapi.security import APIKeyHeader
3
+ from fastapi.responses import JSONResponse, FileResponse
4
+ from fastapi.middleware.cors import CORSMiddleware
5
+ from pydantic import BaseModel, Field, validator
6
+ from typing import List, Optional, Dict, Any, Tuple
7
+ import asyncio
8
+ import aiofiles
9
+ from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
10
+ import hashlib
11
+ import uuid
12
+ from datetime import datetime, timedelta
13
+ import os
14
+ import shutil
15
+ from pathlib import Path
16
+ import pikepdf # Python wrapper for qpdf
17
+ import io
18
+ import logging
19
+ from queue import Queue, PriorityQueue
20
+ import threading
21
+ import time
22
+ from enum import Enum
23
+ from contextlib import asynccontextmanager
24
+ import tempfile
25
+ import traceback
26
+ import json
27
+ from dataclasses import dataclass, asdict
28
+ import redis
29
+ from celery import Celery
30
 
31
# Logging setup: emit to a local file and to the console with one shared format.
_log_handlers = [
    logging.FileHandler('pdf_processor.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
41
 
42
# Central configuration. Values are class attributes so call sites can read
# them as Config.X without instantiating anything.
class Config:
    """Static settings for upload limits, storage paths, workers, and backends."""

    # --- size limits -------------------------------------------------------
    MAX_FILE_SIZE_MB = 5
    CHUNK_SIZE_MB = 4.5
    MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024
    CHUNK_SIZE_BYTES = int(CHUNK_SIZE_MB * 1024 * 1024)

    # --- filesystem layout -------------------------------------------------
    UPLOAD_DIR = Path("uploads")
    OUTPUT_DIR = Path("outputs")
    TEMP_DIR = Path("temp")

    # --- concurrency / queueing -------------------------------------------
    MAX_WORKERS = min(32, (os.cpu_count() or 1) * 2)
    MAX_QUEUE_SIZE = 1000

    # --- auth and backends -------------------------------------------------
    API_KEY_HEADER = "X-API-Key"
    REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
    CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
    CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")

    # --- qpdf specific settings -------------------------------------------
    QPDF_COMPRESSION_LEVEL = 9  # Maximum compression
    QPDF_STREAM_DATA = "compress"  # Keep streams compressed
    QPDF_OBJECT_STREAMS = "generate"  # Generate object streams for better compression
62
+
63
# Create directories up front so upload/split code never races mkdir.
for dir_path in [Config.UPLOAD_DIR, Config.OUTPUT_DIR, Config.TEMP_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)

# Initialize Redis. decode_responses=True returns str instead of bytes,
# which the JSON (de)serialization below relies on.
redis_client = redis.from_url(Config.REDIS_URL, decode_responses=True)

# Initialize Celery.
# NOTE(review): no Celery tasks are defined anywhere in this file — the app
# processes jobs on an in-process thread instead; confirm whether this app
# instance is still needed.
celery_app = Celery(
    'pdf_processor',
    broker=Config.CELERY_BROKER_URL,
    backend=Config.CELERY_RESULT_BACKEND
)
76
+
77
# API Key Management
class APIKeyManager:
    """Holds the set of accepted API keys and enforces per-key rate limits."""

    def __init__(self):
        self.valid_keys = self._load_api_keys()

    def _load_api_keys(self) -> Dict[str, Dict]:
        """Load API keys from Redis or environment"""
        # Demo configuration: a comma-separated list from the environment.
        configured = os.getenv("API_KEYS", "demo-key-123,test-key-456")
        return {
            key: {
                "created_at": datetime.utcnow().isoformat(),
                "rate_limit": 100,
                "active": True,
            }
            for key in configured.split(",")
        }

    def validate_key(self, api_key: str) -> bool:
        """Validate API key and check rate limits"""
        key_info = self.valid_keys.get(api_key)
        if key_info is None or not key_info.get("active", False):
            return False

        # Fixed 1-hour rate-limit window tracked as a Redis counter.
        key_name = f"rate_limit:{api_key}"
        try:
            current_count = redis_client.incr(key_name)
            if current_count == 1:
                redis_client.expire(key_name, 3600)  # 1 hour window
            if current_count > key_info.get("rate_limit", 100):
                return False
        except Exception as e:
            # Fails open: a Redis outage must not lock out valid keys.
            logger.error(f"Rate limit check failed: {e}")

        return True


# Initialize API Key Manager
api_key_manager = APIKeyManager()
120
+
121
# Job Status Enum
class JobStatus(str, Enum):
    """Lifecycle states of a processing job.

    Subclasses str so instances serialize directly to JSON and compare
    equal to their plain-string values.
    """
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
128
+
129
# Job Model
@dataclass
class ProcessingJob:
    """Mutable record tracking one PDF-splitting job through its lifecycle."""
    job_id: str  # UUID assigned at upload time
    api_key: str  # key that created the job; checked on status/download access
    filename: str  # stored name under Config.UPLOAD_DIR ("{job_id}_{original}")
    status: JobStatus
    created_at: datetime
    updated_at: datetime
    total_pages: Optional[int] = None
    segments_created: Optional[int] = None
    segments_discarded: Optional[int] = None
    error_message: Optional[str] = None
    # Optional because None is the sentinel normalized in __post_init__;
    # a literal [] default would be shared across instances.
    output_files: Optional[List[str]] = None
    processing_time: Optional[float] = None

    def __post_init__(self):
        # Normalize the sentinel so callers can always iterate output_files.
        if self.output_files is None:
            self.output_files = []
148
+
149
# PDF Processor with qpdf
class QPDFProcessor:
    """Splits and optimizes PDFs via pikepdf (Python bindings over qpdf)."""

    def __init__(self):
        # NOTE(review): this executor is never used by the methods below —
        # both run pikepdf work inline even though they are declared async;
        # confirm whether offloading to the pool was intended.
        self.executor = ThreadPoolExecutor(max_workers=Config.MAX_WORKERS)

    async def split_pdf_with_qpdf(self,
                                  input_path: Path,
                                  output_dir: Path,
                                  job_id: str) -> Tuple[List[Path], List[Path], Dict]:
        """
        Split PDF using qpdf for efficient handling of compressed PDFs

        Returns (kept_files, discarded_files, stats): segment paths that fit
        under the size cap, oversized single-page segments, and counters
        (total_pages, segments_created, segments_discarded,
        compression_ratio, processing_time).
        """
        try:
            start_time = time.time()
            kept_files = []
            discarded_files = []
            stats = {
                "total_pages": 0,
                "segments_created": 0,
                "segments_discarded": 0,
                "compression_ratio": 0
            }

            # Open PDF with pikepdf (qpdf wrapper)
            with pikepdf.open(input_path) as pdf:
                total_pages = len(pdf.pages)
                stats["total_pages"] = total_pages

                # Estimate pages-per-segment from the source file's average
                # bytes per page; refined below if a segment comes out too big.
                file_size = input_path.stat().st_size
                avg_page_size = file_size / total_pages
                pages_per_segment = max(1, int(Config.CHUNK_SIZE_BYTES / avg_page_size))

                segment_num = 0
                page_start = 0

                while page_start < total_pages:
                    page_end = min(page_start + pages_per_segment, total_pages)
                    segment_num += 1

                    # Create output path for segment
                    segment_filename = f"{job_id}_segment_{segment_num:04d}.pdf"
                    segment_path = output_dir / segment_filename

                    # Create new PDF with selected pages
                    segment_pdf = pikepdf.new()

                    # Copy pages efficiently without decompressing
                    for page_num in range(page_start, page_end):
                        segment_pdf.pages.append(pdf.pages[page_num])

                    # Save with compression settings
                    segment_pdf.save(
                        segment_path,
                        compress_streams=True,
                        stream_decode_level=pikepdf.StreamDecodeLevel.none,  # Don't decode streams
                        object_stream_mode=pikepdf.ObjectStreamMode.generate,
                        linearize=True,  # Web optimization
                        min_version=pdf.pdf_version
                    )

                    # Check segment size
                    segment_size = segment_path.stat().st_size

                    if segment_size <= Config.MAX_FILE_SIZE_BYTES:
                        kept_files.append(segment_path)
                        stats["segments_created"] += 1
                        logger.info(f"Created segment {segment_num}: {segment_size / 1024 / 1024:.2f} MB")
                    else:
                        # Oversized segment: retry the SAME page range with half
                        # as many pages (page_start is deliberately not advanced
                        # before the continue). Note segment_num was already
                        # incremented, so numbering can skip on retries.
                        if pages_per_segment > 1:
                            logger.warning(f"Segment {segment_num} too large ({segment_size / 1024 / 1024:.2f} MB), re-splitting...")
                            segment_path.unlink()  # Delete oversized segment

                            # Adjust pages per segment
                            pages_per_segment = max(1, pages_per_segment // 2)
                            continue
                        else:
                            # Single page is too large, discard (caller deletes
                            # the file; it is returned for accounting).
                            discarded_files.append(segment_path)
                            stats["segments_discarded"] += 1
                            logger.warning(f"Discarded segment {segment_num}: {segment_size / 1024 / 1024:.2f} MB")

                    page_start = page_end

                # Calculate compression ratio of kept output vs. the original.
                original_size = input_path.stat().st_size
                total_output_size = sum(f.stat().st_size for f in kept_files)
                if original_size > 0:
                    stats["compression_ratio"] = (1 - total_output_size / original_size) * 100

                stats["processing_time"] = time.time() - start_time

                return kept_files, discarded_files, stats

        except Exception as e:
            logger.error(f"Error splitting PDF with qpdf: {str(e)}")
            raise

    async def optimize_pdf(self, input_path: Path, output_path: Path) -> Path:
        """
        Optimize PDF using qpdf's advanced features

        Rewrites input_path to output_path with unreferenced resources
        removed and aggressive recompression; returns output_path.
        """
        try:
            with pikepdf.open(input_path) as pdf:
                # Remove unnecessary elements
                pdf.remove_unreferenced_resources()

                # Save with maximum optimization
                pdf.save(
                    output_path,
                    compress_streams=True,
                    object_stream_mode=pikepdf.ObjectStreamMode.generate,
                    linearize=True,
                    recompress_flate=True,
                    deterministic_id=True
                )

            return output_path

        except Exception as e:
            logger.error(f"Error optimizing PDF: {str(e)}")
            raise
273
+
274
# Job Queue Manager
class JobQueueManager:
    """In-process priority queue of PDF jobs with Redis-backed persistence.

    A single daemon thread drains the queue and processes jobs one at a
    time. Job state lives in ``self.jobs`` while the process is up and is
    mirrored to Redis with a 24h TTL so status queries survive restarts.
    """

    def __init__(self):
        self.queue = PriorityQueue(maxsize=Config.MAX_QUEUE_SIZE)
        self.jobs: Dict[str, ProcessingJob] = {}
        self.lock = threading.Lock()
        self.processor = QPDFProcessor()
        self.processing_thread = threading.Thread(target=self._process_queue, daemon=True)
        self.processing_thread.start()

    def add_job(self, job: ProcessingJob) -> str:
        """Enqueue a job; raises HTTP 503 when the queue is full."""
        with self.lock:
            if self.queue.full():
                raise HTTPException(
                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                    detail="Queue is full. Please try again later."
                )

            # Lower number = higher priority; premium keys jump the line.
            # Ties break on (created_at, job_id) so entries stay comparable.
            priority = 1 if job.api_key in ["premium-key"] else 2
            self.queue.put((priority, job.created_at, job.job_id))
            self.jobs[job.job_id] = job

            # Mirror to Redis so status survives a process restart.
            redis_client.setex(
                f"job:{job.job_id}",
                86400,  # 24 hours TTL
                json.dumps(asdict(job), default=str)
            )

            return job.job_id

    def get_job_status(self, job_id: str) -> Optional[ProcessingJob]:
        """Return the job from memory, falling back to Redis; None if unknown."""
        with self.lock:
            if job_id in self.jobs:
                return self.jobs[job_id]

        # Redis read happens outside the lock so a slow backend never stalls
        # the processing thread.
        job_data = redis_client.get(f"job:{job_id}")
        if job_data:
            data = json.loads(job_data)
            # BUG FIX: the JSON round-trip (json.dumps(..., default=str))
            # stringifies datetimes and the status enum; the original passed
            # them through raw, so callers invoking .isoformat() on a
            # rehydrated job crashed. Restore the real types here.
            for field_name in ("created_at", "updated_at"):
                if isinstance(data.get(field_name), str):
                    data[field_name] = datetime.fromisoformat(data[field_name])
            if isinstance(data.get("status"), str):
                data["status"] = JobStatus(data["status"])
            return ProcessingJob(**data)

        return None

    def _process_queue(self):
        """Daemon loop: pop job ids and process them sequentially."""
        while True:
            try:
                if not self.queue.empty():
                    _, _, job_id = self.queue.get(timeout=1)

                    with self.lock:
                        job = self.jobs.get(job_id)

                    if job and job.status == JobStatus.PENDING:
                        # Processor methods are coroutines; run each on a
                        # fresh event loop owned by this worker thread.
                        asyncio.run(self._process_job(job))

                time.sleep(0.1)

            except Exception as e:
                logger.error(f"Queue processing error: {str(e)}")

    async def _process_job(self, job: ProcessingJob):
        """Run one job end-to-end and record its outcome (COMPLETED/FAILED)."""
        try:
            start_time = time.time()

            # Mark in-flight before doing any work.
            job.status = JobStatus.PROCESSING
            job.updated_at = datetime.utcnow()
            self._update_job(job)

            input_path = Config.UPLOAD_DIR / job.filename
            output_dir = Config.OUTPUT_DIR / job.job_id
            output_dir.mkdir(parents=True, exist_ok=True)

            # Split PDF using qpdf.
            kept_files, discarded_files, stats = await self.processor.split_pdf_with_qpdf(
                input_path, output_dir, job.job_id
            )

            # Record results on the job.
            job.status = JobStatus.COMPLETED
            job.total_pages = stats["total_pages"]
            job.segments_created = stats["segments_created"]
            job.segments_discarded = stats["segments_discarded"]
            job.output_files = [str(f.name) for f in kept_files]
            job.processing_time = time.time() - start_time
            job.updated_at = datetime.utcnow()

            # Oversized segments are never served; remove them from disk.
            for file in discarded_files:
                try:
                    file.unlink()
                except Exception as e:
                    logger.error(f"Error deleting discarded file: {e}")

            self._update_job(job)
            logger.info(f"Job {job.job_id} completed successfully")

        except Exception as e:
            # BUG FIX: original had f"Job processing failed: {str(e}" — an
            # unbalanced brace inside the f-string, i.e. a SyntaxError that
            # prevented this module from importing at all.
            logger.error(f"Job processing failed: {str(e)}")
            job.status = JobStatus.FAILED
            job.error_message = str(e)
            job.updated_at = datetime.utcnow()
            self._update_job(job)

    def _update_job(self, job: ProcessingJob):
        """Write job state to the in-memory map and refresh the Redis copy."""
        with self.lock:
            self.jobs[job.job_id] = job
            redis_client.setex(
                f"job:{job.job_id}",
                86400,
                json.dumps(asdict(job), default=str)
            )


# Initialize Job Queue Manager
job_queue_manager = JobQueueManager()
395
+
396
# Application lifespan hook wired into FastAPI below.
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log service startup before serving and shutdown after."""
    logger.info("Starting PDF Processor API")
    yield
    logger.info("Shutting down PDF Processor API")
404
+
405
# Initialize FastAPI app with OpenAPI documentation
app = FastAPI(
    title="PDF Splitter API",
    description="High-performance API for splitting large PDFs into segments using qpdf",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    openapi_url="/openapi.json",
    lifespan=lifespan
)

# Add CORS middleware
# NOTE(review): per the CORS spec, browsers reject credentialed requests when
# the allowed origin is the wildcard "*"; either drop allow_credentials or
# pin explicit origins — confirm which callers need credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
424
+
425
# Security dependency: key is read from the configured header; auto_error is
# off so we can return our own 401 wording.
api_key_header = APIKeyHeader(name=Config.API_KEY_HEADER, auto_error=False)


async def verify_api_key(api_key: str = Security(api_key_header)) -> str:
    """FastAPI dependency: return the caller's API key or raise 401/403."""
    # Missing header entirely -> 401.
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key required"
        )
    # Unknown, inactive, or rate-limited key -> 403.
    if not api_key_manager.validate_key(api_key):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or rate-limited API key"
        )
    return api_key
443
+
444
# Response Models
class JobResponse(BaseModel):
    """Returned by the upload endpoint when a job is accepted (202)."""
    job_id: str = Field(..., description="Unique job identifier")
    status: JobStatus = Field(..., description="Current job status")
    message: str = Field(..., description="Status message")
    created_at: str = Field(..., description="Job creation timestamp")

class JobStatusResponse(BaseModel):
    """Full job-state snapshot returned by the status endpoint."""
    job_id: str
    status: JobStatus
    filename: str
    created_at: str
    updated_at: str
    total_pages: Optional[int] = None
    segments_created: Optional[int] = None
    segments_discarded: Optional[int] = None
    error_message: Optional[str] = None
    # Pydantic copies field defaults per instance, so a [] default here does
    # not suffer Python's shared-mutable-default pitfall.
    output_files: List[str] = []
    processing_time: Optional[float] = None

class ErrorResponse(BaseModel):
    """Uniform error envelope referenced by the endpoints' responses maps."""
    detail: str
    error_code: Optional[str] = None
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
468
+
469
# API Endpoints
@app.post(
    "/api/v1/upload",
    response_model=JobResponse,
    status_code=status.HTTP_202_ACCEPTED,
    summary="Upload PDF for splitting",
    description="Upload a large PDF file to be split into 4.5MB segments",
    responses={
        202: {"description": "Job accepted and queued for processing"},
        400: {"model": ErrorResponse, "description": "Invalid file or request"},
        401: {"model": ErrorResponse, "description": "Missing or invalid API key"},
        403: {"model": ErrorResponse, "description": "Rate limit exceeded"},
        413: {"model": ErrorResponse, "description": "File too large"},
        503: {"model": ErrorResponse, "description": "Service unavailable"}
    }
)
async def upload_pdf(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="PDF file to upload"),
    api_key: str = Depends(verify_api_key)
):
    """
    Upload a PDF file for splitting into segments.

    - Files are split into segments of approximately 4.5MB
    - Segments larger than 5MB are discarded
    - Processing is done asynchronously
    - Returns a job ID for tracking progress
    """
    try:
        # BUG FIX: UploadFile.filename is optional and can be None; the
        # original called .lower() on it unconditionally, turning a malformed
        # request into a 500 instead of a 400.
        if not file.filename or not file.filename.lower().endswith('.pdf'):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Only PDF files are accepted"
            )

        # Generate unique job ID; same timestamp is used for created/updated.
        job_id = str(uuid.uuid4())
        timestamp = datetime.utcnow()

        # Job-prefixed name avoids collisions between concurrent uploads.
        upload_path = Config.UPLOAD_DIR / f"{job_id}_{file.filename}"

        # Stream to disk in 1MB chunks so large uploads never sit in memory.
        async with aiofiles.open(upload_path, 'wb') as f:
            chunk_size = 1024 * 1024  # 1MB chunks
            while content := await file.read(chunk_size):
                await f.write(content)

        # Verify it's a valid PDF using pikepdf before queueing.
        try:
            with pikepdf.open(upload_path) as pdf:
                page_count = len(pdf.pages)
                logger.info(f"Valid PDF uploaded: {file.filename}, {page_count} pages")
        except Exception as e:
            upload_path.unlink()  # Delete invalid file
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid PDF file: {str(e)}"
            )

        # Create job record in PENDING state.
        job = ProcessingJob(
            job_id=job_id,
            api_key=api_key,
            filename=upload_path.name,
            status=JobStatus.PENDING,
            created_at=timestamp,
            updated_at=timestamp
        )

        # Add to queue (raises 503 if the queue is full).
        job_queue_manager.add_job(job)

        return JobResponse(
            job_id=job_id,
            status=JobStatus.PENDING,
            message="PDF uploaded successfully and queued for processing",
            created_at=timestamp.isoformat()
        )

    except HTTPException:
        # Re-raise our own deliberate HTTP errors unchanged.
        raise
    except Exception as e:
        logger.error(f"Upload error: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Upload failed: {str(e)}"
        )
559
+
560
@app.get(
    "/api/v1/job/{job_id}",
    response_model=JobStatusResponse,
    summary="Get job status",
    description="Check the status of a PDF splitting job",
    responses={
        200: {"description": "Job status retrieved successfully"},
        401: {"model": ErrorResponse, "description": "Missing or invalid API key"},
        404: {"model": ErrorResponse, "description": "Job not found"}
    }
)
async def get_job_status(
    job_id: str,
    api_key: str = Depends(verify_api_key)
):
    """
    Get the current status of a PDF splitting job.

    BUG FIX: the original declared ``job_id: str = Path(..., description=...)``,
    but only ``pathlib.Path`` is imported here — pathlib's constructor rejects
    the ``description`` keyword, so the module crashed at import time. A bare
    ``job_id: str`` is already a required FastAPI path parameter, so the
    bogus default is simply dropped.
    """
    job = job_queue_manager.get_job_status(job_id)

    if not job:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Job {job_id} not found"
        )

    # Only the key that created the job may inspect it.
    if job.api_key != api_key:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied to this job"
        )

    # Jobs rehydrated from Redis may carry ISO strings instead of datetimes;
    # accept either form rather than crashing on .isoformat().
    created = job.created_at if isinstance(job.created_at, str) else job.created_at.isoformat()
    updated = job.updated_at if isinstance(job.updated_at, str) else job.updated_at.isoformat()

    return JobStatusResponse(
        job_id=job.job_id,
        status=job.status,
        filename=job.filename,
        created_at=created,
        updated_at=updated,
        total_pages=job.total_pages,
        segments_created=job.segments_created,
        segments_discarded=job.segments_discarded,
        error_message=job.error_message,
        output_files=job.output_files,
        processing_time=job.processing_time
    )
606
+
607
@app.get(
    "/api/v1/job/{job_id}/download/{segment_name}",
    response_class=FileResponse,
    summary="Download segment",
    description="Download a specific segment from a completed job",
    responses={
        200: {"description": "Segment file", "content": {"application/pdf": {}}},
        401: {"model": ErrorResponse, "description": "Missing or invalid API key"},
        404: {"model": ErrorResponse, "description": "Job or segment not found"}
    }
)
async def download_segment(
    job_id: str,
    segment_name: str,
    api_key: str = Depends(verify_api_key)
):
    """
    Download a specific PDF segment from a completed job.

    BUG FIX: both parameters originally defaulted to ``Path(..., description=...)``,
    which resolves to ``pathlib.Path`` here (fastapi's ``Path`` helper is not
    imported) and raises TypeError at import time. Bare ``str`` annotations
    declare the same required path parameters.
    """
    job = job_queue_manager.get_job_status(job_id)

    if not job:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Job {job_id} not found"
        )

    # Only the key that created the job may download its output.
    if job.api_key != api_key:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied"
        )

    if job.status != JobStatus.COMPLETED:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Job is {job.status}, not completed"
        )

    # Whitelist check against the job's recorded outputs; this also prevents
    # path traversal via segment_name since arbitrary names are rejected.
    if segment_name not in job.output_files:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Segment {segment_name} not found"
        )

    file_path = Config.OUTPUT_DIR / job_id / segment_name

    if not file_path.exists():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Segment file not found on disk"
        )

    return FileResponse(
        path=file_path,
        media_type="application/pdf",
        filename=segment_name
    )
665
+
666
@app.get(
    "/api/v1/health",
    summary="Health check",
    description="Check API health and system status"
)
async def health_check():
    """
    Health check endpoint for monitoring.

    Always returns 200 with an overall "healthy" status; the per-component
    fields (redis, queue_size, active_jobs) carry the actual signal.
    """
    try:
        # Check Redis connection.
        redis_status = "healthy" if redis_client.ping() else "unhealthy"
    except Exception:
        # BUG FIX: was a bare `except:`, which also swallows SystemExit and
        # KeyboardInterrupt; Exception is the correct breadth for "Redis
        # unreachable".
        redis_status = "unhealthy"

    return {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "redis": redis_status,
        "queue_size": job_queue_manager.queue.qsize(),
        "active_jobs": len(job_queue_manager.jobs)
    }
688
+
689
# Error handlers
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc: HTTPException):
    """Render HTTPExceptions as a JSON envelope with a timestamp."""
    payload = {
        "detail": exc.detail,
        "timestamp": datetime.utcnow().isoformat()
    }
    return JSONResponse(status_code=exc.status_code, content=payload)


@app.exception_handler(Exception)
async def general_exception_handler(request, exc: Exception):
    """Last-resort handler: log the error, return an opaque 500 to the client."""
    logger.error(f"Unhandled exception: {str(exc)}")
    payload = {
        "detail": "Internal server error",
        "timestamp": datetime.utcnow().isoformat()
    }
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content=payload
    )
710
+
711
if __name__ == "__main__":
    # Local/dev entry point; in deployment the app object is served by an
    # external ASGI runner instead.
    import uvicorn
    # NOTE(review): reload=True starts a file-watching auto-reloader — dev
    # convenience only; confirm it is disabled for production deployments.
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info",
        access_log=True
    )