Spaces:
Running
Running
| import os | |
| import subprocess | |
| import shutil | |
| import uuid | |
| import config | |
| class JoernParser: | |
| def __init__(self): | |
| self.temp_dir = config.TEMP_DIR | |
| os.makedirs(self.temp_dir, exist_ok=True) | |
| def parse_code(self, source_code: str): | |
| """ | |
| Takes raw C code string, saves it, runs Joern, and returns the GraphML path. | |
| """ | |
| # 1. Create a unique session ID for this request | |
| session_id = str(uuid.uuid4()) | |
| session_path = os.path.join(self.temp_dir, session_id) | |
| os.makedirs(session_path, exist_ok=True) | |
| # 2. Write the source code to a file | |
| src_file_path = os.path.join(session_path, "test.c") | |
| with open(src_file_path, "w") as f: | |
| f.write(source_code) | |
| # 3. Define output paths | |
| cpg_out_path = os.path.join(session_path, "cpg.bin") | |
| graph_out_path = os.path.join(session_path, "export.graphml") | |
| try: | |
| # 4. Run Joern Parse (C -> Binary CPG) | |
| cmd_parse = f"joern-parse {src_file_path} --output {cpg_out_path}" | |
| subprocess.run(cmd_parse, shell=True, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| # 5. Run Joern Export (Binary CPG -> GraphML) | |
| cmd_export = f"joern-export {cpg_out_path} --repr all --format graphml --out {session_path}/export" | |
| subprocess.run(cmd_export, shell=True, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| # 6. Locate the exported file (Prioritize export.xml) | |
| export_dir = os.path.join(session_path, "export") | |
| target_xml = os.path.join(export_dir, "export.xml") | |
| # CHECK 1: Explicitly look for export.xml | |
| if os.path.exists(target_xml): | |
| shutil.move(target_xml, graph_out_path) | |
| return graph_out_path, session_path | |
| # CHECK 2: Fallback scan (only if export.xml is missing) | |
| for file in os.listdir(export_dir): | |
| if file.endswith(".xml") or file.endswith(".graphml"): | |
| shutil.move(os.path.join(export_dir, file), graph_out_path) | |
| return graph_out_path, session_path | |
| raise Exception("GraphML file not found after export.") | |
| except subprocess.CalledProcessError as e: | |
| print(f"Joern Error: {e}") | |
| self.clean_up(session_path) | |
| raise e | |
| def clean_up(self, session_path): | |
| """Deletes the temporary files for this session.""" | |
| if os.path.exists(session_path): | |
| shutil.rmtree(session_path) | |