point / database /import_book_data.py
eithney
code ref
e74eb63
Raw
History Blame Contribute Delete
16.5 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
书籍数据导入脚本 - 支持新的 data 数据结构
将 data 目录中的数据导入 SQLite 数据库
"""
import os
import json
import sqlite3
import sys
from pathlib import Path
from datetime import datetime
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def create_database(db_path='books.db', schema_path='database/db_schema.sql'):
"""
创建数据库和表结构
Args:
db_path: 数据库文件路径
schema_path: Schema SQL文件路径
Returns:
sqlite3.Connection: 数据库连接对象
"""
logger.info(f"📦 创建数据库: {db_path}")
# 读取schema
if not Path(schema_path).exists():
logger.error(f"❌ 错误: Schema文件 {schema_path} 不存在")
sys.exit(1)
with open(schema_path, 'r', encoding='utf-8') as f:
schema_sql = f.read()
# 连接数据库并执行schema
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 执行所有SQL语句
cursor.executescript(schema_sql)
conn.commit()
logger.info("✅ 数据库结构创建成功")
return conn
def import_book_info(conn, book_dir: Path) -> int:
"""
导入书籍基本信息
Args:
conn: 数据库连接
book_dir: 书籍目录
Returns:
int: 书籍ID
"""
book_info_file = book_dir / "book_info.json"
if not book_info_file.exists():
logger.warning(f"book_info.json 不存在: {book_dir}")
return None
with open(book_info_file, 'r', encoding='utf-8') as f:
book_info = json.load(f)
cursor = conn.cursor()
sql = """
INSERT OR REPLACE INTO books (
market_book_id, market_classify_id, market_book_name, stage,
market_book_cover, mod_book_id, mod_book_name, mod_book_cover,
old_mod_book_id, is_mod, create_date, create_user, isbn, type,
subject_id, ver_id, grade_id, reel_id, app_id, sort,
market_classify_name, old_mod_classify_id, mod_classify_id,
start_page, max_page, source_dir
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
cursor.execute(sql, (
book_info.get("MarketBookID"),
book_info.get("MarketClassifyId"),
book_info.get("MarketBookName"),
book_info.get("Stage"),
book_info.get("MarketBookCover"),
book_info.get("MODBookID"),
book_info.get("MODBookName"),
book_info.get("MODBookCover"),
book_info.get("OldMODBookID"),
book_info.get("IsMod"),
book_info.get("CreateDate"),
book_info.get("CreateUser"),
book_info.get("ISBN"),
book_info.get("Type"),
book_info.get("SubjectID"),
book_info.get("VerID"),
book_info.get("GradeID"),
book_info.get("ReelID"),
book_info.get("AppID"),
book_info.get("Sort"),
book_info.get("MarketClassifyName"),
book_info.get("OldModClassifyID"),
book_info.get("ModClassifyID"),
book_info.get("StartPage"),
book_info.get("MaxPage"),
str(book_dir.name)
))
conn.commit()
book_id = book_info.get("MarketBookID")
logger.info(f"导入书籍信息成功: {book_id} - {book_info.get('MarketBookName')}")
return book_id
def import_pages(conn, book_dir: Path, book_id: int):
"""
导入页面数据
Args:
conn: 数据库连接
book_dir: 书籍目录
book_id: 书籍ID
"""
pages_file = book_dir / "pages.json"
if not pages_file.exists():
logger.warning(f"pages.json 不存在: {book_dir}")
return
with open(pages_file, 'r', encoding='utf-8') as f:
pages = json.load(f)
cursor = conn.cursor()
page_sql = """
INSERT OR REPLACE INTO pages (page_id, book_id, page_number, origin_img_url, encrypt_img_url)
VALUES (?, ?, ?, ?, ?)
"""
piece_sql = """
INSERT OR REPLACE INTO pieces (
piece_id, page_id, original, translation, origin_sound_url, encrypt_sound_url,
duration, coordinate_x, coordinate_y, coordinate_width, coordinate_height,
is_evaluated, show_translation, rich_original, rich_translation, piece_order
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
page_count = 0
piece_count = 0
for page in pages:
page_id = page.get("pageId")
page_number = page.get("pageNumber")
origin_img_url = page.get("originImgUrl", "")
encrypt_img_url = page.get("encryptImgUrl", "")
if not page_id or not page_number:
continue
# 插入页面
cursor.execute(page_sql, (page_id, book_id, page_number, origin_img_url, encrypt_img_url))
page_count += 1
# 插入页面片段
pieces = page.get("pieces", [])
for i, piece in enumerate(pieces):
coordinate = piece.get("coordinate", {})
cursor.execute(piece_sql, (
piece.get("pieceId"),
page_id,
piece.get("original", ""),
piece.get("translation", ""),
piece.get("originSoundUrl", ""),
piece.get("encryptSoundUrl", ""),
piece.get("duration", 0),
coordinate.get("x", 0),
coordinate.get("y", 0),
coordinate.get("width", 0),
coordinate.get("height", 0),
piece.get("isEvaluated", 0),
piece.get("showTranslation", 1),
piece.get("richOriginal", ""),
piece.get("richTranslation", ""),
i
))
piece_count += 1
# 每50页提交一次
if page_count % 50 == 0:
conn.commit()
logger.info(f" 进度: {page_count}/{len(pages)} 页, {piece_count} 个内容片段")
conn.commit()
logger.info(f"导入 {page_count} 页内容,{piece_count} 个片段成功")
def generate_catalog_from_pages(book_dir: Path, book_id: int, pages_data: list) -> list:
"""
从pages.json自动生成目录结构
为每一页生成一个目录项,使用该页第一个piece的文本作为标题
Args:
book_dir: 书籍目录
book_id: 书籍ID
pages_data: 页面数据列表
Returns:
生成的目录列表
"""
if not pages_data:
return []
catalogs = []
# 为每一页生成一个目录项
for i, page in enumerate(pages_data):
page_number = page.get('pageNumber')
pieces = page.get('pieces', [])
# 获取第一个piece的文本作为标题
if pieces and len(pieces) > 0:
first_piece = pieces[0]
title = first_piece.get('original', '').strip()
title_cn = first_piece.get('translation', '').strip()
else:
# 如果没有piece,使用默认标题
title = f"Page {page_number}"
title_cn = f"第{page_number}页"
# 限制标题长度,避免过长
if len(title) > 80:
title = title[:77] + "..."
if len(title_cn) > 80:
title_cn = title_cn[:77] + "..."
catalogs.append({
'CatalogID': i + 1,
'CatalogName': title or f"Page {page_number}",
'CatalogNameCN': title_cn or f"第{page_number}页",
'StartPage': page_number,
'EndPage': page_number, # 每个目录项对应一页
'Thumbnail': f"{book_dir.name}/images/thumbnails/page_{page_number:03d}_thumb.jpg",
'ParentID': 0,
'Sort': i + 1
})
logger.info(f" 从pages.json生成 {len(catalogs)} 个目录项(每页一项)")
return catalogs
def import_catalog(conn, book_dir: Path, book_id: int, pages_data: list = None):
"""
导入目录数据(强制从pages.json生成)
Args:
conn: 数据库连接
book_dir: 书籍目录
book_id: 书籍ID
pages_data: 页面数据(用于自动生成目录)
"""
catalogs = []
# 强制从 pages.json 生成目录(忽略已有的 catalog.json)
if pages_data:
logger.info(f" 🔄 强制从 pages.json 生成目录...")
catalogs = generate_catalog_from_pages(book_dir, book_id, pages_data)
else:
# 如果 pages_data 不可用,尝试从 catalog.json 读取作为备选
catalog_file = book_dir / "catalog.json"
if catalog_file.exists():
logger.info(f" ⚠️ pages.json 不可用,从现有 catalog.json 读取...")
with open(catalog_file, 'r', encoding='utf-8') as f:
catalog_data = json.load(f)
catalogs = catalog_data.get("catalog", [])
else:
logger.warning(f" ❌ 无法生成目录:pages.json 和 catalog.json 都不可用")
return
if not catalogs:
logger.warning(f" ⚠️ 目录为空,跳过导入")
return
# 3. 导入到数据库
cursor = conn.cursor()
sql = """
INSERT OR REPLACE INTO catalogs (
catalog_id, book_id, catalog_name, catalog_name_cn,
start_page, end_page, thumbnail, parent_id, sort
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
for catalog in catalogs:
# 生成唯一的catalog_id (book_id * 10000 + sort)
catalog_id = book_id * 10000 + catalog.get("Sort", catalog.get("CatalogID", 0))
cursor.execute(sql, (
catalog_id,
book_id,
catalog.get("CatalogName"),
catalog.get("CatalogNameCN"),
catalog.get("StartPage"),
catalog.get("EndPage"),
catalog.get("Thumbnail", ""),
catalog.get("ParentID", 0),
catalog.get("Sort", catalog.get("CatalogID", 0))
))
conn.commit()
logger.info(f"导入 {len(catalogs)} 个目录项成功")
def import_book(conn, book_dir: Path):
"""
导入单本书籍的完整数据
Args:
conn: 数据库连接
book_dir: 书籍目录
"""
book_name = book_dir.name
logger.info(f"\n{'=' * 60}")
logger.info(f"开始导入书籍: {book_name}")
logger.info(f"{'=' * 60}")
try:
# 1. 导入书籍信息
book_id = import_book_info(conn, book_dir)
if not book_id:
logger.error(f"导入失败: 无法获取书籍ID")
return False
# 2. 导入页面数据
import_pages(conn, book_dir, book_id)
# 3. 读取pages.json用于生成目录
pages_data = None
pages_file = book_dir / "pages.json"
if pages_file.exists():
with open(pages_file, 'r', encoding='utf-8') as f:
pages_data = json.load(f)
# 4. 导入目录数据(支持自动生成)
import_catalog(conn, book_dir, book_id, pages_data)
logger.info(f"✅ 书籍 {book_name} 导入完成!\n")
return True
except Exception as e:
logger.error(f"❌ 导入书籍 {book_name} 失败: {e}", exc_info=True)
return False
def import_all_books(conn, data_dir: str):
"""
导入所有书籍
Args:
conn: 数据库连接
data_dir: data 目录路径
"""
logger.info("=" * 60)
logger.info("开始批量导入书籍数据")
logger.info(f"数据目录: {data_dir}")
logger.info("=" * 60)
data_path = Path(data_dir)
if not data_path.exists():
logger.error(f"❌ 数据目录不存在: {data_dir}")
return
# 获取所有书籍目录
book_dirs = [d for d in data_path.iterdir() if d.is_dir()]
logger.info(f"找到 {len(book_dirs)} 本书籍\n")
success_count = 0
failed_count = 0
for idx, book_dir in enumerate(book_dirs, 1):
logger.info(f"[{idx}/{len(book_dirs)}] 处理: {book_dir.name}")
if import_book(conn, book_dir):
success_count += 1
else:
failed_count += 1
logger.info("\n" + "=" * 60)
logger.info("批量导入完成!")
logger.info(f"成功: {success_count} 本")
logger.info(f"失败: {failed_count} 本")
logger.info("=" * 60)
def verify_data(conn):
"""
验证导入的数据
Args:
conn: 数据库连接
"""
logger.info("\n🔍 验证数据...")
cursor = conn.cursor()
# 统计数据
cursor.execute('SELECT COUNT(*) FROM books')
book_count = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM pages')
page_count = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM pieces')
piece_count = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM catalogs')
catalog_count = cursor.fetchone()[0]
logger.info(f" - 书籍总数: {book_count}")
logger.info(f" - 页面总数: {page_count}")
logger.info(f" - 内容片段总数: {piece_count}")
logger.info(f" - 目录项总数: {catalog_count}")
# 显示所有书籍列表
cursor.execute('''
SELECT market_book_id, market_book_name, max_page, market_classify_name
FROM books
ORDER BY market_book_id
''')
books = cursor.fetchall()
if books:
logger.info(f"\n 📚 所有书籍:")
for book in books:
logger.info(f" - ID={book[0]}, 名称={book[1]}, 页数={book[2]}, 分类={book[3]}")
logger.info("\n✅ 数据验证完成")
def main():
"""主函数"""
import argparse
print("=" * 80)
print("📚 Books API Data 导入工具")
print("=" * 80)
# 命令行参数解析
parser = argparse.ArgumentParser(description='导入 data 数据到 SQLite 数据库')
parser.add_argument('data_dir', nargs='?', default='data',
help='data 数据目录路径 (默认: data)')
parser.add_argument('--db', default='books.db',
help='数据库文件路径 (默认: books.db)')
parser.add_argument('--schema', default='database/db_schema.sql',
help='Schema文件路径 (默认: database/db_schema.sql)')
parser.add_argument('--verify-only', action='store_true',
help='仅验证数据,不导入')
parser.add_argument('--recreate', action='store_true',
help='删除现有数据库并重新创建')
args = parser.parse_args()
try:
# 检查数据库是否存在
db_exists = Path(args.db).exists()
if args.verify_only:
# 仅验证
if not db_exists:
print(f"❌ 错误: 数据库 {args.db} 不存在")
return 1
conn = sqlite3.connect(args.db)
verify_data(conn)
conn.close()
return 0
# 重新创建数据库
if args.recreate and db_exists:
logger.info(f"🗑️ 删除现有数据库: {args.db}")
Path(args.db).unlink()
db_exists = False
# 创建或连接数据库
if db_exists:
logger.info(f"📂 数据库已存在: {args.db}")
conn = sqlite3.connect(args.db)
logger.info("✅ 连接到现有数据库")
else:
conn = create_database(args.db, args.schema)
# 导入数据
import_all_books(conn, args.data_dir)
# 验证数据
verify_data(conn)
# 关闭连接
conn.close()
print("\n" + "=" * 80)
print("🎉 所有操作完成!")
print("=" * 80)
print(f"\n💡 提示:")
print(f" - 使用 'python3 {sys.argv[0]} --verify-only' 验证数据")
print(f" - 使用 'python3 {sys.argv[0]} --recreate' 重新导入所有数据")
print(f" - 启动 app.py 来使用Web接口访问数据")
return 0
except Exception as e:
logger.error(f"\n❌ 错误: {e}", exc_info=True)
return 1
if __name__ == '__main__':
sys.exit(main())