Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| 书籍数据导入脚本 - 支持新的 data 数据结构 | |
| 将 data 目录中的数据导入 SQLite 数据库 | |
| """ | |
| import os | |
| import json | |
| import sqlite3 | |
| import sys | |
| from pathlib import Path | |
| from datetime import datetime | |
| import logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| def create_database(db_path='books.db', schema_path='database/db_schema.sql'): | |
| """ | |
| 创建数据库和表结构 | |
| Args: | |
| db_path: 数据库文件路径 | |
| schema_path: Schema SQL文件路径 | |
| Returns: | |
| sqlite3.Connection: 数据库连接对象 | |
| """ | |
| logger.info(f"📦 创建数据库: {db_path}") | |
| # 读取schema | |
| if not Path(schema_path).exists(): | |
| logger.error(f"❌ 错误: Schema文件 {schema_path} 不存在") | |
| sys.exit(1) | |
| with open(schema_path, 'r', encoding='utf-8') as f: | |
| schema_sql = f.read() | |
| # 连接数据库并执行schema | |
| conn = sqlite3.connect(db_path) | |
| cursor = conn.cursor() | |
| # 执行所有SQL语句 | |
| cursor.executescript(schema_sql) | |
| conn.commit() | |
| logger.info("✅ 数据库结构创建成功") | |
| return conn | |
| def import_book_info(conn, book_dir: Path) -> int: | |
| """ | |
| 导入书籍基本信息 | |
| Args: | |
| conn: 数据库连接 | |
| book_dir: 书籍目录 | |
| Returns: | |
| int: 书籍ID | |
| """ | |
| book_info_file = book_dir / "book_info.json" | |
| if not book_info_file.exists(): | |
| logger.warning(f"book_info.json 不存在: {book_dir}") | |
| return None | |
| with open(book_info_file, 'r', encoding='utf-8') as f: | |
| book_info = json.load(f) | |
| cursor = conn.cursor() | |
| sql = """ | |
| INSERT OR REPLACE INTO books ( | |
| market_book_id, market_classify_id, market_book_name, stage, | |
| market_book_cover, mod_book_id, mod_book_name, mod_book_cover, | |
| old_mod_book_id, is_mod, create_date, create_user, isbn, type, | |
| subject_id, ver_id, grade_id, reel_id, app_id, sort, | |
| market_classify_name, old_mod_classify_id, mod_classify_id, | |
| start_page, max_page, source_dir | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """ | |
| cursor.execute(sql, ( | |
| book_info.get("MarketBookID"), | |
| book_info.get("MarketClassifyId"), | |
| book_info.get("MarketBookName"), | |
| book_info.get("Stage"), | |
| book_info.get("MarketBookCover"), | |
| book_info.get("MODBookID"), | |
| book_info.get("MODBookName"), | |
| book_info.get("MODBookCover"), | |
| book_info.get("OldMODBookID"), | |
| book_info.get("IsMod"), | |
| book_info.get("CreateDate"), | |
| book_info.get("CreateUser"), | |
| book_info.get("ISBN"), | |
| book_info.get("Type"), | |
| book_info.get("SubjectID"), | |
| book_info.get("VerID"), | |
| book_info.get("GradeID"), | |
| book_info.get("ReelID"), | |
| book_info.get("AppID"), | |
| book_info.get("Sort"), | |
| book_info.get("MarketClassifyName"), | |
| book_info.get("OldModClassifyID"), | |
| book_info.get("ModClassifyID"), | |
| book_info.get("StartPage"), | |
| book_info.get("MaxPage"), | |
| str(book_dir.name) | |
| )) | |
| conn.commit() | |
| book_id = book_info.get("MarketBookID") | |
| logger.info(f"导入书籍信息成功: {book_id} - {book_info.get('MarketBookName')}") | |
| return book_id | |
| def import_pages(conn, book_dir: Path, book_id: int): | |
| """ | |
| 导入页面数据 | |
| Args: | |
| conn: 数据库连接 | |
| book_dir: 书籍目录 | |
| book_id: 书籍ID | |
| """ | |
| pages_file = book_dir / "pages.json" | |
| if not pages_file.exists(): | |
| logger.warning(f"pages.json 不存在: {book_dir}") | |
| return | |
| with open(pages_file, 'r', encoding='utf-8') as f: | |
| pages = json.load(f) | |
| cursor = conn.cursor() | |
| page_sql = """ | |
| INSERT OR REPLACE INTO pages (page_id, book_id, page_number, origin_img_url, encrypt_img_url) | |
| VALUES (?, ?, ?, ?, ?) | |
| """ | |
| piece_sql = """ | |
| INSERT OR REPLACE INTO pieces ( | |
| piece_id, page_id, original, translation, origin_sound_url, encrypt_sound_url, | |
| duration, coordinate_x, coordinate_y, coordinate_width, coordinate_height, | |
| is_evaluated, show_translation, rich_original, rich_translation, piece_order | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """ | |
| page_count = 0 | |
| piece_count = 0 | |
| for page in pages: | |
| page_id = page.get("pageId") | |
| page_number = page.get("pageNumber") | |
| origin_img_url = page.get("originImgUrl", "") | |
| encrypt_img_url = page.get("encryptImgUrl", "") | |
| if not page_id or not page_number: | |
| continue | |
| # 插入页面 | |
| cursor.execute(page_sql, (page_id, book_id, page_number, origin_img_url, encrypt_img_url)) | |
| page_count += 1 | |
| # 插入页面片段 | |
| pieces = page.get("pieces", []) | |
| for i, piece in enumerate(pieces): | |
| coordinate = piece.get("coordinate", {}) | |
| cursor.execute(piece_sql, ( | |
| piece.get("pieceId"), | |
| page_id, | |
| piece.get("original", ""), | |
| piece.get("translation", ""), | |
| piece.get("originSoundUrl", ""), | |
| piece.get("encryptSoundUrl", ""), | |
| piece.get("duration", 0), | |
| coordinate.get("x", 0), | |
| coordinate.get("y", 0), | |
| coordinate.get("width", 0), | |
| coordinate.get("height", 0), | |
| piece.get("isEvaluated", 0), | |
| piece.get("showTranslation", 1), | |
| piece.get("richOriginal", ""), | |
| piece.get("richTranslation", ""), | |
| i | |
| )) | |
| piece_count += 1 | |
| # 每50页提交一次 | |
| if page_count % 50 == 0: | |
| conn.commit() | |
| logger.info(f" 进度: {page_count}/{len(pages)} 页, {piece_count} 个内容片段") | |
| conn.commit() | |
| logger.info(f"导入 {page_count} 页内容,{piece_count} 个片段成功") | |
| def generate_catalog_from_pages(book_dir: Path, book_id: int, pages_data: list) -> list: | |
| """ | |
| 从pages.json自动生成目录结构 | |
| 为每一页生成一个目录项,使用该页第一个piece的文本作为标题 | |
| Args: | |
| book_dir: 书籍目录 | |
| book_id: 书籍ID | |
| pages_data: 页面数据列表 | |
| Returns: | |
| 生成的目录列表 | |
| """ | |
| if not pages_data: | |
| return [] | |
| catalogs = [] | |
| # 为每一页生成一个目录项 | |
| for i, page in enumerate(pages_data): | |
| page_number = page.get('pageNumber') | |
| pieces = page.get('pieces', []) | |
| # 获取第一个piece的文本作为标题 | |
| if pieces and len(pieces) > 0: | |
| first_piece = pieces[0] | |
| title = first_piece.get('original', '').strip() | |
| title_cn = first_piece.get('translation', '').strip() | |
| else: | |
| # 如果没有piece,使用默认标题 | |
| title = f"Page {page_number}" | |
| title_cn = f"第{page_number}页" | |
| # 限制标题长度,避免过长 | |
| if len(title) > 80: | |
| title = title[:77] + "..." | |
| if len(title_cn) > 80: | |
| title_cn = title_cn[:77] + "..." | |
| catalogs.append({ | |
| 'CatalogID': i + 1, | |
| 'CatalogName': title or f"Page {page_number}", | |
| 'CatalogNameCN': title_cn or f"第{page_number}页", | |
| 'StartPage': page_number, | |
| 'EndPage': page_number, # 每个目录项对应一页 | |
| 'Thumbnail': f"{book_dir.name}/images/thumbnails/page_{page_number:03d}_thumb.jpg", | |
| 'ParentID': 0, | |
| 'Sort': i + 1 | |
| }) | |
| logger.info(f" 从pages.json生成 {len(catalogs)} 个目录项(每页一项)") | |
| return catalogs | |
| def import_catalog(conn, book_dir: Path, book_id: int, pages_data: list = None): | |
| """ | |
| 导入目录数据(强制从pages.json生成) | |
| Args: | |
| conn: 数据库连接 | |
| book_dir: 书籍目录 | |
| book_id: 书籍ID | |
| pages_data: 页面数据(用于自动生成目录) | |
| """ | |
| catalogs = [] | |
| # 强制从 pages.json 生成目录(忽略已有的 catalog.json) | |
| if pages_data: | |
| logger.info(f" 🔄 强制从 pages.json 生成目录...") | |
| catalogs = generate_catalog_from_pages(book_dir, book_id, pages_data) | |
| else: | |
| # 如果 pages_data 不可用,尝试从 catalog.json 读取作为备选 | |
| catalog_file = book_dir / "catalog.json" | |
| if catalog_file.exists(): | |
| logger.info(f" ⚠️ pages.json 不可用,从现有 catalog.json 读取...") | |
| with open(catalog_file, 'r', encoding='utf-8') as f: | |
| catalog_data = json.load(f) | |
| catalogs = catalog_data.get("catalog", []) | |
| else: | |
| logger.warning(f" ❌ 无法生成目录:pages.json 和 catalog.json 都不可用") | |
| return | |
| if not catalogs: | |
| logger.warning(f" ⚠️ 目录为空,跳过导入") | |
| return | |
| # 3. 导入到数据库 | |
| cursor = conn.cursor() | |
| sql = """ | |
| INSERT OR REPLACE INTO catalogs ( | |
| catalog_id, book_id, catalog_name, catalog_name_cn, | |
| start_page, end_page, thumbnail, parent_id, sort | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """ | |
| for catalog in catalogs: | |
| # 生成唯一的catalog_id (book_id * 10000 + sort) | |
| catalog_id = book_id * 10000 + catalog.get("Sort", catalog.get("CatalogID", 0)) | |
| cursor.execute(sql, ( | |
| catalog_id, | |
| book_id, | |
| catalog.get("CatalogName"), | |
| catalog.get("CatalogNameCN"), | |
| catalog.get("StartPage"), | |
| catalog.get("EndPage"), | |
| catalog.get("Thumbnail", ""), | |
| catalog.get("ParentID", 0), | |
| catalog.get("Sort", catalog.get("CatalogID", 0)) | |
| )) | |
| conn.commit() | |
| logger.info(f"导入 {len(catalogs)} 个目录项成功") | |
| def import_book(conn, book_dir: Path): | |
| """ | |
| 导入单本书籍的完整数据 | |
| Args: | |
| conn: 数据库连接 | |
| book_dir: 书籍目录 | |
| """ | |
| book_name = book_dir.name | |
| logger.info(f"\n{'=' * 60}") | |
| logger.info(f"开始导入书籍: {book_name}") | |
| logger.info(f"{'=' * 60}") | |
| try: | |
| # 1. 导入书籍信息 | |
| book_id = import_book_info(conn, book_dir) | |
| if not book_id: | |
| logger.error(f"导入失败: 无法获取书籍ID") | |
| return False | |
| # 2. 导入页面数据 | |
| import_pages(conn, book_dir, book_id) | |
| # 3. 读取pages.json用于生成目录 | |
| pages_data = None | |
| pages_file = book_dir / "pages.json" | |
| if pages_file.exists(): | |
| with open(pages_file, 'r', encoding='utf-8') as f: | |
| pages_data = json.load(f) | |
| # 4. 导入目录数据(支持自动生成) | |
| import_catalog(conn, book_dir, book_id, pages_data) | |
| logger.info(f"✅ 书籍 {book_name} 导入完成!\n") | |
| return True | |
| except Exception as e: | |
| logger.error(f"❌ 导入书籍 {book_name} 失败: {e}", exc_info=True) | |
| return False | |
| def import_all_books(conn, data_dir: str): | |
| """ | |
| 导入所有书籍 | |
| Args: | |
| conn: 数据库连接 | |
| data_dir: data 目录路径 | |
| """ | |
| logger.info("=" * 60) | |
| logger.info("开始批量导入书籍数据") | |
| logger.info(f"数据目录: {data_dir}") | |
| logger.info("=" * 60) | |
| data_path = Path(data_dir) | |
| if not data_path.exists(): | |
| logger.error(f"❌ 数据目录不存在: {data_dir}") | |
| return | |
| # 获取所有书籍目录 | |
| book_dirs = [d for d in data_path.iterdir() if d.is_dir()] | |
| logger.info(f"找到 {len(book_dirs)} 本书籍\n") | |
| success_count = 0 | |
| failed_count = 0 | |
| for idx, book_dir in enumerate(book_dirs, 1): | |
| logger.info(f"[{idx}/{len(book_dirs)}] 处理: {book_dir.name}") | |
| if import_book(conn, book_dir): | |
| success_count += 1 | |
| else: | |
| failed_count += 1 | |
| logger.info("\n" + "=" * 60) | |
| logger.info("批量导入完成!") | |
| logger.info(f"成功: {success_count} 本") | |
| logger.info(f"失败: {failed_count} 本") | |
| logger.info("=" * 60) | |
| def verify_data(conn): | |
| """ | |
| 验证导入的数据 | |
| Args: | |
| conn: 数据库连接 | |
| """ | |
| logger.info("\n🔍 验证数据...") | |
| cursor = conn.cursor() | |
| # 统计数据 | |
| cursor.execute('SELECT COUNT(*) FROM books') | |
| book_count = cursor.fetchone()[0] | |
| cursor.execute('SELECT COUNT(*) FROM pages') | |
| page_count = cursor.fetchone()[0] | |
| cursor.execute('SELECT COUNT(*) FROM pieces') | |
| piece_count = cursor.fetchone()[0] | |
| cursor.execute('SELECT COUNT(*) FROM catalogs') | |
| catalog_count = cursor.fetchone()[0] | |
| logger.info(f" - 书籍总数: {book_count}") | |
| logger.info(f" - 页面总数: {page_count}") | |
| logger.info(f" - 内容片段总数: {piece_count}") | |
| logger.info(f" - 目录项总数: {catalog_count}") | |
| # 显示所有书籍列表 | |
| cursor.execute(''' | |
| SELECT market_book_id, market_book_name, max_page, market_classify_name | |
| FROM books | |
| ORDER BY market_book_id | |
| ''') | |
| books = cursor.fetchall() | |
| if books: | |
| logger.info(f"\n 📚 所有书籍:") | |
| for book in books: | |
| logger.info(f" - ID={book[0]}, 名称={book[1]}, 页数={book[2]}, 分类={book[3]}") | |
| logger.info("\n✅ 数据验证完成") | |
| def main(): | |
| """主函数""" | |
| import argparse | |
| print("=" * 80) | |
| print("📚 Books API Data 导入工具") | |
| print("=" * 80) | |
| # 命令行参数解析 | |
| parser = argparse.ArgumentParser(description='导入 data 数据到 SQLite 数据库') | |
| parser.add_argument('data_dir', nargs='?', default='data', | |
| help='data 数据目录路径 (默认: data)') | |
| parser.add_argument('--db', default='books.db', | |
| help='数据库文件路径 (默认: books.db)') | |
| parser.add_argument('--schema', default='database/db_schema.sql', | |
| help='Schema文件路径 (默认: database/db_schema.sql)') | |
| parser.add_argument('--verify-only', action='store_true', | |
| help='仅验证数据,不导入') | |
| parser.add_argument('--recreate', action='store_true', | |
| help='删除现有数据库并重新创建') | |
| args = parser.parse_args() | |
| try: | |
| # 检查数据库是否存在 | |
| db_exists = Path(args.db).exists() | |
| if args.verify_only: | |
| # 仅验证 | |
| if not db_exists: | |
| print(f"❌ 错误: 数据库 {args.db} 不存在") | |
| return 1 | |
| conn = sqlite3.connect(args.db) | |
| verify_data(conn) | |
| conn.close() | |
| return 0 | |
| # 重新创建数据库 | |
| if args.recreate and db_exists: | |
| logger.info(f"🗑️ 删除现有数据库: {args.db}") | |
| Path(args.db).unlink() | |
| db_exists = False | |
| # 创建或连接数据库 | |
| if db_exists: | |
| logger.info(f"📂 数据库已存在: {args.db}") | |
| conn = sqlite3.connect(args.db) | |
| logger.info("✅ 连接到现有数据库") | |
| else: | |
| conn = create_database(args.db, args.schema) | |
| # 导入数据 | |
| import_all_books(conn, args.data_dir) | |
| # 验证数据 | |
| verify_data(conn) | |
| # 关闭连接 | |
| conn.close() | |
| print("\n" + "=" * 80) | |
| print("🎉 所有操作完成!") | |
| print("=" * 80) | |
| print(f"\n💡 提示:") | |
| print(f" - 使用 'python3 {sys.argv[0]} --verify-only' 验证数据") | |
| print(f" - 使用 'python3 {sys.argv[0]} --recreate' 重新导入所有数据") | |
| print(f" - 启动 app.py 来使用Web接口访问数据") | |
| return 0 | |
| except Exception as e: | |
| logger.error(f"\n❌ 错误: {e}", exc_info=True) | |
| return 1 | |
| if __name__ == '__main__': | |
| sys.exit(main()) | |