from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse import shutil import os import uuid from services.audio_separator import AudioSeparator from services.chord_analyzer import ChordAnalyzer from services.midi_converter import MidiConverter from services.beat_tracker import BeatTracker app = FastAPI(title="Tunebase AI Engine", description="High-performance audio separation for Math Rock", version="1.0.0") @app.get("/", response_class=HTMLResponse) async def read_root(): return """ Tunebase """ # Setup CORS origins = ["*"] # Allow all for dev app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Directories UPLOAD_DIR = "uploads" PROCESSED_DIR = "processed" os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(PROCESSED_DIR, exist_ok=True) # Mount statis files agar frontend bisa play hasil audio app.mount("/processed", StaticFiles(directory=PROCESSED_DIR), name="processed") # Mount uploads for verification if needed, but risky. Processed is enough. # Initialize Services # Warning: Loading models takes time and RAM. try: separator = AudioSeparator() analyzer = ChordAnalyzer() midi_converter = MidiConverter() beat_tracker = BeatTracker() except Exception as e: print(f"Warning: Failed to load models on startup. {e}") import traceback traceback.print_exc() separator = None analyzer = None midi_converter = None beat_tracker = None @app.get("/") def read_root(): return {"message": "Tunebase AI Engine Ready 🎸"} @app.post("/upload") async def upload_audio(background_tasks: BackgroundTasks, file: UploadFile = File(...)): """ Upload file audio dan mulai proses separasi di background. """ file_id = str(uuid.uuid4()) file_ext = file.filename.split(".")[-1] file_path = os.path.join(UPLOAD_DIR, f"{file_id}.{file_ext}") with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) return {"id": file_id, "filename": file.filename, "status": "uploaded"} # Cleanup Task import time import shutil def cleanup_old_files(): print("Running cleanup task...") now = time.time() cutoff = now - 3600 # 1 hour (3600 seconds) for folder in os.listdir(PROCESSED_DIR): folder_path = os.path.join(PROCESSED_DIR, folder) if os.path.isdir(folder_path): try: # Check creation/modify time mtime = os.path.getmtime(folder_path) if mtime < cutoff: print(f"Deleting old session: {folder}") shutil.rmtree(folder_path) except Exception as e: print(f"Error cleaning {folder}: {e}") # Global Progress Store (Simple in-memory) processing_status = {} @app.get("/status/{file_id}") async def get_status(file_id: str): return processing_status.get(file_id, {"status": "unknown", "progress": 0, "step": "Waiting"}) def update_progress(file_id, step, progress, status="processing", data=None): state = { "status": status, "progress": progress, "step": step } if data: state.update(data) processing_status[file_id] = state @app.post("/process/{file_id}") def process_audio(file_id: str, background_tasks: BackgroundTasks, mode: str = "4stem"): """ Trigger separasi dan analisis. Modes: 2stem (vocals+instruments), 4stem (default), 6stem (full) """ if not separator: raise HTTPException(status_code=503, detail="AI Engine not initialized") # Cari file input_path = None # Simple search for f in os.listdir(UPLOAD_DIR): if f.startswith(file_id): input_path = os.path.join(UPLOAD_DIR, f) break if not input_path: raise HTTPException(status_code=404, detail="File not found") output_dir = os.path.join(PROCESSED_DIR, file_id) # Dynamic base URL for HF Space vs localhost space_host = os.environ.get("SPACE_HOST") if space_host: base_url = f"https://{space_host}/processed" else: base_url = "http://localhost:8000/processed" try: update_progress(file_id, "Separating Audio Stems...", 10) # 1. Separate Audio with specified mode def progress_cb(step, prog): update_progress(file_id, step, prog) stems, duration = separator.separate(input_path, output_dir, callback=progress_cb, mode=mode) update_progress(file_id, "Analyzing Rhythm...", 50) # 3. MIDI & Beat Analysis update_progress(file_id, "Converting to MIDI...", 70) midi_files = {} if midi_converter: # Convert separate stems to MIDI (e.g., Piano, Bass, Guitar) # Basic Pitch works best on monophonic/polyphonic instruments, less on drums target_stems = ['piano', 'bass', 'guitar_rhythm', 'guitar_lead', 'vocals'] for stem_name in target_stems: if stem_name in stems: stem_path = stems[stem_name] midi_out = os.path.join(output_dir, f"{stem_name}.mid") if midi_converter.convert(stem_path, midi_out): midi_files[stem_name] = f"{base_url}/{file_id}/{stem_name}.mid" update_progress(file_id, "Analyzing Rhythm...", 85) bpm = 0 beats = [] if beat_tracker: # Use Drums for beat tracking if available, otherwise 'other' or input beat_source = stems.get('drums') or stems.get('other') or input_path rhythm_data = beat_tracker.track(beat_source) bpm = rhythm_data['bpm'] beats = rhythm_data['beats'] update_progress(file_id, "Finalizing...", 95) update_progress(file_id, "Finalizing...", 95) # Construct full URLs for frontend stems_url = {k: f"{base_url}/{file_id}/{os.path.basename(v)}" for k, v in stems.items()} final_data = { "stems": stems_url, "midi": midi_files, "bpm": bpm, "beats": beats, "duration": duration } print(f"Final Data for {file_id}: {final_data}") # Debug update_progress(file_id, "Completed", 100, status="completed", data=final_data) return { "status": "completed", **final_data } except Exception as e: import traceback traceback.print_exc() processing_status[file_id] = {"status": "error", "error": str(e)} print(f"Error processing: {e}") raise HTTPException(status_code=500, detail=str(e))