| | from fastapi import APIRouter, HTTPException, status, BackgroundTasks, UploadFile, Query |
| | from .Schema import EditorRequest, TaskInfo |
| | from App.Worker import celery_task, concatenate_videos |
| | from celery.result import AsyncResult |
| | import aiofiles, os, uuid, aiohttp |
| | from App import SERVER_STATE, Task |
| |
|
| | videditor_router = APIRouter(tags=["vidEditor"]) |
| |
|
| |
|
| | @videditor_router.post("/create-video") |
| | async def create_video(videoRequest: EditorRequest, background_task: BackgroundTasks): |
| | background_task.add_task(celery_task, videoRequest) |
| | return {"task_id": "started"} |
| |
|
| |
|
| | @videditor_router.post("/create-chunks") |
| | async def create_chunks(videoRequest: EditorRequest, background_task: BackgroundTasks): |
| | video_duration = videoRequest.constants.duration |
| | task_id = uuid.uuid4() |
| | new_task = Task(TASK_ID=task_id) |
| |
|
| | active_nodes = [ |
| | node |
| | for node in SERVER_STATE.NODES |
| | if await new_task._check_node_online(node.SPACE_HOST) |
| | ] |
| | number_of_nodes = len(active_nodes) |
| | ranges = [ |
| | [i, i + number_of_nodes] for i in range(0, video_duration, number_of_nodes) |
| | ] |
| | for i, node in enumerate(active_nodes): |
| | await new_task.add_node(node, i) |
| |
|
| | SERVER_STATE.TASKS[task_id] = new_task |
| |
|
| | async with aiohttp.ClientSession() as session: |
| | for i, node in enumerate(active_nodes): |
| | videoRequest.constants.frames = ranges[i] |
| | if node.SPACE_HOST == SERVER_STATE.SPACE_HOST: |
| | background_task.add_task(celery_task, videoRequest) |
| | async with session.post( |
| | "node.SPACE_HOST/create-video", json=videoRequest |
| | ) as response: |
| | if response.status != 200: |
| | raise HTTPException( |
| | status_code=response.status, |
| | detail="Failed to post request to node", |
| | ) |
| |
|
| | return {"task_id": "started"} |
| |
|
| |
|
| | @videditor_router.post("/uploadfile/") |
| | async def create_file( |
| | background_tasks: BackgroundTasks, |
| | file: UploadFile, |
| | node: str, |
| | chunk: int, |
| | task: str, |
| | ): |
| |
|
| | chunk_directory = f"/tmp/Video/{task}" |
| | file_name = f"{chunk_directory}/{chunk}.mp4" |
| | |
| | os.makedirs(chunk_directory, exist_ok=True) |
| |
|
| | try: |
| | async with aiofiles.open(file_name, "wb") as f: |
| | while contents := await file.read(1024 * 1): |
| | await f.write(contents) |
| |
|
| | except Exception as e: |
| | return { |
| | "message": f"There was an error uploading the file, error message {str(e)} " |
| | } |
| | finally: |
| | await file.close() |
| | running_task = SERVER_STATE.TASKS[task] |
| | running_task.mark_node_completed(node) |
| | if running_task.is_completed(): |
| | background_tasks.add_task(concatenate_videos, chunk_directory) |
| |
|
| | return {"message": "File uploaded successfully"} |
| |
|