|
|
import json |
|
|
import sys |
|
|
from time import localtime, sleep, ticks_diff, ticks_ms |
|
|
|
|
|
import uos |
|
|
from data_logging import ( |
|
|
get_local_timestamp, |
|
|
get_onboard_temperature, |
|
|
write_payload_backup, |
|
|
) |
|
|
from machine import PWM, Pin |
|
|
from ufastrsa.genprime import genrsa |
|
|
from ufastrsa.rsa import RSA |
|
|
from uio import StringIO |
|
|
|
|
|
|
|
|
def beep(buzzer, power=0.005): |
|
|
buzzer.freq(300) |
|
|
buzzer.duty_u16(round(65535 * power)) |
|
|
sleep(0.15) |
|
|
buzzer.duty_u16(0) |
|
|
|
|
|
|
|
|
def get_traceback(err): |
|
|
try: |
|
|
with StringIO() as f: |
|
|
sys.print_exception(err, f) |
|
|
return f.getvalue() |
|
|
except Exception as err2: |
|
|
print(err2) |
|
|
return f"Failed to extract file and line number due to {err2}.\nOriginal error: {err}" |
|
|
|
|
|
|
|
|
def merge_two_dicts(x, y): |
|
|
z = x.copy() |
|
|
z.update(y) |
|
|
return z |
|
|
|
|
|
|
|
|
def path_exists(path): |
|
|
|
|
|
|
|
|
parent = "" |
|
|
name = path |
|
|
|
|
|
|
|
|
index = path.rstrip("/").rfind("/") |
|
|
if index >= 0: |
|
|
index += 1 |
|
|
parent = path[: index - 1] |
|
|
name = path[index:] |
|
|
|
|
|
|
|
|
|
|
|
return any((name == x[0]) for x in uos.ilistdir(parent)) |
|
|
|
|
|
|
|
|
def encrypt_id(my_id, verbose=False): |
|
|
rsa_path = "rsa.json" |
|
|
|
|
|
try: |
|
|
with open(rsa_path, "r") as f: |
|
|
cipher_data = json.load(f) |
|
|
cipher = RSA( |
|
|
cipher_data["bits"], |
|
|
n=cipher_data["n"], |
|
|
e=cipher_data["e"], |
|
|
d=cipher_data["d"], |
|
|
) |
|
|
except (KeyError, OSError) as e: |
|
|
print(e) |
|
|
print("Generating new RSA parameters...") |
|
|
bits = 256 |
|
|
bits, n, e, d = genrsa(bits, e=65537) |
|
|
cipher = RSA(bits, n=n, e=e, d=d) |
|
|
with open("rsa.json", "w") as f: |
|
|
json.dump(dict(bits=bits, n=n, e=e, d=d), f) |
|
|
|
|
|
if verbose: |
|
|
with open(rsa_path, "r") as f: |
|
|
cipher_data = json.load(f) |
|
|
print("RSA parameters (keep private):") |
|
|
print(cipher_data) |
|
|
|
|
|
my_id = int.from_bytes(cipher.pkcs_encrypt(my_id), "big") |
|
|
return my_id |
|
|
|
|
|
|
|
|
def decrypt_id(my_id): |
|
|
rsa_path = "rsa.json" |
|
|
if path_exists(rsa_path): |
|
|
with open(rsa_path, "r") as f: |
|
|
cipher_data = json.load(f) |
|
|
cipher = RSA( |
|
|
cipher_data["bits"], |
|
|
n=cipher_data["n"], |
|
|
e=cipher_data["e"], |
|
|
d=cipher_data["d"], |
|
|
) |
|
|
else: |
|
|
bits = 256 |
|
|
bits, n, e, d = genrsa(bits, e=65537) |
|
|
cipher = RSA(bits, n=n, e=e, d=d) |
|
|
with open("rsa.json", "w") as f: |
|
|
json.dump(dict(bits=bits, n=n, e=e, d=d), f) |
|
|
|
|
|
my_id = int.from_bytes(cipher.pkcs_decrypt(my_id), "big") |
|
|
return my_id |
|
|
|
|
|
|
|
|
def get_onboard_led(): |
|
|
try: |
|
|
onboard_led = Pin("LED", Pin.OUT) |
|
|
except Exception as e: |
|
|
print(e) |
|
|
onboard_led = Pin(25, Pin.OUT) |
|
|
return onboard_led |
|
|
|
|
|
|
|
|
class Experiment(object): |
|
|
def __init__( |
|
|
self, |
|
|
run_experiment_fn, |
|
|
devices, |
|
|
reset_experiment_fn=None, |
|
|
validate_inputs_fn=None, |
|
|
emergency_shutdown_fn=None, |
|
|
buzzer=None, |
|
|
sdcard_ready=False, |
|
|
) -> None: |
|
|
self.validate_inputs_fn = validate_inputs_fn |
|
|
self.run_experiment_fn = run_experiment_fn |
|
|
self.reset_experiment_fn = reset_experiment_fn |
|
|
self.devices = devices |
|
|
self.emergency_shutdown_fn = emergency_shutdown_fn |
|
|
self.buzzer = buzzer |
|
|
self.sdcard_ready = sdcard_ready |
|
|
|
|
|
if self.reset_experiment_fn is None: |
|
|
|
|
|
def do_nothing(*args, **kwargs): |
|
|
pass |
|
|
|
|
|
self.reset_experiment_fn = do_nothing |
|
|
|
|
|
if self.emergency_shutdown_fn is None: |
|
|
self.emergency_shutdown_fn = self.reset_experiment_fn |
|
|
|
|
|
if self.validate_inputs_fn is None: |
|
|
|
|
|
def no_input_validation(*args, **kwargs): |
|
|
return True |
|
|
|
|
|
self.validate_inputs_fn = no_input_validation |
|
|
|
|
|
if self.buzzer is None: |
|
|
self.buzzer = PWM(Pin(18)) |
|
|
|
|
|
def try_experiment(self, msg): |
|
|
payload_data = {} |
|
|
|
|
|
|
|
|
|
|
|
print(msg) |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
parameters = json.loads(msg) |
|
|
payload_data["_input_message"] = parameters |
|
|
|
|
|
|
|
|
self.validate_inputs_fn(parameters) |
|
|
|
|
|
beep(self.buzzer) |
|
|
sensor_data = self.run_experiment_fn(parameters, self.devices) |
|
|
payload_data = merge_two_dicts(payload_data, sensor_data) |
|
|
|
|
|
except Exception as err: |
|
|
print(err) |
|
|
if "_input_message" not in payload_data.keys(): |
|
|
payload_data["_input_message"] = msg |
|
|
payload_data["error"] = get_traceback(err) |
|
|
|
|
|
try: |
|
|
payload_data["onboard_temperature_K"] = get_onboard_temperature(unit="K") |
|
|
payload_data["sd_card_ready"] = self.sdcard_ready |
|
|
stamp, time_str = get_local_timestamp(return_str=True) |
|
|
payload_data["utc_timestamp"] = stamp |
|
|
payload_data["utc_time_str"] = time_str |
|
|
except OverflowError as e: |
|
|
print(get_traceback(e)) |
|
|
except Exception as e: |
|
|
print(get_traceback(e)) |
|
|
|
|
|
try: |
|
|
parameters = json.loads(msg) |
|
|
self.reset_experiment_fn(parameters, devices=self.devices) |
|
|
except Exception as e: |
|
|
try: |
|
|
self.emergency_shutdown_fn(devices=self.devices) |
|
|
payload_data["reset_error"] = get_traceback(e) |
|
|
except Exception as e: |
|
|
payload_data["emergency_error"] = get_traceback(e) |
|
|
|
|
|
return payload_data |
|
|
|
|
|
def write_to_sd_card(self, payload_data, fpath="/sd/experiments.txt"): |
|
|
try: |
|
|
write_payload_backup(payload_data, fpath=fpath) |
|
|
except Exception as e: |
|
|
w = f"Failed to write to SD card: {get_traceback(e)}" |
|
|
print(w) |
|
|
payload_data["warning"] = w |
|
|
|
|
|
return payload_data |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def heartbeat(client, first, ping_interval_ms=15000): |
|
|
global lastping |
|
|
if first: |
|
|
client.ping() |
|
|
lastping = ticks_ms() |
|
|
if ticks_diff(ticks_ms(), lastping) >= ping_interval_ms: |
|
|
client.ping() |
|
|
lastping = ticks_ms() |
|
|
return |
|
|
|
|
|
|
|
|
def sign_of_life(led, first, blink_interval_ms=5000): |
|
|
global last_blink |
|
|
if first: |
|
|
led.on() |
|
|
last_blink = ticks_ms() |
|
|
time_since = ticks_diff(ticks_ms(), last_blink) |
|
|
if led.value() == 0 and time_since >= blink_interval_ms: |
|
|
led.toggle() |
|
|
last_blink = ticks_ms() |
|
|
elif led.value() == 1 and time_since >= 500: |
|
|
led.toggle() |
|
|
last_blink = ticks_ms() |
|
|
|
|
|
|
|
|
class DummyMotor: |
|
|
def __init__(self): |
|
|
pass |
|
|
|
|
|
|
|
|
class DummySensor: |
|
|
def __init__(self): |
|
|
pass |
|
|
|