Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Form, Response | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import pickle | |
| import re | |
| import io | |
| import zlib | |
| import base64 | |
| import statistics | |
| import traceback | |
| import sys | |
| # On importe le nécessaire, mais on chargera l'IA prudemment | |
| from bs4 import BeautifulSoup | |
| from midiutil import MIDIFile | |
app = FastAPI()

# CORS: the API is open to any origin. Credentials are disabled because a
# wildcard origin must not be combined with credentialed requests, and the
# endpoint in this file does not rely on cookies or Authorization headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- CONFIGURATION ---
# Path of the pickled classifier loaded by MasterEngine at start-up.
BRAIN_FILE = "solfa_brain.pkl"
# Tempo passed to MIDIFile.addTempo for the generated track.
TEMPO = 100
# Velocity passed to MIDIFile.addNote for every generated note.
VELOCITY = 100

# --- MAIN ENGINE ---
class MasterEngine:
    """Turn tonic sol-fa text found in an HTML/SVG page into MIDI bytes.

    The engine extracts ``<text>`` elements from the page, rebuilds text
    lines from their x/y coordinates, keeps the lines that look like
    sol-fa notation (rule-based check first, optional pickled classifier
    second), groups them into systems of up to four voices and renders
    them with ``MIDIFile``.

    Attributes:
        model: the unpickled classifier, or ``None`` when it could not be
            loaded (the engine then relies on the rule-based check only).
        forbidden: characters whose presence sets the "bad" feature.
        chromatic_map: sol-fa syllable -> semitone offset from the tonic.
        key_signatures: upper-cased key name -> MIDI number of the tonic.
    """

    def __init__(self):
        self.model = None
        self._init_tables()
        self._load_model()

    def _init_tables(self):
        """Create the static lookup tables used by the parsing methods."""
        self.forbidden = set("bkghjqwxyzncuvBKGHJQWXYZNCUV1234567890")
        # Sol-fa syllable mapping
        self.chromatic_map = {
            'd': 0, 'di': 1, 'de': 11, 'r': 2, 'ri': 3, 'ra': 1, 'm': 4, 'ma': 3,
            'f': 5, 'fi': 6, 's': 7, 'si': 8, 'se': 6, 'l': 9, 'li': 10, 'la': 8,
            't': 11, 'ta': 10,
        }
        # Keys are upper-cased because the detected key name is upper-cased
        # before lookup (so "Bb" is looked up as "BB").
        self.key_signatures = {
            'C': 60, 'D': 62, 'E': 64, 'F': 65, 'G': 67, 'A': 69, 'B': 71,
            'CB': 59, 'DB': 61, 'EB': 63, 'GB': 66, 'AB': 68, 'BB': 70,
            'F#': 66, 'C#': 61,
            # Enharmonic sharp spellings, so they no longer fall back to C.
            'D#': 63, 'G#': 68, 'A#': 70,
        }

    def _load_model(self):
        """Try to load the pickled classifier; leave ``model`` as None on failure."""
        print("🛠️ Initialisation du moteur...")
        try:
            # Check first that scikit-learn is importable at all.
            import sklearn
            print(f"ℹ️ Version Scikit-learn du serveur: {sklearn.__version__}")
            # SECURITY: pickle.load can execute arbitrary code. BRAIN_FILE must
            # only ever be a trusted file shipped with the application.
            with open(BRAIN_FILE, "rb") as f:
                self.model = pickle.load(f)
            print("✅ Cerveau IA chargé avec succès !")
        except Exception as e:
            # Start-up boundary: any failure (missing package, missing file,
            # incompatible pickle) switches to the rule-based mode.
            print("⚠️ AVERTISSEMENT : Le cerveau IA n'a pas pu être chargé.")
            print(f"⚠️ Raison : {e}")
            print("⚠️ -> Passage automatique en mode ALGORITHMIQUE STRICT (Sans IA)")
            self.model = None

    def extract_features(self, text):
        """Return the 5-value feature vector fed to the classifier.

        The vector is ``[notes/len, seps/len, mods/len, bad, density]``
        computed on the lower-cased text with spaces removed, or five
        zeros when that text is empty.
        """
        clean = text.lower().replace(" ", "")
        length = len(clean)
        if length == 0:
            return [0, 0, 0, 0, 0]
        notes = len(re.findall(r"[drmfslt]", clean))
        seps = len(re.findall(r"[:|]", clean))
        mods = len(re.findall(r"[.,'\-]", clean))
        bad = 1 if any(c in self.forbidden for c in clean) else 0
        density = (notes + seps + mods) / length
        return [notes / length, seps / length, mods / length, bad, density]

    def is_music_ai(self, text):
        """Classify *text* with the loaded model; False when no model is loaded.

        Short lines (< 20 chars) containing "dia" are rejected up front.
        Any error raised by the model is treated as "not music".
        """
        if self.model is None:
            return False
        if "dia" in text.lower() and len(text) < 20:
            return False
        try:
            feats = self.extract_features(text)
            return bool(self.model.predict([feats])[0] == 1)
        except Exception:
            # Best effort: a broken/incompatible model must not abort the
            # conversion, the line is simply not accepted by this check.
            return False

    def is_music_fallback(self, text):
        """Rule-based check used when the model is absent or rejects nothing.

        A line is accepted when none of its whitespace-separated words is
        in the lyric black-list and it contains at least 3 note letters
        and at least one ``:`` or ``|`` separator.
        """
        clean = text.lower()
        # 1. Black-list of common Malagasy words (lyrics)
        bad_words = ("dia", "fa", "ny", "ho", "tsy", "misy", "hira", "ffpm", "p.", "andri")
        words = set(clean.split())
        if any(w in words for w in bad_words):
            return False
        # 2. Note check: a sol-fa line must have separators and >= 3 notes
        notes = len(re.findall(r"[drmfslt]", clean))
        seps = len(re.findall(r"[:|]", clean))
        return notes >= 3 and seps >= 1

    def clean_rhythm_text(self, text):
        """Normalise beat separators in a sol-fa line to single ``:`` characters.

        ``|``, ``!`` and ``¦`` become ``:``; whitespace between two note
        characters (letters, ``'``, ``,``, ``.``) becomes ``:``; runs of
        ``:`` are collapsed.
        """
        t = text.replace('|', ':').replace('!', ':').replace('¦', ':')
        # Look-around assertions do not consume the neighbouring characters,
        # so every gap in "d r m f" is converted (a consuming pattern would
        # only convert every other gap between single-letter tokens).
        t = re.sub(r"(?<=[a-zA-Z',.])\s+(?=[a-zA-Z',.])", ":", t)
        t = re.sub(r":+", ":", t)
        return t

    def get_notes_from_beat(self, beat_str, voice_idx, root_note):
        """Parse one beat into a list of ``(value, duration)`` tuples.

        ``value`` is a MIDI note number or the string ``'SUSTAIN'`` (for a
        sub-unit containing ``-``). A beat containing ``.,`` is split
        0.75 / 0.25 between its first two parts; a beat containing ``.``
        is split evenly between its non-empty parts; otherwise the whole
        beat lasts 1.0. ``'`` raises and ``,`` lowers the octave; voices
        with ``voice_idx >= 2`` are lowered one extra octave.

        NOTE(review): sub-units that are empty or not recognised as a note
        are dropped, so their share of the beat is not returned.
        """
        if not beat_str:
            return []

        if ".," in beat_str:
            parts = beat_str.split(".,")
            sub_units = [(parts[0], 0.75), (parts[1], 0.25)]
        elif "." in beat_str:
            raw = [x for x in beat_str.split('.') if x]
            sub_units = [(r, 1.0 / len(raw)) for r in raw]
        else:
            sub_units = [(beat_str, 1.0)]

        # Longest syllables first so that e.g. "di" wins over "d".
        candidates = sorted(self.chromatic_map, key=len, reverse=True)

        parsed_notes = []
        for token_str, duration in sub_units:
            clean_tok = token_str.replace(":", "").strip()
            if not clean_tok:
                continue
            if '-' in clean_tok:
                parsed_notes.append(('SUSTAIN', duration))
                continue
            check_str = clean_tok.lower()
            base = next((c for c in candidates if check_str.startswith(c)), None)
            if base is None:
                continue
            remaining = check_str[len(base):]
            octave_shift = remaining.count("'") - remaining.count(",")
            if voice_idx >= 2:
                octave_shift -= 1
            midi_val = root_note + self.chromatic_map[base] + (octave_shift * 12)
            parsed_notes.append((midi_val, duration))
        return parsed_notes

    def safe_threshold(self, lines_y):
        """Return the y-gap above which two lines belong to different systems.

        This is 1.6 x the median of the gaps larger than 5 between
        consecutive values, or 35.0 when that cannot be computed.
        """
        try:
            if len(lines_y) < 2:
                return 35.0
            gaps = [b - a for a, b in zip(lines_y, lines_y[1:])]
            valid = [g for g in gaps if g > 5]
            if not valid:
                return 35.0
            return statistics.median(valid) * 1.6
        except (TypeError, ValueError, statistics.StatisticsError):
            return 35.0

    def generate_midi_bytes(self, html_content):
        """Convert a page to MIDI.

        Returns the MIDI file content as ``bytes``, or ``None`` when no
        text, no music line or no note could be found, or when any error
        occurs (the traceback is printed).
        """
        try:
            # 1. DECOMPRESSION & PARSING
            raw_items = self._extract_text_items(html_content)
            if not raw_items:
                return None

            # 2. LINE RECONSTRUCTION
            all_lines_obj = self._rebuild_lines(raw_items)
            full_text_blob = "".join(" " + line['text'] for line in all_lines_obj)

            # 3. KEY
            root_note = self._detect_root_note(full_text_blob)

            # 4. FILTERING
            # The rule-based check runs first; the model (if loaded) is only
            # asked about lines the rules rejected.
            valid_lines = [
                obj for obj in all_lines_obj
                if self.is_music_fallback(obj['text']) or self.is_music_ai(obj['text'])
            ]
            if not valid_lines:
                return None

            # 5. GROUPING
            systems = self._group_systems(valid_lines)

            # 6. MIDI
            midi = MIDIFile(1)
            midi.addTempo(0, 0, TEMPO)
            cursor = 0.0
            has_notes = False
            for system in systems:
                max_dur = 0
                for v_idx, text in enumerate(system):
                    end, wrote = self._render_voice(midi, text, v_idx % 4, root_note, cursor)
                    has_notes = has_notes or wrote
                    max_dur = max(max_dur, end - cursor)
                # Next system starts after the longest voice plus a 0.2 gap.
                cursor += max_dur + 0.2

            if not has_notes:
                return None
            buffer = io.BytesIO()
            midi.writeFile(buffer)
            return buffer.getvalue()
        except Exception:
            # Request boundary: report the failure and signal it with None.
            traceback.print_exc()
            return None

    def _extract_text_items(self, html_content):
        """Return ``{'x', 'y', 'text'}`` dicts for the page's <text> elements."""
        soup = None
        match = re.search(r'const b="([^"]+)"', html_content)
        if match:
            try:
                svg = zlib.decompress(base64.b64decode(match.group(1))).decode('utf-8')
                soup = BeautifulSoup(svg, 'html.parser')
            except Exception:
                # Best effort: an undecodable payload falls back to parsing
                # the raw page below.
                soup = None
        if soup is None:
            soup = BeautifulSoup(html_content, 'html.parser')

        raw_items = []
        for t in soup.find_all('text'):
            txt = t.get_text()
            if not txt.strip():
                continue
            try:
                x = float(t.get('x', '0').split()[0])
                y = float(t.get('y', '0').split()[0])
            except (ValueError, IndexError, AttributeError, TypeError):
                # Element without usable coordinates: skip it.
                continue
            raw_items.append({'x': x, 'y': y, 'text': txt})

        # Regex fallback when the HTML parsing yielded nothing
        if not raw_items:
            pattern = r'<text[^>]*y="([\d\.]+)"[^>]*>([^<]+)</text>'
            for y_str, txt in re.findall(pattern, str(soup)):
                try:
                    y = float(y_str)
                except ValueError:
                    continue
                raw_items.append({'x': 0, 'y': y, 'text': txt})
        return raw_items

    def _rebuild_lines(self, raw_items):
        """Merge items whose y differ by < 4 into lines, sorted by y then x."""
        lines_map = {}
        for item in raw_items:
            for ky in lines_map:
                if abs(ky - item['y']) < 4:
                    lines_map[ky].append(item)
                    break
            else:
                lines_map[item['y']] = [item]

        lines = []
        for ky in sorted(lines_map):
            row = sorted(lines_map[ky], key=lambda it: it['x'])
            txt_parts = []
            last_x = -100
            for it in row:
                # A horizontal gap > 12 after the estimated end of the
                # previous item (5 units per character) becomes a space.
                if it['x'] - last_x > 12:
                    txt_parts.append(" ")
                txt_parts.append(it['text'])
                last_x = it['x'] + len(it['text']) * 5
            lines.append({'y': ky, 'text': "".join(txt_parts).strip()})
        return lines

    def _detect_root_note(self, full_text):
        """Return the tonic's MIDI number from a "Do dia <key>" mention, else 60."""
        m = re.search(r"D[oô]\s*dia\s*([A-G][b#]?)", full_text, re.IGNORECASE)
        if not m:
            return 60
        return self.key_signatures.get(m.group(1).upper(), 60)

    def _group_systems(self, valid_lines):
        """Group consecutive line texts into systems of at most four voices."""
        threshold = self.safe_threshold([line['y'] for line in valid_lines])
        systems = []
        current = []
        last_y = -9999
        for line in valid_lines:
            if (line['y'] - last_y) > threshold and current:
                systems.append(current)
                current = []
            current.append(line['text'])
            last_y = line['y']
            if len(current) == 4:
                systems.append(current)
                current = []
        if current:
            systems.append(current)
        return systems

    def _render_voice(self, midi, text, voice_idx, root_note, start):
        """Write one voice line to *midi*; return ``(end_time, wrote_any_note)``."""
        channel = 0
        trk_cursor = start
        pending = []  # notes not yet written: [pitch, start, duration]
        wrote = False
        for beat in self.clean_rhythm_text(text).split(':'):
            if not beat.strip():
                # Empty beat (e.g. produced by a leading/trailing separator):
                # time advances, a pending note stays open.
                trk_cursor += 1.0
                continue
            notes_in_beat = self.get_notes_from_beat(beat, voice_idx, root_note)
            if not notes_in_beat:
                # Nothing recognised: treated as a one-beat rest.
                wrote = self._flush_notes(midi, channel, pending) or wrote
                pending = []
                trk_cursor += 1.0
                continue
            for val, dur in notes_in_beat:
                if val == 'SUSTAIN':
                    if pending:
                        pending[-1][2] += dur
                else:
                    wrote = self._flush_notes(midi, channel, pending) or wrote
                    pending = [[val, trk_cursor, dur]]
                trk_cursor += dur
        wrote = self._flush_notes(midi, channel, pending) or wrote
        return trk_cursor, wrote

    @staticmethod
    def _flush_notes(midi, channel, notes):
        """Add *notes* to track 0 of *midi*; return True if any was written."""
        wrote = False
        for pitch, start, duration in notes:
            pitch = int(pitch)
            # MIDI pitches are limited to 0..127; an out-of-range value
            # (extreme octave marks) is skipped instead of corrupting or
            # aborting the whole file.
            if 0 <= pitch <= 127:
                midi.addNote(0, channel, pitch, start, duration, VELOCITY)
                wrote = True
        return wrote
# Single module-level engine; MasterEngine() catches model-loading failures
# itself and falls back to the rule-based mode.
engine = MasterEngine()


# NOTE(review): no route decorator (e.g. @app.post(...)) is visible on this
# handler in this file; confirm it is registered on `app`.
async def convert_solfa(html_code: str = Form(...)):
    """Convert the posted HTML form field to MIDI.

    Responds with the MIDI bytes (media type ``audio/midi``) on success,
    or with status 400 and a short message when the engine returns
    nothing.
    """
    payload = engine.generate_midi_bytes(html_code)
    if not payload:
        return Response(status_code=400, content="Echec generation")
    return Response(content=payload, media_type="audio/midi")