File size: 2,816 Bytes
c7c0a38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106

import hmac 
import hashlib 
import time 
import json 
import os 
import base64 
from typing import Tuple 





def sign_payload(payload: dict, secret: str) -> Tuple[str, str]:
    """Serialize *payload* with a timestamp and sign it with HMAC-SHA256.

    A ``_ts`` field holding the current Unix time (whole seconds) is added
    to a copy of the payload; any ``_ts`` already present is overwritten in
    that copy. The caller's dict is left untouched.

    Args:
        payload: JSON-serializable mapping to sign.
        secret: Shared secret used as the HMAC key.

    Returns:
        A ``(body, sig)`` tuple: ``body`` is the compact JSON text with
        sorted keys, ``sig`` is the hex HMAC-SHA256 digest of ``body``.
    """
    # Stamp a copy so signing does not mutate the caller's dict.
    stamped = {**payload, "_ts": int(time.time())}
    # Compact separators + sorted keys give a deterministic byte string.
    body = json.dumps(stamped, separators=(",", ":"), sort_keys=True)
    sig = hmac.new(secret.encode(), body.encode(), hashlib.sha256).hexdigest()
    return body, sig


def verify_payload(body: str, sig: str, secret: str, max_age_seconds: int = 300) -> dict:
    """Check the HMAC-SHA256 signature and freshness of a signed body.

    Args:
        body: JSON text exactly as it was signed.
        sig: Hex digest supplied alongside ``body``.
        secret: Shared secret used as the HMAC key.
        max_age_seconds: Maximum allowed absolute difference between the
            current time and the payload's ``_ts`` field.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the signature does not match, the body is not a
            JSON object, ``_ts`` is not a number, or the timestamp is
            outside the allowed window (a missing ``_ts`` counts as 0).
    """
    expected = hmac.new(secret.encode(), body.encode(), hashlib.sha256).hexdigest()

    # compare_digest() raises TypeError when given a str containing
    # non-ASCII characters, so compare bytes: a malformed signature must be
    # rejected like any other bad signature, not crash the caller.
    if not isinstance(sig, str):
        raise ValueError("Invalid HMAC signature")
    supplied = sig.encode("utf-8", "surrogatepass")
    if not hmac.compare_digest(expected.encode(), supplied):
        raise ValueError("Invalid HMAC signature")

    data = json.loads(body)
    if not isinstance(data, dict):
        raise ValueError("Invalid payload: expected a JSON object")

    ts = data.get("_ts", 0)
    if isinstance(ts, bool) or not isinstance(ts, (int, float)):
        raise ValueError("Invalid payload: _ts must be a number")

    # Sample the clock once so the reported age is the one that was tested.
    age = abs(time.time() - ts)
    # Written as "not <=" so that a NaN timestamp is rejected too.
    if not age <= max_age_seconds:
        raise ValueError(f"Stale request: {age:.0f}s old")

    return data






def generate_self_signed_cert(cert_path: str, key_path: str, cn: str = "localhost") -> None:
    """Create a self-signed RSA certificate and write it with its key as PEM.

    A 2048-bit RSA key is generated and used to sign a certificate whose
    subject and issuer are both ``CN=<cn>``, with ``cn`` also listed as a
    DNS subject alternative name. The certificate is valid for 3650 days
    from the moment of creation.

    Args:
        cert_path: Destination file for the PEM-encoded certificate.
        key_path: Destination file for the unencrypted PEM private key.
            A newly created file gets owner-only (0o600) permissions.
        cn: Common name, also used as the DNS subject alternative name.
    """
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.backends import default_backend
    import datetime

    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )

    # Self-signed: subject and issuer are the same name.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, cn),
    ])

    # One timezone-aware timestamp: avoids the deprecated utcnow() and keeps
    # both ends of the validity window based on the same instant.
    now = datetime.datetime.now(datetime.timezone.utc)

    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=3650))
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(cn)]),
            critical=False,
        )
        .sign(key, hashes.SHA256(), default_backend())
    )

    with open(cert_path, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

    # The key is stored unencrypted, so create the file owner-only instead
    # of relying on the process umask. O_BINARY (Windows only) keeps the
    # descriptor from translating newlines.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(key_path, flags, 0o600)
    with os.fdopen(fd, "wb") as f:
        f.write(key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.TraditionalOpenSSL,
            serialization.NoEncryption(),
        ))






def load_or_create_secret(path: str = "shared_secret.key") -> str:
    """Return the shared secret stored at *path*, creating it if needed.

    If the file exists and holds non-whitespace content, that content
    (stripped) is returned. Otherwise a new secret is generated from 32
    random bytes, URL-safe base64 encoded, written to *path* and returned.

    Args:
        path: Location of the secret file.

    Returns:
        The secret string.
    """
    # EAFP: read directly rather than exists()-then-open, which is racy.
    try:
        with open(path, "r", encoding="utf-8") as f:
            secret = f.read().strip()
    except FileNotFoundError:
        secret = ""

    # An empty file must not be accepted: it would yield an empty HMAC key.
    if secret:
        return secret

    secret = base64.urlsafe_b64encode(os.urandom(32)).decode()
    # Create the file owner-only (0o600) instead of relying on the umask.
    # O_BINARY (Windows only) keeps the descriptor from translating newlines.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(path, flags, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(secret)
    return secret