|
|
import os |
|
|
from typing import List, Tuple |
|
|
|
|
|
from dotenv import load_dotenv |
|
|
|
|
|
from app.core.config import ALPHABET, ENCODING_UTF8 |
|
|
|
|
|
|
|
|
ENV_KEY_SHIFT = 5 |
|
|
|
|
|
|
|
|
class ShiftCipher: |
|
|
@staticmethod |
|
|
def _shift_char(ch: str, shift: int) -> str: |
|
|
index = ALPHABET.find(ch) |
|
|
if index == -1: |
|
|
raise ValueError(f"Character '{ch}' is not in the configured alphabet") |
|
|
return ALPHABET[(index + shift) % len(ALPHABET)] |
|
|
|
|
|
@staticmethod |
|
|
def apply(data: bytes, shift: int) -> bytes: |
|
|
text = data.decode(ENCODING_UTF8) |
|
|
shifted = "".join(ShiftCipher._shift_char(ch, shift) for ch in text) |
|
|
return shifted.encode(ENCODING_UTF8) |
|
|
|
|
|
@staticmethod |
|
|
def transform_text(value: str, shift: int) -> str: |
|
|
return "".join(ShiftCipher._shift_char(ch, shift) for ch in value) |
|
|
|
|
|
class LZ78Codec: |
|
|
@staticmethod |
|
|
def compress(text: str) -> bytes: |
|
|
dictionary: dict[str, int] = {} |
|
|
current = "" |
|
|
output: List[Tuple[int, str]] = [] |
|
|
for ch in text: |
|
|
candidate = current + ch |
|
|
if candidate in dictionary: |
|
|
current = candidate |
|
|
continue |
|
|
index = dictionary.get(current, 0) |
|
|
output.append((index, ch)) |
|
|
dictionary[candidate] = len(dictionary) + 1 |
|
|
current = "" |
|
|
if current: |
|
|
output.append((dictionary.get(current, 0), "")) |
|
|
serialized = ";".join(f"{index}:{ord(ch) if ch else -1}" for index, ch in output) |
|
|
return serialized.encode(ENCODING_UTF8) |
|
|
|
|
|
@staticmethod |
|
|
def decompress(data: bytes) -> str: |
|
|
if not data: |
|
|
return "" |
|
|
serialized = data.decode(ENCODING_UTF8) |
|
|
if not serialized: |
|
|
return "" |
|
|
pairs: List[Tuple[int, str]] = [] |
|
|
for chunk in serialized.split(";"): |
|
|
if not chunk: |
|
|
continue |
|
|
index_part, code_part = chunk.split(":", 1) |
|
|
idx = int(index_part) |
|
|
code = int(code_part) |
|
|
ch = "" if code == -1 else chr(code) |
|
|
pairs.append((idx, ch)) |
|
|
dictionary: dict[int, str] = {0: ""} |
|
|
result_parts: List[str] = [] |
|
|
for idx, ch in pairs: |
|
|
prefix = dictionary.get(idx, "") |
|
|
entry = prefix + ch |
|
|
result_parts.append(entry) |
|
|
dictionary[len(dictionary)] = entry |
|
|
return "".join(result_parts) |
|
|
|
|
|
|
|
|
class DataCipher: |
|
|
def __init__(self, key: str) -> None: |
|
|
self.key = key |
|
|
self.shift = self._derive_shift(key) |
|
|
|
|
|
@staticmethod |
|
|
def _derive_shift(key: str) -> int: |
|
|
value = sum(ord(ch) for ch in key) % 256 |
|
|
return value if value else 1 |
|
|
|
|
|
def encrypt(self, plain: str) -> str: |
|
|
compressed = LZ78Codec.compress(plain) |
|
|
encrypted = ShiftCipher.apply(compressed, self.shift) |
|
|
return encrypted.hex() |
|
|
|
|
|
def decrypt(self, encoded: str) -> str: |
|
|
encrypted = bytes.fromhex(encoded) |
|
|
decrypted = ShiftCipher.apply(encrypted, -self.shift) |
|
|
return LZ78Codec.decompress(decrypted) |
|
|
|
|
|
|
|
|
def load_encryption_key() -> str: |
|
|
load_dotenv() |
|
|
encrypted_key = os.getenv("APP_KEY_ENC") |
|
|
if not encrypted_key: |
|
|
raise RuntimeError("APP_KEY_ENC is not set") |
|
|
return ShiftCipher.transform_text(encrypted_key, -ENV_KEY_SHIFT) |
|
|
|
|
|
|