chatbot-lichsu / backend /admin_services.py
Nguyễn Quốc Vỹ
Tối ưu đồng bộ chỉ mục: chỉ index tài liệu mới, bỏ reindex toàn bộ
0700453
"""
Admin Service Layer: Tài liệu hệ thống, User, History, Feedback, RAG thống kê/reindex.
Dùng cho trang quản trị Admin.
"""
import os
import sys
import uuid
import shutil
# Đảm bảo import được data_processing (từ thư mục gốc project)
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
from backend import db
from backend.auth import is_admin
from backend.runtime_paths import PDF_DIR
from backend.db_sync import schedule_pdf_upload, schedule_pdf_delete, schedule_vector_sync
from data_processing.dynamic_indexing import add_pdf_file
from data_processing.indexing import delete_chunks_by_source
# ======================== UserService ========================
def list_users(limit: int = 500) -> list:
"""Danh sách người dùng (admin)."""
return db.list_users(limit=limit)
def update_user_role(ma_nguoi_dung: str, vai_tro: str) -> tuple[bool, str]:
"""Cập nhật vai trò (user/admin). Returns (success, message)."""
if vai_tro not in ("user", "admin"):
return False, "Vai trò không hợp lệ"
ok = db.update_user_role(ma_nguoi_dung, vai_tro)
return ok, "Đã cập nhật vai trò" if ok else "Cập nhật thất bại"
def lock_user(ma_nguoi_dung: str) -> bool:
"""Khóa tài khoản."""
return db.lock_user(ma_nguoi_dung)
def unlock_user(ma_nguoi_dung: str) -> bool:
"""Mở khóa tài khoản."""
return db.unlock_user(ma_nguoi_dung)
# ======================== HistoryService ========================
def list_conversations(limit: int = 200) -> list:
"""Lịch sử hỏi đáp toàn hệ thống (admin)."""
return db.list_conversations_admin(limit=limit)
def list_feedback(limit: int = 300) -> list:
"""Danh sách phản hồi người dùng (admin)."""
return db.list_feedback_admin(limit=limit)
def update_feedback_status(ma_phan_hoi: int, trang_thai: str) -> tuple[bool, str]:
"""Cập nhật trạng thái xử lý phản hồi: moi / daxem / dong."""
if trang_thai not in ("moi", "daxem", "dong"):
return False, "Trạng thái không hợp lệ"
ok = db.update_phan_hoi_status(ma_phan_hoi, trang_thai)
return ok, "Đã cập nhật" if ok else "Cập nhật thất bại"
# ======================== Tài liệu hệ thống (DocumentService) ========================
def list_system_docs() -> list:
"""
Danh sách tài liệu hệ thống: từ bảng tai_lieu_he_thong (nếu có)
và/hoặc quét thư mục PDF runtime để đồng bộ.
"""
from backend.db import list_tai_lieu_he_thong
rows = list_tai_lieu_he_thong()
# Bổ sung từ thư mục PDF runtime nếu file tồn tại nhưng chưa trong DB
os.makedirs(PDF_DIR, exist_ok=True)
for f in os.listdir(PDF_DIR):
if f.lower().endswith(".pdf"):
path = os.path.join(PDF_DIR, f)
path_norm = os.path.normpath(path)
if not any(os.path.normpath(r["duong_dan"]) == path_norm for r in rows):
rows.append({
"ma_tai_lieu": f"file_{os.path.splitext(f)[0]}",
"ten_file": f,
"duong_dan": path_norm,
"ngay_them": None,
"ngay_cap_nhat": None,
})
return rows
def create_system_doc(file_path: str, filename: str = None) -> tuple[bool, str]:
"""
Thêm tài liệu hệ thống: copy file vào thư mục PDF runtime và ghi DB.
file_path: đường dẫn file tải lên (Streamlit UploadedFile hoặc path).
"""
filename = filename or os.path.basename(file_path)
if not filename.lower().endswith(".pdf"):
return False, "Chỉ chấp nhận file PDF"
ma = str(uuid.uuid4())
dest = os.path.join(PDF_DIR, filename)
os.makedirs(PDF_DIR, exist_ok=True)
try:
if hasattr(file_path, "read"):
with open(dest, "wb") as f:
f.write(file_path.read())
else:
shutil.copy2(file_path, dest)
db.insert_tai_lieu_he_thong(ma_tai_lieu=ma, ten_file=filename, duong_dan=os.path.abspath(dest))
# Index ngay sau khi upload để hỏi đáp dùng được luôn.
chunks_added = add_pdf_file(dest)
schedule_pdf_upload(dest, filename)
return True, f"Đã thêm tài liệu và index {chunks_added} chunks."
except Exception as e:
return False, str(e)
def update_system_doc(ma_tai_lieu: str, file_path: str = None, ten_file: str = None) -> tuple[bool, str]:
"""Cập nhật tài liệu hệ thống (metadata hoặc thay file)."""
docs = [r for r in list_system_docs() if r.get("ma_tai_lieu") == ma_tai_lieu]
if not docs:
return False, "Không tìm thấy tài liệu"
old_path = docs[0].get("duong_dan")
new_name = ten_file or (os.path.basename(file_path) if file_path else docs[0]["ten_file"])
if file_path and os.path.exists(old_path):
try:
if hasattr(file_path, "read"):
with open(old_path, "wb") as f:
f.write(file_path.read())
else:
shutil.copy2(file_path, old_path)
except Exception as e:
return False, str(e)
# Cập nhật tên trong DB nếu đổi tên (cần hàm update trong db - hiện chỉ có insert upsert)
db.insert_tai_lieu_he_thong(ma_tai_lieu=ma_tai_lieu, ten_file=new_name, duong_dan=old_path)
# Khi thay file, cần cập nhật vector ngay để tránh dùng dữ liệu cũ.
if file_path:
delete_chunks_by_source(new_name)
chunks_added = add_pdf_file(old_path)
msg = f"Đã cập nhật và index lại {chunks_added} chunks."
else:
msg = "Đã cập nhật metadata tài liệu."
schedule_pdf_upload(old_path, new_name)
return True, msg
def delete_system_doc(ma_tai_lieu: str) -> tuple[bool, str]:
"""Xóa tài liệu hệ thống: xóa file (nếu có) và xóa bản ghi DB."""
docs = [r for r in list_system_docs() if r.get("ma_tai_lieu") == ma_tai_lieu]
if not docs:
return False, "Không tìm thấy tài liệu"
path = docs[0].get("duong_dan")
filename = docs[0].get("ten_file") or (os.path.basename(path) if path else "")
try:
if path and os.path.isfile(path):
os.remove(path)
except OSError:
pass
db.delete_tai_lieu_he_thong(ma_tai_lieu)
if filename:
schedule_pdf_delete(filename)
return True, "Đã xóa tài liệu. Chạy 'Tái lập chỉ mục' để cập nhật RAG."
def reindex_doc(ma_tai_lieu: str) -> tuple[bool, str]:
"""Tái lập chỉ mục cho một tài liệu (reindex toàn bộ vẫn an toàn hơn)."""
# Đơn giản: gọi reindex_all (ChromaDB không hỗ trợ xóa theo source dễ dàng)
return reindex_all()
def reindex_all() -> tuple[bool, str]:
"""
Đồng bộ chỉ mục theo kiểu tăng dần (incremental):
- Quét thư mục PDF runtime
- Chỉ index tài liệu CHƯA có trong ChromaDB
Tránh reindex toàn bộ gây chậm khi số tài liệu lớn.
"""
try:
from data_processing.dynamic_indexing import add_pdf_file
if not os.path.isdir(PDF_DIR):
return False, f"Không có tài liệu PDF trong {PDF_DIR}"
pdf_files = [
os.path.join(PDF_DIR, f)
for f in sorted(os.listdir(PDF_DIR))
if f.lower().endswith(".pdf")
]
if not pdf_files:
return False, f"Không có tài liệu PDF trong {PDF_DIR}"
indexed_docs = 0
indexed_chunks = 0
skipped_docs = 0
for pdf_path in pdf_files:
added = add_pdf_file(pdf_path)
if added > 0:
indexed_docs += 1
indexed_chunks += added
else:
skipped_docs += 1
schedule_vector_sync()
return True, (
f"Đồng bộ xong: thêm mới {indexed_docs} tài liệu / {indexed_chunks} chunks, "
f"bỏ qua {skipped_docs} tài liệu đã có."
)
except Exception as e:
return False, f"Lỗi: {str(e)}"
# ======================== RAGAdminService ========================
def get_rag_stats() -> dict:
"""Thống kê RAG (ChromaDB)."""
try:
from data_processing.indexing import get_stats
return get_stats()
except Exception as e:
return {"error": str(e), "total_chunks": 0, "collection_name": ""}