#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Load porter task definitions from a JSON file and run them with asyncio."""
import asyncio
import json

from toolbox.porter.tasks.base_task import BaseTask


class PorterManager(object):
    """Collect the awaitables produced by configured tasks and run them.

    Tasks are registered with :meth:`add_tasks_by_task_file` and scheduled
    on the running event loop by :meth:`run`.
    """

    def __init__(self):
        # state: the objects returned by each enabled task's ``start()``;
        # ``run`` hands every one of them to ``asyncio.ensure_future``.
        self.coro_task_set = set()

    @staticmethod
    def get_coro_task_set_by_task_file(tasks_file: str):
        """Build the set of ``start()`` results for the enabled tasks in a file.

        The file is read as UTF-8 JSON and iterated as a sequence of dicts.
        Each dict must contain the keys ``"enable"`` and ``"type"``; every
        remaining key is forwarded as a keyword argument to the task class
        resolved through ``BaseTask.by_name``.

        :param tasks_file: path of the JSON task file.
        :return: a set holding ``task_obj.start()`` for each enabled entry.
        :raises KeyError: if an entry lacks ``"enable"`` or ``"type"``.
        """
        with open(tasks_file, "r", encoding="utf-8") as f:
            tasks = json.load(f)

        coro_task_set = set()
        for task in tasks:
            # Both keys are popped before the enable check so that they are
            # never forwarded to the task constructor below.
            enable = task.pop("enable")
            task_type = task.pop("type")
            if not enable:
                continue

            task_cls: type[BaseTask] = BaseTask.by_name(task_type)
            task_obj = task_cls(**task)
            coro_task_set.add(task_obj.start())
        return coro_task_set

    def add_tasks_by_task_file(self, tasks_file: str):
        """Register every enabled task from ``tasks_file`` on this manager.

        :param tasks_file: path of the JSON task file.
        :return: the number of entries loaded from this file.
        """
        coro_task_set = self.get_coro_task_set_by_task_file(tasks_file)
        self.coro_task_set.update(coro_task_set)
        return len(coro_task_set)

    async def run(self):
        """Schedule all registered tasks and wait until every one finishes.

        Tasks are started one at a time with a 3 second pause after each
        start. Returns immediately when nothing has been registered.
        """
        future_tasks = []
        for task in self.coro_task_set:
            future_tasks.append(asyncio.ensure_future(task))
            await asyncio.sleep(3)

        # asyncio.wait() raises ValueError when given an empty collection,
        # so a manager with no enabled tasks must return early.
        if not future_tasks:
            return
        await asyncio.wait(future_tasks)


async def main():
    """Configure logging, load the configured porter task file and run it."""
    # Project modules are imported lazily so that importing this module
    # does not require the project settings to be available.
    import log
    from project_settings import environment, project_path, log_directory, time_zone_info

    log.setup_size_rotating(log_directory=log_directory, tz_info=time_zone_info)

    porter_task_file = environment.get("porter_tasks_file")
    youtube_video_upload_tasks_file = project_path / porter_task_file

    # PorterManager.__init__ takes no arguments; the task file has to be
    # registered explicitly, otherwise run() would have nothing to execute.
    manager = PorterManager()
    manager.add_tasks_by_task_file(youtube_video_upload_tasks_file)
    await manager.run()
    return


if __name__ == "__main__":
    asyncio.run(main())