# -*- coding: utf-8 -*-
import os
import json
import time
from flask import Flask, request, jsonify, Response, render_template_string, redirect, url_for
from pywebpush import webpush, WebPushException
from py_vapid import Vapid
# --- Константы и Настройки ---
APP_PORT = 7860
NEWS_FILE = "news.json"
SUBSCRIPTIONS_FILE = "subscriptions.json"
VAPID_KEYS_FILE = "vapid_keys.json"
VAPID_CONTACT_EMAIL = "mailto:your_email@example.com" # Замените на ваш email
# --- Инициализация Flask ---
app = Flask(__name__)
app.secret_key = os.urandom(24) # Для flash сообщений (хотя здесь не используются)
# --- Генерация/Загрузка VAPID ключей ---
vapid_keys = {}
if os.path.exists(VAPID_KEYS_FILE):
try:
with open(VAPID_KEYS_FILE, "r") as f:
vapid_keys = json.load(f)
print(f"VAPID keys loaded from {VAPID_KEYS_FILE}")
except Exception as e:
print(f"Error loading VAPID keys: {e}. Generating new ones.")
vapid_keys = {}
if not vapid_keys or 'private_key' not in vapid_keys or 'public_key' not in vapid_keys:
print("Generating new VAPID keys...")
try:
vapid = Vapid.generate()
vapid_keys = {
"private_key": vapid.private_key,
"public_key": vapid.public_key
}
with open(VAPID_KEYS_FILE, "w") as f:
json.dump(vapid_keys, f, indent=4)
print(f"VAPID keys generated and saved to {VAPID_KEYS_FILE}")
except Exception as e:
print(f"FATAL: Could not generate or save VAPID keys: {e}")
exit(1) # Не можем работать без ключей
VAPID_PRIVATE_KEY = vapid_keys['private_key']
VAPID_PUBLIC_KEY = vapid_keys['public_key']
# --- Вспомогательные функции ---
def load_data(filename):
"""Загружает данные из JSON файла."""
if not os.path.exists(filename):
return []
try:
with open(filename, 'r', encoding='utf-8') as f:
# Handle empty file case
content = f.read()
if not content:
return []
return json.loads(content)
except (IOError, json.JSONDecodeError) as e:
print(f"Error loading {filename}: {e}")
return [] # Возвращаем пустой список при ошибке
def save_data(filename, data):
"""Сохраняет данные в JSON файл."""
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
except IOError as e:
print(f"Error saving {filename}: {e}")
def send_notification(subscription_info, title, body):
"""Отправляет одно push уведомление."""
print(f"Attempting to send notification to: {subscription_info.get('endpoint')[:50]}...")
try:
webpush(
subscription_info=subscription_info,
data=json.dumps({"title": title, "body": body}),
vapid_private_key=VAPID_PRIVATE_KEY,
vapid_claims={"sub": VAPID_CONTACT_EMAIL}
)
print("Notification sent successfully.")
return True
except WebPushException as ex:
print(f"WebPushException: {ex}")
# Улавливаем специфичные ошибки, например, подписка недействительна
if ex.response and ex.response.status_code in [404, 410]:
print(f"Subscription {subscription_info.get('endpoint')[:50]}... seems invalid (Gone or Not Found). Consider removing it.")
# Здесь можно добавить логику удаления недействительной подписки
# remove_subscription(subscription_info) # Понадобится функция для удаления
else:
print("Notification sending failed for other reason.")
return False
except Exception as e:
print(f"An unexpected error occurred during push notification sending: {e}")
return False
def notify_all(title, body):
"""Отправляет уведомление всем подписчикам."""
subscriptions = load_data(SUBSCRIPTIONS_FILE)
if not subscriptions:
print("No subscriptions found to notify.")
return
print(f"Notifying {len(subscriptions)} subscribers about '{title}'...")
# Создаем копию списка для безопасной итерации, если будем удалять элементы
# (Хотя в текущей реализации удаление не происходит внутри цикла)
for sub in list(subscriptions):
send_notification(sub, title, body)
# Небольшая пауза, чтобы не перегружать серверы push-уведомлений (опционально)
time.sleep(0.1)
# --- HTML Шаблон ---
HTML_TEMPLATE = """
"""
# --- Service Worker JavaScript ---
SERVICE_WORKER_JS = """
// service-worker.js
// Уникальное имя кэша (можно добавить версию)
const CACHE_NAME = 'news-pwa-cache-v1';
// Ресурсы для кэширования при установке
const urlsToCache = [
'/', // Кэшируем главную страницу
// Можно добавить другие статические ресурсы, если они есть (CSS, JS файлы, изображения)
// '/static/style.css',
// '/static/logo.png'
];
// Установка Service Worker: кэшируем основные ресурсы
self.addEventListener('install', event => {
console.log('Service Worker: Installing...');
event.waitUntil(
caches.open(CACHE_NAME)
.then(cache => {
console.log('Service Worker: Caching app shell');
return cache.addAll(urlsToCache);
})
.then(() => {
console.log('Service Worker: Install completed');
// Принудительная активация нового SW сразу после установки (не рекомендуется для продакшена без тщательного тестирования)
// return self.skipWaiting();
})
.catch(error => {
console.error('Service Worker: Installation failed', error);
})
);
});
// Активация Service Worker: очищаем старые кэши
self.addEventListener('activate', event => {
console.log('Service Worker: Activating...');
event.waitUntil(
caches.keys().then(cacheNames => {
return Promise.all(
cacheNames.map(cacheName => {
// Удаляем все кэши, кроме текущего активного
if (cacheName !== CACHE_NAME) {
console.log('Service Worker: Clearing old cache:', cacheName);
return caches.delete(cacheName);
}
})
);
}).then(() => {
console.log('Service Worker: Activation completed');
// Захватываем контроль над открытыми страницами немедленно
return self.clients.claim();
})
);
});
// Обработка запросов (Fetch): стратегия Cache First для закэшированных ресурсов
self.addEventListener('fetch', event => {
// Мы отвечаем только на GET запросы
if (event.request.method !== 'GET') {
return;
}
// Для запросов навигации (HTML страниц) используем стратегию Network Falling Back to Cache
if (event.request.mode === 'navigate') {
event.respondWith(
fetch(event.request)
.catch(() => {
// Если сеть недоступна, пробуем достать из кэша главную страницу
return caches.match('/');
})
);
return;
}
// Для остальных запросов (CSS, JS, картинки и т.д.) используем Cache First
event.respondWith(
caches.match(event.request)
.then(cachedResponse => {
// Если ресурс есть в кэше, возвращаем его
if (cachedResponse) {
// console.log('Service Worker: Serving from cache:', event.request.url);
return cachedResponse;
}
// Если ресурса нет в кэше, запрашиваем его из сети
// console.log('Service Worker: Fetching from network:', event.request.url);
return fetch(event.request).then(
networkResponse => {
// Опционально: можно кэшировать новые запросы динамически
// if (networkResponse.ok) {
// const responseToCache = networkResponse.clone();
// caches.open(CACHE_NAME)
// .then(cache => {
// cache.put(event.request, responseToCache);
// });
// }
return networkResponse;
}
).catch(error => {
console.error('Service Worker: Fetch failed; returning offline fallback or error.', error);
// Можно вернуть запасной контент (например, оффлайн-страницу), если он был закэширован
// return caches.match('/offline.html');
});
})
);
});
// Обработка Push-уведомлений
self.addEventListener('push', event => {
console.log('[Service Worker] Push Received.');
console.log(`[Service Worker] Push had this data: "${event.data.text()}"`);
let title = 'Новая новость!';
let options = {
body: 'Проверьте обновления на сайте.',
icon: null, // Иконки нет, как запрошено
badge: null // Значок для Android
// tag: 'news-notification' // Тег для группировки или замены уведомлений
};
try {
const data = event.data.json();
title = data.title || title;
options.body = data.body || options.body;
if (data.icon) options.icon = data.icon; // Если вдруг передали иконку
} catch (e) {
console.log("Push data was not JSON, using default message.");
options.body = event.data.text(); // Используем текст как тело, если не JSON
}
event.waitUntil(
self.registration.showNotification(title, options)
);
});
// Обработка клика по уведомлению (опционально)
self.addEventListener('notificationclick', event => {
console.log('[Service Worker] Notification click Received.');
event.notification.close(); // Закрываем уведомление
// Открываем или фокусируем окно приложения при клике
event.waitUntil(
clients.matchAll({ type: "window" })
.then(clientList => {
// Проверяем, открыта ли уже вкладка с этим URL
for (const client of clientList) {
// '/' - URL вашего приложения
if (client.url === '/' && 'focus' in client) {
return client.focus(); // Фокусируемся на существующей вкладке
}
}
// Если вкладка не найдена, открываем новую
if (clients.openWindow) {
return clients.openWindow('/'); // Открываем главную страницу
}
})
);
});
"""
# --- Manifest JSON ---
MANIFEST_JSON = {
"name": "Новостное PWA Приложение",
"short_name": "НовостиPWA",
"description": "Простое PWA для просмотра и добавления новостей с push-уведомлениями.",
"start_url": "/",
"display": "standalone", # Или 'minimal-ui', 'fullscreen', 'browser'
"background_color": "#ffffff",
"theme_color": "#3367D6",
# "icons": [] # Пустой массив, т.к. иконка не нужна по запросу
# Если браузер требует иконку для установки, можно добавить минимальную:
# "icons": [
# {
# "src": "", # 1x1 transparent pixel
# "sizes": "1x1",
# "type": "image/png"
# }
# ]
# ПРИМЕЧАНИЕ: Отсутствие иконок может помешать установке PWA в некоторых браузерах.
# Лучше предоставить хотя бы одну иконку. Оставим пока без иконок согласно запросу.
}
# --- Маршруты Flask ---
@app.route('/')
def index():
"""Главная страница, отображает новости и форму добавления."""
news_list = load_data(NEWS_FILE)
return render_template_string(HTML_TEMPLATE, news=news_list, vapid_public_key=VAPID_PUBLIC_KEY)
@app.route('/manifest.json')
def manifest():
"""Сервирует manifest.json."""
return jsonify(MANIFEST_JSON)
@app.route('/service-worker.js')
def service_worker():
"""Сервирует файл service-worker.js."""
return Response(SERVICE_WORKER_JS, mimetype='application/javascript')
@app.route('/add_news', methods=['POST'])
def add_news():
"""Добавляет новую новость и уведомляет подписчиков."""
title = request.form.get('title')
content = request.form.get('content')
if not title or not content:
# Можно добавить flash сообщение об ошибке, если используется render_template
return "Ошибка: Заголовок и содержание не могут быть пустыми.", 400
news_list = load_data(NEWS_FILE)
new_entry = {
'id': int(time.time() * 1000), # Простой ID на основе времени
'title': title,
'content': content,
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
}
news_list.append(new_entry)
save_data(NEWS_FILE, news_list)
# Отправляем уведомления подписчикам
print(f"Sending notification for new news: '{title}'")
notify_all(title=f"Новая новость: {title}", body=content[:100] + ('...' if len(content) > 100 else '')) # Ограничим длину тела уведомления
return redirect(url_for('index')) # Перенаправляем на главную страницу
@app.route('/subscribe', methods=['POST'])
def subscribe():
"""Сохраняет информацию о подписке пользователя."""
subscription_info = request.json
if not subscription_info or 'endpoint' not in subscription_info:
return jsonify({"error": "Invalid subscription object"}), 400
print("Received subscription:")
print(json.dumps(subscription_info, indent=2))
subscriptions = load_data(SUBSCRIPTIONS_FILE)
# Проверяем, нет ли уже такой подписки (по endpoint)
exists = any(sub.get('endpoint') == subscription_info.get('endpoint') for sub in subscriptions)
if not exists:
subscriptions.append(subscription_info)
save_data(SUBSCRIPTIONS_FILE, subscriptions)
print(f"Subscription added. Total subscriptions: {len(subscriptions)}")
return jsonify({"message": "Subscription added successfully."}), 201
else:
print("Subscription already exists.")
return jsonify({"message": "Subscription already exists."}), 200
# --- Запуск приложения ---
if __name__ == '__main__':
print(f"Starting Flask app on http://127.0.0.1:{APP_PORT}")
print(f"Make sure to access via localhost or 127.0.0.1 for PWA/Push features.")
# Используем host='0.0.0.0' чтобы быть доступным извне, но PWA может не работать без HTTPS
# Для локального тестирования лучше использовать host='127.0.0.1'
app.run(host='0.0.0.0', port=APP_PORT, debug=True)
# Важно: debug=True не рекомендуется для продакшена!