#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 交互式英语学习应用 - Flask 重构版本 在7860端口提供HTTP服务,支持静态文件和RESTful API """ import os import sys import json import time from datetime import datetime from pathlib import Path from flask import Flask, render_template, send_from_directory, jsonify, request, session from flask_cors import CORS import logging from logging.handlers import RotatingFileHandler from database.db_manager import get_db_instance # 创建 Flask 应用 app = Flask(__name__, static_folder='static', static_url_path='/static') # 配置 app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production') app.config['JSON_AS_ASCII'] = False # 支持中文JSON app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 最大上传16MB # 启用CORS CORS(app, resources={ r"/*": { "origins": "*", "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"], "allow_headers": ["Content-Type", "Authorization"] } }) # 配置日志 def setup_logging(): """配置日志系统""" if not app.debug: try: # 尝试在当前目录创建日志目录 log_dir = Path('logs') log_dir.mkdir(exist_ok=True) log_file = log_dir / 'app.log' except (PermissionError, OSError): # 如果当前目录无权限,使用 /tmp 目录 try: log_dir = Path('/tmp/logs') log_dir.mkdir(exist_ok=True) log_file = log_dir / 'app.log' print(f"⚠️ 使用临时日志目录: {log_file}") except (PermissionError, OSError): # 如果都失败,只使用控制台日志 print("⚠️ 无法创建日志文件,仅输出到控制台") log_file = None # 如果有有效的日志文件路径,添加文件处理器 if log_file: try: file_handler = RotatingFileHandler( log_file, maxBytes=10 * 1024 * 1024, backupCount=10, encoding='utf-8' ) file_handler.setFormatter(logging.Formatter( '[%(asctime)s] %(levelname)s in %(module)s: %(message)s' )) file_handler.setLevel(logging.INFO) app.logger.addHandler(file_handler) except (PermissionError, OSError) as e: print(f"⚠️ 无法创建日志处理器: {e}") # 确保至少有控制台输出 if not app.logger.handlers: console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter( '[%(asctime)s] %(levelname)s: %(message)s' )) console_handler.setLevel(logging.INFO) app.logger.addHandler(console_handler) app.logger.setLevel(logging.INFO) app.logger.info('🚀 Flask 应用启动') # OpenSearch 配置(根据项目需求) OPENSEARCH_CONFIG = { 'host': os.environ.get('OPENSEARCH_HOST', '192.168.3.33'), 'port': int(os.environ.get('OPENSEARCH_PORT', 9200)), 'use_ssl': os.environ.get('OPENSEARCH_USE_SSL', 'False').lower() == 'true', } # 全局变量:存储学习数据 BOOK_DATA = None # 数据库实例 DB = None def get_client_ip(): """ 获取客户端真实 IP 地址 在代理服务器(如 Hugging Face Spaces)后面时,需要检查代理头部 """ # 按优先级检查各种代理头部 headers_to_check = [ 'X-Forwarded-For', 'X-Real-IP', 'CF-Connecting-IP', # Cloudflare 'True-Client-IP', 'X-Client-IP', ] for header in headers_to_check: ip = request.headers.get(header) if ip: # X-Forwarded-For 可能包含多个 IP,取第一个 return ip.split(',')[0].strip() # 如果没有代理头部,使用 remote_addr return request.remote_addr or 'Unknown' def load_book_data(): """ 加载书籍数据(已废弃,保留是为了兼容性) 现在使用数据库接口,不再加载JSON文件 """ global BOOK_DATA app.logger.info('ℹ️ load_book_data 已废弃,使用数据库接口') # 设置为空字典表示已初始化,但不再使用 BOOK_DATA = {} return True def init_database(): """初始化数据库连接""" global DB try: DB = get_db_instance('books.db') app.logger.info('✅ 数据库连接初始化成功') return True except Exception as e: app.logger.error(f'❌ 数据库初始化失败: {e}') return False # ============================================================================ # 路由:静态文件服务 # ============================================================================ @app.route('/') def index(): """主页 - 书籍目录""" return send_from_directory('.', 'index.html') @app.route('/reader') def reader(): """阅读页面""" return send_from_directory('.', 'reader.html') @app.route('/') def serve_static(filename): """提供根目录下的静态文件(如 style.css, script.js)""" # 避免与API路由冲突 if filename.startswith('api/'): return jsonify({'error': '接口不存在'}), 404 return send_from_directory('.', filename) # ============================================================================ # 路由:API 端点 # ============================================================================ @app.route('/api/health') def health_check(): """健康检查端点""" return jsonify({ 'status': 'healthy', 'timestamp': datetime.now().isoformat(), 'version': '2.0.0-flask' }) @app.route('/api/book/info') def get_book_info(): """ 获取书籍信息(旧接口,保留兼容性) 推荐使用: /api/v2/books/ """ try: if not DB: return jsonify({'error': '数据库未初始化'}), 500 # 默认获取第一本书的信息(为了兼容旧版本) books = DB.get_all_books() if not books: return jsonify({'error': '没有找到书籍数据'}), 404 # 使用最后导入的书籍 book = books[-1] # 返回书籍元信息 info = { 'book_id': book['market_book_id'], 'total_pages': book['max_page'], 'title': book['market_book_name'], 'loaded': True } return jsonify(info) except Exception as e: app.logger.error(f"获取书籍信息失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/book/page/') def get_page_content(page_num): """ 获取指定页面内容(旧接口,保留兼容性) 推荐使用: /api/v2/books//pages/ """ try: if not DB: return jsonify({'error': '数据库未初始化'}), 500 # 获取最后导入的书籍 books = DB.get_all_books() if not books: return jsonify({'error': '没有找到书籍数据'}), 404 book = books[-1] book_id = book['market_book_id'] # 获取页面内容 page = DB.get_page_content(book_id, page_num) if not page: return jsonify({'error': '页码超出范围'}), 404 return jsonify({ 'page_num': page_num, 'content': page, 'total_pages': book['max_page'] }) except Exception as e: app.logger.error(f"获取页面内容失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/progress/save', methods=['POST']) def save_progress(): """保存学习进度""" try: data = request.get_json() # 这里可以保存到数据库,现在先存到 session if 'progress' not in session: session['progress'] = {} session['progress'].update({ 'current_page': data.get('current_page', 0), 'bookmarks': data.get('bookmarks', []), 'settings': data.get('settings', {}), 'last_updated': datetime.now().isoformat() }) app.logger.info(f"保存学习进度: 第 {data.get('current_page')} 页") return jsonify({ 'success': True, 'message': '学习进度已保存' }) except Exception as e: app.logger.error(f"保存进度失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/progress/load', methods=['GET']) def load_progress(): """加载学习进度""" progress = session.get('progress', { 'current_page': 0, 'bookmarks': [], 'settings': {} }) return jsonify(progress) @app.route('/api/search', methods=['POST']) def search_content(): """搜索内容(未来可集成 OpenSearch)""" try: data = request.get_json() keyword = data.get('keyword', '').strip() if not keyword: return jsonify({'error': '搜索关键词不能为空'}), 400 # 简单搜索实现(在书籍数据中搜索) if not BOOK_DATA: return jsonify({'error': '书籍数据未加载'}), 500 results = [] pages = BOOK_DATA.get('pages', []) for page_num, page_content in enumerate(pages): # 在页面内容中搜索关键词 page_str = json.dumps(page_content, ensure_ascii=False) if keyword.lower() in page_str.lower(): results.append({ 'page_num': page_num, 'preview': str(page_content)[:200] + '...' }) app.logger.info(f"搜索 '{keyword}': 找到 {len(results)} 个结果") return jsonify({ 'keyword': keyword, 'total': len(results), 'results': results[:20] # 限制返回前20个结果 }) except Exception as e: app.logger.error(f"搜索失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/opensearch/status') def opensearch_status(): """检查 OpenSearch 连接状态(示例)""" try: # 这里可以实际连接 OpenSearch # from opensearchpy import OpenSearch # client = OpenSearch([{'host': OPENSEARCH_CONFIG['host'], 'port': OPENSEARCH_CONFIG['port']}]) # info = client.info() return jsonify({ 'configured': True, 'host': OPENSEARCH_CONFIG['host'], 'port': OPENSEARCH_CONFIG['port'], 'status': 'not_implemented', 'message': 'OpenSearch 集成待实现' }) except Exception as e: return jsonify({ 'configured': False, 'error': str(e) }), 500 @app.route('/api/stats') def get_stats(): """获取学习统计数据""" # 从 session 或数据库获取统计数据 progress = session.get('progress', {}) return jsonify({ 'current_page': progress.get('current_page', 0), 'total_bookmarks': len(progress.get('bookmarks', [])), 'last_visit': progress.get('last_updated', None), 'total_pages': len(BOOK_DATA.get('pages', [])) if BOOK_DATA else 0 }) # ============================================================================ # 路由:新版API端点(基于SQLite数据库) # ============================================================================ @app.route('/api/v2/books') def get_books(): """ 获取所有书籍列表 Query Parameters: grade_id: 年级ID(可选) Returns: { "success": true, "count": 10, "books": [ { "market_book_id": 168, "market_book_name": "一年级上册", "market_book_cover": "168_一年级上册/images/page_001.jpg", "max_page": 73, "market_classify_name": "沪教版(深圳)", "grade_id": 40, "reel_id": 1 } ] } """ try: if not DB: return jsonify({ 'success': False, 'error': '数据库未初始化' }), 500 # 检查是否有年级筛选 grade_id = request.args.get('grade_id', type=int) if grade_id: books = DB.get_books_by_grade(grade_id) else: books = DB.get_all_books() app.logger.info(f"获取书籍列表: {len(books)} 本书") return jsonify({ 'success': True, 'count': len(books), 'books': books }) except Exception as e: app.logger.error(f"获取书籍列表失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/v2/books/') def get_book_info_v2(book_id): """ 获取指定书籍的详细信息 Args: book_id: 书籍ID Returns: { "success": true, "book": { "book_id": 1, "book_name": "书籍名称", "total_pages": 100, ... } } """ try: if not DB: return jsonify({ 'success': False, 'error': '数据库未初始化' }), 500 book = DB.get_book_by_id(book_id) if not book: return jsonify({ 'success': False, 'error': '书籍不存在' }), 404 app.logger.info(f"获取书籍信息: ID={book_id}") return jsonify({ 'success': True, 'book': book }) except Exception as e: app.logger.error(f"获取书籍信息失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/v2/books//pages') def get_book_pages(book_id): """ 获取书籍的所有页面列表(不含片段内容) Args: book_id: 书籍ID Returns: { "success": true, "book_id": 168, "book_name": "一年级上册", "total_pages": 73, "pages": [ { "page_id": 2111, "page_number": 1, "origin_img_url": "...", "encrypt_img_url": "..." } ] } """ try: if not DB: return jsonify({ 'success': False, 'error': '数据库未初始化' }), 500 # 检查书籍是否存在 book = DB.get_book_by_id(book_id) if not book: return jsonify({ 'success': False, 'error': '书籍不存在' }), 404 # 获取页面列表 pages = DB.get_book_pages(book_id) app.logger.info(f"获取书籍页面列表: 书籍ID={book_id}, 页数={len(pages)}") return jsonify({ 'success': True, 'book_id': book_id, 'book_name': book['market_book_name'], 'total_pages': len(pages), 'pages': pages }) except Exception as e: app.logger.error(f"获取书籍页面列表失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/v2/books//catalog') def get_book_catalog(book_id): """ 获取书籍目录结构(章节目录) Args: book_id: 书籍ID Returns: { "success": true, "book_id": 168, "book_name": "一年级上册", "catalog": [ { "catalog_id": 1, "catalog_name": "Unit 1 Hello", "catalog_name_cn": "第一单元 你好", "start_page": 2, "end_page": 10, "thumbnail": "...", "children": [] } ] } """ try: if not DB: return jsonify({ 'success': False, 'error': '数据库未初始化' }), 500 # 检查书籍是否存在 book = DB.get_book_by_id(book_id) if not book: return jsonify({ 'success': False, 'error': '书籍不存在' }), 404 # 获取目录 catalog = DB.get_book_catalog(book_id) app.logger.info(f"获取书籍目录: 书籍ID={book_id}, 目录项={len(catalog)}") return jsonify({ 'success': True, 'book_id': book_id, 'book_name': book['market_book_name'], 'catalog': catalog }) except Exception as e: app.logger.error(f"获取书籍目录失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/v2/books//pages/') def get_page_content_v2(book_id, page_num): """ 获取指定页面的完整内容 Args: book_id: 书籍ID page_num: 页码 Returns: { "success": true, "book_id": 1, "page": { "page_id": 1001, "page_number": 1, "origin_img_url": "https://...", "pieces": [ { "piece_id": 10001, "original": "Hello", "translation": "你好", "origin_sound_url": "https://...", ... } ], "piece_count": 10 } } """ try: if not DB: return jsonify({ 'success': False, 'error': '数据库未初始化' }), 500 # 检查书籍是否存在 book = DB.get_book_by_id(book_id) if not book: return jsonify({ 'success': False, 'error': '书籍不存在' }), 404 # 获取页面内容 page = DB.get_page_content(book_id, page_num) if not page: return jsonify({ 'success': False, 'error': f'页码 {page_num} 不存在' }), 404 app.logger.info(f"获取页面内容: 书籍ID={book_id}, 页码={page_num}") return jsonify({ 'success': True, 'book_id': book_id, 'book_name': book['market_book_name'], 'page': page }) except Exception as e: app.logger.error(f"获取页面内容失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/v2/books//search') def search_book_content(book_id): """ 在书籍中搜索内容 Query Parameters: keyword: 搜索关键词 limit: 返回结果数量(默认20) Returns: { "success": true, "book_id": 168, "keyword": "hello", "count": 5, "results": [ { "page_number": 2, "piece_id": 26342, "original": "Hello", "translation": "你好", "origin_sound_url": "..." } ] } """ try: if not DB: return jsonify({ 'success': False, 'error': '数据库未初始化' }), 500 keyword = request.args.get('keyword', '').strip() if not keyword: return jsonify({ 'success': False, 'error': '请提供搜索关键词' }), 400 limit = request.args.get('limit', 20, type=int) # 检查书籍是否存在 book = DB.get_book_by_id(book_id) if not book: return jsonify({ 'success': False, 'error': '书籍不存在' }), 404 # 搜索 results = DB.search_content(book_id, keyword, limit) app.logger.info(f"搜索内容: 书籍ID={book_id}, 关键词={keyword}, 结果数={len(results)}") return jsonify({ 'success': True, 'book_id': book_id, 'book_name': book['market_book_name'], 'keyword': keyword, 'count': len(results), 'results': results }) except Exception as e: app.logger.error(f"搜索失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/v2/search') def search_all_content(): """ 在所有书籍中搜索内容 Query Parameters: keyword: 搜索关键词 limit: 返回结果数量(默认50) Returns: { "success": true, "keyword": "hello", "count": 15, "results": [ { "market_book_id": 168, "market_book_name": "一年级上册", "page_number": 2, "piece_id": 26342, "original": "Hello", "translation": "你好", "origin_sound_url": "..." } ] } """ try: if not DB: return jsonify({ 'success': False, 'error': '数据库未初始化' }), 500 keyword = request.args.get('keyword', '').strip() if not keyword: return jsonify({ 'success': False, 'error': '请提供搜索关键词' }), 400 limit = request.args.get('limit', 50, type=int) # 搜索所有书籍 results = DB.search_all_books(keyword, limit) app.logger.info(f"全局搜索: 关键词={keyword}, 结果数={len(results)}") return jsonify({ 'success': True, 'keyword': keyword, 'count': len(results), 'results': results }) except Exception as e: app.logger.error(f"搜索失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/v2/books//statistics') def get_book_statistics(book_id): """ 获取书籍统计信息 Args: book_id: 书籍ID Returns: { "success": true, "statistics": { "book_id": 168, "total_pages": 73, "total_pieces": 500, "total_audio": 450, "total_catalogs": 12 } } """ try: if not DB: return jsonify({ 'success': False, 'error': '数据库未初始化' }), 500 # 检查书籍是否存在 book = DB.get_book_by_id(book_id) if not book: return jsonify({ 'success': False, 'error': '书籍不存在' }), 404 # 获取统计信息 stats = DB.get_book_statistics(book_id) app.logger.info(f"获取统计信息: 书籍ID={book_id}") return jsonify({ 'success': True, 'statistics': stats }) except Exception as e: app.logger.error(f"获取统计信息失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/v2/statistics') def get_overall_statistics(): """ 获取整体统计信息 Returns: { "success": true, "statistics": { "total_books": 30, "total_pages": 2000, "total_pieces": 50000, "total_catalogs": 360 } } """ try: if not DB: return jsonify({ 'success': False, 'error': '数据库未初始化' }), 500 stats = DB.get_overall_statistics() app.logger.info("获取整体统计信息") return jsonify({ 'success': True, 'statistics': stats }) except Exception as e: app.logger.error(f"获取统计信息失败: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 # ============================================================================ # 错误处理 # ============================================================================ @app.errorhandler(404) def not_found(error): """404错误处理""" if request.path.startswith('/api/'): return jsonify({'error': '接口不存在'}), 404 return send_from_directory('.', 'index.html') @app.errorhandler(500) def internal_error(error): """500错误处理""" app.logger.error(f'服务器错误: {error}') return jsonify({'error': '服务器内部错误'}), 500 # ============================================================================ # 请求钩子 # ============================================================================ @app.before_request def before_request(): """请求前处理""" # 获取客户端真实 IP client_ip = get_client_ip() # 记录所有请求信息(包括静态文件) user_agent = request.headers.get('User-Agent', 'Unknown')[:100] # 限制长度 # API 请求记录详细信息 if request.path.startswith('/api/'): app.logger.info( f"[{client_ip}] {request.method} {request.path} " f"| UA: {user_agent}" ) # 静态资源只记录简要信息 elif app.debug: app.logger.debug(f"[{client_ip}] {request.method} {request.path}") # 将 IP 存储到 g 对象,方便在其他地方使用 from flask import g g.client_ip = client_ip @app.after_request def after_request(response): """请求后处理 - 添加缓存控制""" # API 端点不缓存 if request.path.startswith('/api/'): response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' else: # 静态资源缓存1小时 response.headers['Cache-Control'] = 'public, max-age=3600' return response # ============================================================================ # 应用初始化和启动 # ============================================================================ def auto_generate_database(): """ 自动生成数据库 如果 books.db 不存在,自动运行导入脚本生成数据库 """ db_path = 'books.db' schema_path = 'database/db_schema.sql' data_dir = 'data' # 检查必要的文件和目录 if not os.path.exists(schema_path): print(f"❌ 错误: Schema文件 {schema_path} 不存在") return False if not os.path.exists(data_dir): print(f"❌ 错误: 数据目录 {data_dir} 不存在") print(f" 无法自动生成数据库,请先准备数据") return False print(f"📦 未找到数据库文件,正在自动生成...") print(f" 数据目录: {data_dir}") print(f" 这可能需要几分钟时间,请稍候...") print("=" * 60) try: # 导入必要的函数 from database.import_book_data import create_database, import_all_books, verify_data # 创建数据库 conn = create_database(db_path, schema_path) # 导入所有书籍数据 import_all_books(conn, data_dir) # 验证数据 verify_data(conn) # 关闭连接 conn.close() print("=" * 60) print("✅ 数据库自动生成完成!") print("=" * 60) return True except Exception as e: print(f"❌ 自动生成数据库失败: {e}") app.logger.error(f"自动生成数据库失败: {e}", exc_info=True) return False def initialize_app(): """初始化应用""" print("🚀 交互式英语学习应用 - Flask 版本") print("=" * 60) # 设置日志 setup_logging() # 检查必要文件(暂时不检查 books.db,因为会自动生成) if not os.path.exists('index.html'): app.logger.error(f"❌ 缺少必要文件: index.html") return False # 加载书籍数据 if not load_book_data(): return False # 检查数据库文件 if not os.path.exists('books.db'): print("ℹ️ 未找到数据库文件 books.db") print("🔄 正在自动生成数据库...") # 自动生成数据库 if not auto_generate_database(): print("❌ 数据库自动生成失败") print(" 请手动运行: python3 database/import_book_data.py") return False # 初始化数据库连接 print("📚 正在初始化数据库连接...") if not init_database(): print("❌ 数据库初始化失败") return False print("✅ 应用初始化完成") return True def main(): """主函数""" # 初始化应用(如果还未初始化) if not DB: if not initialize_app(): print("❌ 应用初始化失败") return 1 # Hugging Face Spaces 要求监听 7860 端口 port = int(os.environ.get('PORT', 7860)) debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true' print(f"🌐 监听端口: {port}") print(f"📁 工作目录: {os.getcwd()}") print(f"🔧 调试模式: {'开启' if debug else '关闭'}") print("=" * 60) print("🎉 应用已准备就绪!") print("=" * 60) try: # 启动 Flask 应用 app.run( host='0.0.0.0', port=port, debug=debug, threaded=True, # 多线程处理请求 use_reloader=debug # 开发模式下启用热重载 ) except KeyboardInterrupt: print("\n\n🛑 服务器已停止") return 0 except Exception as e: print(f"❌ 服务器启动失败: {e}") app.logger.error(f"服务器启动失败: {e}") return 1 # ============================================================================ # 应用初始化(支持 Gunicorn 和直接运行) # ============================================================================ # 在模块加载时初始化应用(无论是 gunicorn 还是直接运行都会执行) # 这确保了在 HuggingFace Spaces 上使用 gunicorn 时也能正确初始化 if not DB: # 避免重复初始化 print("🔧 开始初始化应用...") if not initialize_app(): print("❌ 应用初始化失败") sys.exit(1) if __name__ == "__main__": sys.exit(main())