Spaces:
Running
Running
| """Post-processing pipeline base interfaces and orchestrator.""" | |
| from typing import Protocol, List | |
| from .context import RunContext | |
class PostProcessor(Protocol):
    """Structural interface for a single post-processing step.

    Any object that exposes a ``name`` string and a ``process`` method with
    the signature below satisfies this protocol; no inheritance is needed.
    """

    # Identifier for the processor.
    name: str

    def process(self, ctx: RunContext) -> None:
        """Run this step against ``ctx``.

        Args:
            ctx: The run context supplied by the caller.

        Returns:
            None.
        """
        ...
class PostProcessingOrchestrator:
    """Run a sequence of post-processors against a run context.

    Processors are invoked in the order they were supplied. Execution is
    best effort: an ``Exception`` raised by one processor is logged and the
    remaining processors still run; nothing is re-raised to the caller.
    """

    def __init__(self, processors: List[PostProcessor]):
        """Store the processors to run.

        Args:
            processors: Processors to invoke, in execution order. The list
                is kept by reference, not copied.
        """
        self._processors = processors

    def run(self, ctx: RunContext) -> None:
        """Invoke every processor on ``ctx``, isolating individual failures.

        Args:
            ctx: Run context passed as-is to each processor's ``process``.

        Returns:
            None. Exceptions derived from ``Exception`` are logged, never
            propagated.
        """
        # Local import keeps this block self-contained (no new file-level
        # dependency is required).
        import logging

        logger = logging.getLogger(__name__)

        for processor in self._processors:
            try:
                processor.process(ctx)
            except Exception:
                # Best effort: don't break the response if a processor fails,
                # but record the failure (with traceback) instead of hiding it.
                logger.exception(
                    "Post-processor %r failed; continuing with remaining processors",
                    getattr(processor, "name", type(processor).__name__),
                )