| import gradio as gr |
| import subprocess |
| import os |
| import base64 |
| from typing import List, Dict, Any |
| from pathlib import Path |
| from datetime import datetime |
|
|
| class MusicRecognitionMCPServer: |
| def __init__(self, allowed_directories: List[str] = None): |
| """Initialize MCP Server with configurable file access""" |
| self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"] |
| self.processed_files = {} |
| |
| |
| abs_directories = [] |
| for directory in self.allowed_directories: |
| |
| if not os.path.isabs(directory): |
| directory = os.path.abspath(directory) |
| |
| abs_directories.append(directory) |
| print(f"Checking directory: {directory}") |
| |
| |
| if not os.path.exists(directory): |
| try: |
| os.makedirs(directory, exist_ok=True) |
| os.chmod(directory, 0o755) |
| print(f"β
Directory created: {directory}") |
| except PermissionError: |
| print(f"β οΈ Directory doesn't exist but can't create: {directory}") |
| print(f" This is normal for system directories like /tmp") |
| except Exception as e: |
| print(f"β οΈ Warning creating {directory}: {e}") |
| else: |
| print(f"β
Directory exists: {directory}") |
| |
| self.allowed_directories = abs_directories |
| print(f"Final allowed directories: {self.allowed_directories}") |
| |
| |
| self._test_audiveris() |
| |
| def list_resources(self) -> List[Dict[str, Any]]: |
| """List available resources following MCP patterns""" |
| resources = [] |
| |
| |
| for file_id, file_info in self.processed_files.items(): |
| resources.append({ |
| "uri": f"musicxml://{file_id}", |
| "name": file_info["original_name"], |
| "description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}", |
| "mimeType": "application/vnd.recordare.musicxml+xml" |
| }) |
| |
| |
| for directory in self.allowed_directories: |
| if os.path.exists(directory): |
| for file_path in Path(directory).rglob("*.pdf"): |
| if self._is_file_accessible(str(file_path)): |
| resources.append({ |
| "uri": f"file://{file_path}", |
| "name": file_path.name, |
| "description": f"PDF music score available for processing: {file_path.name}", |
| "mimeType": "application/pdf" |
| }) |
| |
| return resources |
| |
| def read_resource(self, uri: str) -> Dict[str, Any]: |
| """Read resource content following MCP patterns""" |
| if uri.startswith("musicxml://"): |
| |
| file_id = uri.replace("musicxml://", "") |
| if file_id in self.processed_files: |
| file_info = self.processed_files[file_id] |
| try: |
| with open(file_info["output_path"], "rb") as f: |
| content = base64.b64encode(f.read()).decode() |
| |
| return { |
| "contents": [{ |
| "type": "resource", |
| "resource": { |
| "uri": uri, |
| "text": content, |
| "mimeType": "application/vnd.recordare.musicxml+xml" |
| } |
| }] |
| } |
| except FileNotFoundError: |
| raise Exception(f"MusicXML file not found: {file_info['output_path']}") |
| else: |
| raise Exception(f"Resource not found: {uri}") |
| |
| elif uri.startswith("file://"): |
| |
| file_path = uri.replace("file://", "") |
| if not self._is_file_accessible(file_path): |
| raise Exception(f"File access denied: {file_path}") |
| |
| try: |
| with open(file_path, "rb") as f: |
| content = base64.b64encode(f.read()).decode() |
| |
| return { |
| "contents": [{ |
| "type": "resource", |
| "resource": { |
| "uri": uri, |
| "text": content, |
| "mimeType": "application/pdf" |
| } |
| }] |
| } |
| except FileNotFoundError: |
| raise Exception(f"File not found: {file_path}") |
| |
| else: |
| raise Exception(f"Unsupported URI scheme: {uri}") |
| |
| def _is_file_accessible(self, file_path: str) -> bool: |
| """Check if file is within allowed directories""" |
| abs_path = os.path.abspath(file_path) |
| return any(abs_path.startswith(os.path.abspath(d)) for d in self.allowed_directories) |
| |
| def recognize_music_tool(self, pdf_uri: str, output_dir: str = None) -> Dict[str, Any]: |
| """Tool for music recognition following MCP patterns""" |
| |
| if pdf_uri.startswith("file://"): |
| pdf_path = pdf_uri.replace("file://", "") |
| elif pdf_uri.startswith("data:"): |
| |
| return self._process_data_uri(pdf_uri, output_dir) |
| else: |
| |
| pdf_path = pdf_uri |
| |
| if not self._is_file_accessible(pdf_path): |
| raise Exception(f"File access denied: {pdf_path}") |
| |
| if not os.path.exists(pdf_path): |
| raise Exception(f"PDF file not found: {pdf_path}") |
| |
| try: |
| output_file = self._recognize_music_core(pdf_path, output_dir) |
| |
| |
| file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}" |
| self.processed_files[file_id] = { |
| "original_name": os.path.basename(pdf_path), |
| "original_path": pdf_path, |
| "output_path": output_file, |
| "processed_at": datetime.now().isoformat(), |
| "file_id": file_id |
| } |
| |
| |
| return { |
| "content": [{ |
| "type": "text", |
| "text": f"β
Successfully converted '{os.path.basename(pdf_path)}' to MusicXML.\n\n" |
| f"π Output file: {output_file}\n" |
| f"π Resource URI: musicxml://{file_id}\n" |
| f"π File size: {os.path.getsize(output_file)} bytes\n\n" |
| f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`" |
| }], |
| "isError": False |
| } |
| |
| except Exception as e: |
| return { |
| "content": [{ |
| "type": "text", |
| "text": f"β Music recognition failed: {str(e)}" |
| }], |
| "isError": True |
| } |
| |
| def _process_data_uri(self, data_uri: str, output_dir: str = None) -> Dict[str, Any]: |
| """Process base64 encoded data URI""" |
| try: |
| |
| header, data = data_uri.split(',', 1) |
| mime_type = header.split(';')[0].replace('data:', '') |
| |
| if mime_type != 'application/pdf': |
| raise Exception(f"Unsupported MIME type: {mime_type}") |
| |
| |
| data = self._fix_base64_padding(data) |
| |
| |
| pdf_data = base64.b64decode(data) |
| |
| |
| temp_dir = output_dir or "/tmp" |
| temp_pdf = os.path.join(temp_dir, f"temp_{int(datetime.now().timestamp())}.pdf") |
| |
| with open(temp_pdf, 'wb') as f: |
| f.write(pdf_data) |
| |
| |
| return self.recognize_music_tool(f"file://{temp_pdf}", output_dir) |
| |
| except Exception as e: |
| raise Exception(f"Failed to process data URI: {str(e)}") |
| |
| def _fix_base64_padding(self, data: str) -> str: |
| """Fix base64 padding to make it valid""" |
| |
| data = data.strip().replace('\n', '').replace('\r', '').replace(' ', '') |
| |
| |
| missing_padding = len(data) % 4 |
| if missing_padding: |
| data += '=' * (4 - missing_padding) |
| |
| return data |
| |
| def _recognize_music_core(self, pdf_file_path: str, output_dir: str = None) -> str: |
| """Core music recognition function""" |
| audiveris = "/opt/audiveris/bin/Audiveris" |
| |
| if output_dir is None: |
| output_dir = "/tmp" |
| |
| |
| os.makedirs(output_dir, exist_ok=True) |
| try: |
| os.chmod(output_dir, 0o755) |
| except Exception as e: |
| print(f"Warning: Could not set permissions for {output_dir}: {e}") |
| |
| if not self._is_file_accessible(output_dir): |
| raise Exception(f"Output directory access denied: {output_dir}") |
| |
| |
| if not os.path.exists(pdf_file_path): |
| raise Exception(f"Input PDF file not found: {pdf_file_path}") |
| |
| pdf_file_name = os.path.basename(pdf_file_path) |
| pdf_name_without_ext = os.path.splitext(pdf_file_name)[0] |
| |
| |
| possible_extensions = [".mxl", ".xml", ".musicxml"] |
| output_files = [os.path.join(output_dir, f"{pdf_name_without_ext}{ext}") for ext in possible_extensions] |
|
|
| cmd = [ |
| audiveris, "-batch", "-export", "-output", output_dir, pdf_file_path |
| ] |
|
|
| print(f"Running Audiveris command: {' '.join(cmd)}") |
| result = subprocess.run(cmd, capture_output=True, text=True) |
| |
| print(f"Audiveris stdout: {result.stdout}") |
| print(f"Audiveris stderr: {result.stderr}") |
| print(f"Audiveris return code: {result.returncode}") |
| |
| |
| if os.path.exists(output_dir): |
| files_in_output = os.listdir(output_dir) |
| print(f"Files in output directory: {files_in_output}") |
| |
| |
| existing_output = None |
| for output_file in output_files: |
| if os.path.exists(output_file): |
| existing_output = output_file |
| break |
| |
| if existing_output: |
| print(f"Found output file: {existing_output}") |
| return existing_output |
| |
| |
| error_msg = f"Audiveris processing failed.\n" |
| error_msg += f"Return code: {result.returncode}\n" |
| error_msg += f"Stdout: {result.stdout}\n" |
| error_msg += f"Stderr: {result.stderr}\n" |
| error_msg += f"Expected files: {output_files}\n" |
| error_msg += f"Files in output dir: {os.listdir(output_dir) if os.path.exists(output_dir) else 'Directory does not exist'}\n" |
| |
| raise Exception(error_msg) |
| |
| def _test_audiveris(self): |
| """Test if Audiveris is properly installed""" |
| audiveris = "/opt/audiveris/bin/Audiveris" |
| |
| if not os.path.exists(audiveris): |
| print(f"β οΈ Warning: Audiveris not found at {audiveris}") |
| return False |
| |
| try: |
| |
| result = subprocess.run([audiveris, "-help"], capture_output=True, text=True, timeout=10) |
| if "Audiveris" in result.stdout or "Audiveris" in result.stderr: |
| print("β
Audiveris installation verified") |
| return True |
| else: |
| print(f"β οΈ Warning: Audiveris may not be working properly") |
| print(f"Output: {result.stdout}") |
| print(f"Error: {result.stderr}") |
| return False |
| except Exception as e: |
| print(f"β οΈ Warning: Could not test Audiveris: {e}") |
| return False |
| |
| |
| mcp_server = MusicRecognitionMCPServer(["/tmp", "/app/uploads", "/app/output"]) |
|
|
| def recognize_music_gradio(pdf_file): |
| """Gradio wrapper for music recognition""" |
| try: |
| print(f"Processing file: {pdf_file.name}") |
| result = mcp_server.recognize_music_tool(f"file://{pdf_file.name}") |
| |
| if result.get("isError"): |
| error_msg = result["content"][0]["text"] |
| print(f"Error in music recognition: {error_msg}") |
| return None |
| |
| |
| response_text = result["content"][0]["text"] |
| print(f"Response text: {response_text}") |
| |
| if "musicxml://" in response_text: |
| file_id = response_text.split("musicxml://")[1].split("`")[0] |
| print(f"Extracted file ID: {file_id}") |
| |
| if file_id in mcp_server.processed_files: |
| file_info = mcp_server.processed_files[file_id] |
| output_path = file_info["output_path"] |
| print(f"Output path from cache: {output_path}") |
| |
| if os.path.exists(output_path): |
| print(f"β
File exists: {output_path}") |
| return output_path |
| else: |
| print(f"β File not found: {output_path}") |
| |
| |
| pdf_basename = os.path.splitext(os.path.basename(pdf_file.name))[0] |
| possible_files = [ |
| f"/tmp/{pdf_basename}.mxl", |
| f"/tmp/{pdf_basename}.xml", |
| f"/tmp/{pdf_basename}.musicxml" |
| ] |
| |
| for file_path in possible_files: |
| print(f"Checking: {file_path}") |
| if os.path.exists(file_path): |
| print(f"β
Found file: {file_path}") |
| return file_path |
| |
| print("β No output file found in any expected location") |
| print(f"Files in /tmp: {os.listdir('/tmp') if os.path.exists('/tmp') else 'Directory not found'}") |
| return None |
| |
| except Exception as e: |
| print(f"Exception in Gradio wrapper: {str(e)}") |
| import traceback |
| traceback.print_exc() |
| return None |
|
|
| |
| custom_css = """ |
| .gradio-container { |
| max-width: 1200px !important; |
| margin: auto !important; |
| } |
| |
| .main-header { |
| text-align: center; |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); |
| color: white; |
| padding: 2rem; |
| border-radius: 15px; |
| margin-bottom: 2rem; |
| box-shadow: 0 8px 32px rgba(0,0,0,0.1); |
| } |
| |
| .feature-card { |
| background: white; |
| border-radius: 12px; |
| padding: 1.5rem; |
| margin: 1rem 0; |
| box-shadow: 0 4px 16px rgba(0,0,0,0.1); |
| border-left: 4px solid #667eea; |
| } |
| |
| .upload-area { |
| border: 2px dashed #667eea; |
| border-radius: 12px; |
| padding: 2rem; |
| text-align: center; |
| background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); |
| transition: all 0.3s ease; |
| } |
| |
| .upload-area:hover { |
| border-color: #764ba2; |
| background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%); |
| } |
| |
| .status-success { |
| background: linear-gradient(135deg, #4caf50 0%, #45a049 100%); |
| color: white; |
| padding: 1rem; |
| border-radius: 8px; |
| margin: 1rem 0; |
| } |
| |
| .status-error { |
| background: linear-gradient(135deg, #f44336 0%, #d32f2f 100%); |
| color: white; |
| padding: 1rem; |
| border-radius: 8px; |
| margin: 1rem 0; |
| } |
| |
| .info-box { |
| background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%); |
| border-radius: 8px; |
| padding: 1rem; |
| margin: 1rem 0; |
| border-left: 4px solid #2196f3; |
| } |
| """ |
|
|
| def create_gradio_interface(): |
| with gr.Blocks(css=custom_css, title="πΌ Audiveris Music Score Recognition", theme=gr.themes.Soft()) as interface: |
| |
| gr.HTML(""" |
| <div class="main-header"> |
| <h1>πΌ Audiveris Music Score Recognition</h1> |
| <p style="font-size: 1.2em; margin-top: 1rem; opacity: 0.9;"> |
| Transform your PDF music scores into editable MusicXML files using advanced AI recognition |
| </p> |
| </div> |
| """) |
| |
| |
| with gr.Row(): |
| with gr.Column(scale=1): |
| gr.HTML(""" |
| <div class="feature-card"> |
| <h3>β¨ Features</h3> |
| <ul style="line-height: 1.8;"> |
| <li>π΅ High-accuracy music notation recognition</li> |
| <li>π PDF to MusicXML conversion</li> |
| <li>πΉ Supports complex musical scores</li> |
| <li>β‘ Fast processing with Audiveris engine</li> |
| <li>πΎ Downloadable results</li> |
| </ul> |
| </div> |
| """) |
| |
| gr.HTML(""" |
| <div class="info-box"> |
| <h4>π How to use:</h4> |
| <ol style="line-height: 1.6;"> |
| <li>Upload your PDF music score</li> |
| <li>Click "π΅ Convert to MusicXML"</li> |
| <li>Wait for processing to complete</li> |
| <li>Download your MusicXML file</li> |
| </ol> |
| </div> |
| """) |
| |
| with gr.Column(scale=2): |
| |
| gr.HTML("<h3 style='text-align: center; color: #667eea;'>π Upload Your Music Score</h3>") |
| |
| pdf_input = gr.File( |
| file_types=[".pdf"], |
| label="Select PDF File", |
| file_count="single", |
| height=200, |
| elem_classes=["upload-area"] |
| ) |
| |
| |
| convert_btn = gr.Button( |
| "π΅ Convert to MusicXML", |
| variant="primary", |
| size="lg", |
| scale=1 |
| ) |
| |
| |
| status_display = gr.HTML(visible=False) |
| progress_bar = gr.Progress() |
| |
| |
| gr.HTML("<h3 style='text-align: center; color: #667eea; margin-top: 2rem;'>π₯ Download Results</h3>") |
| |
| output_file = gr.File( |
| label="MusicXML Output", |
| visible=False, |
| height=100 |
| ) |
| |
| |
| processing_info = gr.Textbox( |
| label="Processing Details", |
| lines=8, |
| visible=False, |
| interactive=False |
| ) |
| |
| |
| gr.HTML(""" |
| <div style="text-align: center; margin-top: 3rem; padding: 2rem; background: #f8f9fa; border-radius: 12px;"> |
| <p style="color: #666; margin: 0;"> |
| Powered by <strong>Audiveris</strong> β’ Built with β€οΈ using Gradio |
| </p> |
| <p style="color: #888; font-size: 0.9em; margin-top: 0.5rem;"> |
| For best results, use high-quality PDF scans with clear musical notation |
| </p> |
| </div> |
| """) |
| |
| |
| def process_with_feedback(pdf_file, progress=gr.Progress()): |
| if pdf_file is None: |
| return ( |
| gr.HTML("<div class='status-error'>β Please upload a PDF file first!</div>", visible=True), |
| None, |
| gr.Textbox(visible=False), |
| gr.File(visible=False) |
| ) |
| |
| try: |
| |
| progress(0.1, desc="π Analyzing PDF file...") |
| |
| status_html = """ |
| <div class='status-success'> |
| <h4>π Processing your music score...</h4> |
| <p>File: <strong>{}</strong></p> |
| <p>Size: <strong>{:.2f} MB</strong></p> |
| <p>Please wait while Audiveris analyzes your score...</p> |
| </div> |
| """.format( |
| pdf_file.name.split('/')[-1], |
| os.path.getsize(pdf_file.name) / (1024*1024) |
| ) |
| |
| progress(0.3, desc="π΅ Running Audiveris recognition...") |
| |
| |
| result_file = recognize_music_gradio(pdf_file) |
| |
| progress(0.9, desc="β
Finalizing results...") |
| |
| if result_file and os.path.exists(result_file): |
| |
| success_html = """ |
| <div class='status-success'> |
| <h4>β
Conversion completed successfully!</h4> |
| <p>π Output: <strong>{}</strong></p> |
| <p>π Size: <strong>{:.2f} KB</strong></p> |
| <p>π Your MusicXML file is ready for download!</p> |
| </div> |
| """.format( |
| os.path.basename(result_file), |
| os.path.getsize(result_file) / 1024 |
| ) |
| |
| |
| details = f"""β
CONVERSION SUCCESSFUL |
| |
| π Input File: {pdf_file.name.split('/')[-1]} |
| π Input Size: {os.path.getsize(pdf_file.name) / (1024*1024):.2f} MB |
| |
| π΅ Output File: {os.path.basename(result_file)} |
| π Output Size: {os.path.getsize(result_file) / 1024:.2f} KB |
| |
| β±οΈ Processing completed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |
| |
| πΌ Your PDF music score has been successfully converted to MusicXML format! |
| You can now download the file and use it in music notation software like MuseScore, Finale, or Sibelius.""" |
| |
| progress(1.0, desc="π Complete!") |
| |
| return ( |
| gr.HTML(success_html, visible=True), |
| gr.File(result_file, visible=True), |
| gr.Textbox(details, visible=True), |
| gr.File(visible=True) |
| ) |
| else: |
| |
| error_html = """ |
| <div class='status-error'> |
| <h4>β Conversion failed</h4> |
| <p>The music recognition process encountered an error.</p> |
| <p>Please check that your PDF contains clear musical notation and try again.</p> |
| </div> |
| """ |
| |
| error_details = f"""β CONVERSION FAILED |
| |
| π Input File: {pdf_file.name.split('/')[-1]} |
| π Input Size: {os.path.getsize(pdf_file.name) / (1024*1024):.2f} MB |
| |
| β οΈ Error: No output file was generated by Audiveris |
| β±οΈ Failed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |
| |
| π‘ Troubleshooting tips: |
| β’ Ensure your PDF contains clear, high-quality musical notation |
| β’ Check that the PDF is not password-protected |
| β’ Try with a different PDF file |
| β’ Make sure the musical notation is not handwritten""" |
| |
| return ( |
| gr.HTML(error_html, visible=True), |
| None, |
| gr.Textbox(error_details, visible=True), |
| gr.File(visible=False) |
| ) |
| |
| except Exception as e: |
| |
| error_html = f""" |
| <div class='status-error'> |
| <h4>β Processing Error</h4> |
| <p>An unexpected error occurred: <code>{str(e)}</code></p> |
| <p>Please try again or contact support if the problem persists.</p> |
| </div> |
| """ |
| |
| error_details = f"""β PROCESSING ERROR |
| |
| π Input File: {pdf_file.name.split('/')[-1] if pdf_file else 'Unknown'} |
| β οΈ Error: {str(e)} |
| β±οΈ Failed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |
| |
| π§ Technical Details: |
| {str(e)} |
| |
| Please try again with a different file or contact support.""" |
| |
| return ( |
| gr.HTML(error_html, visible=True), |
| None, |
| gr.Textbox(error_details, visible=True), |
| gr.File(visible=False) |
| ) |
| |
| |
| convert_btn.click( |
| fn=process_with_feedback, |
| inputs=[pdf_input], |
| outputs=[status_display, output_file, processing_info, output_file], |
| show_progress=True |
| ) |
| |
| |
| pdf_input.change( |
| fn=lambda: (gr.HTML(visible=False), gr.Textbox(visible=False), gr.File(visible=False)), |
| outputs=[status_display, processing_info, output_file] |
| ) |
| |
| return interface |
|
|
| |
| gradio_interface = create_gradio_interface() |
|
|
| |
|
|
| if __name__ == "__main__": |
| print("===== Application Startup at {} =====".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) |
| print() |
| |
| print("π΅ MCP-Compliant Music Recognition Service Starting...") |
| print("π± Gradio UI: http://localhost:7860") |
| |
| |
| try: |
| gradio_interface.launch( |
| server_name="0.0.0.0", |
| server_port=7860, |
| mcp_server=True, |
| share=True, |
| prevent_thread_lock=False |
| ) |
| except Exception as e: |
| print(f"β Failed to start Gradio interface: {e}") |
| import traceback |
| traceback.print_exc() |
|
|