"""File-backed mock bank used by a Telegram bot for money-transfer flows.

Accounts and completed transactions are stored in ``mock_bank_accounts.json``
and transfers awaiting confirmation in ``pending_transfers.json``; both files
live next to this module. Every public function returns a plain ``dict`` with
a boolean ``"success"`` key and, on failure, a human-readable ``"message"``.
"""

import json
import math
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any, Dict, Optional

DATA_FILE = Path(__file__).with_name("mock_bank_accounts.json")
PENDING_FILE = Path(__file__).with_name("pending_transfers.json")

# Serializes every read-modify-write cycle on the two JSON files within this
# process. The lock is NOT reentrant: public functions must not call each
# other while holding it.
_LOCK = Lock()


def _utc_now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value shape as the deprecated ``datetime.utcnow()`` so
    the timestamp format already stored in the JSON files does not change.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _read_json(path: Path, default: Any) -> Any:
    """Return the parsed JSON content of *path*, or *default* if it is missing."""
    if not path.exists():
        return default
    with path.open("r", encoding="utf-8") as file:
        return json.load(file)


def _write_json(path: Path, data: Any) -> None:
    """Serialize *data* to *path* as indented JSON, replacing it atomically.

    The payload is written to a sibling temporary file first and then moved
    over the target, so a serialization error or a crash mid-write can no
    longer leave a truncated data file behind.
    """
    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as file:
            json.dump(data, file, ensure_ascii=True, indent=2)
        tmp_path.replace(path)
    except BaseException:
        # Do not leave a half-written temp file around; the original data
        # file is untouched at this point.
        tmp_path.unlink(missing_ok=True)
        raise


def _load_bank_data() -> Dict[str, Any]:
    """Load the bank data, guaranteeing ``accounts`` and ``transactions`` keys."""
    data = _read_json(DATA_FILE, {"accounts": [], "transactions": []})
    data.setdefault("accounts", [])
    data.setdefault("transactions", [])
    return data


def _save_bank_data(data: Dict[str, Any]) -> None:
    """Persist the bank data (accounts and transactions)."""
    _write_json(DATA_FILE, data)


def _load_pending_transfers() -> Dict[str, Any]:
    """Load pending transfers, keyed by the sender's Telegram id as a string."""
    return _read_json(PENDING_FILE, {})


def _save_pending_transfers(data: Dict[str, Any]) -> None:
    """Persist the pending-transfer mapping."""
    _write_json(PENDING_FILE, data)


def _find_account_by_telegram_id(
    accounts: list[Dict[str, Any]], telegram_id: int
) -> Optional[Dict[str, Any]]:
    """Return the first account whose ``telegram_id`` matches, else ``None``."""
    return next(
        (account for account in accounts if account.get("telegram_id") == telegram_id),
        None,
    )


def _find_account_by_serial_id(
    accounts: list[Dict[str, Any]], serial_id: str
) -> Optional[Dict[str, Any]]:
    """Return the first account whose ``serial_id`` matches, else ``None``.

    Both sides are compared as strings and the requested id is stripped of
    surrounding whitespace.
    """
    normalized_id = str(serial_id).strip()
    return next(
        (account for account in accounts if str(account.get("serial_id")) == normalized_id),
        None,
    )


def _allocate_illusion_serial_id(accounts: list[Dict[str, Any]], telegram_id: int) -> str:
    """Pick a serial id for a new illusion account that no account uses yet.

    The preferred value is ``9000 + telegram_id % 1000``. Because different
    Telegram ids can share their last three digits, the value is incremented
    until it is free; otherwise lookups by serial id would silently resolve
    to whichever colliding account was created first.
    """
    taken = {str(account.get("serial_id")) for account in accounts}
    candidate = 9000 + telegram_id % 1000
    while str(candidate) in taken:
        candidate += 1
    return str(candidate)


def get_sender_account(telegram_id: int, user_name: Optional[str] = None) -> Dict[str, Any]:
    """Return the account for *telegram_id*, creating an illusion account if needed.

    Args:
        telegram_id: Telegram user id of the sender.
        user_name: Display name used when a new illusion (test) account has
            to be created. Ignored when the account already exists.

    Returns:
        On success a dict with ``account``, ``is_real`` and ``is_illusion``.
        When no account exists and *user_name* is empty, a failure dict with
        ``need_user_name`` set so the caller can ask for the name first.
    """
    with _LOCK:
        data = _load_bank_data()
        sender = _find_account_by_telegram_id(data["accounts"], telegram_id)
        if sender:
            is_real = sender.get("is_real", True)
            is_illusion = not is_real or sender.get("name", "").startswith("Test User")
            return {
                "success": True,
                "account": sender,
                "is_real": is_real,
                "is_illusion": is_illusion,
            }

        # Ask for user name before creating illusion account
        if not user_name:
            return {
                "success": False,
                "need_user_name": True,
                "message": "I need to create an illusion account for testing purposes. What is your name?",
            }

        # Create illusion account for testing purposes
        new_account = {
            "telegram_id": telegram_id,
            "account_id": f"ACC-{telegram_id}",
            "serial_id": _allocate_illusion_serial_id(data["accounts"], telegram_id),
            "name": f"Test User {user_name}",
            "balance": 2000.0,
            "currency": "YER",
            "is_real": False,
        }
        data["accounts"].append(new_account)
        _save_bank_data(data)

        return {
            "success": True,
            "account": new_account,
            "is_real": False,
            "is_illusion": True,
            "message": f"I've created an illusion account for {user_name} with 2000 YER balance to try sending money. This is for testing purposes only.",
        }


def get_account_balance(telegram_id: int) -> Dict[str, Any]:
    """Return balance details for the account linked to *telegram_id*.

    Delegates the lookup to ``get_sender_account`` without a user name, so a
    user with no account gets that function's ``need_user_name`` failure
    response unchanged.
    """
    sender_result = get_sender_account(telegram_id)
    if not sender_result["success"]:
        return sender_result

    account = sender_result["account"]
    result = {
        "success": True,
        "telegram_id": telegram_id,
        "account_id": account["account_id"],
        "serial_id": account["serial_id"],
        "name": account["name"],
        "balance": account.get("balance", 0.0),
        "currency": account.get("currency", "YER"),
        "is_real": sender_result.get("is_real", True),
        "is_illusion": sender_result.get("is_illusion", False),
    }

    if result["is_illusion"]:
        result["message"] = "This is an illusion account created for testing purposes with 2000 YER balance."

    return result


def get_receiver_account_name(receiver_serial_id: str) -> Dict[str, Any]:
    """Look up a receiver by serial id and return its public details."""
    with _LOCK:
        data = _load_bank_data()
        receiver = _find_account_by_serial_id(data["accounts"], receiver_serial_id)
        if not receiver:
            return {
                "success": False,
                "message": f"No account was found for ID {receiver_serial_id}.",
            }

        return {
            "success": True,
            "serial_id": receiver["serial_id"],
            "name": receiver["name"],
            "account_id": receiver["account_id"],
            "currency": receiver.get("currency", "YER"),
        }


def prepare_transfer(
    telegram_id: int, receiver_serial_id: str, amount: Optional[float] = None
) -> Dict[str, Any]:
    """Record a pending transfer from *telegram_id* to *receiver_serial_id*.

    The transfer is only stored here; no balance changes until
    ``confirm_transfer`` is called. A new pending transfer replaces any
    previous one for the same Telegram user. *amount* may be ``None`` and is
    validated at confirmation time.
    """
    with _LOCK:
        data = _load_bank_data()
        sender = _find_account_by_telegram_id(data["accounts"], telegram_id)
        if not sender:
            return {
                "success": False,
                "message": "You do not have an account in the bank system. Please visit the bank to create an account first.",
            }

        receiver = _find_account_by_serial_id(data["accounts"], receiver_serial_id)
        if not receiver:
            return {
                "success": False,
                "message": f"No account was found for ID {receiver_serial_id}.",
            }

        if receiver["serial_id"] == sender["serial_id"]:
            return {
                "success": False,
                "message": "You cannot transfer money to the same account.",
            }

        pending_transfers = _load_pending_transfers()
        pending_payload = {
            "telegram_id": telegram_id,
            "sender_name": sender["name"],
            "sender_serial_id": sender["serial_id"],
            "receiver_name": receiver["name"],
            "receiver_serial_id": receiver["serial_id"],
            "receiver_account_id": receiver["account_id"],
            "amount": amount,
            "currency": sender.get("currency", "YER"),
            "created_at": _utc_now().isoformat(),
        }
        pending_transfers[str(telegram_id)] = pending_payload
        _save_pending_transfers(pending_transfers)

        return {
            "success": True,
            "pending_transfer": pending_payload,
            "message": f"Receiver found: {receiver['name']}. Waiting for user confirmation.",
        }


def get_pending_transfer(telegram_id: int) -> Dict[str, Any]:
    """Return the pending transfer stored for *telegram_id*, if any."""
    with _LOCK:
        pending_transfers = _load_pending_transfers()
        pending_transfer = pending_transfers.get(str(telegram_id))
        if not pending_transfer:
            return {
                "success": False,
                "message": "No pending transfer was found for this Telegram user.",
            }

        return {
            "success": True,
            "pending_transfer": pending_transfer,
        }


def confirm_transfer(telegram_id: int) -> Dict[str, Any]:
    """Execute the pending transfer of *telegram_id* and record the transaction.

    Validates that both accounts still exist, that the amount is a finite
    number greater than zero, and that the sender can afford it. On success
    the balances are updated, a transaction is appended, and the pending
    entry is removed. On any failure nothing is written and the pending
    transfer is kept.
    """
    with _LOCK:
        data = _load_bank_data()
        pending_transfers = _load_pending_transfers()
        pending_transfer = pending_transfers.get(str(telegram_id))
        if not pending_transfer:
            return {
                "success": False,
                "message": "There is no pending transfer to confirm.",
            }

        sender = _find_account_by_telegram_id(data["accounts"], telegram_id)
        receiver = _find_account_by_serial_id(
            data["accounts"], pending_transfer["receiver_serial_id"]
        )
        if not sender or not receiver:
            return {
                "success": False,
                "message": "The sender or receiver account could not be found.",
            }

        amount = pending_transfer.get("amount")
        if amount is None:
            return {
                "success": False,
                "message": "The transfer amount is missing. Ask the user for the amount before confirming.",
            }

        try:
            amount_value = float(amount)
        except (TypeError, ValueError):
            return {
                "success": False,
                "message": "The transfer amount is invalid.",
            }

        # NaN compares False against everything, so it would slip past both
        # the "> 0" and the balance checks and poison the stored balances.
        if not math.isfinite(amount_value):
            return {
                "success": False,
                "message": "The transfer amount is invalid.",
            }

        if amount_value <= 0:
            return {
                "success": False,
                "message": "The transfer amount must be greater than zero.",
            }

        # Coerce once so the comparison, the message and the arithmetic all
        # see the same numeric value even if the file stores it as a string.
        sender_balance = float(sender.get("balance", 0.0))
        if sender_balance < amount_value:
            return {
                "success": False,
                "message": f"Insufficient balance. Available balance is {sender_balance:.2f} {sender.get('currency', 'YER')}.",
            }

        sender["balance"] = round(sender_balance - amount_value, 2)
        receiver["balance"] = round(float(receiver.get("balance", 0.0)) + amount_value, 2)

        # One timestamp for both fields keeps the id and created_at in sync.
        now = _utc_now()
        transaction = {
            "transaction_id": f"TX-{now.strftime('%Y%m%d%H%M%S%f')}",
            "telegram_id": telegram_id,
            "sender_serial_id": sender["serial_id"],
            "receiver_serial_id": receiver["serial_id"],
            "receiver_name": receiver["name"],
            "amount": amount_value,
            "currency": sender.get("currency", "YER"),
            "created_at": now.isoformat(),
        }
        data.setdefault("transactions", []).append(transaction)
        _save_bank_data(data)

        pending_transfers.pop(str(telegram_id), None)
        _save_pending_transfers(pending_transfers)

        # Check if sender is an illusion account using is_real variable
        is_real = sender.get("is_real", True)
        is_illusion = not is_real

        message = f"Transfer completed successfully to {receiver['name']}."
        if is_illusion:
            message += " ⚠️ Please note: This process was not real - it was for testing purposes only. No actual money was transferred."

        return {
            "success": True,
            "transaction": transaction,
            "sender_balance": sender["balance"],
            "is_real": is_real,
            "is_illusion": is_illusion,
            "message": message,
        }


def cancel_transfer(telegram_id: int) -> Dict[str, Any]:
    """Discard the pending transfer of *telegram_id*, if one exists."""
    with _LOCK:
        pending_transfers = _load_pending_transfers()
        removed = pending_transfers.pop(str(telegram_id), None)
        if not removed:
            # Nothing changed, so there is no reason to rewrite the file.
            return {
                "success": False,
                "message": "There is no pending transfer to cancel.",
            }

        _save_pending_transfers(pending_transfers)
        return {
            "success": True,
            "message": "The pending transfer has been canceled.",
        }