wall / app.py
Shveiauto's picture
Update app.py
577baed verified
from flask import Flask, render_template_string, request, redirect, url_for, jsonify, flash, send_from_directory
import json
import os
import logging
import threading
import time
from datetime import datetime
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import RepositoryNotFoundError, HfHubHTTPError
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
import uuid
import hmac
import hashlib
import urllib.parse
import requests
load_dotenv()
app = Flask(name)
app.secret_key = os.getenv("FLASK_SECRET_KEY", 'telegram_wall_secret_key_for_flash_messages_only')
--- CONFIGURATION ---
DATA_FILE = 'wall_data.json'
UPLOAD_FOLDER = 'uploads'
SYNC_FILES = [DATA_FILE, UPLOAD_FOLDER] # Will be treated as a list of files/folders
MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16 MB limit for file uploads
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'mp4', 'mov', 'avi', 'pdf', 'doc', 'docx', 'txt'}
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH
REPO_ID = os.getenv("HF_REPO_ID", "Kgshop/telegram-wall-app")
HF_TOKEN_WRITE = os.getenv("HF_TOKEN_WRITE")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ")
NOTE: The provided token is not a real Telegram Bot Token format.
A real token looks like: <BOT_ID>:<RANDOM_STRING>
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "7549355625:AAGYWatM-nUVQirgBiBwoAtWZgzfp3QnQjY")
DOWNLOAD_RETRIES = 3
DOWNLOAD_DELAY = 5
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
--- UTILITIES ---
def allowed_file(filename):
return '.' in filename and
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def download_db_from_hf(specific_file=None, retries=DOWNLOAD_RETRIES, delay=DOWNLOAD_DELAY):
token_to_use = HF_TOKEN_READ if HF_TOKEN_READ else HF_TOKEN_WRITE
files_to_download = [specific_file] if specific_file else SYNC_FILES
all_successful = True
for file_name in files_to_download:
if file_name == UPLOAD_FOLDER:
logging.info("Skipping UPLOAD_FOLDER direct download as it's typically large. Requires git-lfs/separate process.")
continue
code
Code
download
content_copy
expand_less
success = False
for attempt in range(retries + 1):
try:
logging.info(f"Downloading {file_name} (Attempt {attempt + 1}/{retries + 1})...")
local_path = hf_hub_download(
repo_id=REPO_ID, filename=file_name, repo_type="dataset",
token=token_to_use, local_dir=".", local_dir_use_symlinks=False,
force_download=True, resume_download=False
)
logging.info(f"Successfully downloaded {file_name} to {local_path}.")
success = True
break
except RepositoryNotFoundError:
logging.error(f"Repository {REPO_ID} not found. Download cancelled for all files.")
return False
except HfHubHTTPError as e:
if e.response.status_code == 404:
logging.warning(f"File {file_name} not found in repo {REPO_ID} (404). Skipping this file.")
if attempt == 0 and not os.path.exists(file_name):
try:
if file_name == DATA_FILE:
default_data = {'posts': [], 'users': {}}
with open(file_name, 'w', encoding='utf-8') as f:
json.dump(default_data, f)
logging.info(f"Created empty local file {file_name} because it was not found on HF.")
except Exception as create_e:
logging.error(f"Failed to create empty local file {file_name}: {create_e}")
success = True
break
else:
logging.error(f"HTTP error downloading {file_name} (Attempt {attempt + 1}): {e}. Retrying in {delay}s...")
except Exception as e:
logging.error(f"Unexpected error downloading {file_name} (Attempt {attempt + 1}): {e}. Retrying in {delay}s...", exc_info=True)
if attempt < retries: time.sleep(delay)
if not success:
logging.error(f"Failed to download {file_name} after {retries + 1} attempts.")
all_successful = False
return all_successful
def upload_db_to_hf(specific_file=None):
if not HF_TOKEN_WRITE:
logging.warning("HF_TOKEN_WRITE not set. Skipping upload to Hugging Face.")
return
try:
api = HfApi()
files_to_upload = [specific_file] if specific_file and specific_file != UPLOAD_FOLDER else []
code
Code
download
content_copy
expand_less
if not specific_file:
files_to_upload = [DATA_FILE]
# Add files from UPLOAD_FOLDER
for root, _, files in os.walk(UPLOAD_FOLDER):
for file_name in files:
local_path = os.path.join(root, file_name)
path_in_repo = local_path
files_to_upload.append((local_path, path_in_repo))
logging.info(f"Starting upload of {len(files_to_upload)} files/paths to HF repo {REPO_ID}...")
for item in files_to_upload:
local_path = item if isinstance(item, tuple) else item
path_in_repo = item if isinstance(item, tuple) else item
if os.path.exists(local_path):
try:
api.upload_file(
path_or_fileobj=local_path, path_in_repo=path_in_repo, repo_id=REPO_ID,
repo_type="dataset", token=HF_TOKEN_WRITE,
commit_message=f"Sync {path_in_repo} {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
logging.info(f"File {path_in_repo} successfully uploaded to Hugging Face.")
except Exception as e:
logging.error(f"Error uploading file {path_in_repo} to Hugging Face: {e}")
else:
logging.warning(f"File {local_path} not found locally, skipping upload.")
logging.info("Finished uploading files to HF.")
except Exception as e:
logging.error(f"General error during Hugging Face upload initialization or process: {e}", exc_info=True)
def periodic_backup():
backup_interval = 1800
logging.info(f"Setting up periodic backup every {backup_interval} seconds.")
while True:
time.sleep(backup_interval)
logging.info("Starting periodic backup...")
upload_db_to_hf()
logging.info("Periodic backup finished.")
def load_data():
default_data = {'posts': [], 'users': {}}
try:
with open(DATA_FILE, 'r', encoding='utf-8') as file:
data = json.load(file)
logging.info(f"Local data loaded successfully from {DATA_FILE}")
if not isinstance(data, dict): raise FileNotFoundError
for key in default_data:
if key not in data: data[key] = default_data[key]
return data
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.warning(f"Error loading local data ({e}). Attempting download from HF.")
code
Code
download
content_copy
expand_less
if download_db_from_hf(specific_file=DATA_FILE):
try:
with open(DATA_FILE, 'r', encoding='utf-8') as file:
data = json.load(file)
logging.info(f"Data loaded successfully from {DATA_FILE} after download.")
if not isinstance(data, dict): return default_data
for key in default_data:
if key not in data: data[key] = default_data[key]
return data
except Exception as load_e:
logging.error(f"Error loading downloaded {DATA_FILE}: {load_e}. Using default.", exc_info=True)
return default_data
else:
logging.error(f"Failed to download {DATA_FILE} from HF. Using empty default data structure.")
if not os.path.exists(DATA_FILE):
try:
with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump(default_data, f)
logging.info(f"Created empty local file {DATA_FILE} after failed download.")
except Exception as create_e:
logging.error(f"Failed to create empty local file {DATA_FILE}: {create_e}")
return default_data
def save_data(data):
try:
if not isinstance(data, dict):
logging.error("Attempted to save invalid data structure (not a dict). Aborting save.")
return
default_keys = {'posts': [], 'users': {}}
for key in default_keys:
if key not in data: data[key] = default_keys[key]
code
Code
download
content_copy
expand_less
with open(DATA_FILE, 'w', encoding='utf-8') as file:
json.dump(data, file, ensure_ascii=False, indent=4)
logging.info(f"Data successfully saved to {DATA_FILE}")
threading.Thread(target=upload_db_to_hf, args=(DATA_FILE,), daemon=True).start()
except Exception as e:
logging.error(f"Error saving data to {DATA_FILE}: {e}", exc_info=True)
def verify_telegram_auth_data(auth_data_str, bot_token):
if not auth_data_str:
return False, None
code
Code
download
content_copy
expand_less
params = dict(urllib.parse.parse_qsl(auth_data_str))
if 'hash' not in params:
return False, None
received_hash = params.pop('hash')
sorted_params = sorted(params.items())
data_check_string_parts = []
for key, value in sorted_params:
data_check_string_parts.append(f"{key}={value}")
data_check_string = "\n".join(data_check_string_parts)
secret_key = hmac.new("WebAppData".encode(), bot_token.encode(), hashlib.sha256).digest()
calculated_hash = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()
if calculated_hash == received_hash:
try:
user_data = json.loads(params.get('user', '{}'))
# Get chat_id if available (often same as user_id in private chats)
chat_id = params.get('chat_id', user_data.get('id'))
user_data['chat_id'] = chat_id
return True, user_data
except json.JSONDecodeError:
return False, None
return False, None
def get_authenticated_user_details(request_headers):
auth_data_str = request_headers.get('X-Telegram-Auth')
if not auth_data_str:
return None
is_valid, user_data_from_auth = verify_telegram_auth_data(auth_data_str, TELEGRAM_BOT_TOKEN)
if is_valid and user_data_from_auth:
data = load_data()
user_id_str = str(user_data_from_auth.get('id'))
return data.get('users', {}).get(user_id_str)
return None
def send_telegram_notification(chat_id, message):
if not TELEGRAM_BOT_TOKEN:
logging.warning("TELEGRAM_BOT_TOKEN not set. Cannot send notification.")
return
code
Code
download
content_copy
expand_less
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {
'chat_id': chat_id,
'text': message,
'parse_mode': 'HTML'
}
try:
response = requests.post(url, data=payload, timeout=5)
response.raise_for_status()
logging.info(f"Notification sent to {chat_id}.")
return response.json()
except requests.exceptions.RequestException as e:
logging.error(f"Error sending Telegram notification to {chat_id}: {e}")
return None
--- API ROUTES ---
@app.route('/api/auth_user', methods=['POST'])
def auth_user():
auth_data_str = request.headers.get('X-Telegram-Auth')
if not auth_data_str:
init_data_payload = request.json.get('init_data')
if init_data_payload:
auth_data_str = init_data_payload
else:
return jsonify({"error": "Authentication data not provided"}), 401
code
Code
download
content_copy
expand_less
is_valid, user_data_from_auth = verify_telegram_auth_data(auth_data_str, TELEGRAM_BOT_TOKEN)
if not is_valid or not user_data_from_auth:
return jsonify({"error": "Invalid authentication data"}), 403
data = load_data()
users = data.get('users', {})
user_id_str = str(user_data_from_auth.get('id'))
# Update/Create user record
user_record = users.get(user_id_str, {})
# Store chat_id (assuming it is available or fallback to user_id)
# Note: In a real-world bot, the chat_id should be captured during the /start command.
user_record.update({
'id': user_data_from_auth.get('id'),
'chat_id': user_data_from_auth.get('chat_id') or user_data_from_auth.get('id'), # Critical for notifications
'first_name': user_data_from_auth.get('first_name'),
'last_name': user_data_from_auth.get('last_name'),
'username': user_data_from_auth.get('username'),
'language_code': user_data_from_auth.get('language_code'),
'photo_url': user_data_from_auth.get('photo_url'),
'last_seen': datetime.now().isoformat()
})
if 'first_seen' not in user_record:
user_record['first_seen'] = datetime.now().isoformat()
users[user_id_str] = user_record
data['users'] = users
save_data(data)
return jsonify({"message": "User authenticated", "user": users[user_id_str]}), 200
@app.route('/api/users', methods=['GET'])
def get_users():
# Only authenticated users can view the list
if not get_authenticated_user_details(request.headers):
return jsonify({"error": "Authentication required"}), 401
code
Code
download
content_copy
expand_less
data = load_data()
user_list = list(data.get('users', {}).values())
# Return limited user info for the list
safe_user_list = [{
'id': user['id'],
'first_name': user['first_name'],
'last_name': user['last_name'],
'username': user.get('username'),
'photo_url': user.get('photo_url')
} for user in user_list]
return jsonify(sorted(safe_user_list, key=lambda x: x.get('first_name', 'z'))), 200
@app.route('/api/wall/<user_id>', methods=['GET'])
def get_user_wall(user_id):
if not get_authenticated_user_details(request.headers):
return jsonify({"error": "Authentication required"}), 401
code
Code
download
content_copy
expand_less
data = load_data()
all_posts = data.get('posts', [])
# Filter posts:
# 1. Posts made by the user_id on their own wall (target_user_id is None or same as user_id)
# 2. Posts made by *others* targeting this user_id's wall
wall_posts = [
post for post in all_posts
if (str(post.get('user_id')) == str(user_id) and not post.get('target_user_id')) or
(str(post.get('target_user_id')) == str(user_id))
]
# Enrich posts with user info (poster and target)
users = data.get('users', {})
enriched_posts = []
for post in sorted(wall_posts, key=lambda x: x.get('timestamp', ''), reverse=True):
poster = users.get(str(post['user_id']), {})
target = users.get(str(post.get('target_user_id')), {}) if post.get('target_user_id') else None
enriched_post = post.copy()
enriched_post['poster_name'] = f"{poster.get('first_name', 'Unknown')} {poster.get('last_name', '')}".strip()
enriched_post['poster_username'] = poster.get('username', None)
if target:
enriched_post['target_name'] = f"{target.get('first_name', 'Unknown')} {target.get('last_name', '')}".strip()
enriched_post['target_username'] = target.get('username', None)
enriched_posts.append(enriched_post)
return jsonify(enriched_posts), 200
@app.route('/api/post/<target_user_id>', methods=['POST'])
def create_post(target_user_id):
user = get_authenticated_user_details(request.headers)
if not user:
return jsonify({"error": "Authentication required or user not found in DB"}), 401
code
Code
download
content_copy
expand_less
if str(target_user_id) == 'me':
target_user_id = str(user['id'])
data = load_data()
users = data.get('users', {})
target_user = users.get(str(target_user_id))
if not target_user:
return jsonify({"error": "Target user not found"}), 404
text_content = request.form.get('content', '').strip()
file = request.files.get('file')
if not text_content and not file:
return jsonify({"error": "Post must contain text or a file"}), 400
new_post = {
"id": str(uuid.uuid4()),
"user_id": str(user['id']),
"target_user_id": str(target_user_id) if str(target_user_id) != str(user['id']) else None, # Null for own wall post
"timestamp": datetime.now().isoformat(),
"content": text_content,
"type": "text",
"file_path": None
}
# Handle file upload
if file and file.filename:
if not allowed_file(file.filename):
return jsonify({"error": "File type not allowed"}), 400
extension = file.filename.rsplit('.', 1)[[1](https://www.google.com/url?sa=E&q=https%3A%2F%2Fvertexaisearch.cloud.google.com%2Fgrounding-api-redirect%2FAUZIYQF6S04rq1C0Z_rbzF1NSAxsil9bBG4L3nuHOSOAbHHtJiwnE2LxVsPOVpiPhBXRK6XaybxBsn0UZ9Mn1KLhTGONkEjmCPX1AD7mT0SoQ15oTUhmR7n6PGa73aBGIEQ67iFmSMDvPPA3aXv6RLq5SesfTK1HYQ3z5Q%3D%3D)].lower()
filename = secure_filename(f"{new_post['id']}.{extension}")
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
try:
file.save(file_path)
new_post['file_path'] = filename
if extension in ['png', 'jpg', 'jpeg', 'gif']: new_post['type'] = 'photo'
elif extension in ['mp4', 'mov', 'avi']: new_post['type'] = 'video'
else: new_post['type'] = 'document'
# Asynchronously upload file to HF
threading.Thread(target=upload_db_to_hf, args=((file_path, os.path.join(UPLOAD_FOLDER, filename)),), daemon=True).start()
except Exception as e:
logging.error(f"File save/upload failed: {e}")
return jsonify({"error": f"Failed to save file: {e}"}), 500
data['posts'].append(new_post)
save_data(data)
# --- Notification Logic ---
if str(target_user_id) != str(user['id']):
# Post to another user's wall, send notification to the target user
poster_name = user.get('first_name', 'Someone')
target_chat_id = target_user.get('chat_id')
if target_chat_id:
message = f"📢 New post on your wall!\n\n<b>{poster_name}</b> posted: {text_content[:100]}..."
threading.Thread(target=send_telegram_notification, args=(target_chat_id, message), daemon=True).start()
return jsonify(new_post), 201
@app.route('/api/post/<post_id>', methods=['DELETE'])
def delete_post(post_id):
user = get_authenticated_user_details(request.headers)
if not user: return jsonify({"error": "Authentication required or user not found in DB"}), 401
code
Code
download
content_copy
expand_less
data = load_data()
posts_list = data.get('posts', [])
original_length = len(posts_list)
item_to_delete = next((i for i in posts_list if i['id'] == post_id), None)
if not item_to_delete: return jsonify({"error": "Post not found"}), 404
# Allow deleting if the user is the poster OR the user is the wall owner (if target_user_id is set)
is_poster = str(item_to_delete.get('user_id')) == str(user.get('id'))
is_wall_owner = str(item_to_delete.get('target_user_id')) == str(user.get('id'))
if not is_poster and not is_wall_owner:
return jsonify({"error": "Forbidden: You can only delete your own posts or posts on your wall"}), 403
# Delete the associated file if it exists
file_path = item_to_delete.get('file_path')
if file_path:
full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
if os.path.exists(full_path):
os.remove(full_path)
logging.info(f"Deleted file: {full_path}")
# Note: File deletion on HF requires complex logic (git delete commit) - skipping for this example.
data['posts'] = [i for i in posts_list if i['id'] != post_id]
if len(data['posts']) < original_length:
save_data(data)
return jsonify({"message": "Post deleted successfully"}), 200
return jsonify({"error": "Post not found or deletion failed"}), 404
@app.route('/uploads/<filename>', methods=['GET'])
def uploaded_file(filename):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename)
--- WEBAPP TEMPLATES & MAIN VIEW ---
MAIN_APP_TEMPLATE = '''
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0, user-scalable=no, viewport-fit=cover">
<title>TonWall</title>
<script src="https://telegram.org/js/telegram-web-app.js"></script>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.2/css/all.min.css">
<style>
:root {
--tg-theme-bg-color: #ffffff;
--tg-theme-text-color: #000000;
--tg-theme-hint-color: #999999;
--tg-theme-link-color: #007aff;
--tg-theme-button-color: #007aff;
--tg-theme-button-text-color: #ffffff;
--tg-theme-secondary-bg-color: #f0f0f0;
--tg-theme-header-bg-color: #efeff4;
--tg-theme-section-bg-color: #ffffff;
--tg-theme-destructive-text-color: #ff3b30;
}
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", "Roboto", "Helvetica Neue", Arial, sans-serif;
margin: 0;
padding: 0;
background-color: var(--tg-theme-bg-color);
color: var(--tg-theme-text-color);
overscroll-behavior-y: none;
-webkit-font-smoothing: antialiased;
min-height: 100vh;
display: flex;
flex-direction: column;
}
.header {
background-color: var(--tg-theme-header-bg-color);
padding: 12px 15px;
text-align: center;
font-weight: 600;
font-size: 17px;
border-bottom: 0.5px solid var(--tg-theme-secondary-bg-color);
position: sticky;
top: 0;
z-index: 100;
}
.content { flex-grow: 1; padding: 10px 0; overflow-x: hidden; transition: opacity 0.2s ease-out; }
.footer-nav {
display: flex;
justify-content: space-around;
background-color: var(--tg-theme-header-bg-color);
border-top: 0.5px solid var(--tg-theme-hint-color);
position: sticky;
bottom: 0;
z-index: 100;
padding-top: 5px;
padding-bottom: env(safe-area-inset-bottom);
}
.nav-button {
display: flex;
flex-direction: column;
align-items: center;
padding: 5px 0 8px 0;
flex-grow: 1;
cursor: pointer;
background: none;
border: none;
color: var(--tg-theme-hint-color);
font-size: 11px;
font-weight: 500;
-webkit-tap-highlight-color: transparent;
transition: color 0.2s ease;
}
.nav-button.active { color: var(--tg-theme-link-color); }
.nav-button i { font-size: 20px; margin-bottom: 2px; }
code
Code
download
content_copy
expand_less
/* Wall Styles */
.post-list { display: flex; flex-direction: column; gap: 10px; padding: 0 15px; }
.post-card {
background-color: var(--tg-theme-section-bg-color);
padding: 15px;
border-radius: 10px;
box-shadow: 0 2px 8px rgba(0,0,0,0.06);
word-wrap: break-word;
}
.post-header { display: flex; align-items: center; margin-bottom: 10px; }
.post-header img { width: 40px; height: 40px; border-radius: 50%; margin-right: 10px; object-fit: cover; background-color: var(--tg-theme-secondary-bg-color); }
.post-info { flex-grow: 1; }
.post-info h4 { margin: 0; font-size: 15px; font-weight: 600; color: var(--tg-theme-text-color); }
.post-info span { font-size: 12px; color: var(--tg-theme-hint-color); }
.post-content p { margin: 5px 0; font-size: 15px; color: var(--tg-theme-text-color); white-space: pre-wrap; }
.post-content a { color: var(--tg-theme-link-color); text-decoration: none; }
.post-content a:hover { text-decoration: underline; }
.post-content .media-container { max-width: 100%; border-radius: 8px; overflow: hidden; margin-top: 10px; }
.post-content img, .post-content video { width: 100%; height: auto; display: block; }
/* User List Styles */
.user-list { padding: 10px 15px; display: flex; flex-direction: column; gap: 1px; }
.user-list-item {
display: flex;
align-items: center;
padding: 12px 15px;
background-color: var(--tg-theme-section-bg-color);
border-radius: 8px;
margin-bottom: 8px;
box-shadow: 0 1px 4px rgba(0,0,0,0.05);
cursor: pointer;
transition: background-color 0.1s ease;
}
.user-list-item:active { background-color: var(--tg-theme-secondary-bg-color); }
.user-list-item img { width: 45px; height: 45px; border-radius: 50%; margin-right: 15px; object-fit: cover; background-color: var(--tg-theme-secondary-bg-color); }
.user-list-item span { font-size: 16px; font-weight: 500; color: var(--tg-theme-text-color); }
/* Form Styles */
.form-container { padding: 20px 15px; }
.form-group { margin-bottom: 18px; }
.form-group label { display: block; font-size: 14px; color: var(--tg-theme-hint-color); margin-bottom: 6px; font-weight: 500; }
.form-group textarea {
width: 100%;
padding: 12px;
border: 1px solid var(--tg-theme-secondary-bg-color);
border-radius: 8px;
font-size: 16px;
background-color: var(--tg-theme-bg-color);
color: var(--tg-theme-text-color);
box-sizing: border-box;
transition: border-color 0.2s ease;
min-height: 100px;
resize: vertical;
}
.form-group input[type="file"] { border: none; padding: 5px 0; }
.form-group textarea:focus { border-color: var(--tg-theme-link-color); outline: none; }
.error-message { color: var(--tg-theme-destructive-text-color); font-size: 14px; margin-top: 10px; text-align: center; }
.loading, .empty-state { text-align: center; padding: 50px 15px; color: var(--tg-theme-hint-color); font-size: 16px; }
.delete-button {
margin-top: 10px;
padding: 6px 12px;
background-color: var(--tg-theme-destructive-text-color);
color: var(--tg-theme-button-text-color);
border: none;
border-radius: 6px;
cursor: pointer;
font-size: 13px;
transition: opacity 0.1s;
}
.delete-button:active { opacity: 0.8; }
/* Media utility classes */
.media-image { max-height: 400px; object-fit: cover; }
.media-video { max-height: 400px; }
.media-document {
background-color: var(--tg-theme-secondary-bg-color);
padding: 10px;
border-radius: 6px;
font-size: 14px;
display: flex;
align-items: center;
}
.media-document i { margin-right: 8px; font-size: 18px; color: var(--tg-theme-link-color); }
.media-document span { font-weight: 500; }
</style>
</head>
<body>
<div class="app-container">
<div class="header" id="appHeader"></div>
<div class="content" id="mainContent">
<div class="loading">Loading...</div>
</div>
<div class="footer-nav" id="footerNav">
<button class="nav-button active" data-view="my_wall" id="navMyWall">
<i class="fas fa-home"></i>
<span id="labelMyWall">My Wall</span>
</button>
<button class="nav-button" data-view="users" id="navUsers">
<i class="fas fa-users"></i>
<span id="labelUsers">Users</span>
</button>
<button class="nav-button" data-view="new_post" id="navNewPost">
<i class="fas fa-plus-circle"></i>
<span id="labelNewPost">New Post</span>
</button>
</div>
</div>
code
Code
download
content_copy
expand_less
<script>
const tg = window.Telegram.WebApp;
let currentUser = null;
let currentView = 'my_wall';
let currentTargetUserId = null;
const mainContent = document.getElementById('mainContent');
const footerNav = document.getElementById('footerNav');
const appHeader = document.getElementById('appHeader');
const langCode = tg.initDataUnsafe.user?.language_code;
const isRussian = langCode && (langCode.startsWith('ru') || langCode.startsWith('uk') || langCode.startsWith('be'));
const T = {
'My Wall': isRussian ? 'Моя Стена' : 'My Wall',
'Users': isRussian ? 'Пользователи' : 'Users',
'New Post': isRussian ? 'Новая Запись' : 'New Post',
'Post on Wall': isRussian ? 'Опубликовать на Стену' : 'Post on Wall',
'Users List': isRussian ? 'Список Пользователей' : 'Users List',
'Wall of': isRussian ? 'Стена' : 'Wall of',
'Post Text': isRussian ? 'Текст записи...' : 'Post Text...',
'Attachment (optional)': isRussian ? 'Вложение (необязательно)' : 'Attachment (optional)',
'Post': isRussian ? 'Опубликовать' : 'Post',
'No posts found.': isRussian ? 'Записей не найдено.' : 'No posts found.',
'No users found.': isRussian ? 'Пользователей не найдено.' : 'No users found.',
'Loading...': isRussian ? 'Загрузка...' : 'Loading...',
'Posted by': isRussian ? 'Опубликовано' : 'Posted by',
'on': isRussian ? 'на' : 'on',
'View Wall': isRussian ? 'Посмотреть Стену' : 'View Wall',
'Delete Post?': isRussian ? 'Удалить эту запись?' : 'Delete this post?',
'Post deleted.': isRussian ? 'Запись удалена.' : 'Post deleted.',
'Post failed.': isRussian ? 'Ошибка публикации.' : 'Post failed.',
'Error loading.': isRussian ? 'Ошибка загрузки.' : 'Error loading.',
'File': isRussian ? 'Файл' : 'File',
'Document': isRussian ? 'Документ' : 'Document',
'Video': isRussian ? 'Видео' : 'Video',
'Photo': isRussian ? 'Фото' : 'Photo',
'Post must contain text or a file': isRussian ? 'Запись должна содержать текст или файл' : 'Post must contain text or a file',
'Delete': isRussian ? 'Удалить' : 'Delete',
'to your wall': isRussian ? 'на вашу стену' : 'to your wall',
'on the wall of': isRussian ? 'на стене' : 'on the wall of'
};
function translateUI() {
document.getElementById('appHeader').textContent = 'TonWall';
document.getElementById('labelMyWall').textContent = T['My Wall'];
document.getElementById('labelUsers').textContent = T['Users'];
document.getElementById('labelNewPost').textContent = T['New Post'];
}
function applyThemeParams() {
const rootStyle = document.documentElement.style;
// Use fallback colors based on Telegram WebApp documentation
rootStyle.setProperty('--tg-theme-bg-color', tg.themeParams.bg_color || '#ffffff');
rootStyle.setProperty('--tg-theme-text-color', tg.themeParams.text_color || '#000000');
rootStyle.setProperty('--tg-theme-hint-color', tg.themeParams.hint_color || '#999999');
rootStyle.setProperty('--tg-theme-link-color', tg.themeParams.link_color || '#007aff');
rootStyle.setProperty('--tg-theme-button-color', tg.themeParams.button_color || '#007aff');
rootStyle.setProperty('--tg-theme-button-text-color', tg.themeParams.button_button_text_color || '#ffffff');
rootStyle.setProperty('--tg-theme-secondary-bg-color', tg.themeParams.secondary_bg_color || '#f0f0f0');
rootStyle.setProperty('--tg-theme-header-bg-color', tg.themeParams.header_bg_color || tg.themeParams.secondary_bg_color || '#efeff4');
rootStyle.setProperty('--tg-theme-section-bg-color', tg.themeParams.section_bg_color || tg.themeParams.bg_color || '#ffffff');
rootStyle.setProperty('--tg-theme-destructive-text-color', tg.themeParams.destructive_text_color || '#ff3b30');
tg.setBackgroundColor(tg.themeParams.bg_color || '#ffffff');
}
async function apiCall(endpoint, method = 'GET', body = null, isFormData = false) {
const headers = {};
if (tg.initData) {
headers['X-Telegram-Auth'] = tg.initData;
}
const options = { method, headers };
if (isFormData) {
// Flask handles multipart/form-data without setting Content-Type in JS headers
delete options.headers['Content-Type'];
options.body = body;
} else if (body) {
headers['Content-Type'] = 'application/json';
options.body = JSON.stringify(body);
}
try {
const response = await fetch(endpoint, options);
if (response.status === 401 || response.status === 403) {
tg.showAlert("Authentication failed. Please restart the Mini App from the bot.");
throw new Error("Authentication failed");
}
if (!response.ok) {
const errorData = await response.json().catch(() => ({ error: `HTTP error ${response.status}` }));
throw new Error(errorData.error || `HTTP error ${response.status}`);
}
if (response.status === 204 || response.headers.get('content-length') === '0') return {};
return response.json();
} catch (error) {
console.error('API Call Error:', error);
tg.showAlert(error.message || T['Error loading.']);
throw error;
}
}
function getUserAvatarUrl(user) {
return user.photo_url || '';
}
function getFullUserName(user) {
let name = `${user.first_name || ''} ${user.last_name || ''}`.trim();
if (!name) name = user.username || `User ${user.id}`;
return name;
}
function renderPostMedia(post) {
if (!post.file_path) return '';
const fileUrl = `/uploads/${post.file_path}`;
if (post.type === 'photo') {
return `<div class="media-container"><img src="${fileUrl}" class="media-image" alt="${T['Photo']}"></div>`;
} else if (post.type === 'video') {
return `<div class="media-container"><video controls src="${fileUrl}" class="media-video"></video></div>`;
} else if (post.type === 'document') {
return `
<a href="${fileUrl}" target="_blank" rel="noopener noreferrer" class="media-document">
<i class="fas fa-file"></i>
<span>${T['Document']} (${post.file_path})</span>
</a>
`;
}
return '';
}
function renderPostList(posts, wallOwnerId) {
mainContent.style.opacity = 0;
tg.BackButton.hide();
tg.MainButton.hide();
if (!posts || posts.length === 0) {
mainContent.innerHTML = `<div class="empty-state">${T['No posts found.']}</div>`;
} else {
mainContent.innerHTML = `<div class="post-list">${posts.map(post => {
const poster = { id: post.user_id, first_name: post.poster_name, username: post.poster_username, photo_url: post.photo_url };
const isCurrentUserWall = String(wallOwnerId) === String(currentUser.id);
const isPoster = String(post.user_id) === String(currentUser.id);
const isWallOwner = isCurrentUserWall || (post.target_user_id && String(post.target_user_id) === String(currentUser.id));
const postTarget = post.target_user_id
? `${T['on the wall of']} <a href="#" onclick="loadView('user_wall', '${post.target_user_id}')">${post.target_name}</a>`
: `${T['to your wall']}`;
let infoLine;
if (isCurrentUserWall && !post.target_user_id) { // My Wall main view
infoLine = `<span>${new Date(post.timestamp).toLocaleDateString()} ${new Date(post.timestamp).toLocaleTimeString([], {hour: '2-digit', minute:'2-digit'})}</span>`;
} else {
infoLine = `<span>${T['Posted by']} <a href="#" onclick="loadView('user_wall', '${post.user_id}')">@${post.poster_username || post.poster_name}</a> ${postTarget}</span>`;
}
return `
<div class="post-card" id="post-${post.id}">
<div class="post-header">
<img src="${getUserAvatarUrl(poster)}" alt="Avatar">
<div class="post-info">
<h4>${post.poster_name}</h4>
<span>@${post.poster_username || 'anonymous'}</span>
</div>
</div>
<div class="post-content">
${post.content ? `<p>${post.content.replace(/\\n/g, '<br>')}</p>` : ''}
${renderPostMedia(post)}
</div>
<div style="font-size: 12px; color: var(--tg-theme-hint-color); margin-top: 10px;">
${infoLine}
</div>
${(isPoster || isWallOwner) ? `<button class="delete-button" onclick="handleDeletePost('${post.id}')">${T['Delete']}</button>` : ''}
</div>
`;
}).join('')}</div>`;
}
setTimeout(() => { mainContent.style.opacity = 1; }, 50);
}
function renderUsersList(users) {
mainContent.style.opacity = 0;
tg.BackButton.hide();
tg.MainButton.hide();
if (!users || users.length === 0) {
mainContent.innerHTML = `<div class="empty-state">${T['No users found.']}</div>`;
} else {
mainContent.innerHTML = `<div class="user-list">${users.map(user => {
const fullName = getFullUserName(user);
return `
<div class="user-list-item" onclick="loadView('user_wall', '${user.id}')">
<img src="${getUserAvatarUrl(user)}" alt="Avatar">
<span>${fullName} ${user.username ? `(@${user.username})` : ''}</span>
<div style="margin-left: auto; color: var(--tg-theme-link-color); font-size: 14px; font-weight: 500;">${T['View Wall']}</div>
</div>
`;
}).join('')}</div>`;
}
setTimeout(() => { mainContent.style.opacity = 1; }, 50);
}
function renderNewPostForm(targetUser) {
mainContent.style.opacity = 0;
tg.MainButton.hide();
const targetName = targetUser ? getFullUserName(targetUser) : getFullUserName(currentUser);
const isTargetingSelf = !targetUser || String(targetUser.id) === String(currentUser.id);
const targetHeader = isTargetingSelf
? `${T['New Post']} (${T['My Wall']})`
: `${T['Post on Wall']} ${targetName}`;
appHeader.textContent = targetHeader;
let formHtml = `<div class="form-container">
<form id="postForm" enctype="multipart/form-data">
<div class="form-group">
<label for="postContent">${T['Post Text']}</label>
<textarea id="postContent" name="content" placeholder="${T['Post Text']}..."></textarea>
</div>
<div class="form-group">
<label for="postFile">${T['Attachment (optional)']}</label>
<input type="file" id="postFile" name="file" accept="image/*,video/*,.pdf,.doc,.docx,.txt">
</div>
<div id="formError" class="error-message"></div>
</form>
</div>`;
mainContent.innerHTML = formHtml;
setTimeout(() => { mainContent.style.opacity = 1; }, 50);
tg.MainButton.setText(T['Post']);
tg.MainButton.show();
tg.MainButton.onClick(() => handleSubmitPost(targetUser ? targetUser.id : currentUser.id));
}
function handleSubmitPost(targetUserId) {
const content = document.getElementById('postContent').value.trim();
const fileInput = document.getElementById('postFile');
const file = fileInput.files;
const formError = document.getElementById('formError');
formError.textContent = '';
if (!content && !file) {
formError.textContent = T['Post must contain text or a file'];
tg.HapticFeedback.notificationOccurred('error');
return;
}
const formData = new FormData();
formData.append('content', content);
if (file) {
formData.append('file', file);
}
tg.MainButton.showProgress();
tg.HapticFeedback.impactOccurred('light');
apiCall(`/api/post/${targetUserId}`, 'POST', formData, true)
.then(response => {
tg.HapticFeedback.notificationOccurred('success');
tg.MainButton.hideProgress();
tg.MainButton.hide();
if (String(targetUserId) === String(currentUser.id)) {
loadView('my_wall');
} else {
loadView('user_wall', targetUserId);
}
})
.catch(err => {
tg.HapticFeedback.notificationOccurred('error');
tg.MainButton.hideProgress();
formError.textContent = err.message || T['Post failed.'];
});
}
function handleDeletePost(postId) {
tg.showConfirm(T['Delete Post?'], (confirmed) => {
if (confirmed) {
tg.HapticFeedback.impactOccurred('medium');
apiCall(`/api/post/${postId}`, 'DELETE')
.then(() => {
tg.HapticFeedback.notificationOccurred('success');
tg.showAlert(T['Post deleted.']);
// Reload current view
if (currentView === 'my_wall') loadView('my_wall');
else if (currentView === 'user_wall') loadView('user_wall', currentTargetUserId);
})
.catch(err => {
tg.HapticFeedback.notificationOccurred('error');
tg.showAlert(err.message || T['Error loading.']);
});
} else {
tg.HapticFeedback.impactOccurred('light');
}
});
}
async function loadView(viewName, userId = null) {
if (currentView === viewName && viewName !== 'new_post' && viewName !== 'user_wall') return;
tg.HapticFeedback.impactOccurred('light');
currentView = viewName;
currentTargetUserId = userId;
document.querySelectorAll('.nav-button').forEach(btn => btn.classList.remove('active'));
if (viewName !== 'user_wall' && viewName !== 'new_post') {
document.querySelector(`.nav-button[data-view="${viewName}"]`).classList.add('active');
}
mainContent.innerHTML = `<div class="loading">${T['Loading...']}</div>`;
tg.BackButton.hide();
tg.MainButton.hide();
if (viewName === 'my_wall') {
appHeader.textContent = T['My Wall'];
try {
const posts = await apiCall(`/api/wall/${currentUser.id}`);
renderPostList(posts, currentUser.id);
} catch (e) {
mainContent.innerHTML = `<div class="empty-state">${T['Error loading.']}</div>`;
}
} else if (viewName === 'users') {
appHeader.textContent = T['Users List'];
try {
const users = await apiCall('/api/users');
const filteredUsers = users.filter(u => String(u.id) !== String(currentUser.id));
renderUsersList(filteredUsers);
} catch (e) {
mainContent.innerHTML = `<div class="empty-state">${T['Error loading.']}</div>`;
}
} else if (viewName === 'user_wall' && userId) {
tg.BackButton.show();
tg.BackButton.onClick(() => loadView('users'));
try {
const users = await apiCall('/api/users');
const targetUser = users.find(u => String(u.id) === String(userId));
if (!targetUser) throw new Error("User not found");
appHeader.textContent = `${T['Wall of']} ${getFullUserName(targetUser)}`;
const posts = await apiCall(`/api/wall/${userId}`);
renderPostList(posts, userId);
// Add a button to post to this user's wall
tg.MainButton.setText(T['Post on Wall']);
tg.MainButton.show();
tg.MainButton.onClick(() => loadView('new_post', userId));
} catch (e) {
mainContent.innerHTML = `<div class="empty-state">${T['Error loading.']}</div>`;
}
} else if (viewName === 'new_post' && userId) {
tg.BackButton.show();
tg.BackButton.onClick(() => {
if (String(userId) === String(currentUser.id)) loadView('my_wall');
else loadView('user_wall', userId);
});
try {
const users = await apiCall('/api/users');
const targetUser = users.find(u => String(u.id) === String(userId));
if (!targetUser) throw new Error("User not found");
renderNewPostForm(targetUser);
} catch (e) {
mainContent.innerHTML = `<div class="empty-state">${T['Error loading.']}</div>`;
}
} else if (viewName === 'new_post') {
tg.BackButton.show();
tg.BackButton.onClick(() => loadView('my_wall'));
renderNewPostForm(currentUser);
}
}
async function init() {
tg.ready();
applyThemeParams();
tg.expand();
tg.enableClosingConfirmation();
translateUI();
tg.onEvent('themeChanged', applyThemeParams);
try {
const authResponse = await apiCall('/api/auth_user', 'POST', { init_data: tg.initData });
currentUser = authResponse.user;
if (!currentUser) throw new Error("Auth failed: No user object");
// Set initial view to My Wall for the authenticated user
loadView('my_wall');
} catch (error) {
console.error("Auth error:", error);
mainContent.innerHTML = `<div class="empty-state">Authentication failed. Please launch from the Telegram bot.</div>`;
}
document.querySelectorAll('.nav-button').forEach(button => {
button.addEventListener('click', () => {
const view = button.dataset.view;
if (view === 'new_post') {
loadView('new_post', currentUser.id); // Default to posting on own wall
} else {
loadView(view);
}
});
});
}
init();
</script>
</body>
</html>
'''
@app.route('/')
def main_app_view():
return render_template_string(MAIN_APP_TEMPLATE)
--- BOOTSTRAP ---
if name == 'main':
logging.info("Application starting up. Performing initial data load/download...")
download_db_from_hf()
load_data()
logging.info("Initial data load complete.")
code
Code
download
content_copy
expand_less
if HF_TOKEN_WRITE:
backup_thread = threading.Thread(target=periodic_backup, daemon=True)
backup_thread.start()
logging.info("Periodic backup thread started.")
else:
logging.warning("Periodic backup will NOT run (HF_TOKEN_WRITE not set).")
port = int(os.environ.get('PORT', 7860))
logging.info(f"Starting Flask app on host 0.0.0.0 and port {port}")
app.run(debug=False, host='0.0.0.0', port=port)