"""Music recognition service: convert PDF scores to MusicXML with Audiveris.

The module exposes the same conversion through three front ends:

* MCP-style HTTP endpoints (``/mcp/...``) backed by ``MusicRecognitionMCPServer``
* a plain upload endpoint (``/api/recognize-music``)
* a Gradio UI
"""

import base64
import json
import mimetypes
import os
import shutil
import subprocess
import tempfile
import threading
import traceback
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import gradio as gr
import uvicorn
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, Response

# Location of the Audiveris launcher; shared by the self-test and the converter.
AUDIVERIS_BIN = "/opt/audiveris/bin/Audiveris"
# Directory used when a caller does not supply an output directory.
DEFAULT_OUTPUT_DIR = "/tmp/output"
# Extensions probed, in order, when looking for the Audiveris export.
MUSICXML_EXTENSIONS = (".mxl", ".xml", ".musicxml")

MUSICXML_SCHEME = "musicxml://"
FILE_SCHEME = "file://"


# MCP Server Implementation
class MusicRecognitionMCPServer:
    """MCP-style resource/tool provider around the Audiveris command line.

    File access is restricted to ``allowed_directories``; converted files are
    remembered in ``processed_files`` and exposed as ``musicxml://<id>`` URIs.
    """

    def __init__(self, allowed_directories: Optional[List[str]] = None):
        """Initialize MCP Server with configurable file access.

        Args:
            allowed_directories: Directories the server may read from and
                write to. Relative entries are resolved against the current
                working directory. Defaults to ``/tmp``, ``uploads`` and
                ``output``. Missing directories are created when possible.
        """
        self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"]
        # file_id -> metadata dict of a finished conversion
        self.processed_files: Dict[str, Dict[str, Any]] = {}

        # Convert relative paths to absolute and check access
        abs_directories = []
        for directory in self.allowed_directories:
            if not os.path.isabs(directory):
                directory = os.path.abspath(directory)
            abs_directories.append(directory)

            print(f"Checking directory: {directory}")

            # Only try to create if it doesn't exist and we have permission
            if os.path.exists(directory):
                print(f"✅ Directory exists: {directory}")
                continue
            try:
                os.makedirs(directory, exist_ok=True)
                os.chmod(directory, 0o755)
                print(f"✅ Directory created: {directory}")
            except PermissionError:
                print(f"⚠️ Directory doesn't exist but can't create: {directory}")
                print(f" This is normal for system directories like /tmp")
            except Exception as e:
                # Best effort: a directory that cannot be prepared is only reported.
                print(f"⚠️ Warning creating {directory}: {e}")

        self.allowed_directories = abs_directories
        print(f"Final allowed directories: {self.allowed_directories}")

        # Test Audiveris installation
        self._test_audiveris()

    def list_resources(self) -> List[Dict[str, Any]]:
        """List available resources following MCP patterns.

        Returns:
            One entry per converted file (``musicxml://`` URI) followed by one
            entry per ``*.pdf`` found recursively in the allowed directories
            (``file://`` URI).
        """
        resources = []

        # Add processed files as resources
        for file_id, file_info in self.processed_files.items():
            resources.append({
                "uri": f"musicxml://{file_id}",
                "name": file_info["original_name"],
                "description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}",
                "mimeType": "application/vnd.recordare.musicxml+xml"
            })

        # Add available PDF files in allowed directories
        for directory in self.allowed_directories:
            if not os.path.exists(directory):
                continue
            for file_path in Path(directory).rglob("*.pdf"):
                if self._is_file_accessible(str(file_path)):
                    resources.append({
                        "uri": f"file://{file_path}",
                        "name": file_path.name,
                        "description": f"PDF music score available for processing: {file_path.name}",
                        "mimeType": "application/pdf"
                    })

        return resources

    def read_resource(self, uri: str) -> Dict[str, Any]:
        """Read resource content following MCP patterns.

        Args:
            uri: ``musicxml://<file_id>`` for a converted file or
                ``file://<path>`` for a PDF inside an allowed directory.

        Returns:
            A ``{"contents": [...]}`` dict whose single resource carries the
            file content base64-encoded in its ``"text"`` field.

        Raises:
            Exception: unknown resource, access denied, missing file, or an
                unsupported URI scheme.
        """
        # NOTE(review): the payload is base64 but is returned under "text";
        # MCP clients may expect binary content under "blob" - confirm with
        # consumers before changing the key.
        if uri.startswith(MUSICXML_SCHEME):
            # Strip only the leading scheme; str.replace would also remove
            # later occurrences of the scheme text inside the identifier.
            file_id = uri[len(MUSICXML_SCHEME):]
            if file_id not in self.processed_files:
                raise Exception(f"Resource not found: {uri}")
            file_info = self.processed_files[file_id]
            try:
                with open(file_info["output_path"], "rb") as f:
                    content = base64.b64encode(f.read()).decode()
            except FileNotFoundError as e:
                raise Exception(f"MusicXML file not found: {file_info['output_path']}") from e
            mime_type = "application/vnd.recordare.musicxml+xml"

        elif uri.startswith(FILE_SCHEME):
            file_path = uri[len(FILE_SCHEME):]
            if not self._is_file_accessible(file_path):
                raise Exception(f"File access denied: {file_path}")
            try:
                with open(file_path, "rb") as f:
                    content = base64.b64encode(f.read()).decode()
            except FileNotFoundError as e:
                raise Exception(f"File not found: {file_path}") from e
            mime_type = "application/pdf"

        else:
            raise Exception(f"Unsupported URI scheme: {uri}")

        return {
            "contents": [{
                "type": "resource",
                "resource": {
                    "uri": uri,
                    "text": content,
                    "mimeType": mime_type
                }
            }]
        }

    def _is_file_accessible(self, file_path: str) -> bool:
        """Check if a path is an allowed directory or lies inside one.

        Paths are compared component-wise after resolving symlinks, so a
        sibling such as ``/tmpfoo`` does not pass for ``/tmp`` and a symlink
        inside an allowed directory cannot point the server elsewhere.
        """
        try:
            target = os.path.realpath(file_path)
        except (TypeError, ValueError):
            return False

        for directory in self.allowed_directories:
            root = os.path.realpath(directory)
            try:
                if os.path.commonpath([target, root]) == root:
                    return True
            except ValueError:
                # Raised for paths that cannot be compared (e.g. different
                # drives); such a path is simply not inside this root.
                continue
        return False

    def recognize_music_tool(self, pdf_uri: str, output_dir: str = None) -> Dict[str, Any]:
        """Tool for music recognition following MCP patterns.

        Args:
            pdf_uri: ``file://<path>``, a ``data:application/pdf;base64,...``
                URI, or a plain file path.
            output_dir: Directory for the export; defaults to ``/tmp/output``.

        Returns:
            An MCP-style ``{"content": [...], "isError": bool}`` dict. Failures
            of the conversion itself are reported with ``isError`` set.

        Raises:
            Exception: the input is outside the allowed directories, does not
                exist, or the data URI cannot be processed.
        """
        # Handle different URI formats
        if pdf_uri.startswith("data:"):
            # Handle data URIs (base64 encoded)
            return self._process_data_uri(pdf_uri, output_dir)
        if pdf_uri.startswith(FILE_SCHEME):
            pdf_path = pdf_uri[len(FILE_SCHEME):]
        else:
            # Assume it's a direct file path
            pdf_path = pdf_uri

        if not self._is_file_accessible(pdf_path):
            raise Exception(f"File access denied: {pdf_path}")

        if not os.path.exists(pdf_path):
            raise Exception(f"PDF file not found: {pdf_path}")

        try:
            output_file = self._recognize_music_core(pdf_path, output_dir)

            # Store in processed files cache
            file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}"
            self.processed_files[file_id] = {
                "original_name": os.path.basename(pdf_path),
                "original_path": pdf_path,
                "output_path": output_file,
                "processed_at": datetime.now().isoformat(),
                "file_id": file_id
            }

            # Return MCP-compliant response
            return {
                "content": [{
                    "type": "text",
                    "text": f"✅ Successfully converted '{os.path.basename(pdf_path)}' to MusicXML.\n\n"
                            f"📁 Output file: {output_file}\n"
                            f"🔗 Resource URI: musicxml://{file_id}\n"
                            f"📊 File size: {os.path.getsize(output_file)} bytes\n\n"
                            f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`"
                }],
                "isError": False
            }

        except Exception as e:
            return {
                "content": [{
                    "type": "text",
                    "text": f"❌ Music recognition failed: {str(e)}"
                }],
                "isError": True
            }

    def _process_data_uri(self, data_uri: str, output_dir: str = None) -> Dict[str, Any]:
        """Process a base64 encoded ``data:application/pdf`` URI.

        The payload is written to a uniquely named temporary PDF inside
        ``output_dir`` (or ``/tmp``), converted, and the temporary PDF is
        removed afterwards.

        Raises:
            Exception: malformed URI, unsupported MIME type, a target
                directory outside the allowed directories, or any I/O error.
        """
        try:
            # Parse data URI: data:application/pdf;base64,
            header, data = data_uri.split(',', 1)
            mime_type = header.split(';')[0].replace('data:', '')

            if mime_type != 'application/pdf':
                raise Exception(f"Unsupported MIME type: {mime_type}")

            # Fix base64 padding if needed, then decode
            pdf_data = base64.b64decode(self._fix_base64_padding(data))

            temp_dir = output_dir or "/tmp"
            # Refuse before touching the file system: the directory may come
            # straight from a request.
            if not self._is_file_accessible(temp_dir):
                raise Exception(f"Output directory access denied: {temp_dir}")
            os.makedirs(temp_dir, exist_ok=True)

            # mkstemp guarantees a unique name; a timestamp alone collides
            # when two requests arrive within the same second.
            fd, temp_pdf = tempfile.mkstemp(prefix="temp_", suffix=".pdf", dir=temp_dir)
            try:
                with os.fdopen(fd, 'wb') as f:
                    f.write(pdf_data)

                # Process the file
                return self.recognize_music_tool(f"file://{temp_pdf}", output_dir)
            finally:
                # The decoded upload is only an intermediate artefact.
                try:
                    os.remove(temp_pdf)
                except OSError:
                    pass

        except Exception as e:
            raise Exception(f"Failed to process data URI: {str(e)}") from e

    def _fix_base64_padding(self, data: str) -> str:
        """Return ``data`` without whitespace and padded to a multiple of 4."""
        # Remove any whitespace (spaces, tabs, line breaks)
        data = "".join(data.split())

        # Add padding if needed
        missing_padding = len(data) % 4
        if missing_padding:
            data += '=' * (4 - missing_padding)

        return data

    def _recognize_music_core(self, pdf_file_path: str, output_dir: str = None) -> str:
        """Run Audiveris on a PDF and return the path of the exported file.

        Args:
            pdf_file_path: Path of the PDF score to convert.
            output_dir: Export directory; defaults to ``/tmp/output``. Must lie
                inside the allowed directories.

        Returns:
            Path of the first existing ``<pdf name>.mxl`` / ``.xml`` /
            ``.musicxml`` file in ``output_dir``.

        Raises:
            Exception: output directory not allowed, input missing, or no
                export was produced (message includes Audiveris output).
        """
        if output_dir is None:
            output_dir = DEFAULT_OUTPUT_DIR

        # Check access before creating anything so a rejected request leaves
        # no directories behind.
        if not self._is_file_accessible(output_dir):
            raise Exception(f"Output directory access denied: {output_dir}")

        # Ensure output directory exists with proper permissions
        os.makedirs(output_dir, exist_ok=True)
        try:
            os.chmod(output_dir, 0o755)
        except Exception as e:
            print(f"Warning: Could not set permissions for {output_dir}: {e}")

        # Verify input file exists
        if not os.path.exists(pdf_file_path):
            raise Exception(f"Input PDF file not found: {pdf_file_path}")

        pdf_file_name = os.path.basename(pdf_file_path)
        pdf_name_without_ext = os.path.splitext(pdf_file_name)[0]

        # Audiveris may export under any of these extensions
        output_files = [
            os.path.join(output_dir, f"{pdf_name_without_ext}{ext}")
            for ext in MUSICXML_EXTENSIONS
        ]

        cmd = [
            AUDIVERIS_BIN, "-batch", "-export", "-output", output_dir, pdf_file_path
        ]

        print(f"Running Audiveris command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)

        print(f"Audiveris stdout: {result.stdout}")
        print(f"Audiveris stderr: {result.stderr}")
        print(f"Audiveris return code: {result.returncode}")

        # List files in output directory for debugging
        if os.path.exists(output_dir):
            files_in_output = os.listdir(output_dir)
            print(f"Files in output directory: {files_in_output}")

        # The return code is not trusted; success means an export exists.
        existing_output = next(
            (candidate for candidate in output_files if os.path.exists(candidate)),
            None,
        )
        if existing_output:
            print(f"Found output file: {existing_output}")
            return existing_output

        # If no output file found, provide detailed error
        listing = os.listdir(output_dir) if os.path.exists(output_dir) else 'Directory does not exist'
        error_msg = "".join([
            f"Audiveris processing failed.\n",
            f"Return code: {result.returncode}\n",
            f"Stdout: {result.stdout}\n",
            f"Stderr: {result.stderr}\n",
            f"Expected files: {output_files}\n",
            f"Files in output dir: {listing}\n",
        ])
        raise Exception(error_msg)

    def _test_audiveris(self):
        """Test if Audiveris is properly installed.

        Returns:
            True when the launcher exists and its ``-help`` output mentions
            "Audiveris"; False otherwise. Problems are printed, never raised.
        """
        if not os.path.exists(AUDIVERIS_BIN):
            print(f"⚠️ Warning: Audiveris not found at {AUDIVERIS_BIN}")
            return False

        try:
            # Test Audiveris with help command
            result = subprocess.run(
                [AUDIVERIS_BIN, "-help"], capture_output=True, text=True, timeout=10
            )
        except (OSError, subprocess.SubprocessError) as e:
            print(f"⚠️ Warning: Could not test Audiveris: {e}")
            return False

        if "Audiveris" in result.stdout or "Audiveris" in result.stderr:
            print("✅ Audiveris installation verified")
            return True

        print(f"⚠️ Warning: Audiveris may not be working properly")
        print(f"Output: {result.stdout}")
        print(f"Error: {result.stderr}")
        return False


# Initialize MCP Server
mcp_server = MusicRecognitionMCPServer(["/tmp", "uploads", "output"])

# Create FastAPI app
app = FastAPI(title="Music Recognition API", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# MCP-compliant endpoints
@app.get("/mcp/resources")
async def list_mcp_resources():
    """List available MCP resources."""
    try:
        resources = mcp_server.list_resources()
        return {"resources": resources}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/mcp/resources/read")
async def read_mcp_resource(request: dict):
    """Read MCP resource content.

    Expects a JSON body with a ``uri`` key; responds 400 when it is missing
    and 500 when the resource cannot be read.
    """
    try:
        uri = request.get("uri")
        if not uri:
            raise HTTPException(status_code=400, detail="URI is required")

        content = mcp_server.read_resource(uri)
        return content
    except HTTPException:
        # Keep the intended status code; the generic handler below would
        # otherwise turn the 400 into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/mcp/tools/recognize_music")
async def mcp_recognize_music_tool(request: dict):
    """MCP tool for music recognition.

    Expects a JSON body with ``pdf_uri`` and an optional ``output_dir``;
    responds 400 when ``pdf_uri`` is missing.
    """
    try:
        pdf_uri = request.get("pdf_uri")
        output_dir = request.get("output_dir")

        if not pdf_uri:
            raise HTTPException(status_code=400, detail="pdf_uri is required")

        result = mcp_server.recognize_music_tool(pdf_uri, output_dir)
        return result
    except HTTPException:
        # Preserve the 400 raised above instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# Original FastAPI endpoints for backward compatibility
@app.post("/api/recognize-music")
async def recognize_music_api(file: UploadFile = File(...)):
    """FastAPI endpoint for music recognition.

    Accepts an uploaded PDF and responds with the converted MusicXML file as
    an ``application/octet-stream`` attachment. Responds 400 for non-PDF
    uploads and 500 when the conversion fails.
    """
    # Use only the final path component so a crafted filename cannot escape
    # the temporary directory; a missing filename is treated as non-PDF.
    safe_name = os.path.basename(file.filename or "")
    if not safe_name.lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="Only PDF files are allowed")

    with tempfile.TemporaryDirectory() as temp_dir:
        input_path = os.path.join(temp_dir, safe_name)

        with open(input_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        try:
            output_file = mcp_server._recognize_music_core(input_path, temp_dir)
            # Read the result while the temporary directory still exists. A
            # FileResponse would be sent only after this ``with`` block has
            # already deleted the file.
            with open(output_file, "rb") as produced:
                payload = produced.read()
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    download_name = os.path.basename(output_file)
    quoted_name = quote(download_name)
    if quoted_name != download_name:
        # Names needing escaping use the RFC 5987 extended form.
        disposition = f"attachment; filename*=utf-8''{quoted_name}"
    else:
        disposition = f'attachment; filename="{download_name}"'

    return Response(
        content=payload,
        media_type='application/octet-stream',
        headers={"Content-Disposition": disposition},
    )


@app.get("/api/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "service": "music-recognition"}


@app.get("/api/info")
async def get_info():
    """Get API information."""
    return {
        "name": "Music Recognition API",
        "version": "1.0.0",
        "description": "Convert PDF music scores to MusicXML using Audiveris",
        "mcp_compliant": True,
        "endpoints": {
            "GET /mcp/resources": "List MCP resources",
            "POST /mcp/resources/read": "Read MCP resource content",
            "POST /mcp/tools/recognize_music": "MCP music recognition tool",
            "POST /api/recognize-music": "Upload PDF and get MusicXML",
            "GET /api/health": "Health check",
            "GET /api/info": "API information"
        }
    }


# Gradio interface function
def recognize_music_gradio(pdf_file):
    """Gradio wrapper for music recognition.

    Args:
        pdf_file: The uploaded file as handed over by ``gr.File``: either an
            object exposing the path as ``.name`` or the path string itself.

    Returns:
        Path of the generated MusicXML file, or None when nothing was
        uploaded or the conversion failed (details are printed).
    """
    try:
        if pdf_file is None:
            print("No file provided")
            return None

        # Accept both a file-like object with ``.name`` and a bare path.
        pdf_path = getattr(pdf_file, "name", None) or str(pdf_file)

        print(f"Processing file: {pdf_path}")
        result = mcp_server.recognize_music_tool(f"file://{pdf_path}")

        if result.get("isError"):
            error_msg = result["content"][0]["text"]
            print(f"Error in music recognition: {error_msg}")
            return None

        # Extract file ID from the response
        response_text = result["content"][0]["text"]
        print(f"Response text: {response_text}")

        if "musicxml://" in response_text:
            file_id = response_text.split("musicxml://")[1].split("`")[0]
            print(f"Extracted file ID: {file_id}")

            if file_id in mcp_server.processed_files:
                file_info = mcp_server.processed_files[file_id]
                output_path = file_info["output_path"]
                print(f"Output path from cache: {output_path}")

                if os.path.exists(output_path):
                    print(f"✅ File exists: {output_path}")
                    return output_path
                else:
                    print(f"❌ File not found: {output_path}")

        # If the above doesn't work, try to find the file directly
        pdf_basename = os.path.splitext(os.path.basename(pdf_path))[0]
        possible_files = [
            f"/tmp/output/{pdf_basename}.mxl",
            f"/tmp/output/{pdf_basename}.xml",
            f"/tmp/output/{pdf_basename}.musicxml"
        ]

        for file_path in possible_files:
            print(f"Checking: {file_path}")
            if os.path.exists(file_path):
                print(f"✅ Found file: {file_path}")
                return file_path

        print("❌ No output file found in any expected location")
        print(f"Files in /tmp/output: {os.listdir('/tmp/output') if os.path.exists('/tmp/output') else 'Directory not found'}")
        return None

    except Exception as e:
        # UI boundary: report the failure and show an empty result.
        print(f"Exception in Gradio wrapper: {str(e)}")
        traceback.print_exc()
        return None


# Create Gradio interface
gradio_interface = gr.Interface(
    fn=recognize_music_gradio,
    inputs=gr.File(file_types=[".pdf"], label="Upload PDF music score"),
    outputs=gr.File(label="Download MusicXML file"),
    title="Music Score Recognition",
    description="Upload a PDF music score and create a MusicXML file from it.",
)


def run_gradio():
    """Run Gradio in a separate thread."""
    gradio_interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        mcp_server=True,
        prevent_thread_lock=True
    )


def run_fastapi():
    """Run FastAPI server."""
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )


if __name__ == "__main__":
    # Start Gradio in a separate thread
    gradio_thread = threading.Thread(target=run_gradio, daemon=True)
    gradio_thread.start()

    print("🎵 MCP-Compliant Music Recognition Service Starting...")
    print("📱 Gradio UI: http://localhost:7860")
    print("🚀 FastAPI: http://localhost:8000")
    print("📋 API Docs: http://localhost:8000/docs")
    print("🔗 MCP Resources: http://localhost:8000/mcp/resources")

    # Run FastAPI in the main thread
    run_fastapi()