import os
import shutil
import subprocess
import uuid

import config


class JoernParser:
    """Run the Joern command-line tools on a C snippet and export it as GraphML.

    Every call to :meth:`parse_code` works inside its own uniquely named
    session directory under ``config.TEMP_DIR``.
    """

    def __init__(self):
        self.temp_dir = config.TEMP_DIR
        os.makedirs(self.temp_dir, exist_ok=True)

    def parse_code(self, source_code: str):
        """Save ``source_code``, run Joern on it and return the GraphML path.

        Args:
            source_code: Raw C source text to analyse.

        Returns:
            A ``(graph_out_path, session_path)`` tuple: the path of the
            exported graph file and the session directory that contains it.
            The caller is responsible for calling :meth:`clean_up` on
            ``session_path`` once it is done with the graph.

        Raises:
            subprocess.CalledProcessError: If ``joern-parse`` or
                ``joern-export`` exits with a non-zero status or cannot be
                found on ``PATH``.
            FileNotFoundError: If the export step produced no ``.xml`` or
                ``.graphml`` file.

        On any failure the session directory is removed before the exception
        propagates, so failed requests do not leak temporary files.
        """
        # 1. Create a unique session directory for this request.
        session_id = str(uuid.uuid4())
        session_path = os.path.join(self.temp_dir, session_id)
        os.makedirs(session_path, exist_ok=True)

        # 2. Define every path used by the pipeline.
        src_file_path = os.path.join(session_path, "test.c")
        cpg_out_path = os.path.join(session_path, "cpg.bin")
        export_dir = os.path.join(session_path, "export")
        graph_out_path = os.path.join(session_path, "export.graphml")

        try:
            # 3. Write the source code to a file. The encoding is explicit so
            #    the result does not depend on the platform's locale.
            with open(src_file_path, "w", encoding="utf-8") as f:
                f.write(source_code)

            # 4. Run Joern Parse (C -> binary CPG).
            self._run_joern(
                ["joern-parse", src_file_path, "--output", cpg_out_path]
            )

            # 5. Run Joern Export (binary CPG -> GraphML).
            self._run_joern(
                [
                    "joern-export",
                    cpg_out_path,
                    "--repr",
                    "all",
                    "--format",
                    "graphml",
                    "--out",
                    export_dir,
                ]
            )

            # 6. Locate the exported file and move it to its final name.
            exported_file = self._find_exported_graph(export_dir)
            shutil.move(exported_file, graph_out_path)
        except subprocess.CalledProcessError as e:
            print(f"Joern Error: {e}")
            if e.stderr:
                # Surface Joern's own diagnostics; the exit code alone is
                # rarely enough to understand the failure.
                print(e.stderr)
            self.clean_up(session_path)
            raise
        except Exception:
            # Any other failure (missing export, I/O error) must not leave
            # the session directory behind either.
            self.clean_up(session_path)
            raise

        return graph_out_path, session_path

    @staticmethod
    def _run_joern(args):
        """Run one Joern command given as an argument list, without a shell."""
        # Resolve the executable explicitly so wrapper scripts (e.g. ``.bat``
        # files on Windows) are still found without going through a shell.
        executable = shutil.which(args[0])
        if executable is None:
            # A shell would have reported "command not found" as exit status
            # 127; keep signalling it as CalledProcessError for callers.
            raise subprocess.CalledProcessError(
                127, args, stderr=f"{args[0]}: command not found"
            )
        subprocess.run(
            [executable, *args[1:]],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",
        )

    @staticmethod
    def _find_exported_graph(export_dir):
        """Return the path of the graph file inside ``export_dir``."""
        # CHECK 1: explicitly prefer export.xml.
        target_xml = os.path.join(export_dir, "export.xml")
        if os.path.exists(target_xml):
            return target_xml

        # CHECK 2: fallback scan (only if export.xml is missing). Sorted so
        # the choice is deterministic when several candidates exist.
        if os.path.isdir(export_dir):
            for name in sorted(os.listdir(export_dir)):
                if name.endswith((".xml", ".graphml")):
                    return os.path.join(export_dir, name)

        raise FileNotFoundError("GraphML file not found after export.")

    def clean_up(self, session_path):
        """Deletes the temporary files for this session."""
        if os.path.exists(session_path):
            shutil.rmtree(session_path)