# -*- coding: utf-8 -*- """๐Ÿ”ฑ Graphify ์ง€์‹ ๊ทธ๋ž˜ํ”„ ์ƒ์„ฑ ๋ฐ ์‹œ๊ฐํ™” ๋นŒ๋” ์ œ๊ตญ ํ”„๋กœ์ ํŠธ(shadow_brain_core)์˜ ์ฝ”๋“œ ๊ตฌ์กฐ(AST)์™€ ๋งˆํฌ๋‹ค์šด ๋ฌธ์„œ๋ฅผ ๋ถ„์„ํ•˜์—ฌ ๊ตฌ์กฐ์  ์˜์กด๋ง์„ ๋‹ด์€ graph.json ๋ฐ ์›น ์‹œ๊ฐํ™” graph.html์„ ๋นŒ๋“œํ•ฉ๋‹ˆ๋‹ค. ๋นŒ๋“œ๋œ ํŒŒ์ผ์€ ๋กœ์ปฌ ๋ฐ R2 Live ์Šคํ† ๋ฆฌ์ง€์— ์—…๋กœ๋“œ๋˜์–ด AI์™€ ๋งˆ์™•๋‹˜๊ป˜ ์ง€๋„๋กœ ์ œ๊ณต๋ฉ๋‹ˆ๋‹ค. ์‹คํ–‰ ๋ฐฉ๋ฒ•: python scripts/build_graph.py [--upload] """ import os import sys import json import ast import argparse from pathlib import Path # UTF-8 ์ธ์ฝ”๋”ฉ ๊ฐ•์ œ sys.stdout.reconfigure(encoding='utf-8') class ProjectGraphBuilder: def __init__(self, root_dir: Path): self.root_dir = root_dir.resolve() self.nodes = {} self.edges = [] self.scanned_files = set() def add_node(self, node_id: str, label: str, node_type: str, details: dict = None): if node_id not in self.nodes: self.nodes[node_id] = { "id": node_id, "label": label, "type": node_type, "details": details or {} } def add_edge(self, source: str, target: str, rel_type: str): # ์ค‘๋ณต ์—ฃ์ง€ ๋ฐฉ์ง€ edge = {"source": source, "target": target, "type": rel_type} if edge not in self.edges: self.edges.append(edge) def scan_python_files(self): """Python ํŒŒ์ผ์˜ AST ํŒŒ์‹ฑ์„ ํ†ตํ•ด ํด๋ž˜์Šค, ํ•จ์ˆ˜, import ๊ด€๊ณ„ ๋ถ„์„""" for root, _, files in os.walk(self.root_dir): # ์ œ์™ธ ํด๋” ํ•„ํ„ฐ๋ง if any(p in root for p in [".git", "venv", "__pycache__", "node_modules", "dist", "build"]): continue for file in files: if not file.endswith(".py"): continue file_path = Path(root) / file rel_path = file_path.relative_to(self.root_dir).as_posix() self.scanned_files.add(rel_path) # ํŒŒ์ผ ๋…ธ๋“œ ์ถ”๊ฐ€ self.add_node(rel_path, file, "file", {"size": file_path.stat().st_size}) try: with open(file_path, "r", encoding="utf-8", errors="ignore") as f: code = f.read() # AST ๋ถ„์„ tree = ast.parse(code, filename=str(file_path)) self._analyze_ast(tree, rel_path) except Exception as e: print(f"โš ๏ธ AST ํŒŒ์‹ฑ ์‹คํŒจ [{rel_path}]: {e}") def _analyze_ast(self, tree: ast.AST, file_rel_path: str): """AST ๋…ธ๋“œ๋ฅผ ์ˆœํšŒํ•˜๋ฉฐ ํด๋ž˜์Šค, ํ•จ์ˆ˜, import ๊ด€๊ณ„ ์ •์˜""" for node in ast.walk(tree): # 1. Imports ๋ถ„์„ if isinstance(node, ast.Import): for alias in node.names: # ํ”„๋กœ์ ํŠธ ๋‚ด๋ถ€ ๋ชจ๋“ˆ๋กœ ์ถ”์ •๋˜๋Š” import๋งŒ ์—ฐ๊ฒฐ target_module = alias.name.split('.')[0] self._try_add_import_edge(file_rel_path, target_module) elif isinstance(node, ast.ImportFrom): if node.module: target_module = node.module.split('.')[0] self._try_add_import_edge(file_rel_path, target_module) # 2. ํด๋ž˜์Šค ์ •์˜ ๋ถ„์„ elif isinstance(node, ast.ClassDef): class_id = f"{file_rel_path}::{node.name}" self.add_node(class_id, node.name, "class", { "file": file_rel_path, "methods": [n.name for n in node.body if isinstance(n, ast.FunctionDef)] }) # ํŒŒ์ผ -> ํด๋ž˜์Šค ๋ถ€๋ชจ-์ž์‹ ์—ฐ๊ฒฐ self.add_edge(file_rel_path, class_id, "defines") # 3. ์ตœ์ƒ์œ„ ํ•จ์ˆ˜ ์ •์˜ ๋ถ„์„ elif isinstance(node, ast.FunctionDef): # ํด๋ž˜์Šค ๋ฉ”์„œ๋“œ๊ฐ€ ์•„๋‹Œ ๋…๋ฆฝ ํ•จ์ˆ˜๋งŒ ์ถ”๊ฐ€ (ํด๋ž˜์Šค ๋ฉ”์„œ๋“œ๋Š” ํด๋ž˜์Šค ๋…ธ๋“œ ์ƒ์„ธ ์ •๋ณด์— ํฌํ•จ) # ๋ถ€๋ชจ๊ฐ€ ClassDef์ธ์ง€ ๊ฐ„๋‹จํžˆ ํŒ๋‹จํ•˜๊ธฐ ์œ„ํ•ด walk๊ฐ€ ์•„๋‹Œ ๋‹จ๋‹จ๊ณ„ ๋งคํ•‘์ด ์ข‹์œผ๋‚˜, ๊ฐ„์†Œํ™”๋ฅผ ์œ„ํ•ด ์ˆ˜์ง‘ pass def _try_add_import_edge(self, source_file: str, target_module: str): """์ž„ํฌํŠธ ๋Œ€์ƒ ๋ชจ๋“ˆ์ด ํ”„๋กœ์ ํŠธ ๋‚ด๋ถ€์— ์‹ค์กดํ•˜๋Š” ํŒŒ์ผ์ธ์ง€ ํŒ๋ณ„ ํ›„ ๊ฐ„์„  ์—ฐ๊ฒฐ""" # 1. module_name.py ํ˜•ํƒœ ๊ฒ€์‚ฌ potential_py = f"{target_module}.py" potential_dir_py = f"{target_module}/__init__.py" for scanned in self.scanned_files: if scanned == potential_py or scanned.endswith("/" + potential_py) or scanned == potential_dir_py: self.add_edge(source_file, scanned, "imports") return def scan_markdown_docs(self): """ํ”„๋กœ์ ํŠธ ๋ฃจํŠธ ๋‚ด Markdown ๋ฌธ์„œ์™€ ๊ทธ ์•ˆ์˜ ์—ฐ๊ฒฐ๊ณ ๋ฆฌ(wiki ๋งํฌ, ํŒŒ์ผ ์ฐธ์กฐ) ๋ถ„์„""" for root, _, files in os.walk(self.root_dir): if any(p in root for p in [".git", "venv", "node_modules", ".agent"]): continue for file in files: if not file.endswith(".md"): continue file_path = Path(root) / file rel_path = file_path.relative_to(self.root_dir).as_posix() # ๋ฌธ์„œ ๋…ธ๋“œ ์ถ”๊ฐ€ self.add_node(rel_path, file, "document", {"size": file_path.stat().st_size}) try: with open(file_path, "r", encoding="utf-8", errors="ignore") as f: content = f.read() # ๋ฌธ์„œ ๋‚ด๋ถ€์˜ ํƒ€ ํŒŒ์ผ ์ฐธ์กฐ([[๋งํฌ]] ๋˜๋Š” ๋งˆํฌ๋‹ค์šด ํŒŒ์ผ ๋งํฌ) ํƒ์ƒ‰ # ๊ฐ„๋‹จํ•œ ํŒŒ์ผ๋ช… ๋งค์นญ for other_file in self.scanned_files: other_name = Path(other_file).name if other_name in content: self.add_edge(rel_path, other_file, "references") except Exception as e: print(f"โš ๏ธ ๋ฌธ์„œ ์Šค์บ” ์‹คํŒจ [{rel_path}]: {e}") def build(self) -> dict: self.scan_python_files() self.scan_markdown_docs() return { "nodes": list(self.nodes.values()), "links": self.edges } def generate_interactive_html(graph_data: dict) -> str: """D3.js ๊ธฐ๋ฐ˜์˜ ๋ฉ‹์ง„ Force-Directed Graph UI ํ…œํ”Œ๋ฆฟ ์ƒ์„ฑ""" data_json = json.dumps(graph_data, ensure_ascii=False, indent=2) html_template = f""" ๐Ÿ”ฑ Imperial Guild Knowledge Graph Portal

Node Details

์†Œ์Šค ์ฝ”๋“œ (File)
๋งˆํฌ๋‹ค์šด ๋ฌธ์„œ (Document)
๊ตฌ์กฐ์  ํด๋ž˜์Šค (Class)
""" return html_template def upload_to_r2_live(json_path: Path, html_path: Path): """R2 Live ๋ฒ„ํ‚ท์— ์ง€์‹ ๊ทธ๋ž˜ํ”„ ์ง€๋„ ์—…๋กœ๋“œ""" try: # shadow_brain_core/web_server.py์˜ R2 Live ๋กœ๋“œ ํŒจํ„ด ์ฐธ์กฐ core_dir = Path(__file__).parent.parent project_root = core_dir.parent sys.path.append(str(core_dir)) sys.path.append(str(project_root / "scripts" / "tools")) from cloud_tool import R2_ACCOUNTS, _get_bw_credential import boto3 from botocore.config import Config print("๐Ÿ”ฎ R2 Live ์ž๊ฒฉ์ฆ๋ช… ํ™•๋ณด ์‹œ์ž‘...") # live ์„ค์ • ์ถ”์ถœ live_cfg = R2_ACCOUNTS.get("live") if not live_cfg: print("โŒ R2 Live ๊ณ„์ •์ด cloud_tool.py์— ์ •์˜๋˜์–ด ์žˆ์ง€ ์•Š์Šต๋‹ˆ๋‹ค.") return account_id = _get_bw_credential(live_cfg["account_id"]) access_key = _get_bw_credential(live_cfg["access_key_id"]) secret_key = _get_bw_credential(live_cfg["secret_access_key"]) # ๐Ÿ”ฑ [Fix] R2_ACCOUNTS["live"] ์˜ ๋ฒ„ํ‚ท ํ‚ค๋Š” "bucket"(="cache") ์ด๋‹ค. # ์ž˜๋ชป๋œ ํ‚ค "bucket_name"(๊ธฐ๋ณธ "taemingames")์„ ์“ฐ๋ฉด ๊ณต๊ฐœ ๋„๋ฉ”์ธ # r2.taemingames.com(=cache ๋ฒ„ํ‚ท)๊ณผ ๋‹ค๋ฅธ ๋ฒ„ํ‚ท์— ์˜ฌ๋ผ๊ฐ€ 404 ๊ฐ€ ๋‚œ๋‹ค. bucket_name = live_cfg.get("bucket", "cache") if not all([account_id, access_key, secret_key]): print("โŒ R2 Live ์ž๊ฒฉ์ฆ๋ช…์ด ๋น„์–ด ์žˆ์–ด ์—…๋กœ๋“œ๋ฅผ ๊ฑด๋„ˆ๋œ๋‹ˆ๋‹ค.") return print("โšก R2 Live S3 ํด๋ผ์ด์–ธํŠธ ์ดˆ๊ธฐํ™” ์ค‘...") s3 = boto3.client( "s3", endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com", aws_access_key_id=access_key, aws_secret_access_key=secret_key, config=Config(signature_version="s3v4") ) prefix = "cloud-obsidian/graph" # json ์—…๋กœ๋“œ print(f"๐Ÿš€ {json_path.name} ์—…๋กœ๋“œ ์ค‘ -> R2 Live/{prefix}/{json_path.name}") s3.upload_file( Filename=str(json_path), Bucket=bucket_name, Key=f"{prefix}/{json_path.name}", ExtraArgs={"ContentType": "application/json"} ) # html ์—…๋กœ๋“œ print(f"๐Ÿš€ {html_path.name} ์—…๋กœ๋“œ ์ค‘ -> R2 Live/{prefix}/{html_path.name}") s3.upload_file( Filename=str(html_path), Bucket=bucket_name, Key=f"{prefix}/{html_path.name}", ExtraArgs={"ContentType": "text/html"} ) print("โœ… R2 Live ์ง€์‹ ๊ทธ๋ž˜ํ”„ ์—…๋กœ๋“œ ์™„๋ฃŒ!") except ImportError: print("โš ๏ธ cloud_tool ๋˜๋Š” boto3 ๋ชจ๋“ˆ์„ ์ฐพ์„ ์ˆ˜ ์—†์–ด R2 ์—…๋กœ๋“œ๋ฅผ ์ƒ๋žตํ•ฉ๋‹ˆ๋‹ค.") except Exception as e: print(f"โš ๏ธ R2 Live ์—…๋กœ๋“œ ์‹คํŒจ: {e}") def main(): parser = argparse.ArgumentParser(description="๐Ÿ”ฑ Graphify ์ง€์‹ ๊ทธ๋ž˜ํ”„ ์—”์ง„") parser.add_argument("--upload", action="store_true", help="R2 Live ๋ฒ„ํ‚ท์— ์ž๋™ ์—…๋กœ๋“œ") args = parser.parse_args() # ์Šคํฌ๋ฆฝํŠธ ์‹คํ–‰ ์œ„์น˜๊ฐ€ scripts/ ์—ฌ๋„ ๋ถ€๋ชจ shadow_brain_core๋ฅผ ๊ฐ€๋ฆฌํ‚ค๋„๋ก ์„ค์ • script_dir = Path(__file__).parent.resolve() core_dir = script_dir.parent print(f"๐Ÿ”ฑ Graphify: [{core_dir}] ์†Œ์Šค ๋ถ„์„์„ ๊ฐœ์‹œํ•ฉ๋‹ˆ๋‹ค...") builder = ProjectGraphBuilder(core_dir) graph_data = builder.build() # ๋กœ์ปฌ ์ €์žฅ ๊ฒฝ๋กœ output_dir = core_dir / "static" / "graph" output_dir.mkdir(parents=True, exist_ok=True) json_path = output_dir / "graph.json" html_path = output_dir / "graph.html" # json ์“ฐ๊ธฐ with open(json_path, "w", encoding="utf-8") as f: json.dump(graph_data, f, ensure_ascii=False, indent=2) # html ์“ฐ๊ธฐ html_content = generate_interactive_html(graph_data) with open(html_path, "w", encoding="utf-8") as f: f.write(html_content) print(f"โœ… ๋ถ„์„ ์™„๋ฃŒ! ์ด ๋…ธ๋“œ: {len(graph_data['nodes'])}, ๊ฐ„์„ : {len(graph_data['links'])}") print(f"๐Ÿ’พ ๋กœ์ปฌ ๋ฐ์ดํ„ฐ ์ €์žฅ: {json_path}") print(f"๐Ÿ’พ ๋กœ์ปฌ ์‹œ๊ฐํ™” ์ €์žฅ: {html_path}") # ์—…๋กœ๋“œ ์ฒ˜๋ฆฌ if args.upload: upload_to_r2_live(json_path, html_path) if __name__ == "__main__": main()