| | import gradio as gr |
| | import subprocess |
| | import os |
| | import tempfile |
| | import shutil |
| | import base64 |
| | import mimetypes |
| | from typing import List, Dict, Any, Optional |
| | from fastapi import FastAPI, File, UploadFile, HTTPException |
| | from fastapi.responses import FileResponse |
| | from fastapi.middleware.cors import CORSMiddleware |
| | import uvicorn |
| | import threading |
| | from pathlib import Path |
| | from datetime import datetime |
| | import json |
| |
|
| | |
| | class MusicRecognitionMCPServer: |
| | def __init__(self, allowed_directories: List[str] = None): |
| | """Initialize MCP Server with configurable file access""" |
| | self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"] |
| | self.processed_files = {} |
| | |
| | |
| | abs_directories = [] |
| | for directory in self.allowed_directories: |
| | |
| | if not os.path.isabs(directory): |
| | directory = os.path.abspath(directory) |
| | |
| | abs_directories.append(directory) |
| | print(f"Checking directory: {directory}") |
| | |
| | |
| | if not os.path.exists(directory): |
| | try: |
| | os.makedirs(directory, exist_ok=True) |
| | os.chmod(directory, 0o755) |
| | print(f"β
Directory created: {directory}") |
| | except PermissionError: |
| | print(f"β οΈ Directory doesn't exist but can't create: {directory}") |
| | print(f" This is normal for system directories like /tmp") |
| | except Exception as e: |
| | print(f"β οΈ Warning creating {directory}: {e}") |
| | else: |
| | print(f"β
Directory exists: {directory}") |
| | |
| | self.allowed_directories = abs_directories |
| | print(f"Final allowed directories: {self.allowed_directories}") |
| | |
| | |
| | self._test_audiveris() |
| | |
| | def list_resources(self) -> List[Dict[str, Any]]: |
| | """List available resources following MCP patterns""" |
| | resources = [] |
| | |
| | |
| | for file_id, file_info in self.processed_files.items(): |
| | resources.append({ |
| | "uri": f"musicxml://{file_id}", |
| | "name": file_info["original_name"], |
| | "description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}", |
| | "mimeType": "application/vnd.recordare.musicxml+xml" |
| | }) |
| | |
| | |
| | for directory in self.allowed_directories: |
| | if os.path.exists(directory): |
| | for file_path in Path(directory).rglob("*.pdf"): |
| | if self._is_file_accessible(str(file_path)): |
| | resources.append({ |
| | "uri": f"file://{file_path}", |
| | "name": file_path.name, |
| | "description": f"PDF music score available for processing: {file_path.name}", |
| | "mimeType": "application/pdf" |
| | }) |
| | |
| | return resources |
| | |
| | def read_resource(self, uri: str) -> Dict[str, Any]: |
| | """Read resource content following MCP patterns""" |
| | if uri.startswith("musicxml://"): |
| | |
| | file_id = uri.replace("musicxml://", "") |
| | if file_id in self.processed_files: |
| | file_info = self.processed_files[file_id] |
| | try: |
| | with open(file_info["output_path"], "rb") as f: |
| | content = base64.b64encode(f.read()).decode() |
| | |
| | return { |
| | "contents": [{ |
| | "type": "resource", |
| | "resource": { |
| | "uri": uri, |
| | "text": content, |
| | "mimeType": "application/vnd.recordare.musicxml+xml" |
| | } |
| | }] |
| | } |
| | except FileNotFoundError: |
| | raise Exception(f"MusicXML file not found: {file_info['output_path']}") |
| | else: |
| | raise Exception(f"Resource not found: {uri}") |
| | |
| | elif uri.startswith("file://"): |
| | |
| | file_path = uri.replace("file://", "") |
| | if not self._is_file_accessible(file_path): |
| | raise Exception(f"File access denied: {file_path}") |
| | |
| | try: |
| | with open(file_path, "rb") as f: |
| | content = base64.b64encode(f.read()).decode() |
| | |
| | return { |
| | "contents": [{ |
| | "type": "resource", |
| | "resource": { |
| | "uri": uri, |
| | "text": content, |
| | "mimeType": "application/pdf" |
| | } |
| | }] |
| | } |
| | except FileNotFoundError: |
| | raise Exception(f"File not found: {file_path}") |
| | |
| | else: |
| | raise Exception(f"Unsupported URI scheme: {uri}") |
| | |
| | def _is_file_accessible(self, file_path: str) -> bool: |
| | """Check if file is within allowed directories""" |
| | abs_path = os.path.abspath(file_path) |
| | return any(abs_path.startswith(os.path.abspath(d)) for d in self.allowed_directories) |
| | |
| | def recognize_music_tool(self, pdf_uri: str, output_dir: str = None) -> Dict[str, Any]: |
| | """Tool for music recognition following MCP patterns""" |
| | |
| | if pdf_uri.startswith("file://"): |
| | pdf_path = pdf_uri.replace("file://", "") |
| | elif pdf_uri.startswith("data:"): |
| | |
| | return self._process_data_uri(pdf_uri, output_dir) |
| | else: |
| | |
| | pdf_path = pdf_uri |
| | |
| | if not self._is_file_accessible(pdf_path): |
| | raise Exception(f"File access denied: {pdf_path}") |
| | |
| | if not os.path.exists(pdf_path): |
| | raise Exception(f"PDF file not found: {pdf_path}") |
| | |
| | try: |
| | output_file = self._recognize_music_core(pdf_path, output_dir) |
| | |
| | |
| | file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}" |
| | self.processed_files[file_id] = { |
| | "original_name": os.path.basename(pdf_path), |
| | "original_path": pdf_path, |
| | "output_path": output_file, |
| | "processed_at": datetime.now().isoformat(), |
| | "file_id": file_id |
| | } |
| | |
| | |
| | return { |
| | "content": [{ |
| | "type": "text", |
| | "text": f"β
Successfully converted '{os.path.basename(pdf_path)}' to MusicXML.\n\n" |
| | f"π Output file: {output_file}\n" |
| | f"π Resource URI: musicxml://{file_id}\n" |
| | f"π File size: {os.path.getsize(output_file)} bytes\n\n" |
| | f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`" |
| | }], |
| | "isError": False |
| | } |
| | |
| | except Exception as e: |
| | return { |
| | "content": [{ |
| | "type": "text", |
| | "text": f"β Music recognition failed: {str(e)}" |
| | }], |
| | "isError": True |
| | } |
| | |
| | def _process_data_uri(self, data_uri: str, output_dir: str = None) -> Dict[str, Any]: |
| | """Process base64 encoded data URI""" |
| | try: |
| | |
| | header, data = data_uri.split(',', 1) |
| | mime_type = header.split(';')[0].replace('data:', '') |
| | |
| | if mime_type != 'application/pdf': |
| | raise Exception(f"Unsupported MIME type: {mime_type}") |
| | |
| | |
| | data = self._fix_base64_padding(data) |
| | |
| | |
| | pdf_data = base64.b64decode(data) |
| | |
| | |
| | temp_dir = output_dir or "/tmp" |
| | temp_pdf = os.path.join(temp_dir, f"temp_{int(datetime.now().timestamp())}.pdf") |
| | |
| | with open(temp_pdf, 'wb') as f: |
| | f.write(pdf_data) |
| | |
| | |
| | return self.recognize_music_tool(f"file://{temp_pdf}", output_dir) |
| | |
| | except Exception as e: |
| | raise Exception(f"Failed to process data URI: {str(e)}") |
| | |
| | def _fix_base64_padding(self, data: str) -> str: |
| | """Fix base64 padding to make it valid""" |
| | |
| | data = data.strip().replace('\n', '').replace('\r', '').replace(' ', '') |
| | |
| | |
| | missing_padding = len(data) % 4 |
| | if missing_padding: |
| | data += '=' * (4 - missing_padding) |
| | |
| | return data |
| | |
| | def _recognize_music_core(self, pdf_file_path: str, output_dir: str = None) -> str: |
| | """Core music recognition function""" |
| | audiveris = "/opt/audiveris/bin/Audiveris" |
| | |
| | if output_dir is None: |
| | output_dir = "/tmp/output" |
| | |
| | |
| | os.makedirs(output_dir, exist_ok=True) |
| | try: |
| | os.chmod(output_dir, 0o755) |
| | except Exception as e: |
| | print(f"Warning: Could not set permissions for {output_dir}: {e}") |
| | |
| | if not self._is_file_accessible(output_dir): |
| | raise Exception(f"Output directory access denied: {output_dir}") |
| | |
| | |
| | if not os.path.exists(pdf_file_path): |
| | raise Exception(f"Input PDF file not found: {pdf_file_path}") |
| | |
| | pdf_file_name = os.path.basename(pdf_file_path) |
| | pdf_name_without_ext = os.path.splitext(pdf_file_name)[0] |
| | |
| | |
| | possible_extensions = [".mxl", ".xml", ".musicxml"] |
| | output_files = [os.path.join(output_dir, f"{pdf_name_without_ext}{ext}") for ext in possible_extensions] |
| |
|
| | cmd = [ |
| | audiveris, "-batch", "-export", "-output", output_dir, pdf_file_path |
| | ] |
| |
|
| | print(f"Running Audiveris command: {' '.join(cmd)}") |
| | result = subprocess.run(cmd, capture_output=True, text=True) |
| | |
| | print(f"Audiveris stdout: {result.stdout}") |
| | print(f"Audiveris stderr: {result.stderr}") |
| | print(f"Audiveris return code: {result.returncode}") |
| | |
| | |
| | if os.path.exists(output_dir): |
| | files_in_output = os.listdir(output_dir) |
| | print(f"Files in output directory: {files_in_output}") |
| | |
| | |
| | existing_output = None |
| | for output_file in output_files: |
| | if os.path.exists(output_file): |
| | existing_output = output_file |
| | break |
| | |
| | if existing_output: |
| | print(f"Found output file: {existing_output}") |
| | return existing_output |
| | |
| | |
| | error_msg = f"Audiveris processing failed.\n" |
| | error_msg += f"Return code: {result.returncode}\n" |
| | error_msg += f"Stdout: {result.stdout}\n" |
| | error_msg += f"Stderr: {result.stderr}\n" |
| | error_msg += f"Expected files: {output_files}\n" |
| | error_msg += f"Files in output dir: {os.listdir(output_dir) if os.path.exists(output_dir) else 'Directory does not exist'}\n" |
| | |
| | raise Exception(error_msg) |
| | |
| | def _test_audiveris(self): |
| | """Test if Audiveris is properly installed""" |
| | audiveris = "/opt/audiveris/bin/Audiveris" |
| | |
| | if not os.path.exists(audiveris): |
| | print(f"β οΈ Warning: Audiveris not found at {audiveris}") |
| | return False |
| | |
| | try: |
| | |
| | result = subprocess.run([audiveris, "-help"], capture_output=True, text=True, timeout=10) |
| | if "Audiveris" in result.stdout or "Audiveris" in result.stderr: |
| | print("β
Audiveris installation verified") |
| | return True |
| | else: |
| | print(f"β οΈ Warning: Audiveris may not be working properly") |
| | print(f"Output: {result.stdout}") |
| | print(f"Error: {result.stderr}") |
| | return False |
| | except Exception as e: |
| | print(f"β οΈ Warning: Could not test Audiveris: {e}") |
| | return False |
| |
|
| | |
| | mcp_server = MusicRecognitionMCPServer(["/tmp", "uploads", "output"]) |
| |
|
| | |
| | app = FastAPI(title="Music Recognition API", version="1.0.0") |
| |
|
| | app.add_middleware( |
| | CORSMiddleware, |
| | allow_origins=["*"], |
| | allow_credentials=True, |
| | allow_methods=["*"], |
| | allow_headers=["*"], |
| | ) |
| |
|
| | |
| | @app.get("/mcp/resources") |
| | async def list_mcp_resources(): |
| | """List available MCP resources""" |
| | try: |
| | resources = mcp_server.list_resources() |
| | return {"resources": resources} |
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) |
| |
|
| | @app.post("/mcp/resources/read") |
| | async def read_mcp_resource(request: dict): |
| | """Read MCP resource content""" |
| | try: |
| | uri = request.get("uri") |
| | if not uri: |
| | raise HTTPException(status_code=400, detail="URI is required") |
| | |
| | content = mcp_server.read_resource(uri) |
| | return content |
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) |
| |
|
| | @app.post("/mcp/tools/recognize_music") |
| | async def mcp_recognize_music_tool(request: dict): |
| | """MCP tool for music recognition""" |
| | try: |
| | pdf_uri = request.get("pdf_uri") |
| | output_dir = request.get("output_dir") |
| | |
| | if not pdf_uri: |
| | raise HTTPException(status_code=400, detail="pdf_uri is required") |
| | |
| | result = mcp_server.recognize_music_tool(pdf_uri, output_dir) |
| | return result |
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) |
| |
|
| | |
| | @app.post("/api/recognize-music") |
| | async def recognize_music_api(file: UploadFile = File(...)): |
| | """FastAPI endpoint for music recognition""" |
| | if not file.filename.lower().endswith('.pdf'): |
| | raise HTTPException(status_code=400, detail="Only PDF files are allowed") |
| | |
| | with tempfile.TemporaryDirectory() as temp_dir: |
| | input_path = os.path.join(temp_dir, file.filename) |
| | with open(input_path, "wb") as buffer: |
| | shutil.copyfileobj(file.file, buffer) |
| | |
| | try: |
| | output_file = mcp_server._recognize_music_core(input_path, temp_dir) |
| | return FileResponse( |
| | path=output_file, |
| | filename=os.path.basename(output_file), |
| | media_type='application/octet-stream' |
| | ) |
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) |
| |
|
| | @app.get("/api/health") |
| | async def health_check(): |
| | """Health check endpoint""" |
| | return {"status": "healthy", "service": "music-recognition"} |
| |
|
| | @app.get("/api/info") |
| | async def get_info(): |
| | """Get API information""" |
| | return { |
| | "name": "Music Recognition API", |
| | "version": "1.0.0", |
| | "description": "Convert PDF music scores to MusicXML using Audiveris", |
| | "mcp_compliant": True, |
| | "endpoints": { |
| | "GET /mcp/resources": "List MCP resources", |
| | "POST /mcp/resources/read": "Read MCP resource content", |
| | "POST /mcp/tools/recognize_music": "MCP music recognition tool", |
| | "POST /api/recognize-music": "Upload PDF and get MusicXML", |
| | "GET /api/health": "Health check", |
| | "GET /api/info": "API information" |
| | } |
| | } |
| |
|
| | |
| | def recognize_music_gradio(pdf_file): |
| | """Gradio wrapper for music recognition""" |
| | try: |
| | print(f"Processing file: {pdf_file.name}") |
| | result = mcp_server.recognize_music_tool(f"file://{pdf_file.name}") |
| | |
| | if result.get("isError"): |
| | error_msg = result["content"][0]["text"] |
| | print(f"Error in music recognition: {error_msg}") |
| | return None |
| | |
| | |
| | response_text = result["content"][0]["text"] |
| | print(f"Response text: {response_text}") |
| | |
| | if "musicxml://" in response_text: |
| | file_id = response_text.split("musicxml://")[1].split("`")[0] |
| | print(f"Extracted file ID: {file_id}") |
| | |
| | if file_id in mcp_server.processed_files: |
| | file_info = mcp_server.processed_files[file_id] |
| | output_path = file_info["output_path"] |
| | print(f"Output path from cache: {output_path}") |
| | |
| | if os.path.exists(output_path): |
| | print(f"β
File exists: {output_path}") |
| | return output_path |
| | else: |
| | print(f"β File not found: {output_path}") |
| | |
| | |
| | pdf_basename = os.path.splitext(os.path.basename(pdf_file.name))[0] |
| | possible_files = [ |
| | f"/tmp/output/{pdf_basename}.mxl", |
| | f"/tmp/output/{pdf_basename}.xml", |
| | f"/tmp/output/{pdf_basename}.musicxml" |
| | ] |
| | |
| | for file_path in possible_files: |
| | print(f"Checking: {file_path}") |
| | if os.path.exists(file_path): |
| | print(f"β
Found file: {file_path}") |
| | return file_path |
| | |
| | print("β No output file found in any expected location") |
| | print(f"Files in /tmp/output: {os.listdir('/tmp/output') if os.path.exists('/tmp/output') else 'Directory not found'}") |
| | return None |
| | |
| | except Exception as e: |
| | print(f"Exception in Gradio wrapper: {str(e)}") |
| | import traceback |
| | traceback.print_exc() |
| | return None |
| |
|
| | |
| | gradio_interface = gr.Interface( |
| | fn=recognize_music_gradio, |
| | inputs=gr.File(file_types=[".pdf"], label="Upload PDF music score"), |
| | outputs=gr.File(label="Download MusicXML file"), |
| | title="Music Score Recognition", |
| | description="Upload a PDF music score and create a MusicXML file from it.", |
| | ) |
| |
|
| | def run_gradio(): |
| | """Run Gradio in a separate thread""" |
| | gradio_interface.launch( |
| | server_name="0.0.0.0", |
| | server_port=7860, |
| | share=True, |
| | mcp_server=True, |
| | prevent_thread_lock=True |
| | ) |
| |
|
| | def run_fastapi(): |
| | """Run FastAPI server""" |
| | uvicorn.run( |
| | app, |
| | host="0.0.0.0", |
| | port=8000, |
| | log_level="info" |
| | ) |
| |
|
| | if __name__ == "__main__": |
| | |
| | gradio_thread = threading.Thread(target=run_gradio, daemon=True) |
| | gradio_thread.start() |
| | |
| | print("π΅ MCP-Compliant Music Recognition Service Starting...") |
| | print("π± Gradio UI: http://localhost:7860") |
| | print("π FastAPI: http://localhost:8000") |
| | print("π API Docs: http://localhost:8000/docs") |
| | print("π MCP Resources: http://localhost:8000/mcp/resources") |
| | |
| | |
| | run_fastapi() |
| | |