from flask import Flask, render_template_string, request, redirect, url_for, jsonify, flash, send_from_directory import json import os import logging import threading import time from datetime import datetime from huggingface_hub import HfApi, hf_hub_download from huggingface_hub.utils import RepositoryNotFoundError, HfHubHTTPError from werkzeug.utils import secure_filename from dotenv import load_dotenv import uuid import hmac import hashlib import urllib.parse import requests load_dotenv() app = Flask(name) app.secret_key = os.getenv("FLASK_SECRET_KEY", 'telegram_wall_secret_key_for_flash_messages_only') --- CONFIGURATION --- DATA_FILE = 'wall_data.json' UPLOAD_FOLDER = 'uploads' SYNC_FILES = [DATA_FILE, UPLOAD_FOLDER] # Will be treated as a list of files/folders MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16 MB limit for file uploads ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'mp4', 'mov', 'avi', 'pdf', 'doc', 'docx', 'txt'} if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH REPO_ID = os.getenv("HF_REPO_ID", "Kgshop/telegram-wall-app") HF_TOKEN_WRITE = os.getenv("HF_TOKEN_WRITE") HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") NOTE: The provided token is not a real Telegram Bot Token format. A real token looks like: : TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "7549355625:AAGYWatM-nUVQirgBiBwoAtWZgzfp3QnQjY") DOWNLOAD_RETRIES = 3 DOWNLOAD_DELAY = 5 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') --- UTILITIES --- def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def download_db_from_hf(specific_file=None, retries=DOWNLOAD_RETRIES, delay=DOWNLOAD_DELAY): token_to_use = HF_TOKEN_READ if HF_TOKEN_READ else HF_TOKEN_WRITE files_to_download = [specific_file] if specific_file else SYNC_FILES all_successful = True for file_name in files_to_download: if file_name == UPLOAD_FOLDER: logging.info("Skipping UPLOAD_FOLDER direct download as it's typically large. Requires git-lfs/separate process.") continue code Code download content_copy expand_less success = False for attempt in range(retries + 1): try: logging.info(f"Downloading {file_name} (Attempt {attempt + 1}/{retries + 1})...") local_path = hf_hub_download( repo_id=REPO_ID, filename=file_name, repo_type="dataset", token=token_to_use, local_dir=".", local_dir_use_symlinks=False, force_download=True, resume_download=False ) logging.info(f"Successfully downloaded {file_name} to {local_path}.") success = True break except RepositoryNotFoundError: logging.error(f"Repository {REPO_ID} not found. Download cancelled for all files.") return False except HfHubHTTPError as e: if e.response.status_code == 404: logging.warning(f"File {file_name} not found in repo {REPO_ID} (404). Skipping this file.") if attempt == 0 and not os.path.exists(file_name): try: if file_name == DATA_FILE: default_data = {'posts': [], 'users': {}} with open(file_name, 'w', encoding='utf-8') as f: json.dump(default_data, f) logging.info(f"Created empty local file {file_name} because it was not found on HF.") except Exception as create_e: logging.error(f"Failed to create empty local file {file_name}: {create_e}") success = True break else: logging.error(f"HTTP error downloading {file_name} (Attempt {attempt + 1}): {e}. Retrying in {delay}s...") except Exception as e: logging.error(f"Unexpected error downloading {file_name} (Attempt {attempt + 1}): {e}. Retrying in {delay}s...", exc_info=True) if attempt < retries: time.sleep(delay) if not success: logging.error(f"Failed to download {file_name} after {retries + 1} attempts.") all_successful = False return all_successful def upload_db_to_hf(specific_file=None): if not HF_TOKEN_WRITE: logging.warning("HF_TOKEN_WRITE not set. Skipping upload to Hugging Face.") return try: api = HfApi() files_to_upload = [specific_file] if specific_file and specific_file != UPLOAD_FOLDER else [] code Code download content_copy expand_less if not specific_file: files_to_upload = [DATA_FILE] # Add files from UPLOAD_FOLDER for root, _, files in os.walk(UPLOAD_FOLDER): for file_name in files: local_path = os.path.join(root, file_name) path_in_repo = local_path files_to_upload.append((local_path, path_in_repo)) logging.info(f"Starting upload of {len(files_to_upload)} files/paths to HF repo {REPO_ID}...") for item in files_to_upload: local_path = item if isinstance(item, tuple) else item path_in_repo = item if isinstance(item, tuple) else item if os.path.exists(local_path): try: api.upload_file( path_or_fileobj=local_path, path_in_repo=path_in_repo, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"Sync {path_in_repo} {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) logging.info(f"File {path_in_repo} successfully uploaded to Hugging Face.") except Exception as e: logging.error(f"Error uploading file {path_in_repo} to Hugging Face: {e}") else: logging.warning(f"File {local_path} not found locally, skipping upload.") logging.info("Finished uploading files to HF.") except Exception as e: logging.error(f"General error during Hugging Face upload initialization or process: {e}", exc_info=True) def periodic_backup(): backup_interval = 1800 logging.info(f"Setting up periodic backup every {backup_interval} seconds.") while True: time.sleep(backup_interval) logging.info("Starting periodic backup...") upload_db_to_hf() logging.info("Periodic backup finished.") def load_data(): default_data = {'posts': [], 'users': {}} try: with open(DATA_FILE, 'r', encoding='utf-8') as file: data = json.load(file) logging.info(f"Local data loaded successfully from {DATA_FILE}") if not isinstance(data, dict): raise FileNotFoundError for key in default_data: if key not in data: data[key] = default_data[key] return data except (FileNotFoundError, json.JSONDecodeError) as e: logging.warning(f"Error loading local data ({e}). Attempting download from HF.") code Code download content_copy expand_less if download_db_from_hf(specific_file=DATA_FILE): try: with open(DATA_FILE, 'r', encoding='utf-8') as file: data = json.load(file) logging.info(f"Data loaded successfully from {DATA_FILE} after download.") if not isinstance(data, dict): return default_data for key in default_data: if key not in data: data[key] = default_data[key] return data except Exception as load_e: logging.error(f"Error loading downloaded {DATA_FILE}: {load_e}. Using default.", exc_info=True) return default_data else: logging.error(f"Failed to download {DATA_FILE} from HF. Using empty default data structure.") if not os.path.exists(DATA_FILE): try: with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump(default_data, f) logging.info(f"Created empty local file {DATA_FILE} after failed download.") except Exception as create_e: logging.error(f"Failed to create empty local file {DATA_FILE}: {create_e}") return default_data def save_data(data): try: if not isinstance(data, dict): logging.error("Attempted to save invalid data structure (not a dict). Aborting save.") return default_keys = {'posts': [], 'users': {}} for key in default_keys: if key not in data: data[key] = default_keys[key] code Code download content_copy expand_less with open(DATA_FILE, 'w', encoding='utf-8') as file: json.dump(data, file, ensure_ascii=False, indent=4) logging.info(f"Data successfully saved to {DATA_FILE}") threading.Thread(target=upload_db_to_hf, args=(DATA_FILE,), daemon=True).start() except Exception as e: logging.error(f"Error saving data to {DATA_FILE}: {e}", exc_info=True) def verify_telegram_auth_data(auth_data_str, bot_token): if not auth_data_str: return False, None code Code download content_copy expand_less params = dict(urllib.parse.parse_qsl(auth_data_str)) if 'hash' not in params: return False, None received_hash = params.pop('hash') sorted_params = sorted(params.items()) data_check_string_parts = [] for key, value in sorted_params: data_check_string_parts.append(f"{key}={value}") data_check_string = "\n".join(data_check_string_parts) secret_key = hmac.new("WebAppData".encode(), bot_token.encode(), hashlib.sha256).digest() calculated_hash = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest() if calculated_hash == received_hash: try: user_data = json.loads(params.get('user', '{}')) # Get chat_id if available (often same as user_id in private chats) chat_id = params.get('chat_id', user_data.get('id')) user_data['chat_id'] = chat_id return True, user_data except json.JSONDecodeError: return False, None return False, None def get_authenticated_user_details(request_headers): auth_data_str = request_headers.get('X-Telegram-Auth') if not auth_data_str: return None is_valid, user_data_from_auth = verify_telegram_auth_data(auth_data_str, TELEGRAM_BOT_TOKEN) if is_valid and user_data_from_auth: data = load_data() user_id_str = str(user_data_from_auth.get('id')) return data.get('users', {}).get(user_id_str) return None def send_telegram_notification(chat_id, message): if not TELEGRAM_BOT_TOKEN: logging.warning("TELEGRAM_BOT_TOKEN not set. Cannot send notification.") return code Code download content_copy expand_less url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" payload = { 'chat_id': chat_id, 'text': message, 'parse_mode': 'HTML' } try: response = requests.post(url, data=payload, timeout=5) response.raise_for_status() logging.info(f"Notification sent to {chat_id}.") return response.json() except requests.exceptions.RequestException as e: logging.error(f"Error sending Telegram notification to {chat_id}: {e}") return None --- API ROUTES --- @app.route('/api/auth_user', methods=['POST']) def auth_user(): auth_data_str = request.headers.get('X-Telegram-Auth') if not auth_data_str: init_data_payload = request.json.get('init_data') if init_data_payload: auth_data_str = init_data_payload else: return jsonify({"error": "Authentication data not provided"}), 401 code Code download content_copy expand_less is_valid, user_data_from_auth = verify_telegram_auth_data(auth_data_str, TELEGRAM_BOT_TOKEN) if not is_valid or not user_data_from_auth: return jsonify({"error": "Invalid authentication data"}), 403 data = load_data() users = data.get('users', {}) user_id_str = str(user_data_from_auth.get('id')) # Update/Create user record user_record = users.get(user_id_str, {}) # Store chat_id (assuming it is available or fallback to user_id) # Note: In a real-world bot, the chat_id should be captured during the /start command. user_record.update({ 'id': user_data_from_auth.get('id'), 'chat_id': user_data_from_auth.get('chat_id') or user_data_from_auth.get('id'), # Critical for notifications 'first_name': user_data_from_auth.get('first_name'), 'last_name': user_data_from_auth.get('last_name'), 'username': user_data_from_auth.get('username'), 'language_code': user_data_from_auth.get('language_code'), 'photo_url': user_data_from_auth.get('photo_url'), 'last_seen': datetime.now().isoformat() }) if 'first_seen' not in user_record: user_record['first_seen'] = datetime.now().isoformat() users[user_id_str] = user_record data['users'] = users save_data(data) return jsonify({"message": "User authenticated", "user": users[user_id_str]}), 200 @app.route('/api/users', methods=['GET']) def get_users(): # Only authenticated users can view the list if not get_authenticated_user_details(request.headers): return jsonify({"error": "Authentication required"}), 401 code Code download content_copy expand_less data = load_data() user_list = list(data.get('users', {}).values()) # Return limited user info for the list safe_user_list = [{ 'id': user['id'], 'first_name': user['first_name'], 'last_name': user['last_name'], 'username': user.get('username'), 'photo_url': user.get('photo_url') } for user in user_list] return jsonify(sorted(safe_user_list, key=lambda x: x.get('first_name', 'z'))), 200 @app.route('/api/wall/', methods=['GET']) def get_user_wall(user_id): if not get_authenticated_user_details(request.headers): return jsonify({"error": "Authentication required"}), 401 code Code download content_copy expand_less data = load_data() all_posts = data.get('posts', []) # Filter posts: # 1. Posts made by the user_id on their own wall (target_user_id is None or same as user_id) # 2. Posts made by *others* targeting this user_id's wall wall_posts = [ post for post in all_posts if (str(post.get('user_id')) == str(user_id) and not post.get('target_user_id')) or (str(post.get('target_user_id')) == str(user_id)) ] # Enrich posts with user info (poster and target) users = data.get('users', {}) enriched_posts = [] for post in sorted(wall_posts, key=lambda x: x.get('timestamp', ''), reverse=True): poster = users.get(str(post['user_id']), {}) target = users.get(str(post.get('target_user_id')), {}) if post.get('target_user_id') else None enriched_post = post.copy() enriched_post['poster_name'] = f"{poster.get('first_name', 'Unknown')} {poster.get('last_name', '')}".strip() enriched_post['poster_username'] = poster.get('username', None) if target: enriched_post['target_name'] = f"{target.get('first_name', 'Unknown')} {target.get('last_name', '')}".strip() enriched_post['target_username'] = target.get('username', None) enriched_posts.append(enriched_post) return jsonify(enriched_posts), 200 @app.route('/api/post/', methods=['POST']) def create_post(target_user_id): user = get_authenticated_user_details(request.headers) if not user: return jsonify({"error": "Authentication required or user not found in DB"}), 401 code Code download content_copy expand_less if str(target_user_id) == 'me': target_user_id = str(user['id']) data = load_data() users = data.get('users', {}) target_user = users.get(str(target_user_id)) if not target_user: return jsonify({"error": "Target user not found"}), 404 text_content = request.form.get('content', '').strip() file = request.files.get('file') if not text_content and not file: return jsonify({"error": "Post must contain text or a file"}), 400 new_post = { "id": str(uuid.uuid4()), "user_id": str(user['id']), "target_user_id": str(target_user_id) if str(target_user_id) != str(user['id']) else None, # Null for own wall post "timestamp": datetime.now().isoformat(), "content": text_content, "type": "text", "file_path": None } # Handle file upload if file and file.filename: if not allowed_file(file.filename): return jsonify({"error": "File type not allowed"}), 400 extension = file.filename.rsplit('.', 1)[[1](https://www.google.com/url?sa=E&q=https%3A%2F%2Fvertexaisearch.cloud.google.com%2Fgrounding-api-redirect%2FAUZIYQF6S04rq1C0Z_rbzF1NSAxsil9bBG4L3nuHOSOAbHHtJiwnE2LxVsPOVpiPhBXRK6XaybxBsn0UZ9Mn1KLhTGONkEjmCPX1AD7mT0SoQ15oTUhmR7n6PGa73aBGIEQ67iFmSMDvPPA3aXv6RLq5SesfTK1HYQ3z5Q%3D%3D)].lower() filename = secure_filename(f"{new_post['id']}.{extension}") file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) try: file.save(file_path) new_post['file_path'] = filename if extension in ['png', 'jpg', 'jpeg', 'gif']: new_post['type'] = 'photo' elif extension in ['mp4', 'mov', 'avi']: new_post['type'] = 'video' else: new_post['type'] = 'document' # Asynchronously upload file to HF threading.Thread(target=upload_db_to_hf, args=((file_path, os.path.join(UPLOAD_FOLDER, filename)),), daemon=True).start() except Exception as e: logging.error(f"File save/upload failed: {e}") return jsonify({"error": f"Failed to save file: {e}"}), 500 data['posts'].append(new_post) save_data(data) # --- Notification Logic --- if str(target_user_id) != str(user['id']): # Post to another user's wall, send notification to the target user poster_name = user.get('first_name', 'Someone') target_chat_id = target_user.get('chat_id') if target_chat_id: message = f"📢 New post on your wall!\n\n{poster_name} posted: {text_content[:100]}..." threading.Thread(target=send_telegram_notification, args=(target_chat_id, message), daemon=True).start() return jsonify(new_post), 201 @app.route('/api/post/', methods=['DELETE']) def delete_post(post_id): user = get_authenticated_user_details(request.headers) if not user: return jsonify({"error": "Authentication required or user not found in DB"}), 401 code Code download content_copy expand_less data = load_data() posts_list = data.get('posts', []) original_length = len(posts_list) item_to_delete = next((i for i in posts_list if i['id'] == post_id), None) if not item_to_delete: return jsonify({"error": "Post not found"}), 404 # Allow deleting if the user is the poster OR the user is the wall owner (if target_user_id is set) is_poster = str(item_to_delete.get('user_id')) == str(user.get('id')) is_wall_owner = str(item_to_delete.get('target_user_id')) == str(user.get('id')) if not is_poster and not is_wall_owner: return jsonify({"error": "Forbidden: You can only delete your own posts or posts on your wall"}), 403 # Delete the associated file if it exists file_path = item_to_delete.get('file_path') if file_path: full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path) if os.path.exists(full_path): os.remove(full_path) logging.info(f"Deleted file: {full_path}") # Note: File deletion on HF requires complex logic (git delete commit) - skipping for this example. data['posts'] = [i for i in posts_list if i['id'] != post_id] if len(data['posts']) < original_length: save_data(data) return jsonify({"message": "Post deleted successfully"}), 200 return jsonify({"error": "Post not found or deletion failed"}), 404 @app.route('/uploads/', methods=['GET']) def uploaded_file(filename): return send_from_directory(app.config['UPLOAD_FOLDER'], filename) --- WEBAPP TEMPLATES & MAIN VIEW --- MAIN_APP_TEMPLATE = ''' TonWall
Loading...
code Code download content_copy expand_less ''' @app.route('/') def main_app_view(): return render_template_string(MAIN_APP_TEMPLATE) --- BOOTSTRAP --- if name == 'main': logging.info("Application starting up. Performing initial data load/download...") download_db_from_hf() load_data() logging.info("Initial data load complete.") code Code download content_copy expand_less if HF_TOKEN_WRITE: backup_thread = threading.Thread(target=periodic_backup, daemon=True) backup_thread.start() logging.info("Periodic backup thread started.") else: logging.warning("Periodic backup will NOT run (HF_TOKEN_WRITE not set).") port = int(os.environ.get('PORT', 7860)) logging.info(f"Starting Flask app on host 0.0.0.0 and port {port}") app.run(debug=False, host='0.0.0.0', port=port)