import os
import tempfile

import gradio as gr
import librosa
import numpy as np
import torch
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from huggingface_hub import login
from transformers import AutoTokenizer, AutoModelForCausalLM
# === CONFIGURATION ===
# Get the token from an environment variable (set this in your Space secrets)
HF_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")

# Use a smaller Gemma model for faster loading
MODEL_NAME = "google/gemma-2b-it"  # the 2B variant loads and runs faster than 7B
# Log in to the Hugging Face Hub (required for gated models such as Gemma)
try:
    if HF_TOKEN and HF_TOKEN != "your_hf_token_here":
        login(token=HF_TOKEN)
        print("✅ Authenticated with Hugging Face Hub")
    else:
        print("⚠️ No HF_TOKEN provided, using fallback method")
except Exception as e:
    print(f"⚠️ Authentication warning: {e}")
class GemmaAudioEmotionAnalyzer:
    def __init__(self, model_name: str = MODEL_NAME):
        self.model_name = model_name
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"🚀 Using device: {self.device}")
        try:
            print("📥 Loading Gemma tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                model_name,
                token=HF_TOKEN if HF_TOKEN != "your_hf_token_here" else None,
                trust_remote_code=True,
            )
            print("📥 Loading Gemma model...")
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                token=HF_TOKEN if HF_TOKEN != "your_hf_token_here" else None,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                device_map="auto" if self.device == "cuda" else None,
                trust_remote_code=True,
            )
            # Ensure a pad token is set (Gemma's tokenizer may not define one)
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            print("✅ Gemma model loaded successfully!")
        except Exception as e:
            print(f"❌ Failed to load Gemma: {e}")
            print("🔧 Using fallback rule-based analyzer")
            self.model = None
            self.tokenizer = None
    def extract_fast_features(self, audio_path: str) -> dict:
        """Extract a minimal feature set quickly."""
        try:
            # Load only the first 3 seconds for speed
            y, sr = librosa.load(audio_path, sr=16000, duration=3)

            # Estimate pitch once (the original called piptrack twice) and fall
            # back to 150 Hz when no voiced frames are found, since np.median
            # of an empty array is NaN and `NaN or 150` stays NaN
            pitches, _ = librosa.piptrack(y=y, sr=sr)
            voiced = pitches[pitches > 0]
            pitch = float(np.median(voiced)) if voiced.size > 0 else 150.0

            features = {
                'energy': float(np.mean(librosa.feature.rms(y=y))),
                'brightness': float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))),
                'pitch': pitch,
                # note: librosa >= 0.10 prefers librosa.feature.rhythm.tempo
                'tempo': float(librosa.beat.tempo(y=y, sr=sr)[0]),
                'speech_rate': float(np.mean(librosa.feature.zero_crossing_rate(y))),
            }
            return features
        except Exception as e:
            print(f"❌ Feature extraction error: {e}")
            return {'energy': 0.05, 'brightness': 1500, 'pitch': 200, 'tempo': 100, 'speech_rate': 0.1}
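    # Shape of the returned dict (values here are hypothetical, for a short
    # speech clip; actual numbers depend entirely on the input audio):
    #   extract_fast_features("clip.wav")
    #   -> {'energy': 0.061, 'brightness': 1823.4, 'pitch': 212.0,
    #       'tempo': 112.0, 'speech_rate': 0.094}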
    def create_gemma_prompt(self, features: dict) -> str:
        """Create an optimized prompt for Gemma."""
        prompt = f"""Analyze the emotional content from these audio features:

Audio Characteristics:
- Energy Level: {"High" if features['energy'] > 0.08 else "Low" if features['energy'] < 0.03 else "Medium"}
- Brightness: {"Bright" if features['brightness'] > 2000 else "Dark" if features['brightness'] < 1000 else "Neutral"}
- Average Pitch: {"High" if features['pitch'] > 250 else "Low" if features['pitch'] < 150 else "Medium"}
- Tempo: {"Fast" if features['tempo'] > 140 else "Slow" if features['tempo'] < 90 else "Moderate"}
- Speech Rate: {"Rapid" if features['speech_rate'] > 0.15 else "Slow" if features['speech_rate'] < 0.08 else "Normal"}

Based on these acoustic properties, identify the primary emotion. Choose ONE from: happy, sad, angry, fearful, neutral, excited, calm.

Respond in this exact format:
Emotion: [emotion]
Confidence: [high/medium/low]
Reason: [brief reason based on features]

Analysis:"""
        return prompt
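    # With the thresholds above, a hypothetical clip with energy 0.09,
    # brightness 2400, pitch 260, tempo 150 and speech rate 0.16 renders as:
    #   - Energy Level: High
    #   - Brightness: Bright
    #   - Average Pitch: High
    #   - Tempo: Fast
    #   - Speech Rate: Rapid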
    def generate_with_gemma(self, prompt: str) -> str:
        """Generate a response with Gemma using speed-optimized settings."""
        if self.model is None:
            return "Emotion: neutral\nConfidence: medium\nReason: Using fallback analysis"
        try:
            # Tokenize
            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                max_length=512,
                truncation=True,
            ).to(self.device)

            # Generate with settings tuned for speed
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=100,  # keep the response short
                    temperature=0.7,
                    do_sample=True,
                    top_p=0.9,
                    pad_token_id=self.tokenizer.eos_token_id,
                    repetition_penalty=1.1,
                )

            # Decode only the newly generated tokens; slicing the decoded string
            # by len(prompt) can be off when tokenization alters whitespace
            new_tokens = outputs[0][inputs["input_ids"].shape[1]:]
            return self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
        except Exception as e:
            print(f"❌ Gemma generation error: {e}")
            return "Emotion: neutral\nConfidence: low\nReason: Analysis unavailable"
    def parse_gemma_response(self, response: str) -> dict:
        """Parse Gemma's response into a structured dict."""
        result = {
            'emotion': 'neutral',
            'confidence': 'medium',
            'reason': 'No analysis provided',
            'raw_response': response,
        }
        for line in response.split('\n'):
            line = line.strip()
            if line.startswith('Emotion:'):
                result['emotion'] = line.split(':', 1)[1].strip().lower()
            elif line.startswith('Confidence:'):
                result['confidence'] = line.split(':', 1)[1].strip().lower()
            elif line.startswith('Reason:'):
                result['reason'] = line.split(':', 1)[1].strip()
        return result
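    # Parsing example (hypothetical model output):
    #   parse_gemma_response("Emotion: happy\nConfidence: high\nReason: High energy and fast tempo")
    #   -> {'emotion': 'happy', 'confidence': 'high',
    #       'reason': 'High energy and fast tempo', 'raw_response': '...'}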
    def analyze_emotion(self, audio_path: str) -> dict:
        """Main analysis pipeline: features -> prompt -> Gemma -> parsed result."""
        print(f"🎵 Analyzing: {os.path.basename(audio_path)}")

        # Step 1: Extract features (fast)
        features = self.extract_fast_features(audio_path)

        # Step 2: Create the prompt
        prompt = self.create_gemma_prompt(features)

        # Step 3: Get Gemma's analysis
        print("🤖 Querying Gemma...")
        gemma_response = self.generate_with_gemma(prompt)

        # Step 4: Parse the response
        result = self.parse_gemma_response(gemma_response)
        result['features'] = features

        print(f"✅ Gemma result: {result['emotion']}")
        return result
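# End-to-end usage sketch (hypothetical file path and result):
#   result = analyzer.analyze_emotion("clip.wav")
#   result['emotion']  # e.g. 'calm'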
# Initialize the analyzer once at startup
print("🚀 Initializing Gemma Audio Analyzer...")
analyzer = GemmaAudioEmotionAnalyzer()
def process_audio(audio_path: str) -> str:
    """Gradio interface function."""
    if not audio_path:
        return "❌ Please provide an audio file"
    try:
        result = analyzer.analyze_emotion(audio_path)

        # Format the output
        emotion_icons = {
            'happy': '😊', 'sad': '😢', 'angry': '😠',
            'fearful': '😨', 'neutral': '😐', 'excited': '🤩', 'calm': '😌'
        }
        icon = emotion_icons.get(result['emotion'], '😐')

        output = f"""
{icon} **Emotion**: {result['emotion'].title()}
📊 **Confidence**: {result['confidence'].title()}
📝 **Reason**: {result['reason']}

🔬 **Audio Analysis**:
• Energy: {result['features']['energy']:.3f}
• Brightness: {result['features']['brightness']:.0f} Hz
• Pitch: {result['features']['pitch']:.0f} Hz
• Tempo: {result['features']['tempo']:.0f} BPM

🤖 **Powered by Google Gemma**
"""
        return output
    except Exception as e:
        return f"❌ Error: {str(e)}"
# Create the Gradio interface
demo = gr.Interface(
    fn=process_audio,
    inputs=gr.Audio(
        sources=["upload"],
        type="filepath",
        label="Upload Audio File",
        max_length=10,  # limit clips to 10 seconds for faster processing
    ),
    outputs=gr.Markdown(label="Gemma Emotion Analysis"),
    title="🎵 Audio Emotion Analysis with Google Gemma",
    description="Upload audio to analyze emotions using Google's Gemma model",
    examples=[],
    allow_flagging="never",
)
app = FastAPI()

# Enable CORS so your gateway can call this
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# The route path below is illustrative; adjust it to whatever your gateway expects
@app.post("/api/analyze")
async def api_analyze(audio: UploadFile = File(...)):
    """API endpoint for programmatic access."""
    # Save the upload to a temp file rather than trusting the client filename
    suffix = os.path.splitext(audio.filename or "")[1] or ".wav"
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
        f.write(await audio.read())
        temp_path = f.name
    try:
        # Reuse the existing emotion analysis pipeline
        result = analyzer.analyze_emotion(temp_path)
    finally:
        os.remove(temp_path)
    return result

# Mount Gradio onto FastAPI (this combines both!)
app = gr.mount_gradio_app(app, demo, path="/")

if __name__ == "__main__":
    print("🚀 Starting Gemma Audio Emotion Analyzer...")
    # Serve the combined app with uvicorn; calling demo.launch() here would
    # start a separate Gradio server and the FastAPI mount would never be used
    uvicorn.run(app, host="0.0.0.0", port=7860)
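# Run/usage sketch (route and port as configured above; URL and filename are
# illustrative, and the module name assumes this file is saved as app.py):
#   python app.py                                  # serves UI + API on port 7860
#   uvicorn app:app --host 0.0.0.0 --port 7860     # equivalent
#   curl -X POST -F "audio=@clip.wav" http://localhost:7860/api/analyze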