diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -2,25 +2,27 @@ import os
import hmac
import hashlib
import json
-from urllib.parse import unquote, parse_qsl, urlencode
-from flask import Flask, request, jsonify, Response, send_file
-from flask_caching import Cache
-import logging
+import shutil
import threading
import time
+import uuid
+import logging
from datetime import datetime
+from io import BytesIO
+from urllib.parse import unquote, parse_qsl, urlencode
+from typing import Union, Optional
+
+import requests
+from flask import Flask, request, jsonify, Response, send_file
+from flask_caching import Cache
from huggingface_hub import HfApi, hf_hub_download, utils as hf_utils
from werkzeug.utils import secure_filename
-import requests
-from io import BytesIO
-import uuid
-from typing import Union, Optional, Dict, Any
app = Flask(__name__)
-app.secret_key = os.getenv("FLASK_SECRET_KEY", "supersecretkey_mini_app_unique_dev")
-BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '6750208873:AAE2hvPlJ99dBdhGa_Brre0IIpUdOvXxHt4') # MUST be set
+app.secret_key = os.getenv("FLASK_SECRET_KEY") or os.urandom(24).hex()  # no predictable hardcoded fallback
+BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')  # never hardcode the token: the previous fallback leaked a live secret (rotate it)
DATA_FILE = 'cloudeng_mini_app_data.json'
-DATA_FILE_BACKUP = 'cloudeng_mini_app_data.json.bak'
+DATA_FILE_BACKUP = DATA_FILE + '.bak'
REPO_ID = "Eluza133/Z1e1u"
HF_TOKEN_WRITE = os.getenv("HF_TOKEN")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") or HF_TOKEN_WRITE
@@ -30,10 +32,10 @@ os.makedirs(UPLOAD_FOLDER, exist_ok=True)
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
-AUTH_DATA_LIFETIME = 3600 # 1 hour
+AUTH_DATA_LIFETIME = 3600
+data_lock = threading.Lock()
-# --- Filesystem Utilities ---
-def find_node_by_id(filesystem: Dict[str, Any], node_id: str) -> (Optional[Dict[str, Any]], Optional[Dict[str, Any]]):
+def find_node_by_id(filesystem, node_id):
if not filesystem or not isinstance(filesystem, dict):
return None, None
if filesystem.get('id') == node_id:
@@ -45,245 +47,179 @@ def find_node_by_id(filesystem: Dict[str, Any], node_id: str) -> (Optional[Dict[
while queue:
current_node, parent = queue.pop(0)
if current_node.get('type') == 'folder' and 'children' in current_node:
- for i, child in enumerate(current_node.get('children', [])):
- if not isinstance(child, dict):
- logging.warning(f"Invalid child found in node {current_node.get('id')}: {child}")
- continue
- child_id = child.get('id')
- if not child_id: continue
-
- if child_id == node_id:
+ for child in current_node.get('children', []):
+ child_id = child.get('id')
+ if not child_id: continue
+
+ if child_id == node_id:
return child, current_node
- if child_id not in visited and isinstance(child, dict) and child.get('type') == 'folder':
+ if child_id not in visited and isinstance(child, dict) and child.get('type') == 'folder':
visited.add(child_id)
queue.append((child, current_node))
return None, None
-def add_node(filesystem: Dict[str, Any], parent_id: str, node_data: Dict[str, Any]) -> bool:
+def add_node(filesystem, parent_id, node_data):
parent_node, _ = find_node_by_id(filesystem, parent_id)
- if parent_node and isinstance(parent_node, dict) and parent_node.get('type') == 'folder':
+ if parent_node and parent_node.get('type') == 'folder':
if 'children' not in parent_node or not isinstance(parent_node['children'], list):
parent_node['children'] = []
-
- existing_ids = set()
- valid_children = []
- for child in parent_node['children']:
- if isinstance(child, dict) and 'id' in child:
- existing_ids.add(child['id'])
- valid_children.append(child)
- else:
- logging.warning(f"Found invalid child structure in parent {parent_id}, removing: {child}")
- parent_node['children'] = valid_children # Clean up invalid entries
-
+ existing_ids = {child.get('id') for child in parent_node['children'] if isinstance(child, dict)}
if node_data.get('id') not in existing_ids:
parent_node['children'].append(node_data)
return True
- else:
- logging.warning(f"Node with id {node_data.get('id')} already exists in parent {parent_id}")
- return False # Indicate node already exists, maybe update instead?
return False
-def remove_node(filesystem: Dict[str, Any], node_id: str) -> bool:
- if node_id == filesystem.get('id'):
- logging.error("Attempted to remove the root node.")
- return False
-
+def remove_node(filesystem, node_id):
node_to_remove, parent_node = find_node_by_id(filesystem, node_id)
-
- if node_to_remove and parent_node and isinstance(parent_node, dict) and 'children' in parent_node and isinstance(parent_node['children'], list):
+ if node_to_remove and parent_node and 'children' in parent_node and isinstance(parent_node['children'], list):
original_length = len(parent_node['children'])
- parent_node['children'] = [child for child in parent_node['children'] if not (isinstance(child, dict) and child.get('id') == node_id)]
+ parent_node['children'] = [child for child in parent_node['children'] if not isinstance(child, dict) or child.get('id') != node_id]
return len(parent_node['children']) < original_length
- elif node_to_remove:
- logging.error(f"Found node {node_id} but its parent was not found or invalid.")
+ if node_to_remove and node_id == filesystem.get('id'):
+ logging.warning("Attempted to remove root node directly.")
+ return False
return False
-
-def get_node_path_list(filesystem: Dict[str, Any], node_id: str) -> list:
+def get_node_path_list(filesystem, node_id):
path_list = []
current_id = node_id
processed_ids = set()
- max_depth = 50
+ max_depth = 20
depth = 0
while current_id and current_id not in processed_ids and depth < max_depth:
processed_ids.add(current_id)
depth += 1
node, parent = find_node_by_id(filesystem, current_id)
-
if not node or not isinstance(node, dict):
- logging.error(f"Path traversal failed: Node not found or invalid for ID {current_id}")
- break
-
+ logging.warning(f"Path traversal stopped: Node not found or invalid for ID {current_id}")
+ break
path_list.append({
'id': node.get('id'),
'name': node.get('name', node.get('original_filename', 'Unknown'))
})
-
if not parent or not isinstance(parent, dict):
break
-
parent_id = parent.get('id')
if parent_id == current_id:
logging.error(f"Filesystem loop detected at node {current_id}")
break
current_id = parent_id
- if not any(p['id'] == 'root' for p in path_list):
- # Check if root exists before adding it
- root_node, _ = find_node_by_id(filesystem, 'root')
- if root_node and isinstance(root_node, dict):
- path_list.append({'id': 'root', 'name': root_node.get('name', 'Root')})
- else:
- logging.error("Root node ('root') not found in filesystem during path generation.")
-
+ if not any(p['id'] == 'root' for p in path_list) and filesystem and filesystem.get('id') == 'root':
+ path_list.append({'id': 'root', 'name': filesystem.get('name','Root')})
final_path = []
seen_ids = set()
for item in reversed(path_list):
- if item.get('id') not in seen_ids:
+ if item['id'] not in seen_ids:
final_path.append(item)
- seen_ids.add(item.get('id'))
-
- if not final_path or final_path[0].get('id') != 'root':
- logging.warning(f"Path generation for {node_id} resulted in unexpected structure. Correcting.")
- # Attempt to reconstruct from root if possible
- root_node, _ = find_node_by_id(filesystem, 'root')
- if root_node:
- corrected_path = [{'id': 'root', 'name': root_node.get('name', 'Root')}]
- # We might not be able to fully reconstruct the broken path here easily
- # Return at least the root if the target node path failed badly
- return corrected_path
- else:
- return [] # Return empty if root is also missing
-
+ seen_ids.add(item['id'])
+ if not final_path or final_path[0]['id'] != 'root':
+ final_path.insert(0, {'id': 'root', 'name': filesystem.get('name','Root') if filesystem else 'Root'})
return final_path
-def initialize_user_filesystem(user_data: Dict[str, Any]):
+
+def initialize_user_filesystem(user_data):
if 'filesystem' not in user_data or not isinstance(user_data.get('filesystem'), dict) or not user_data['filesystem'].get('id') == 'root':
- logging.warning(f"Initializing/resetting filesystem for user data: {user_data.get('user_info', {}).get('id', 'UNKNOWN')}")
+ logging.warning(f"Initializing/Resetting filesystem for user.")
user_data['filesystem'] = {
"type": "folder",
"id": "root",
"name": "Root",
"children": []
}
+ elif 'children' not in user_data['filesystem'] or not isinstance(user_data['filesystem']['children'], list):
+ user_data['filesystem']['children'] = []
-# --- Data Loading/Saving ---
-def load_data() -> Dict[str, Any]:
- global_data = {'users': {}}
- try:
- if not os.path.exists(DATA_FILE):
- logging.warning(f"{DATA_FILE} not found locally. Attempting download.")
- download_db_from_hf() # Try to fetch first
-
- if os.path.exists(DATA_FILE):
- with open(DATA_FILE, 'r', encoding='utf-8') as file:
- try:
- global_data = json.load(file)
- if not isinstance(global_data, dict):
- logging.error(f"Data file {DATA_FILE} is not a dictionary. Trying backup.")
- raise ValueError("Data is not a dictionary")
- global_data.setdefault('users', {})
- except (json.JSONDecodeError, ValueError) as e:
- logging.error(f"Error decoding JSON from {DATA_FILE}: {e}. Attempting to load backup {DATA_FILE_BACKUP}")
- if os.path.exists(DATA_FILE_BACKUP):
- try:
- with open(DATA_FILE_BACKUP, 'r', encoding='utf-8') as bak_file:
- global_data = json.load(bak_file)
- if not isinstance(global_data, dict):
- logging.error(f"Backup file {DATA_FILE_BACKUP} is also invalid. Initializing empty data.")
- global_data = {'users': {}}
- else:
- logging.info(f"Successfully loaded data from backup {DATA_FILE_BACKUP}")
- # Optionally try to restore the main file from backup here
- try:
- with open(DATA_FILE, 'w', encoding='utf-8') as main_file:
- json.dump(global_data, main_file, ensure_ascii=False, indent=4)
- logging.info(f"Restored {DATA_FILE} from backup.")
- except Exception as write_err:
- logging.error(f"Failed to restore {DATA_FILE} from backup: {write_err}")
- except Exception as bak_e:
- logging.error(f"Error reading backup file {DATA_FILE_BACKUP}: {bak_e}. Initializing empty data.")
- global_data = {'users': {}}
- else:
- logging.warning(f"Backup file {DATA_FILE_BACKUP} not found. Initializing empty data.")
- global_data = {'users': {}}
- else:
- logging.warning(f"{DATA_FILE} still not found after download attempt. Initializing empty data.")
- global_data = {'users': {}}
+def load_data_from_file(filepath):
+ try:
+ with open(filepath, 'r', encoding='utf-8') as file:
+ data = json.load(file)
+ if not isinstance(data, dict):
+ logging.warning(f"Data file {filepath} is not a dict, treating as invalid.")
+ return None
+ data.setdefault('users', {})
+ for user_id, user_data in data['users'].items():
+ if isinstance(user_data, dict):
+ initialize_user_filesystem(user_data)
+ else:
+ logging.warning(f"Invalid user_data structure for user {user_id} in {filepath}, skipping.")
+ logging.info(f"Data loaded successfully from {filepath}.")
+ return data
+ except FileNotFoundError:
+ logging.info(f"{filepath} not found locally.")
+ return None
+ except json.JSONDecodeError:
+ logging.error(f"Error decoding JSON from {filepath}.")
+ return None
except Exception as e:
- logging.error(f"Unexpected error loading data: {e}. Returning empty data.")
- global_data = {'users': {}}
-
- # Ensure filesystem is initialized for all users after loading
- users = global_data.setdefault('users', {})
- if isinstance(users, dict):
- for user_id, user_data in users.items():
- if isinstance(user_data, dict):
- initialize_user_filesystem(user_data)
- else:
- logging.warning(f"Invalid user data structure for user {user_id}. Skipping initialization.")
- logging.info("Data loaded and filesystems checked/initialized.")
- else:
- logging.error("User data structure is not a dictionary. Resetting users.")
- global_data['users'] = {}
-
+ logging.error(f"Error loading data from {filepath}: {e}")
+ return None
- cache.set('app_data', global_data)
- logging.info(f"Loaded data into cache. User count: {len(global_data.get('users', {}))}")
- return global_data
+@cache.memoize(timeout=60)
+def load_data():
+ with data_lock:
+ data = None
+ primary_exists = os.path.exists(DATA_FILE)
+ backup_exists = os.path.exists(DATA_FILE_BACKUP)
+
+ if primary_exists:
+ data = load_data_from_file(DATA_FILE)
+
+ if data is None and backup_exists:
+ logging.warning(f"Primary data file {DATA_FILE} failed to load or missing, attempting backup.")
+ data = load_data_from_file(DATA_FILE_BACKUP)
+ if data:
+ logging.info("Loaded data from backup. Attempting to restore primary file.")
+ try:
+ shutil.copy2(DATA_FILE_BACKUP, DATA_FILE)
+ except Exception as e:
+ logging.error(f"Failed to restore primary file from backup: {e}")
+ if data is None:
+ logging.warning("Both primary and backup data files failed to load or missing. Attempting download from HF.")
+ download_success = download_db_from_hf()
+ if download_success:
+ data = load_data_from_file(DATA_FILE)
-def save_data(data: Dict[str, Any]):
- if not isinstance(data, dict) or 'users' not in data:
- logging.error("Attempted to save invalid data structure. Aborting save.")
- return # Prevent saving malformed data
+ if data is None:
+ logging.critical("CRITICAL: Could not load data from local files or HF. Initializing empty data structure.")
+ data = {'users': {}}
- # Validate filesystem integrity before saving (basic check)
- for user_id, user_data in data.get('users', {}).items():
- if not isinstance(user_data, dict) or 'filesystem' not in user_data or not isinstance(user_data['filesystem'], dict) or user_data['filesystem'].get('id') != 'root':
- logging.error(f"Filesystem integrity check failed for user {user_id}. Aborting save.")
- # Optionally try to recover/reset the user's filesystem here? Risky.
- return
+ return data
- try:
- # Backup current file before overwriting
- if os.path.exists(DATA_FILE):
- try:
- os.replace(DATA_FILE, DATA_FILE_BACKUP) # Atomic rename if possible
- logging.info(f"Created backup {DATA_FILE_BACKUP}")
- except OSError as e:
- logging.warning(f"Could not create backup file {DATA_FILE_BACKUP}: {e}. Proceeding with caution.")
+def save_data(data):
+ with data_lock:
+ try:
+ if os.path.exists(DATA_FILE):
+ try:
+ shutil.copy2(DATA_FILE, DATA_FILE_BACKUP)
+ logging.info(f"Created backup: {DATA_FILE_BACKUP}")
+ except Exception as backup_err:
+ logging.error(f"Failed to create backup file {DATA_FILE_BACKUP}: {backup_err}")
- with open(DATA_FILE, 'w', encoding='utf-8') as file:
- json.dump(data, file, ensure_ascii=False, indent=4)
+ with open(DATA_FILE, 'w', encoding='utf-8') as file:
+ json.dump(data, file, ensure_ascii=False, indent=2) # Use indent=2 for smaller file size
- cache.set('app_data', data) # Update cache immediately
- logging.info("Data saved locally successfully.")
- upload_db_to_hf() # Initiate upload after successful local save
- except Exception as e:
- logging.error(f"Error saving data locally: {e}")
- # Attempt to restore from backup if save failed
- if os.path.exists(DATA_FILE_BACKUP):
- try:
- os.replace(DATA_FILE_BACKUP, DATA_FILE)
- logging.info(f"Restored {DATA_FILE} from backup due to save failure.")
- # Reload data from restored file?
- load_data()
- except OSError as restore_e:
- logging.error(f"CRITICAL: Failed to save data AND failed to restore backup: {restore_e}")
+ logging.info(f"Data saved locally to {DATA_FILE}")
+ cache.clear()
+ upload_db_to_hf()
+ return True
+ except Exception as e:
+ logging.error(f"CRITICAL: Error saving data to {DATA_FILE}: {e}")
+ return False
def upload_db_to_hf():
if not HF_TOKEN_WRITE:
logging.warning("HF_TOKEN_WRITE not set, skipping database upload.")
return
if not os.path.exists(DATA_FILE):
- logging.error(f"Cannot upload {DATA_FILE} to HF: File does not exist locally.")
+ logging.warning(f"Local data file {DATA_FILE} not found for upload.")
return
try:
api = HfApi()
@@ -294,106 +230,63 @@ def upload_db_to_hf():
repo_type="dataset",
token=HF_TOKEN_WRITE,
commit_message=f"Backup MiniApp {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
- run_as_future=True # Schedule async
+ run_as_future=True
)
logging.info("Database upload to Hugging Face scheduled.")
except Exception as e:
- logging.error(f"Error scheduling database upload to HF: {e}")
+ logging.error(f"Error scheduling database upload: {e}")
+
def download_db_from_hf():
if not HF_TOKEN_READ:
logging.warning("HF_TOKEN_READ not set, skipping database download.")
- if not os.path.exists(DATA_FILE):
- logging.info(f"Local file {DATA_FILE} missing and no read token. Creating empty DB.")
- with open(DATA_FILE, 'w', encoding='utf-8') as f:
- json.dump({'users': {}}, f)
- return
+ return False
try:
- # Backup local file before potentially overwriting
- if os.path.exists(DATA_FILE):
- backup_path = f"{DATA_FILE}.{int(time.time())}.local_bak"
- try:
- os.rename(DATA_FILE, backup_path)
- logging.info(f"Backed up local DB to {backup_path} before downloading.")
- except OSError as e:
- logging.warning(f"Could not backup local DB before download: {e}")
-
- downloaded_path = hf_hub_download(
+ hf_hub_download(
repo_id=REPO_ID,
filename=DATA_FILE,
repo_type="dataset",
token=HF_TOKEN_READ,
local_dir=".",
- local_dir_use_symlinks=False, # Ensure actual file is created
- force_filename=DATA_FILE, # Ensure it overwrites/creates the correct name
+ local_dir_use_symlinks=False,
+ force_download=True,
etag_timeout=10
)
- logging.info(f"Database downloaded from Hugging Face to {downloaded_path}")
- # Basic validation of the downloaded file
- try:
- with open(downloaded_path, 'r', encoding='utf-8') as f:
- content = json.load(f)
- if not isinstance(content, dict) or 'users' not in content:
- logging.error("Downloaded DB file is invalid. Restoring local backup if exists.")
- raise ValueError("Invalid DB structure downloaded")
- except (json.JSONDecodeError, ValueError, Exception) as validate_e:
- logging.error(f"Validation of downloaded DB failed: {validate_e}")
- if 'backup_path' in locals() and os.path.exists(backup_path):
- try:
- os.replace(backup_path, DATA_FILE)
- logging.info("Restored local DB from backup due to invalid download.")
- except OSError as restore_e:
- logging.error(f"Failed to restore local DB backup: {restore_e}")
- # If restore fails, we might be left with a bad file or no file
- if os.path.exists(DATA_FILE): os.remove(DATA_FILE)
- with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump({'users': {}}, f)
-
-
+ logging.info(f"Database downloaded from Hugging Face to {DATA_FILE}")
+ return True
except hf_utils.RepositoryNotFoundError:
- logging.error(f"Repository {REPO_ID} not found on HF.")
- if not os.path.exists(DATA_FILE):
- with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump({'users': {}}, f)
+ logging.error(f"Repository {REPO_ID} not found on Hugging Face.")
+ return False
except hf_utils.EntryNotFoundError:
- logging.warning(f"{DATA_FILE} not found in repo {REPO_ID}. Using local version or creating empty.")
- if not os.path.exists(DATA_FILE):
- with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump({'users': {}}, f)
- except requests.exceptions.RequestException as e:
- logging.error(f"Network error downloading DB from HF: {e}. Using local version if available.")
- # Don't create empty if local exists and network fails
- if not os.path.exists(DATA_FILE):
- with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump({'users': {}}, f)
+ logging.warning(f"{DATA_FILE} not found in repo {REPO_ID}. No file downloaded.")
+ return False
+ except requests.exceptions.ConnectionError as e:
+ logging.error(f"Connection error downloading DB from HF: {e}. Using local version if available.")
+ return False
except Exception as e:
- logging.error(f"Unexpected error downloading database: {e}")
- if not os.path.exists(DATA_FILE):
- with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump({'users': {}}, f)
+ logging.error(f"Generic error downloading database from HF: {e}")
+ return False
-# --- File Type Helper ---
-def get_file_type(filename: Optional[str]) -> str:
+def get_file_type(filename):
if not filename or '.' not in filename: return 'other'
ext = filename.lower().split('.')[-1]
- if ext in ['mp4', 'mov', 'avi', 'webm', 'mkv', 'wmv', 'flv']: return 'video'
- if ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'svg', 'ico']: return 'image'
+ if ext in ['mp4', 'mov', 'avi', 'webm', 'mkv', 'm4v', 'wmv', 'flv']: return 'video'
+ if ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'svg', 'heic', 'heif']: return 'image'
if ext == 'pdf': return 'pdf'
- if ext in ['txt', 'log', 'md', 'json', 'xml', 'html', 'css', 'js', 'py', 'java', 'c', 'cpp', 'h', 'hpp', 'sh', 'bat']: return 'text'
- if ext in ['mp3', 'wav', 'ogg', 'aac', 'flac', 'm4a']: return 'audio'
- if ext in ['zip', 'rar', '7z', 'tar', 'gz', 'bz2']: return 'archive'
- if ext in ['doc', 'docx', 'rtf']: return 'document_word'
- if ext in ['xls', 'xlsx']: return 'document_excel'
- if ext in ['ppt', 'pptx']: return 'document_ppt'
+ if ext in ['txt', 'md', 'log', 'csv', 'json', 'xml', 'html', 'css', 'js', 'py', 'java', 'c', 'cpp', 'go', 'rs']: return 'text'
+ if ext in ['mp3', 'wav', 'ogg', 'flac', 'aac', 'm4a']: return 'audio'
+ if ext in ['zip', 'rar', '7z', 'tar', 'gz']: return 'archive'
+ if ext in ['doc', 'docx', 'ppt', 'pptx', 'xls', 'xlsx', 'odt', 'odp', 'ods']: return 'document'
return 'other'
-# --- Telegram Validation ---
-def check_telegram_authorization(auth_data_str: str, bot_token: str) -> Optional[Dict[str, Any]]:
- if not auth_data_str or not bot_token or bot_token == 'YOUR_BOT_TOKEN':
+
+def check_telegram_authorization(auth_data: str, bot_token: str) -> Optional[dict]:
+ if not auth_data or not bot_token or bot_token == 'YOUR_BOT_TOKEN':
logging.warning("Validation skipped: Missing auth_data or valid BOT_TOKEN.")
- # In development/debug mode, maybe return a mock user?
- # if app.debug:
- # return {"id": "12345", "first_name": "Debug", "username": "debug_user"}
return None
-
try:
- parsed_data = dict(parse_qsl(unquote(auth_data_str)))
+ parsed_data = dict(parse_qsl(unquote(auth_data)))
if "hash" not in parsed_data:
logging.error("Hash not found in auth data")
return None
@@ -403,17 +296,14 @@ def check_telegram_authorization(auth_data_str: str, bot_token: str) -> Optional
current_ts = int(time.time())
if abs(current_ts - auth_date_ts) > AUTH_DATA_LIFETIME:
- logging.warning(f"Auth data expired (Auth date: {auth_date_ts}, Now: {current_ts}, Diff: {current_ts - auth_date_ts} > {AUTH_DATA_LIFETIME})")
+ logging.warning(f"Auth data expired (Auth: {auth_date_ts}, Now: {current_ts}, Diff: {current_ts - auth_date_ts})")
return None
- data_check_list = sorted([f"{k}={v}" for k, v in parsed_data.items()])
- data_check_string = "\n".join(data_check_list)
-
+ data_check_string = "\n".join(sorted([f"{k}={v}" for k, v in parsed_data.items()]))
secret_key = hmac.new("WebAppData".encode(), bot_token.encode(), hashlib.sha256).digest()
- calculated_hash_bytes = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).digest()
- calculated_hash_hex = calculated_hash_bytes.hex()
+ calculated_hash = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()
- if hmac.compare_digest(calculated_hash_hex, telegram_hash):
+        if hmac.compare_digest(calculated_hash, telegram_hash):
user_data_str = parsed_data.get('user')
if user_data_str:
try:
@@ -421,54 +311,59 @@ def check_telegram_authorization(auth_data_str: str, bot_token: str) -> Optional
if 'id' not in user_info:
logging.error("Validated user data missing 'id'")
return None
- user_info['id'] = str(user_info['id']) # Ensure ID is string
return user_info
- except json.JSONDecodeError as e:
- logging.error(f"Failed to decode user JSON from auth data: {e} - Data: {user_data_str}")
+ except json.JSONDecodeError:
+ logging.error("Failed to decode user JSON from auth data")
return None
else:
logging.warning("No 'user' field in validated auth data")
return None
else:
- logging.warning(f"Hash mismatch during validation. Received: {telegram_hash}, Calculated: {calculated_hash_hex}")
+ logging.warning("Hash mismatch during validation")
return None
except Exception as e:
- logging.exception(f"Exception during Telegram validation: {e}")
+ logging.error(f"Exception during validation: {e}")
return None
-# --- HTML, CSS, JS Template ---
HTML_TEMPLATE = """
-
+
Zeus Cloud
-
-
Загрузка...
+
Loading...
-
-
-
Zeus Cloud
+
-
-
+
-
-
-
-
-
+
+
Files
+
-
-
-
-
Содержимое
-
-
+
+
+
+
+
+
+
@@ -634,14 +545,14 @@ HTML_TEMPLATE = """
const userInfoHeaderEl = document.getElementById('user-info-header');
const flashContainerEl = document.getElementById('flash-container');
const breadcrumbsContainerEl = document.getElementById('breadcrumbs-container');
- const itemGridContainerEl = document.getElementById('item-grid-container');
+ const fileListContainerEl = document.getElementById('file-list-container');
const currentFolderTitleEl = document.getElementById('current-folder-title');
- const uploadForm = document.getElementById('upload-form'); // Still needed for FormData
+ const uploadForm = document.getElementById('upload-form');
const fileInput = document.getElementById('file-input');
- const uploadLabelBtn = document.getElementById('upload-label-btn');
+ const uploadBtn = document.getElementById('upload-btn');
const progressContainer = document.getElementById('progress-container');
const progressBar = document.getElementById('progress-bar');
- const progressText = document.getElementById('progress-text'); // Kept in HTML, maybe use later
+ const progressText = document.getElementById('progress-text');
const newFolderInput = document.getElementById('new-folder-name');
const createFolderBtn = document.getElementById('create-folder-btn');
@@ -649,12 +560,10 @@ HTML_TEMPLATE = """
let validatedInitData = null;
let currentUser = null;
let currentItems = [];
- let isUploading = false;
- // --- API Communication ---
async function apiCall(endpoint, method = 'POST', body = {}) {
if (!validatedInitData) {
- showError("Ошибка: Данные авторизации отсутствуют. Попробуйте перезапустить.");
+ showError("Authentication data is missing.");
throw new Error("Not authenticated");
}
body.initData = validatedInitData;
@@ -666,37 +575,33 @@ HTML_TEMPLATE = """
body: JSON.stringify(body)
});
if (!response.ok) {
- let errorMsg = `Ошибка сервера (${response.status})`;
+ let errorMsg = `Server error: ${response.status}`;
try {
const errData = await response.json();
errorMsg = errData.message || errorMsg;
- } catch (e) { /* Ignore if error body is not JSON */ }
+ } catch (e) { /* Ignore */ }
throw new Error(errorMsg);
}
return await response.json();
} catch (error) {
console.error(`API call to ${endpoint} failed:`, error);
- showFlash(`Ошибка: ${error.message}`, 'error');
+ showFlash(`Network or server error: ${error.message}`, 'error');
throw error;
}
}
- // --- UI Rendering ---
- function showLoadingScreen(message = 'Загрузка...') {
- loadingEl.textContent = message;
- loadingEl.style.display = 'flex';
+ function showLoadingScreen() {
+ loadingEl.style.display = 'block';
errorViewEl.style.display = 'none';
appContentEl.style.display = 'none';
}
function showError(message) {
loadingEl.style.display = 'none';
- errorViewEl.innerHTML = `
';
modal.style.display = 'flex';
- if (tg.HapticFeedback) tg.HapticFeedback.impactOccurred('light');
try {
if (type === 'pdf') {
- // PDF handling in iframe can be tricky, especially mobile
- // Option 1: Google Docs Viewer (might have CORS issues or need proxy)
- // modalContent.innerHTML = ``;
- // Option 2: Link to open externally
- // modalContent.innerHTML = `
PDF файлы лучше открывать в отдельном приложении.
Открыть PDF`;
- // Option 3: Basic iframe (might work for some PDFs/browsers)
- modalContent.innerHTML = ``;
-
+ if (tg.platform === "ios" || tg.platform === "android") {
+ tg.openLink(window.location.origin + srcOrUrl, {try_instant_view: true});
+ closeModalManual();
+ return;
+ } else {
+ modalContent.innerHTML = ``;
+ }
} else if (type === 'image') {
- modalContent.innerHTML = ``;
+ modalContent.innerHTML = ``;
} else if (type === 'video') {
- modalContent.innerHTML = ``;
+ modalContent.innerHTML = ``;
} else if (type === 'text') {
const response = await fetch(srcOrUrl);
- if (!response.ok) throw new Error(`Ошибка загрузки текста: ${response.statusText || response.status}`);
+ if (!response.ok) throw new Error(`Failed to load text: ${response.statusText}`);
const text = await response.text();
- // Basic escaping for HTML safety
- const escapedText = text.replace(/&/g, "&").replace(//g, ">");
- modalContent.innerHTML = `