"""
# Helper functions for templates
def get_username_by_id(user_id):
    """Return the stored username for ``user_id``, or 'Неизвестно' as a fallback.

    The fallback is used both when ``user_id`` is absent from
    ``users_data['users']`` and when the stored username is empty or ``None``
    (``login`` stores whatever Telegram supplied, which may be ``None``), so
    callers always receive a displayable string.
    """
    user_record = users_data['users'].get(user_id) or {}
    # ``or`` rather than a .get() default: the key can exist with a None value.
    return user_record.get('username') or 'Неизвестно'
def get_status_text(status):
    """Translate a contract status code into its display label.

    Codes that are not recognised map to 'Неизвестно'.
    """
    labels = dict(
        signed_by_creator='Создан, ожидает',
        signed_by_recipient='Подписан',
        rejected='Отклонен',
    )
    if status in labels:
        return labels[status]
    return 'Неизвестно'
def get_status_class(status):
    """Return the CSS class name used to render a contract status.

    Codes that are not recognised map to 'status-unknown'.
    """
    css_by_status = dict([
        ('signed_by_creator', 'status-signed_by_creator'),
        ('signed_by_recipient', 'status-signed_by_recipient'),
        ('rejected', 'status-rejected'),
    ])
    try:
        return css_by_status[status]
    except KeyError:
        return 'status-unknown'
def format_expiration_datetime(iso_string):
    """Format an ISO-8601 timestamp as ``'YYYY-MM-DD HH:MM UTC'`` for display.

    Args:
        iso_string: ISO-8601 date/time string; a trailing ``Z`` is accepted.

    Returns:
        ``None`` when ``iso_string`` is empty or ``None``; the formatted
        string when parsing succeeds; the original string unchanged when it
        cannot be parsed.

    Timezone-aware values are converted to UTC before formatting so the
    "UTC" label is accurate. Naive values carry no offset to convert from and
    are formatted as given.
    """
    if not iso_string:
        return None

    # Local import: only ``datetime`` itself is known to be in module scope.
    from datetime import timezone

    try:
        # Normalise the 'Z' suffix, which fromisoformat() rejects before
        # Python 3.11.
        dt_obj = datetime.fromisoformat(iso_string.replace('Z', '+00:00'))
    except ValueError:
        return iso_string  # Best effort: show the raw value.

    if dt_obj.tzinfo is not None:
        dt_obj = dt_obj.astimezone(timezone.utc)
    return dt_obj.strftime('%Y-%m-%d %H:%M UTC')
@app.route('/')
def index():
    """Serve the page rendered from ``MINI_APP_TEMPLATE``."""
    page = render_template_string(MINI_APP_TEMPLATE)
    return page
@app.route('/admin')
def admin_panel():
    """Render ``ADMIN_TEMPLATE`` with the whole chain and every user record.

    NOTE(review): this view performs no authentication or authorisation, so
    anyone who can reach ``/admin`` sees the full chain and all user data.
    Confirm that access is restricted elsewhere (proxy, network) or add a
    check here.
    """
    template_context = {
        'chain': blockchain.chain,
        'users_data': users_data['users'],
    }
    return render_template_string(ADMIN_TEMPLATE, **template_context)
@app.route('/login', methods=['POST'])
def login():
    """Authenticate a Telegram user and return their profile and contracts.

    Expects a JSON body with ``initData``, which is checked with
    ``validate_telegram_data``. On success the user is registered (first
    login) or their stored username/first name/last name are refreshed, the
    status of every contract summary is re-derived from the blockchain
    actions, and the data is persisted with ``save_all_data()`` and
    ``upload_db_to_hf()``.

    Returns:
        403 with an ``error`` message when authentication fails (including a
        missing or non-JSON body); otherwise a JSON object with ``user_id``,
        ``username``, ``first_name``, ``last_name``, ``created_contracts``
        and ``received_contracts``. Each contract summary additionally
        carries ``partner_username``.
    """
    # silent=True: a missing or malformed JSON body ends in the 403 below
    # instead of an unhandled error on ``data.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    user_info = validate_telegram_data(data.get('initData'), BOT_TOKEN)
    if not user_info:
        return jsonify({"error": "Неверная аутентификация Telegram"}), 403

    user_id = str(user_info['id'])
    username = user_info.get('username')
    first_name = user_info.get('first_name', '')
    last_name = user_info.get('last_name', '')

    users = users_data['users']
    usernames = users_data['usernames']

    if user_id not in users:
        logging.info(f"Новый пользователь: ID {user_id}, @{username or 'N/A'}")
        users[user_id] = {
            "username": username,
            "first_name": first_name,
            "last_name": last_name,
            "created_contracts": [],
            "received_contracts": []
        }
        if username:
            usernames[username.lower()] = user_id
        # No save here: the unconditional save/upload at the end of this
        # function persists the new record, so saving twice is redundant.
    else:
        current_user_data = users[user_id]
        old_username = current_user_data.get('username')
        if old_username != username:
            # Drop the old index entry only if it still points at this user;
            # the name may have been taken over by another account since.
            if old_username and usernames.get(old_username.lower()) == user_id:
                del usernames[old_username.lower()]
            current_user_data['username'] = username
            if username:
                usernames[username.lower()] = user_id
        current_user_data['first_name'] = first_name
        current_user_data['last_name'] = last_name

    user = users[user_id]

    def enhance_contracts_list(contracts_list, user_is_recipient):
        """Sync summary statuses with the chain and attach partner usernames.

        ``user_is_recipient`` says which side of the contract the logged-in
        user is on. Sign/reject actions are recorded under the recipient's
        id, so the recipient is the user for received contracts and the
        partner for created ones.
        """
        enhanced_list = []
        for contract_summary in contracts_list:
            partner_id = contract_summary.get('partner_id')
            # ``or`` covers partners whose stored username is None.
            partner_username = (
                users.get(partner_id, {}).get('username') or 'Неизвестно'
            )
            recipient_id = user_id if user_is_recipient else partner_id

            _details, actions = blockchain.get_contract_details_from_chain(
                contract_summary['id']
            )
            current_status = 'signed_by_creator'  # Default after creation
            for action in actions or []:
                if action['user_id'] != recipient_id:
                    continue
                if action['type'] == 'sign':
                    current_status = 'signed_by_recipient'
                elif action['type'] == 'reject':
                    current_status = 'rejected'
                else:
                    continue
                # The first recipient decision is final (view_contract
                # applies the same rule).
                break

            if contract_summary.get('status') != current_status:
                contract_summary['status'] = current_status
            enhanced_list.append(
                {**contract_summary, 'partner_username': partner_username}
            )
        return enhanced_list

    # Update the in-memory user record with the latest statuses from the chain.
    user['created_contracts'] = enhance_contracts_list(
        user.get('created_contracts', []), user_is_recipient=False
    )
    user['received_contracts'] = enhance_contracts_list(
        user.get('received_contracts', []), user_is_recipient=True
    )

    # Persist registration, profile changes and refreshed statuses.
    save_all_data()
    upload_db_to_hf()

    return jsonify({
        "user_id": user_id,
        "username": user.get('username'),
        "first_name": user.get('first_name'),
        "last_name": user.get('last_name'),
        "created_contracts": user['created_contracts'],
        "received_contracts": user['received_contracts']
    })
@app.route('/contracts/new', methods=['POST'])
def new_contract():
    """Create a contract from the authenticated user to another user.

    Expects a JSON body with ``initData``, ``title``, ``subject``,
    ``recipient_username`` (leading ``@`` optional) and an optional
    ``expiration_datetime`` in ISO format.

    Returns:
        201 with ``message`` and ``contract_id`` on success;
        400 for missing fields, a self-addressed contract, or an invalid or
        past expiration; 403 when authentication fails; 404 when the
        recipient or the creator is not a registered user.

    On success a ``create`` action is queued on the blockchain, a summary is
    appended to both users' contract lists, and
    ``create_block_from_pending_actions()`` is called.
    """
    # silent=True: a missing or malformed JSON body ends in the 403 below
    # instead of an unhandled error on ``data.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    user_info = validate_telegram_data(data.get('initData'), BOT_TOKEN)
    if not user_info:
        return jsonify({"error": "Неверная аутентификация"}), 403
    creator_id = str(user_info['id'])

    def _text_field(name):
        """Return the stripped string value of ``name``, or '' if not a string."""
        value = data.get(name)
        return value.strip() if isinstance(value, str) else ''

    title = _text_field('title')
    subject = _text_field('subject')
    recipient_username = _text_field('recipient_username').lstrip('@')
    expiration_datetime_str = data.get('expiration_datetime')

    if not all([title, subject, recipient_username]):
        return jsonify({"error": "Все обязательные поля (название, предмет, получатель) должны быть заполнены"}), 400

    users = users_data['users']
    recipient_id = users_data['usernames'].get(recipient_username.lower())
    if not recipient_id or recipient_id not in users:
        return jsonify({"error": f"Пользователь с username @{recipient_username} не найден"}), 404
    if creator_id == recipient_id:
        return jsonify({"error": "Нельзя создать договор самому себе"}), 400

    # Check the creator before anything is queued on the chain, so an
    # unregistered creator cannot leave a dangling pending action behind.
    creator_user_data = users.get(creator_id)
    if creator_user_data is None:
        return jsonify({"error": "Пользователь не найден"}), 404
    recipient_user_data = users[recipient_id]

    # Validate the optional expiration; the string is stored as provided.
    # NOTE: naive values (e.g. from a datetime-local input) are compared with
    # the server's local clock; no timezone conversion is attempted.
    expiration_iso = None
    if expiration_datetime_str:
        try:
            input_dt = datetime.fromisoformat(expiration_datetime_str)
        except (ValueError, TypeError):
            return jsonify({"error": "Неверный формат даты/времени срока действия"}), 400
        # Use "now" in the input's own awareness (tzinfo is None for naive
        # input) so the comparison never mixes naive and aware datetimes.
        if input_dt < datetime.now(input_dt.tzinfo):
            return jsonify({"error": "Срок действия не может быть в прошлом"}), 400
        expiration_iso = expiration_datetime_str

    contract_id = str(uuid.uuid4())

    # 1. Record the creation action in the blockchain pending list.
    blockchain.add_contract_action(
        contract_id=contract_id,
        user_id=creator_id,  # The creator performs the 'create' action
        action_type='create',
        details={
            'title': title,
            'subject': subject,
            'recipient_id': recipient_id,
            'creator_id': creator_id,
            'expiration_datetime': expiration_iso
        }
    )

    # 2. Add a summary to both parties. The initial status means "signed by
    #    the creator, awaiting the recipient"; expiration is read from the
    #    chain when the contract is viewed.
    contract_summary_for_creator = {
        'id': contract_id,
        'title': title,
        'status': 'signed_by_creator',
        'partner_id': recipient_id
    }
    contract_summary_for_recipient = {
        'id': contract_id,
        'title': title,
        'status': 'signed_by_creator',
        'partner_id': creator_id
    }
    creator_user_data.setdefault('created_contracts', []).append(contract_summary_for_creator)
    recipient_user_data.setdefault('received_contracts', []).append(contract_summary_for_recipient)

    # 3. Mine block, save and upload.
    create_block_from_pending_actions()

    return jsonify({"message": "Договор успешно создан и ожидает подписи получателя", "contract_id": contract_id}), 201
@app.route('/contracts/<contract_id>/<action_type>', methods=['POST'])
def contract_action(contract_id, action_type):
    """Let the recipient of a contract sign or reject it.

    Args:
        contract_id: Contract identifier taken from the URL.
        action_type: ``'sign'`` or ``'reject'``, taken from the URL.

    Expects a JSON body with ``initData``.

    Returns:
        200 with ``message`` on success; 400 for an unknown action or when
        the contract is no longer awaiting the recipient; 403 when
        authentication fails or the user is not the contract's recipient;
        404 when the user is not registered.

    On success the action is queued on the blockchain, the status is updated
    in both parties' summaries, and ``create_block_from_pending_actions()``
    is called.
    """
    # silent=True: a missing or malformed JSON body ends in the 403 below
    # instead of an unhandled error on ``data.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    user_info = validate_telegram_data(data.get('initData'), BOT_TOKEN)
    if not user_info:
        return jsonify({"error": "Неверная аутентификация"}), 403
    user_id = str(user_info['id'])

    if action_type not in ('sign', 'reject'):
        return jsonify({"error": "Неверное действие"}), 400

    user_data = users_data['users'].get(user_id)
    if not user_data:
        return jsonify({"error": "Пользователь не найден"}), 404

    # Only the recipient may act, so search the received list only. This also
    # rejects creators acting on their own contracts and unknown ids.
    contract_summary = next(
        (c for c in user_data.get('received_contracts', []) if c.get('id') == contract_id),
        None,
    )
    if contract_summary is None:
        return jsonify({"error": "Действие невозможно для этого договора или вы не являетесь его получателем"}), 403

    # The action is allowed only while the contract awaits the recipient.
    if contract_summary.get('status') != 'signed_by_creator':
        return jsonify({"error": f"Договор уже имеет статус '{get_status_text(contract_summary.get('status'))}'"}), 400

    # Record the action in the blockchain pending list.
    blockchain.add_contract_action(
        contract_id=contract_id,
        user_id=user_id,  # The recipient performs the action
        action_type=action_type,
        details={}  # No extra details needed for sign/reject
    )

    # Mirror the new status into both parties' summaries.
    new_status = 'signed_by_recipient' if action_type == 'sign' else 'rejected'
    contract_summary['status'] = new_status

    creator_id = contract_summary.get('partner_id')  # The recipient's partner is the creator
    creator_user_data = users_data['users'].get(creator_id) if creator_id else None
    if creator_user_data is not None:
        for created in creator_user_data.get('created_contracts', []):
            if created.get('id') == contract_id:
                created['status'] = new_status
                break

    # Mine block, save and upload.
    create_block_from_pending_actions()

    action_message = "подписан" if action_type == 'sign' else "отклонен"
    return jsonify({"message": f"Договор успешно {action_message}"}), 200
@app.route('/contracts/<contract_id>/view', methods=['GET'])
def view_contract(contract_id):
    """Render the detail page of a contract for one of its two parties.

    Args:
        contract_id: Contract identifier taken from the URL.

    Authentication comes from the ``initData`` query parameter. Only the
    creator or the recipient (users whose summaries list this contract) may
    view it.

    Returns:
        The rendered ``VIEW_CONTRACT_TEMPLATE`` on success; a plain-text 403
        when authentication fails; a plain-text 404 when the user is not a
        party to the contract or the contract is not found on the chain.
    """
    init_data = request.args.get('initData')
    user_info = validate_telegram_data(init_data, BOT_TOKEN)
    if not user_info:
        return "Неверная аутентификация", 403
    user_id = str(user_info['id'])

    # The user must be a party to the contract (creator or recipient).
    user_record = users_data['users'].get(user_id) or {}
    user_contracts = (
        user_record.get('created_contracts', [])
        + user_record.get('received_contracts', [])
    )
    user_is_party = any(
        summary.get('id') == contract_id for summary in user_contracts
    )
    if not user_is_party:
        return "Договор не найден или у вас нет доступа", 404

    # Contract details and the action log come from the blockchain.
    contract_details, actions = blockchain.get_contract_details_from_chain(contract_id)
    if not contract_details:
        return "Договор не найден в блокчейне", 404

    creator_id = contract_details.get('creator_id')
    recipient_id = contract_details.get('recipient_id')
    creator_username = get_username_by_id(creator_id)
    recipient_username = get_username_by_id(recipient_id)

    # Derive the status from the recipient's first sign/reject action; once
    # the recipient has decided, the status is final.
    current_status = 'signed_by_creator'  # Initial status
    for action in actions:
        if action['type'] in ['sign', 'reject'] and action['user_id'] == recipient_id:
            if action['type'] == 'sign':
                current_status = 'signed_by_recipient'
            else:
                current_status = 'rejected'
            break

    expiration_datetime = contract_details.get('expiration_datetime')
    expiration_display = format_expiration_datetime(expiration_datetime)

    return render_template_string(
        VIEW_CONTRACT_TEMPLATE,
        contract=contract_details,
        creator_username=creator_username,
        recipient_username=recipient_username,
        actions=actions,
        current_status=current_status,
        users_data=users_data['users'],  # For username lookups in the template
        get_status_text=get_status_text,  # Helper functions used by the template
        get_status_class=get_status_class,
        expiration_display=expiration_display
    )
if __name__ == '__main__':
    # Script entry point: optionally start the backup thread, then run Flask.
    logging.info("Application starting up.")

    if not HF_TOKEN_WRITE:
        logging.warning("Periodic backup will NOT run (HF_TOKEN for writing not set).")
    else:
        # Daemon thread, so it does not keep the process alive on shutdown.
        backup_thread = threading.Thread(target=periodic_backup, daemon=True)
        backup_thread.start()
        logging.info("Periodic backup thread started.")

    # The port comes from the PORT environment variable, defaulting to 7860.
    port = int(os.getenv('PORT', 7860))
    logging.info(f"Starting Flask app on host 0.0.0.0 and port {port}")
    app.run(host='0.0.0.0', port=port)