File size: 7,468 Bytes
6319e2f 580efab 6319e2f 580efab 6319e2f 207cecb 6319e2f 207cecb 6319e2f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import shutil
import os
import uuid
from services.audio_separator import AudioSeparator
from services.chord_analyzer import ChordAnalyzer
from services.midi_converter import MidiConverter
from services.beat_tracker import BeatTracker
# ASGI application object; every route and middleware in this file attaches to it.
app = FastAPI(
    title="Tunebase AI Engine",
    description="High-performance audio separation for Math Rock",
    version="1.0.0",
)
# Markup served at "/": a borderless, full-window iframe pointing at
# https://tunebase.vercel.app.
_ROOT_PAGE_HTML = """
<!DOCTYPE html>
<html>
<head>
<title>Tunebase</title>
<style>
body, html { margin: 0; padding: 0; height: 100%; overflow: hidden; }
iframe { width: 100%; height: 100%; border: none; }
</style>
</head>
<body>
<iframe src="https://tunebase.vercel.app" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>
</body>
</html>
"""


@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Return the static HTML landing page that embeds the frontend in an iframe."""
    return _ROOT_PAGE_HTML
# Setup CORS
origins = ["*"]  # Allow all for dev
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    # Credentialed requests must not be combined with wildcard origins: the
    # FastAPI CORS docs require explicit origins/methods/headers whenever
    # allow_credentials is True. Nothing visible in this file relies on
    # cookies or Authorization headers, so credentials are disabled rather
    # than narrowing the origin list.
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Directories
UPLOAD_DIR = "uploads"
PROCESSED_DIR = "processed"
# Both folders must exist before the static mount below and before any upload.
for _required_dir in (UPLOAD_DIR, PROCESSED_DIR):
    os.makedirs(_required_dir, exist_ok=True)

# Serve processed output as static files so the frontend can play the resulting audio.
app.mount("/processed", app=StaticFiles(directory=PROCESSED_DIR), name="processed")
# Uploads could be mounted too for verification, but that is risky; processed is enough.
# Initialize Services
# Warning: Loading models takes time and RAM.
def _load_service(factory):
    """Build one service; on failure log the error and return None instead of raising."""
    try:
        return factory()
    except Exception as e:
        # Startup boundary: a model that cannot load must not take the whole
        # API down, so the failure is reported and the service is left unset.
        print(f"Warning: Failed to load {factory.__name__} on startup. {e}")
        import traceback
        traceback.print_exc()
        return None


# Each service is loaded independently so that one failing model does not
# disable the others; the request handlers check each service for None.
separator = _load_service(AudioSeparator)
analyzer = _load_service(ChordAnalyzer)
midi_converter = _load_service(MidiConverter)
beat_tracker = _load_service(BeatTracker)
# GET "/" is already registered earlier in this file for the HTML landing page.
# Routes are matched in registration order, so a second handler on "/" never
# runs and its def would rebind the first handler's name. The readiness message
# therefore lives on its own path.
@app.get("/health")
def health_check():
    """Return a small JSON payload confirming that the API process is up."""
    return {"message": "Tunebase AI Engine Ready 🎸"}
@app.post("/upload")
async def upload_audio(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    """
    Save an uploaded audio file under a freshly generated ID.

    The file is only stored here; nothing is scheduled on `background_tasks`.

    Returns:
        A dict with the generated "id", the client-supplied "filename" and
        the fixed "status" value "uploaded".
    """
    file_id = str(uuid.uuid4())

    # The client-supplied filename is untrusted. Only the text after the last
    # dot of its final path component is kept, and only ASCII letters/digits
    # of that, so the extension can never carry path separators or "..".
    client_name = os.path.basename(file.filename or "")
    _, dot, raw_ext = client_name.rpartition(".")
    file_ext = ""
    if dot:
        file_ext = "".join(ch for ch in raw_ext if ch.isascii() and ch.isalnum())[:16]

    # A name without a usable extension is stored under the bare ID.
    stored_name = f"{file_id}.{file_ext}" if file_ext else file_id
    file_path = os.path.join(UPLOAD_DIR, stored_name)

    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    return {"id": file_id, "filename": file.filename, "status": "uploaded"}
# Cleanup Task
import time
import shutil
def cleanup_old_files(max_age_seconds=3600):
    """
    Delete session folders in PROCESSED_DIR that have not been modified recently.

    Args:
        max_age_seconds: Folders whose modification time is older than this
            many seconds are removed. Defaults to 3600 (one hour).

    Only directories are considered; plain files directly inside
    PROCESSED_DIR are left alone. A folder that cannot be inspected or
    removed is reported and skipped so the remaining folders are still
    processed.
    """
    print("Running cleanup task...")
    cutoff = time.time() - max_age_seconds

    for folder in os.listdir(PROCESSED_DIR):
        folder_path = os.path.join(PROCESSED_DIR, folder)
        if not os.path.isdir(folder_path):
            continue
        try:
            # Age is judged by the folder's own modification time.
            if os.path.getmtime(folder_path) < cutoff:
                print(f"Deleting old session: {folder}")
                shutil.rmtree(folder_path)
        except OSError as e:
            print(f"Error cleaning {folder}: {e}")
# Global progress store (simple, in-memory): maps a file ID to its latest state dict.
# It is a plain module-level dict, so its contents do not survive a process restart.
processing_status: dict = {}
@app.get("/status/{file_id}")
async def get_status(file_id: str):
    """Return the stored progress state for file_id, or an "unknown" placeholder."""
    # IDs that were never recorded get this placeholder instead of an error.
    placeholder = {"status": "unknown", "progress": 0, "step": "Waiting"}
    return processing_status.get(file_id, placeholder)
def update_progress(file_id, step, progress, status="processing", data=None):
    """
    Replace the stored progress state for file_id.

    The new state always holds "status", "progress" and "step". When `data`
    is given and non-empty, its entries are merged in last, so they take
    precedence over the three standard keys.
    """
    entry = dict(status=status, progress=progress, step=step)
    entry.update(data or {})
    processing_status[file_id] = entry
def _find_uploaded_file(file_id):
    """Return the path of the upload stored for file_id, or None if there is none."""
    for entry in os.listdir(UPLOAD_DIR):
        # Compare the whole name stem rather than a prefix, so a partial ID
        # cannot select a different upload that starts with the same characters.
        if os.path.splitext(entry)[0] == file_id:
            return os.path.join(UPLOAD_DIR, entry)
    return None


@app.post("/process/{file_id}")
def process_audio(file_id: str, background_tasks: BackgroundTasks, mode: str = "4stem"):
    """
    Run stem separation, MIDI conversion and beat tracking for an uploaded file.

    The work runs inside the request: the response is returned only after
    every step has finished. Progress is published through update_progress
    while the steps run. `background_tasks` is accepted but not used.

    Args:
        file_id: ID of a file previously stored in UPLOAD_DIR.
        mode: Separation mode passed through to the separator.
            Modes: 2stem (vocals+instruments), 4stem (default), 6stem (full)

    Returns:
        A dict with "status" set to "completed" plus "stems", "midi", "bpm",
        "beats" and "duration".

    Raises:
        HTTPException: 503 when the separator is not loaded, 404 when no
            upload matches file_id, 500 when any processing step fails.
    """
    if not separator:
        raise HTTPException(status_code=503, detail="AI Engine not initialized")

    input_path = _find_uploaded_file(file_id)
    if not input_path:
        raise HTTPException(status_code=404, detail="File not found")

    output_dir = os.path.join(PROCESSED_DIR, file_id)

    # Dynamic base URL for HF Space vs localhost
    space_host = os.environ.get("SPACE_HOST")
    if space_host:
        base_url = f"https://{space_host}/processed"
    else:
        base_url = "http://localhost:8000/processed"

    try:
        update_progress(file_id, "Separating Audio Stems...", 10)

        # 1. Separate audio with the requested mode; the separator reports
        # intermediate progress through this callback.
        def progress_cb(step, prog):
            update_progress(file_id, step, prog)

        stems, duration = separator.separate(input_path, output_dir, callback=progress_cb, mode=mode)

        # 2. MIDI conversion of the stems that are present.
        update_progress(file_id, "Converting to MIDI...", 70)
        midi_files = {}
        if midi_converter:
            # Convert separate stems to MIDI (e.g., Piano, Bass, Guitar)
            # Basic Pitch works best on monophonic/polyphonic instruments, less on drums
            target_stems = ['piano', 'bass', 'guitar_rhythm', 'guitar_lead', 'vocals']
            for stem_name in target_stems:
                if stem_name not in stems:
                    continue
                midi_out = os.path.join(output_dir, f"{stem_name}.mid")
                # Only stems whose conversion reports success are exposed.
                if midi_converter.convert(stems[stem_name], midi_out):
                    midi_files[stem_name] = f"{base_url}/{file_id}/{stem_name}.mid"

        # 3. Beat analysis.
        update_progress(file_id, "Analyzing Rhythm...", 85)
        bpm = 0
        beats = []
        if beat_tracker:
            # Use Drums for beat tracking if available, otherwise 'other' or input
            beat_source = stems.get('drums') or stems.get('other') or input_path
            rhythm_data = beat_tracker.track(beat_source)
            bpm = rhythm_data['bpm']
            beats = rhythm_data['beats']

        update_progress(file_id, "Finalizing...", 95)

        # Construct full URLs for frontend
        stems_url = {k: f"{base_url}/{file_id}/{os.path.basename(v)}" for k, v in stems.items()}

        final_data = {
            "stems": stems_url,
            "midi": midi_files,
            "bpm": bpm,
            "beats": beats,
            "duration": duration
        }
        print(f"Final Data for {file_id}: {final_data}")  # Debug
        update_progress(file_id, "Completed", 100, status="completed", data=final_data)
        return {
            "status": "completed",
            **final_data
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        # Publish the failure in the same shape as every other state
        # ("status", "progress", "step") plus the error text.
        update_progress(file_id, "Failed", 0, status="error", data={"error": str(e)})
        print(f"Error processing: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
|