Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| 한 장 이상의 이미지에 대해 /analyze v1 과 동일한 파이프라인을 돌리고 | |
| `<stem>_analyze_response.json`, `<stem>_analyze_summary.txt` 를 씁니다. | |
| 여러 장이면 업로드 순서와 같이 한 요청으로 병합합니다. | |
| 프로젝트 루트에서: | |
| python dump_sample_analyze_artifacts.py sample_oneline_0.png | |
| python dump_sample_analyze_artifacts.py a.png b.png c.png | |
| python dump_sample_analyze_artifacts.py sample_imgs/sample_c.jpeg --return-debug | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import sys | |
| import uuid | |
| from pathlib import Path | |
| from typing import Any, Dict, List | |
| import cv2 | |
| ROOT = Path(__file__).resolve().parent | |
| SAMPLE_IMGS = ROOT / "sample_imgs" | |
| ALLOWED_EXT = {".jpg", ".jpeg", ".png", ".webp"} | |
| if str(ROOT) not in sys.path: | |
| sys.path.insert(0, str(ROOT)) | |
| import main as stella_main # noqa: E402 | |
| def _resolve_image_path(name_or_path: str) -> Path: | |
| raw = Path(name_or_path) | |
| if raw.is_file(): | |
| return raw.resolve() | |
| candidate = SAMPLE_IMGS / raw.name | |
| if candidate.is_file(): | |
| return candidate.resolve() | |
| raise FileNotFoundError( | |
| f"이미지를 찾을 수 없습니다: {name_or_path!r} " | |
| f"(루트 상대 경로 또는 {SAMPLE_IMGS.name}/ 안의 파일 이름을 주세요.)" | |
| ) | |
| def _load_decoded(path: Path) -> Dict[str, Any]: | |
| ext = path.suffix.lower() | |
| if ext not in ALLOWED_EXT: | |
| raise ValueError(f"지원하지 않는 확장자입니다: {ext} (허용: {sorted(ALLOWED_EXT)})") | |
| image = cv2.imread(str(path), cv2.IMREAD_COLOR) | |
| if image is None: | |
| raise ValueError(f"이미지를 디코딩할 수 없습니다: {path}") | |
| h, w = image.shape[:2] | |
| return { | |
| "filename": path.name, | |
| "bytes": int(path.stat().st_size), | |
| "width": int(w), | |
| "height": int(h), | |
| "image": image, | |
| } | |
| def _default_score_context() -> Dict[str, Any]: | |
| return { | |
| "clef": "treble", | |
| "key_signature": {"fifths": -1}, | |
| "time_signature": "4/4", | |
| "tempo_bpm_reference": None, | |
| "divisions": 4, | |
| } | |
| def _build_analyze_response( | |
| decoded: List[Dict[str, Any]], | |
| score_context: Dict[str, Any], | |
| *, | |
| return_debug: bool, | |
| ) -> Dict[str, Any]: | |
| body = stella_main.build_analyze_response_v1( | |
| decoded, | |
| score_context, | |
| return_debug=return_debug, | |
| ) | |
| body["request_id"] = str(uuid.uuid4()) | |
| return body | |
| def _format_event_table(events: List[Dict[str, Any]]) -> str: | |
| header = f"{'idx':>3} {'onset':>5} {'dur':>3} {'step':>4} {'oct':>3} {'alt':>3} type" | |
| sep = "-" * 50 | |
| lines = [header, sep] | |
| for i, ev in enumerate(events, start=1): | |
| onset = ev.get("onset_div", "") | |
| dur = ev.get("duration_div", "") | |
| typ = ev.get("type", "") | |
| if typ == "rest": | |
| lines.append(f"{i:>3} {onset:>5} {dur:>3} {'':>4} {'':>3} {'':>3} {typ}") | |
| else: | |
| step = ev.get("step", "") | |
| oct_ = ev.get("octave", "") | |
| alt = ev.get("alter", "") | |
| lines.append(f"{i:>3} {onset:>5} {dur:>3} {str(step):>4} {str(oct_):>3} {str(alt):>3} {typ}") | |
| return "\n".join(lines) | |
| def build_summary_text(response: Dict[str, Any]) -> str: | |
| meta = response["meta"] | |
| tl = response["timeline"] | |
| sc = response["score_context"] | |
| mel = response["melody"] | |
| lines: List[str] = [ | |
| f"request_id: {response['request_id']}", | |
| f"source: {json.dumps(response.get('source'), ensure_ascii=False)}", | |
| f"warnings: {json.dumps(response['warnings'], ensure_ascii=False)}", | |
| f"score_context: {json.dumps(sc, ensure_ascii=False)}", | |
| f"segment_map: {json.dumps(response.get('segment_map'), ensure_ascii=False)}", | |
| f"meta.preprocess: {json.dumps(meta.get('preprocess'), ensure_ascii=False)}", | |
| f"pipeline_mode: {meta.get('pipeline_mode')}", | |
| f"timeline: {json.dumps({'divisions': tl.get('divisions'), 'time_signature': tl.get('time_signature')}, ensure_ascii=False)}", | |
| "", | |
| f"--- melody ({mel.get('voice_id')}) reduction={mel.get('reduction_rule')} events={len(mel.get('events', []))} ---", | |
| _format_event_table(mel.get("events", [])), | |
| "", | |
| ] | |
| while lines and lines[-1] == "": | |
| lines.pop() | |
| return "\n".join(lines) + "\n" | |
| def _output_stem(paths: List[Path]) -> str: | |
| if len(paths) == 1: | |
| return paths[0].stem | |
| return "__".join(p.stem for p in paths) | |
| def main() -> None: | |
| parser = argparse.ArgumentParser( | |
| description="이미지 1장 이상에 대해 analyze v1 파이프라인 결과를 JSON·요약 텍스트로 저장합니다." | |
| ) | |
| parser.add_argument( | |
| "filenames", | |
| nargs="+", | |
| metavar="IMAGE", | |
| help=( | |
| f"이미지 경로 1개 이상 (순서 = 악보 이어짐). " | |
| f"파일 이름만 주면 {SAMPLE_IMGS.name}/ 에서 찾습니다." | |
| ), | |
| ) | |
| parser.add_argument( | |
| "--out-dir", | |
| type=Path, | |
| default=ROOT / "artifacts", | |
| help=f"출력 디렉터리 (기본: {ROOT.name}/artifacts)", | |
| ) | |
| parser.add_argument( | |
| "--score-context-json", | |
| type=str, | |
| default=None, | |
| help="JSON 문자열 (기본: treble, fifths=-1, 4/4)", | |
| ) | |
| parser.add_argument("--return-debug", action="store_true") | |
| args = parser.parse_args() | |
| paths = [_resolve_image_path(name) for name in args.filenames] | |
| stem = _output_stem(paths) | |
| out_dir = args.out_dir.resolve() | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| json_path = out_dir / f"{stem}_analyze_response.json" | |
| txt_path = out_dir / f"{stem}_analyze_summary.txt" | |
| if args.score_context_json: | |
| score_ctx = json.loads(args.score_context_json) | |
| if not isinstance(score_ctx, dict): | |
| raise ValueError("score_context JSON must be an object") | |
| else: | |
| score_ctx = _default_score_context() | |
| decoded = [_load_decoded(p) for p in paths] | |
| response = _build_analyze_response( | |
| decoded, | |
| score_ctx, | |
| return_debug=args.return_debug, | |
| ) | |
| json_path.write_text(json.dumps(response, ensure_ascii=False, indent=2), encoding="utf-8") | |
| txt_path.write_text(build_summary_text(response), encoding="utf-8") | |
| print(f"Wrote {json_path}") | |
| print(f"Wrote {txt_path}") | |
| if __name__ == "__main__": | |
| try: | |
| main() | |
| except (FileNotFoundError, ValueError, json.JSONDecodeError, stella_main.SingleStaffAnalyzeError) as exc: | |
| print(f"error: {exc}", file=sys.stderr) | |
| sys.exit(1) | |