from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import shutil
import os
import uuid
from services.audio_separator import AudioSeparator
from services.chord_analyzer import ChordAnalyzer
from services.midi_converter import MidiConverter
from services.beat_tracker import BeatTracker
# Application instance shared by every route and middleware registration below.
app = FastAPI(
    title="Tunebase AI Engine",
    description="High-performance audio separation for Math Rock",
    version="1.0.0",
)
@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serve the landing page markup for the root URL as an HTML response."""
    landing_page = """
Tunebase
"""
    return landing_page
# Setup CORS
# Allowed origins default to "*" (allow all, for development). Set the
# CORS_ORIGINS environment variable to a comma-separated list of explicit
# origins (e.g. "https://app.example.com,https://staging.example.com") to
# restrict access without touching the code.
# NOTE(review): FastAPI's CORS documentation says allow_origins, allow_methods
# and allow_headers should not be ["*"] when allow_credentials=True. The
# credentials flag is left as-is to avoid changing behaviour for existing
# clients; confirm whether the frontend really sends credentials.
origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Working directories: raw uploads and processing output.
UPLOAD_DIR = "uploads"
PROCESSED_DIR = "processed"
for _directory in (UPLOAD_DIR, PROCESSED_DIR):
    os.makedirs(_directory, exist_ok=True)
# Serve the processed output as static files so the frontend can play the
# resulting audio.
app.mount("/processed", StaticFiles(directory=PROCESSED_DIR), name="processed")
# The uploads directory is intentionally not mounted: exposing it would be
# risky, and serving the processed output is enough.
# Initialize Services
# Warning: Loading models takes time and RAM.
def _load_service(factory):
    """Instantiate one service and return it, or None if construction fails.

    Each service is loaded independently so that a failure in one optional
    component (e.g. the beat tracker) does not disable the others. Failures
    are reported on stdout together with the traceback, and the caller gets
    None so request handlers can check availability per service.
    """
    try:
        return factory()
    except Exception as e:  # startup boundary: report and degrade, never crash
        print(f"Warning: Failed to load {factory.__name__} on startup. {e}")
        import traceback
        traceback.print_exc()
        return None


separator = _load_service(AudioSeparator)
analyzer = _load_service(ChordAnalyzer)
midi_converter = _load_service(MidiConverter)
beat_tracker = _load_service(BeatTracker)
# NOTE: GET "/" is already registered earlier in this file (the HTML landing
# page). Routes are matched in registration order, so a second handler on "/"
# would never be reached; this JSON readiness payload is therefore exposed on
# its own path. The function name is kept unchanged for compatibility.
@app.get("/health")
def read_root():
    """Return a small JSON payload indicating that the service is up."""
    return {"message": "Tunebase AI Engine Ready 🎸"}
def _safe_extension(filename):
    """Return a sanitized ``.ext`` suffix for *filename*, or ``""`` if none.

    The filename comes from the client, so only ASCII letters and digits of
    the extension are kept; path separators, dots and other characters can
    therefore never reach the path used for storage. A missing filename
    (None or empty) yields an empty suffix.
    """
    _, ext = os.path.splitext(os.path.basename(filename or ""))
    cleaned = "".join(ch for ch in ext if ch.isascii() and ch.isalnum())
    return f".{cleaned}" if cleaned else ""


@app.post("/upload")
async def upload_audio(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    """
    Store an uploaded audio file under a freshly generated ID.

    The file is written to UPLOAD_DIR as ``<id><ext>``, where ``<ext>`` is the
    sanitized extension of the client-supplied filename. Processing is not
    started here; the client triggers it separately with the returned ID.

    Args:
        background_tasks: Accepted for interface compatibility; not used.
        file: The uploaded file.

    Returns:
        A dict with the generated ``id``, the original ``filename`` and the
        ``status`` string ``"uploaded"``.
    """
    file_id = str(uuid.uuid4())
    file_path = os.path.join(UPLOAD_DIR, f"{file_id}{_safe_extension(file.filename)}")
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    return {"id": file_id, "filename": file.filename, "status": "uploaded"}
# Cleanup Task
import time
import shutil
def cleanup_old_files(max_age_seconds=3600):
    """Delete stale session folders from PROCESSED_DIR.

    A direct sub-directory of PROCESSED_DIR is removed (recursively) when its
    modification time is older than *max_age_seconds*. Plain files are left
    alone. Cleanup is best-effort: a filesystem error on one folder is
    reported on stdout and the remaining folders are still processed.

    Args:
        max_age_seconds: Maximum age, in seconds, a folder may reach before
            it is deleted. Defaults to 3600 (one hour).
    """
    print("Running cleanup task...")
    cutoff = time.time() - max_age_seconds
    for folder in os.listdir(PROCESSED_DIR):
        folder_path = os.path.join(PROCESSED_DIR, folder)
        if not os.path.isdir(folder_path):
            continue
        try:
            # The folder's own mtime decides whether the session is stale.
            if os.path.getmtime(folder_path) < cutoff:
                print(f"Deleting old session: {folder}")
                shutil.rmtree(folder_path)
        except OSError as e:
            # The folder may vanish or be locked between listing and removal.
            print(f"Error cleaning {folder}: {e}")
# Global progress store (simple in-memory dict).
# Maps a file_id to its latest state dict ("status", "progress", "step", plus
# any result data). It is written by update_progress() and by the error path
# of process_audio(), and read by the /status/{file_id} endpoint.
# NOTE(review): nothing visible in this file removes entries — confirm whether
# they are pruned elsewhere.
processing_status = {}
@app.get("/status/{file_id}")
async def get_status(file_id: str):
    """Return the stored processing state for *file_id*.

    When nothing has been recorded for the ID, a placeholder state with
    status "unknown" is returned instead.
    """
    try:
        return processing_status[file_id]
    except KeyError:
        return {"status": "unknown", "progress": 0, "step": "Waiting"}
def update_progress(file_id, step, progress, status="processing", data=None):
    """Record the latest processing state for *file_id* in the shared store.

    The stored entry always carries "status", "progress" and "step"; when
    *data* is non-empty its items are merged on top, so they may override
    those keys. Any previous entry for the same ID is replaced.
    """
    entry = dict(status=status, progress=progress, step=step)
    # A falsy `data` (None or empty) contributes nothing to the entry.
    entry.update(data or ())
    processing_status[file_id] = entry
def _find_upload(file_id):
    """Return the path of the upload stored for exactly *file_id*, or None.

    The stored name must equal *file_id* once its extension is removed. A
    prefix match is not enough: otherwise a partial ID would select some
    other upload that merely starts with the same characters.
    """
    for entry in os.listdir(UPLOAD_DIR):
        if os.path.splitext(entry)[0] == file_id:
            return os.path.join(UPLOAD_DIR, entry)
    return None


@app.post("/process/{file_id}")
def process_audio(file_id: str, background_tasks: BackgroundTasks, mode: str = "4stem"):
    """
    Trigger stem separation and analysis for a previously uploaded file.

    The work runs inside the request; progress is published through
    update_progress() so clients can poll the status endpoint meanwhile.

    Modes: 2stem (vocals+instruments), 4stem (default), 6stem (full)

    Args:
        file_id: ID returned by the upload endpoint.
        background_tasks: Accepted for interface compatibility; not used.
        mode: Separation mode, passed through to the separator.

    Returns:
        A dict with ``status`` "completed" plus ``stems`` and ``midi`` (name
        to URL), ``bpm``, ``beats`` and ``duration``.

    Raises:
        HTTPException: 503 if the separator is not loaded, 404 if no upload
            matches *file_id*, 500 if any processing step fails.
    """
    if not separator:
        raise HTTPException(status_code=503, detail="AI Engine not initialized")

    # Locate the uploaded file
    input_path = _find_upload(file_id)
    if not input_path:
        raise HTTPException(status_code=404, detail="File not found")

    output_dir = os.path.join(PROCESSED_DIR, file_id)

    # Dynamic base URL for HF Space vs localhost
    space_host = os.environ.get("SPACE_HOST")
    if space_host:
        base_url = f"https://{space_host}/processed"
    else:
        base_url = "http://localhost:8000/processed"

    try:
        update_progress(file_id, "Separating Audio Stems...", 10)

        # 1. Separate audio with the specified mode
        def progress_cb(step, prog):
            update_progress(file_id, step, prog)

        stems, duration = separator.separate(input_path, output_dir, callback=progress_cb, mode=mode)

        # 2. MIDI conversion
        update_progress(file_id, "Converting to MIDI...", 70)
        midi_files = {}
        if midi_converter:
            # Convert separate stems to MIDI (e.g., Piano, Bass, Guitar)
            # Basic Pitch works best on monophonic/polyphonic instruments, less on drums
            target_stems = ['piano', 'bass', 'guitar_rhythm', 'guitar_lead', 'vocals']
            for stem_name in target_stems:
                if stem_name in stems:
                    stem_path = stems[stem_name]
                    midi_out = os.path.join(output_dir, f"{stem_name}.mid")
                    if midi_converter.convert(stem_path, midi_out):
                        midi_files[stem_name] = f"{base_url}/{file_id}/{stem_name}.mid"

        # 3. Beat analysis
        update_progress(file_id, "Analyzing Rhythm...", 85)
        bpm = 0
        beats = []
        if beat_tracker:
            # Use Drums for beat tracking if available, otherwise 'other' or input
            beat_source = stems.get('drums') or stems.get('other') or input_path
            rhythm_data = beat_tracker.track(beat_source)
            bpm = rhythm_data['bpm']
            beats = rhythm_data['beats']

        update_progress(file_id, "Finalizing...", 95)

        # Construct full URLs for frontend
        stems_url = {k: f"{base_url}/{file_id}/{os.path.basename(v)}" for k, v in stems.items()}
        final_data = {
            "stems": stems_url,
            "midi": midi_files,
            "bpm": bpm,
            "beats": beats,
            "duration": duration
        }
        print(f"Final Data for {file_id}: {final_data}")  # Debug
        update_progress(file_id, "Completed", 100, status="completed", data=final_data)
        return {
            "status": "completed",
            **final_data
        }
    except Exception as e:
        # Request boundary: record the failure for pollers, then report it.
        import traceback
        traceback.print_exc()
        processing_status[file_id] = {"status": "error", "error": str(e)}
        print(f"Error processing: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e