File size: 4,342 Bytes
93d826e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
from __future__ import annotations

import asyncio
import dataclasses
import enum
from abc import ABC, abstractmethod
from datetime import datetime
from functools import wraps
from typing import (
    Any, AsyncIterator, Callable, ClassVar, Dict, Generic, 
    List, Optional, Protocol, TypeVar, Union
)

# Type variable definitions
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

# Protocol definition
class Processable(Protocol):
    def process(self) -> None: ...
    def validate(self) -> bool: ...

# Enum definition
class Status(enum.Enum):
    PENDING = "pending"
    ACTIVE = "active"
    COMPLETED = "completed"
    FAILED = "failed"

    def __str__(self) -> str:
        return self.value

# Dataclass with frozen and slots options
@dataclasses.dataclass(frozen=True, slots=True)
class UserCredentials:
    username: str
    email: str
    created_at: datetime = dataclasses.field(default_factory=datetime.now)

# Abstract base class
class BaseProcessor(ABC, Generic[T]):
    def __init__(self) -> None:
        self._items: List[T] = []
        self._processed_count: int = 0

    @abstractmethod
    async def process_item(self, item: T) -> None:
        pass

    @property
    def processed_count(self) -> int:
        return self._processed_count

# Decorator definition
def log_execution(func: Callable) -> Callable:
    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        print(f"Executing {func.__name__}")
        try:
            result = await func(*args, **kwargs)
            print(f"Completed {func.__name__}")
            return result
        except Exception as e:
            print(f"Error in {func.__name__}: {e}")
            raise
    return wrapper

# Class implementing abstract base class and protocol
class DataProcessor(BaseProcessor[UserCredentials], Processable):
    # Class variable
    DEFAULT_BATCH_SIZE: ClassVar[int] = 100

    def __init__(self, batch_size: Optional[int] = None) -> None:
        super().__init__()
        self.batch_size = batch_size or self.DEFAULT_BATCH_SIZE
        self._status = Status.PENDING

    # Property with getter and setter
    @property
    def status(self) -> Status:
        return self._status

    @status.setter
    def status(self, value: Status) -> None:
        if not isinstance(value, Status):
            raise ValueError("Status must be a Status enum value")
        self._status = value

    # Context manager methods
    async def __aenter__(self) -> DataProcessor:
        self.status = Status.ACTIVE
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        self.status = Status.COMPLETED if exc_type is None else Status.FAILED

    # Generator method
    async def process_batch(self) -> AsyncIterator[List[UserCredentials]]:
        for i in range(0, len(self._items), self.batch_size):
            batch = self._items[i:i + self.batch_size]
            yield batch
            await asyncio.sleep(0.1)

    # Implementation of abstract method
    @log_execution
    async def process_item(self, item: UserCredentials) -> None:
        if not self.validate():
            raise ValueError("Processor is not in a valid state")
        self._items.append(item)
        self._processed_count += 1

    # Implementation of protocol method
    def process(self) -> None:
        if not self._items:
            raise ValueError("No items to process")
        self.status = Status.ACTIVE

    def validate(self) -> bool:
        return self.status != Status.FAILED

# Custom exception
class ProcessingError(Exception):
    def __init__(self, message: str, item: Any) -> None:
        self.item = item
        super().__init__(f"Error processing {item}: {message}")

# Async main function
async def main() -> None:
    async with DataProcessor(batch_size=10) as processor:
        # Create test data
        user = UserCredentials(
            username="test_user",
            email="test@example.com"
        )

        try:
            await processor.process_item(user)
            
            async for batch in processor.process_batch():
                print(f"Processing batch of {len(batch)} items")
                
        except ProcessingError as e:
            print(f"Processing failed: {e}")

if __name__ == "__main__":
    asyncio.run(main())