qgyd2021's picture
update
3c9e480
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import asyncio
import json
from toolbox.porter.tasks.base_task import BaseTask
class PorterManager(object):
def __init__(self):
# state
self.coro_task_set = set()
@staticmethod
def get_coro_task_set_by_task_file(tasks_file: str):
with open(tasks_file, "r", encoding="utf-8") as f:
tasks = json.load(f)
coro_task_set = set()
for task in tasks:
enable = task.pop("enable")
task_type = task.pop("type")
if not enable:
continue
task_cls: type(BaseTask) = BaseTask.by_name(task_type)
task_obj = task_cls(**task)
coro_task_set.add(task_obj.start())
return coro_task_set
def add_tasks_by_task_file(self, tasks_file: str):
coro_task_set = self.get_coro_task_set_by_task_file(tasks_file)
self.coro_task_set.update(coro_task_set)
return len(coro_task_set)
async def run(self):
future_tasks = list()
for task in self.coro_task_set:
task = asyncio.ensure_future(task)
# task = asyncio.create_task(task)
future_tasks.append(task)
await asyncio.sleep(3)
await asyncio.wait(future_tasks)
async def main():
import log
from project_settings import environment, project_path, log_directory, time_zone_info
log.setup_size_rotating(log_directory=log_directory, tz_info=time_zone_info)
porter_task_file = environment.get("porter_tasks_file")
youtube_video_upload_tasks_file = project_path / porter_task_file
manager = PorterManager(youtube_video_upload_tasks_file)
await manager.run()
return
if __name__ == "__main__":
asyncio.run(main())