Spaces:
Sleeping
Sleeping
| import json | |
| from datetime import datetime | |
| from pathlib import Path | |
| from threading import Lock | |
| from typing import Any, Dict, Optional | |
# Mock bank ledger (accounts and transactions), stored next to this module.
DATA_FILE = Path(__file__).parent / "mock_bank_accounts.json"

# Transfers awaiting confirmation, keyed by the Telegram user id as a string.
PENDING_FILE = Path(__file__).parent / "pending_transfers.json"

# One lock shared by every public function that reads or rewrites the files.
_LOCK = Lock()
def _read_json(path: Path, default: Any) -> Any:
    """Load and return the JSON document stored at *path*.

    Args:
        path: UTF-8 encoded JSON file to read.
        default: Value returned unchanged when the file does not exist.

    Returns:
        The decoded JSON value, or *default* if *path* is missing.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    # EAFP: open directly instead of exists()-then-open(), so a file removed
    # between the check and the read still yields the default instead of a crash.
    try:
        with path.open("r", encoding="utf-8") as file:
            return json.load(file)
    except FileNotFoundError:
        return default
def _write_json(path: Path, data: Any) -> None:
    """Serialize *data* as indented, ASCII-escaped JSON and store it at *path*.

    The document is written to a sibling ``<name>.tmp`` file first and only
    then moved over *path*. Opening *path* itself in ``"w"`` mode would
    truncate it immediately, so a serialization error or crash mid-write
    would destroy the previous contents.

    Args:
        path: Destination file; created if missing, replaced otherwise.
        data: Any JSON-serializable value.

    Raises:
        TypeError: If *data* contains a value ``json`` cannot serialize.
        OSError: If the temporary file cannot be written or moved into place.
    """
    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as file:
            json.dump(data, file, ensure_ascii=True, indent=2)
        tmp_path.replace(path)
    finally:
        # After a successful replace the temp file is already gone; after a
        # failure this removes the partial output.
        tmp_path.unlink(missing_ok=True)
def _load_bank_data() -> Dict[str, Any]:
    """Load the bank ledger from ``DATA_FILE``.

    Returns:
        The ledger dict. When the file does not exist an empty ledger is
        returned. The ``"accounts"`` and ``"transactions"`` keys are always
        present, so callers can index ``data["accounts"]`` even if the stored
        file omitted one of them.
    """
    data = _read_json(DATA_FILE, {"accounts": [], "transactions": []})
    data.setdefault("accounts", [])
    data.setdefault("transactions", [])
    return data
def _save_bank_data(data: Dict[str, Any]) -> None:
    """Persist the bank ledger *data* to ``DATA_FILE``."""
    _write_json(path=DATA_FILE, data=data)
def _load_pending_transfers() -> Dict[str, Any]:
    """Load the pending-transfer mapping from ``PENDING_FILE``.

    Returns an empty dict when the file does not exist.
    """
    return _read_json(PENDING_FILE, default={})
def _save_pending_transfers(data: Dict[str, Any]) -> None:
    """Persist the pending-transfer mapping *data* to ``PENDING_FILE``."""
    _write_json(path=PENDING_FILE, data=data)
def _find_account_by_telegram_id(
    accounts: list[Dict[str, Any]], telegram_id: int
) -> Optional[Dict[str, Any]]:
    """Return the first account whose ``telegram_id`` equals *telegram_id*.

    The comparison is a plain ``==`` with no type coercion. Returns ``None``
    when no account matches.
    """
    for candidate in accounts:
        if candidate.get("telegram_id") == telegram_id:
            return candidate
    return None
def _find_account_by_serial_id(
    accounts: list[Dict[str, Any]], serial_id: str
) -> Optional[Dict[str, Any]]:
    """Return the first account whose ``serial_id`` matches *serial_id*.

    The requested id is stringified and stripped of surrounding whitespace.
    Each stored id is stringified (but not stripped) before comparison, so an
    id stored as a number still matches its text form. Returns ``None`` when
    no account matches.
    """
    wanted = str(serial_id).strip()
    for candidate in accounts:
        if str(candidate.get("serial_id")) == wanted:
            return candidate
    return None
def get_sender_account(telegram_id: int, user_name: Optional[str] = None) -> Dict[str, Any]:
    """Return the account linked to *telegram_id*, creating a test account if needed.

    Args:
        telegram_id: Telegram user id used to look up the account.
        user_name: Display name for a newly created test ("illusion") account.
            Only consulted when no account exists yet.

    Returns:
        One of:
        * ``success=True`` with ``account``, ``is_real`` and ``is_illusion``
          when an account already exists;
        * ``success=False`` with ``need_user_name=True`` when no account
          exists and *user_name* is empty;
        * ``success=True`` with the freshly created illusion account
          (balance 2000.0 YER, ``is_real=False``) otherwise.
    """
    with _LOCK:
        data = _load_bank_data()
        accounts = data["accounts"]
        sender = _find_account_by_telegram_id(accounts, telegram_id)
        if sender:
            is_real = sender.get("is_real", True)
            is_illusion = not is_real or sender.get("name", "").startswith("Test User")
            return {
                "success": True,
                "account": sender,
                "is_real": is_real,
                "is_illusion": is_illusion,
            }

        # Ask for the user name before creating an illusion account.
        if not user_name:
            return {
                "success": False,
                "need_user_name": True,
                "message": "I need to create an illusion account for testing purposes. What is your name?",
            }

        # Telegram ids sharing their last three digits map to the same base
        # serial, and an existing account may already use it. Serial lookups
        # return the first match, so a duplicate would route transfers to the
        # wrong account; probe upward until a free serial is found.
        taken_serials = {str(account.get("serial_id")) for account in accounts}
        serial_number = 9000 + telegram_id % 1000
        while str(serial_number) in taken_serials:
            serial_number += 1

        # Create the illusion account for testing purposes.
        new_account = {
            "telegram_id": telegram_id,
            "account_id": f"ACC-{telegram_id}",
            "serial_id": str(serial_number),
            "name": f"Test User {user_name}",
            "balance": 2000.0,
            "currency": "YER",
            "is_real": False,
        }
        accounts.append(new_account)
        _save_bank_data(data)
        return {
            "success": True,
            "account": new_account,
            "is_real": False,
            "is_illusion": True,
            "message": f"I've created an illusion account for {user_name} with 2000 YER balance to try sending money. This is for testing purposes only.",
        }
def get_account_balance(telegram_id: int) -> Dict[str, Any]:
    """Return a balance summary for the account linked to *telegram_id*.

    Delegates the lookup to ``get_sender_account`` (without a user name) and
    passes its failure result through unchanged. On success the summary
    carries the account identifiers, name, balance, currency and the
    real/illusion flags; illusion accounts additionally get an explanatory
    ``message``.
    """
    lookup = get_sender_account(telegram_id)
    if not lookup["success"]:
        return lookup

    account = lookup["account"]
    summary = {
        "success": True,
        "telegram_id": telegram_id,
        "account_id": account["account_id"],
        "serial_id": account["serial_id"],
        "name": account["name"],
        "balance": account.get("balance", 0.0),
        "currency": account.get("currency", "YER"),
        "is_real": lookup.get("is_real", True),
        "is_illusion": lookup.get("is_illusion", False),
    }
    if summary["is_illusion"]:
        summary["message"] = "This is an illusion account created for testing purposes with 2000 YER balance."
    return summary
def get_receiver_account_name(receiver_serial_id: str) -> Dict[str, Any]:
    """Look up the receiving account by serial id and report who owns it.

    Returns ``success=True`` with the account's ``serial_id``, ``name``,
    ``account_id`` and ``currency`` (defaulting to ``"YER"``), or
    ``success=False`` with a message when no account matches.
    """
    with _LOCK:
        ledger = _load_bank_data()
        match = _find_account_by_serial_id(ledger["accounts"], receiver_serial_id)
        if match:
            return {
                "success": True,
                "serial_id": match["serial_id"],
                "name": match["name"],
                "account_id": match["account_id"],
                "currency": match.get("currency", "YER"),
            }
        return {
            "success": False,
            "message": f"No account was found for ID {receiver_serial_id}.",
        }
def prepare_transfer(telegram_id: int, receiver_serial_id: str, amount: Optional[float] = None) -> Dict[str, Any]:
    """Stage a transfer from the caller's account, pending user confirmation.

    Nothing is debited here. The function only resolves both parties and
    records a pending entry keyed by ``str(telegram_id)``, overwriting any
    earlier pending entry for the same user. *amount* is stored as given,
    without validation.

    Returns ``success=False`` with a message when the sender has no account,
    the receiver is unknown, or both serial ids are equal. Otherwise returns
    ``success=True`` with the stored ``pending_transfer`` payload.
    """
    with _LOCK:
        accounts = _load_bank_data()["accounts"]

        source = _find_account_by_telegram_id(accounts, telegram_id)
        if not source:
            return {
                "success": False,
                "message": "You do not have an account in the bank system. Please visit the bank to create an account first.",
            }

        target = _find_account_by_serial_id(accounts, receiver_serial_id)
        if not target:
            return {
                "success": False,
                "message": f"No account was found for ID {receiver_serial_id}.",
            }

        if target["serial_id"] == source["serial_id"]:
            return {
                "success": False,
                "message": "You cannot transfer money to the same account.",
            }

        staged = _load_pending_transfers()
        payload = {
            "telegram_id": telegram_id,
            "sender_name": source["name"],
            "sender_serial_id": source["serial_id"],
            "receiver_name": target["name"],
            "receiver_serial_id": target["serial_id"],
            "receiver_account_id": target["account_id"],
            "amount": amount,
            "currency": source.get("currency", "YER"),
            "created_at": datetime.utcnow().isoformat(),
        }
        staged[str(telegram_id)] = payload
        _save_pending_transfers(staged)

        return {
            "success": True,
            "pending_transfer": payload,
            "message": f"Receiver found: {target['name']}. Waiting for user confirmation.",
        }
def get_pending_transfer(telegram_id: int) -> Dict[str, Any]:
    """Return the staged transfer for *telegram_id*, if any.

    Returns ``success=True`` with ``pending_transfer`` when an entry exists
    under ``str(telegram_id)``, otherwise ``success=False`` with a message.
    """
    with _LOCK:
        staged = _load_pending_transfers().get(str(telegram_id))
        if staged:
            return {
                "success": True,
                "pending_transfer": staged,
            }
        return {
            "success": False,
            "message": "No pending transfer was found for this Telegram user.",
        }
def confirm_transfer(telegram_id: int) -> Dict[str, Any]:
    """Execute the pending transfer staged for *telegram_id*.

    Validates the staged amount, checks the sender's balance, moves the money
    between the two accounts, appends a transaction record, and removes the
    pending entry.

    Args:
        telegram_id: Telegram user id of the sender.

    Returns:
        ``success=False`` with a message when there is no pending transfer,
        an account is missing, the amount is missing, invalid, non-finite or
        not positive, or the balance is insufficient. Otherwise
        ``success=True`` with the ``transaction`` record, the new
        ``sender_balance``, the ``is_real``/``is_illusion`` flags and a
        ``message``.
    """
    import math  # local import so the block does not depend on file-level changes

    with _LOCK:
        data = _load_bank_data()
        pending_transfers = _load_pending_transfers()
        pending_transfer = pending_transfers.get(str(telegram_id))
        if not pending_transfer:
            return {
                "success": False,
                "message": "There is no pending transfer to confirm.",
            }

        sender = _find_account_by_telegram_id(data["accounts"], telegram_id)
        receiver = _find_account_by_serial_id(data["accounts"], pending_transfer["receiver_serial_id"])
        if not sender or not receiver:
            return {
                "success": False,
                "message": "The sender or receiver account could not be found.",
            }

        amount = pending_transfer.get("amount")
        if amount is None:
            return {
                "success": False,
                "message": "The transfer amount is missing. Ask the user for the amount before confirming.",
            }
        try:
            amount_value = float(amount)
        except (TypeError, ValueError):
            return {
                "success": False,
                "message": "The transfer amount is invalid.",
            }
        # float() accepts "nan" and "inf". NaN compares False against
        # everything, so it would pass both checks below and write NaN into
        # both balances.
        if not math.isfinite(amount_value):
            return {
                "success": False,
                "message": "The transfer amount is invalid.",
            }
        if amount_value <= 0:
            return {
                "success": False,
                "message": "The transfer amount must be greater than zero.",
            }

        # Convert once so the comparison, the message and the debit all use
        # the same numeric value, even if the balance is stored as a string.
        sender_balance = float(sender.get("balance", 0.0))
        if sender_balance < amount_value:
            return {
                "success": False,
                "message": f"Insufficient balance. Available balance is {sender_balance:.2f} {sender.get('currency', 'YER')}.",
            }

        sender["balance"] = round(sender_balance - amount_value, 2)
        receiver["balance"] = round(float(receiver.get("balance", 0.0)) + amount_value, 2)

        transaction = {
            "transaction_id": f"TX-{datetime.utcnow().strftime('%Y%m%d%H%M%S%f')}",
            "telegram_id": telegram_id,
            "sender_serial_id": sender["serial_id"],
            "receiver_serial_id": receiver["serial_id"],
            "receiver_name": receiver["name"],
            "amount": amount_value,
            "currency": sender.get("currency", "YER"),
            "created_at": datetime.utcnow().isoformat(),
        }
        data.setdefault("transactions", []).append(transaction)
        # The ledger is saved first; the pending entry is cleared afterwards.
        _save_bank_data(data)
        pending_transfers.pop(str(telegram_id), None)
        _save_pending_transfers(pending_transfers)

        # An account is an illusion account when its is_real flag is false.
        is_real = sender.get("is_real", True)
        is_illusion = not is_real
        message = f"Transfer completed successfully to {receiver['name']}."
        if is_illusion:
            message += " ⚠️ Please note: This process was not real - it was for testing purposes only. No actual money was transferred."

        return {
            "success": True,
            "transaction": transaction,
            "sender_balance": sender["balance"],
            "is_real": is_real,
            "is_illusion": is_illusion,
            "message": message,
        }
def cancel_transfer(telegram_id: int) -> Dict[str, Any]:
    """Discard the pending transfer staged for *telegram_id*.

    The pending file is rewritten only when an entry was actually removed, so
    cancelling with nothing staged does not create or rewrite the file.

    Args:
        telegram_id: Telegram user id whose pending transfer is dropped.

    Returns:
        ``success=True`` when a pending transfer was removed, otherwise
        ``success=False`` with a message.
    """
    with _LOCK:
        pending_transfers = _load_pending_transfers()
        key = str(telegram_id)
        if key not in pending_transfers:
            return {
                "success": False,
                "message": "There is no pending transfer to cancel.",
            }

        removed = pending_transfers.pop(key)
        _save_pending_transfers(pending_transfers)
        # An empty or null stored entry is purged but still reported as
        # "nothing to cancel".
        if not removed:
            return {
                "success": False,
                "message": "There is no pending transfer to cancel.",
            }
        return {
            "success": True,
            "message": "The pending transfer has been canceled.",
        }