cpu-text-generation-demo / workflow_example.py
Sure, here's a comprehensive Python example of a Data Ingestion Workflow that includes a main controller script, a mock Scraper, a mock VectorDB processor, and a mock Alerter function:
```python
import time
import uuid
from typing import Dict, List
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime


# Mock Scraper
class Scraper:
    def scrape(self, url: str) -> Dict[str, str]:
        """Mocks scraping data from a website."""
        return {
            "id": str(uuid.uuid4()),
            "title": "Sample Article",
            "content": "This is a sample article content.",
            "url": url,
            "timestamp": datetime.now().isoformat(),
        }


# Mock VectorDB Processor
class VectorDBProcessor:
    def process(self, data: Dict[str, str]) -> Dict[str, str]:
        """Mocks processing data and storing it in a vector database."""
        print(f"Storing data in VectorDB: {data}")
        return data


# Mock Alerter
def alert(data: Dict[str, str]) -> None:
    """Mocks sending an alert about the processed data."""
    print(f"Alerting about processed data: {data}")


# Abstract Base Class for Tasks
class Task(ABC):
    @abstractmethod
    def run(self, input_data: Dict[str, str]) -> Dict[str, str]:
        pass


# Task Implementations
class ScraperTask(Task):
    def __init__(self, scraper: Scraper):
        self.scraper = scraper

    def run(self, input_data: Dict[str, str]) -> Dict[str, str]:
        url = input_data.get("url")
        if url:
            return self.scraper.scrape(url)
        return {}


class VectorDBProcessorTask(Task):
    def __init__(self, vector_db_processor: VectorDBProcessor):
        self.vector_db_processor = vector_db_processor

    def run(self, input_data: Dict[str, str]) -> Dict[str, str]:
        return self.vector_db_processor.process(input_data)


class AlerterTask(Task):
    def run(self, input_data: Dict[str, str]) -> Dict[str, str]:
        alert(input_data)
        return input_data


# Main Controller
@dataclass
class DataIngestionWorkflow:
    scraper: Scraper
    vector_db_processor: VectorDBProcessor

    def run(self, urls: List[str]) -> None:
        scraper_task = ScraperTask(self.scraper)
        vector_db_task = VectorDBProcessorTask(self.vector_db_processor)
        alerter_task = AlerterTask()
        for url in urls:
            scraped_data = scraper_task.run({"url": url})
            processed_data = vector_db_task.run(scraped_data)
            alerter_task.run(processed_data)
            time.sleep(1)  # Simulating some processing time


if __name__ == "__main__":
    workflow = DataIngestionWorkflow(Scraper(), VectorDBProcessor())
    workflow.run(["https://example.com", "https://another-example.com"])
```
This example demonstrates the following:
1. **Main Controller**: The `DataIngestionWorkflow` class acts as the main controller, orchestrating the execution of the scraper, vector DB processor, and alerter tasks.
2. **Task Abstraction**: The `Task` abstract base class defines the interface for all tasks, ensuring they have a consistent `run` method.
3. **Task Implementations**: The `ScraperTask`, `VectorDBProcessorTask`, and `AlerterTask` classes implement the specific logic for each task.
4. **Mock Implementations**: The `Scraper`, `VectorDBProcessor`, and `alert` functions provide mock implementations of the scraping, vector DB processing, and alerting functionalities.
5. **Data Flow**: The main controller runs the tasks in sequence, passing the output of one task as the input to the next, simulating the data flow in the workflow.
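The sequential data flow in point 5 can also be expressed more generically: because every stage implements the same `Task` interface, a pipeline is just a list of tasks folded over an input dictionary. The sketch below (the toy task classes and `run_pipeline` helper are illustrative additions, not part of the workflow above; the `Task` base class is repeated so the snippet is self-contained) shows one way to do this with `functools.reduce`:

```python
from abc import ABC, abstractmethod
from functools import reduce
from typing import Dict, List


# Task interface, repeated from the example above for self-containment.
class Task(ABC):
    @abstractmethod
    def run(self, input_data: Dict[str, str]) -> Dict[str, str]:
        ...


class UppercaseTitleTask(Task):
    """Toy stage used only to demonstrate chaining."""

    def run(self, input_data: Dict[str, str]) -> Dict[str, str]:
        return {**input_data, "title": input_data.get("title", "").upper()}


class AddMarkerTask(Task):
    """Toy stage that tags the document as processed."""

    def run(self, input_data: Dict[str, str]) -> Dict[str, str]:
        return {**input_data, "processed": "true"}


def run_pipeline(tasks: List[Task], seed: Dict[str, str]) -> Dict[str, str]:
    # Feed each task's output into the next, exactly what the
    # controller's loop does by hand in the example above.
    return reduce(lambda data, task: task.run(data), tasks, seed)


if __name__ == "__main__":
    print(run_pipeline([UppercaseTitleTask(), AddMarkerTask()], {"title": "hello"}))
```

With this helper, adding or reordering stages is a matter of editing the task list rather than the controller's body.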
To run the example, execute the script directly; the `__main__` block constructs the workflow with the mock components and runs it against two sample URLs.
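The task abstraction also makes the pipeline easy to extend. As a hypothetical illustration (the `DeduplicationTask` below is not part of the original example, and the `Task` base class is repeated so the snippet is self-contained), a deduplication stage could be slotted in between the scraper and the vector DB task without changing the controller's structure:

```python
from abc import ABC, abstractmethod
from typing import Dict, Set


# Task interface, repeated from the example above for self-containment.
class Task(ABC):
    @abstractmethod
    def run(self, input_data: Dict[str, str]) -> Dict[str, str]:
        ...


class DeduplicationTask(Task):
    """Hypothetical stage that drops documents whose URL was already seen."""

    def __init__(self) -> None:
        self.seen_urls: Set[str] = set()

    def run(self, input_data: Dict[str, str]) -> Dict[str, str]:
        url = input_data.get("url")
        if url in self.seen_urls:
            return {}  # Empty dict signals downstream tasks to skip this document
        if url:
            self.seen_urls.add(url)
        return input_data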