# NOTE: the original paste carried a "Spaces: / Runtime error" header from a
# Hugging Face Spaces error page; it was log residue, not part of the program.
| import os | |
| import io | |
| import json | |
| import random | |
| import time | |
| import threading | |
| import queue | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List, Tuple | |
| from pathlib import Path | |
| import warnings | |
| import torch | |
| import torch.nn as nn | |
| import torchaudio | |
| import torchaudio.transforms as T | |
| import torchaudio.functional as F | |
| import soundfile as sf | |
| import gradio as gr | |
| import numpy as np | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline as hf_pipeline | |
| import kagglehub | |
| import sounddevice as sd | |
| # -------------------------------------------------- | |
| # CONFIGURATION | |
| # -------------------------------------------------- | |
@dataclass
class Config:
    """Central app configuration.

    BUG FIX: this class declared ``__post_init__`` but was missing the
    ``@dataclass`` decorator, so ``__post_init__`` was never invoked —
    ``cartman_disses`` stayed ``None`` (crashing ``random.choice`` later)
    and the data/log directories were never created.
    """
    # Paths
    data_dir: Path = Path("./fart_dataset")
    syn_dir: Path = data_dir / "synthetic"
    log_file: Path = Path("fart-lab/logs/evolution_log.jsonl")
    model_path: Path = Path("evofart_net.pt")  # EvoFartNet save
    # Audio
    target_sr: int = 16000
    chunk_len: float = 1.0
    n_mels: int = 128
    n_fft: int = 400
    hop_length: int = 160
    target_frames: int = 100
    # Training (Incremental Best Practices)
    batch_size: int = 32
    lr: float = 0.001
    replay_size: int = 200  # Buffer for anti-forgetting
    walk_window: int = 100
    flawless_acc: float = 0.99
    micro_neurons_min: int = 3
    micro_neurons_max: int = 9
    # LLM
    llm_model_id: str = "deepseek-ai/deepseek-r1-0528"  # Hugging Face model ID
    llm_max_tokens: int = 128
    # Cartman (Uncensored Diss Tracks) — filled in __post_init__ (None avoids
    # a shared mutable default).
    cartman_disses: List[str] = None

    def __post_init__(self):
        if self.cartman_disses is None:
            self.cartman_disses = [
                "This burp? Kyle's mom's morning breath—napalm level, hippie!",
                "Bark like a therapy dog? I'll fart on your therapy, Butters!",
                "Curse echo? 'Fahhhh-tboard!'—sounds like your dad's regrets."
            ]
        # Create working directories / log file up front so later appends
        # and saves cannot fail on a missing path.
        self.syn_dir.mkdir(parents=True, exist_ok=True)
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        self.log_file.touch()
config = Config()
# Labels (Expanded Roast-Ready) — order matters: training labels are integer
# indices into this list (see SOUND_LABELS.index(...) below).
SOUND_LABELS = ['fart', 'burp', 'bark', 'curse', 'door_slam', 'everyday_noise', 'not_sound']
# Globals shared between the Gradio UI thread and the background workers.
model = None  # EvoFartNet instance (set by load_or_init_model)
llm_pipe = None  # transformers text-generation pipeline; None => dummy mode
llm_tokenizer = None
training_active = False  # polled by training_loop / process_queue to stop cleanly
accuracy = 0.0  # latest walk-forward validation accuracy
mode = "synth"  # "synth" until flawless_acc is reached, then "real"
replay_buffer = []  # (wave, label, timestamp) tuples for anti-forgetting replay
q = queue.Queue()  # mic chunks pushed by the sounddevice callback
| # -------------------------------------------------- | |
| # LLM Setup (Micro-Neurons) | |
| # -------------------------------------------------- | |
def load_local_llm():
    """Load the DeepSeek LLM jury once; fall back to dummy mode on failure.

    Idempotent: returns immediately if a pipeline is already loaded. The model
    ID is a Hugging Face ID, so transformers is used directly (the checkpoint
    ships its own quantization config, so no BitsAndBytesConfig is passed).
    """
    global llm_pipe, llm_tokenizer
    if llm_pipe:
        return
    try:
        print(f"[INFO] Attempting to load LLM from Hugging Face: {config.llm_model_id}")
        llm_tokenizer = AutoTokenizer.from_pretrained(
            config.llm_model_id, trust_remote_code=True
        )
        llm_model = AutoModelForCausalLM.from_pretrained(
            config.llm_model_id,
            device_map="auto",
            trust_remote_code=True,
        )
        llm_pipe = hf_pipeline(
            "text-generation",
            model=llm_model,
            tokenizer=llm_tokenizer,
            max_new_tokens=config.llm_max_tokens,
            do_sample=True,
            temperature=0.8,
            pad_token_id=llm_tokenizer.eos_token_id,
        )
        print("[INFO] DeepSeek-R1 loaded—jury's in session.")
    except Exception as e:
        warnings.warn(f"LLM load failed: {e}. Dummy mode.")
        llm_pipe = None
load_local_llm()
| # -------------------------------------------------- | |
| # EvoFartNet: Deeper CNN (w/ RNN Option) | |
| # --------------------------------0------------------ | |
class EvoFartNet(nn.Module):
    """Small CNN (optional LSTM head) over log-mel inputs of shape
    (B, 1, n_mels, n_frames).

    BUG FIX: the flatten size was hard-coded to 64*25*25 (40000), but for the
    configured 128 mels x 100 frames the post-pool shape is 64 x 32 x 25
    (51200), so the very first forward pass raised a runtime error. The size
    is now derived from (n_mels, n_frames), defaulting to the config values.
    """
    def __init__(self, num_classes=None, use_rnn=False, n_mels=128, n_frames=100):
        super().__init__()
        if num_classes is None:
            # Resolved lazily so the class stays importable without the labels.
            num_classes = len(SOUND_LABELS)
        self.use_rnn = use_rnn
        self.conv1 = nn.Conv2d(1, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.25)
        # Two 2x2 max-pools halve each spatial dim twice.
        self._flat = 64 * (n_mels // 4) * (n_frames // 4)
        self.fc1 = nn.Linear(self._flat, 128)
        if use_rnn:
            self.rnn = nn.LSTM(128, 64, batch_first=True, bidirectional=True)
            self.fc2 = nn.Linear(128, num_classes)  # bi-LSTM output is 2*64
        else:
            self.fc2 = nn.Linear(128, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(x.size(0), self._flat)
        x = self.dropout(self.relu(self.fc1(x)))
        if self.use_rnn:
            # Fake sequence for demo: repeat the embedding 128 times
            # (the pre-unsqueeze feature width), as the original did.
            seq = x.unsqueeze(1).repeat(1, x.shape[1], 1)
            out, _ = self.rnn(seq)
            x = out[:, -1, :]
        x = self.fc2(x)
        return x
def load_or_init_model(use_rnn=False):
    """(Re)build the global model, restoring checkpoint weights when possible.

    Returns a fresh (optimizer, criterion) pair bound to the current model.
    """
    global model
    if config.model_path.exists():
        try:
            model = EvoFartNet(use_rnn=use_rnn)
            model.load_state_dict(torch.load(config.model_path, map_location='cpu'))
            print(f"[INFO] Loaded EvoFartNet (RNN: {use_rnn}).")
        except Exception as e:
            # BUG FIX: a failed restore (e.g. state-dict mismatch after an RNN
            # toggle) used to `pass` silently, leaving a half-initialized net
            # with no indication its weights are random. Warn and rebuild.
            warnings.warn(f"Checkpoint restore failed ({e}); starting fresh.")
            model = None
    if model is None:
        model = EvoFartNet(use_rnn=use_rnn)
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=config.lr)  # Best prac: Adam over SGD
    criterion = nn.CrossEntropyLoss()
    return optimizer, criterion
optimizer, criterion = load_or_init_model(use_rnn=False)  # Toggle RNN for echoes
| # -------------------------------------------------- | |
| # Synth Gen w/ Torchaudio Augs | |
| # -------------------------------------------------- | |
def generate_synthetic_sound(label: str, duration: float = config.chunk_len) -> Tuple[np.ndarray, int]:
    """Synthesize one labelled audio chunk.

    Returns (wave, sr) where wave is a peak-normalized float32 array of
    ``duration`` seconds at the configured sample rate.
    """
    sr = config.target_sr
    n = int(sr * duration)
    t = np.linspace(0, duration, n, endpoint=False)
    if label == 'fart':
        wave = 0.3 * np.sin(2 * np.pi * 90 * t) + 0.6 * np.random.normal(0, 0.2, n)
    elif label == 'burp':
        # Low decaying rumble plus a short noisy "pop" at the start.
        rumble = 0.5 * np.sin(2 * np.pi * 50 * t) * np.exp(-t * 5)
        pop_len = int(sr * 0.1)
        pop = 0.3 * np.random.normal(0, 0.5, pop_len) * np.exp(-np.linspace(0, 10, pop_len))
        wave = rumble + np.pad(pop, (0, n - pop_len))
    elif label == 'bark':
        # Frequency-modulated tone around 800 Hz with noise on top.
        freq_mod = 800 + 400 * np.sin(2 * np.pi * 10 * t)
        phase = 2 * np.pi * np.cumsum(freq_mod) / sr
        wave = 0.4 * np.sin(phase) + 0.2 * np.random.normal(0, 0.3, n)
    elif label == 'curse':
        # Toe-stub symphony: three angry bursts, each with a 0.3 s echo.
        wave = np.zeros(n)
        for k in range(3):
            start, end = int(k * n / 4), int((k + 1) * n / 4)
            sub_t = t[start:end] - t[start]
            burst = (0.4 * np.sin(2 * np.pi * (500 + 200 * np.sin(20 * sub_t)) * sub_t)
                     + np.random.normal(0, 0.2, end - start))
            wave[start:end] += burst
            echo_start = int(start + sr * 0.3)
            if echo_start < end:
                echo_len = min(end - echo_start, len(burst))
                wave[echo_start:echo_start + echo_len] += 0.3 * burst[:echo_len]
    elif label == 'door_slam':
        impulse = np.zeros(n)
        impulse[0] = 1.0
        decay = np.exp(-np.linspace(0, 5, int(sr * 0.5)))
        wave = np.convolve(impulse, decay)[:n] + 0.1 * np.random.normal(0, 1, n)
    elif label == 'everyday_noise':
        # Reuse another generator and layer light noise over it.
        sub = random.choice(['door_slam', 'bark'])
        wave, _ = generate_synthetic_sound(sub, duration)
        wave += 0.2 * np.random.normal(0, 0.1, n)
    else:
        wave = 0.1 * np.random.normal(0, 0.05, n)
    peak = np.max(np.abs(wave)) + 1e-9
    return (wave / peak).astype(np.float32), sr
def augment_audio(waveform: torch.Tensor, sr: int) -> torch.Tensor:
    """Torchaudio augmentations: pitch shift, additive noise, simple echo.

    Length-preserving: output has the same (channels, samples) shape as input,
    so downstream fixed-size mel framing stays consistent.
    """
    # Pitch shift by up to +-2 semitones.
    shift = random.uniform(-2, 2)
    waveform = F.pitch_shift(waveform, sr, shift)
    # Additive Gaussian noise.
    waveform = waveform + torch.randn_like(waveform) * 0.05
    # Simple echo ("reverb" stub) about half the time.
    if random.random() < 0.5:
        delay_samples = int(sr * random.uniform(0.05, 0.2))
        if delay_samples < waveform.shape[1]:
            # BUG FIX: the old code concatenated `delay` zeros and then added
            # an echo of mismatched length (T vs T-delay), which raised a
            # RuntimeError and would have grown the fixed-length chunk.
            # Mix the attenuated, delayed copy in place instead.
            echo = waveform[:, :waveform.shape[1] - delay_samples] * 0.3
            waveform[:, delay_samples:] += echo
    return waveform
| # -------------------------------------------------- | |
| # Preprocess & Predict | |
| # -------------------------------------------------- | |
def preprocess_audio(wave: np.ndarray, sr: int, augment=False) -> torch.Tensor:
    """Normalize a wave to one fixed-length (1, target_sr * chunk_len) tensor.

    Accepts a NumPy array or an already-built tensor; resamples to the target
    rate, downmixes to mono, then pads or trims to the fixed chunk length.
    Optionally applies augment_audio afterwards.
    """
    if isinstance(wave, np.ndarray):
        waveform = torch.from_numpy(wave).unsqueeze(0).float()
    else:
        waveform = wave
    if sr != config.target_sr:
        resampler = T.Resample(sr, config.target_sr)
        waveform = resampler(waveform)
    if waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)
    # BUG FIX: chunk_len is a float, so target_sr * chunk_len was a float
    # (16000.0) — slicing and nn.functional.pad with a float raise TypeError.
    max_samples = int(config.target_sr * config.chunk_len)
    if waveform.shape[1] > max_samples:
        waveform = waveform[:, :max_samples]
    else:
        waveform = nn.functional.pad(waveform, (0, max_samples - waveform.shape[1]))
    if augment:
        waveform = augment_audio(waveform, config.target_sr)
    return waveform
def predict(waveform: torch.Tensor) -> Dict[str, float]:
    """Classify a (1, samples) waveform; returns {label: probability}.

    Falls back to a random Dirichlet distribution when no model is loaded.
    Temporarily switches the model to eval mode and restores train mode after.
    """
    if model is None:
        rand_probs = np.random.dirichlet(np.ones(len(SOUND_LABELS)))
        return {lbl: float(p) for lbl, p in zip(SOUND_LABELS, rand_probs)}
    model.eval()
    with torch.no_grad():
        spec = T.MelSpectrogram(sample_rate=config.target_sr, n_mels=config.n_mels,
                                n_fft=config.n_fft, hop_length=config.hop_length)(waveform)
        spec = T.AmplitudeToDB()(spec)
        spec = (spec - spec.mean()) / (spec.std() + 1e-6)
        # Pad or trim the time axis to a fixed number of frames.
        deficit = config.target_frames - spec.shape[2]
        if deficit > 0:
            spec = nn.functional.pad(spec, (0, deficit))
        else:
            spec = spec[:, :, :config.target_frames]
        scores = torch.softmax(model(spec.unsqueeze(0)), dim=1).squeeze(0).tolist()
    model.train()
    return dict(zip(SOUND_LABELS, scores))
| # -------------------------------------------------- | |
| # LLM Micro-Neurons (3-9 Votes) | |
| # -------------------------------------------------- | |
def llm_micro_vote(prompt_base: str) -> float:
    """Poll 3-9 randomly flavored LLM "micro-neurons" and average their votes.

    Each vote is 1.0 when the generation contains an agreement keyword,
    otherwise 0.5. Returns a random value when no LLM pipeline is loaded.
    """
    if llm_pipe is None:
        return random.random()
    num_votes = random.randint(config.micro_neurons_min, config.micro_neurons_max)
    votes = []
    for _ in range(num_votes):
        flair = random.choice(["Cartman testify: Agree or roast?",
                               "Echo check: Naughty score?",
                               "Jury: Guilty of being epic?"])
        mutated = f"{prompt_base} {flair}"
        try:
            out = llm_pipe(mutated, return_full_text=False)
            resp = out[0]["generated_text"].lower()
            vote = 1.0 if any(word in resp for word in ['agree', 'guilty', 'epic', 'roast approved']) else 0.5
            votes.append(vote)
        except Exception:
            # BUG FIX: the bare `except:` also swallowed KeyboardInterrupt
            # and SystemExit; neutral 0.5 is still recorded on real failures.
            votes.append(0.5)
    # float() so the annotated return type holds (np.mean yields np.float64).
    return float(np.mean(votes))
def hybrid_agree(cnn_probs: Dict[str, float], true_label: str, desc: str = "") -> float:
    """Blend CNN confidence in the true label with the LLM jury vote (70/30)."""
    top_label = max(cnn_probs, key=cnn_probs.get)
    conf = cnn_probs.get(true_label, 0.0)
    jury_prompt = (f"Desc: {desc}. True: {true_label}, Pred: {top_label} "
                   f"(conf {conf:.2f}). Vote 0-1?")
    return 0.7 * conf + 0.3 * llm_micro_vote(jury_prompt)  # Weighted jury
| # -------------------------------------------------- | |
| # Incremental Training Loop w/ Replay | |
| # -------------------------------------------------- | |
def _batch_to_mel(batch: torch.Tensor) -> torch.Tensor:
    """(B, 1, samples) waveforms -> normalized log-mel (B, 1, n_mels, target_frames)."""
    mel = T.MelSpectrogram(sample_rate=config.target_sr, n_mels=config.n_mels,
                           n_fft=config.n_fft, hop_length=config.hop_length)(batch)
    mel = T.AmplitudeToDB()(mel)
    mel = (mel - mel.mean()) / (mel.std() + 1e-6)
    frames = mel.shape[-1]
    if frames < config.target_frames:
        mel = nn.functional.pad(mel, (0, config.target_frames - frames))
    else:
        mel = mel[..., :config.target_frames]
    return mel

def training_loop():
    """Background incremental trainer.

    Each step: half a batch of fresh augmented synthetic sounds plus half a
    batch replayed from the buffer (anti-forgetting), hybrid CNN+LLM scoring
    on a small sample, walk-forward validation, JSONL logging and a
    checkpoint save. Runs until training_active is cleared.
    """
    global accuracy, mode, replay_buffer
    optimizer, criterion = load_or_init_model()
    print("[INFO] EvoFartNet online—replay armed.")
    while training_active:
        try:
            batch_waves = []
            batch_labels = []
            # Fresh synthetic (augmented) half-batch.
            for _ in range(config.batch_size // 2):
                label = random.choice(SOUND_LABELS)
                wave, _ = generate_synthetic_sound(label)
                aug_wave = preprocess_audio(wave, config.target_sr, augment=True).numpy().squeeze()
                batch_waves.append(aug_wave)
                batch_labels.append(SOUND_LABELS.index(label))
            # Replay half-batch (anti-forgetting).
            if replay_buffer:
                for _ in range(config.batch_size // 2):
                    old = random.choice(replay_buffer)
                    batch_waves.append(old[0])
                    batch_labels.append(old[1])
            batch_tensor = torch.stack([preprocess_audio(w, config.target_sr) for w in batch_waves])
            labels_tensor = torch.tensor(batch_labels)
            # BUG FIX: raw (B, 1, samples) waveforms were fed straight into the
            # Conv2d network, which expects mel spectrograms and crashed.
            logits = model(_batch_to_mel(batch_tensor))
            loss = criterion(logits, labels_tensor)
            preds = torch.argmax(logits, dim=1)
            nn_acc = (preds == labels_tensor).float().mean().item()
            # Hybrid accuracy with the LLM micro-jury (sample up to 8 items).
            jury_n = min(8, len(batch_waves))
            hybrid_acc = 0.0
            for i in range(jury_n):
                desc = f"Aug {SOUND_LABELS[batch_labels[i]]} chunk."
                # BUG FIX: predict expects a (1, samples) waveform;
                # batch_tensor[i:i+1] passed a 3-D (1, 1, samples) tensor.
                probs = predict(batch_tensor[i])
                hybrid_acc += hybrid_agree(probs, SOUND_LABELS[batch_labels[i]], desc)
            hybrid_acc /= jury_n
            acc = 0.8 * nn_acc + 0.2 * hybrid_acc
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Buffer mgmt: only the fresh half is added to replay.
            for wave, lbl in zip(batch_waves[:config.batch_size // 2],
                                 batch_labels[:config.batch_size // 2]):
                replay_buffer.append((wave, lbl, time.time()))
                if len(replay_buffer) > config.replay_size:
                    replay_buffer.pop(0)
            # Walk-forward validation over a random slice of the buffer.
            if len(replay_buffer) >= config.walk_window:
                val_idx = np.random.choice(len(replay_buffer), min(32, len(replay_buffer)), replace=False)
                val_batch = torch.stack([preprocess_audio(replay_buffer[i][0], config.target_sr)
                                         for i in val_idx])
                val_labels = torch.tensor([replay_buffer[i][1] for i in val_idx])
                with torch.no_grad():
                    v_preds = torch.argmax(model(_batch_to_mel(val_batch)), dim=1)
                accuracy = (v_preds == val_labels).float().mean().item()
            # Log & save every step.
            log_record = {"loss": loss.item(), "acc": acc, "mode": mode, "ts": time.time()}
            with open(config.log_file, "a") as f:
                f.write(json.dumps(log_record) + "\n")
            torch.save(model.state_dict(), config.model_path)
            # Mode switch once synth accuracy is flawless.
            if mode == "synth" and accuracy > config.flawless_acc:
                mode = "real"
                print("[INFO] Synth flawless—mic drop to real mode!")
            time.sleep(0.1)
        except Exception as e:
            print(f"[ERROR] Loop: {e}")
            time.sleep(1)
| # Real Mic (Pseudo-Label Hybrid) | |
def audio_callback(indata, frames, time_info, status):
    """sounddevice stream callback: push each mono chunk onto the work queue.

    BUG FIX: the third parameter was named `time`, shadowing the `time`
    module. sounddevice invokes the callback positionally, so the rename
    is safe for callers.
    """
    if status:
        print(status)
    q.put(indata.copy().flatten())
_mic_stream = None  # module-level ref so the input stream is not garbage-collected

def start_real_recording():
    """Open the mic stream and start the pseudo-labeling consumer thread.

    Chunks confidently scored by the hybrid CNN+LLM jury (> 0.7) are added
    to the replay buffer with their predicted label.
    """
    global training_active, _mic_stream
    if mode == "real" and training_active:
        # BUG FIX: the stream was only a local variable and could be
        # garbage-collected (stopping capture); keep a module-level reference.
        _mic_stream = sd.InputStream(samplerate=config.target_sr, channels=1,
                                     callback=audio_callback,
                                     blocksize=int(config.target_sr * config.chunk_len))
        _mic_stream.start()
    def process_queue():
        while training_active:
            try:
                # BUG FIX: the queue already holds NumPy arrays (from the
                # callback's indata.copy().flatten()); calling .numpy() on
                # an ndarray raised AttributeError.
                chunk = q.get(timeout=1)
                waveform = torch.from_numpy(chunk).unsqueeze(0).float()
                probs = predict(waveform)
                pred_label = max(probs, key=probs.get)
                # LLM pseudo-label vote
                desc = "Real-time mystery sound."
                pseudo_conf = hybrid_agree(probs, pred_label, desc)
                if pseudo_conf > 0.7:  # Confident?
                    replay_buffer.append((chunk, SOUND_LABELS.index(pred_label), time.time()))
                    if len(replay_buffer) > config.replay_size:
                        replay_buffer.pop(0)
            except queue.Empty:
                pass
    threading.Thread(target=process_queue, daemon=True).start()
    print("[INFO] Mic live—pseudo-labeling chaos.")
| # -------------------------------------------------- | |
| # Neuromorphic Neon Dark UI (Pulsing Acc) | |
| # -------------------------------------------------- | |
# CSS theme injected into the Gradio Blocks below. The `.acc-high` class is
# attached by get_status() when accuracy > 0.95 to make the status pulse.
neuromorphic_css = """
body { background: #0a0a0a; color: #00ff88; font-family: 'Courier New', monospace; }
.gradio-container { background: linear-gradient(135deg, #0a0a0a, #1a1a1a); }
.card {
  background: #1a1a1a; border-radius: 20px;
  box-shadow: inset 4px 4px 8px rgba(0,0,0,0.3), -4px -4px 8px rgba(0,255,136,0.1);
  padding: 20px; margin: 10px; border: none;
}
button {
  background: #1a1a1a !important; color: #ff00ff !important; border-radius: 15px !important;
  box-shadow: 2px 2px 5px rgba(0,0,0,0.5), -2px -2px 5px rgba(255,0,255,0.2) !important;
  transition: all 0.3s;
}
button:hover {
  box-shadow: inset 2px 2px 5px rgba(0,0,0,0.5), -2px -2px 5px rgba(255,0,255,0.4) !important;
  color: #00ff88 !important;
}
.textbox { background: #0f0f0f; color: #00ff88; border: 1px solid #333; border-radius: 10px; }
@keyframes pulse { 0% { box-shadow: 0 0 0 0 rgba(0,255,136,0.7); } 70% { box-shadow: 0 0 0 10px rgba(0,255,136,0); } 100% { box-shadow: 0 0 0 0 rgba(0,255,136,0); } }
.acc-high { animation: pulse 2s infinite; background: linear-gradient(45deg, #ff00ff, #00ff88); }
"""
def start_training():
    """Spin up the background training thread (idempotent)."""
    global training_active
    if training_active:
        return "Already evolving, ya nasty."
    training_active = True
    threading.Thread(target=training_loop, daemon=True).start()
    if mode == "real":
        start_real_recording()
    diss = random.choice(config.cartman_disses)
    return f"Evo started! Mode: {mode} | Acc: {accuracy:.3f} | {diss}"
def stop_training():
    """Flip the training flag off; the background loop exits on its next pass."""
    global training_active
    training_active = False
    return "Evolution halted—replay buffer intact."
def get_status():
    """Build the HTML status line; pulses (CSS .acc-high) once accuracy > 0.95."""
    css_class = "acc-high" if accuracy > 0.95 else ""
    roast = random.choice(config.cartman_disses)
    return f'<div class="{css_class}">Status: {mode} | Acc: {accuracy:.3f} | {roast}</div>'
# Gradio dashboard: wires the start/stop controls and the RNN toggle to the
# module-level state above. NOTE(review): toggle_rnn rebuilds the global model
# but discards the returned (optimizer, criterion); an already-running
# training loop keeps its own optimizer — confirm this is intended.
with gr.Blocks(css=neuromorphic_css, title="🧪 EvoFartNet: Neon Incremental Fartboard") as demo:
    gr.Markdown("# 💨 EvoFartNet Dashboard: Replay-Powered Audio Apocalypse")
    gr.Markdown("Incremental CNN/RNN + LLM Jury | Synth Augs → Real Pseudo | Neon Pulse UX")
    with gr.Row():
        with gr.Column():
            start_btn = gr.Button("🚀 Ignite Evolution", variant="primary")
            stop_btn = gr.Button("⏹️ Abort Mission")
            status = gr.HTML(label="Live Status", value=get_status())
        with gr.Column():
            toggle_rnn = gr.Checkbox(label="RNN Mode (Echoes)", value=False)
            acc_log = gr.Textbox(label="Acc Evolution Log", lines=5, value="Pre-roast warmup...")
    start_btn.click(start_training, outputs=status)
    stop_btn.click(stop_training, outputs=status)
    toggle_rnn.change(lambda v: load_or_init_model(v), inputs=toggle_rnn)
    demo.load(get_status, outputs=status)  # refresh status on page load
if __name__ == "__main__":
    # Launch config comes from the environment; bind loopback unless EXPOSE_ALL.
    base_port = int(os.getenv("GRADIO_PORT", 7860))
    share = os.getenv("GRADIO_SHARE", "False").lower() == "true"
    host = "0.0.0.0" if os.getenv("EXPOSE_ALL", "false").lower() == "true" else "127.0.0.1"
    launched = False
    # Walk up to 10 consecutive ports if the preferred one is taken.
    for candidate in range(base_port, base_port + 10):
        try:
            demo.launch(share=share, server_name=host, server_port=candidate)
        except OSError as err:
            if "Cannot find empty port" not in str(err):
                raise
            print(f"[WARN] Port {candidate} is in use. Trying next available port.")
        else:
            launched = True
            break
    if not launched:
        print("[ERROR] Could not find an available port for Gradio after several attempts.")