# --- START OF FILE app (24).py ---
import hashlib
import hmac
import json
import logging
import os
import re
import shutil
import threading
import time
import uuid
from collections import deque
from datetime import datetime
from io import BytesIO
from typing import Optional, Union
from urllib.parse import parse_qsl, quote, unquote, urlencode

import requests
from flask import Flask, request, jsonify, Response, send_file
from flask_caching import Cache
from huggingface_hub import HfApi, hf_hub_download, utils as hf_utils
from werkzeug.utils import secure_filename
app = Flask(__name__)
# Session-signing key; override via env in production.
app.secret_key = os.getenv("FLASK_SECRET_KEY", "supersecretkey_mini_app_unique_v2")
# SECURITY NOTE(review): a real-looking bot token is hard-coded as the env
# fallback — it should be revoked and removed so only the env var is used.
BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '6750208873:AAE2hvPlJ99dBdhGa_Brre0IIpUdOvXxHt4')
DATA_FILE = 'cloudeng_mini_app_data.json'  # local JSON "database"
DATA_FILE_TEMP = DATA_FILE + '.tmp'        # staging file for atomic writes
DATA_FILE_BACKUP = DATA_FILE + '.bak'      # last-known-good copy
REPO_ID = "Eluza133/Z1e1u"                 # HF dataset repo used for file storage + DB backup
HF_TOKEN_WRITE = os.getenv("HF_TOKEN")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") or HF_TOKEN_WRITE  # reads fall back to the write token
UPLOAD_FOLDER = 'uploads_mini_app'         # scratch dir for incoming multipart uploads
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
cache = Cache(app, config={'CACHE_TYPE': 'simple'})  # in-process cache backing load_data()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
AUTH_DATA_LIFETIME = 3600  # seconds a Telegram initData payload is considered fresh
def find_node_by_id(filesystem, node_id):
    """Breadth-first search of the filesystem tree for a node id.

    Returns a (node, parent) tuple. The parent is None when the match is
    the root itself; (None, None) is returned when the id is absent or
    the tree is missing/malformed.
    """
    if not filesystem or not isinstance(filesystem, dict):
        return None, None
    if filesystem.get('id') == node_id:
        return filesystem, None
    # deque.popleft() is O(1); the original list.pop(0) was O(n) per dequeue.
    queue = deque([(filesystem, None)])
    visited = {filesystem.get('id')}
    while queue:
        current_node, parent = queue.popleft()
        node_type = current_node.get('type')
        node_children = current_node.get('children')
        if node_type == 'folder' and isinstance(node_children, list):
            for child in node_children:
                if not isinstance(child, dict):
                    continue
                child_id = child.get('id')
                if not child_id:
                    continue
                if child_id == node_id:
                    return child, current_node
                # Only folders can contain more nodes, so only they are queued.
                if child_id not in visited and child.get('type') == 'folder':
                    visited.add(child_id)
                    queue.append((child, current_node))
    return None, None
def add_node(filesystem, parent_id, node_data):
    """Attach node_data as a child of the folder with id parent_id.

    Returns True when the parent exists and is a folder (a duplicate id
    is silently skipped without double-appending), False otherwise.
    NOTE(review): whether a duplicate id should count as success is an
    assumption from the flattened source — callers use fresh uuid4 ids,
    so the branch is effectively unreachable; confirm if that changes.
    """
    parent_node, _ = find_node_by_id(filesystem, parent_id)
    if parent_node and parent_node.get('type') == 'folder':
        # Repair a missing/invalid children container before appending.
        if 'children' not in parent_node or not isinstance(parent_node['children'], list):
            parent_node['children'] = []
        existing_ids = {child.get('id') for child in parent_node['children'] if isinstance(child, dict)}
        if node_data.get('id') not in existing_ids:
            parent_node['children'].append(node_data)
        return True
    return False
def remove_node(filesystem, node_id):
    """Detach the node with node_id from its parent's children list.

    Returns True only when an entry was actually removed. The root node
    itself is never removed; that attempt is logged and returns False.
    """
    target, parent = find_node_by_id(filesystem, node_id)
    if target and parent and isinstance(parent.get('children'), list):
        siblings = parent['children']
        before = len(siblings)
        parent['children'] = [
            entry for entry in siblings
            if not isinstance(entry, dict) or entry.get('id') != node_id
        ]
        return len(parent['children']) < before
    if target and node_id == filesystem.get('id'):
        logging.warning("Attempted to remove root node directly.")
        return False
    return False
def get_node_path_list(filesystem, node_id):
    """Build breadcrumb entries from the root down to node_id.

    Walks parent links upward (each hop re-runs find_node_by_id, so cost
    is O(tree * depth)), guards against loops and runaway depth, forces a
    root entry if one was never reached, then reverses and de-duplicates.
    Returns a list of {'id', 'name'} dicts ordered root-first.
    """
    path_list = []
    current_id = node_id
    processed_ids = set()
    max_depth = 20  # hard cap against pathological/corrupt trees
    depth = 0
    while current_id and current_id not in processed_ids and depth < max_depth:
        processed_ids.add(current_id)
        depth += 1
        node, parent = find_node_by_id(filesystem, current_id)
        if not node or not isinstance(node, dict):
            logging.error(f"Path traversal failed: Node not found or invalid for ID {current_id}")
            break
        # Files fall back to their original upload name when 'name' is absent.
        path_list.append({
            'id': node.get('id'),
            'name': node.get('name', node.get('original_filename', 'Unknown'))
        })
        if not parent or not isinstance(parent, dict):
            # No parent is expected only for root; anything else is suspicious.
            if node.get('id') != 'root':
                logging.warning(f"Node {current_id} has no parent, stopping path traversal.")
            break
        parent_id = parent.get('id')
        if parent_id == current_id:
            logging.error(f"Filesystem loop detected at node {current_id}")
            break
        current_id = parent_id
    # Guarantee the breadcrumb trail starts at root even if traversal broke early.
    if not any(p['id'] == 'root' for p in path_list):
        root_node, _ = find_node_by_id(filesystem, 'root')
        if root_node:
            path_list.append({'id': 'root', 'name': root_node.get('name', 'Root')})
        else:
            path_list.append({'id': 'root', 'name': 'Root'})
    # Reverse to root-first order, dropping any duplicate ids.
    final_path = []
    seen_ids = set()
    for item in reversed(path_list):
        if item['id'] not in seen_ids:
            final_path.append(item)
            seen_ids.add(item['id'])
    return final_path
def initialize_user_filesystem(user_data):
    """Ensure user_data carries a structurally valid 'filesystem' tree.

    Replaces a missing/invalid tree with a fresh empty root folder, or
    repairs a root whose 'children' is not a list. Mutates in place.
    """
    if not isinstance(user_data, dict):
        logging.error("Invalid user_data passed to initialize_user_filesystem")
        return
    fs = user_data.get('filesystem')
    has_valid_root = isinstance(fs, dict) and fs.get('id') == 'root'
    if not has_valid_root:
        logging.warning(f"Initializing/Resetting filesystem for user data fragment: {str(user_data)[:100]}")
        user_data['filesystem'] = {
            "type": "folder",
            "id": "root",
            "name": "Root",
            "children": []
        }
    elif not isinstance(fs.get('children'), list):
        logging.warning(f"Fixing missing/invalid children array for root filesystem: {str(user_data)[:100]}")
        fs['children'] = []
def load_data_from_file(filepath):
    """Load and sanity-check the JSON database at filepath.

    Returns the parsed dict — guaranteed to have a dict 'users' key whose
    entries each carry a valid filesystem — or None when the file is
    missing, unparsable, or unreadable. A parsable-but-wrong-shape file
    is coerced rather than rejected.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            data = json.load(file)
        if not isinstance(data, dict):
            logging.warning(f"Data in {filepath} is not a dict, using empty.")
            return {'users': {}}
        data.setdefault('users', {})
        # Deep check and initialization
        users_copy = data.get('users', {})
        if not isinstance(users_copy, dict):
            logging.warning(f"Users field in {filepath} is not a dict, resetting users.")
            data['users'] = {}
            return data
        for user_id, user_data in list(users_copy.items()):  # Use list to allow potential removal during iteration
            if not isinstance(user_data, dict):
                logging.warning(f"Invalid user data structure for user {user_id} in {filepath}, removing entry.")
                del data['users'][user_id]
                continue
            initialize_user_filesystem(user_data)
        logging.info(f"Data loaded successfully from {filepath}")
        return data
    except FileNotFoundError:
        logging.warning(f"{filepath} not found.")
        return None
    except json.JSONDecodeError:
        logging.error(f"Error decoding JSON from {filepath}.")
        return None
    except Exception as e:
        logging.error(f"Error loading data from {filepath}: {e}")
        return None
@cache.memoize(timeout=60)
def load_data():
    """Load the database, preferring the freshest available source.

    Order: Hugging Face download (refreshes the local file on success) ->
    main local file -> backup file (restoring the main file from it) ->
    empty structure. The result is memoized for 60 seconds.
    """
    logging.info("Attempting to load data...")
    # 1. Try to download from HF
    download_success = download_db_from_hf()
    # 2. Try loading the main file
    data = load_data_from_file(DATA_FILE)
    if data is not None:
        logging.info("Using main data file.")
        return data
    # 3. If main file failed or didn't exist (and download might have failed), try backup
    logging.warning("Main data file failed to load or not found, trying backup.")
    data = load_data_from_file(DATA_FILE_BACKUP)
    if data is not None:
        logging.info("Using backup data file.")
        # Attempt to restore main file from backup
        try:
            shutil.copy(DATA_FILE_BACKUP, DATA_FILE)
            logging.info(f"Restored {DATA_FILE} from {DATA_FILE_BACKUP}")
        except Exception as e:
            logging.error(f"Failed to restore main file from backup: {e}")
        return data
    # 4. If both fail, initialize empty structure
    logging.error("Both main and backup data files are missing or corrupt. Initializing empty data.")
    return {'users': {}}
def save_data(data):
    """Atomically persist the database dict to DATA_FILE.

    Write path: temp file -> copy live file to backup -> atomic move of
    temp over live -> clear the read cache -> fire-and-forget HF upload
    thread. Returns True on success, False on validation or I/O failure.
    """
    if not isinstance(data, dict) or not isinstance(data.get('users'), dict):
        logging.critical(f"CRITICAL: Attempted to save invalid data structure: {str(data)[:200]}. Aborting save.")
        # Optionally raise an exception or handle more gracefully
        return False  # Indicate save failure
    try:
        # Write to temporary file first
        with open(DATA_FILE_TEMP, 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False, indent=4)
        # If temporary write succeeded, create backup and then rename
        if os.path.exists(DATA_FILE):
            try:
                shutil.copy(DATA_FILE, DATA_FILE_BACKUP)  # More robust than rename for backup
                logging.info(f"Created backup: {DATA_FILE_BACKUP}")
            except Exception as e:
                # A failed backup is non-fatal; the atomic move still proceeds.
                logging.warning(f"Could not create backup file {DATA_FILE_BACKUP}: {e}")
        shutil.move(DATA_FILE_TEMP, DATA_FILE)  # Atomic rename/move
        cache.clear()  # Clear cache after successful save
        logging.info("Data saved successfully to " + DATA_FILE)
        # Schedule HF upload in a background thread so the request isn't blocked
        upload_thread = threading.Thread(target=upload_db_to_hf)
        upload_thread.start()
        return True  # Indicate save success
    except Exception as e:
        logging.error(f"Error saving data: {e}")
        # Clean up temp file if it exists
        if os.path.exists(DATA_FILE_TEMP):
            try:
                os.remove(DATA_FILE_TEMP)
            except OSError as e_rm:
                logging.error(f"Error removing temporary save file {DATA_FILE_TEMP}: {e_rm}")
        return False  # Indicate save failure
def upload_db_to_hf():
    """Push the local JSON database file to the Hugging Face dataset repo.

    No-ops (with a warning) when the write token is missing or the local
    data file does not exist. Errors are logged, never raised — this runs
    in a background thread spawned by save_data().
    """
    if not HF_TOKEN_WRITE:
        logging.warning("HF_TOKEN_WRITE not set, skipping database upload.")
        return
    if not os.path.exists(DATA_FILE):
        logging.warning(f"Data file {DATA_FILE} not found for upload.")
        return
    try:
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        HfApi().upload_file(
            path_or_fileobj=DATA_FILE,
            path_in_repo=DATA_FILE,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN_WRITE,
            commit_message=f"Backup MiniApp {stamp}",
        )
        logging.info("Database upload to Hugging Face completed.")
    except Exception as e:
        logging.error(f"Error during database upload: {e}")
def download_db_from_hf():
    """Fetch the latest JSON database from the HF dataset repo.

    The file is downloaded into a scratch directory first and only moved
    over DATA_FILE after it parses as valid data, so a corrupt or partial
    download can never clobber the local copy.
    Returns True when the local DATA_FILE was refreshed, False otherwise.

    Fix: the original passed a non-existent ``local_path=`` keyword to
    hf_hub_download, which raised TypeError into the generic except —
    the download silently failed on every call.
    """
    if not HF_TOKEN_READ:
        logging.warning("HF_TOKEN_READ not set, skipping database download.")
        return False
    scratch_dir = DATA_FILE + ".hf_download"
    try:
        logging.info(f"Attempting download of {DATA_FILE} from {REPO_ID}")
        # hf_hub_download returns the path of the downloaded file; point
        # local_dir at a scratch directory so validation happens before
        # the live DB is touched.
        downloaded_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=DATA_FILE,
            repo_type="dataset",
            token=HF_TOKEN_READ,
            local_dir=scratch_dir,
            force_download=True,  # Ensure we get the latest revision
            etag_timeout=10,
        )
        # Verify downloaded file is valid JSON before replacing the live DB
        if load_data_from_file(downloaded_path) is not None:
            shutil.move(downloaded_path, DATA_FILE)
            logging.info("Database downloaded successfully from Hugging Face and verified.")
            cache.clear()  # Clear cache as data might have changed
            return True
        logging.error("Downloaded database file is invalid JSON. Discarding download.")
        return False
    except hf_utils.RepositoryNotFoundError:
        logging.error(f"Repository {REPO_ID} not found on Hugging Face.")
        return False
    except hf_utils.EntryNotFoundError:
        logging.warning(f"{DATA_FILE} not found in repo {REPO_ID}. Using local/backup if available.")
        return False
    except requests.exceptions.RequestException as e:
        logging.error(f"Connection error downloading DB from HF: {e}. Using local/backup.")
        return False
    except Exception as e:
        logging.error(f"Generic error downloading database: {e}")
        return False
    finally:
        # Best-effort removal of the scratch dir (and any partial download).
        shutil.rmtree(scratch_dir, ignore_errors=True)
def get_file_type(filename):
    """Map a filename to a coarse category string based on its extension.

    Returns one of: video, image, pdf, text, doc, sheet, slides, archive,
    audio — or 'other' for anything unrecognized or without an extension.
    """
    if not filename or '.' not in filename:
        return 'other'
    ext = filename.lower().rsplit('.', 1)[-1]
    categories = (
        ('video', ('mp4', 'mov', 'avi', 'webm', 'mkv', 'm4v', 'quicktime')),
        ('image', ('jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'svg', 'heic', 'heif')),
        ('pdf', ('pdf',)),
        ('text', ('txt', 'md', 'log', 'csv', 'json', 'xml', 'html', 'css', 'js', 'py', 'java', 'c', 'cpp')),
        ('doc', ('doc', 'docx', 'rtf')),
        ('sheet', ('xls', 'xlsx')),
        ('slides', ('ppt', 'pptx')),
        ('archive', ('zip', 'rar', '7z', 'gz', 'tar')),
        ('audio', ('mp3', 'wav', 'ogg', 'flac', 'aac', 'm4a')),
    )
    for category, extensions in categories:
        if ext in extensions:
            return category
    return 'other'
def check_telegram_authorization(auth_data: str, bot_token: str) -> Optional[dict]:
    """Validate a Telegram WebApp initData string against the bot token.

    Implements the WebApp scheme: HMAC-SHA256 over the sorted "key=value"
    pairs, keyed by HMAC-SHA256("WebAppData", bot_token). Returns the
    parsed 'user' dict on success, None on any failure.
    """
    if not auth_data or not bot_token or bot_token == 'YOUR_BOT_TOKEN':
        logging.warning("Validation skipped: Missing auth_data or valid BOT_TOKEN.")
        return None  # Consider returning a specific error?
    try:
        # NOTE(review): unquote() before parse_qsl() double-decodes; this
        # assumes the client URL-encodes the payload an extra time — confirm
        # against the frontend before changing.
        parsed_data = dict(parse_qsl(unquote(auth_data)))
        if "hash" not in parsed_data:
            logging.error("Hash not found in auth data")
            return None
        # The hash must be excluded from the data-check string it signs.
        telegram_hash = parsed_data.pop('hash')
        auth_date_ts = int(parsed_data.get('auth_date', 0))
        current_ts = int(time.time())
        if abs(current_ts - auth_date_ts) > AUTH_DATA_LIFETIME:
            logging.warning(f"Auth data expired (Auth: {auth_date_ts}, Now: {current_ts}, Diff: {current_ts - auth_date_ts})")
            # return None # Temporarily disable expiration check for easier testing if needed
            pass  # Allow expired data for now, maybe add strict mode later
        # Canonical data-check string: sorted "key=value" lines joined by '\n'.
        data_check_string = "\n".join(sorted([f"{k}={v}" for k, v in parsed_data.items()]))
        # Secret key per the WebApp spec: HMAC-SHA256 keyed by "WebAppData".
        secret_key = hmac.new("WebAppData".encode(), bot_token.encode(), hashlib.sha256).digest()
        calculated_hash = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()
        # Constant-time comparison to avoid timing side channels.
        if hmac.compare_digest(calculated_hash, telegram_hash):
            user_data_str = parsed_data.get('user')
            if user_data_str:
                try:
                    user_info = json.loads(user_data_str)
                    if 'id' not in user_info:
                        logging.error("Validated user data missing 'id'")
                        return None
                    logging.info(f"Validation successful for user ID: {user_info.get('id')}")
                    return user_info
                except json.JSONDecodeError:
                    logging.error("Failed to decode user JSON from auth data")
                    return None
            else:
                logging.warning("No 'user' field in validated auth data")
                return None
        else:
            logging.warning("Hash mismatch during validation")
            return None
    except Exception as e:
        logging.error(f"Exception during validation: {e}", exc_info=True)
        return None
# Inline page served by the index route.
# NOTE(review): this template appears truncated/stripped of its markup —
# confirm the full HTML against the deployed version before editing.
HTML_TEMPLATE = """
Cloud Eng
Cloud Eng
Files
This folder is empty.
"""
@app.route('/')
def index():
    """Serve the single-page mini-app HTML shell."""
    page = Response(HTML_TEMPLATE, mimetype='text/html')
    return page
@app.route('/validate_init_data', methods=['POST'])
def validate_init_data():
    """Validate Telegram initData and ensure a user record exists.

    Creates a user entry (with an empty filesystem) on first contact,
    repairs a missing filesystem, and refreshes stored user info when the
    username changed. Returns {"status": "ok", "user": ...} on success,
    400 on missing payload, 403 on failed validation.
    """
    data = request.get_json()
    if not data or 'initData' not in data:
        return jsonify({"status": "error", "message": "Missing initData"}), 400
    init_data = data['initData']
    user_info = check_telegram_authorization(init_data, BOT_TOKEN)
    if user_info and 'id' in user_info:
        tg_user_id = str(user_info['id'])
        db_data = load_data()
        users = db_data.setdefault('users', {})
        save_needed = False
        user_entry = users.get(tg_user_id)
        if not user_entry or not isinstance(user_entry, dict):
            logging.info(f"New user detected or invalid entry: {tg_user_id}. Initializing.")
            users[tg_user_id] = {
                'user_info': user_info,
                'created_at': datetime.now().isoformat()  # Use ISO format
            }
            initialize_user_filesystem(users[tg_user_id])
            save_needed = True
        else:
            # Check if filesystem needs initialization or repair
            if 'filesystem' not in user_entry or not isinstance(user_entry.get('filesystem'), dict):
                logging.warning(f"Filesystem missing or invalid for user {tg_user_id}. Re-initializing.")
                initialize_user_filesystem(user_entry)
                save_needed = True
            # Optionally update user info if changed (e.g., username)
            if user_entry.get('user_info', {}).get('username') != user_info.get('username'):
                user_entry['user_info'] = user_info  # Update stored info
                save_needed = True
        if save_needed:
            if not save_data(db_data):
                logging.error(f"Failed to save data for user {tg_user_id} during validation.")
                # Avoid returning 500 if possible, user might still be usable with loaded data
                # return jsonify({"status": "error", "message": "Error saving user data."}), 500
                pass  # Logged the error, proceed with current (possibly unsaved) state
        return jsonify({"status": "ok", "user": user_info})
    else:
        logging.warning(f"Validation failed for initData prefix: {init_data[:100]}...")
        return jsonify({"status": "error", "message": "Invalid authorization data."}), 403
@app.route('/get_dashboard_data', methods=['POST'])
def get_dashboard_data():
    """Return one folder's contents plus breadcrumbs for the UI.

    Expects JSON: initData (Telegram auth) and folder_id. Falls back to
    the root folder when the requested one is missing, and attempts to
    repair a broken per-user filesystem in place before giving up.
    """
    data = request.get_json()
    if not data or 'initData' not in data or 'folder_id' not in data:
        return jsonify({"status": "error", "message": "Incomplete request"}), 400
    user_info = check_telegram_authorization(data['initData'], BOT_TOKEN)
    if not user_info or 'id' not in user_info:
        return jsonify({"status": "error", "message": "Unauthorized"}), 403
    tg_user_id = str(user_info['id'])
    folder_id = data['folder_id']
    db_data = load_data()
    user_data = db_data.get('users', {}).get(tg_user_id)
    if not user_data or 'filesystem' not in user_data or not isinstance(user_data['filesystem'], dict):
        logging.error(f"User data or filesystem missing/invalid for validated user {tg_user_id}")
        # Attempt recovery if filesystem is bad but user_data exists
        if isinstance(user_data, dict):
            logging.warning(f"Attempting to re-initialize filesystem for user {tg_user_id}")
            initialize_user_filesystem(user_data)
            if not save_data(db_data):
                logging.error(f"Failed to save re-initialized filesystem for user {tg_user_id}")
                # Continue with the newly initialized filesystem if save failed but init worked
        else:
            return jsonify({"status": "error", "message": "User data error"}), 500
    current_folder, _ = find_node_by_id(user_data['filesystem'], folder_id)
    if not current_folder or current_folder.get('type') != 'folder':
        logging.warning(f"Folder {folder_id} not found or invalid for user {tg_user_id}. Defaulting to root.")
        folder_id = 'root'
        current_folder, _ = find_node_by_id(user_data['filesystem'], folder_id)
        if not current_folder:
            logging.critical(f"CRITICAL: Root folder cannot be found for user {tg_user_id} even after check.")
            # Attempt recovery again
            initialize_user_filesystem(user_data)
            if not save_data(db_data):
                logging.error(f"Failed to save re-initialized filesystem after root recovery attempt for {tg_user_id}")
            current_folder, _ = find_node_by_id(user_data['filesystem'], 'root')
            if not current_folder:  # Still failing
                return jsonify({"status": "error", "message": "Critical error: Root folder missing."}), 500
    items_in_folder = current_folder.get('children', [])
    if not isinstance(items_in_folder, list):
        logging.warning(f"Invalid 'children' in folder {folder_id} for user {tg_user_id}. Resetting to empty list.")
        items_in_folder = []
        current_folder['children'] = []
        # Consider saving data here if you want to persist this fix immediately
        # save_data(db_data)
    breadcrumbs = get_node_path_list(user_data['filesystem'], folder_id)
    current_folder_info = {
        'id': current_folder.get('id'),
        'name': current_folder.get('name', 'Root')
    }
    return jsonify({
        "status": "ok",
        "items": items_in_folder,
        "breadcrumbs": breadcrumbs,
        "current_folder": current_folder_info
    })
@app.route('/upload', methods=['POST'])
def upload_files():
    """Handle multi-file upload (multipart form) into the user's current folder.

    Per file: save to a local temp path -> upload to the HF dataset repo ->
    register a file node in the user's filesystem tree. Metadata is only
    persisted when at least one file fully succeeded; if that save fails,
    the in-memory additions are rolled back (the HF copies are kept on
    purpose to avoid data loss on intermittent save failures).
    """
    init_data = request.form.get('initData')
    current_folder_id = request.form.get('current_folder_id', 'root')
    files = request.files.getlist('files')
    user_info = check_telegram_authorization(init_data, BOT_TOKEN)
    if not user_info or 'id' not in user_info:
        return jsonify({"status": "error", "message": "Unauthorized"}), 403
    tg_user_id = str(user_info['id'])
    if not HF_TOKEN_WRITE:
        return jsonify({'status': 'error', 'message': 'Upload configuration error.'}), 500
    if not files or all(not f.filename for f in files):
        return jsonify({'status': 'error', 'message': 'No files selected for upload.'}), 400
    if len(files) > 20:
        return jsonify({'status': 'error', 'message': 'Maximum 20 files per upload.'}), 400
    db_data = load_data()
    user_data = db_data.get('users', {}).get(tg_user_id)
    if not user_data or 'filesystem' not in user_data or not isinstance(user_data['filesystem'], dict):
        logging.error(f"Upload error: User data or filesystem missing/invalid for {tg_user_id}")
        return jsonify({"status": "error", "message": "User data error during upload."}), 500
    target_folder_node, _ = find_node_by_id(user_data['filesystem'], current_folder_id)
    if not target_folder_node or target_folder_node.get('type') != 'folder':
        logging.error(f"Upload error: Target folder {current_folder_id} not found for user {tg_user_id}")
        return jsonify({'status': 'error', 'message': 'Target folder not found!'}), 404
    api = HfApi()
    uploaded_count = 0
    errors = []
    nodes_added = []  # Keep track of nodes added in this request
    for file in files:
        if file and file.filename:
            original_filename = secure_filename(file.filename)
            if not original_filename:
                logging.warning(f"Skipping file with potentially insecure name: {file.filename}")
                errors.append(f"Skipped file with invalid name: {file.filename}")
                continue
            name_part, ext_part = os.path.splitext(original_filename)
            unique_suffix = uuid.uuid4().hex[:8]
            # Ensure filename doesn't become excessively long
            max_len = 100
            safe_name_part = name_part[:max_len]
            unique_filename = f"{safe_name_part}_{unique_suffix}{ext_part}"
            file_id = uuid.uuid4().hex
            # Define path relative to user/folder for organization
            hf_path = f"cloud_files/{tg_user_id}/{file_id[:2]}/{file_id}_{unique_filename}"  # Add subfolder based on ID start
            temp_path = os.path.join(UPLOAD_FOLDER, f"{file_id}_{unique_filename}")
            file_info = {
                'type': 'file', 'id': file_id,
                'original_filename': original_filename,
                'unique_filename': unique_filename,  # Store the unique name used on HF
                'path': hf_path,
                'file_type': get_file_type(original_filename),
                'upload_date': datetime.now().isoformat()  # Use ISO format
            }
            try:
                file.save(temp_path)
                logging.info(f"Attempting HF upload to: {hf_path}")
                api.upload_file(
                    path_or_fileobj=temp_path, path_in_repo=hf_path,
                    repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE,
                    commit_message=f"User {tg_user_id} uploaded {original_filename}"
                )
                logging.info(f"HF upload successful for {original_filename} ({file_id})")
                # Add node to filesystem structure *after* successful HF upload
                if add_node(user_data['filesystem'], current_folder_id, file_info):
                    uploaded_count += 1
                    nodes_added.append(file_info)  # Track success
                else:
                    # This case is critical - file is on HF, but not in DB structure
                    error_msg = f"Failed to add metadata for {original_filename} after upload."
                    errors.append(error_msg)
                    logging.error(f"{error_msg} User: {tg_user_id}, FileID: {file_id}, TargetFolder: {current_folder_id}")
                    # Attempt to delete the orphaned HF file
                    try:
                        logging.warning(f"Attempting cleanup of orphaned HF file: {hf_path}")
                        api.delete_file(path_in_repo=hf_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE)
                        logging.info(f"Orphaned file {hf_path} deleted from HF.")
                    except Exception as del_err:
                        logging.error(f"CRITICAL: Failed to delete orphaned HF file {hf_path}: {del_err}")
            except Exception as e:
                logging.error(f"Upload error for {original_filename} (User: {tg_user_id}, FileID: {file_id}): {e}", exc_info=True)
                errors.append(f"Error uploading {original_filename}")
                # Ensure node wasn't partially added if error occurred during add_node or before
                if file_info in nodes_added: nodes_added.remove(file_info)
            finally:
                # Clean up local temporary file
                if os.path.exists(temp_path):
                    try: os.remove(temp_path)
                    except OSError as e_rm: logging.warning(f"Error removing temp file {temp_path}: {e_rm}")
    # Save data only if at least one file was successfully uploaded AND added to structure
    if uploaded_count > 0 and nodes_added:
        logging.info(f"Saving DB for user {tg_user_id} after {uploaded_count} successful uploads.")
        if not save_data(db_data):
            # If save fails, we have inconsistency: files on HF, maybe some nodes added in memory, but not persisted.
            logging.error(f"CRITICAL: Failed to save DB after successful uploads for user {tg_user_id}.")
            errors.append("Critical error saving file metadata after upload.")
            # Rollback: Remove nodes that were added in this request from the in-memory structure
            for node_info in nodes_added:
                remove_node(user_data['filesystem'], node_info['id'])
            uploaded_count = 0  # Reflect that the save failed
            # Do NOT try to delete the HF files here, could lead to data loss if DB save fails intermittently
    final_message = f"{uploaded_count} file(s) uploaded."
    if errors:
        final_message += f" Errors occurred with {len(errors)} file(s)."
        # Consider logging the specific errors to the user if appropriate
        # final_message += " Details: " + "; ".join(errors)
    return jsonify({
        "status": "ok" if uploaded_count > 0 else "error",  # Status based on successful *persisted* uploads
        "message": final_message
    })
@app.route('/create_folder', methods=['POST'])
def create_folder():
    """Create a new folder under parent_folder_id for the authenticated user.

    Expects JSON: initData (Telegram auth), parent_folder_id, folder_name.
    Returns 400 on bad input, 403 on failed auth, 409 on a duplicate name,
    500 when persisting fails (the in-memory addition is rolled back).

    Fix: the invalid-character check used JavaScript regex syntax
    (/.../.test(name)), which is a Python SyntaxError; replaced with
    re.search.
    """
    data = request.get_json()
    if not data or 'initData' not in data or 'parent_folder_id' not in data or 'folder_name' not in data:
        return jsonify({"status": "error", "message": "Incomplete request"}), 400
    user_info = check_telegram_authorization(data['initData'], BOT_TOKEN)
    if not user_info or 'id' not in user_info:
        return jsonify({"status": "error", "message": "Unauthorized"}), 403
    tg_user_id = str(user_info['id'])
    parent_folder_id = data['parent_folder_id']
    folder_name = data['folder_name'].strip()
    if not folder_name:
        return jsonify({'status': 'error', 'message': 'Folder name cannot be empty.'}), 400
    if len(folder_name) > 100:
        return jsonify({'status': 'error', 'message': 'Folder name is too long.'}), 400
    # Basic validation for problematic characters
    if re.search(r'[<>:"/\\|?*]', folder_name):
        return jsonify({'status': 'error', 'message': 'Folder name contains invalid characters.'}), 400
    db_data = load_data()
    user_data = db_data.get('users', {}).get(tg_user_id)
    if not user_data or 'filesystem' not in user_data or not isinstance(user_data['filesystem'], dict):
        logging.error(f"Create folder error: User data or filesystem missing/invalid for {tg_user_id}")
        return jsonify({"status": "error", "message": "User data error."}), 500
    # Check if folder with the same name already exists in the parent
    parent_node, _ = find_node_by_id(user_data['filesystem'], parent_folder_id)
    if parent_node and 'children' in parent_node and isinstance(parent_node['children'], list):
        for child in parent_node['children']:
            if isinstance(child, dict) and child.get('type') == 'folder' and child.get('name') == folder_name:
                return jsonify({'status': 'error', 'message': f'A folder named "{folder_name}" already exists here.'}), 409  # 409 Conflict
    folder_id = uuid.uuid4().hex
    folder_data = {
        'type': 'folder', 'id': folder_id,
        'name': folder_name, 'children': []
    }
    if add_node(user_data['filesystem'], parent_folder_id, folder_data):
        if save_data(db_data):
            return jsonify({'status': 'ok', 'message': f'Folder "{folder_name}" created.'})
        else:
            logging.error(f"Create folder save error ({tg_user_id}) after adding node {folder_id}.")
            # Attempt to rollback the in-memory addition
            remove_node(user_data['filesystem'], folder_id)
            return jsonify({'status': 'error', 'message': 'Error saving data after creating folder.'}), 500
    else:
        # This implies parent folder wasn't found or wasn't a folder type
        logging.error(f"Create folder error: Failed add_node. User: {tg_user_id}, Parent: {parent_folder_id}")
        return jsonify({'status': 'error', 'message': 'Could not find parent folder to add new folder.'}), 400
@app.route('/download/<file_id>')
def download_file_route(file_id):
    """Proxy-stream a stored file from the HF dataset repo to the client.

    SECURITY NOTE: this route has NO built-in authentication; it relies on
    the unguessability of file_id (uuid4) and the HF path. Add proper auth
    before exposing sensitive data.

    Fixes: the route rule lacked the ``<file_id>`` converter, so every
    request raised a TypeError (view requires an argument Flask never
    supplied); the ``filename*`` header value now uses RFC 5987 percent
    encoding via quote() instead of the urlencode-slice hack, which
    encoded spaces as '+'.
    """
    db_data = load_data()  # Use cached data if possible
    file_node = None
    owner_user_id = None
    # File ids are uuid4 and globally unique, so scan every user's tree.
    for user_id_scan, user_data_scan in db_data.get('users', {}).items():
        if 'filesystem' in user_data_scan and isinstance(user_data_scan['filesystem'], dict):
            node, _ = find_node_by_id(user_data_scan['filesystem'], file_id)
            if node and isinstance(node, dict) and node.get('type') == 'file':
                file_node = node
                owner_user_id = user_id_scan
                break
    if not file_node:
        logging.warning(f"Download request for unknown file_id: {file_id}")
        return Response("File not found", status=404, mimetype='text/plain')
    hf_path = file_node.get('path')
    original_filename = file_node.get('original_filename', f'{file_id}_download')
    if not hf_path:
        logging.error(f"Download error: Missing HF path for file ID {file_id} (Owner: {owner_user_id})")
        return Response("Error: File path configuration missing", status=500, mimetype='text/plain')
    # Construct the direct download URL
    file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{hf_path}?download=true"
    logging.info(f"Attempting to serve file via redirect/proxy from: {file_url}")
    try:
        headers = {}
        if HF_TOKEN_READ:
            headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
        # Stream from HF; (connect_timeout, read_timeout)
        response = requests.get(file_url, headers=headers, stream=True, timeout=(10, 30))
        response.raise_for_status()  # Check for 4xx/5xx errors from HF
        resp_headers = {}
        resp_headers['Content-Type'] = response.headers.get('Content-Type', 'application/octet-stream')
        # ASCII-safe fallback name for legacy clients (plain filename= param).
        safe_filename = "".join(c if c.isalnum() or c in ['.', '-', '_'] else '_' for c in original_filename)
        # RFC 5987 percent-encoding for the UTF-8 filename* parameter.
        encoded_filename = quote(original_filename, safe='')
        resp_headers['Content-Disposition'] = f"attachment; filename=\"{safe_filename}\"; filename*=UTF-8''{encoded_filename}"
        # Add Content-Length if provided by HF
        if 'Content-Length' in response.headers:
            resp_headers['Content-Length'] = response.headers['Content-Length']
        # Stream the response body in chunks rather than buffering it.
        return Response(response.iter_content(chunk_size=8192), status=response.status_code, headers=resp_headers)
    except requests.exceptions.Timeout:
        logging.error(f"Timeout downloading file from HF: {hf_path}")
        return Response("Error: Timed out connecting to file storage", status=504, mimetype='text/plain')  # 504 Gateway Timeout
    except requests.exceptions.RequestException as e:
        status_code = e.response.status_code if e.response is not None else 502  # 502 Bad Gateway if no response
        logging.error(f"Error downloading file from HF ({hf_path}, Owner: {owner_user_id}): {e} (Status: {status_code})")
        # Don't expose detailed error message to client
        return Response(f"Error retrieving file ({status_code})", status=status_code, mimetype='text/plain')
    except Exception as e:
        logging.error(f"Unexpected error during download proxy ({hf_path}, Owner: {owner_user_id}): {e}", exc_info=True)
        return Response("Internal server error during file download", status=500, mimetype='text/plain')
@app.route('/delete_file/<file_id>', methods=['POST'])
def delete_file_route(file_id):
    """Delete a user's file: first from the Hugging Face repo, then from the DB tree.

    Requires a valid Telegram ``initData`` payload in the JSON body; the file
    must belong to the authenticated user.  HF deletion is attempted first so
    that a DB entry is only dropped once the stored object is gone (an
    ``EntryNotFoundError`` on HF is treated as already-deleted).  If the HF
    delete fails outright, the DB node is left in place so a retry is possible.

    Returns JSON ``{'status': 'ok'|'error', 'message': ...}`` with an
    appropriate HTTP status code.
    """
    data = request.get_json()
    if not data or 'initData' not in data:  # current_folder_id might not be strictly necessary
        return jsonify({"status": "error", "message": "Incomplete request"}), 400
    user_info = check_telegram_authorization(data['initData'], BOT_TOKEN)
    if not user_info or 'id' not in user_info:
        return jsonify({"status": "error", "message": "Unauthorized"}), 403
    tg_user_id = str(user_info['id'])
    if not HF_TOKEN_WRITE:
        # Without a write token we cannot touch the HF repo at all.
        return jsonify({'status': 'error', 'message': 'Deletion configuration error.'}), 500
    db_data = load_data()
    user_data = db_data.get('users', {}).get(tg_user_id)
    if not user_data or 'filesystem' not in user_data or not isinstance(user_data['filesystem'], dict):
        logging.error(f"Delete file error: User data or filesystem missing/invalid for {tg_user_id}")
        # Don't reveal file existence, just say user data error
        return jsonify({"status": "error", "message": "User data error."}), 500
    file_node, parent_node = find_node_by_id(user_data['filesystem'], file_id)
    if not file_node or file_node.get('type') != 'file' or not parent_node:
        # File not found *for this user*. Do not confirm non-existence.
        logging.warning(f"Delete request for non-existent/invalid file ID {file_id} by user {tg_user_id}")
        return jsonify({'status': 'error', 'message': 'File not found.'}), 404
    hf_path = file_node.get('path')
    original_filename = file_node.get('original_filename', 'file')
    db_removed = False
    hf_deleted = False
    save_error = False
    # 1. Attempt to delete from Hugging Face Hub
    if hf_path:
        try:
            api = HfApi()
            logging.info(f"Attempting HF delete for: {hf_path} by user {tg_user_id}")
            api.delete_file(
                path_in_repo=hf_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE,
                commit_message=f"User {tg_user_id} deleted {original_filename}"
            )
            hf_deleted = True
            logging.info(f"Successfully deleted file {hf_path} from HF Hub for user {tg_user_id}")
        except hf_utils.EntryNotFoundError:
            logging.warning(f"File {hf_path} already deleted or never existed on HF Hub for delete attempt by {tg_user_id}.")
            hf_deleted = True  # Treat as success for the purpose of DB removal
        except Exception as e:
            logging.error(f"Error deleting file from HF Hub ({hf_path}, User: {tg_user_id}): {e}")
            # Do not stop here; still try to remove from DB if HF delete fails,
            # but report the overall operation as potentially failed.
            # A background cleanup job might be needed for such inconsistencies.
    else:
        logging.warning(f"File node {file_id} for user {tg_user_id} has no HF path. Skipping HF deletion.")
        hf_deleted = True  # No path means nothing to delete on HF
    # 2. Attempt to remove from DB structure *if HF deletion was successful or skipped*
    if hf_deleted:
        if remove_node(user_data['filesystem'], file_id):
            db_removed = True
            logging.info(f"Removed file node {file_id} from DB for user {tg_user_id}")
            # 3. Attempt to save the updated DB structure
            if not save_data(db_data):
                logging.error(f"CRITICAL: Delete file DB save error for user {tg_user_id} after removing node {file_id}.")
                save_error = True
                # Rolling back the in-memory removal is risky (parent may have
                # changed); we log the inconsistency instead of re-adding the node.
        else:
            # This shouldn't happen if find_node_by_id found it initially
            logging.error(f"Failed to remove file node {file_id} from DB structure for {tg_user_id} after it was found.")
    # Determine final status
    if db_removed and not save_error:
        return jsonify({'status': 'ok', 'message': f'File "{original_filename}" deleted.'})
    elif hf_deleted and db_removed and save_error:
        return jsonify({'status': 'error', 'message': f'File deleted from storage, but failed to update database.'}), 500
    elif hf_deleted and not db_removed:
        return jsonify({'status': 'error', 'message': f'File deleted from storage, but failed to remove from database structure.'}), 500
    else:  # hf_deleted is False (meaning HF delete failed)
        return jsonify({'status': 'error', 'message': f'Failed to delete file from storage.'}), 500
@app.route('/delete_folder/<folder_id>', methods=['POST'])
def delete_folder_route(folder_id):
    """Delete an *empty* folder from the authenticated user's filesystem tree.

    The root folder can never be deleted, and a folder with children is
    rejected with 400 so contents must be removed explicitly first.  On
    success the updated tree is persisted via ``save_data``.

    Returns JSON ``{'status': 'ok'|'error', 'message': ...}``.
    """
    if folder_id == 'root':
        return jsonify({'status': 'error', 'message': 'Cannot delete the root folder.'}), 400
    data = request.get_json()
    if not data or 'initData' not in data:
        return jsonify({"status": "error", "message": "Incomplete request"}), 400
    user_info = check_telegram_authorization(data['initData'], BOT_TOKEN)
    if not user_info or 'id' not in user_info:
        return jsonify({"status": "error", "message": "Unauthorized"}), 403
    tg_user_id = str(user_info['id'])
    db_data = load_data()
    user_data = db_data.get('users', {}).get(tg_user_id)
    if not user_data or 'filesystem' not in user_data or not isinstance(user_data['filesystem'], dict):
        logging.error(f"Delete folder error: User data or filesystem missing/invalid for {tg_user_id}")
        return jsonify({"status": "error", "message": "User data error."}), 500
    folder_node, parent_node = find_node_by_id(user_data['filesystem'], folder_id)
    if not folder_node or folder_node.get('type') != 'folder' or not parent_node:
        logging.warning(f"Delete request for non-existent/invalid folder ID {folder_id} by user {tg_user_id}")
        return jsonify({'status': 'error', 'message': 'Folder not found.'}), 404
    folder_name = folder_node.get('name', 'folder')
    # Check if folder is empty (safer to check 'children' array directly)
    if 'children' in folder_node and isinstance(folder_node['children'], list) and folder_node['children']:
        return jsonify({'status': 'error', 'message': f'Folder "{folder_name}" is not empty. Please delete its contents first.'}), 400
    # Attempt to remove the folder node
    if remove_node(user_data['filesystem'], folder_id):
        # Attempt to save the change
        if save_data(db_data):
            logging.info(f"Folder {folder_id} ('{folder_name}') deleted by user {tg_user_id}")
            return jsonify({'status': 'ok', 'message': f'Folder "{folder_name}" deleted.'})
        else:
            logging.error(f"Delete folder save error for user {tg_user_id} after removing node {folder_id}.")
            # Rollback of the in-memory removal is deliberately not attempted (risky).
            return jsonify({'status': 'error', 'message': 'Error saving database after deleting folder.'}), 500
    else:
        # This indicates an internal logic error if the node was found before
        logging.error(f"Failed to remove empty folder node {folder_id} from DB for {tg_user_id} after it was found.")
        return jsonify({'status': 'error', 'message': 'Could not remove folder from database structure.'}), 500
@app.route('/get_text_content/<file_id>')
def get_text_content_route(file_id):
    """Serve a plain-text preview of a stored 'text' file.

    NO AUTHENTICATION — relies on file_id obscurity.  Scans every user's
    filesystem tree for a matching node of ``file_type == 'text'``, fetches
    the content from the Hugging Face repo, enforces a 1 MB preview cap, and
    decodes with a small list of common encodings (falling back to chardet
    if installed, then lossy UTF-8).
    """
    db_data = load_data()
    file_node = None
    owner_user_id = None
    for user_id_scan, user_data_scan in db_data.get('users', {}).items():
        if 'filesystem' in user_data_scan and isinstance(user_data_scan['filesystem'], dict):
            node, _ = find_node_by_id(user_data_scan['filesystem'], file_id)
            # Allow preview only for 'text' type files
            if node and isinstance(node, dict) and node.get('type') == 'file' and node.get('file_type') == 'text':
                file_node = node
                owner_user_id = user_id_scan
                break
    if not file_node:
        logging.warning(f"Text content request for unknown/non-text file_id: {file_id}")
        return Response("Text file not found or preview not allowed", status=404, mimetype='text/plain')
    hf_path = file_node.get('path')
    if not hf_path:
        logging.error(f"Text content error: Missing HF path for file ID {file_id} (Owner: {owner_user_id})")
        return Response("Error: File path configuration missing", status=500, mimetype='text/plain')
    file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{hf_path}?download=true"
    logging.info(f"Attempting to fetch text content from: {file_url}")
    try:
        headers = {}
        if HF_TOKEN_READ:
            headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
        response = requests.get(file_url, headers=headers, timeout=15)  # Shorter timeout for text files
        response.raise_for_status()
        # Limit preview size to prevent loading huge files in browser
        max_preview_size = 1 * 1024 * 1024  # 1 MB limit
        if 'Content-Length' in response.headers and int(response.headers['Content-Length']) > max_preview_size:
            logging.warning(f"Text file {file_id} too large for preview ({response.headers['Content-Length']} bytes).")
            return Response("File is too large for preview (>1MB). Please download.", status=413, mimetype='text/plain')  # 413 Payload Too Large
        # If size is unknown or within limits, proceed to read content
        content_bytes = response.content
        if len(content_bytes) > max_preview_size:
            logging.warning(f"Text file {file_id} too large for preview after download ({len(content_bytes)} bytes).")
            return Response("File is too large for preview (>1MB). Please download.", status=413, mimetype='text/plain')
        # Attempt to decode the text content
        text_content = None
        detected_encoding = None
        # Try common encodings
        encodings_to_try = ['utf-8', 'cp1251', 'latin-1']
        for enc in encodings_to_try:
            try:
                text_content = content_bytes.decode(enc)
                detected_encoding = enc
                logging.info(f"Decoded text file {file_id} using {enc}")
                break
            except UnicodeDecodeError:
                continue
        if text_content is None:
            # Fallback: Try to detect using chardet if installed, or assume UTF-8 lossy.
            # Note: `except Exception` already covers ImportError from the local import.
            try:
                import chardet
                result = chardet.detect(content_bytes)
                detected_encoding = result['encoding']
                if detected_encoding:
                    text_content = content_bytes.decode(detected_encoding, errors='replace')
                    logging.info(f"Decoded text file {file_id} using detected encoding {detected_encoding}")
                else:
                    raise ValueError("Chardet could not detect encoding")
            except Exception as E:
                logging.warning(f"Could not decode text file {file_id} with common encodings or chardet ({E}). Falling back to utf-8 replace.")
                text_content = content_bytes.decode('utf-8', errors='replace')
                detected_encoding = 'utf-8 (replaced errors)'
        # Return decoded text with appropriate content type
        return Response(text_content, mimetype=f'text/plain; charset={detected_encoding.split(" ")[0]}')  # Use detected/fallback encoding
    except requests.exceptions.Timeout:
        logging.error(f"Timeout fetching text content from HF: {hf_path}")
        return Response("Error: Timed out connecting to file storage", status=504, mimetype='text/plain')
    except requests.exceptions.RequestException as e:
        status_code = e.response.status_code if e.response is not None else 502
        logging.error(f"Error fetching text content from HF ({hf_path}, Owner: {owner_user_id}): {e} (Status: {status_code})")
        return Response(f"Error retrieving text content ({status_code})", status=status_code, mimetype='text/plain')
    except Exception as e:
        logging.error(f"Unexpected error fetching text content ({hf_path}, Owner: {owner_user_id}): {e}", exc_info=True)
        return Response("Internal server error fetching text content", status=500, mimetype='text/plain')
@app.route('/preview_thumb/<file_id>')
def preview_thumb_route(file_id):
    """Proxy-stream an image file from the HF repo as a preview thumbnail.

    NO AUTHENTICATION — relies on file_id obscurity.  Scans all users'
    filesystem trees for a matching node of ``file_type == 'image'`` and
    streams the bytes through with the upstream Content-Type/Length.
    """
    db_data = load_data()
    file_node = None
    owner_user_id = None
    for user_id_scan, user_data_scan in db_data.get('users', {}).items():
        if 'filesystem' in user_data_scan and isinstance(user_data_scan['filesystem'], dict):
            node, _ = find_node_by_id(user_data_scan['filesystem'], file_id)
            if node and isinstance(node, dict) and node.get('type') == 'file' and node.get('file_type') == 'image':
                file_node = node
                owner_user_id = user_id_scan
                break
    if not file_node:
        return Response("Image not found", status=404, mimetype='text/plain')
    hf_path = file_node.get('path')
    if not hf_path:
        return Response("Error: File path missing", status=500, mimetype='text/plain')
    # Use the /resolve/main path for direct file access
    file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{hf_path}"
    logging.info(f"Attempting to serve image preview via proxy from: {file_url}")
    try:
        headers = {}
        if HF_TOKEN_READ:
            headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
        response = requests.get(file_url, headers=headers, stream=True, timeout=20)
        response.raise_for_status()
        # Stream the image content directly
        resp_headers = {}
        content_type = response.headers.get('Content-Type', 'application/octet-stream')
        # Basic validation it looks like an image type
        if not content_type.startswith('image/'):
            logging.warning(f"HF returned non-image content type '{content_type}' for image preview request: {hf_path}")
            # Deliberately forwarded anyway — the browser will reject unusable bytes.
        resp_headers['Content-Type'] = content_type
        if 'Content-Length' in response.headers:
            resp_headers['Content-Length'] = response.headers['Content-Length']
        # Consider adding Cache-Control: public, max-age=3600 in the future.
        return Response(response.iter_content(chunk_size=8192), status=response.status_code, headers=resp_headers)
    except requests.exceptions.Timeout:
        logging.error(f"Timeout fetching preview from HF: {hf_path}")
        return Response("Error: Timed out connecting to storage", status=504, mimetype='text/plain')
    except requests.exceptions.RequestException as e:
        status_code = e.response.status_code if e.response is not None else 502
        logging.error(f"Error fetching preview from HF ({hf_path}, Owner: {owner_user_id}): {e} (Status: {status_code})")
        return Response(f"Error retrieving preview ({status_code})", status=status_code, mimetype='text/plain')
    except Exception as e:
        logging.error(f"Unexpected error during preview proxy ({hf_path}, Owner: {owner_user_id}): {e}", exc_info=True)
        return Response("Internal server error during preview", status=500, mimetype='text/plain')
# --- Main Execution ---
if __name__ == '__main__':
    # Announce startup on stdout and in the log.
    print("Starting Zeus Cloud Mini App Backend...")
    logging.info("Starting Zeus Cloud Mini App Backend...")

    # --- Configuration sanity checks before serving requests ---
    if not BOT_TOKEN or BOT_TOKEN == 'YOUR_BOT_TOKEN':
        logging.critical("\n" + "*"*60 +
        "\n CRITICAL: TELEGRAM_BOT_TOKEN is not set correctly. " +
        "\n Telegram authentication WILL FAIL. Set the environment variable." +
        "\n" + "*"*60)
    if not HF_TOKEN_WRITE:
        logging.warning("HF_TOKEN (write access) is not set. File uploads & deletions will fail.")
    if not HF_TOKEN_READ:
        # Distinguish "falling back to the write token" from "no token at all".
        if HF_TOKEN_WRITE:
            logging.info("HF_TOKEN_READ not set, using HF_TOKEN (write token) for read access.")
        else:
            logging.warning("HF_TOKEN_READ is not set. File downloads/previews might fail if repo is private.")
    if not REPO_ID:
        logging.critical("HF REPO_ID is not set. Application cannot function.")
        exit(1)
    logging.info(f"Using HF Repo: {REPO_ID}")
    logging.info(f"Data file: {DATA_FILE}")

    # --- Initial database sync/load ---
    logging.info("Performing initial database sync/load...")
    startup_db = load_data()
    if startup_db and startup_db.get('users'):
        logging.info(f"Initial data loaded. User count: {len(startup_db['users'])}")
    else:
        logging.warning("Initial data load resulted in empty or invalid data. Check logs.")

    # --- Launch the HTTP server ---
    # Use waitress or gunicorn in production instead of Flask's development server,
    # e.g.:  from waitress import serve; serve(app, host='0.0.0.0', port=7860)
    logging.info("Starting Flask server...")
    try:
        # Flask's development server with debug disabled for production-like behavior.
        app.run(debug=False, host='0.0.0.0', port=7860)
    except Exception as server_exc:
        logging.critical(f"Failed to start Flask server: {server_exc}", exc_info=True)
        exit(1)
# --- END OF FILE app (24).py ---