import hashlib
import os
import re
import uuid
from typing import Any, Dict

from dotenv import load_dotenv
from ulid import ULID
| |
|
# Runs at import time. Presumably python-dotenv's default behaviour (copy the
# entries of a nearby .env file into the process environment) -- confirm.
# The os.getenv lookups further down this module read their values after this call.
load_dotenv()
| |
|
def get_pdf_urls(folder_path):
    """
    List the PDF entries that sit directly inside a folder.

    An entry counts as a PDF when its name ends with ``.pdf`` (compared
    case-insensitively). Each result is ``folder_path`` joined with the entry
    name, in whatever order ``os.listdir`` reports the entries. Sub-folders
    are not searched.

    Parameters:
    folder_path (str): The path to the folder.

    Returns:
    list: One joined path per matching entry; empty when nothing matches.
    """
    return [
        os.path.join(folder_path, entry)
        for entry in os.listdir(folder_path)
        if entry.lower().endswith('.pdf')
    ]
| |
|
| |
|
def get_pdf_urls_with_root_url(folder_path, root_url):
    """
    Build a URL for every PDF entry found directly inside a folder.

    The folder is only used to discover file names (case-insensitive ``.pdf``
    suffix); each name is then appended to ``root_url``. File names are
    appended as-is, without percent-encoding.

    Parameters:
    folder_path (str): The path to the folder that is scanned for PDF files.
    root_url (str): The root URL to construct the file URLs.

    Returns:
    list: A list of URLs, one per PDF file, in ``os.listdir`` order.

    Example:
    >>> get_pdf_urls_with_root_url('docs', 'https://example.com/files')  # doctest: +SKIP
    ['https://example.com/files/report.pdf']
    """
    # URLs always use '/', so the separator is added by hand instead of via
    # os.path.join, which would insert '\\' on Windows. An empty root or one
    # that already ends in '/' is used unchanged, matching the POSIX result.
    if root_url and not root_url.endswith('/'):
        prefix = root_url + '/'
    else:
        prefix = root_url

    return [
        prefix + filename
        for filename in os.listdir(folder_path)
        if filename.lower().endswith('.pdf')
    ]
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def chunk_data(data : list | dict, chunk_size: int): |
| | """ |
| | This function takes an array and a chunk size as input, and returns a new array |
| | where the original array is divided into smaller chunks of the specified size. |
| | |
| | Parameters: |
| | data (list): The original data to be chunked. |
| | chunk_size (int): The size of each chunk. |
| | |
| | Returns: |
| | list: A new array containing the chunks of the original array. |
| | |
| | Example: |
| | >>> chunk_array([1, 2, 3, 4, 5, 6], 2) |
| | [[1, 2], [3, 4], [5, 6]] |
| | """ |
| | is_object = isinstance(data, dict) |
| | is_array = isinstance(data, list) |
| |
|
| | if not is_object and not is_array: |
| | raise TypeError("Data must be a list or a dictionary.") |
| | elif is_array: |
| | return [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)] |
| | elif is_object: |
| | items = list(data.items()) |
| | for i in range(0, len(items), chunk_size): |
| | yield dict(items[i:i + chunk_size]) |
| |
|
| |
|
def generate_ulid(seed: Any = None) -> str:
    """
    Generate a Universally Unique Lexicographically Sortable Identifier (ULID).

    A ``ULID`` object is built with no arguments when ``seed`` is None and
    with ``seed`` as its only positional argument otherwise; the result of its
    ``generate()`` method is returned.

    NOTE(review): how the imported ``ULID`` class interprets ``seed``, and
    whether a seeded call is reproducible, is decided by that library and is
    not visible here -- confirm against the installed package.

    Parameters:
    seed (Any, optional): Value forwarded to the ``ULID`` constructor. Defaults to None.

    Returns:
    str: The value produced by ``ULID.generate()`` (annotated as a string).
    """
    # Only skip the argument when it is truly absent, so that falsy seeds
    # such as 0 or '' are still forwarded to the constructor.
    if seed is None:
        ulid = ULID()
    else:
        ulid = ULID(seed)

    return ulid.generate()
| |
|
def create_uuid_from_string(val: str) -> str:
    """
    Derive a UUID-formatted string deterministically from a text value.

    The text is UTF-8 encoded and hashed with MD5; the 128-bit digest is then
    rendered in the canonical 8-4-4-4-12 hexadecimal UUID layout. The same
    input always yields the same output. No UUID version or variant bits are
    set, so the result is simply the digest in UUID form.

    Parameters:
    val (str): The input string from which the UUID will be generated.

    Returns:
    str: A string representing the generated UUID.

    Example:
    >>> create_uuid_from_string('hello')
    '5d41402a-bc4b-2a76-b971-9d911017c592'
    """
    digest = hashlib.md5(val.encode("UTF-8")).digest()
    derived = uuid.UUID(bytes=digest)
    return str(derived)
| |
|
| |
|
def to_snake_case(s):
    """
    Convert an identifier such as ``FirstName`` or ``firstName`` to snake_case.

    The input is split into words (runs of capitals treated as acronyms,
    capitalised or lowercase words with optional trailing digits, single
    capitals, and digit runs); the words are lowercased and joined with
    underscores. Characters that match none of these are dropped.

    Two inputs bypass the conversion: a falsy value yields an empty string,
    and a string for which ``str.isupper()`` is true (for example ``'ID'``)
    is returned unchanged.

    Parameters:
    s (str): The input string to be converted to snake_case.

    Returns:
    str: A string in snake_case format.

    Example:
    >>> to_snake_case('FirstName')
    'first_name'
    >>> to_snake_case('HTTPServer')
    'http_server'
    """
    if not s:
        return ''

    # All-caps names are deliberately left as they are.
    if s.isupper():
        return s

    words = re.findall(r'[A-Z]{2,}(?=[A-Z][a-z]+[0-9]*|\b)|[A-Z]?[a-z]+[0-9]*|[A-Z]|[0-9]+', s)
    return '_'.join(map(str.lower, words))
| |
|
def convert_to_snakecase(original_data):
    """
    Return a copy of a dictionary (or list) with every dictionary key
    converted to snake_case via ``to_snake_case``.

    Nested dictionaries and lists are converted recursively at any depth.
    Values that are neither dictionaries nor lists -- whether stored under a
    key or sitting inside a list -- are carried over unchanged. The input is
    not modified.

    If two keys map to the same snake_case name, the one that appears later
    in the dictionary wins.

    Parameters:
    original_data (dict or list): The input dictionary or list.

    Returns:
    dict or list: A new container of the same kind with keys in snake_case format.

    Raises:
    TypeError: If ``original_data`` itself is neither a dictionary nor a list.

    Example:
    >>> convert_to_snakecase({'FirstName': 'John', 'LastName': 'Doe'})
    {'first_name': 'John', 'last_name': 'Doe'}
    >>> convert_to_snakecase([{'FirstName': 'Jane'}, {'FirstName': 'Bob'}])
    [{'first_name': 'Jane'}, {'first_name': 'Bob'}]
    >>> convert_to_snakecase({'UserTags': ['a', 'b']})
    {'user_tags': ['a', 'b']}
    """
    if isinstance(original_data, dict):
        return {
            to_snake_case(key): (
                convert_to_snakecase(value) if isinstance(value, (dict, list)) else value
            )
            for key, value in original_data.items()
        }

    if isinstance(original_data, list):
        # Only recurse into containers: plain list items (strings, numbers,
        # None, ...) have no keys to rename and must not reach the TypeError
        # below, otherwise a value such as ['a', 'b'] would crash the call.
        return [
            convert_to_snakecase(item) if isinstance(item, (dict, list)) else item
            for item in original_data
        ]

    raise TypeError("Input must be a dictionary or a list of dictionaries.")
| | |
| | import json |
| |
|
def store_json_data(data, output_file):
    """
    Write ``data`` to ``output_file`` as JSON indented by four spaces.

    Missing parent directories are created first. This is a best-effort
    helper: any failure is reported on standard output instead of being
    raised, and nothing is returned either way.

    Parameters:
    data: Any object that ``json.dump`` can serialise.
    output_file (str): Destination path; an existing file is overwritten.

    Returns:
    None
    """
    try:
        # A bare file name has an empty dirname, and os.makedirs('') raises;
        # only create directories when the path actually contains one.
        parent_dir = os.path.dirname(output_file)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(output_file, 'w') as file:
            json.dump(data, file, indent=4)

        print(f"Data successfully stored in {output_file}")
    except Exception as e:
        # Deliberately broad: callers rely on this function never raising.
        # If serialisation fails midway the file may be left incomplete.
        print(f"An error occurred: {e}")
| |
|
def get_headers() -> Dict[str, str]:
    """
    Build the request headers that carry the API key.

    The key is read from the ``OX_API_KEY`` environment variable on every
    call and placed under the ``0x-api-key`` header. The value is always
    converted to a string, so when the variable is not set the header value
    is the literal text ``'None'``.

    Returns:
    Dict[str, str]: A single-entry header dictionary.
    """
    api_key = os.getenv('OX_API_KEY')
    headers = {}
    headers['0x-api-key'] = str(api_key)
    return headers
| |
|
def get_private_key() -> str:
    """
    Read the wallet private key from the environment.

    The value of the ``WALLET_PRIVATE_KEY`` environment variable is returned
    exactly as stored. Despite the ``str`` annotation, the result is ``None``
    when the variable is not set.

    Returns:
    str: The configured private key, or None if the variable is missing.
    """
    private_key = os.environ.get('WALLET_PRIVATE_KEY')
    return private_key
| |
|