| | import json |
| | import sys |
| | from time import gmtime, localtime, time |
| |
|
| | import machine |
| | import ntptime |
| | import uos |
| | import urequests |
| | from machine import SPI, Pin |
| | from sdcard import sdcard |
| | from uio import StringIO |
| |
|
| | |
| | |
| |
|
| |
|
| | def get_traceback(err): |
| | try: |
| | with StringIO() as f: |
| | sys.print_exception(err, f) |
| | return f.getvalue() |
| | except Exception as err2: |
| | print(err2) |
| | return f"Failed to extract file and line number due to {err2}.\nOriginal error: {err}" |
| |
|
| |
|
| | def initialize_sdcard( |
| | spi_id=1, |
| | cs_pin=15, |
| | sck_pin=10, |
| | mosi_pin=11, |
| | miso_pin=12, |
| | baudrate=1000000, |
| | polarity=0, |
| | phase=0, |
| | bits=8, |
| | firstbit=SPI.MSB, |
| | verbose=True, |
| | ): |
| | try: |
| | cs = Pin(cs_pin, Pin.OUT) |
| |
|
| | spi = SPI( |
| | spi_id, |
| | baudrate=baudrate, |
| | polarity=polarity, |
| | phase=phase, |
| | bits=bits, |
| | firstbit=firstbit, |
| | sck=Pin(sck_pin), |
| | mosi=Pin(mosi_pin), |
| | miso=Pin(miso_pin), |
| | ) |
| |
|
| | |
| | sd = sdcard.SDCard(spi, cs) |
| |
|
| | vfs = uos.VfsFat(sd) |
| | uos.mount(vfs, "/sd") |
| | if verbose: |
| | print("SD Card initialized successfully") |
| | return True |
| | except Exception as e: |
| | if verbose: |
| | print(get_traceback(e)) |
| | print("SD Card failed to initialize") |
| | return False |
| |
|
| |
|
| | def write_payload_backup(payload_data: str, fpath: str = "/sd/experiments.txt"): |
| | payload = json.dumps(payload_data) |
| | with open(fpath, "a") as file: |
| | |
| | file.write(f"{payload}\r\n") |
| |
|
| |
|
| | def log_to_mongodb( |
| | document: dict, |
| | api_key: str, |
| | url: str, |
| | cluster_name: str, |
| | database_name: str, |
| | collection_name: str, |
| | verbose: bool = True, |
| | retries: int = 2, |
| | ): |
| | |
| | headers = {"api-key": api_key} |
| |
|
| | insertPayload = { |
| | "dataSource": cluster_name, |
| | "database": database_name, |
| | "collection": collection_name, |
| | "document": document, |
| | } |
| |
|
| | if verbose: |
| | print(f"sending document to {cluster_name}:{database_name}:{collection_name}") |
| |
|
| | for _ in range(retries): |
| | response = None |
| | if _ > 0: |
| | print(f"retrying... ({_} of {retries})") |
| |
|
| | try: |
| | response = urequests.post(url, headers=headers, json=insertPayload) |
| | txt = str(response.text) |
| | status_code = response.status_code |
| |
|
| | if verbose: |
| | print(f"Response: ({status_code}), msg = {txt}") |
| | if response.status_code == 201: |
| | print("Added Successfully") |
| | break |
| | else: |
| | print("Error") |
| |
|
| | |
| | response.close() |
| | except Exception as e: |
| | if response is not None: |
| | response.close() |
| | if _ == retries - 1: |
| | raise e |
| | else: |
| | print(e) |
| |
|
| |
|
| | def get_timestamp(timeout=2, return_str=False): |
| | ntptime.timeout = timeout |
| | time_int = ntptime.time() |
| | utc_tuple = gmtime(time_int) |
| | year, month, mday, hour, minute, second, weekday, yearday = utc_tuple |
| |
|
| | time_str = f"{year}-{month}-{mday} {hour:02}:{minute:02}:{second:02}" |
| |
|
| | if return_str: |
| | return time_int, time_str |
| |
|
| | return time_int |
| |
|
| |
|
| | def get_local_timestamp(return_str=False): |
| | t = time() |
| | year, month, mday, hour, minute, second, _, _ = localtime(t) |
| | time_str = f"{year}-{month}-{mday} {hour:02}:{minute:02}:{second:02}" |
| |
|
| | if return_str: |
| | return t, time_str |
| |
|
| | return t |
| |
|
| |
|
| | def get_onboard_temperature(unit="K"): |
| | sensor_temp = machine.ADC(4) |
| | conversion_factor = 3.3 / (65535) |
| | reading = sensor_temp.read_u16() * conversion_factor |
| | celsius_degrees = 27 - (reading - 0.706) / 0.001721 |
| | if unit == "C": |
| | return celsius_degrees |
| | elif unit == "K": |
| | return celsius_degrees + 273.15 |
| | elif unit == "F": |
| | return celsius_degrees * 9 / 5 + 32 |
| | else: |
| | raise ValueError("Invalid unit. Must be one of 'C', 'K', or 'F") |
| |
|