graphguard-backend / joern_parser.py
Bharateesha lvn
Deploy GraphGuard Backend V1
d511278
import os
import subprocess
import shutil
import uuid
import config
class JoernParser:
def __init__(self):
self.temp_dir = config.TEMP_DIR
os.makedirs(self.temp_dir, exist_ok=True)
def parse_code(self, source_code: str):
"""
Takes raw C code string, saves it, runs Joern, and returns the GraphML path.
"""
# 1. Create a unique session ID for this request
session_id = str(uuid.uuid4())
session_path = os.path.join(self.temp_dir, session_id)
os.makedirs(session_path, exist_ok=True)
# 2. Write the source code to a file
src_file_path = os.path.join(session_path, "test.c")
with open(src_file_path, "w") as f:
f.write(source_code)
# 3. Define output paths
cpg_out_path = os.path.join(session_path, "cpg.bin")
graph_out_path = os.path.join(session_path, "export.graphml")
try:
# 4. Run Joern Parse (C -> Binary CPG)
cmd_parse = f"joern-parse {src_file_path} --output {cpg_out_path}"
subprocess.run(cmd_parse, shell=True, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# 5. Run Joern Export (Binary CPG -> GraphML)
cmd_export = f"joern-export {cpg_out_path} --repr all --format graphml --out {session_path}/export"
subprocess.run(cmd_export, shell=True, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# 6. Locate the exported file (Prioritize export.xml)
export_dir = os.path.join(session_path, "export")
target_xml = os.path.join(export_dir, "export.xml")
# CHECK 1: Explicitly look for export.xml
if os.path.exists(target_xml):
shutil.move(target_xml, graph_out_path)
return graph_out_path, session_path
# CHECK 2: Fallback scan (only if export.xml is missing)
for file in os.listdir(export_dir):
if file.endswith(".xml") or file.endswith(".graphml"):
shutil.move(os.path.join(export_dir, file), graph_out_path)
return graph_out_path, session_path
raise Exception("GraphML file not found after export.")
except subprocess.CalledProcessError as e:
print(f"Joern Error: {e}")
self.clean_up(session_path)
raise e
def clean_up(self, session_path):
"""Deletes the temporary files for this session."""
if os.path.exists(session_path):
shutil.rmtree(session_path)