Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import asyncio | |
| import dataclasses | |
| import enum | |
| from abc import ABC, abstractmethod | |
| from datetime import datetime | |
| from functools import wraps | |
| from typing import ( | |
| Any, AsyncIterator, Callable, ClassVar, Dict, Generic, | |
| List, Optional, Protocol, TypeVar, Union | |
| ) | |
# Type variable definitions
# T parameterizes the item type handled by BaseProcessor (Generic[T]).
T = TypeVar('T')
# K and V are not referenced by any definition visible in this file;
# presumably reserved for key/value generics -- TODO confirm or remove.
K = TypeVar('K')
V = TypeVar('V')
# Protocol definition
class Processable(Protocol):
    """Structural interface for objects that can be processed and validated.

    Any class providing ``process()`` and ``validate()`` with compatible
    signatures satisfies this protocol for static type checkers; explicit
    inheritance is optional.
    """

    def process(self) -> None:
        """Carry out the object's processing step."""

    def validate(self) -> bool:
        """Report whether the object is currently in a usable state."""
# Enum definition
class Status(enum.Enum):
    """Closed set of lifecycle states, each backed by a lowercase string."""

    PENDING = "pending"
    ACTIVE = "active"
    COMPLETED = "completed"
    FAILED = "failed"

    def __str__(self) -> str:
        """Render the member as its raw string value (e.g. ``"active"``)."""
        text: str = self.value
        return text
| # Dataclass with frozen and slots options | |
| class UserCredentials: | |
| username: str | |
| email: str | |
| created_at: datetime = dataclasses.field(default_factory=datetime.now) | |
# Abstract base class
class BaseProcessor(ABC, Generic[T]):
    """Abstract base for processors that handle items of type ``T``.

    Holds the shared bookkeeping (an item list and a processed-item
    counter); concrete subclasses must implement ``process_item``.
    """

    def __init__(self) -> None:
        """Start with no stored items and a processed count of zero."""
        self._items: List[T] = []
        self._processed_count: int = 0

    @abstractmethod
    async def process_item(self, item: T) -> None:
        """Process a single item.

        Marked abstract so that the base class cannot be instantiated and
        subclasses cannot silently omit the hook.

        Args:
            item: The item to process.
        """

    # NOTE(review): this reads like it was meant to be a read-only property;
    # kept as a plain method so existing ``processed_count()`` callers keep
    # working -- confirm the intended interface.
    def processed_count(self) -> int:
        """Return the value of the processed-item counter."""
        return self._processed_count
# Decorator definition
def log_execution(func: Callable) -> Callable:
    """Wrap a coroutine function so each call is announced on stdout.

    Prints a line before the call, a line after successful completion, and
    a line describing any ``Exception`` before re-raising it unchanged.

    Args:
        func: The coroutine function to wrap; it is awaited by the wrapper.

    Returns:
        An async wrapper that forwards all arguments to ``func`` and returns
        its result.
    """

    # wraps() copies __name__, __doc__, etc. onto the wrapper so decorated
    # functions remain identifiable in logs, tracebacks and introspection.
    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        print(f"Executing {func.__name__}")
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            print(f"Error in {func.__name__}: {e}")
            raise
        # Only reached when the wrapped call did not raise.
        print(f"Completed {func.__name__}")
        return result

    return wrapper
# Class implementing abstract base class and protocol
class DataProcessor(BaseProcessor[UserCredentials], Processable):
    """Collect ``UserCredentials`` items and hand them back in batches.

    Usable as an async context manager: entering sets the status to
    ``Status.ACTIVE``; leaving sets it to ``Status.COMPLETED`` or, if an
    exception escaped the block, ``Status.FAILED``.
    """

    # Class variable
    DEFAULT_BATCH_SIZE: ClassVar[int] = 100

    def __init__(self, batch_size: Optional[int] = None) -> None:
        """Create a processor in the ``PENDING`` state.

        Args:
            batch_size: Maximum number of items per batch. ``None`` or ``0``
                selects ``DEFAULT_BATCH_SIZE``.

        Raises:
            ValueError: If ``batch_size`` is negative.
        """
        super().__init__()
        # Falsy values (None, 0) fall back to the class-level default.
        self.batch_size = batch_size or self.DEFAULT_BATCH_SIZE
        # A negative step would make process_batch() silently yield nothing.
        if self.batch_size < 0:
            raise ValueError("batch_size must be a positive integer")
        self._status = Status.PENDING

    # Property with getter and setter
    @property
    def status(self) -> Status:
        """Current lifecycle status of the processor."""
        return self._status

    @status.setter
    def status(self, value: Status) -> None:
        """Set the status, rejecting anything that is not a ``Status``.

        Raises:
            ValueError: If ``value`` is not a ``Status`` member.
        """
        if not isinstance(value, Status):
            raise ValueError("Status must be a Status enum value")
        self._status = value

    # Context manager methods
    async def __aenter__(self) -> DataProcessor:
        """Mark the processor active and return it."""
        self.status = Status.ACTIVE
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Record the outcome of the ``async with`` block.

        Returns ``None``, so an exception raised inside the block is not
        suppressed.
        """
        self.status = Status.COMPLETED if exc_type is None else Status.FAILED

    # Generator method
    async def process_batch(self) -> AsyncIterator[List[UserCredentials]]:
        """Yield the stored items in consecutive slices of ``batch_size``.

        Sleeps for 0.1 seconds after each yielded batch.
        """
        for i in range(0, len(self._items), self.batch_size):
            batch = self._items[i:i + self.batch_size]
            yield batch
            await asyncio.sleep(0.1)

    # Implementation of abstract method
    async def process_item(self, item: UserCredentials) -> None:
        """Store ``item`` and increment the processed counter.

        Raises:
            ValueError: If ``validate()`` reports an invalid state.
        """
        if not self.validate():
            raise ValueError("Processor is not in a valid state")
        self._items.append(item)
        self._processed_count += 1

    # Implementation of protocol method
    def process(self) -> None:
        """Mark the processor active.

        Raises:
            ValueError: If no items have been stored yet.
        """
        if not self._items:
            raise ValueError("No items to process")
        self.status = Status.ACTIVE

    def validate(self) -> bool:
        """Return ``True`` unless the current status is ``Status.FAILED``."""
        return self.status != Status.FAILED
# Custom exception
class ProcessingError(Exception):
    """Error raised for a failure tied to one specific item.

    The offending item is kept on the ``item`` attribute, and the exception
    text has the form ``"Error processing <item>: <message>"``.
    """

    def __init__(self, message: str, item: Any) -> None:
        """Build the exception text from ``item`` and ``message``.

        Args:
            message: Human-readable description of what went wrong.
            item: The item that was being processed.
        """
        detail = f"Error processing {item}: {message}"
        super().__init__(detail)
        self.item = item
# Async main function
async def main() -> None:
    """Run a small demonstration of ``DataProcessor``.

    Opens a processor with a batch size of 10 as an async context manager,
    feeds it one ``UserCredentials`` record, then iterates the resulting
    batches and prints the size of each. Only ``ProcessingError`` is caught
    and reported; any other exception propagates to the caller.
    """
    demo_processor = DataProcessor(batch_size=10)
    async with demo_processor as processor:
        # Create test data
        credentials = UserCredentials(username="test_user", email="test@example.com")
        try:
            await processor.process_item(credentials)
            async for chunk in processor.process_batch():
                print(f"Processing batch of {len(chunk)} items")
        except ProcessingError as err:
            print(f"Processing failed: {err}")
# Script entry point: when this file is executed directly (not imported),
# run the main() coroutine to completion via asyncio.run().
if __name__ == "__main__":
    asyncio.run(main())