## Download dependencies

In [None]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
!pip install --upgrade librosa pydub

## Download the dataset and pre-processed features

In [None]:
!mkdir /content/datasets
!wget https://os.unil.cloud.switch.ch/fma/fma_small.zip
!wget https://huggingface.co/datasets/johaness14/ProcessedFeaturesFMASmall/resolve/main/ProcessedFeaturesFMASmall.zip?download=true -O ProcessedFeaturesFMASmall.zip
!unzip /content/fma_small.zip
!unzip /content/ProcessedFeaturesFMASmall.zip
!mv /content/fma_small /content/datasets
!mv /content/ProcessedFeaturesFMASmall /content/datasets

## Download pre-trained models

In [None]:
!mkdir /content/models
!wget https://huggingface.co/johaness14/AI_MiniDJ/resolve/main/mixability_dataset.pkl?download=true -O mixability_dataset.pkl
!wget https://huggingface.co/johaness14/AI_MiniDJ/resolve/main/scaler.pkl?download=true -O scaler.pkl
!wget https://huggingface.co/johaness14/AI_MiniDJ/resolve/main/model_checkpoint.pth?download=true -O model_checkpoint.pth
!mv /content/mixability_dataset.pkl /content/models
!mv /content/scaler.pkl /content/models
!mv /content/model_checkpoint.pth /content/models

## Import libraries

In [None]:
import torch
import torch.nn as nn

import os
import glob
import random
import pickle
import librosa
import numpy as np
from tqdm import tqdm
from pydub import AudioSegment
from IPython.display import Audio, display

## Configure the path

In [None]:
# --- Path ---
# Root folders, relative to the notebook's working directory.
BASE_PATH = "./datasets"
MODELS_PATH = "./models"

# Dataset sub-folders: pre-computed features, raw audio, and generated mixes.
FEATURES_PATH, FMA_AUDIO_PATH, MIX_OUTPUT_PATH = (
    os.path.join(BASE_PATH, folder_name)
    for folder_name in ("ProcessedFeaturesFMASmall", "fma_small", "Mix_Results")
)
# The mix output folder is created up front so later exports have a target.
os.makedirs(MIX_OUTPUT_PATH, exist_ok=True)

# Pre-trained artifacts: the fitted feature scaler and the model checkpoint.
SCALER_FILE, CHECKPOINT_FILE = (
    os.path.join(MODELS_PATH, file_name)
    for file_name in ("scaler.pkl", "model_checkpoint.pth")
)

# --- Setup Device ---
# Use the GPU when torch reports one, otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Use device: {device}")

## Model

In [None]:
# Redefine model architecture
class MixabilityScorer(nn.Module):
    """Feed-forward network that maps a feature vector to a score in [0, 1].

    The layer stack is stored as ``self.network`` so that state-dict keys keep
    the ``network.<index>`` naming expected by the saved checkpoint.

    Args:
        input_features: Number of values in each input feature vector.
    """

    def __init__(self, input_features=7):
        super().__init__()

        # Input stage: the only stage followed by dropout.
        stack = [nn.Linear(input_features, 64), nn.ReLU(), nn.Dropout(0.5)]

        # Hidden stages: progressively narrower Linear + ReLU pairs.
        for width_in, width_out in ((64, 32), (32, 16)):
            stack.append(nn.Linear(width_in, width_out))
            stack.append(nn.ReLU())

        # Output stage: a single unit squashed by a sigmoid.
        stack.append(nn.Linear(16, 1))
        stack.append(nn.Sigmoid())

        self.network = nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the resulting scores."""
        scores = self.network(x)
        return scores

# Initialize model and load checkpoint
# SECURITY: torch.load, pickle.load and np.load(allow_pickle=True) below all
# unpickle data, which can execute arbitrary code. Only use files from a
# source you trust.
model = MixabilityScorer().to(device)
checkpoint = torch.load(CHECKPOINT_FILE, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()  # inference mode: turns off the Dropout layer
print("Model loaded from checkpoint successfully.")

# Load scaler
with open(SCALER_FILE, 'rb') as f:
    scaler = pickle.load(f)
print("Scaler loaded successfully.")

# Load all features into memory for hunting
print("Loading all feature data into memory for hunting...")
# glob returns files in arbitrary order; sorting keeps the list order stable
# from one run to the next.
all_feature_files = sorted(glob.glob(os.path.join(FEATURES_PATH, '*.npy')))
if not all_feature_files:
    # Fail here with a clear message instead of later, when the hunting loop
    # tries to sample pairs from an empty list.
    raise FileNotFoundError(
        f"No .npy feature files found in {FEATURES_PATH!r}. "
        "Check that the dataset was downloaded and that the working directory is correct."
    )
# Each .npy file holds a pickled Python object; .item() unwraps it from the
# 0-d array that np.load returns.
all_features_data = [np.load(f, allow_pickle=True).item()
                     for f in tqdm(all_feature_files, desc="Loading Features")]
print(f"{len(all_features_data)} feature data ready.")

In [None]:
# Helper function to check key compatibility
def are_keys_compatible(key1, key2):
    """Check if two musical keys are compatible for mixing.

    Each key is expected to be a string of the form ``"<note> <mode>"``, where
    ``<note>`` is one of the sharp-based names in ``key_map`` (e.g. ``"C#"``)
    and ``<mode>`` is an arbitrary token that is only compared for equality.

    Args:
        key1: Key of the first track.
        key2: Key of the second track.

    Returns:
        1.0 when the keys are considered compatible, otherwise 0.0. Keys that
        are malformed (not a string, missing the mode token) or use an unknown
        note name yield 0.0.
    """
    key_map = {'C': 1, 'C#': 2, 'D': 3, 'D#': 4, 'E': 5, 'F': 6,
               'F#': 7, 'G': 8, 'G#': 9, 'A': 10, 'A#': 11, 'B': 12}

    def parse_key(key_str):
        """Split a key string into (note number, mode); note 0 marks an invalid key."""
        parts = key_str.split() if isinstance(key_str, str) else []
        if len(parts) < 2:
            # Malformed input is reported as invalid instead of raising.
            return 0, ""
        return key_map.get(parts[0], 0), parts[1]

    note1, mode1 = parse_key(key1)
    note2, mode2 = parse_key(key2)

    # Return 0 if invalid keys
    if note1 == 0 or note2 == 0:
        return 0.0

    # Same key and mode
    if note1 == note2 and mode1 == mode2:
        return 1.0

    # Adjacent keys (semitone apart); a gap of 11 is the B <-> C wrap-around
    if abs(note1 - note2) in (1, 11):
        return 1.0

    # Relative major/minor keys: the modes must differ AND the notes must be
    # three semitones apart in either direction. The parentheses matter:
    # without them `and` binds tighter than `or`, so the second distance test
    # would apply even when both keys share the same mode.
    # NOTE(review): this score is one of the model's input features; if the
    # checkpoint was trained with the un-parenthesized condition, confirm that
    # the corrected feature is acceptable for inference.
    if mode1 != mode2 and ((note1 - note2) % 12 == 3 or (note2 - note1) % 12 == 3):
        return 1.0

    return 0.0


def find_audio_path(track_id, root_path):
    """Build the path of a track's mp3 file under ``root_path``.

    The file is placed in a sub-folder named after the first three characters
    of ``track_id`` and is named ``<track_id>.mp3``.
    """
    path_parts = (root_path, track_id[:3], f"{track_id}.mp3")
    return os.path.join(*path_parts)


def _to_audio_segment(samples, sample_rate):
    """Convert a float waveform into a 16-bit PCM pydub AudioSegment."""
    # Clip before scaling: samples outside [-1, 1] (possible after
    # time-stretching) would otherwise wrap around when cast to int16.
    pcm = np.int16(np.clip(samples, -1.0, 1.0) * 32767)
    return AudioSegment(
        pcm.tobytes(),
        frame_rate=sample_rate, sample_width=2, channels=samples.ndim
    )


def run_mixing_engine(track_id1, track_id2):
    """Execute mixing engine to create crossfaded mix of two tracks.

    The second track is time-stretched to the first track's BPM, both tracks
    are cut on a beat, and the result is crossfaded over 8 beats, exported as
    an mp3 into ``MIX_OUTPUT_PATH`` and displayed as an audio player.

    Args:
        track_id1: Track ID of the outgoing (first) track.
        track_id2: Track ID of the incoming (second) track.

    Returns:
        True when the mix was created and exported. False when either track
        has no usable beats, the first track is too short for the transition,
        or any step raised an exception (the error is printed, not re-raised).
    """
    print("   High score! Running Mixing Engine...")

    try:
        # Load feature data for both tracks
        f1 = next(item for item in all_features_data if item["track_id"] == track_id1)
        f2 = next(item for item in all_features_data if item["track_id"] == track_id2)

        # Load audio files; the second track is resampled to the first track's rate
        y1, sr1 = librosa.load(find_audio_path(track_id1, FMA_AUDIO_PATH), sr=None)
        y2, _ = librosa.load(find_audio_path(track_id2, FMA_AUDIO_PATH), sr=sr1)

        # Extract BPM and beat timing data. Beat times are coerced to 1-D float
        # arrays so the arithmetic and boolean indexing below also work when
        # they were stored as plain lists.
        bpm1, bpm2 = float(f1['bpm']), float(f2['bpm'])
        beats1 = np.asarray(f1['beat_times'], dtype=float).ravel()
        beats2 = np.asarray(f2['beat_times'], dtype=float).ravel()

        # Both tracks need at least one detected beat. This is an emptiness
        # check: a beat at t=0.0 is valid, which `.any()` would reject.
        if beats1.size == 0 or beats2.size == 0:
            return False

        # Time-stretch second track to match first track's BPM
        stretch_rate = bpm1 / bpm2
        y2_stretched = librosa.effects.time_stretch(y2, rate=stretch_rate)
        beats2_stretched = beats2 / stretch_rate

        # Calculate transition parameters
        song1_duration = librosa.get_duration(y=y1, sr=sr1)
        beat_duration_s = 60.0 / bpm1
        transition_duration_s = beat_duration_s * 8  # 8 beats transition
        transition_start_point_s1 = song1_duration - transition_duration_s

        # Check if song is long enough for transition: there must be at least
        # one beat strictly before the transition start to cut on.
        beats_before_transition = beats1[beats1 < transition_start_point_s1]
        if beats_before_transition.size == 0:
            return False

        # Find beat-aligned cut points
        last_beat_s1 = beats_before_transition.max()
        first_beat_s2 = beats2_stretched[0]

        # Trim audio at beat boundaries
        trim_sample_s1 = librosa.time_to_samples(last_beat_s1, sr=sr1)
        trim_sample_s2 = librosa.time_to_samples(first_beat_s2, sr=sr1)
        y1_trimmed = y1[:trim_sample_s1]
        y2_synced = y2_stretched[trim_sample_s2:]

        # Convert to AudioSegment for crossfading
        song1_segment = _to_audio_segment(y1_trimmed, sr1)
        song2_segment = _to_audio_segment(y2_synced, sr1)

        # Create crossfaded mix
        crossfade_duration_ms = int(transition_duration_s * 1000)
        final_mix = song1_segment.append(song2_segment, crossfade=crossfade_duration_ms)

        # Export final mix
        output_filename = f"AI_REC_{track_id1}_to_{track_id2}.mp3"
        output_filepath = os.path.join(MIX_OUTPUT_PATH, output_filename)
        final_mix.export(output_filepath, format="mp3")

        print(f"   AI MIX RECOMMENDATION CREATED: {output_filepath}")
        display(Audio(output_filepath))
        return True

    except Exception as e:
        # Best-effort: a failed mix is reported and skipped so the caller can
        # move on to the next candidate pair.
        print(f"   Failed to run mixing engine: {e}")
        return False

## Hunting mode with AI

In [None]:
# AI Hunting Mode - Search for highly compatible track pairs
TARGET_PAIRS = 2          # stop after this many successfully mixed pairs
SCORE_THRESHOLD = 0.95    # minimum model score for a pair to be mixed

print("\n>>> Part 3: Starting Hunting Mode...")
print(f"AI will randomly search until finding {TARGET_PAIRS} highly compatible pairs.")

found_pairs_count = 0
attempts = 0
max_attempts = 10000
skipped_pairs = 0   # candidate pairs dropped because of a data error
last_error = None   # most recent data error, kept for the final summary

# random.sample raises ValueError when asked for 2 items from a shorter list,
# so check once up front instead of crashing inside the loop.
can_sample = len(all_features_data) >= 2
if not can_sample:
    print("Need at least 2 loaded feature entries to search for pairs; nothing to do.")

while can_sample and found_pairs_count < TARGET_PAIRS and attempts < max_attempts:
    attempts += 1

    # Randomly select two tracks from loaded feature data
    f1, f2 = random.sample(all_features_data, 2)

    try:
        # Extract BPM and calculate key compatibility
        bpm1, bpm2 = float(f1['bpm']), float(f2['bpm'])
        key_compat_score = are_keys_compatible(f1['key'], f2['key'])

        # Create feature vector for prediction (one row, seven features)
        feature_vector = np.array([[
            bpm1, bpm2, key_compat_score,
            float(f1['rms']), float(f2['rms']),
            float(f1['spectral_centroid']), float(f2['spectral_centroid'])
        ]])

        # Scale features and predict compatibility
        feature_vector_scaled = scaler.transform(feature_vector)
        input_tensor = torch.tensor(feature_vector_scaled, dtype=torch.float32).to(device)

        with torch.no_grad():
            score = model(input_tensor).item()

        # Check for high compatibility score
        if score > SCORE_THRESHOLD:
            print("\n" + "="*50)
            print(f"COMPATIBLE PAIR FOUND! (Attempt #{attempts})")
            print(f"   Track IDs: {f1['track_id']} vs {f2['track_id']}")
            print(f"   AI Prediction Score: {score:.4f}")

            # Display feature analysis
            print("   --- Raw Feature Analysis ---")
            bpm_diff_percent = abs(bpm1 - bpm2) / max(bpm1, bpm2) * 100
            print(f"   BPM: {bpm1:.2f} vs {bpm2:.2f} (Diff: {bpm_diff_percent:.2f}%)")
            print(f"   Key: {f1['key']} vs {f2['key']} (Compatible: {key_compat_score == 1.0})")
            print("   " + "-"*28)

            # Run mixing engine and increment counter on success
            if run_mixing_engine(f1['track_id'], f2['track_id']):
                found_pairs_count += 1

    except Exception as e:
        # Continue on data errors, but keep a count so they are not invisible
        skipped_pairs += 1
        last_error = e
        continue

# Summarize the outcome so an exhausted search is distinguishable from success
if skipped_pairs:
    print(f"\nSkipped {skipped_pairs} candidate pair(s) due to data errors "
          f"(last error: {last_error!r}).")
if found_pairs_count < TARGET_PAIRS:
    print(f"\nOnly {found_pairs_count} of {TARGET_PAIRS} pairs were mixed "
          f"after {attempts} attempts.")

print("\n>>> HUNTING COMPLETE! <<<")