#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 书籍数据导入脚本 - 支持新的 data 数据结构 将 data 目录中的数据导入 SQLite 数据库 """ import os import json import sqlite3 import sys from pathlib import Path from datetime import datetime import logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def create_database(db_path='books.db', schema_path='database/db_schema.sql'): """ 创建数据库和表结构 Args: db_path: 数据库文件路径 schema_path: Schema SQL文件路径 Returns: sqlite3.Connection: 数据库连接对象 """ logger.info(f"📦 创建数据库: {db_path}") # 读取schema if not Path(schema_path).exists(): logger.error(f"❌ 错误: Schema文件 {schema_path} 不存在") sys.exit(1) with open(schema_path, 'r', encoding='utf-8') as f: schema_sql = f.read() # 连接数据库并执行schema conn = sqlite3.connect(db_path) cursor = conn.cursor() # 执行所有SQL语句 cursor.executescript(schema_sql) conn.commit() logger.info("✅ 数据库结构创建成功") return conn def import_book_info(conn, book_dir: Path) -> int: """ 导入书籍基本信息 Args: conn: 数据库连接 book_dir: 书籍目录 Returns: int: 书籍ID """ book_info_file = book_dir / "book_info.json" if not book_info_file.exists(): logger.warning(f"book_info.json 不存在: {book_dir}") return None with open(book_info_file, 'r', encoding='utf-8') as f: book_info = json.load(f) cursor = conn.cursor() sql = """ INSERT OR REPLACE INTO books ( market_book_id, market_classify_id, market_book_name, stage, market_book_cover, mod_book_id, mod_book_name, mod_book_cover, old_mod_book_id, is_mod, create_date, create_user, isbn, type, subject_id, ver_id, grade_id, reel_id, app_id, sort, market_classify_name, old_mod_classify_id, mod_classify_id, start_page, max_page, source_dir ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ cursor.execute(sql, ( book_info.get("MarketBookID"), book_info.get("MarketClassifyId"), book_info.get("MarketBookName"), book_info.get("Stage"), book_info.get("MarketBookCover"), book_info.get("MODBookID"), book_info.get("MODBookName"), book_info.get("MODBookCover"), book_info.get("OldMODBookID"), book_info.get("IsMod"), book_info.get("CreateDate"), book_info.get("CreateUser"), book_info.get("ISBN"), book_info.get("Type"), book_info.get("SubjectID"), book_info.get("VerID"), book_info.get("GradeID"), book_info.get("ReelID"), book_info.get("AppID"), book_info.get("Sort"), book_info.get("MarketClassifyName"), book_info.get("OldModClassifyID"), book_info.get("ModClassifyID"), book_info.get("StartPage"), book_info.get("MaxPage"), str(book_dir.name) )) conn.commit() book_id = book_info.get("MarketBookID") logger.info(f"导入书籍信息成功: {book_id} - {book_info.get('MarketBookName')}") return book_id def import_pages(conn, book_dir: Path, book_id: int): """ 导入页面数据 Args: conn: 数据库连接 book_dir: 书籍目录 book_id: 书籍ID """ pages_file = book_dir / "pages.json" if not pages_file.exists(): logger.warning(f"pages.json 不存在: {book_dir}") return with open(pages_file, 'r', encoding='utf-8') as f: pages = json.load(f) cursor = conn.cursor() page_sql = """ INSERT OR REPLACE INTO pages (page_id, book_id, page_number, origin_img_url, encrypt_img_url) VALUES (?, ?, ?, ?, ?) """ piece_sql = """ INSERT OR REPLACE INTO pieces ( piece_id, page_id, original, translation, origin_sound_url, encrypt_sound_url, duration, coordinate_x, coordinate_y, coordinate_width, coordinate_height, is_evaluated, show_translation, rich_original, rich_translation, piece_order ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ page_count = 0 piece_count = 0 for page in pages: page_id = page.get("pageId") page_number = page.get("pageNumber") origin_img_url = page.get("originImgUrl", "") encrypt_img_url = page.get("encryptImgUrl", "") if not page_id or not page_number: continue # 插入页面 cursor.execute(page_sql, (page_id, book_id, page_number, origin_img_url, encrypt_img_url)) page_count += 1 # 插入页面片段 pieces = page.get("pieces", []) for i, piece in enumerate(pieces): coordinate = piece.get("coordinate", {}) cursor.execute(piece_sql, ( piece.get("pieceId"), page_id, piece.get("original", ""), piece.get("translation", ""), piece.get("originSoundUrl", ""), piece.get("encryptSoundUrl", ""), piece.get("duration", 0), coordinate.get("x", 0), coordinate.get("y", 0), coordinate.get("width", 0), coordinate.get("height", 0), piece.get("isEvaluated", 0), piece.get("showTranslation", 1), piece.get("richOriginal", ""), piece.get("richTranslation", ""), i )) piece_count += 1 # 每50页提交一次 if page_count % 50 == 0: conn.commit() logger.info(f" 进度: {page_count}/{len(pages)} 页, {piece_count} 个内容片段") conn.commit() logger.info(f"导入 {page_count} 页内容,{piece_count} 个片段成功") def generate_catalog_from_pages(book_dir: Path, book_id: int, pages_data: list) -> list: """ 从pages.json自动生成目录结构 为每一页生成一个目录项,使用该页第一个piece的文本作为标题 Args: book_dir: 书籍目录 book_id: 书籍ID pages_data: 页面数据列表 Returns: 生成的目录列表 """ if not pages_data: return [] catalogs = [] # 为每一页生成一个目录项 for i, page in enumerate(pages_data): page_number = page.get('pageNumber') pieces = page.get('pieces', []) # 获取第一个piece的文本作为标题 if pieces and len(pieces) > 0: first_piece = pieces[0] title = first_piece.get('original', '').strip() title_cn = first_piece.get('translation', '').strip() else: # 如果没有piece,使用默认标题 title = f"Page {page_number}" title_cn = f"第{page_number}页" # 限制标题长度,避免过长 if len(title) > 80: title = title[:77] + "..." if len(title_cn) > 80: title_cn = title_cn[:77] + "..." catalogs.append({ 'CatalogID': i + 1, 'CatalogName': title or f"Page {page_number}", 'CatalogNameCN': title_cn or f"第{page_number}页", 'StartPage': page_number, 'EndPage': page_number, # 每个目录项对应一页 'Thumbnail': f"{book_dir.name}/images/thumbnails/page_{page_number:03d}_thumb.jpg", 'ParentID': 0, 'Sort': i + 1 }) logger.info(f" 从pages.json生成 {len(catalogs)} 个目录项(每页一项)") return catalogs def import_catalog(conn, book_dir: Path, book_id: int, pages_data: list = None): """ 导入目录数据(强制从pages.json生成) Args: conn: 数据库连接 book_dir: 书籍目录 book_id: 书籍ID pages_data: 页面数据(用于自动生成目录) """ catalogs = [] # 强制从 pages.json 生成目录(忽略已有的 catalog.json) if pages_data: logger.info(f" 🔄 强制从 pages.json 生成目录...") catalogs = generate_catalog_from_pages(book_dir, book_id, pages_data) else: # 如果 pages_data 不可用,尝试从 catalog.json 读取作为备选 catalog_file = book_dir / "catalog.json" if catalog_file.exists(): logger.info(f" ⚠️ pages.json 不可用,从现有 catalog.json 读取...") with open(catalog_file, 'r', encoding='utf-8') as f: catalog_data = json.load(f) catalogs = catalog_data.get("catalog", []) else: logger.warning(f" ❌ 无法生成目录:pages.json 和 catalog.json 都不可用") return if not catalogs: logger.warning(f" ⚠️ 目录为空,跳过导入") return # 3. 导入到数据库 cursor = conn.cursor() sql = """ INSERT OR REPLACE INTO catalogs ( catalog_id, book_id, catalog_name, catalog_name_cn, start_page, end_page, thumbnail, parent_id, sort ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """ for catalog in catalogs: # 生成唯一的catalog_id (book_id * 10000 + sort) catalog_id = book_id * 10000 + catalog.get("Sort", catalog.get("CatalogID", 0)) cursor.execute(sql, ( catalog_id, book_id, catalog.get("CatalogName"), catalog.get("CatalogNameCN"), catalog.get("StartPage"), catalog.get("EndPage"), catalog.get("Thumbnail", ""), catalog.get("ParentID", 0), catalog.get("Sort", catalog.get("CatalogID", 0)) )) conn.commit() logger.info(f"导入 {len(catalogs)} 个目录项成功") def import_book(conn, book_dir: Path): """ 导入单本书籍的完整数据 Args: conn: 数据库连接 book_dir: 书籍目录 """ book_name = book_dir.name logger.info(f"\n{'=' * 60}") logger.info(f"开始导入书籍: {book_name}") logger.info(f"{'=' * 60}") try: # 1. 导入书籍信息 book_id = import_book_info(conn, book_dir) if not book_id: logger.error(f"导入失败: 无法获取书籍ID") return False # 2. 导入页面数据 import_pages(conn, book_dir, book_id) # 3. 读取pages.json用于生成目录 pages_data = None pages_file = book_dir / "pages.json" if pages_file.exists(): with open(pages_file, 'r', encoding='utf-8') as f: pages_data = json.load(f) # 4. 导入目录数据(支持自动生成) import_catalog(conn, book_dir, book_id, pages_data) logger.info(f"✅ 书籍 {book_name} 导入完成!\n") return True except Exception as e: logger.error(f"❌ 导入书籍 {book_name} 失败: {e}", exc_info=True) return False def import_all_books(conn, data_dir: str): """ 导入所有书籍 Args: conn: 数据库连接 data_dir: data 目录路径 """ logger.info("=" * 60) logger.info("开始批量导入书籍数据") logger.info(f"数据目录: {data_dir}") logger.info("=" * 60) data_path = Path(data_dir) if not data_path.exists(): logger.error(f"❌ 数据目录不存在: {data_dir}") return # 获取所有书籍目录 book_dirs = [d for d in data_path.iterdir() if d.is_dir()] logger.info(f"找到 {len(book_dirs)} 本书籍\n") success_count = 0 failed_count = 0 for idx, book_dir in enumerate(book_dirs, 1): logger.info(f"[{idx}/{len(book_dirs)}] 处理: {book_dir.name}") if import_book(conn, book_dir): success_count += 1 else: failed_count += 1 logger.info("\n" + "=" * 60) logger.info("批量导入完成!") logger.info(f"成功: {success_count} 本") logger.info(f"失败: {failed_count} 本") logger.info("=" * 60) def verify_data(conn): """ 验证导入的数据 Args: conn: 数据库连接 """ logger.info("\n🔍 验证数据...") cursor = conn.cursor() # 统计数据 cursor.execute('SELECT COUNT(*) FROM books') book_count = cursor.fetchone()[0] cursor.execute('SELECT COUNT(*) FROM pages') page_count = cursor.fetchone()[0] cursor.execute('SELECT COUNT(*) FROM pieces') piece_count = cursor.fetchone()[0] cursor.execute('SELECT COUNT(*) FROM catalogs') catalog_count = cursor.fetchone()[0] logger.info(f" - 书籍总数: {book_count}") logger.info(f" - 页面总数: {page_count}") logger.info(f" - 内容片段总数: {piece_count}") logger.info(f" - 目录项总数: {catalog_count}") # 显示所有书籍列表 cursor.execute(''' SELECT market_book_id, market_book_name, max_page, market_classify_name FROM books ORDER BY market_book_id ''') books = cursor.fetchall() if books: logger.info(f"\n 📚 所有书籍:") for book in books: logger.info(f" - ID={book[0]}, 名称={book[1]}, 页数={book[2]}, 分类={book[3]}") logger.info("\n✅ 数据验证完成") def main(): """主函数""" import argparse print("=" * 80) print("📚 Books API Data 导入工具") print("=" * 80) # 命令行参数解析 parser = argparse.ArgumentParser(description='导入 data 数据到 SQLite 数据库') parser.add_argument('data_dir', nargs='?', default='data', help='data 数据目录路径 (默认: data)') parser.add_argument('--db', default='books.db', help='数据库文件路径 (默认: books.db)') parser.add_argument('--schema', default='database/db_schema.sql', help='Schema文件路径 (默认: database/db_schema.sql)') parser.add_argument('--verify-only', action='store_true', help='仅验证数据,不导入') parser.add_argument('--recreate', action='store_true', help='删除现有数据库并重新创建') args = parser.parse_args() try: # 检查数据库是否存在 db_exists = Path(args.db).exists() if args.verify_only: # 仅验证 if not db_exists: print(f"❌ 错误: 数据库 {args.db} 不存在") return 1 conn = sqlite3.connect(args.db) verify_data(conn) conn.close() return 0 # 重新创建数据库 if args.recreate and db_exists: logger.info(f"🗑️ 删除现有数据库: {args.db}") Path(args.db).unlink() db_exists = False # 创建或连接数据库 if db_exists: logger.info(f"📂 数据库已存在: {args.db}") conn = sqlite3.connect(args.db) logger.info("✅ 连接到现有数据库") else: conn = create_database(args.db, args.schema) # 导入数据 import_all_books(conn, args.data_dir) # 验证数据 verify_data(conn) # 关闭连接 conn.close() print("\n" + "=" * 80) print("🎉 所有操作完成!") print("=" * 80) print(f"\n💡 提示:") print(f" - 使用 'python3 {sys.argv[0]} --verify-only' 验证数据") print(f" - 使用 'python3 {sys.argv[0]} --recreate' 重新导入所有数据") print(f" - 启动 app.py 来使用Web接口访问数据") return 0 except Exception as e: logger.error(f"\n❌ 错误: {e}", exc_info=True) return 1 if __name__ == '__main__': sys.exit(main())