from flask import Flask, render_template_string, request, redirect, url_for, session, flash import json import os import logging import threading import time from datetime import datetime from huggingface_hub import HfApi, hf_hub_download from werkzeug.utils import secure_filename import random app = Flask(__name__) app.secret_key = 'supersecretkey' # Замените на безопасный ключ в продакшене DATA_FILE = 'data_ostenhost.json' REPO_ID = "Eluza133/w1f9" HF_TOKEN_WRITE = os.getenv("HF_TOKEN") # Токен с правами записи HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") or HF_TOKEN_WRITE # Настройка логирования logging.basicConfig(level=logging.DEBUG) # Функции работы с базой данных def load_data(): try: download_db_from_hf() with open(DATA_FILE, 'r', encoding='utf-8') as file: data = json.load(file) if not isinstance(data, dict): logging.warning("Данные не в формате dict, инициализация пустой базы") return {'posts': [], 'users': {}} if 'posts' not in data: data['posts'] = [] if 'users' not in data: data['users'] = {} logging.info("Данные успешно загружены") return data except Exception as e: logging.error(f"Ошибка загрузки данных: {e}") return {'posts': [], 'users': {}} def save_data(data): try: with open(DATA_FILE, 'w', encoding='utf-8') as file: json.dump(data, file, ensure_ascii=False, indent=4) upload_db_to_hf() logging.info("Данные сохранены и загружены на HF") except Exception as e: logging.error(f"Ошибка сохранения данных: {e}") raise def upload_db_to_hf(): try: api = HfApi() api.upload_file( path_or_fileobj=DATA_FILE, path_in_repo=DATA_FILE, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"Backup {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) logging.info("База данных загружена на Hugging Face") except Exception as e: logging.error(f"Ошибка загрузки базы: {e}") def download_db_from_hf(): try: hf_hub_download( repo_id=REPO_ID, filename=DATA_FILE, repo_type="dataset", token=HF_TOKEN_READ, local_dir=".", local_dir_use_symlinks=False ) logging.info("База данных скачана с Hugging Face") except Exception as e: logging.error(f"Ошибка скачивания базы: {e}") if not os.path.exists(DATA_FILE): with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump({'posts': [], 'users': {}}, f) def periodic_backup(): while True: upload_db_to_hf() time.sleep(15) # Базовый шаблон боковой навигации NAV_HTML = ''' ''' # Регистрация пользователя @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') data = load_data() if username in data['users']: flash('Пользователь уже существует!') return redirect(url_for('register')) data['users'][username] = {'password': password, 'bio': '', 'link': '', 'avatar': None} save_data(data) logging.info(f"Пользователь {username} зарегистрирован") flash('Регистрация успешна! Войдите в систему.') return redirect(url_for('login')) is_authenticated = 'username' in session username = session.get('username', None) html = ''' Регистрация ''' + NAV_HTML + '''

Регистрация

{% with messages = get_flashed_messages() %} {% if messages %} {% for message in messages %}
{{ message }}
{% endfor %} {% endif %} {% endwith %}

Уже есть аккаунт? Войти

''' return render_template_string(html, is_authenticated=is_authenticated, username=username) # Авторизация пользователя @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') data = load_data() if username in data['users'] and data['users'][username]['password'] == password: session['username'] = username session.permanent = True logging.info(f"Успешный вход: {username}") return redirect(url_for('feed')) flash('Неверный логин или пароль!') return redirect(url_for('login')) is_authenticated = 'username' in session username = session.get('username', None) html = ''' Вход ''' + NAV_HTML + '''

Вход

{% with messages = get_flashed_messages() %} {% if messages %} {% for message in messages %}
{{ message }}
{% endfor %} {% endif %} {% endwith %}

Нет аккаунта? Зарегистрироваться

''' return render_template_string(html, is_authenticated=is_authenticated, username=username) # Выход из системы @app.route('/logout') def logout(): session.pop('username', None) logging.info("Пользователь вышел из системы") return redirect(url_for('feed')) # Главная страница - лента публикаций @app.route('/', methods=['GET', 'POST']) def feed(): data = load_data() posts = sorted(data.get('posts', []), key=lambda x: datetime.strptime(x['upload_date'], '%Y-%m-%d %H:%M:%S'), reverse=True) is_authenticated = 'username' in session username = session.get('username', None) search_query = request.form.get('search', '').strip().lower() if request.method == 'POST' else request.args.get('search', '').strip().lower() if search_query: posts = [post for post in posts if search_query in post['title'].lower() or search_query in post['description'].lower()] html = ''' Лента ''' + NAV_HTML + '''

Лента публикаций

{% for post in posts %} {% if post['type'] == 'video' %} {% else %} {{ post['title'] }} {% endif %}

{{ post['title'] }}

{{ post['description'] }}

Загрузил: {{ post['uploader'] }} | {{ post['upload_date'] }}

Просмотров: {{ post['views'] }} | Лайков: {{ post['likes']|length }}

{% endfor %}
''' return render_template_string(html, posts=posts, is_authenticated=is_authenticated, username=username, repo_id=REPO_ID, search_query=search_query) # Страница отдельной публикации @app.route('/post/', methods=['GET', 'POST']) def post_page(post_id): data = load_data() post = next((p for p in data['posts'] if p['id'] == post_id), None) if not post: return "Публикация не найдена", 404 is_authenticated = 'username' in session username = session.get('username', None) # Увеличение количества просмотров post['views'] = post.get('views', 0) + 1 save_data(data) if request.method == 'POST' and is_authenticated: if 'like' in request.form: if username not in post.get('likes', []): post['likes'] = post.get('likes', []) + [username] else: post['likes'] = [user for user in post.get('likes', []) if user != username] save_data(data) elif 'comment' in request.form: comment_text = request.form.get('comment') if comment_text: post['comments'] = post.get('comments', []) + [{'user': username, 'text': comment_text, 'date': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] save_data(data) html = ''' {{ post['title'] }} ''' + NAV_HTML + '''

{{ post['title'] }}

{% if post['type'] == 'video' %} {% else %} {{ post['title'] }} {% endif %}

{{ post['description'] }}

Загрузил: {{ post['uploader'] }} | {{ post['upload_date'] }}

Просмотров: {{ post['views'] }} | Лайков: {{ post['likes']|length }}

{% if is_authenticated %}
{% endif %}

Комментарии

{% for comment in post.get('comments', []) %}

{{ comment['user'] }} ({{ comment['date'] }}): {{ comment['text'] }}

{% endfor %} Назад к ленте {% if not is_authenticated %}

Войдите, чтобы ставить лайки и комментировать.

{% endif %}
''' return render_template_string(html, post=post, repo_id=REPO_ID, is_authenticated=is_authenticated, username=username) # Страница профиля текущего пользователя @app.route('/profile', methods=['GET', 'POST']) def profile(): if 'username' not in session: flash('Войдите, чтобы просмотреть профиль!') return redirect(url_for('login')) data = load_data() username = session['username'] user_posts = sorted([p for p in data['posts'] if p['uploader'] == username], key=lambda x: datetime.strptime(x['upload_date'], '%Y-%m-%d %H:%M:%S'), reverse=True) is_authenticated = 'username' in session # Подсчет общего количества просмотров и лайков total_views = sum(post.get('views', 0) for post in user_posts) total_likes = sum(len(post.get('likes', [])) for post in user_posts) # Получение текущих данных пользователя user_data = data['users'].get(username, {}) bio = user_data.get('bio', '') link = user_data.get('link', '') avatar = user_data.get('avatar', None) if request.method == 'POST': if 'delete_post' in request.form: post_id = request.form.get('post_id') if post_id: data['posts'] = [p for p in data['posts'] if p['id'] != post_id or p['uploader'] != username] save_data(data) logging.info(f"Публикация {post_id} удалена пользователем {username}") return redirect(url_for('profile')) elif 'update_profile' in request.form: bio = request.form.get('bio', '') link = request.form.get('link', '') avatar_file = request.files.get('avatar') if avatar_file and avatar_file.filename: filename = secure_filename(avatar_file.filename) temp_path = os.path.join('uploads', filename) os.makedirs('uploads', exist_ok=True) avatar_file.save(temp_path) api = HfApi() avatar_path = f"avatars/{username}/{filename}" api.upload_file( path_or_fileobj=temp_path, path_in_repo=avatar_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"Добавлен аватар для {username}" ) data['users'][username]['avatar'] = avatar_path if os.path.exists(temp_path): os.remove(temp_path) data['users'][username]['bio'] = bio data['users'][username]['link'] = link save_data(data) logging.info(f"Профиль {username} обновлен") return redirect(url_for('profile')) html = ''' Профиль - {{ username }} ''' + NAV_HTML + '''

Профиль: {{ username }}

{% if avatar %} Аватар {% endif %}

{{ bio }}

{% if link %}

{{ link }}

{% endif %}

Всего просмотров: {{ total_views }} | Всего лайков: {{ total_likes }}

Редактировать профиль

Добавить публикацию

Ваши публикации

{% if user_posts %} {% for post in user_posts %}
{% if post['type'] == 'video' %} {% else %} {{ post['title'] }} {% endif %}

{{ post['title'] }}

{{ post['description'] }}

{{ post['upload_date'] }}

Просмотров: {{ post['views'] }} | Лайков: {{ post['likes']|length }}

{% endfor %} {% else %}

Вы пока не загрузили ни одной публикации.

{% endif %}
''' return render_template_string(html, username=username, user_posts=user_posts, total_views=total_views, total_likes=total_likes, bio=bio, link=link, avatar=avatar, repo_id=REPO_ID, is_authenticated=is_authenticated) # Страница профиля другого пользователя @app.route('/profile/') def user_profile(username): data = load_data() if username not in data['users']: return "Пользователь не найден", 404 user_posts = sorted([p for p in data['posts'] if p['uploader'] == username], key=lambda x: datetime.strptime(x['upload_date'], '%Y-%m-%d %H:%M:%S'), reverse=True) is_authenticated = 'username' in session current_user = session.get('username', None) # Подсчет общего количества просмотров и лайков total_views = sum(post.get('views', 0) for post in user_posts) total_likes = sum(len(post.get('likes', [])) for post in user_posts) # Получение данных пользователя user_data = data['users'].get(username, {}) bio = user_data.get('bio', '') link = user_data.get('link', '') avatar = user_data.get('avatar', None) html = ''' Профиль - {{ username }} ''' + NAV_HTML + '''

Профиль: {{ username }}

{% if avatar %} Аватар {% endif %}

{{ bio }}

{% if link %}

{{ link }}

{% endif %}

Всего просмотров: {{ total_views }} | Всего лайков: {{ total_likes }}

Публикации

{% if user_posts %} {% for post in user_posts %}
{% if post['type'] == 'video' %} {% else %} {{ post['title'] }} {% endif %}

{{ post['title'] }}

{{ post['description'] }}

{{ post['upload_date'] }}

Просмотров: {{ post['views'] }} | Лайков: {{ post['likes']|length }}

{% endfor %} {% else %}

Этот пользователь пока не загрузил ни одной публикации.

{% endif %}
''' return render_template_string(html, username=username, user_posts=user_posts, total_views=total_views, total_likes=total_likes, bio=bio, link=link, avatar=avatar, repo_id=REPO_ID, is_authenticated=is_authenticated, current_user=current_user) # Страница загрузки контента @app.route('/upload', methods=['GET', 'POST']) def upload(): if 'username' not in session: flash('Войдите, чтобы загрузить контент!') return redirect(url_for('login')) if request.method == 'POST': title = request.form.get('title') description = request.form.get('description') file = request.files.get('file') uploader = session['username'] if not title or not file: return "Укажите название и выберите файл", 400 filename = secure_filename(file.filename) temp_path = os.path.join('uploads', filename) os.makedirs('uploads', exist_ok=True) file.save(temp_path) # Определение типа файла file_type = 'video' if filename.lower().endswith(('.mp4', '.mov', '.avi')) else 'photo' # Загрузка на Hugging Face api = HfApi() api.upload_file( path_or_fileobj=temp_path, path_in_repo=f"{file_type}s/{filename}", repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"Добавлена публикация: {title} пользователем {uploader}" ) # Обновление базы данных data = load_data() post_id = f"post_{len(data['posts']) + 1}" while any(p['id'] == post_id for p in data['posts']): post_id = f"post_{len(data['posts']) + 1}_{int(time.time())}" data['posts'].append({ 'id': post_id, 'title': title, 'description': description, 'filename': filename, 'type': file_type, 'uploader': uploader, 'upload_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'views': 0, 'likes': [], 'comments': [] }) save_data(data) logging.info(f"Публикация {title} ({file_type}) загружена пользователем {uploader} с ID {post_id}") if os.path.exists(temp_path): os.remove(temp_path) return redirect(url_for('profile')) is_authenticated = 'username' in session username = session.get('username', None) html = ''' Загрузка контента ''' + NAV_HTML + '''

Загрузить контент

0%
''' return render_template_string(html, username=username, is_authenticated=is_authenticated) # Админ-панель (все посты: видео и фото) @app.route('/admhosto', methods=['GET', 'POST']) def admin_panel(): data = load_data() posts = sorted(data.get('posts', []), key=lambda x: datetime.strptime(x['upload_date'], '%Y-%m-%d %H:%M:%S'), reverse=True) is_authenticated = 'username' in session username = session.get('username', None) search_query = request.form.get('search', '').strip().lower() if request.method == 'POST' and 'search' in request.form else request.args.get('search', '').strip().lower() if search_query: posts = [post for post in posts if search_query in post['title'].lower() or search_query in post['description'].lower()] if request.method == 'POST' and 'delete' in request.form: post_id = request.form.get('post_id') if post_id: data['posts'] = [p for p in data['posts'] if p['id'] != post_id] save_data(data) logging.info(f"Удален пост с ID {post_id} через админ-панель") return redirect(url_for('admin_panel')) html = ''' Админ-панель ''' + NAV_HTML + '''

Админ-панель: Все посты

{% if posts %} {% for post in posts %}
{% if post['type'] == 'video' %} {% else %} {{ post['title'] }} {% endif %}

{{ post['title'] }}

{{ post['description'] }}

Загрузил: {{ post['uploader'] }} | {{ post['upload_date'] }}

Просмотров: {{ post['views'] }} | Лайков: {{ post['likes']|length }}

{% endfor %} {% else %}

Посты не найдены.

{% endif %}
''' return render_template_string(html, posts=posts, is_authenticated=is_authenticated, username=username, repo_id=REPO_ID, search_query=search_query) if __name__ == '__main__': backup_thread = threading.Thread(target=periodic_backup, daemon=True) backup_thread.start() app.run(debug=True, host='0.0.0.0', port=7860)