| | import os |
| | import openai |
| | import json |
| | import rdflib |
| |
|
| |
|
class ExampleGenerator:
    """Build prompts that ask an LLM for JSON-LD examples of the Adversary
    Engagement Ontology (AEO) and post-process the answers.

    Attributes:
        ontologies: maps each loaded ontology filename to its text content.
        ontology_files: filenames in the order they were added.
        rules: maps each loaded ontology filename to its rule prompt string.
    """

    # Connector phrase emitted between a property and its target when the
    # structure entry is written as {"<key>": target}.
    _CONNECTORS = {"1": "connects to", "0": "contains all"}

    _ENGAGEMENT_ACTIONS = (
        'engagement:Access',
        'engagement:Alert',
        'engagement:Beacon',
        'engagement:Deploy',
        'engagement:Obfuscate',
        'engagement:Respond',
    )

    _ENGAGEMENT_OBJECTS = (
        'engagement:Honeypot',
        'engagement:Honeytoken',
        'engagement:Breadcrumb',
        'engagement:BreadcrumbTrail',
        'engagement:LureObject',
        'engagement:HoneyObject',
        'engagement:Decoy',
        'engagement:DataSource',
    )

    _ENGAGEMENT_OBJECTIVES = (
        'objective:CommandAndControl',
        'objective:CredentialAccess',
        'objective:DevelopResource',
        'objective:Discover',
        'objective:EscalatePrivilege',
        'objective:Evade',
        'objective:Execute',
        'objective:Exfilitrate',
        'objective:GainInitialAccess',
        'objective:Impact',
        'objective:MoveLaterally',
        'objective:Persist',
        'objective:Reconnaissance',
        'objective:Affect',
        'objective:Collect',
        'objective:Detect',
        'objective:Direct',
        'objective:Disrupt',
        'objective:Elicit',
        'objective:Expose',
        'objective:Motivate',
        'objective:Plan',
        'objective:Prepare',
        'objective:Prevent',
        'objective:Reassure',
        'objective:Analyze',
        'objective:Deny',
        'objective:ElicitBehavior',
        'objective:Lure',
        'objective:TimeSink',
        'objective:Track',
        'objective:Trap',
    )

    _PREFIX_NAMESPACES = {
        "engagement": "https://ontology.adversaryengagement.org/ae/engagement#",
        "objective": "https://ontology.adversaryengagement.org/ae/objective#",
        "role": "https://ontology.adversaryengagement.org/ae/role#",
        "identity": "https://ontology.adversaryengagement.org/ae/identity#",
        "uco-core": "https://ontology.unifiedcyberontology.org/uco/core#",
        "uco-types": "https://ontology.unifiedcyberontology.org/uco/types#",
        "uco-role": "https://ontology.unifiedcyberontology.org/uco/role#",
    }

    # Announcement sent before the numbered parts of an over-long text.
    _SPLIT_INTRO = (
        "The total length of content that I want to send you is too large to send in only one piece.\n"
        "\n"
        "For sending you that content, I will follow this rule:\n"
        "\n"
        "[START PART 1/10]\n"
        "this is the content of the part 1 out of 10 in total\n"
        "[END PART 1/10]\n"
        "\n"
        'Then you just answer: "Instructions Sent."\n'
        "\n"
        'And when I tell you "ALL PARTS SENT", then you can continue processing the data and answering my requests.\n'
    )

    def __init__(self):
        self.ontologies = {}
        self.ontology_files = []
        self.rules = {}

    def add_ontology(self, onto):
        """Load the ontology file ``onto`` and generate its rule prompt.

        Raises:
            ValueError: if the file was already added, is not an existing
                file, or is empty.
        """
        if onto in self.ontology_files:
            raise ValueError("Ontology file already exists.")
        onto_data = self.get_ontology_file(onto)
        if not onto_data:
            raise ValueError("Ontology file error.")
        self.ontology_files.append(onto)
        # Reuse the text that was just read instead of reading the file again.
        self.ontologies[onto] = onto_data
        self.rules[onto] = self.generate_rules(onto)

    def get_ontology_file(self, filename):
        """Return the text content of ``filename``.

        Raises:
            ValueError: if ``filename`` is not an existing regular file.
        """
        if not os.path.isfile(filename):
            raise ValueError("Invalid filename.")
        with open(filename, 'r') as f:
            return f.read()

    @staticmethod
    def _format_part(index, total, content):
        """Wrap one chunk in the START/END PART markers; the last chunk also
        carries the "ALL PARTS SENT" trigger."""
        if index < total:
            return (
                f'Do not answer yet. This is just another part of the text I want to send you. Just receive and acknowledge as "Part {index}/{total} received" and wait for the next part.\n'
                f'[START PART {index}/{total}]\n'
                f'{content}\n'
                f'[END PART {index}/{total}]\n'
                f'Remember not answering yet. Just acknowledge you received this part with the message "Part {index}/{total} received" and wait for the next part.\n'
            )
        return (
            f'\n[START PART {index}/{total}]\n'
            f'{content}\n'
            f'[END PART {index}/{total}]\n'
            'ALL PARTS SENT. Now you can continue processing the request.\n'
        )

    def ChatGPTTextSplitter(self, text, textsize=12000):
        """Splits text in smaller subblocks to feed to the LLM.

        Args:
            text: a string, cut into chunks of at most ``textsize``
                characters, or a list whose items are sent as given, one
                item per part.
            textsize: maximum number of characters per chunk for string input.

        Yields:
            For a string that fits in one chunk, the string itself. Otherwise
            the announcement prompt followed by one prompt per part, the last
            of which tells the model that all parts were sent. Any other
            input type yields nothing.

        Raises:
            ValueError: if ``textsize`` is not positive.
        """
        if textsize <= 0:
            raise ValueError("textsize must be a positive integer.")
        if isinstance(text, str):
            if len(text) <= textsize:
                yield text
                return
            # Stepping through the text keeps the trailing partial chunk;
            # floor division on the length used to drop it.
            parts = [text[start:start + textsize]
                     for start in range(0, len(text), textsize)]
        elif isinstance(text, list):
            parts = text
        else:
            return
        yield self._SPLIT_INTRO
        total = len(parts)
        for index, content in enumerate(parts, start=1):
            yield self._format_part(index, total, content)

    def send_ontology(self):
        """Send every loaded ontology to the LLM in parts, printing each reply.

        Raises:
            ValueError: if no ontology has been loaded.
        """
        if not self.ontologies:
            raise ValueError("No loaded ontology to send.")
        ontology = "".join(content + "\n" for content in self.ontologies.values())
        print("Sending Ontology in Parts")
        for part in self.ChatGPTTextSplitter(ontology):
            print(self.llm_api(part))

    def llm_api(self, prompt, model="gpt-3.5-turbo"):
        """Send ``prompt`` as a single user message and return the reply text.

        NOTE(review): the request contains only this one message, so no
        earlier prompt or reply is passed along. The multi-part protocol of
        ChatGPTTextSplitter and the "continue" follow-up in raw_prompt both
        presuppose conversation memory - confirm whether history should be
        sent as well.
        """
        messages = [{
            "role": "user",
            "content": prompt
        }]
        res = openai.ChatCompletion.create(model=model, messages=messages, temperature=0)
        return res.choices[0].message['content']

    def _rule_structure(self):
        """Return the nested description of AEO classes and their properties.

        Values are either a plain description string, or ``{"1": target}`` /
        ``{"0": target}`` meaning "connects to target" / "contains all
        target"; a dict target is rendered as a nested rule block.
        """
        list_sep = "\n\t\t"
        ns_str = "".join(
            f"If namespace {prefix} prefix is used then {iri}\n"
            for prefix, iri in self._PREFIX_NAMESPACES.items()
        )
        return {
            'engagement:Narrative': {
                'engagement:hasStoryline': {"1": 'engagement:Storyline'},
            },
            'engagement:Storyline': {
                'engagement:hasEvent': {"1": 'uco-types:Thread'},
            },
            'uco-types:Thread': {
                'co:element': 'contains all engagement:PlannedEvents',
                'co:item': {"0": 'uco-types:ThreadItem one each for each engagement:PlannedEvent'},
                'co:size': "",
                'uco-types:threadOriginItem': "is the uco-types:ThreadItem for the first engagement:PlannedEvent",
                'uco-types:threadTerminalItem': "is the uco-types:ThreadItem for the last engagement:PlannedEvent",
            },
            'co:size': {
                '@type': 'is xsd:nonNegativeInteger',
                '@value': "which is the number of uco-types:ThreadItem",
            },
            'uco-types:ThreadItem': {
                'co:itemContent': 'is the engagement:PlannedEvent',
                'optional uco-types:threadNextItem': "is the next uco-types:ThreadItem for the next engagement:PlannedEvent if there is one",
                'optional uco-types:threadPreviousItem': 'is the previous uco-types:ThreadItem for the previous engagement:PlannedEvent if there is one',
            },
            'engagement:PlannedEvent': {
                'engagement:eventContext': "connects to one of the following engagement actions:" + list_sep + list_sep.join(self._ENGAGEMENT_ACTIONS),
            },
            'engagement action': {
                'uco-core:performer': "",
                'uco-core:object': 'connects to one of the following engagement deception objects' + list_sep + list_sep.join(self._ENGAGEMENT_OBJECTS),
            },
            'engagement deception object': {
                'engagement:hasCharacterization': {'1': 'uco-core:UcoObject'},
                'objective:hasObjective': 'with @type objective:Objective and @id with one of the following instances:' + list_sep + list_sep.join(self._ENGAGEMENT_OBJECTIVES),
                'uco-core:name': 'is the objective',
            },
            'person': {
                '@type': 'is uco-identity:Person',
                'uco-core:hasFacet': {
                    "1": {
                        'connects to uco-identity:SimpleNameFacet': {
                            'uco-identity:familyName': "",
                            'uco-identity:givenName': "",
                        },
                    },
                },
            },
            'uco-core:Role': {
                '@id': "is the role",
                'uco-core:name': 'is the role',
            },
            'uco-core:Role there is a uco-core:Relationship': {
                'uco-core:kindofRelationship': 'is "has_Role"',
                'uco-core:source': {"1": "the person who has the role"},
                "uco-core:target": {"1": "uco-core:Role"},
            },
            'engagement:BreadcrumbTrail': {
                'engagement:hasBreadcrumb': {
                    "1": {
                        'uco-types:Thread': {
                            'co:element': "contains all engagement:Breadcrumb that belong to this engagement:BreadcrumbTrail",
                            "co:size": "",
                            "co:item": "contains all uco-types:ThreadItem one each for each engagement:Breadcrumb",
                            "uco-types:threadOriginItem": "is the uco-types:ThreadItem for the first engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail",
                            "uco-types:threadTerminalItem": "is the uco-types:ThreadItem for the last engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail",
                        },
                    },
                },
            },
            'engagement:Breadcrumb': {
                'engagement:hasCharacterization': {
                    "1": {
                        'which connects to a uco-core:UcoObject which': {
                            'uco-core:description': 'which describes the object characterizing the breadcrumb',
                        },
                    },
                    # Not a connector key, so the renderer skips this entry.
                    'uco-core:name': 'is the role',
                },
            },
            "class": {
                '@type': 'which is the class',
                '@id': 'which is a unique identifier',
            },
            "ns": ns_str,
        }

    def _render_rules(self, struct, limiter="\n\t", skippre=False):
        """Render ``struct`` as text, one block per subject.

        Args:
            struct: mapping of subject to its property dict; the special key
                "ns" holds text that is emitted verbatim.
            limiter: separator placed before every property.
            skippre: when True (nested blocks) the leading "Each" and the
                connector phrases are omitted.
        """
        blocks = []
        for subject, props in struct.items():
            if subject == "ns":
                blocks.append(props)
                continue
            if not isinstance(props, dict):
                continue
            plural = "has properties:" if len(props) > 1 else "has property:"
            header = f"{subject} {plural}" if skippre else f"Each {subject} {plural}"
            tokens = [header]
            for prop, detail in props.items():
                tokens.append(limiter)
                tokens.append(prop)
                if isinstance(detail, str):
                    tokens.append(detail)
                elif isinstance(detail, dict):
                    for key, connector in self._CONNECTORS.items():
                        if key not in detail:
                            continue
                        target = detail[key]
                        if isinstance(target, dict):
                            tokens.append(
                                self._render_rules(target, limiter + "\t", skippre=True)
                            )
                        else:
                            if not skippre:
                                tokens.append(connector)
                            tokens.append(target)
            blocks.append(" ".join(tokens))
        return "\n".join(blocks)

    def generate_rules(self, onto=None):
        """Return the AEO rule prompt rendered from the structured description.

        ``onto`` is accepted for call compatibility and is not used.

        NOTE(review): generate_rule() starts with a "Remember make a json-ld
        format example ..." reminder that this output does not contain -
        confirm whether it should be prepended here too.
        """
        return self._render_rules(self._rule_structure())

    def generate_rule(self, onto=None):
        """Raw rule string of AEO.

        ``onto`` is accepted for call compatibility and is not used.
        """
        v = """Remember make a json-ld format example that only uses classes and properties terms from Adversary Engagement Ontology, Unified Cyber Ontology.

Each engagement:Narrative has property:
engagement:hasStoryline connects to an engagement:Storyline
Each engagement:Storyline has property:
engagement:hasEvent connects to a uco-types:Thread
Each uco-types:Thread has properties:
co:element contains all engagement:PlannedEvents
co:item contains all uco-types:ThreadItem one each for each engagement:PlannedEvent.
co:size
uco-types:threadOriginItem is the uco-types:ThreadItem for the first engagement:PlannedEvent
uco-types:threadTerminalItem is the uco-types:ThreadItem for the last engagement:PlannedEvent
Each co:size has properties:
@type as xsd:nonNegativeInteger
@value which is the number of uco-types:ThreadItem
Each uco-types:ThreadItem has property:
co:itemContent is the engagement:PlannedEvent
optional uco-types:threadNextItem is the next uco-types:ThreadItem for the next engagement:PlannedEvent if there is one,
optional uco-types:threadPreviousItem is the previous uco-types:ThreadItem for the previous engagement:PlannedEvent if there is one
Each engagement:PlannedEvent has property:
engagement:eventContext connects to one of the following engagement actions:
engagement:Access
engagement:Alert
engagement:Beacon
engagement:Deploy
engagement:Obfuscate
engagement:Respond
Each engagement action has properties:
uco-core:performer
uco-core:object which is the object which the action is applied to
Each engagement action has property:
uco-core:object connects to one of the following engagement deception objects:
engagement:Honeypot
engagement:Honeytoken
engagement:Breadcrumb
engagement:BreadcrumbTrail
engagement:LureObject
engagement:HoneyObject
engagement:Decoy
engagement:DataSource
Each engagement deception object has properties:
engagement:hasCharacterization connects to a uco-core:UcoObject
objective:hasObjective with @type objective:Objective and @id with one of the following instances:
objective:CommandAndControl
objective:CredentialAccess
objective:DevelopResource
objective:Discover
objective:EscalatePrivilege
objective:Evade
objective:Execute
objective:Exfilitrate
objective:GainInitialAccess
objective:Impact
objective:MoveLaterally
objective:Persist
objective:Reconnaissance
objective:Affect
objective:Collect
objective:Detect
objective:Direct
objective:Disrupt
objective:Elicit
objective:Expose
objective:Motivate
objective:Plan
objective:Prepare
objective:Prevent
objective:Reassure
objective:Analyze
objective:Deny
objective:ElicitBehavior
objective:Lure
objective:TimeSink
objective:Track
objective:Trap
uco-core:name is the objective
All people have property:
@type is uco-identity:Person
uco-core:hasFacet that connects to one of the following:
uco-identity:SimpleNameFacet which has the property:
uco-identity:familyName
uco-identity:givenName
Each uco-core:Role has properties:
@id is the role
uco-core:name is the role
Each uco-core:Role there is a uco-core:Relationship with properties:
uco-core:kindofRelationship is "has_Role"
uco-core:source connects to the person who has the role
uco-core:target connects to uco-core:Role
Each engagement:BreadcrumbTrail has property:
engagement:hasBreadcrumb connects to uco-types:Thread
This uco-types:Thread has property:
co:element contains all engagement:Breadcrumb that belong to this engagement:BreadcrumbTrail
co:item contains all uco-types:ThreadItem one each for each engagement:Breadcrumb
co:size
uco-types:threadOriginItem is the uco-types:ThreadItem for the first engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail
uco-types:threadTerminalItem is the uco-types:ThreadItem for the last engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail
Each engagement:Breadcrumb has the properties:
engagement:hasCharacterization which connects to a uco-core:UcoObject with the property:
uco-core:description which describes the object characterizing the breadcrumb
All classes must include property:
@type is the class
@id is a unique identifier

If namespace engagement prefix is used then https://ontology.adversaryengagement.org/ae/engagement#
If namespace objective prefix is used then https://ontology.adversaryengagement.org/ae/objective#
If namespace role prefix is used then https://ontology.adversaryengagement.org/ae/role#
If namespace identity prefix is used then https://ontology.adversaryengagement.org/ae/identity#
If namespace uco-core prefix is used then https://ontology.unifiedcyberontology.org/uco/core#
If namespace uco-types prefix is used then https://ontology.unifiedcyberontology.org/uco/types#
If namespace uco-role prefix is used then https://ontology.unifiedcyberontology.org/uco/role#
"""
        return v

    def generate_continue(self):
        """Return the follow-up prompt that asks the model to keep writing."""
        return "\ncontinue\n"

    def raw_prompt(self, description, max_continuations=10):
        """Ask the LLM for a JSON-LD example of ``description`` and parse it.

        The request is the scenario followed by the generated rules, sent
        through ChatGPTTextSplitter; only the reply to the last part is kept.
        While the collected reply text is not valid JSON, a "continue"
        prompt is sent and its reply is appended.

        Args:
            description: free-text scenario to illustrate.
            max_continuations: upper bound on "continue" requests, so an
                answer that never becomes valid JSON cannot loop forever.

        Returns:
            The parsed JSON value, or None when the reply still does not
            parse after ``max_continuations`` follow-ups.
        """
        rules = self.generate_rules()
        request = (
            "Give me a full json-ld format example for the following scenario:\n"
            f"{description}\n"
            "\n"
            f"{rules}\n"
        )
        reply = None
        for part in self.ChatGPTTextSplitter(request):
            reply = self.llm_api(part)

        pieces = [reply]
        remaining = max_continuations
        while True:
            try:
                return json.loads("".join(pieces))
            except json.JSONDecodeError:
                if remaining <= 0:
                    return None
                remaining -= 1
                pieces.append(self.llm_api(self.generate_continue()))

    def check_for_nested(self, jsonObject):
        """Return True when ``jsonObject`` is a dict holding at least one dict value."""
        if not isinstance(jsonObject, dict):
            return False
        return any(isinstance(value, dict) for value in jsonObject.values())

    def recursive_typelist(self, obj, ls):
        """Append to ``ls`` the prefix of every '@type' found in ``obj``.

        Dicts and lists are walked at any depth. Nodes without '@type' are
        skipped, and '@type' may be a single string or a list of strings.
        The prefix is the text before the first ':'.

        Returns:
            ``ls`` itself, extended in place without duplicates.
        """
        if isinstance(obj, list):
            for item in obj:
                self.recursive_typelist(item, ls)
            return ls
        if not isinstance(obj, dict):
            return ls
        declared = obj.get('@type', [])
        if isinstance(declared, str):
            declared = [declared]
        if isinstance(declared, list):
            for type_name in declared:
                if isinstance(type_name, str):
                    prefix = type_name.split(":")[0]
                    if prefix not in ls:
                        ls.append(prefix)
        for value in obj.values():
            if isinstance(value, (dict, list)):
                self.recursive_typelist(value, ls)
        return ls

    @classmethod
    def _collect_prefixes(cls, node, found):
        """Add to ``found`` the text before the first ':' of every key and
        every string value in ``node`` (walked recursively)."""
        if isinstance(node, dict):
            for key, value in node.items():
                found.add(key.split(":")[0])
                cls._collect_prefixes(value, found)
        elif isinstance(node, list):
            for item in node:
                cls._collect_prefixes(item, found)
        elif isinstance(node, str):
            found.add(node.split(":")[0])

    @classmethod
    def _prune_context(cls, doc):
        """Drop '@context' entries that the rest of ``doc`` never references.

        An entry is kept when its name starts with '@', when it appears as
        the prefix (or whole text) of any key or string value outside
        '@context', or when a kept entry's own definition refers to it.
        ``doc`` is modified in place and returned; it is returned untouched
        when it is not a dict or its '@context' is not a dict.
        """
        if not isinstance(doc, dict):
            return doc
        context = doc.get('@context')
        if not isinstance(context, dict):
            return doc
        used = set()
        cls._collect_prefixes(
            {key: value for key, value in doc.items() if key != '@context'}, used
        )
        kept = set()
        frontier = {term for term in context if term.startswith("@") or term in used}
        # Follow references between context entries until nothing new appears.
        while frontier:
            kept |= frontier
            referenced = set()
            for term in frontier:
                cls._collect_prefixes(context[term], referenced)
            frontier = {term for term in context if term in referenced} - kept
        doc['@context'] = {
            term: definition for term, definition in context.items() if term in kept
        }
        return doc

    def prompt(self, description):
        """Generate a JSON-LD example for ``description`` and remove unused
        entries from its '@context'.

        Returns:
            The parsed example, or whatever raw_prompt returned (possibly
            None) when it has no dict '@context' to prune.
        """
        res = self.raw_prompt(description)
        return self._prune_context(res)
| |
|