|
|
from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks, Query
|
|
|
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
|
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
|
import os
|
|
|
import json
|
|
|
import shutil
|
|
|
import uuid
|
|
|
import time
|
|
|
import threading
|
|
|
import logging
|
|
|
import aiofiles
|
|
|
from pathlib import Path
|
|
|
from typing import Dict, List, Optional
|
|
|
from dataclasses import dataclass, asdict
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
# Root logging: INFO and above go both to ``api.log`` (opened relative to the
# current working directory) and to a console stream handler.
logging.basicConfig(
    handlers=[
        logging.FileHandler('api.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
from error_logger import ErrorLogger
|
|
|
# Shared error recorder: the exception handlers below call log_error() (which
# returns an error id), and the /errors endpoints read entries back by id.
error_logger = ErrorLogger()
|
|
|
|
|
|
|
|
|
from processing_logic import (
|
|
|
processing_status,
|
|
|
uploaded_mp4s,
|
|
|
log_message,
|
|
|
process_hf_files_background,
|
|
|
UPLOAD_DIRECTORY,
|
|
|
MP4_OUTPUT_FOLDER,
|
|
|
hf_api,
|
|
|
DEFAULT_RAR_LIMIT
|
|
|
)
|
|
|
|
|
|
|
|
|
# Seconds a lock may be held before it is considered expired; compared against
# differences of time.time() values.
LOCK_TIMEOUT = 5 * 60

# JSON file (relative to the working directory) where the lock registry is persisted.
STATE_FILE = "middleware_state.json"

# Number of bytes read per iteration when streaming a file to a client.
CHUNK_SIZE = 8 * 1024
|
|
|
|
|
|
app = FastAPI(title="Unified MP4 Processing & Distribution API")

# Fully permissive CORS: every origin, method and header is accepted and
# credentials are allowed.
# NOTE(review): with allow_credentials=True, Starlette answers wildcard origins by
# echoing the caller's Origin, so any site can make credentialed requests —
# confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
|
|
|
|
|
|
|
|
|
# Upload folder is the "uploads" subdirectory of UPLOAD_DIRECTORY. Both it and the
# output folder are created at import time if they do not exist yet.
MP4_UPLOAD_FOLDER = os.path.join(UPLOAD_DIRECTORY, "uploads")
os.makedirs(MP4_UPLOAD_FOLDER, exist_ok=True)
os.makedirs(MP4_OUTPUT_FOLDER, exist_ok=True)

# Background processing thread handle; assigned in startup_event(), which checks
# it so a second startup does not spawn another thread while one is alive.
processing_thread = None
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class FileState:
    """Lock and usage bookkeeping for one tracked file or directory.

    Serialized with ``asdict`` and rebuilt with ``FileState(**v)`` when the
    middleware registry is saved to / loaded from disk, so field names and
    order are part of the on-disk format.
    """

    # Filesystem path of the tracked file or directory.
    path: str
    # True while some requester holds the lock.
    locked: bool
    # Id of the requester holding the lock; None when unlocked.
    lock_holder: Optional[str] = None
    # time.time() value at which the lock was taken; None when unlocked.
    lock_time: Optional[float] = None
    # Incremented each time the lock holder successfully releases the lock.
    download_count: int = 0
    # time.time() value of the last successful release; None if never released.
    last_access: Optional[float] = None
|
|
|
|
|
|
class MiddlewareState:
    """Registry of tracked files and their locks, persisted to STATE_FILE as JSON.

    ``files`` maps a file id to its FileState. Mutating methods rewrite the whole
    registry to disk so locks and download counters survive a restart
    (``__init__`` reloads them).
    """

    def __init__(self):
        self.files: Dict[str, FileState] = {}
        self.load_state()

    def load_state(self):
        """Load the registry from STATE_FILE.

        Missing file leaves the registry empty; any read/parse error is logged
        and also leaves it empty.
        """
        if os.path.exists(STATE_FILE):
            try:
                with open(STATE_FILE, 'r') as f:
                    data = json.load(f)
                self.files = {k: FileState(**v) for k, v in data.items()}
            except Exception as e:
                logger.error(f"Error loading state: {e}")
                self.files = {}

    def save_state(self):
        """Persist the registry to STATE_FILE atomically; errors are logged, not raised.

        The JSON is written to a sibling temp file and moved over STATE_FILE with
        os.replace(). A crash mid-write therefore cannot leave a truncated state
        file, which load_state() would discard along with every lock and counter.
        """
        tmp_path = f"{STATE_FILE}.tmp"
        try:
            with open(tmp_path, 'w') as f:
                json.dump({k: asdict(v) for k, v in self.files.items()}, f, indent=2)
            os.replace(tmp_path, STATE_FILE)
        except Exception as e:
            logger.error(f"Error saving state: {e}")

    def clean_expired_locks(self):
        """Unlock every entry whose lock is older than LOCK_TIMEOUT seconds.

        A locked entry without a ``lock_time`` (e.g. loaded from a state file with
        a null timestamp) could never expire and would crash the age computation,
        so it is treated as expired too. State is saved only if something changed.
        """
        now = time.time()
        changed = False
        for state in self.files.values():
            if not state.locked:
                continue
            if state.lock_time is None or (now - state.lock_time) > LOCK_TIMEOUT:
                state.locked = False
                state.lock_holder = None
                state.lock_time = None
                changed = True
        if changed:
            self.save_state()

    def get_next_available_file(self, requester_id: str) -> Optional[str]:
        """Return a file id locked for *requester_id*, or None if none is free.

        Expired locks are swept first. A lock the requester already holds is
        returned as-is; otherwise the first unlocked entry is locked for the
        requester and persisted.
        """
        self.clean_expired_locks()

        # Re-issue an existing lock so a retrying client gets the same file back.
        for file_id, state in self.files.items():
            if state.locked and state.lock_holder == requester_id:
                return file_id

        for file_id, state in self.files.items():
            if not state.locked:
                state.locked = True
                state.lock_holder = requester_id
                state.lock_time = time.time()
                self.save_state()
                return file_id

        return None

    def release_lock(self, file_id: str, requester_id: str) -> bool:
        """Release *requester_id*'s lock on *file_id*.

        On success the entry is unlocked, ``last_access`` is stamped,
        ``download_count`` is incremented and state is saved.

        Returns:
            True if released; False if the id is unknown or the requester is not
            the lock holder.
        """
        if file_id in self.files:
            state = self.files[file_id]
            if state.lock_holder == requester_id:
                state.locked = False
                state.lock_holder = None
                state.lock_time = None
                state.last_access = time.time()
                state.download_count += 1
                self.save_state()
                return True
        return False
|
|
|
|
|
|
|
|
|
middleware_state = MiddlewareState()
|
|
|
|
|
|
|
|
|
|
|
|
def save_file(uploaded_file: UploadFile, save_path: str):
    """Copy an uploaded file's contents to *save_path*.

    Parent directories are created as needed.

    Args:
        uploaded_file: Upload whose underlying ``.file`` object is read.
        save_path: Destination path; may be a bare filename.
    """
    parent_dir = os.path.dirname(save_path)
    # os.makedirs("") raises FileNotFoundError, so only create a directory when
    # save_path actually has one (a bare filename targets the working directory).
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(save_path, "wb") as f:
        shutil.copyfileobj(uploaded_file.file, f)
|
|
|
|
|
|
def log_request(endpoint: str, params: Optional[dict] = None):
    """Log an incoming API request at INFO level for debugging.

    Args:
        endpoint: Route or name identifying the request.
        params: Request parameters to include in the log line, if any.
    """
    # %-style arguments defer string formatting until the record is emitted.
    logger.info("API Request: %s - Params: %s", endpoint, params)
|
|
|
|
|
|
|
|
|
|
|
|
# Header names whose values must never be written to the persistent error log.
_SENSITIVE_HEADERS = frozenset({
    "authorization",
    "proxy-authorization",
    "cookie",
    "set-cookie",
    "x-api-key",
})


def _request_info(request) -> dict:
    """Collect request metadata for error logging, redacting credential headers."""
    return {
        "method": request.method,
        "url": str(request.url),
        "headers": {
            name: ("[REDACTED]" if name.lower() in _SENSITIVE_HEADERS else value)
            for name, value in dict(request.headers).items()
        },
        "query_params": dict(request.query_params),
    }


@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    """Log an HTTPException and return it as JSON with the generated error id.

    Headers attached to the exception (e.g. ``WWW-Authenticate``) are forwarded
    on the response, as the framework's default handler would do.
    """
    error_id = error_logger.log_error(
        exc,
        request.url.path,
        request_info=_request_info(request),
        context={
            "status_code": exc.status_code,
            "detail": exc.detail,
        },
    )
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "error_id": error_id,
            "type": "http_error",
            "status_code": exc.status_code,
        },
        headers=getattr(exc, "headers", None),
    )


@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    """Log any unhandled exception and return a generic 500 with the error id.

    The exception text is only exposed in the response when ``app.debug`` is set.
    """
    error_id = error_logger.log_error(
        exc,
        request.url.path,
        request_info=_request_info(request),
    )
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "error_id": error_id,
            "type": "server_error",
            "detail": str(exc) if app.debug else "An unexpected error occurred",
        },
    )
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/")
async def root():
    """Describe the API: name, version, status and a map of its endpoints.

    The map lists the routes actually registered in this module; the original
    "next" key is kept and points at the real ``/middleware/next/any`` route.
    """
    return {
        "message": "Unified MP4 Processing & Distribution API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": {
            "processing": {
                "courses": "GET /courses - List all course folders",
                "images": "GET /images/{course_folder:path} - List MP4s in course",
                "download": "GET /download?course={course}&file={file} - Download MP4 file",
                "debug": "GET /debug/structure - Debug file structure",
            },
            "middleware": {
                "status": "GET /middleware/status - Get middleware status",
                "course_status": "GET /middleware/status/course/{course_id} - Get status of a course",
                "image_status": "GET /middleware/status/image/{course_folder}/{file_id} - Get status of an image",
                "register": "POST /middleware/register - Register a new file",
                "next": "GET /middleware/next/any?requester_id={id} - Get next available file",
                "next_course": "GET /middleware/next/course?requester_id={id} - Get next available course",
                "next_image": "GET /middleware/next/image/{course_folder}?requester_id={id} - Get next available frame image",
                "release": "POST /middleware/release/{file_id}?requester_id={id} - Release a file lock",
                "release_course": "POST /middleware/release/course/{course_id}?requester_id={id} - Release a course lock",
                "release_frame": "POST /middleware/release/frame/{course_folder}/{video_name}/{frame_id}?requester_id={id} - Release a frame lock",
                "stream": "GET /middleware/stream/{file_id}?requester_id={id} - Stream a file",
            },
            "errors": {
                "recent": "GET /errors/recent?limit={n} - List recent errors",
                "summary": "GET /errors/summary - Summarise errors by type",
                "detail": "GET /errors/{error_id} - Get details of one error",
            },
        },
    }
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/courses")
async def get_courses():
    """Return the names of the top-level directories under MP4_OUTPUT_FOLDER.

    Response shape: ``{"courses": [<name>, ...], "total": <count>}``.
    Any filesystem error is reported as HTTP 500.
    """
    try:
        course_names = []
        for entry in Path(MP4_OUTPUT_FOLDER).iterdir():
            if entry.is_dir():
                course_names.append(entry.name)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list courses: {e}")
    return {"courses": course_names, "total": len(course_names)}
|
|
|
|
|
|
def _resolve_within(base: str, *parts: str) -> Optional[Path]:
    """Join client-supplied *parts* onto *base*; return None if the result escapes *base*.

    Rejects ``..`` segments and absolute components that would point outside
    the base folder. Symlinks are not resolved, so links an operator placed
    inside *base* keep working.
    """
    root = os.path.abspath(base)
    candidate = os.path.normpath(os.path.join(root, *parts))
    try:
        if os.path.commonpath([root, candidate]) != root:
            return None
    except ValueError:
        # Paths on different drives (Windows) cannot be inside root.
        return None
    return Path(candidate)


@app.get("/images/{course_folder:path}")
async def get_mp4_list(course_folder: str):
    """List the ``.mp4`` files directly inside a course folder.

    Args:
        course_folder: Folder path relative to MP4_OUTPUT_FOLDER (may contain ``/``).

    Returns:
        List of MP4 file names.

    Raises:
        HTTPException 404: the folder does not exist or lies outside MP4_OUTPUT_FOLDER.
        HTTPException 500: the folder could not be read.
    """
    course_path = _resolve_within(MP4_OUTPUT_FOLDER, course_folder)
    if course_path is None or not course_path.is_dir():
        raise HTTPException(status_code=404, detail="Course folder not found")

    try:
        mp4_files = [
            f.name for f in course_path.iterdir()
            if f.is_file() and f.suffix.lower() == ".mp4"
        ]
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list MP4s: {e}") from e
    return mp4_files


@app.get("/download")
async def download_mp4(course: str, file: str):
    """Download one MP4 file from a course folder.

    Args:
        course: Course folder relative to MP4_OUTPUT_FOLDER.
        file: File path relative to the course folder.

    Raises:
        HTTPException 404: the file does not exist or lies outside MP4_OUTPUT_FOLDER.
    """
    file_path = _resolve_within(MP4_OUTPUT_FOLDER, course, file)
    if file_path is None or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    return FileResponse(path=file_path, media_type="video/mp4", filename=file_path.name)
|
|
|
|
|
|
@app.get("/debug/structure")
async def debug_structure():
    """Walk MP4_OUTPUT_FOLDER and report every directory's subdirectories and files.

    Files are split into ``mp4_files`` and ``other_files``, each entry carrying its
    size in bytes. A file that cannot be stat'ed (broken symlink, or removed
    during the walk) is still listed with ``"size": None`` instead of failing the
    whole request; it does not contribute to ``total_size_bytes``.
    """
    mp4_output_folder_path = Path(MP4_OUTPUT_FOLDER)

    if not mp4_output_folder_path.exists():
        return JSONResponse(content={
            "mp4_output_folder": str(mp4_output_folder_path),
            "folder_exists": False,
            "total_mp4_files": 0,
            "total_size_bytes": 0,
            "structure": {},
        })

    structure = {}
    total_size_bytes = 0
    total_mp4_files = 0

    for root, dirs, files in os.walk(mp4_output_folder_path):
        current_path = Path(root)
        relative_path = str(current_path.relative_to(mp4_output_folder_path))
        if relative_path == ".":
            relative_path = "/"

        entry = {"directories": list(dirs), "mp4_files": [], "other_files": []}
        structure[relative_path] = entry

        for file in files:
            try:
                file_size = (current_path / file).stat().st_size
            except OSError:
                file_size = None
            else:
                total_size_bytes += file_size

            if file.lower().endswith(".mp4"):
                entry["mp4_files"].append({"name": file, "size": file_size})
                total_mp4_files += 1
            else:
                entry["other_files"].append({"name": file, "size": file_size})

    return {
        "mp4_output_folder": str(mp4_output_folder_path),
        "folder_exists": mp4_output_folder_path.exists(),
        "total_mp4_files": total_mp4_files,
        "total_size_bytes": total_size_bytes,
        "structure": structure,
    }
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/errors/recent")
async def get_recent_errors(limit: int = Query(10, ge=1, le=100)):
    """Return the *limit* most recent errors (1-100, default 10)."""
    return error_logger.get_recent_errors(limit)


# Must be registered before "/errors/{error_id}": routes are matched in
# declaration order, so otherwise GET /errors/summary is captured by the
# parameterised route as error_id="summary" and answered with a 404.
@app.get("/errors/summary")
async def get_error_summary():
    """Return the error logger's summary of errors by type."""
    return error_logger.get_error_summary()


@app.get("/errors/{error_id}")
async def get_error_details(error_id: str):
    """Return the stored details of one error.

    Raises:
        HTTPException 404: no error is recorded under *error_id*.
    """
    error = error_logger.get_error(error_id)
    if not error:
        raise HTTPException(status_code=404, detail="Error ID not found")
    return error
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/middleware/status")
async def get_middleware_status():
    """Summarise the middleware registry.

    ``total_images`` counts both ``image:`` ids and the ``frame:`` ids that
    /middleware/next/image registers, so extracted frames are included.
    """
    states = middleware_state.files
    total_courses = sum(1 for file_id in states if file_id.startswith("course:"))
    total_images = sum(1 for file_id in states if file_id.startswith(("image:", "frame:")))

    return {
        "active_locks": sum(1 for state in states.values() if state.locked),
        "total_files": len(states),
        "total_courses": total_courses,
        "total_images": total_images,
        "downloads_completed": sum(state.download_count for state in states.values()),
    }
|
|
|
|
|
|
@app.get("/middleware/status/course/{course_id}")
async def get_course_status(course_id: str):
    """Return the lock and download state of the course tracked as ``course:<course_id>``.

    Raises:
        HTTPException 404: the course is not tracked.
    """
    state = middleware_state.files.get(f"course:{course_id}")
    if state is None:
        raise HTTPException(status_code=404, detail="Course not found")

    return dict(
        course_id=course_id,
        locked=state.locked,
        lock_holder=state.lock_holder,
        lock_time=state.lock_time,
        download_count=state.download_count,
        last_access=state.last_access,
    )
|
|
|
|
|
|
@app.get("/middleware/status/image/{course_folder}/{file_id:path}")
async def get_image_status(course_folder: str, file_id: str):
    """Return the lock and download state of an image or frame in *course_folder*.

    Looks up ``image:<course_folder>/<file_id>`` first, then
    ``frame:<course_folder>/<file_id>``. The ``:path`` converter lets *file_id*
    be ``<video>/<frame>``, matching the frame ids issued by
    /middleware/next/image. Single-segment ids resolve exactly as before.

    Raises:
        HTTPException 404: neither id is tracked.
    """
    state = None
    for prefix in ("image:", "frame:"):
        state = middleware_state.files.get(f"{prefix}{course_folder}/{file_id}")
        if state is not None:
            break
    if state is None:
        raise HTTPException(status_code=404, detail="Image not found")

    return {
        "file_id": file_id,
        "course": course_folder,
        "locked": state.locked,
        "lock_holder": state.lock_holder,
        "lock_time": state.lock_time,
        "download_count": state.download_count,
        "last_access": state.last_access,
    }
|
|
|
|
|
|
@app.post("/middleware/register")
async def register_file(file_path: str):
    """Register a file on disk with the middleware, keyed by its basename.

    Re-registering the same path is idempotent and returns the same id.

    Args:
        file_path: Server-side path of the file (query parameter).

    Returns:
        ``{"file_id": <basename>}``.

    Raises:
        HTTPException 404: *file_path* does not exist.
        HTTPException 409: a different path is already registered under the same basename.
    """
    # NOTE(review): file_path is client-controlled and not confined to any folder,
    # so any readable server file can be registered and then fetched through
    # /middleware/stream. Consider restricting it to the upload/output folders.
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")

    # normpath strips a trailing separator, which would otherwise yield an empty id.
    file_id = os.path.basename(os.path.normpath(file_path))
    existing = middleware_state.files.get(file_id)
    if existing is None:
        middleware_state.files[file_id] = FileState(path=file_path, locked=False)
        middleware_state.save_state()
    elif os.path.abspath(existing.path) != os.path.abspath(file_path):
        # Returning the id here would hand the caller an id that serves a different file.
        raise HTTPException(
            status_code=409,
            detail=f"File id '{file_id}' is already registered for a different path",
        )

    return {"file_id": file_id}
|
|
|
|
|
|
@app.get("/middleware/next/course")
async def get_next_course(requester_id: str):
    """Lock and return the next unlocked course folder for *requester_id*.

    Every directory directly under MP4_OUTPUT_FOLDER is tracked as
    ``course:<name>``; unseen ones are registered first. Courses are scanned in
    sorted name order so assignment is deterministic.

    Raises:
        HTTPException 404: no course is currently unlocked.
        HTTPException 500: the output folder could not be read.
    """
    try:
        # Sweep expired locks first so abandoned courses become available again.
        middleware_state.clean_expired_locks()

        output_root = Path(MP4_OUTPUT_FOLDER)
        courses = sorted(d.name for d in output_root.iterdir() if d.is_dir())

        # Register all unseen courses, then persist once rather than rewriting
        # the whole state file for every new course.
        registered_new = False
        for course in courses:
            course_id = f"course:{course}"
            if course_id not in middleware_state.files:
                middleware_state.files[course_id] = FileState(
                    path=str(output_root / course),
                    locked=False,
                )
                registered_new = True
        if registered_new:
            middleware_state.save_state()

        for course in courses:
            state = middleware_state.files[f"course:{course}"]
            if not state.locked:
                state.locked = True
                state.lock_holder = requester_id
                state.lock_time = time.time()
                middleware_state.save_state()
                return {
                    "course_id": course,
                    "path": str(output_root / course),
                    "lock_time": state.lock_time,
                }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get next course: {e}") from e

    raise HTTPException(status_code=404, detail="No courses available")
|
|
|
|
|
|
@app.get("/middleware/next/image/{course_folder}")
async def get_next_image(course_folder: str, requester_id: str):
    """Lock and return the next unlocked frame image of a course.

    Frames are ``.jpg``/``.jpeg`` files in
    ``<MP4_OUTPUT_FOLDER>/<course_folder>_frames/<video>/``. Each is tracked as
    ``frame:<course_folder>/<video>/<frame>``. Frames are scanned in sorted
    (video, frame) order so they are handed out deterministically.

    Raises:
        HTTPException 404: the frames directory is missing or no frame is unlocked.
        HTTPException 500: the frames directory could not be read.
    """
    frames_path = Path(MP4_OUTPUT_FOLDER) / f"{course_folder}_frames"
    if not frames_path.is_dir():
        raise HTTPException(status_code=404, detail="Course frames directory not found")

    try:
        # Sweep expired locks first so abandoned frames become available again.
        middleware_state.clean_expired_locks()

        frame_files = []
        for video_dir in sorted(frames_path.iterdir()):
            if not video_dir.is_dir():
                continue
            for frame_file in sorted(video_dir.iterdir()):
                if frame_file.is_file() and frame_file.suffix.lower() in ('.jpg', '.jpeg'):
                    frame_files.append((video_dir.name, frame_file))

        # Register all unseen frames, then persist once: saving per frame would
        # rewrite the whole state file for every new image.
        registered_new = False
        for video_name, frame_file in frame_files:
            file_id = f"frame:{course_folder}/{video_name}/{frame_file.name}"
            if file_id not in middleware_state.files:
                middleware_state.files[file_id] = FileState(
                    path=str(frame_file),
                    locked=False,
                )
                registered_new = True
        if registered_new:
            middleware_state.save_state()

        for video_name, frame_file in frame_files:
            file_id = f"frame:{course_folder}/{video_name}/{frame_file.name}"
            state = middleware_state.files[file_id]
            if not state.locked:
                state.locked = True
                state.lock_holder = requester_id
                state.lock_time = time.time()
                middleware_state.save_state()
                return {
                    "file_id": file_id,
                    "course": course_folder,
                    "video": video_name,
                    "frame": frame_file.name,
                    "path": str(frame_file),
                    "lock_time": state.lock_time,
                }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get next image: {e}") from e

    raise HTTPException(status_code=404, detail="No images available in this course")
|
|
|
|
|
|
@app.get("/middleware/next/any")
async def get_next_any_file(requester_id: str):
    """Lock and return the next available tracked file of any kind.

    Delegates to ``middleware_state.get_next_available_file``. Ids starting with
    ``course:`` are reported with type "course"; everything else as "image".

    Raises:
        HTTPException 404: nothing is available for this requester.
    """
    next_id = middleware_state.get_next_available_file(requester_id)
    if not next_id:
        raise HTTPException(status_code=404, detail="No files available")

    chosen = middleware_state.files[next_id]
    kind = "course" if next_id.startswith("course:") else "image"
    return dict(
        file_id=next_id,
        file_path=chosen.path,
        lock_time=chosen.lock_time,
        type=kind,
    )
|
|
|
|
|
|
@app.post("/middleware/release/course/{course_id}")
async def release_course(course_id: str, requester_id: str):
    """Release *requester_id*'s lock on ``course:<course_id>``.

    Raises:
        HTTPException 403: the course is unknown or locked by someone else.
    """
    released = middleware_state.release_lock(f"course:{course_id}", requester_id)
    if released:
        return {"status": "ok"}
    raise HTTPException(status_code=403, detail="Not lock holder")
|
|
|
|
|
|
@app.post("/middleware/release/frame/{course_folder}/{video_name}/{frame_id}")
async def release_frame(course_folder: str, video_name: str, frame_id: str, requester_id: str):
    """Release *requester_id*'s lock on ``frame:<course_folder>/<video_name>/<frame_id>``.

    Raises:
        HTTPException 403: the frame is unknown or locked by someone else.
    """
    frame_key = "frame:" + "/".join((course_folder, video_name, frame_id))
    released = middleware_state.release_lock(frame_key, requester_id)
    if released:
        return {"status": "ok"}
    raise HTTPException(status_code=403, detail="Not lock holder")
|
|
|
|
|
|
@app.post("/middleware/release/{file_id}")
async def release_file(file_id: str, requester_id: str):
    """Release *requester_id*'s lock on an arbitrary file id (kept for older clients).

    Raises:
        HTTPException 403: the id is unknown or locked by someone else.
    """
    released = middleware_state.release_lock(file_id, requester_id)
    if released:
        return {"status": "ok"}
    raise HTTPException(status_code=403, detail="Not lock holder")
|
|
|
|
|
|
@app.get("/middleware/stream/{file_id:path}")
async def stream_file(file_id: str, requester_id: str):
    """Stream a locked file to its lock holder, releasing the lock once fully sent.

    The ``:path`` converter lets ids that contain ``/`` be streamed, e.g. the
    ``frame:<course>/<video>/<frame>`` ids from /middleware/next/image. Ids
    without ``/`` match exactly as before.

    Raises:
        HTTPException 404: unknown id, or the tracked path is missing on disk.
        HTTPException 403: *requester_id* does not hold the lock.
        HTTPException 400: the tracked path is not a regular file (e.g. a course directory).
    """
    from urllib.parse import quote

    file_state = middleware_state.files.get(file_id)
    if file_state is None:
        raise HTTPException(status_code=404, detail="File not found")
    if not file_state.locked or file_state.lock_holder != requester_id:
        raise HTTPException(status_code=403, detail="Not lock holder")

    path = file_state.path
    if not os.path.exists(path):
        raise HTTPException(status_code=404, detail="File not found on disk")
    if not os.path.isfile(path):
        # Opening a directory would only fail after the 200 response had started.
        raise HTTPException(status_code=400, detail="Path is not a regular file")

    async def file_stream():
        async with aiofiles.open(path, 'rb') as f:
            while chunk := await f.read(CHUNK_SIZE):
                yield chunk
        # Reached only after the whole file was sent. If the client disconnects
        # early, the lock stays held until it expires after LOCK_TIMEOUT.
        middleware_state.release_lock(file_id, requester_id)

    download_name = os.path.basename(path) or file_id
    # Header values must be latin-1 encodable and must not contain quotes or
    # control characters. Send a sanitised ASCII fallback plus the RFC 6266
    # ``filename*`` form carrying the real (UTF-8, percent-encoded) name.
    fallback = "".join(
        ch if " " <= ch <= "~" and ch not in '"\\' else "_"
        for ch in download_name
    )
    disposition = (
        f'attachment; filename="{fallback}"; '
        f"filename*=UTF-8''{quote(download_name, safe='')}"
    )

    return StreamingResponse(
        file_stream(),
        media_type="application/octet-stream",
        headers={"Content-Disposition": disposition},
    )
|
|
|
|
|
|
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """On API startup, launch the background processing loop in a daemon thread.

    If a previously started processing thread is still alive, no new one is created.
    """
    global processing_thread
    logger.info("Starting up Unified MP4 Processing & Distribution API...")

    already_running = processing_thread is not None and processing_thread.is_alive()
    if already_running:
        return

    logger.info("🚀 Starting background processing thread...")
    processing_thread = threading.Thread(target=process_hf_files_background, daemon=True)
    processing_thread.start()
|
|
|
|
|
|
if __name__ == "__main__":
    # Imported lazily: uvicorn is only needed when this module is run as a script.
    import uvicorn

    # Listen on all interfaces, port 8000.
    uvicorn.run(app, port=8000, host="0.0.0.0")