File size: 12,017 Bytes
eeef81e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
from CommonServerPython import *

''' IMPORTS '''

import re
import requests

# Disable insecure warnings
requests.packages.urllib3.disable_warnings()

''' GLOBALS/PARAMS '''

VENDOR = 'Have I Been Pwned? V2'
MAX_RETRY_ALLOWED = demisto.params().get('max_retry_time', -1)
API_KEY = demisto.params().get('api_key')
USE_SSL = not demisto.params().get('insecure', False)

BASE_URL = 'https://haveibeenpwned.com/api/v3'
HEADERS = {
    'hibp-api-key': API_KEY,
    'user-agent': 'DBOT-API',
    'Content-Type': 'application/json',
    'Accept': 'application/json'
}

DEFAULT_DBOT_SCORE_EMAIL = 2 if demisto.params().get('default_dbot_score_email') == 'SUSPICIOUS' else 3
DEFAULT_DBOT_SCORE_DOMAIN = 2 if demisto.params().get('default_dbot_score_domain') == 'SUSPICIOUS' else 3

SUFFIXES = {
    "email": '/breachedaccount/',
    "domain": '/breaches?domain=',
    "username": '/breachedaccount/',
    "paste": '/pasteaccount/',
    "email_truncate_verified": '?truncateResponse=false&includeUnverified=true',
    "domain_truncate_verified": '&truncateResponse=false&includeUnverified=true',
    "username_truncate_verified": '?truncateResponse=false&includeUnverified=true'
}

RETRIES_END_TIME = datetime.min

''' HELPER FUNCTIONS '''


def http_request(method, url_suffix, params=None, data=None):
    while True:
        res = requests.request(
            method,
            BASE_URL + url_suffix,
            verify=USE_SSL,
            params=params,
            data=data,
            headers=HEADERS
        )

        if res.status_code != 429:
            # Rate limit response code
            break

        if datetime.now() > RETRIES_END_TIME:
            return_error('Max retry time has exceeded.')

        wait_regex = re.search(r'\d+', res.json()['message'])
        if wait_regex:
            wait_amount = wait_regex.group()
        else:
            demisto.error('failed extracting wait time will use default (5). Res body: {}'.format(res.text))
            wait_amount = 5
        if datetime.now() + timedelta(seconds=int(wait_amount)) > RETRIES_END_TIME:
            return_error('Max retry time has exceeded.')
        time.sleep(int(wait_amount))

    if res.status_code == 404:
        return None
    if not res.status_code == 200:
        if not res.status_code == 401:
            demisto.error(
                'Error in API call to Pwned Integration [%d]. Full text: %s' % (res.status_code, res.text))
        return_error('Error in API call to Pwned Integration [%d] - %s' % (res.status_code, res.reason))
        return None

    return res.json()


def html_description_to_human_readable(breach_description):
    """
    Converting from html description to hr
    :param breach_description: Description of breach from API response
    :return: Description string that altered HTML urls to clickable urls
    for better readability in war-room
    """
    html_link_pattern = re.compile('<a href="(.+?)"(.+?)>(.+?)</a>')
    patterns_found = html_link_pattern.findall(breach_description)
    for link in patterns_found:
        html_actual_address = link[0]
        html_readable_name = link[2]
        link_from_desc = '[' + html_readable_name + ']' + '(' + html_actual_address + ')'
        breach_description = re.sub(html_link_pattern, link_from_desc, breach_description, count=1)
    return breach_description


def data_to_markdown(query_type, query_arg, api_res, api_paste_res=None):
    records_found = False

    md = '### Have I Been Pwned query for ' + query_type.lower() + ': *' + query_arg + '*\n'

    if api_res:
        records_found = True
        for breach in api_res:
            verified_breach = 'Verified' if breach['IsVerified'] else 'Unverified'
            md += '#### ' + breach['Title'] + ' (' + breach['Domain'] + '): ' + str(breach['PwnCount']) + \
                  ' records breached [' + verified_breach + ' breach]\n'
            md += 'Date: **' + breach['BreachDate'] + '**\n\n'
            md += html_description_to_human_readable(breach['Description']) + '\n'
            md += 'Data breached: **' + ','.join(breach['DataClasses']) + '**\n'

    if api_paste_res:
        records_found = True
        pastes_list = []
        for paste_breach in api_paste_res:
            paste_entry = \
                {
                    'Source': paste_breach['Source'],
                    'Title': paste_breach['Title'],
                    'ID': paste_breach['Id'],
                    'Date': '',
                    'Amount of emails in paste': str(paste_breach['EmailCount'])
                }

            if paste_breach['Date']:
                paste_entry['Date'] = paste_breach['Date'].split('T')[0]

            pastes_list.append(paste_entry)

        md += tableToMarkdown('The email address was found in the following "Pastes":',
                              pastes_list,
                              ['ID', 'Title', 'Date', 'Source', 'Amount of emails in paste'])

    if not records_found:
        md += 'No records found'

    return md


def create_dbot_score_dictionary(indicator_value, indicator_type, dbot_score):
    return {
        'Indicator': indicator_value,
        'Type': indicator_type,
        'Vendor': VENDOR,
        'Score': dbot_score
    }


def create_context_entry(context_type, context_main_value, comp_sites, comp_pastes, malicious_score):
    context_dict = dict()  # dict

    if context_type == 'email':
        context_dict['Address'] = context_main_value
    else:
        context_dict['Name'] = context_main_value

    context_dict['Pwned-V2'] = {
        'Compromised': {
            'Vendor': VENDOR,
            'Reporters': ', '.join(comp_sites + comp_pastes)
        }
    }

    if malicious_score == 3:
        context_dict['Malicious'] = add_malicious_to_context(context_type)

    return context_dict


def add_malicious_to_context(malicious_type):
    return {
        'Vendor': VENDOR,
        'Description': 'The ' + malicious_type + ' has been compromised'
    }


def email_to_entry_context(email, api_email_res, api_paste_res):
    dbot_score = 0
    comp_email = dict()  # type: dict
    comp_sites = sorted([item['Title'] for item in api_email_res])
    comp_pastes = sorted(set(item['Source'] for item in api_paste_res))

    if len(comp_sites) > 0:
        dbot_score = DEFAULT_DBOT_SCORE_EMAIL
        email_context = create_context_entry('email', email, comp_sites, comp_pastes, DEFAULT_DBOT_SCORE_EMAIL)
        comp_email[outputPaths['email']] = email_context

    comp_email['DBotScore'] = create_dbot_score_dictionary(email, 'email', dbot_score)

    return comp_email


def domain_to_entry_context(domain, api_res):
    comp_sites = [item['Title'] for item in api_res]
    comp_sites = sorted(comp_sites)
    comp_domain = dict()  # type: dict
    dbot_score = 0

    if len(comp_sites) > 0:
        dbot_score = DEFAULT_DBOT_SCORE_DOMAIN
        domain_context = create_context_entry('domain', domain, comp_sites, [], DEFAULT_DBOT_SCORE_DOMAIN)
        comp_domain[outputPaths['domain']] = domain_context

    comp_domain['DBotScore'] = create_dbot_score_dictionary(domain, 'domain', dbot_score)

    return comp_domain


def set_retry_end_time():
    global RETRIES_END_TIME
    if MAX_RETRY_ALLOWED != -1:
        RETRIES_END_TIME = datetime.now() + timedelta(seconds=int(MAX_RETRY_ALLOWED))


''' COMMANDS + REQUESTS FUNCTIONS '''


def test_module(args_dict):
    """
    If the http request was successful the test will return OK
    :return: 3 arrays of outputs
    """
    http_request('GET', SUFFIXES.get("username", '') + 'test')
    return ['ok'], [None], [None]


def pwned_email_command(args_dict):
    """
    Executing the pwned request for emails list, in order to support list input, the function returns 3 lists of outputs
   :param args_dict: the demisto argument - in this case the email list is needed
   :return: 3 arrays of outputs
   """
    email_list = argToList(args_dict.get('email', ''))
    api_email_res_list, api_paste_res_list = pwned_email(email_list)

    md_list = []
    ec_list = []

    for email, api_email_res, api_paste_res in zip(email_list, api_email_res_list, api_paste_res_list):
        md_list.append(data_to_markdown('Email', email, api_email_res, api_paste_res))
        ec_list.append(email_to_entry_context(email, api_email_res or [], api_paste_res or []))
    return md_list, ec_list, api_email_res_list


def pwned_email(email_list):
    """
    Executing the http requests
    :param email_list: the email list that needed for the http requests
    :return: 2 arrays of http requests outputs
    """
    api_email_res_list = []
    api_paste_res_list = []

    for email in email_list:
        email_suffix = SUFFIXES.get("email") + email + SUFFIXES.get("email_truncate_verified")
        paste_suffix = SUFFIXES.get("paste") + email
        api_email_res_list.append(http_request('GET', url_suffix=email_suffix))
        api_paste_res_list.append(http_request('GET', url_suffix=paste_suffix))

    return api_email_res_list, api_paste_res_list


def pwned_domain_command(args_dict):
    """
    Executing the pwned request for domains list, in order to support list input, the function returns 3 lists of
    outputs
   :param args_dict: the demisto argument - in this case the domain list is needed
   :return: 3 arrays of outputs
   """
    domain_list = argToList(args_dict.get('domain', ''))
    api_res_list = pwned_domain(domain_list)

    md_list = []
    ec_list = []

    for domain, api_res in zip(domain_list, api_res_list):
        md_list.append(data_to_markdown('Domain', domain, api_res))
        ec_list.append(domain_to_entry_context(domain, api_res or []))
    return md_list, ec_list, api_res_list


def pwned_domain(domain_list):
    """
    Executing the http request
    :param domain_list: the domains list that needed for the http requests
    :return: an array of http requests outputs
    """
    api_res_list = []
    for domain in domain_list:
        suffix = SUFFIXES.get("domain") + domain + SUFFIXES.get("domain_truncate_verified")
        api_res_list.append(http_request('GET', url_suffix=suffix))
    return api_res_list


def pwned_username_command(args_dict):
    """
    Executing the pwned request for usernames list, in order to support list input, the function returns 3 lists of
    outputs
    :param args_dict: the demisto argument - in this case the username list is needed
    :return: 3 arrays of outputs
    """
    username_list = argToList(args_dict.get('username', ''))
    api_res_list = pwned_username(username_list)

    md_list = []
    ec_list = []

    for username, api_res in zip(username_list, api_res_list):
        md_list.append(data_to_markdown('Username', username, api_res))
        ec_list.append(domain_to_entry_context(username, api_res or []))
    return md_list, ec_list, api_res_list


def pwned_username(username_list):
    """
    Executing the http request
    :param username_list: the username list that needed for the http requests
    :return: an array of http requests outputs
    """
    api_res_list = []
    for username in username_list:
        suffix = SUFFIXES.get("username") + username + SUFFIXES.get("username_truncate_verified")
        api_res_list.append(http_request('GET', url_suffix=suffix))
    return api_res_list


command = demisto.command()
LOG('Command being called is: {}'.format(command))
try:
    handle_proxy()
    set_retry_end_time()
    commands = {
        'test-module': test_module,
        'email': pwned_email_command,
        'pwned-email': pwned_email_command,
        'domain': pwned_domain_command,
        'pwned-domain': pwned_domain_command,
        'pwned-username': pwned_username_command
    }

    if command in commands:
        md_list, ec_list, api_email_res_list = commands[command](demisto.args())
        for md, ec, api_paste_res in zip(md_list, ec_list, api_email_res_list):
            return_outputs(md, ec, api_paste_res)

# Log exceptions
except Exception as e:
    return_error(str(e))