|
|
| import os |
| import uuid |
| import hashlib |
| import subprocess |
| import shutil |
| import signal |
| import re |
| import math |
| from datetime import datetime, timedelta |
| from pathlib import Path |
| from typing import Dict, Any, List, Union |
| from enum import Enum |
|
|
| from fastapi import FastAPI, File, UploadFile, BackgroundTasks, HTTPException, Path as FastApiPath, Depends, Query, status |
| from fastapi.responses import JSONResponse, FileResponse |
| from pydantic import BaseModel, Field, validator |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler |
|
|
| |
|
|
| |
| API_KEY = os.getenv("API_KEY") |
|
|
| |
| |
| |
| |
| VFR_TO_CFR_CRF = 17 |
| VFR_TO_CFR_PRESET = 4 |
|
|
| |
| BASE_DATA_DIR = Path("data") |
| UPLOADS_DIR = BASE_DATA_DIR / "uploads" |
| OUTPUTS_DIR = BASE_DATA_DIR / "outputs" |
| LOGS_DIR = BASE_DATA_DIR / "logs" |
|
|
| |
| FILES_DB: Dict[str, Dict[str, Any]] = {} |
| TASKS_DB: Dict[str, Dict[str, Any]] = {} |
|
|
| |
| MAX_AGE_HOURS = 24 |
| MAX_TOTAL_SIZE_GB = 50 |
| CLEANUP_INTERVAL_MINUTES = 30 |
|
|
| |
| scheduler = AsyncIOScheduler() |
|
|
| |
|
|
| async def verify_token(token: str = Query(..., description="Токен для доступа к API.")): |
| """Проверяет, соответствует ли предоставленный токен ключу API сервера.""" |
| if not API_KEY: |
| print("КРИТИЧЕСКАЯ ОШИБКА: Переменная окружения API_KEY не установлена!") |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail="Ключ API не сконфигурирован на сервере." |
| ) |
| if token != API_KEY: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Неверный или отсутствующий токен API." |
| ) |
|
|
| |
|
|
| class Encoder(str, Enum): |
| av1 = "svt-av1" |
| x265 = "libx265" |
| x264 = "libx264" |
|
|
| class X265Preset(str, Enum): |
| ultrafast = "ultrafast" |
| superfast = "superfast" |
| veryfast = "veryfast" |
| faster = "faster" |
| fast = "fast" |
| medium = "medium" |
| slow = "slow" |
| slower = "slower" |
| veryslow = "veryslow" |
| placebo = "placebo" |
|
|
| |
|
|
| class UploadResponse(BaseModel): |
| message: str |
| file_hash: str |
| file_path: str |
| original_filename: str |
| is_new: bool |
|
|
| class BaseTaskRequest(BaseModel): |
| input_hash: str = Field(..., description="SHA256 хеш загруженного файла.") |
| encoder: Encoder = Field(Encoder.av1, description="Выбор кодека.") |
| extra_options: str = Field("", description="Дополнительные опции для ab-av1.") |
|
|
| class AutoEncodeRequest(BaseTaskRequest): |
| preset: Union[int, X265Preset] = Field(3, description="Пресет для кодировщика. Для AV1: 0-12 (int). Для x265: 'slow', 'medium' и т.д. (str).") |
| min_vmaf: int = Field(96, description="Целевой минимальный VMAF.", ge=0, le=100) |
|
|
| @validator('preset') |
| def preset_must_match_encoder(cls, v, values): |
| encoder = values.get('encoder') |
| if encoder == Encoder.av1 and not isinstance(v, int): |
| raise ValueError('Для AV1 пресет должен быть числом (0-12).') |
| if encoder == Encoder.x265 and not isinstance(v, X265Preset): |
| raise ValueError("Для x265 пресет должен быть строкой (например, 'slow').") |
| if isinstance(v, int) and (v < 0 or v > 12): |
| raise ValueError('Пресет для AV1 должен быть в диапазоне от 0 до 12.') |
| return v |
|
|
| class CrfSearchRequest(AutoEncodeRequest): |
| pass |
|
|
| class EncodeRequest(BaseTaskRequest): |
| preset: Union[int, X265Preset] = Field(3, description="Пресет для кодировщика. Для AV1: 0-12 (int). Для x265: 'slow', 'medium' и т.д. (str).") |
| crf: float = Field(20, description="Значение CRF (Constant Rate Factor). Допускаются дробные значения.") |
|
|
| @validator('preset') |
| def preset_must_match_encoder(cls, v, values): |
| encoder = values.get('encoder') |
| if encoder == Encoder.av1 and not isinstance(v, int): |
| raise ValueError('Для AV1 пресет должен быть числом (0-12).') |
| if encoder == Encoder.x265 and not isinstance(v, X265Preset): |
| raise ValueError("Для x265 пресет должен быть строкой (например, 'slow').") |
| if isinstance(v, int) and (v < 0 or v > 12): |
| raise ValueError('Пресет для AV1 должен быть в диапазоне от 0 до 12.') |
| return v |
|
|
| class TaskStatusResponse(BaseModel): |
| task_id: str |
| task_type: str |
| status: str |
| input_hash: str |
| command: str |
| created_at: datetime |
| started_at: datetime | None |
| finished_at: datetime | None |
| output_path: str | None |
| log_path: str |
| last_log_line: str | None |
|
|
| encoder: Encoder | None = None |
| preset: Union[int, str] | None = None |
| crf: float | None = None |
| min_vmaf: int | None = None |
| extra_options: str | None = None |
|
|
|
|
| class TaskCreateResponse(BaseModel): |
| message: str |
| task_id: str |
| status_url: str |
| log_url: str |
| manage_url: str |
|
|
| |
| async def cleanup_files(): |
| """Периодическая задача для удаления старых файлов и контроля размера хранилища.""" |
| print(f"[{datetime.utcnow()}] Запуск задачи очистки...") |
| |
| now = datetime.utcnow() |
| max_age_limit = now - timedelta(hours=MAX_AGE_HOURS) |
| max_size_bytes = MAX_TOTAL_SIZE_GB * (1024**3) |
| |
| all_managed_files: List[Path] = [] |
| for dir_path in [UPLOADS_DIR, OUTPUTS_DIR, LOGS_DIR]: |
| all_managed_files.extend(list(dir_path.glob("**/*"))) |
|
|
| active_files = set() |
| for task_id, task in TASKS_DB.items(): |
| if task['status'] in ['pending', 'running']: |
| if task['input_hash'] in FILES_DB: |
| active_files.add(Path(FILES_DB[task['input_hash']]['path'])) |
| if task.get('output_path'): |
| active_files.add(Path(task['output_path'])) |
| if task.get('log_path'): |
| active_files.add(Path(task['log_path'])) |
| |
| files_to_delete = set() |
| files_with_meta = [] |
|
|
| for file_path in all_managed_files: |
| if not file_path.is_file() or file_path in active_files: |
| continue |
| try: |
| mtime = datetime.utcfromtimestamp(file_path.stat().st_mtime) |
| if mtime < max_age_limit: |
| files_to_delete.add(file_path) |
| else: |
| files_with_meta.append({'path': file_path, 'mtime': mtime, 'size': file_path.stat().st_size}) |
| except FileNotFoundError: |
| continue |
|
|
| current_total_size = sum(f.stat().st_size for f in all_managed_files if f.is_file()) |
| if current_total_size > max_size_bytes: |
| files_with_meta.sort(key=lambda x: x['mtime']) |
| |
| size_to_free = current_total_size - max_size_bytes |
| freed_size = 0 |
| |
| for file_meta in files_with_meta: |
| if freed_size >= size_to_free: |
| break |
| if file_meta['path'] not in files_to_delete and file_meta['path'] not in active_files: |
| files_to_delete.add(file_meta['path']) |
| freed_size += file_meta['size'] |
|
|
| deleted_count = 0 |
| for file_path in files_to_delete: |
| try: |
| file_path.unlink() |
| deleted_count += 1 |
| if str(file_path).startswith(str(UPLOADS_DIR)): |
| file_hash_to_del = file_path.stem |
| FILES_DB.pop(file_hash_to_del, None) |
| elif str(file_path).startswith(str(OUTPUTS_DIR)) or str(file_path).startswith(str(LOGS_DIR)): |
| task_id_to_del = file_path.stem |
| TASKS_DB.pop(task_id_to_del, None) |
| except Exception as e: |
| print(f"Ошибка при удалении файла {file_path}: {e}") |
| |
| if deleted_count > 0: |
| print(f"Очистка завершена. Удалено {deleted_count} файлов.") |
|
|
| |
| app = FastAPI( |
| title="ab-av1 API Server", |
| description="REST API для асинхронного кодирования видео. **Интерактивная документация: `/docs`**.", |
| version="2.8.0", |
| ) |
|
|
| @app.on_event("startup") |
| async def on_startup(): |
| """Создаем директории, проверяем API_KEY и запускаем планировщик.""" |
| if not API_KEY: |
| print("КРИТИЧЕСКАЯ ОШИБКА: Переменная окружения API_KEY не установлена. Сервер не сможет авторизовать запросы.") |
| else: |
| print("Ключ API успешно загружен.") |
|
|
| for dir_path in [UPLOADS_DIR, OUTPUTS_DIR, LOGS_DIR]: |
| dir_path.mkdir(parents=True, exist_ok=True) |
| |
| scheduler.add_job(cleanup_files, 'interval', minutes=CLEANUP_INTERVAL_MINUTES) |
| scheduler.start() |
| print("Планировщик очистки файлов запущен.") |
|
|
| @app.on_event("shutdown") |
| async def on_shutdown(): |
| """Останавливаем планировщик и активные процессы при выключении.""" |
| scheduler.shutdown() |
| print("Планировщик остановлен.") |
| for task_id, task in TASKS_DB.items(): |
| if task.get('process') and task['status'] == 'running': |
| print(f"Остановка процесса для задачи {task_id}...") |
| task['process'].terminate() |
| try: |
| task['process'].wait(timeout=5) |
| except subprocess.TimeoutExpired: |
| task['process'].kill() |
|
|
| |
|
|
| def run_simple_task(task_id: str, command: list[str], log_path: Path): |
| """Выполняет простую одношаговую команду в фоне (для encode и crf-search).""" |
| task = TASKS_DB[task_id] |
| task["started_at"] = datetime.utcnow() |
| task["status"] = "running" |
| try: |
| process = subprocess.Popen( |
| command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
| text=True, encoding='utf-8', bufsize=1, preexec_fn=os.setsid |
| ) |
| task['process'] = process |
| |
| with open(log_path, "w", encoding='utf-8') as log_file: |
| log_file.write(f"Executing command: {' '.join(command)}\n\n") |
| for line in iter(process.stdout.readline, ''): |
| log_file.write(line) |
| log_file.flush() |
| task["last_log_line"] = line.strip() |
| |
| process.wait() |
| |
| if task.get('was_cancelled'): |
| task["status"] = "cancelled" |
| elif process.returncode == 0: |
| task["status"] = "completed" |
| else: |
| task["status"] = "failed" |
| task["last_log_line"] = f"Процесс завершился с кодом {process.returncode}" |
| |
| except Exception as e: |
| task["status"] = "failed" |
| error_message = f"Произошло исключение Python: {e}" |
| task["last_log_line"] = error_message |
| with open(log_path, "a", encoding='utf-8') as log_file: |
| log_file.write(f"\n--- PYTHON EXCEPTION ---\n{error_message}") |
| finally: |
| task["finished_at"] = datetime.utcnow() |
| task.pop('process', None) |
|
|
| def run_auto_encode_workflow(task_id: str, request: AutoEncodeRequest): |
| """ |
| Выполняет сложный воркфлоу для auto-encode с проверкой VFR. |
| """ |
| task = TASKS_DB[task_id] |
| log_path = Path(task["log_path"]) |
| original_input_path = Path(FILES_DB[request.input_hash]["path"]) |
| temp_cfr_path = None |
|
|
| def _log_message(message: str, level: str = "INFO"): |
| line = f"[{datetime.utcnow().strftime('%H:%M:%S')}] [{level}] {message}\n" |
| print(f"Task {task_id}: {line.strip()}") |
| with open(log_path, "a", encoding='utf-8') as log_file: |
| log_file.write(line) |
| task["last_log_line"] = message |
|
|
| def _run_sub_task(command: list[str], sub_task_name: str) -> str: |
| _log_message(f"Запуск подзадачи: {sub_task_name}") |
| _log_message(f"Команда: {' '.join(command)}") |
| |
| process = subprocess.Popen( |
| command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
| text=True, encoding='utf-8', bufsize=1, preexec_fn=os.setsid |
| ) |
| task['process'] = process |
| |
| full_output = [] |
| with open(log_path, "a", encoding='utf-8') as log_file: |
| for line in iter(process.stdout.readline, ''): |
| log_file.write(line) |
| log_file.flush() |
| full_output.append(line) |
| if line.strip(): |
| task["last_log_line"] = line.strip() |
| |
| process.wait() |
| task.pop('process', None) |
|
|
| if process.returncode != 0: |
| raise RuntimeError(f"Подзадача '{sub_task_name}' завершилась с ошибкой (код: {process.returncode}). Смотрите лог для деталей.") |
| |
| return "".join(full_output) |
|
|
| try: |
| task["status"] = "running" |
| task["started_at"] = datetime.utcnow() |
| with open(log_path, "w", encoding='utf-8') as f: |
| f.write(f"Starting auto-encode workflow for task {task_id}\n") |
|
|
| if not shutil.which("mediainfo"): |
| raise RuntimeError("Утилита 'mediainfo' не найдена в системном PATH. Пожалуйста, установите ее.") |
| |
| _log_message("Шаг 1/4: Проверка на Variable Frame Rate (VFR)") |
| vfr_check_cmd = ["mediainfo", "--Inform=Video;%FrameRate_Mode%", str(original_input_path)] |
| vfr_check = subprocess.run(vfr_check_cmd, capture_output=True, text=True, check=True) |
| frame_rate_mode = vfr_check.stdout.strip() |
| |
| crf_search_input = original_input_path |
| |
| if "VFR" in frame_rate_mode.upper(): |
| _log_message(f"Обнаружен VFR ({frame_rate_mode}). Начало конвертации в CFR.") |
| |
| max_fps_res_cmd = ["mediainfo", "--Inform=Video;%FrameRate_Maximum%", str(original_input_path)] |
| max_fps_res = subprocess.run(max_fps_res_cmd, capture_output=True, text=True, check=True) |
| target_fps = math.ceil(float(max_fps_res.stdout.strip())) |
| _log_message(f"Целевой FPS для CFR: {target_fps}") |
|
|
| temp_cfr_path = OUTPUTS_DIR / f"{task_id}_temp_cfr.mp4" |
| |
| |
| cfr_cmd = [ |
| "ffmpeg", "-i", str(original_input_path), "-vf", f"fps={target_fps}", |
| "-c:v", "libsvtav1", |
| "-crf", str(VFR_TO_CFR_CRF), |
| "-preset", str(VFR_TO_CFR_PRESET), |
| "-c:a", "copy", "-c:s", "copy", str(temp_cfr_path) |
| ] |
| _run_sub_task(cfr_cmd, "VFR в CFR конвертация") |
| crf_search_input = temp_cfr_path |
| _log_message("Конвертация в CFR успешно завершена.") |
| else: |
| _log_message(f"Обнаружен CFR ({frame_rate_mode}). Конвертация не требуется.") |
|
|
| _log_message("Шаг 2/4: Поиск оптимального CRF (crf-search)") |
| |
| preset_value = request.preset.value if isinstance(request.preset, Enum) else str(request.preset) |
| |
| search_cmd = [ |
| "ab-av1", "crf-search", "-i", str(crf_search_input), |
| "-e", request.encoder.value, "--preset", preset_value, |
| "--min-vmaf", str(request.min_vmaf) |
| ] |
| if request.extra_options: |
| search_cmd.extend(request.extra_options.split()) |
| |
| search_output = _run_sub_task(search_cmd, "Поиск CRF") |
| |
| found_crf = None |
| match = re.search(r"encode .*--crf ([\d.]+)", search_output) |
| if match: |
| found_crf = match.group(1) |
| else: |
| match = re.search(r"^crf ([\d.]+) VMAF", search_output, re.MULTILINE) |
| if match: |
| found_crf = match.group(1) |
|
|
| if not found_crf: |
| raise RuntimeError("Не удалось найти рекомендованный CRF в выводе crf-search.") |
| _log_message(f"Найден оптимальный CRF: {found_crf}") |
|
|
| _log_message("Шаг 3/4: Финальное кодирование") |
| final_output_path = Path(task["output_path"]) |
| encode_cmd = [ |
| "ab-av1", "encode", "-i", str(crf_search_input), "-o", str(final_output_path), |
| "--crf", found_crf, "-e", request.encoder.value, "--preset", preset_value |
| ] |
| if request.extra_options: |
| encode_cmd.extend(request.extra_options.split()) |
| |
| _run_sub_task(encode_cmd, "Финальное кодирование") |
| |
| task["status"] = "completed" |
| _log_message("Шаг 4/4: Задача успешно завершена!") |
|
|
| except Exception as e: |
| task["status"] = "failed" |
| error_message = f"Воркфлоу завершился с ошибкой: {e}" |
| _log_message(error_message, level="ERROR") |
| finally: |
| if temp_cfr_path and temp_cfr_path.exists(): |
| _log_message("Очистка временных файлов...") |
| temp_cfr_path.unlink() |
| task["finished_at"] = datetime.utcnow() |
| task.pop('process', None) |
|
|
| def create_task(task_type: str, request: BaseTaskRequest, background_tasks: BackgroundTasks): |
| """Общая функция для создания и запуска любого типа задачи.""" |
| if request.input_hash not in FILES_DB: |
| raise HTTPException(status_code=404, detail=f"Файл с хешем {request.input_hash} не найден.") |
| |
| input_path = Path(FILES_DB[request.input_hash]["path"]) |
| if not input_path.exists(): |
| raise HTTPException(status_code=404, detail=f"Исходный файл для хеша {request.input_hash} был удален.") |
| |
| task_id = str(uuid.uuid4()) |
| output_path = OUTPUTS_DIR / f"{task_id}.mp4" |
| log_path = LOGS_DIR / f"{task_id}.log" |
| |
| command = [] |
| command_str = "" |
| |
| if task_type in ["encode", "crf-search"]: |
| preset_value = request.preset.value if isinstance(request.preset, Enum) else str(request.preset) |
| |
| command = ["ab-av1", task_type, "-i", str(input_path)] |
| command.extend(["-e", request.encoder.value, "--preset", preset_value]) |
| if task_type == "encode": |
| crf_value = request.crf if hasattr(request, 'crf') else 20 |
| command.extend(["-o", str(output_path), "--crf", str(crf_value)]) |
| else: |
| min_vmaf_value = request.min_vmaf if hasattr(request, 'min_vmaf') else 96 |
| command.extend(["--min-vmaf", str(min_vmaf_value)]) |
| if request.extra_options: |
| command.extend(request.extra_options.split()) |
| command_str = " ".join(command) |
| else: |
| command_str = f"auto-encode workflow for {request.input_hash}" |
|
|
| TASKS_DB[task_id] = { |
| "task_id": task_id, |
| "task_type": task_type, |
| "status": "pending", |
| "input_hash": request.input_hash, |
| "command": command_str, |
| "created_at": datetime.utcnow(), |
| "started_at": None, |
| "finished_at": None, |
| "output_path": str(output_path) if task_type != "crf-search" else None, |
| "log_path": str(log_path), |
| "last_log_line": "Задача поставлена в очередь.", |
|
|
| "encoder": request.encoder, |
| "preset": request.preset, |
| "extra_options": request.extra_options, |
| "crf": getattr(request, 'crf', None), |
| "min_vmaf": getattr(request, 'min_vmaf', None), |
| } |
| |
| if task_type == "auto-encode": |
| background_tasks.add_task(run_auto_encode_workflow, task_id, request) |
| else: |
| background_tasks.add_task(run_simple_task, task_id, command, log_path) |
| |
| return TaskCreateResponse( |
| message=f"Задача '{task_type}' успешно создана.", |
| task_id=task_id, status_url=f"/tasks/{task_id}/status", |
| log_url=f"/tasks/{task_id}/log", manage_url="/manage" |
| ) |
|
|
| |
|
|
| @app.get("/", summary="Статус API") |
| def read_root(): |
| return {"status": "ok"} |
|
|
| @app.post("/upload", response_model=UploadResponse, summary="Загрузка видеофайла", dependencies=[Depends(verify_token)]) |
| async def upload_file(file: UploadFile = File(...)): |
| contents = await file.read() |
| file_hash = hashlib.sha256(contents).hexdigest() |
| |
| if file_hash in FILES_DB and Path(FILES_DB[file_hash]["path"]).exists(): |
| return UploadResponse( |
| message="Файл с таким хешем уже существует.", |
| file_hash=file_hash, |
| file_path=FILES_DB[file_hash]["path"], |
| original_filename=FILES_DB[file_hash]["original_filename"], |
| is_new=False |
| ) |
| |
| file_extension = Path(file.filename).suffix |
| saved_path = UPLOADS_DIR / f"{file_hash}{file_extension}" |
| |
| with open(saved_path, "wb") as f: |
| f.write(contents) |
| |
| FILES_DB[file_hash] = { |
| "path": str(saved_path), |
| "original_filename": file.filename, |
| "uploaded_at": datetime.utcnow(), |
| "size": len(contents) |
| } |
| |
| return UploadResponse( |
| message="Файл успешно загружен.", |
| file_hash=file_hash, |
| file_path=str(saved_path), |
| original_filename=file.filename, |
| is_new=True |
| ) |
|
|
| @app.post("/tasks/auto-encode", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи auto-encode с логикой VFR", dependencies=[Depends(verify_token)]) |
| async def task_auto_encode(request: AutoEncodeRequest, background_tasks: BackgroundTasks): |
| return create_task("auto-encode", request, background_tasks) |
|
|
| @app.post("/tasks/crf-search", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи crf-search", dependencies=[Depends(verify_token)]) |
| async def task_crf_search(request: CrfSearchRequest, background_tasks: BackgroundTasks): |
| return create_task("crf-search", request, background_tasks) |
|
|
| @app.post("/tasks/encode", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи encode", dependencies=[Depends(verify_token)]) |
| async def task_encode(request: EncodeRequest, background_tasks: BackgroundTasks): |
| return create_task("encode", request, background_tasks) |
|
|
| @app.get("/tasks/{task_id}/status", response_model=TaskStatusResponse, summary="Получить статус задачи", dependencies=[Depends(verify_token)]) |
| def get_task_status(task_id: str = FastApiPath(..., description="ID задачи")): |
| if task_id not in TASKS_DB: |
| raise HTTPException(status_code=404, detail="Задача не найдена.") |
| task_info = TASKS_DB[task_id].copy() |
| task_info.pop('process', None) |
| return task_info |
|
|
| @app.get("/tasks/{task_id}/log", summary="Получить полный лог задачи", dependencies=[Depends(verify_token)]) |
| def get_task_log(task_id: str = FastApiPath(..., description="ID задачи")): |
| if task_id not in TASKS_DB: |
| raise HTTPException(status_code=404, detail="Задача не найдена.") |
| log_path = Path(TASKS_DB[task_id]["log_path"]) |
| if not log_path.exists(): |
| return JSONResponse(status_code=404, content={"detail": "Файл лога не найден."}) |
| return FileResponse(log_path, media_type="text/plain", filename=log_path.name) |
|
|
| @app.get("/download/{task_id}", summary="Скачать результат кодирования", dependencies=[Depends(verify_token)]) |
| def download_result(task_id: str = FastApiPath(..., description="ID задачи")): |
| if task_id not in TASKS_DB: |
| raise HTTPException(status_code=404, detail="Задача не найдена.") |
| task = TASKS_DB[task_id] |
| if task["status"] != "completed": |
| raise HTTPException(status_code=400, detail=f"Задача еще не завершена. Статус: {task['status']}.") |
| output_path = task.get("output_path") |
| if not output_path or not Path(output_path).exists(): |
| raise HTTPException(status_code=404, detail="Выходной файл не найден.") |
| |
| output_path = Path(output_path) |
| input_hash = task["input_hash"] |
| original_filename = Path(FILES_DB[input_hash]["original_filename"]).stem |
| |
| return FileResponse(output_path, filename=f"{original_filename}_{task['task_type']}_{task_id[:8]}{output_path.suffix}") |
|
|
| |
|
|
| @app.get("/manage", summary="Получить списки задач и файлов (JSON)", dependencies=[Depends(verify_token)]) |
| async def get_management_page(): |
| """ |
| Возвращает JSON со списками всех задач и загруженных файлов. |
| """ |
| tasks_list = [] |
| for task_id, task_data in TASKS_DB.items(): |
| task_info = task_data.copy() |
| task_info.pop('process', None) |
| tasks_list.append(task_info) |
| |
| sorted_tasks = sorted(tasks_list, key=lambda x: x['created_at'], reverse=True) |
|
|
| return {"tasks": sorted_tasks, "files": FILES_DB} |
|
|
|
|
| @app.post("/manage/task/{task_id}/cancel", summary="Прервать выполняющуюся задачу", dependencies=[Depends(verify_token)]) |
| def cancel_task(task_id: str = FastApiPath(..., description="ID задачи для прерывания")): |
| if task_id not in TASKS_DB: |
| raise HTTPException(status_code=404, detail="Задача не найдена.") |
| task = TASKS_DB[task_id] |
| if task['status'] != 'running' or 'process' not in task: |
| raise HTTPException(status_code=400, detail="Задачу нельзя прервать (она не выполняется).") |
| |
| print(f"Прерывание задачи {task_id} (PID: {task['process'].pid})...") |
| task['was_cancelled'] = True |
| os.killpg(os.getpgid(task['process'].pid), signal.SIGTERM) |
| |
| return {"message": f"Команда на прерывание задачи {task_id} отправлена."} |
|
|
| @app.delete("/manage/task/{task_id}", summary="Удалить задачу и ее файлы", dependencies=[Depends(verify_token)]) |
| def delete_task(task_id: str = FastApiPath(..., description="ID задачи для удаления")): |
| if task_id not in TASKS_DB: |
| raise HTTPException(status_code=404, detail="Задача не найдена.") |
| |
| task = TASKS_DB[task_id] |
| if task['status'] == 'running': |
| raise HTTPException(status_code=400, detail="Сначала прервите выполняющуюся задачу.") |
|
|
| if task.get('output_path') and Path(task['output_path']).exists(): |
| Path(task['output_path']).unlink(missing_ok=True) |
| if task.get('log_path') and Path(task['log_path']).exists(): |
| Path(task['log_path']).unlink(missing_ok=True) |
| |
| del TASKS_DB[task_id] |
| |
| return {"message": f"Задача {task_id} и ее файлы были удалены."} |
|
|
| @app.delete("/manage/file/{file_hash}", summary="Удалить загруженный файл", dependencies=[Depends(verify_token)]) |
| def delete_file(file_hash: str = FastApiPath(..., description="Хеш файла для удаления")): |
| if file_hash not in FILES_DB: |
| raise HTTPException(status_code=404, detail="Файл не найден.") |
|
|
| for task in TASKS_DB.values(): |
| if task['input_hash'] == file_hash and task['status'] in ['running', 'pending']: |
| raise HTTPException(status_code=400, detail=f"Нельзя удалить файл, так как он используется активной задачей {task['task_id']}.") |
| |
| file_path = Path(FILES_DB[file_hash]['path']) |
| if file_path.exists(): |
| file_path.unlink() |
| |
| del FILES_DB[file_hash] |
| |
| return {"message": f"Файл {file_hash} был удален."} |
|
|
|
|