bluenevus committed on
Commit
c21b0a9
Β·
verified Β·
1 Parent(s): 6e431b7

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +271 -623
app.py CHANGED
@@ -1,183 +1,66 @@
1
- from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, BackgroundTasks, Security, status
2
- from fastapi.security import APIKeyHeader
3
- from fastapi.responses import JSONResponse, FileResponse
4
- from fastapi.middleware.cors import CORSMiddleware
5
- from pydantic import BaseModel, Field, validator
6
- from typing import List, Optional, Dict, Any, Tuple
7
- import asyncio
8
- import aiofiles
9
- from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
10
- import hashlib
11
- import uuid
12
- from datetime import datetime, timedelta
13
  import os
 
 
14
  import shutil
15
  from pathlib import Path
16
- import pikepdf # Python wrapper for qpdf
17
- import io
18
- import logging
19
- from queue import Queue, PriorityQueue
20
  import threading
21
  import time
22
- from enum import Enum
23
- from contextlib import asynccontextmanager
24
- import tempfile
25
- import traceback
26
- import json
27
- from dataclasses import dataclass, asdict
28
- import redis
29
- from celery import Celery
30
 
31
  # Configure logging
32
- logging.basicConfig(
33
- level=logging.INFO,
34
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
35
- handlers=[
36
- logging.FileHandler('pdf_processor.log'),
37
- logging.StreamHandler()
38
- ]
39
- )
40
  logger = logging.getLogger(__name__)
41
 
42
  # Configuration
43
- class Config:
44
- MAX_FILE_SIZE_MB = 5
45
- CHUNK_SIZE_MB = 4.5
46
- MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024
47
- CHUNK_SIZE_BYTES = int(CHUNK_SIZE_MB * 1024 * 1024)
48
- UPLOAD_DIR = Path("uploads")
49
- OUTPUT_DIR = Path("outputs")
50
- TEMP_DIR = Path("temp")
51
- MAX_WORKERS = min(32, (os.cpu_count() or 1) * 2)
52
- MAX_QUEUE_SIZE = 1000
53
- API_KEY_HEADER = "X-API-Key"
54
- REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
55
- CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
56
- CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")
57
-
58
- # qpdf specific settings
59
- QPDF_COMPRESSION_LEVEL = 9 # Maximum compression
60
- QPDF_STREAM_DATA = "compress" # Keep streams compressed
61
- QPDF_OBJECT_STREAMS = "generate" # Generate object streams for better compression
62
 
63
- # Create directories
64
- for dir_path in [Config.UPLOAD_DIR, Config.OUTPUT_DIR, Config.TEMP_DIR]:
65
- dir_path.mkdir(parents=True, exist_ok=True)
66
 
67
- # Initialize Redis
68
- redis_client = redis.from_url(Config.REDIS_URL, decode_responses=True)
69
 
70
- # Initialize Celery
71
- celery_app = Celery(
72
- 'pdf_processor',
73
- broker=Config.CELERY_BROKER_URL,
74
- backend=Config.CELERY_RESULT_BACKEND
75
- )
76
-
77
- # API Key Management
78
- class APIKeyManager:
79
- def __init__(self):
80
- self.valid_keys = self._load_api_keys()
81
-
82
- def _load_api_keys(self) -> Dict[str, Dict]:
83
- """Load API keys from Redis or environment"""
84
- keys = {}
85
- # Load from environment for demo
86
- demo_keys = os.getenv("API_KEYS", "demo-key-123,test-key-456").split(",")
87
- for key in demo_keys:
88
- keys[key] = {
89
- "created_at": datetime.utcnow().isoformat(),
90
- "rate_limit": 100,
91
- "active": True
92
- }
93
- return keys
94
 
95
- def validate_key(self, api_key: str) -> bool:
96
- """Validate API key and check rate limits"""
97
- if api_key not in self.valid_keys:
98
- return False
99
-
100
- key_info = self.valid_keys[api_key]
101
- if not key_info.get("active", False):
102
- return False
103
-
104
- # Check rate limit using Redis
105
- key_name = f"rate_limit:{api_key}"
106
- try:
107
- current_count = redis_client.incr(key_name)
108
- if current_count == 1:
109
- redis_client.expire(key_name, 3600) # 1 hour window
110
-
111
- if current_count > key_info.get("rate_limit", 100):
112
- return False
113
- except Exception as e:
114
- logger.error(f"Rate limit check failed: {e}")
115
-
116
- return True
117
-
118
- # Initialize API Key Manager
119
- api_key_manager = APIKeyManager()
120
-
121
- # Job Status Enum
122
- class JobStatus(str, Enum):
123
- PENDING = "pending"
124
- PROCESSING = "processing"
125
- COMPLETED = "completed"
126
- FAILED = "failed"
127
- CANCELLED = "cancelled"
128
-
129
- # Job Model
130
- @dataclass
131
- class ProcessingJob:
132
- job_id: str
133
- api_key: str
134
- filename: str
135
- status: JobStatus
136
- created_at: datetime
137
- updated_at: datetime
138
- total_pages: Optional[int] = None
139
- segments_created: Optional[int] = None
140
- segments_discarded: Optional[int] = None
141
- error_message: Optional[str] = None
142
- output_files: List[str] = None
143
- processing_time: Optional[float] = None
144
-
145
- def __post_init__(self):
146
- if self.output_files is None:
147
- self.output_files = []
148
-
149
- # PDF Processor with qpdf
150
- class QPDFProcessor:
151
- def __init__(self):
152
- self.executor = ThreadPoolExecutor(max_workers=Config.MAX_WORKERS)
153
-
154
- async def split_pdf_with_qpdf(self,
155
- input_path: Path,
156
- output_dir: Path,
157
- job_id: str) -> Tuple[List[Path], List[Path], Dict]:
158
  """
159
- Split PDF using qpdf for efficient handling of compressed PDFs
 
160
  """
 
 
 
 
 
 
 
 
 
161
  try:
162
- start_time = time.time()
163
- kept_files = []
164
- discarded_files = []
165
- stats = {
166
- "total_pages": 0,
167
- "segments_created": 0,
168
- "segments_discarded": 0,
169
- "compression_ratio": 0
170
- }
171
 
172
- # Open PDF with pikepdf (qpdf wrapper)
173
  with pikepdf.open(input_path) as pdf:
174
  total_pages = len(pdf.pages)
175
  stats["total_pages"] = total_pages
176
 
177
- # Calculate pages per segment based on file size
178
  file_size = input_path.stat().st_size
179
- avg_page_size = file_size / total_pages
180
- pages_per_segment = max(1, int(Config.CHUNK_SIZE_BYTES / avg_page_size))
181
 
182
  segment_num = 0
183
  page_start = 0
@@ -186,535 +69,300 @@ class QPDFProcessor:
186
  page_end = min(page_start + pages_per_segment, total_pages)
187
  segment_num += 1
188
 
189
- # Create output path for segment
190
- segment_filename = f"{job_id}_segment_{segment_num:04d}.pdf"
 
 
 
 
 
191
  segment_path = output_dir / segment_filename
192
 
193
  # Create new PDF with selected pages
194
  segment_pdf = pikepdf.new()
195
-
196
- # Copy pages efficiently without decompressing
197
  for page_num in range(page_start, page_end):
198
  segment_pdf.pages.append(pdf.pages[page_num])
199
 
200
- # Save with compression settings
201
  segment_pdf.save(
202
  segment_path,
203
  compress_streams=True,
204
- stream_decode_level=pikepdf.StreamDecodeLevel.none, # Don't decode streams
205
  object_stream_mode=pikepdf.ObjectStreamMode.generate,
206
- linearize=True, # Web optimization
207
- min_version=pdf.pdf_version
208
  )
209
 
210
  # Check segment size
211
  segment_size = segment_path.stat().st_size
212
 
213
- if segment_size <= Config.MAX_FILE_SIZE_BYTES:
214
- kept_files.append(segment_path)
215
  stats["segments_created"] += 1
 
216
  logger.info(f"Created segment {segment_num}: {segment_size / 1024 / 1024:.2f} MB")
217
  else:
218
- # Try to re-split if segment is too large
219
- if pages_per_segment > 1:
220
- # Recursively split this segment
221
- logger.warning(f"Segment {segment_num} too large ({segment_size / 1024 / 1024:.2f} MB), re-splitting...")
222
- segment_path.unlink() # Delete oversized segment
223
-
224
- # Adjust pages per segment
 
225
  pages_per_segment = max(1, pages_per_segment // 2)
226
  continue
227
- else:
228
- # Single page is too large, discard
229
- discarded_files.append(segment_path)
230
- stats["segments_discarded"] += 1
231
- logger.warning(f"Discarded segment {segment_num}: {segment_size / 1024 / 1024:.2f} MB")
232
 
233
  page_start = page_end
234
-
235
- # Calculate compression ratio
236
- original_size = input_path.stat().st_size
237
- total_output_size = sum(f.stat().st_size for f in kept_files)
238
- if original_size > 0:
239
- stats["compression_ratio"] = (1 - total_output_size / original_size) * 100
240
-
241
- stats["processing_time"] = time.time() - start_time
242
-
243
- return kept_files, discarded_files, stats
244
-
245
- except Exception as e:
246
- logger.error(f"Error splitting PDF with qpdf: {str(e)}")
247
- raise
248
-
249
- async def optimize_pdf(self, input_path: Path, output_path: Path) -> Path:
250
- """
251
- Optimize PDF using qpdf's advanced features
252
- """
253
- try:
254
- with pikepdf.open(input_path) as pdf:
255
- # Remove unnecessary elements
256
- pdf.remove_unreferenced_resources()
257
 
258
- # Save with maximum optimization
259
- pdf.save(
260
- output_path,
261
- compress_streams=True,
262
- object_stream_mode=pikepdf.ObjectStreamMode.generate,
263
- linearize=True,
264
- recompress_flate=True,
265
- deterministic_id=True
266
- )
267
-
268
- return output_path
269
-
270
  except Exception as e:
271
- logger.error(f"Error optimizing PDF: {str(e)}")
272
  raise
 
 
273
 
274
- # Job Queue Manager
275
- class JobQueueManager:
276
- def __init__(self):
277
- self.queue = PriorityQueue(maxsize=Config.MAX_QUEUE_SIZE)
278
- self.jobs: Dict[str, ProcessingJob] = {}
279
- self.lock = threading.Lock()
280
- self.processor = QPDFProcessor()
281
- self.processing_thread = threading.Thread(target=self._process_queue, daemon=True)
282
- self.processing_thread.start()
283
-
284
- def add_job(self, job: ProcessingJob) -> str:
285
- """Add job to queue"""
286
- with self.lock:
287
- if self.queue.full():
288
- raise HTTPException(
289
- status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
290
- detail="Queue is full. Please try again later."
291
- )
292
-
293
- priority = 1 if job.api_key in ["premium-key"] else 2
294
- self.queue.put((priority, job.created_at, job.job_id))
295
- self.jobs[job.job_id] = job
296
-
297
- # Store in Redis for persistence
298
- redis_client.setex(
299
- f"job:{job.job_id}",
300
- 86400, # 24 hours TTL
301
- json.dumps(asdict(job), default=str)
302
- )
303
-
304
- return job.job_id
305
 
306
- def get_job_status(self, job_id: str) -> Optional[ProcessingJob]:
307
- """Get job status"""
308
- with self.lock:
309
- if job_id in self.jobs:
310
- return self.jobs[job_id]
311
-
312
- # Try to get from Redis
313
- job_data = redis_client.get(f"job:{job_id}")
314
- if job_data:
315
- return ProcessingJob(**json.loads(job_data))
316
-
317
- return None
318
-
319
- def _process_queue(self):
320
- """Background thread to process queue"""
321
- while True:
322
- try:
323
- if not self.queue.empty():
324
- _, _, job_id = self.queue.get(timeout=1)
325
-
326
- with self.lock:
327
- job = self.jobs.get(job_id)
328
-
329
- if job and job.status == JobStatus.PENDING:
330
- asyncio.run(self._process_job(job))
331
-
332
- time.sleep(0.1)
333
-
334
- except Exception as e:
335
- logger.error(f"Queue processing error: {str(e)}")
336
 
337
- async def _process_job(self, job: ProcessingJob):
338
- """Process a single job"""
339
- try:
340
- start_time = time.time()
341
-
342
- # Update job status
343
- job.status = JobStatus.PROCESSING
344
- job.updated_at = datetime.utcnow()
345
- self._update_job(job)
346
-
347
- # Process PDF
348
- input_path = Config.UPLOAD_DIR / job.filename
349
- output_dir = Config.OUTPUT_DIR / job.job_id
350
- output_dir.mkdir(parents=True, exist_ok=True)
351
-
352
- # Split PDF using qpdf
353
- kept_files, discarded_files, stats = await self.processor.split_pdf_with_qpdf(
354
- input_path, output_dir, job.job_id
355
- )
356
-
357
- # Update job with results
358
- job.status = JobStatus.COMPLETED
359
- job.total_pages = stats["total_pages"]
360
- job.segments_created = stats["segments_created"]
361
- job.segments_discarded = stats["segments_discarded"]
362
- job.output_files = [str(f.name) for f in kept_files]
363
- job.processing_time = time.time() - start_time
364
- job.updated_at = datetime.utcnow()
365
-
366
- # Clean up discarded files
367
- for file in discarded_files:
368
  try:
369
- file.unlink()
 
 
370
  except Exception as e:
371
- logger.error(f"Error deleting discarded file: {e}")
372
-
373
- self._update_job(job)
374
- logger.info(f"Job {job.job_id} completed successfully")
375
-
376
- except Exception as e:
377
- logger.error(f"Job processing failed: {str(e)}")
378
- job.status = JobStatus.FAILED
379
- job.error_message = str(e)
380
- job.updated_at = datetime.utcnow()
381
- self._update_job(job)
382
-
383
- def _update_job(self, job: ProcessingJob):
384
- """Update job in memory and Redis"""
385
- with self.lock:
386
- self.jobs[job.job_id] = job
387
- redis_client.setex(
388
- f"job:{job.job_id}",
389
- 86400,
390
- json.dumps(asdict(job), default=str)
391
- )
392
-
393
- # Initialize Job Queue Manager
394
- job_queue_manager = JobQueueManager()
395
-
396
- # Lifespan context manager for startup/shutdown
397
- @asynccontextmanager
398
- async def lifespan(app: FastAPI):
399
- # Startup
400
- logger.info("Starting PDF Processor API")
401
- yield
402
- # Shutdown
403
- logger.info("Shutting down PDF Processor API")
404
-
405
- # Initialize FastAPI app with OpenAPI documentation
406
- app = FastAPI(
407
- title="PDF Splitter API",
408
- description="High-performance API for splitting large PDFs into segments using qpdf",
409
- version="1.0.0",
410
- docs_url="/docs",
411
- redoc_url="/redoc",
412
- openapi_url="/openapi.json",
413
- lifespan=lifespan
414
- )
415
-
416
- # Add CORS middleware
417
- app.add_middleware(
418
- CORSMiddleware,
419
- allow_origins=["*"],
420
- allow_credentials=True,
421
- allow_methods=["*"],
422
- allow_headers=["*"],
423
- )
424
-
425
- # Security dependency
426
- api_key_header = APIKeyHeader(name=Config.API_KEY_HEADER, auto_error=False)
427
-
428
- async def verify_api_key(api_key: str = Security(api_key_header)) -> str:
429
- """Verify API key"""
430
- if not api_key:
431
- raise HTTPException(
432
- status_code=status.HTTP_401_UNAUTHORIZED,
433
- detail="API key required"
434
- )
435
-
436
- if not api_key_manager.validate_key(api_key):
437
- raise HTTPException(
438
- status_code=status.HTTP_403_FORBIDDEN,
439
- detail="Invalid or rate-limited API key"
440
- )
441
-
442
- return api_key
443
-
444
- # Response Models
445
- class JobResponse(BaseModel):
446
- job_id: str = Field(..., description="Unique job identifier")
447
- status: JobStatus = Field(..., description="Current job status")
448
- message: str = Field(..., description="Status message")
449
- created_at: str = Field(..., description="Job creation timestamp")
450
 
451
- class JobStatusResponse(BaseModel):
452
- job_id: str
453
- status: JobStatus
454
- filename: str
455
- created_at: str
456
- updated_at: str
457
- total_pages: Optional[int] = None
458
- segments_created: Optional[int] = None
459
- segments_discarded: Optional[int] = None
460
- error_message: Optional[str] = None
461
- output_files: List[str] = []
462
- processing_time: Optional[float] = None
463
 
464
- class ErrorResponse(BaseModel):
465
- detail: str
466
- error_code: Optional[str] = None
467
- timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
468
 
469
- # API Endpoints
470
- @app.post(
471
- "/api/v1/upload",
472
- response_model=JobResponse,
473
- status_code=status.HTTP_202_ACCEPTED,
474
- summary="Upload PDF for splitting",
475
- description="Upload a large PDF file to be split into 4.5MB segments",
476
- responses={
477
- 202: {"description": "Job accepted and queued for processing"},
478
- 400: {"model": ErrorResponse, "description": "Invalid file or request"},
479
- 401: {"model": ErrorResponse, "description": "Missing or invalid API key"},
480
- 403: {"model": ErrorResponse, "description": "Rate limit exceeded"},
481
- 413: {"model": ErrorResponse, "description": "File too large"},
482
- 503: {"model": ErrorResponse, "description": "Service unavailable"}
483
- }
484
- )
485
- async def upload_pdf(
486
- background_tasks: BackgroundTasks,
487
- file: UploadFile = File(..., description="PDF file to upload"),
488
- api_key: str = Depends(verify_api_key)
489
- ):
490
  """
491
- Upload a PDF file for splitting into segments.
492
-
493
- - Files are split into segments of approximately 4.5MB
494
- - Segments larger than 5MB are discarded
495
- - Processing is done asynchronously
496
- - Returns a job ID for tracking progress
497
  """
 
 
 
 
 
 
498
  try:
499
- # Validate file type
500
- if not file.filename.lower().endswith('.pdf'):
501
- raise HTTPException(
502
- status_code=status.HTTP_400_BAD_REQUEST,
503
- detail="Only PDF files are accepted"
504
- )
505
-
506
- # Generate unique job ID
507
- job_id = str(uuid.uuid4())
508
- timestamp = datetime.utcnow()
509
 
510
  # Save uploaded file
511
- upload_path = Config.UPLOAD_DIR / f"{job_id}_{file.filename}"
512
 
513
- # Stream file to disk to handle large files efficiently
514
- async with aiofiles.open(upload_path, 'wb') as f:
515
- chunk_size = 1024 * 1024 # 1MB chunks
516
- while content := await file.read(chunk_size):
517
- await f.write(content)
 
518
 
519
- # Verify it's a valid PDF using pikepdf
520
- try:
521
- with pikepdf.open(upload_path) as pdf:
522
- page_count = len(pdf.pages)
523
- logger.info(f"Valid PDF uploaded: {file.filename}, {page_count} pages")
524
- except Exception as e:
525
- upload_path.unlink() # Delete invalid file
526
- raise HTTPException(
527
- status_code=status.HTTP_400_BAD_REQUEST,
528
- detail=f"Invalid PDF file: {str(e)}"
529
- )
530
 
531
- # Create job
532
- job = ProcessingJob(
533
- job_id=job_id,
534
- api_key=api_key,
535
- filename=upload_path.name,
536
- status=JobStatus.PENDING,
537
- created_at=timestamp,
538
- updated_at=timestamp
539
- )
540
 
541
- # Add to queue
542
- job_queue_manager.add_job(job)
543
 
544
- return JobResponse(
545
- job_id=job_id,
546
- status=JobStatus.PENDING,
547
- message="PDF uploaded successfully and queued for processing",
548
- created_at=timestamp.isoformat()
 
 
 
 
549
  )
550
 
551
- except HTTPException:
552
- raise
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
553
  except Exception as e:
554
- logger.error(f"Upload error: {str(e)}")
555
- raise HTTPException(
556
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
557
- detail=f"Upload failed: {str(e)}"
558
- )
 
 
559
 
560
- @app.get(
561
- "/api/v1/job/{job_id}",
562
- response_model=JobStatusResponse,
563
- summary="Get job status",
564
- description="Check the status of a PDF splitting job",
565
- responses={
566
- 200: {"description": "Job status retrieved successfully"},
567
- 401: {"model": ErrorResponse, "description": "Missing or invalid API key"},
568
- 404: {"model": ErrorResponse, "description": "Job not found"}
 
 
569
  }
570
- )
571
- async def get_job_status(
572
- job_id: str = Path(..., description="Job ID to check"),
573
- api_key: str = Depends(verify_api_key)
574
- ):
575
- """
576
- Get the current status of a PDF splitting job.
577
  """
578
- job = job_queue_manager.get_job_status(job_id)
579
 
580
- if not job:
581
- raise HTTPException(
582
- status_code=status.HTTP_404_NOT_FOUND,
583
- detail=f"Job {job_id} not found"
584
- )
585
 
586
- # Verify API key matches
587
- if job.api_key != api_key:
588
- raise HTTPException(
589
- status_code=status.HTTP_403_FORBIDDEN,
590
- detail="Access denied to this job"
591
- )
592
 
593
- return JobStatusResponse(
594
- job_id=job.job_id,
595
- status=job.status,
596
- filename=job.filename,
597
- created_at=job.created_at.isoformat(),
598
- updated_at=job.updated_at.isoformat(),
599
- total_pages=job.total_pages,
600
- segments_created=job.segments_created,
601
- segments_discarded=job.segments_discarded,
602
- error_message=job.error_message,
603
- output_files=job.output_files,
604
- processing_time=job.processing_time
605
- )
606
-
607
- @app.get(
608
- "/api/v1/job/{job_id}/download/{segment_name}",
609
- response_class=FileResponse,
610
- summary="Download segment",
611
- description="Download a specific segment from a completed job",
612
- responses={
613
- 200: {"description": "Segment file", "content": {"application/pdf": {}}},
614
- 401: {"model": ErrorResponse, "description": "Missing or invalid API key"},
615
- 404: {"model": ErrorResponse, "description": "Job or segment not found"}
616
- }
617
- )
618
- async def download_segment(
619
- job_id: str = Path(..., description="Job ID"),
620
- segment_name: str = Path(..., description="Segment filename"),
621
- api_key: str = Depends(verify_api_key)
622
- ):
623
- """
624
- Download a specific PDF segment from a completed job.
625
- """
626
- job = job_queue_manager.get_job_status(job_id)
627
 
628
- if not job:
629
- raise HTTPException(
630
- status_code=status.HTTP_404_NOT_FOUND,
631
- detail=f"Job {job_id} not found"
632
- )
633
 
634
- if job.api_key != api_key:
635
- raise HTTPException(
636
- status_code=status.HTTP_403_FORBIDDEN,
637
- detail="Access denied"
638
- )
639
 
640
- if job.status != JobStatus.COMPLETED:
641
- raise HTTPException(
642
- status_code=status.HTTP_400_BAD_REQUEST,
643
- detail=f"Job is {job.status}, not completed"
644
- )
 
 
 
 
 
 
 
 
645
 
646
- if segment_name not in job.output_files:
647
- raise HTTPException(
648
- status_code=status.HTTP_404_NOT_FOUND,
649
- detail=f"Segment {segment_name} not found"
650
- )
651
 
652
- file_path = Config.OUTPUT_DIR / job_id / segment_name
 
653
 
654
- if not file_path.exists():
655
- raise HTTPException(
656
- status_code=status.HTTP_404_NOT_FOUND,
657
- detail=f"Segment file not found on disk"
658
  )
659
 
660
- return FileResponse(
661
- path=file_path,
662
- media_type="application/pdf",
663
- filename=segment_name
 
664
  )
665
-
666
- @app.get(
667
- "/api/v1/health",
668
- summary="Health check",
669
- description="Check API health and system status"
670
- )
671
- async def health_check():
672
- """
673
- Health check endpoint for monitoring.
674
- """
675
- try:
676
- # Check Redis connection
677
- redis_status = "healthy" if redis_client.ping() else "unhealthy"
678
- except:
679
- redis_status = "unhealthy"
680
 
681
- return {
682
- "status": "healthy",
683
- "timestamp": datetime.utcnow().isoformat(),
684
- "redis": redis_status,
685
- "queue_size": job_queue_manager.queue.qsize(),
686
- "active_jobs": len(job_queue_manager.jobs)
687
- }
688
-
689
- # Error handlers
690
- @app.exception_handler(HTTPException)
691
- async def http_exception_handler(request, exc: HTTPException):
692
- return JSONResponse(
693
- status_code=exc.status_code,
694
- content={
695
- "detail": exc.detail,
696
- "timestamp": datetime.utcnow().isoformat()
697
- }
698
- )
699
-
700
- @app.exception_handler(Exception)
701
- async def general_exception_handler(request, exc: Exception):
702
- logger.error(f"Unhandled exception: {str(exc)}")
703
- return JSONResponse(
704
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
705
- content={
706
- "detail": "Internal server error",
707
- "timestamp": datetime.utcnow().isoformat()
708
- }
709
- )
710
 
 
711
  if __name__ == "__main__":
712
- import uvicorn
713
- uvicorn.run(
714
- "app:app",
715
- host="0.0.0.0",
716
- port=8000,
717
- reload=True,
718
- log_level="info",
719
- access_log=True
720
  )
 
1
import gradio as gr
import pikepdf
import os
import zipfile
import tempfile
import shutil
from pathlib import Path
import uuid
from datetime import datetime, timedelta
import threading
import time
from typing import Tuple, Optional
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
MAX_FILE_SIZE_MB = 5       # hard upper bound for one segment (oversized single pages are still kept)
CHUNK_SIZE_MB = 4.5        # target size of each output segment
MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024
CHUNK_SIZE_BYTES = int(CHUNK_SIZE_MB * 1024 * 1024)
TEMP_DIR = Path("temp_files")   # root for per-session working directories
CLEANUP_AFTER_MINUTES = 10      # sessions older than this are removed by the cleanup thread

# Create temp directory
TEMP_DIR.mkdir(exist_ok=True)

# Store user sessions: session_id -> {"created": datetime, "dir": Path}
user_sessions = {}
32
 
33
class PDFProcessor:
    """Handle PDF splitting with qpdf/pikepdf."""

    @staticmethod
    def split_pdf(input_path: Path, output_dir: Path, progress_callback=None) -> Tuple[list, dict]:
        """
        Split a PDF into chunks using pikepdf (qpdf wrapper).

        Args:
            input_path: Source PDF file on disk.
            output_dir: Existing directory that receives the segment files.
            progress_callback: Optional callable (fraction, message) invoked
                while segments are produced.

        Returns:
            (list of output file Paths, statistics dict with keys
            total_pages, segments_created, segments_discarded,
            original_size_mb, total_output_size_mb)

        Raises:
            Re-raises any pikepdf/OS error after logging it.
        """
        output_files = []
        stats = {
            "total_pages": 0,
            "segments_created": 0,
            "segments_discarded": 0,
            "original_size_mb": 0,
            "total_output_size_mb": 0
        }

        try:
            # Record the original file size for the statistics report.
            stats["original_size_mb"] = input_path.stat().st_size / 1024 / 1024

            # Open PDF with pikepdf
            with pikepdf.open(input_path) as pdf:
                total_pages = len(pdf.pages)
                stats["total_pages"] = total_pages

                # Estimate how many pages fit into one target-sized chunk.
                file_size = input_path.stat().st_size
                avg_page_size = file_size / total_pages if total_pages > 0 else file_size
                pages_per_segment = max(1, int(CHUNK_SIZE_BYTES / avg_page_size))

                segment_num = 0
                page_start = 0

                while page_start < total_pages:
                    page_end = min(page_start + pages_per_segment, total_pages)
                    segment_num += 1

                    # Update progress
                    if progress_callback:
                        progress_callback(page_start / total_pages,
                                          f"Processing segment {segment_num}...")

                    # Create segment filename
                    segment_filename = f"segment_{segment_num:04d}_pages_{page_start+1}-{page_end}.pdf"
                    segment_path = output_dir / segment_filename

                    # Create new PDF with selected pages
                    segment_pdf = pikepdf.new()
                    for page_num in range(page_start, page_end):
                        segment_pdf.pages.append(pdf.pages[page_num])

                    # Save with compression
                    segment_pdf.save(
                        segment_path,
                        compress_streams=True,
                        object_stream_mode=pikepdf.ObjectStreamMode.generate,
                        linearize=True
                    )

                    # Check segment size
                    segment_size = segment_path.stat().st_size

                    if segment_size <= MAX_FILE_SIZE_BYTES:
                        output_files.append(segment_path)
                        stats["segments_created"] += 1
                        stats["total_output_size_mb"] += segment_size / 1024 / 1024
                        logger.info(f"Created segment {segment_num}: {segment_size / 1024 / 1024:.2f} MB")
                    elif page_end - page_start == 1:
                        # A single page exceeds the limit: keep it (it cannot
                        # be split further) but count it as "discarded".
                        output_files.append(segment_path)
                        stats["segments_discarded"] += 1
                        # FIX: oversized-but-kept pages now count toward the
                        # reported total output size (they are in the ZIP).
                        stats["total_output_size_mb"] += segment_size / 1024 / 1024
                        logger.warning(f"Segment {segment_num} exceeds size limit but kept (single page)")
                    else:
                        # Segment too large: halve the page count and retry the
                        # same range with fewer pages.
                        segment_path.unlink()
                        pages_per_segment = max(1, pages_per_segment // 2)
                        # FIX: roll back the counter so the retried range
                        # reuses the number and filenames stay gapless.
                        segment_num -= 1
                        continue

                    page_start = page_end

            if progress_callback:
                progress_callback(1.0, "Splitting complete!")

        except Exception as e:
            logger.error(f"Error splitting PDF: {str(e)}")
            raise

        return output_files, stats
124
 
125
class SessionManager:
    """Manage per-upload session directories and their expiry."""

    @staticmethod
    def create_session(session_id: str) -> Path:
        """Create (and register) the working directory for one session."""
        session_dir = TEMP_DIR / session_id
        session_dir.mkdir(exist_ok=True)
        user_sessions[session_id] = {
            "created": datetime.now(),
            "dir": session_dir
        }
        return session_dir

    @staticmethod
    def cleanup_old_sessions():
        """Remove session directories older than CLEANUP_AFTER_MINUTES."""
        current_time = datetime.now()
        sessions_to_remove = []

        # FIX: snapshot the items — this runs on the cleanup thread while
        # Gradio request handlers insert new sessions concurrently, and
        # iterating the live dict could raise "dictionary changed size
        # during iteration".
        for session_id, session_info in list(user_sessions.items()):
            if current_time - session_info["created"] > timedelta(minutes=CLEANUP_AFTER_MINUTES):
                try:
                    shutil.rmtree(session_info["dir"], ignore_errors=True)
                    sessions_to_remove.append(session_id)
                    logger.info(f"Cleaned up session: {session_id}")
                except Exception as e:
                    logger.error(f"Error cleaning session {session_id}: {e}")

        for session_id in sessions_to_remove:
            # FIX: pop() instead of del — tolerate the entry having been
            # removed by another thread in the meantime.
            user_sessions.pop(session_id, None)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
 
157
# Start cleanup thread
def cleanup_worker():
    """Background thread that periodically removes expired session dirs."""
    while True:
        try:
            SessionManager.cleanup_old_sessions()
        except Exception as e:
            logger.error(f"Cleanup error: {e}")
        # FIX: sleep outside the try — previously an exception skipped the
        # sleep and the loop spun hot, logging errors continuously.
        time.sleep(60)  # Check every minute

cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
cleanup_thread.start()
 
 
169
 
170
def process_pdf(file_obj, progress=gr.Progress()) -> Tuple[Optional[str], str, str]:
    """
    Main processing function for the Gradio interface.

    Args:
        file_obj: Uploaded file — a filepath string (gr.File type="filepath"),
            a file-like object, or raw bytes.
        progress: Gradio progress tracker (injected by Gradio; the default
            Progress() instance is the documented Gradio idiom).

    Returns:
        (zip_file_path or None, statistics_html, status_message)
    """
    if file_obj is None:
        return None, "", "⚠️ Please upload a PDF file"

    session_id = str(uuid.uuid4())
    session_dir = SessionManager.create_session(session_id)

    try:
        progress(0.1, "Initializing...")

        # Save uploaded file into the session directory.
        input_path = session_dir / "input.pdf"

        # Handle both file path string and file object / raw bytes.
        if isinstance(file_obj, str):
            shutil.copy(file_obj, input_path)
        else:
            with open(input_path, 'wb') as f:
                f.write(file_obj.read() if hasattr(file_obj, 'read') else file_obj)

        # Verify it's a valid PDF before doing any work.
        progress(0.2, "Verifying PDF...")
        with pikepdf.open(input_path) as pdf:
            page_count = len(pdf.pages)
            logger.info(f"Valid PDF with {page_count} pages")

        # Create output directory for the segments.
        output_dir = session_dir / "output"
        output_dir.mkdir(exist_ok=True)

        # Split PDF with progress updates.
        progress(0.3, "Splitting PDF into segments...")

        def update_progress(value, message):
            # Scale progress from 0.3 to 0.8 for the splitting phase.
            progress(0.3 + (value * 0.5), message)

        output_files, stats = PDFProcessor.split_pdf(
            input_path,
            output_dir,
            progress_callback=update_progress
        )

        if not output_files:
            return None, "", "❌ No valid segments created"

        # Bundle all segments into one ZIP for download.
        progress(0.9, "Creating ZIP archive...")
        zip_path = session_dir / f"pdf_segments_{session_id[:8]}.zip"

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in output_files:
                zipf.write(file_path, file_path.name)

        # FIX: guard the compression-ratio division — a zero-byte original
        # would previously raise ZeroDivisionError inside the f-string.
        original_mb = stats['original_size_mb']
        compression_pct = ((1 - stats['total_output_size_mb'] / original_mb) * 100) if original_mb > 0 else 0.0

        # Generate statistics HTML
        stats_html = f"""
        <div style="padding: 20px; background: #f0f9ff; border-radius: 10px; margin: 10px 0;">
            <h3 style="color: #0369a1; margin-top: 0;">📊 Processing Results</h3>
            <table style="width: 100%; border-collapse: collapse;">
                <tr style="border-bottom: 1px solid #e0e0e0;">
                    <td style="padding: 8px; font-weight: bold;">📄 Total Pages:</td>
                    <td style="padding: 8px; text-align: right;">{stats['total_pages']}</td>
                </tr>
                <tr style="border-bottom: 1px solid #e0e0e0;">
                    <td style="padding: 8px; font-weight: bold;">✅ Segments Created:</td>
                    <td style="padding: 8px; text-align: right;">{stats['segments_created']}</td>
                </tr>
                <tr style="border-bottom: 1px solid #e0e0e0;">
                    <td style="padding: 8px; font-weight: bold;">📦 Original Size:</td>
                    <td style="padding: 8px; text-align: right;">{stats['original_size_mb']:.2f} MB</td>
                </tr>
                <tr style="border-bottom: 1px solid #e0e0e0;">
                    <td style="padding: 8px; font-weight: bold;">📁 Total Output Size:</td>
                    <td style="padding: 8px; text-align: right;">{stats['total_output_size_mb']:.2f} MB</td>
                </tr>
                <tr>
                    <td style="padding: 8px; font-weight: bold;">💾 Compression Ratio:</td>
                    <td style="padding: 8px; text-align: right;">
                        {compression_pct:.1f}%
                    </td>
                </tr>
            </table>
            <p style="margin-top: 15px; color: #059669; font-weight: bold;">
                ✨ Your file has been split successfully!
            </p>
            <p style="margin-top: 10px; color: #6b7280; font-size: 0.9em;">
                ⏱️ Files will be automatically deleted after {CLEANUP_AFTER_MINUTES} minutes
            </p>
        </div>
        """

        progress(1.0, "Complete! 🎉")

        # Clean up the input copy to save space; segments and the ZIP stay
        # until the session expires.
        input_path.unlink()

        return str(zip_path), stats_html, "✅ Processing complete! Download your ZIP file below."

    except Exception as e:
        logger.error(f"Processing error: {str(e)}")
        # Best-effort cleanup on error (ignore_errors=True never raises, so
        # the original redundant try/except around it was removed).
        shutil.rmtree(session_dir, ignore_errors=True)
        return None, "", f"❌ Error: {str(e)}"
282
 
283
# Create Gradio interface
# Single-page Blocks layout: upload -> split button -> status/stats -> ZIP download.
with gr.Blocks(
    title="PDF Splitter - Fast & Simple",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 800px;
        margin: auto;
    }
    footer {
        display: none !important;
    }
    """
) as app:

    gr.Markdown("""
    # 📄 PDF Splitter Tool

    **Split large PDFs into smaller segments quickly and efficiently!**

    This tool uses advanced compression to split your PDF into segments of approximately **4.5 MB** each.
    Files are processed using qpdf for optimal performance without decompressing the PDF.

    ### How to use:
    1. Upload your PDF file
    2. Click "Split PDF"
    3. Download the ZIP file containing all segments

    *Note: Files are automatically deleted after 10 minutes for your privacy.*
    """)

    with gr.Row():
        with gr.Column():
            # type="filepath" means process_pdf receives a path string.
            file_input = gr.File(
                label="Upload PDF",
                file_types=[".pdf"],
                type="filepath"
            )

            split_btn = gr.Button(
                "🚀 Split PDF",
                variant="primary",
                size="lg"
            )

    with gr.Row():
        status_text = gr.Markdown("Ready to process your PDF...")

    with gr.Row():
        stats_output = gr.HTML()

    with gr.Row():
        download_file = gr.File(
            label="📦 Download ZIP",
            visible=True
        )

    # Handle processing: outputs map to process_pdf's (zip, stats_html, status).
    split_btn.click(
        fn=process_pdf,
        inputs=[file_input],
        outputs=[download_file, stats_output, status_text]
    )

    # Add examples
    gr.Markdown("""
    ---
    ### 💡 Features:
    - ✅ Handles compressed PDFs efficiently using qpdf
    - ✅ Automatic file cleanup for privacy
    - ✅ Progress tracking during processing
    - ✅ Creates ZIP archive for easy download
    - ✅ Optimized for Hugging Face Spaces

    ### 🔒 Privacy:
    All uploaded files are automatically deleted after processing and download.
    No files are stored permanently on the server.
    """)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
361
 
362
# Launch the app
if __name__ == "__main__":
    # Bind on all interfaces at the standard Hugging Face Spaces port;
    # no public share link is created.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    app.launch(**launch_options)