Spaces:
Paused
Paused
File size: 4,342 Bytes
93d826e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | #!/usr/bin/env python3
from __future__ import annotations
import asyncio
import dataclasses
import enum
from abc import ABC, abstractmethod
from datetime import datetime
from functools import wraps
from typing import (
Any, AsyncIterator, Callable, ClassVar, Dict, Generic,
List, Optional, Protocol, TypeVar, Union
)
# Type variable definitions
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')
# Protocol definition
class Processable(Protocol):
def process(self) -> None: ...
def validate(self) -> bool: ...
# Enum definition
class Status(enum.Enum):
PENDING = "pending"
ACTIVE = "active"
COMPLETED = "completed"
FAILED = "failed"
def __str__(self) -> str:
return self.value
# Dataclass with frozen and slots options
@dataclasses.dataclass(frozen=True, slots=True)
class UserCredentials:
username: str
email: str
created_at: datetime = dataclasses.field(default_factory=datetime.now)
# Abstract base class
class BaseProcessor(ABC, Generic[T]):
def __init__(self) -> None:
self._items: List[T] = []
self._processed_count: int = 0
@abstractmethod
async def process_item(self, item: T) -> None:
pass
@property
def processed_count(self) -> int:
return self._processed_count
# Decorator definition
def log_execution(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
print(f"Executing {func.__name__}")
try:
result = await func(*args, **kwargs)
print(f"Completed {func.__name__}")
return result
except Exception as e:
print(f"Error in {func.__name__}: {e}")
raise
return wrapper
# Class implementing abstract base class and protocol
class DataProcessor(BaseProcessor[UserCredentials], Processable):
# Class variable
DEFAULT_BATCH_SIZE: ClassVar[int] = 100
def __init__(self, batch_size: Optional[int] = None) -> None:
super().__init__()
self.batch_size = batch_size or self.DEFAULT_BATCH_SIZE
self._status = Status.PENDING
# Property with getter and setter
@property
def status(self) -> Status:
return self._status
@status.setter
def status(self, value: Status) -> None:
if not isinstance(value, Status):
raise ValueError("Status must be a Status enum value")
self._status = value
# Context manager methods
async def __aenter__(self) -> DataProcessor:
self.status = Status.ACTIVE
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self.status = Status.COMPLETED if exc_type is None else Status.FAILED
# Generator method
async def process_batch(self) -> AsyncIterator[List[UserCredentials]]:
for i in range(0, len(self._items), self.batch_size):
batch = self._items[i:i + self.batch_size]
yield batch
await asyncio.sleep(0.1)
# Implementation of abstract method
@log_execution
async def process_item(self, item: UserCredentials) -> None:
if not self.validate():
raise ValueError("Processor is not in a valid state")
self._items.append(item)
self._processed_count += 1
# Implementation of protocol method
def process(self) -> None:
if not self._items:
raise ValueError("No items to process")
self.status = Status.ACTIVE
def validate(self) -> bool:
return self.status != Status.FAILED
# Custom exception
class ProcessingError(Exception):
def __init__(self, message: str, item: Any) -> None:
self.item = item
super().__init__(f"Error processing {item}: {message}")
# Async main function
async def main() -> None:
async with DataProcessor(batch_size=10) as processor:
# Create test data
user = UserCredentials(
username="test_user",
email="test@example.com"
)
try:
await processor.process_item(user)
async for batch in processor.process_batch():
print(f"Processing batch of {len(batch)} items")
except ProcessingError as e:
print(f"Processing failed: {e}")
if __name__ == "__main__":
asyncio.run(main())
|