|
|
from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from fastapi.responses import HTMLResponse |
|
|
import shutil |
|
|
import os |
|
|
import uuid |
|
|
from services.audio_separator import AudioSeparator |
|
|
from services.chord_analyzer import ChordAnalyzer |
|
|
from services.midi_converter import MidiConverter |
|
|
from services.beat_tracker import BeatTracker |
|
|
|
|
|
app = FastAPI(title="Tunebase AI Engine", description="High-performance audio separation for Math Rock", version="1.0.0") |
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
|
async def read_root(): |
|
|
return """ |
|
|
<!DOCTYPE html> |
|
|
<html> |
|
|
<head> |
|
|
<title>Tunebase</title> |
|
|
<style> |
|
|
body, html { margin: 0; padding: 0; height: 100%; overflow: hidden; } |
|
|
iframe { width: 100%; height: 100%; border: none; } |
|
|
</style> |
|
|
</head> |
|
|
<body> |
|
|
<iframe src="https://tunebase.vercel.app" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe> |
|
|
</body> |
|
|
</html> |
|
|
""" |
|
|
|
|
|
|
|
|
origins = ["*"] |
|
|
app.add_middleware( |
|
|
CORSMiddleware, |
|
|
allow_origins=origins, |
|
|
allow_credentials=True, |
|
|
allow_methods=["*"], |
|
|
allow_headers=["*"], |
|
|
) |
|
|
|
|
|
|
|
|
UPLOAD_DIR = "uploads" |
|
|
PROCESSED_DIR = "processed" |
|
|
os.makedirs(UPLOAD_DIR, exist_ok=True) |
|
|
os.makedirs(PROCESSED_DIR, exist_ok=True) |
|
|
|
|
|
|
|
|
app.mount("/processed", StaticFiles(directory=PROCESSED_DIR), name="processed") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
separator = AudioSeparator() |
|
|
analyzer = ChordAnalyzer() |
|
|
midi_converter = MidiConverter() |
|
|
beat_tracker = BeatTracker() |
|
|
except Exception as e: |
|
|
print(f"Warning: Failed to load models on startup. {e}") |
|
|
import traceback |
|
|
traceback.print_exc() |
|
|
separator = None |
|
|
analyzer = None |
|
|
midi_converter = None |
|
|
beat_tracker = None |
|
|
|
|
|
@app.get("/") |
|
|
def read_root(): |
|
|
return {"message": "Tunebase AI Engine Ready 🎸"} |
|
|
|
|
|
@app.post("/upload") |
|
|
async def upload_audio(background_tasks: BackgroundTasks, file: UploadFile = File(...)): |
|
|
""" |
|
|
Upload file audio dan mulai proses separasi di background. |
|
|
""" |
|
|
file_id = str(uuid.uuid4()) |
|
|
file_ext = file.filename.split(".")[-1] |
|
|
file_path = os.path.join(UPLOAD_DIR, f"{file_id}.{file_ext}") |
|
|
|
|
|
with open(file_path, "wb") as buffer: |
|
|
shutil.copyfileobj(file.file, buffer) |
|
|
|
|
|
return {"id": file_id, "filename": file.filename, "status": "uploaded"} |
|
|
|
|
|
|
|
|
import time |
|
|
import shutil |
|
|
|
|
|
def cleanup_old_files(): |
|
|
print("Running cleanup task...") |
|
|
now = time.time() |
|
|
cutoff = now - 3600 |
|
|
|
|
|
for folder in os.listdir(PROCESSED_DIR): |
|
|
folder_path = os.path.join(PROCESSED_DIR, folder) |
|
|
if os.path.isdir(folder_path): |
|
|
try: |
|
|
|
|
|
mtime = os.path.getmtime(folder_path) |
|
|
if mtime < cutoff: |
|
|
print(f"Deleting old session: {folder}") |
|
|
shutil.rmtree(folder_path) |
|
|
except Exception as e: |
|
|
print(f"Error cleaning {folder}: {e}") |
|
|
|
|
|
|
|
|
processing_status = {} |
|
|
|
|
|
@app.get("/status/{file_id}") |
|
|
async def get_status(file_id: str): |
|
|
return processing_status.get(file_id, {"status": "unknown", "progress": 0, "step": "Waiting"}) |
|
|
|
|
|
def update_progress(file_id, step, progress, status="processing", data=None): |
|
|
state = { |
|
|
"status": status, |
|
|
"progress": progress, |
|
|
"step": step |
|
|
} |
|
|
if data: |
|
|
state.update(data) |
|
|
processing_status[file_id] = state |
|
|
|
|
|
@app.post("/process/{file_id}") |
|
|
def process_audio(file_id: str, background_tasks: BackgroundTasks, mode: str = "4stem"): |
|
|
""" |
|
|
Trigger separasi dan analisis. |
|
|
Modes: 2stem (vocals+instruments), 4stem (default), 6stem (full) |
|
|
""" |
|
|
if not separator: |
|
|
raise HTTPException(status_code=503, detail="AI Engine not initialized") |
|
|
|
|
|
|
|
|
input_path = None |
|
|
|
|
|
for f in os.listdir(UPLOAD_DIR): |
|
|
if f.startswith(file_id): |
|
|
input_path = os.path.join(UPLOAD_DIR, f) |
|
|
break |
|
|
|
|
|
if not input_path: |
|
|
raise HTTPException(status_code=404, detail="File not found") |
|
|
|
|
|
output_dir = os.path.join(PROCESSED_DIR, file_id) |
|
|
|
|
|
|
|
|
space_host = os.environ.get("SPACE_HOST") |
|
|
if space_host: |
|
|
base_url = f"https://{space_host}/processed" |
|
|
else: |
|
|
base_url = "http://localhost:8000/processed" |
|
|
|
|
|
try: |
|
|
update_progress(file_id, "Separating Audio Stems...", 10) |
|
|
|
|
|
|
|
|
def progress_cb(step, prog): |
|
|
update_progress(file_id, step, prog) |
|
|
|
|
|
stems, duration = separator.separate(input_path, output_dir, callback=progress_cb, mode=mode) |
|
|
|
|
|
update_progress(file_id, "Analyzing Rhythm...", 50) |
|
|
|
|
|
|
|
|
update_progress(file_id, "Converting to MIDI...", 70) |
|
|
midi_files = {} |
|
|
if midi_converter: |
|
|
|
|
|
|
|
|
target_stems = ['piano', 'bass', 'guitar_rhythm', 'guitar_lead', 'vocals'] |
|
|
for stem_name in target_stems: |
|
|
if stem_name in stems: |
|
|
stem_path = stems[stem_name] |
|
|
midi_out = os.path.join(output_dir, f"{stem_name}.mid") |
|
|
if midi_converter.convert(stem_path, midi_out): |
|
|
midi_files[stem_name] = f"{base_url}/{file_id}/{stem_name}.mid" |
|
|
|
|
|
update_progress(file_id, "Analyzing Rhythm...", 85) |
|
|
bpm = 0 |
|
|
beats = [] |
|
|
if beat_tracker: |
|
|
|
|
|
beat_source = stems.get('drums') or stems.get('other') or input_path |
|
|
rhythm_data = beat_tracker.track(beat_source) |
|
|
bpm = rhythm_data['bpm'] |
|
|
beats = rhythm_data['beats'] |
|
|
|
|
|
update_progress(file_id, "Finalizing...", 95) |
|
|
|
|
|
update_progress(file_id, "Finalizing...", 95) |
|
|
|
|
|
|
|
|
stems_url = {k: f"{base_url}/{file_id}/{os.path.basename(v)}" for k, v in stems.items()} |
|
|
|
|
|
final_data = { |
|
|
"stems": stems_url, |
|
|
"midi": midi_files, |
|
|
"bpm": bpm, |
|
|
"beats": beats, |
|
|
"duration": duration |
|
|
} |
|
|
print(f"Final Data for {file_id}: {final_data}") |
|
|
update_progress(file_id, "Completed", 100, status="completed", data=final_data) |
|
|
|
|
|
return { |
|
|
"status": "completed", |
|
|
**final_data |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
import traceback |
|
|
traceback.print_exc() |
|
|
processing_status[file_id] = {"status": "error", "error": str(e)} |
|
|
print(f"Error processing: {e}") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|