File size: 3,249 Bytes
595267b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""Batch processing for bulk operations."""

from __future__ import annotations

import uuid
from datetime import datetime
from typing import Dict, List, Any
from enum import Enum


class JobStatus(str, Enum):
    """Lifecycle states a batch job can be in.

    The class mixes in ``str``, so every member is also a string and
    compares equal to its lowercase value (``JobStatus.PENDING == "pending"``).
    """

    # Initial state assigned when a job is created.
    PENDING = "pending"
    # Set while the job's items are being worked through.
    PROCESSING = "processing"
    # Terminal state: every item was processed without an exception.
    COMPLETED = "completed"
    # Terminal state: processing stopped because of an error.
    FAILED = "failed"


class BatchJob:
    """A single unit of bulk work together with its bookkeeping state.

    A job starts out ``PENDING`` with a freshly generated hex id, and ends
    either through :meth:`complete` (results stored) or :meth:`fail`
    (error message appended). Both stamp ``completed_at`` with the current
    naive UTC time.
    """

    def __init__(self, job_type: str, items: List[Any], metadata: Dict = None):
        """Create a pending job.

        Args:
            job_type: Caller-defined label for the kind of work.
            items: The items to process; the list is stored as given (no
                copy) and its length becomes ``total``.
            metadata: Optional extra data; any falsy value is replaced by
                a new empty dict.
        """
        self.job_id = uuid.uuid4().hex
        self.job_type = job_type
        self.items = items
        self.metadata = metadata if metadata else {}
        self.status = JobStatus.PENDING
        self.results = []
        self.errors = []
        self.created_at = datetime.utcnow()
        self.completed_at = None
        self.progress = 0
        self.total = len(items)

    def update_progress(self, completed: int):
        """Record how many items have been handled so far."""
        self.progress = completed

    def complete(self, results: List[Any]):
        """Store ``results``, mark the job ``COMPLETED`` and stamp the finish time."""
        self.results = results
        self.status = JobStatus.COMPLETED
        self.completed_at = datetime.utcnow()

    def fail(self, error: str):
        """Mark the job ``FAILED``, remember ``error`` and stamp the finish time."""
        self.status = JobStatus.FAILED
        self.errors.append(error)
        self.completed_at = datetime.utcnow()

    def to_dict(self) -> Dict:
        """Return a plain-dict snapshot of the job.

        Timestamps are rendered with ``isoformat()``. ``completed_at`` is
        ``None`` until the job has finished, ``results`` is ``None`` unless
        the job is ``COMPLETED``, and ``errors`` is ``None`` when no error
        has been recorded.
        """
        finished_at = None
        if self.completed_at:
            finished_at = self.completed_at.isoformat()

        # Results are only exposed once the job has completed successfully.
        visible_results = None
        if self.status == JobStatus.COMPLETED:
            visible_results = self.results

        snapshot = {
            "job_id": self.job_id,
            "job_type": self.job_type,
            "status": self.status.value,
            "progress": self.progress,
            "total": self.total,
            "created_at": self.created_at.isoformat(),
            "completed_at": finished_at,
            "results": visible_results,
            "errors": self.errors or None,
        }
        return snapshot


class BatchProcessor:
    """In-memory registry that creates, runs and prunes ``BatchJob`` objects.

    Jobs live in a plain dict keyed by job id; nothing is persisted.
    """

    def __init__(self):
        # job_id -> BatchJob for every job created through this processor.
        self._jobs: Dict[str, BatchJob] = {}

    def create_job(self, job_type: str, items: List[Any], metadata: Dict | None = None) -> str:
        """Create a new job, register it and return its id.

        Args:
            job_type: Caller-defined label for the kind of work.
            items: Items the job will process.
            metadata: Optional extra data attached to the job.

        Returns:
            The id of the newly registered job.
        """
        job = BatchJob(job_type, items, metadata)
        self._jobs[job.job_id] = job
        return job.job_id

    def get_job(self, job_id: str) -> BatchJob | None:
        """Return the job registered under ``job_id``, or ``None`` if unknown."""
        return self._jobs.get(job_id)

    def process_job(self, job_id: str, processor_func) -> None:
        """Run ``processor_func`` over every item of a job, in order.

        Progress is updated after each item. If every call succeeds, the
        collected return values are stored via ``job.complete``. The first
        exception stops processing and is recorded via ``job.fail``; it is
        not re-raised. An unknown ``job_id`` is silently ignored.

        Args:
            job_id: Id of a job previously returned by :meth:`create_job`.
            processor_func: Callable invoked with one item at a time.
        """
        job = self.get_job(job_id)
        if job is None:
            return

        job.status = JobStatus.PROCESSING
        results = []

        try:
            for done, item in enumerate(job.items, start=1):
                results.append(processor_func(item))
                job.update_progress(done)

            job.complete(results)
        except Exception as exc:
            # Exceptions raised without a message stringify to "", which
            # would leave a blank error entry; fall back to the class name.
            job.fail(str(exc) or type(exc).__name__)

    def cleanup_old_jobs(self, max_age_hours: int = 24) -> int:
        """Remove finished jobs whose completion is older than ``max_age_hours``.

        Age is measured from ``completed_at``; jobs that have not finished
        (``completed_at`` is unset) are never removed, however old they are.

        Args:
            max_age_hours: Maximum time, in hours, a finished job is kept.

        Returns:
            The number of jobs that were removed.
        """
        # Naive UTC on purpose: it must be subtractable from the naive
        # ``completed_at`` timestamps that BatchJob records.
        now = datetime.utcnow()

        # Collect ids first so the dict is not mutated while iterating it.
        expired = [
            job_id
            for job_id, job in self._jobs.items()
            if job.completed_at
            and (now - job.completed_at).total_seconds() / 3600 > max_age_hours
        ]

        for job_id in expired:
            del self._jobs[job_id]

        return len(expired)


# Module-level BatchProcessor instance, created once when this module is
# imported (presumably the shared instance other modules are meant to use).
batch_processor = BatchProcessor()