Spaces:
Runtime error
Runtime error
| import logging | |
| import os | |
| import tempfile | |
| import time | |
| from subprocess import PIPE, Popen | |
| from typing import Optional, Tuple | |
| from .jsonrpc import RPC as JSONRPC | |
| from .rawrpc import RPC as RawRPC | |
class CLIClient:
    """
    Represents a client that talks to a CodeQL CLI Server.

    Commands are sent through a ``RawRPC`` connection created with the
    ``codeql execute cli-server`` command line. Every ``request`` on that
    connection is expected to return an ``(err, result)`` tuple; the public
    methods here follow the same convention.
    """

    def __init__(self, logdir="/tmp/codeql_kernel_cliserver"):
        """
        Create the connection to the CLI server.

        :param logdir: value passed to the server's ``--logdir`` option.
            Defaults to the location that was previously hard-coded.
        """
        # Cache for the "-J..." options reported by `resolve ram`, so the
        # server is only asked once.
        self.cache = {"ram": []}
        self.conn = RawRPC(
            [
                "codeql",
                "execute",
                "cli-server",
                "--logdir",
                logdir,
            ]
        )

    def stop(self):
        """Stop the underlying connection."""
        self.conn.stop()

    def resolve_ram(self) -> Tuple[Optional[str], Optional[list]]:
        """
        Return the JVM ("-J"-prefixed) RAM options reported by ``resolve ram``.

        The answer is cached after the first successful, non-empty lookup.

        :return: ``(None, options)`` on success, ``(err, None)`` on failure.
        """
        cached = self.cache.get("ram")
        if cached:
            return (None, cached)
        (err, result) = self.conn.request(["resolve", "ram", "--format=json"])
        if err:
            return (err, None)
        # `result or []` guards against a successful reply with no payload.
        self.cache["ram"] = [opt for opt in (result or []) if opt.startswith("-J")]
        return (None, self.cache["ram"])

    def resolve_metadata(self, query) -> Tuple[Optional[str], dict]:
        """Run ``resolve metadata`` for *query* and return ``(err, result)``."""
        cmd = ["resolve", "metadata", "--format=json", query]
        return self.conn.request(cmd)

    def resolve_database(self, db_path) -> Tuple[Optional[str], dict]:
        """Run ``resolve database`` for *db_path* and return ``(err, result)``."""
        cmd = ["resolve", "database", "--format=json", db_path]
        return self.conn.request(cmd)

    def resolve_library_path(self, query) -> Tuple[Optional[str], Optional[dict]]:
        """Run ``resolve library-path`` for *query* and return ``(err, result)``."""
        cmd = ["resolve", "library-path", "--format=json", "--query", query]
        return self.conn.request(cmd)

    def bqrs_info(self, bqrs_path) -> Tuple[Optional[str], dict]:
        """Run ``bqrs info`` on *bqrs_path* and return ``(err, result)``."""
        cmd = ["bqrs", "info", "--format=json", bqrs_path]
        return self.conn.request(cmd)

    def bqrs_decode(self, bqrs_path) -> Tuple[Optional[str], Optional[str]]:
        """
        Decode *bqrs_path* to CSV and return the CSV text.

        The server writes the CSV into a temporary file, which is read back
        and then removed.

        :return: ``(None, csv_text)`` on success, ``(error_message, None)``
            on failure.
        """
        (err, ram_opts) = self.resolve_ram()
        if err or not ram_opts:
            return (f"Error resolving ram options {err}", None)

        # Reserve a unique output path, but close our own handle right away:
        # only the name is needed, and keeping the handle open leaked a file
        # descriptor on every call.
        with tempfile.NamedTemporaryFile(delete=False) as results_file:
            results_path = results_file.name

        try:
            cmd = [
                "bqrs",
                "decode",
                "--format=csv",
                f"-o={results_path}",
                "--entities=string,url",
                bqrs_path,
            ]
            cmd.extend(ram_opts)
            (err, _) = self.conn.request(cmd)
            if err:
                return (f"Error decoding bqrs file {err}", None)
            if not os.path.exists(results_path):
                return ("Error decoding results", None)
            with open(results_path, "r") as f:
                return (None, f.read())
        finally:
            # The CSV has been read (or the call failed); never leave the
            # temporary file behind.
            try:
                os.remove(results_path)
            except FileNotFoundError:
                pass
class QueryClient:
    """
    Represents a JSONRPC client to connect to CodeQL Query Server.

    The client launches ``codeql execute query-server2`` as a child process,
    talks to it over the child's stdin/stdout through a ``JSONRPC``
    connection, and uses a ``CLIClient`` for auxiliary commands
    (RAM options, database metadata, BQRS decoding).
    """

    # Pack search path used when the caller does not supply one. This is the
    # value that used to be hard-coded inside run_query().
    DEFAULT_ADDITIONAL_PACKS = ["/Users/pwntester/src/github.com/github/codeql"]

    def __init__(self, on_progress=None, on_result=None, additional_packs=None):
        """
        Start the query server and connect to it.

        :param on_progress: optional handler registered for
            ``ql/progressUpdated`` notifications.
        :param on_result: accepted for interface compatibility; not used by
            this class.
        :param additional_packs: optional list of paths sent as
            ``additionalPacks`` with each query. Defaults to
            ``DEFAULT_ADDITIONAL_PACKS``.
        :raises RuntimeError: if the RAM options cannot be resolved.
        """
        self._cli_client: CLIClient = CLIClient()
        if additional_packs is None:
            additional_packs = self.DEFAULT_ADDITIONAL_PACKS
        self._additional_packs = list(additional_packs)

        cmd = ["codeql", "execute", "query-server2", "--threads=0", "--evaluator-log-level", "5"]
        # debug
        # cmd.extend(["--debug", "--tuple-counting", "-v", "--log-to-stderr"])
        # --save-cache --max-disk-cache XX
        (err, ram_opts) = self._cli_client.resolve_ram()
        if err or not ram_opts:
            # __init__ must not return a value (doing so raises TypeError);
            # release the CLI client and signal the failure explicitly.
            self._cli_client.stop()
            raise RuntimeError(f"Error resolving ram options {err}")
        cmd.extend(ram_opts)

        # NOTE(review): stderr is piped but never read by this class; a very
        # chatty server could block once the pipe buffer fills. Consider
        # draining it or redirecting it elsewhere.
        try:
            self._proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        except OSError:
            # Do not leave the CLI client running if the server cannot start.
            self._cli_client.stop()
            raise

        handlers = {}
        if on_progress:
            handlers["ql/progressUpdated"] = on_progress
        # The connection writes to the child's stdin and reads its stdout,
        # hence the crossed-over keyword names.
        self._conn = JSONRPC(
            handlers=handlers, stdout=self._proc.stdin, stdin=self._proc.stdout
        )
        self._progress_id = 0
        self._evaluate_id = 0
        self._db_metadata = {}
        # TODO: wait for query server to be ready
        time.sleep(2)

    def stop(self):
        """Close the server's pipes, terminate it, and stop the CLI client."""
        for stream in (self._proc.stdin, self._proc.stdout, self._proc.stderr):
            if stream:
                stream.close()
        self._proc.terminate()
        self._proc.wait()
        if self._cli_client:
            self._cli_client.stop()

    def next_progress_id(self) -> int:
        """Return the next progress id (a counter starting at 1)."""
        self._progress_id += 1
        return self._progress_id

    def next_evaluate_id(self) -> int:
        """Return the next evaluate id (a counter starting at 1)."""
        self._evaluate_id += 1
        return self._evaluate_id

    def register_database(self, db_path) -> Optional[str]:
        """
        Register a database with the query server.

        :param db_path: path to the database directory; a trailing ``/`` is
            appended when missing.
        :return: ``None`` on success, otherwise an error message.
        """
        if not db_path.endswith("/"):
            db_path = db_path + "/"
        if not os.path.isdir(db_path):
            return f"Database path {db_path} is not a directory"
        (err, db_metadata) = self._cli_client.resolve_database(db_path)
        if err:
            return "Failed to resolve database metadata"
        # TODO: implement on-the-fly query patching
        params = {
            "body": {
                "databases": [db_path],
                "progressId": self.next_progress_id(),
            }
        }
        (err, _) = self._conn.request("evaluation/registerDatabases", args=params)
        if err:
            return err
        # Copy so that adding "path" does not mutate the caller-visible reply,
        # and tolerate an empty reply.
        self._db_metadata = dict(db_metadata or {})
        self._db_metadata["path"] = db_path
        return None

    def run_query(
        self, query_path, quick_eval=None
    ) -> Tuple[Optional[str], Optional[str]]:
        """
        Run a query (or quick-evaluate part of it) against the registered database.

        :param query_path: path of the query file.
        :param quick_eval: optional mapping with ``startLine``,
            ``startColumn``, ``endLine`` and ``endColumn``; when non-empty the
            given position is quick-evaluated instead of the whole query.
        :return: ``(None, csv_text)`` or ``(None, "No results")`` on success,
            otherwise ``(error_message, None_or_empty_string)``.
        """
        logging.info(f"Running query {query_path}")

        db_path = self._db_metadata.get("path")
        if not db_path:
            return ("No database registered; call register_database() first", None)

        if quick_eval:
            target = {
                "quickEval": {
                    "quickEvalPos": {
                        "fileName": query_path,
                        "line": quick_eval.get("startLine"),
                        "column": quick_eval.get("startColumn"),
                        "endLine": quick_eval.get("endLine"),
                        "endColumn": quick_eval.get("endColumn"),
                    }
                }
            }
        else:
            target = {"query": {"xx": ""}}

        # A private temporary directory gives the server a fresh output path
        # and guarantees the BQRS file is removed once it has been decoded.
        with tempfile.TemporaryDirectory() as out_dir:
            bqrs_path = os.path.join(out_dir, "results.bqrs")
            run_queries_params = {
                "body": {
                    "db": db_path,
                    "additionalPacks": list(self._additional_packs),
                    "externalInputs": [],
                    "singletonExternalInputs": [],  # opts.templateValues or {},
                    "outputPath": bqrs_path,
                    "queryPath": query_path,
                    "target": target,
                },
                "progressId": self.next_progress_id(),
            }
            (err, resp) = self._conn.request(
                "evaluation/runQuery", args=run_queries_params
            )
            # A non-zero resultType carries the server's own message; report
            # it in preference to the transport-level error.
            if resp and resp["resultType"] != 0:
                return (resp["message"], None)
            if err:
                return (str(err), None)
            return self._decode_results(bqrs_path)

    def _decode_results(self, bqrs_path) -> Tuple[Optional[str], Optional[str]]:
        """Decode the BQRS file at *bqrs_path*, or report why it cannot be decoded."""
        if not os.path.exists(bqrs_path):
            return (f"Failed to find results file at {bqrs_path}", "")
        (err, bqrs_info) = self._cli_client.bqrs_info(bqrs_path)
        if err:
            return (err, "")
        if not bqrs_info or not bqrs_info["result-sets"]:
            return ("Failed to get bqrs info", "")
        # Use the "#select" result set's row count when present, otherwise
        # fall back to the first result set.
        count = bqrs_info["result-sets"][0]["rows"]
        for result_set in bqrs_info["result-sets"]:
            if result_set["name"] == "#select":
                count = result_set["rows"]
        if count > 0:
            return self._cli_client.bqrs_decode(bqrs_path)
        return (None, "No results")