Spaces:
Sleeping
Sleeping
| import logging | |
| import traceback | |
| from typing import Callable, List, Dict, Any, Optional, Union | |
| import inspect | |
| from aworld.output.artifact import Artifact | |
| class WorkspaceObserver: | |
| """Base class for workspace observers""" | |
| async def on_create(self, workspace_id: str, artifact: Artifact) -> None: | |
| pass | |
| async def on_update(self, workspace_id: str, artifact: Artifact) -> None: | |
| pass | |
| async def on_delete(self, workspace_id: str, artifact: Artifact) -> None: | |
| pass | |
| class Handler: | |
| """Handler wrapper to support both functions and class methods""" | |
| def __init__(self, func: Callable, instance=None, workspace_id: Optional[str] = None, filters: Optional[Dict[str, Any]] = None): | |
| self.func = func | |
| self.instance = instance # Class instance if method | |
| self.workspace_id = workspace_id # Specific workspace to monitor | |
| self.filters = filters or {} # Additional filters (e.g., artifact_type) | |
| async def __call__(self, artifact: Artifact, **kwargs) -> Any: | |
| """Call the handler with appropriate arguments""" | |
| # Check if this handler should process the artifact | |
| if self.workspace_id and kwargs.get('workspace_id') != self.workspace_id: | |
| return None | |
| # Check additional filters | |
| for key, value in self.filters.items(): | |
| if key == 'artifact_type': | |
| if artifact.artifact_type != value and artifact.artifact_type.value != value: | |
| return None | |
| elif key in artifact.metadata and artifact.metadata[key] != value: | |
| return None | |
| # Get function signature to determine what arguments it expects | |
| sig = inspect.signature(self.func) | |
| param_count = len(sig.parameters) | |
| # Call based on whether it's a method or function, and parameter count | |
| if self.instance: | |
| if param_count == 0: # Just self | |
| return await self.func() if inspect.iscoroutinefunction(self.func) else self.func() | |
| elif param_count == 1: # Self + artifact | |
| return await self.func(artifact) if inspect.iscoroutinefunction(self.func) else self.func(artifact) | |
| else: # Self + artifact + kwargs | |
| return await self.func(artifact, **kwargs) if inspect.iscoroutinefunction(self.func) else self.func(artifact, **kwargs) | |
| else: | |
| if param_count == 0: # No parameters | |
| return await self.func() if inspect.iscoroutinefunction(self.func) else self.func() | |
| elif param_count == 1: # Just artifact | |
| return await self.func(artifact) if inspect.iscoroutinefunction(self.func) else self.func(artifact) | |
| else: # Artifact + kwargs | |
| return await self.func(artifact, **kwargs) if inspect.iscoroutinefunction(self.func) else self.func(artifact, **kwargs) | |
| class DecoratorBasedObserver(WorkspaceObserver): | |
| """Enhanced decorator-based observer implementation""" | |
| def __init__(self): | |
| self.create_handlers: List[Handler] = [] | |
| self.update_handlers: List[Handler] = [] | |
| self.delete_handlers: List[Handler] = [] | |
| async def on_create(self, workspace_id: str, artifact: Artifact, **kwargs) -> List[Any]: | |
| """Process artifact creation with all handlers""" | |
| results = [] | |
| for handler in self.create_handlers: | |
| try: | |
| result = await handler(workspace_id=workspace_id, artifact=artifact, **kwargs) | |
| if result is not None: | |
| results.append(result) | |
| except Exception as e: | |
| print(f"Create handler failed: error is {e}: {traceback.format_exc()}") | |
| return results | |
| async def on_update(self, artifact: Artifact, **kwargs) -> List[Any]: | |
| """Process artifact update with all handlers""" | |
| results = [] | |
| for handler in self.update_handlers: | |
| try: | |
| result = await handler(artifact, **kwargs) | |
| if result is not None: | |
| results.append(result) | |
| except Exception as e: | |
| print(f"Update handler failed: {e}") | |
| return results | |
| async def on_delete(self, artifact: Artifact, **kwargs) -> List[Any]: | |
| """Process artifact deletion with all handlers""" | |
| results = [] | |
| for handler in self.delete_handlers: | |
| try: | |
| result = await handler(artifact, **kwargs) | |
| if result is not None: | |
| results.append(result) | |
| except Exception as e: | |
| print(f"Delete handler failed: {e}") | |
| return results | |
| def register_create_handler(self, func, instance=None, workspace_id=None, filters=None): | |
| """Register a handler for artifact creation""" | |
| logging.info(f"[📂WORKSPACE]✨ Registering create handler for {func}") | |
| self.create_handlers.append(Handler(func, instance, workspace_id, filters)) | |
| return func | |
| def un_register_create_handler(self, func, instance=None, workspace_id=None): | |
| """Register a handler for artifact creation""" | |
| logging.info(f"[📂WORKSPACE] UnRegister create handler for {func}") | |
| for handler in self.create_handlers: | |
| if handler.func == func: | |
| self.create_handlers.remove(handler) | |
| logging.info(f"[📂WORKSPACE] UnRegister create handler for {func} success") | |
| def register_update_handler(self, func, instance=None, workspace_id=None, filters=None): | |
| """Register a handler for artifact update""" | |
| logging.info(f"[📂WORKSPACE]✨ Registering update handler for {func}") | |
| self.update_handlers.append(Handler(func, instance, workspace_id, filters)) | |
| return func | |
| def register_delete_handler(self, func, instance=None, workspace_id=None, filters=None): | |
| """Register a handler for artifact deletion""" | |
| logging.info(f"[📂WORKSPACE]✨ Registering delete handler for {func}") | |
| self.delete_handlers.append(Handler(func, instance, workspace_id, filters)) | |
| return func | |
| # Global observer instance | |
| _observer = DecoratorBasedObserver() | |
| def get_observer() -> DecoratorBasedObserver: | |
| """Get the global observer instance""" | |
| return _observer | |
| def on_artifact_create(func=None, workspace_id=None, filters=None): | |
| """ | |
| Decorator for artifact creation handlers | |
| Can be used as a simple decorator (@on_artifact_create) or with parameters: | |
| @on_artifact_create(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'}) | |
| """ | |
| if func is None: | |
| # Called with parameters | |
| def decorator(f): | |
| return _observer.register_create_handler(f, None, workspace_id, filters) | |
| return decorator | |
| # Called as simple decorator | |
| return _observer.register_create_handler(func) | |
| def on_artifact_update(func=None, workspace_id=None, filters=None): | |
| """ | |
| Decorator for artifact update handlers | |
| Can be used as a simple decorator (@on_artifact_update) or with parameters: | |
| @on_artifact_update(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'}) | |
| """ | |
| if func is None: | |
| # Called with parameters | |
| def decorator(f): | |
| return _observer.register_update_handler(f, None, workspace_id, filters) | |
| return decorator | |
| # Called as simple decorator | |
| return _observer.register_update_handler(func) | |
| def on_artifact_delete(func=None, workspace_id=None, filters=None): | |
| """ | |
| Decorator for artifact deletion handlers | |
| Can be used as a simple decorator (@on_artifact_delete) or with parameters: | |
| @on_artifact_delete(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'}) | |
| """ | |
| if func is None: | |
| # Called with parameters | |
| def decorator(f): | |
| return _observer.register_delete_handler(f, None, workspace_id, filters) | |
| return decorator | |
| # Called as simple decorator | |
| return _observer.register_delete_handler(func) | |