| from fastapi import FastAPI, Request, HTTPException, Depends |
| from fastapi.middleware.cors import CORSMiddleware |
| from loguru import logger |
| import json |
| from typing import Dict, Any, List, Optional |
| import asyncio |
| from concurrent.futures import ThreadPoolExecutor |
| import os |
| import traceback |
| import difflib |
|
|
| from .config import Settings, get_settings |
| from .constants import VERSION_NUMBER |
| from .facebook import FacebookClient |
| from .sheets import SheetsClient |
| from .supabase_db import SupabaseClient |
| from .embedding import EmbeddingClient |
| from .utils import setup_logging, extract_command, extract_keywords, timing_decorator_async, timing_decorator_sync, ensure_log_dir, validate_config |
| from .constants import VEHICLE_KEYWORDS, SHEET_RANGE, VEHICLE_KEYWORD_TO_COLUMN |
| from .health import router as health_router |
| from .llm import create_llm_client |
| from .reranker import Reranker |
| from .request_limit_manager import RequestLimitManager |
| from .law_document_chunker import LawDocumentChunker |
| from app.channel_manager import channel_manager |
|
|
# FastAPI application instance; the title is shown in the generated OpenAPI docs.
app = FastAPI(title="WeBot Facebook Messenger API")


# Accept cross-origin requests from any origin, with any method and header.
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is a
# combination the Starlette CORS docs say should not be used for credentialed
# requests; it may let any website make credentialed browser calls to this
# API, including the unauthenticated /api/document-chunks/* endpoints below.
# Consider an explicit origin allow-list or allow_credentials=False — confirm
# with the frontend owners first.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
# Load settings once and configure logging from them before the startup
# messages below are emitted.
settings = get_settings()
setup_logging(settings.log_level)


# Port resolution: the PORT environment variable wins; otherwise use
# settings.port when the settings object defines it, else 7860.
logger.info("[STARTUP] Đang lấy PORT từ biến môi trường hoặc config...")
port = int(os.environ.get("PORT", getattr(settings, 'port', 7860)))
logger.info(f"[STARTUP] PORT sử dụng: {port}")
|
|
logger.info("[STARTUP] Khởi tạo global RequestLimitManager...")
# Module-level request-limit manager for the "gemini" provider.
# NOTE(review): not referenced again in this file; presumably other modules
# import it from here — verify before removing.
request_limit_manager = RequestLimitManager("gemini")


# Shared service clients, created once when this module is imported.
logger.info("[STARTUP] Khởi tạo FacebookClient...")
# Built from the Facebook app secret; the /webhook handlers call its
# verify_signature / parse_message / verify_webhook methods.
facebook_client = FacebookClient(settings.facebook_app_secret)
logger.info("[STARTUP] Khởi tạo SheetsClient...")
# Google Sheets client (credentials file, token file, conversation sheet id).
# NOTE(review): not referenced again in this file.
sheets_client = SheetsClient(
    settings.google_sheets_credentials_file,
    settings.google_sheets_token_file,
    settings.conversation_sheet_id
)
logger.info("[STARTUP] Khởi tạo SupabaseClient...")
# Used by the /api/document-chunks/* endpoints to read and delete chunk rows.
supabase_client = SupabaseClient(settings.supabase_url, settings.supabase_key)
logger.info("[STARTUP] Khởi tạo EmbeddingClient...")
# NOTE(review): not referenced again in this file.
embedding_client = EmbeddingClient()
|
|
|
|
| |
| |
| |
| |
| |
| |
# LLM client for the "gemini" provider: uses the first configured model, or
# "gemini-2.5-flash" when settings.gemini_models_list is empty.
llm_client = create_llm_client(
    provider="gemini",
    base_url=settings.gemini_base_url,
    model=settings.gemini_models_list[0] if settings.gemini_models_list else "gemini-2.5-flash"
)


# NOTE(review): not referenced again in this file.
reranker = Reranker()


# Chunker used by the /api/document-chunks/update and /update-all endpoints.
law_chunker = LawDocumentChunker()
# The LLM client is injected as an attribute after construction rather than
# passed to the constructor; presumably the chunker calls it while processing
# documents — confirm in law_document_chunker.
law_chunker.llm_client = llm_client
|
|
logger.info("[STARTUP] Mount health router...")
# Mount the routes defined in the .health module onto this app.
app.include_router(health_router)


logger.info("[STARTUP] Validate config...")
# NOTE(review): this runs only after every client above has already been
# constructed from these settings; if validate_config is meant to fail fast on
# bad configuration, consider calling it right after get_settings().
validate_config(settings)
|
|
# Thread pool with 4 workers.
# NOTE(review): not used anywhere in this file; presumably intended for
# offloading blocking work (e.g. the synchronous supabase_client calls made
# from the async endpoints) — verify other modules before removing.
executor = ThreadPoolExecutor(max_workers=4)


# NOTE(review): module-level global that nothing in this file reads or
# assigns afterwards; possibly leftover — confirm no other module imports it.
message_text = None
|
|
def flatten_timestamp(ts):
    """Flatten arbitrarily nested lists into one flat list.

    Only ``list`` instances are expanded; any other element (tuples, strings,
    numbers, ...) is kept as a single leaf. Depth-first order is preserved.

    Args:
        ts: An iterable whose items may themselves be (nested) lists.

    Returns:
        A new ``list`` containing the non-list leaves in order.
    """
    def _leaves(items):
        for item in items:
            if isinstance(item, list):
                yield from _leaves(item)
            else:
                yield item

    return list(_leaves(ts))
|
|
def normalize_vehicle_keyword(
    keyword: Optional[str],
    *,
    keywords: Optional[List[str]] = None,
    cutoff: float = 0.6,
) -> str:
    """Map a free-text vehicle name onto the closest canonical keyword.

    Matching is case-insensitive and uses ``difflib.get_close_matches``; the
    returned value keeps the spelling of the canonical keyword. When several
    canonical keywords differ only by case, the first one listed wins.

    Args:
        keyword: Raw vehicle text to normalize.
        keywords: Candidate canonical keywords; defaults to ``VEHICLE_KEYWORDS``.
        cutoff: Minimum similarity ratio (0..1) accepted as a match.

    Returns:
        ``""`` for empty/None input, the best-matching canonical keyword, or
        ``keyword`` unchanged when nothing is similar enough.
    """
    if not keyword:
        return ""
    if keywords is None:
        keywords = VEHICLE_KEYWORDS

    # lowercased -> original spelling. setdefault keeps the FIRST spelling so
    # case-only duplicates resolve exactly like a front-to-back scan would.
    canonical: Dict[str, str] = {}
    for candidate in keywords:
        canonical.setdefault(candidate.lower(), candidate)

    matches = difflib.get_close_matches(keyword.lower(), list(canonical), n=1, cutoff=cutoff)
    return canonical[matches[0]] if matches else keyword
|
|
@app.get("/")
async def root():
    """Liveness endpoint that reports the running application version.

    Returns:
        dict: ``{"version": VERSION_NUMBER}``.
    """
    logger.info("[HEALTH] Truy cập endpoint root /")
    payload = {"version": VERSION_NUMBER}
    return payload
|
|
@app.get("/webhook")
async def verify_webhook(request: Request):
    """Handle the Facebook Messenger webhook verification handshake.

    Reads the ``hub.mode``, ``hub.verify_token`` and ``hub.challenge`` query
    parameters and delegates the token check to ``facebook_client``.

    Args:
        request: Incoming request carrying the ``hub.*`` query parameters.

    Returns:
        Whatever ``facebook_client.verify_webhook`` returns for the token /
        challenge pair (presumably the echoed challenge on success).

    Raises:
        HTTPException: 400 if any parameter is missing or empty; 403 if
            ``hub.mode`` is not ``"subscribe"``.
    """
    params = dict(request.query_params)

    mode = params.get("hub.mode")
    token = str(params.get("hub.verify_token", ""))
    challenge = str(params.get("hub.challenge", ""))

    if not all([mode, token, challenge]):
        raise HTTPException(status_code=400, detail="Missing parameters")

    # Verification requests always use hub.mode=subscribe; refuse to echo the
    # challenge for any other mode.
    if mode != "subscribe":
        raise HTTPException(status_code=403, detail="Invalid hub.mode")

    return await facebook_client.verify_webhook(
        token,
        challenge,
        settings.facebook_verify_token,
    )
|
|
def _is_echo_event(body: Any) -> bool:
    """Return True when the first messaging event of the first entry is an echo.

    Echo events (``message.is_echo``) are copies of messages the page itself
    sent and must not be handled as user input. Any unexpected shape in the
    payload is treated as "not an echo" instead of raising.
    """
    if not isinstance(body, dict):
        return False
    entries = body.get("entry")
    if not isinstance(entries, list) or not entries or not isinstance(entries[0], dict):
        return False
    events = entries[0].get("messaging")
    if not isinstance(events, list) or not events or not isinstance(events[0], dict):
        return False
    message = events[0].get("message")
    return isinstance(message, dict) and bool(message.get("is_echo", False))


@app.post("/webhook")
@timing_decorator_async
async def webhook(request: Request):
    """Receive a Facebook Messenger webhook event and dispatch it.

    Steps: verify the request signature via ``facebook_client``, decode the
    JSON body, skip echo events, parse the message, then hand it to the
    conversation for (page_id, sender_id) obtained from ``channel_manager``.

    Args:
        request: Incoming request whose body is the webhook JSON payload.

    Returns:
        dict: ``{"status": "ok"}`` once the event is handled or skipped.

    Raises:
        HTTPException: 403 on an invalid signature, 400 if the body is not
            valid JSON, 500 if parsing or processing fails.
    """
    logger.info("[DEBUG] Nhận message từ Facebook Messenger webhook...")
    body_bytes = await request.body()

    if not facebook_client.verify_signature(request, body_bytes):
        raise HTTPException(status_code=403, detail="Invalid signature")

    # A malformed body is a client error, not a server failure.
    # (JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.)
    try:
        body = json.loads(body_bytes)
    except ValueError as e:
        logger.warning(f"Invalid JSON in webhook payload: {e}")
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from e

    try:
        if _is_echo_event(body):
            logger.info("[DEBUG] Message is echo, skipping...")
            return {"status": "ok"}

        message_data = facebook_client.parse_message(body)
        logger.info(f"[DEBUG] message_data: {message_data}")
        if not message_data:
            return {"status": "ok"}

        page_id = message_data.get("page_id")
        sender_id = message_data.get("sender_id")
        channel = channel_manager.get_or_create_channel("facebook", page_id)
        conversation = channel.get_or_create_conversation(sender_id)
        # NOTE(review): processing is awaited before the webhook is acknowledged;
        # if it can be slow (LLM calls), Facebook may time out and redeliver.
        await conversation.process_message(message_data)
        return {"status": "ok"}
    except Exception as e:
        logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Internal server error") from e
|
|
| |
|
|
@app.delete("/api/document-chunks/clear")
@timing_decorator_async
async def delete_all_document_chunks():
    """Delete every row of the document_chunks table via ``supabase_client``.

    Returns:
        dict: success status and a confirmation message.

    Raises:
        HTTPException: 500 with a fixed message if the client reports failure,
            or with the underlying error text if the client raises.
    """
    # NOTE(review): destructive and, as far as this file shows, unauthenticated.
    try:
        logger.info("[API] Starting delete all document chunks")
        success = supabase_client.delete_all_document_chunks()

        if success:
            logger.info("[API] Successfully deleted all document chunks")
            return {"status": "success", "message": "Đã xóa toàn bộ document chunks"}

        logger.error("[API] Failed to delete all document chunks")
        raise HTTPException(status_code=500, detail="Lỗi khi xóa document chunks")

    except HTTPException:
        # Pass deliberate HTTP errors through unchanged; without this they were
        # re-wrapped by the generic handler below with a mangled detail.
        raise
    except Exception as e:
        logger.error(f"[API] Error in delete_all_document_chunks: {e}")
        raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}") from e
|
|
@app.post("/api/document-chunks/update")
@timing_decorator_async
async def update_specific_document(file_name: str, document_id: int):
    """Re-chunk one file from the ``data`` directory under ``document_id``.

    Existing chunks for ``document_id`` are deleted first, then the file is
    processed by ``law_chunker.process_law_document``.

    Args:
        file_name: Bare file name inside ``data`` (e.g. "luat_giao_thong.txt").
            Names containing path components are rejected.
        document_id: Law document ID (``vanbanid``) the chunks are stored under.

    Returns:
        dict: status, message, document_id and file_name.

    Raises:
        HTTPException: 400 for an invalid file name, 404 if the file does not
            exist, 500 if processing fails or an unexpected error occurs.
    """
    try:
        logger.info(f"[API] Starting update specific document: {file_name}, document_id: {document_id}")

        # file_name comes straight from the query string: only accept a bare
        # name so values like "../../etc/passwd" cannot reach outside data/.
        if not file_name or file_name in (".", "..") or os.path.basename(file_name) != file_name:
            logger.error(f"[API] Invalid file name: {file_name!r}")
            raise HTTPException(status_code=400, detail=f"Tên file không hợp lệ: {file_name}")

        file_path = os.path.join("data", file_name)
        # isfile (not exists) so a directory is not treated as a document.
        if not os.path.isfile(file_path):
            logger.error(f"[API] File not found: {file_path}")
            raise HTTPException(status_code=404, detail=f"File không tồn tại: {file_name}")

        # NOTE(review): old chunks are removed before the new ones are produced;
        # if processing fails the document may be left with no or partial chunks.
        logger.info(f"[API] Deleting old chunks for document_id: {document_id}")
        supabase_client.delete_document_chunks_by_vanbanid(document_id)

        logger.info(f"[API] Processing document: {file_path}")
        success = await law_chunker.process_law_document(file_path, document_id)

        if success:
            logger.info(f"[API] Successfully updated document: {file_name}")
            return {
                "status": "success",
                "message": f"Đã cập nhật thành công văn bản: {file_name}",
                "document_id": document_id,
                "file_name": file_name
            }

        logger.error(f"[API] Failed to update document: {file_name}")
        raise HTTPException(status_code=500, detail=f"Lỗi khi xử lý văn bản: {file_name}")

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[API] Error in update_specific_document: {e}")
        raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}") from e
|
|
@app.post("/api/document-chunks/update-all")
@timing_decorator_async
async def update_all_documents():
    """Re-chunk every ``.txt`` file in the ``data`` directory.

    Files are processed in sorted name order. The i-th file (1-based) is stored
    under ``document_id = i``, and existing chunks for that id are deleted
    before processing. A failure on one file is recorded and does not stop
    the others.

    Returns:
        dict: ``status`` and ``message``, plus ``processed_files`` and
        ``failed_files`` lists (and ``total_files`` when files were found).
        ``status`` is "warning" when the directory is missing or has no .txt
        files; otherwise it is "success", even if some files failed.

    Raises:
        HTTPException: 500 if listing the directory or another unexpected step
            outside the per-file loop fails.
    """
    try:
        logger.info("[API] Starting update all documents")

        data_dir = "data"
        if not os.path.isdir(data_dir):
            logger.warning(f"[API] Data directory not found: {data_dir}")
            return {
                "status": "warning",
                "message": "Thư mục data không tồn tại",
                "processed_files": [],
                "failed_files": []
            }

        # os.listdir order is arbitrary, and document_id is derived from the
        # position below, so sort to keep each file's id stable between runs.
        # NOTE(review): ids are still positional — adding or removing a file
        # shifts the ids of every file sorted after it.
        txt_files = sorted(
            name for name in os.listdir(data_dir)
            if name.endswith('.txt') and os.path.isfile(os.path.join(data_dir, name))
        )

        if not txt_files:
            logger.warning("[API] No .txt files found in data directory")
            return {
                "status": "warning",
                "message": "Không tìm thấy file .txt nào trong thư mục data",
                "processed_files": [],
                "failed_files": []
            }

        total_files = len(txt_files)
        logger.info(f"[API] Found {total_files} .txt files to process")

        processed_files = []
        failed_files = []

        for document_id, file_name in enumerate(txt_files, 1):
            try:
                logger.info(f"[API] Processing file {document_id}/{total_files}: {file_name}")

                supabase_client.delete_document_chunks_by_vanbanid(document_id)

                file_path = os.path.join(data_dir, file_name)
                success = await law_chunker.process_law_document(file_path, document_id)

                if success:
                    processed_files.append({
                        "file_name": file_name,
                        "document_id": document_id,
                        "status": "success"
                    })
                    logger.info(f"[API] Successfully processed: {file_name}")
                else:
                    failed_files.append({
                        "file_name": file_name,
                        "document_id": document_id,
                        "status": "failed",
                        "error": "Processing failed"
                    })
                    logger.error(f"[API] Failed to process: {file_name}")

            except Exception as e:
                # Record the failure and continue with the remaining files.
                logger.error(f"[API] Error processing {file_name}: {e}")
                failed_files.append({
                    "file_name": file_name,
                    "document_id": document_id,
                    "status": "failed",
                    "error": str(e)
                })

        success_count = len(processed_files)
        logger.info(f"[API] Update all completed: {success_count}/{total_files} files processed successfully")

        return {
            "status": "success",
            "message": f"Đã xử lý {success_count}/{total_files} files thành công",
            "total_files": total_files,
            "processed_files": processed_files,
            "failed_files": failed_files
        }

    except Exception as e:
        logger.error(f"[API] Error in update_all_documents: {e}")
        raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}") from e
|
|
@app.get("/api/document-chunks/view")
@timing_decorator_async
async def view_all_document_chunks():
    """Return all document_chunks rows, grouped per document and nested as trees.

    The response contains a summary (total chunk count, number of distinct
    truthy ``vanbanid`` values, per-document chunk counts and titles) and,
    for every ``vanbanid`` group, the chunks arranged by ``build_chunk_tree``.

    Raises:
        HTTPException: 500 carrying the underlying error text on any failure.
    """
    try:
        logger.info("[API] Starting view all document chunks")

        rows = supabase_client.get_all_document_chunks()

        total_chunks = len(rows)
        distinct_doc_ids = {row.get('vanbanid') for row in rows if row.get('vanbanid')}
        unique_documents = len(distinct_doc_ids)

        # Group rows by vanbanid, keeping first-seen document order.
        rows_by_doc: Dict[Any, List[Dict[str, Any]]] = {}
        for row in rows:
            rows_by_doc.setdefault(row.get('vanbanid'), []).append(row)

        document_stats = []
        hierarchical_data = []
        for doc_id, doc_rows in rows_by_doc.items():
            title = doc_rows[0].get('document_title', 'Unknown') if doc_rows else 'Unknown'
            document_stats.append({
                "vanbanid": doc_id,
                "chunk_count": len(doc_rows),
                "document_title": title
            })
            hierarchical_data.append({
                "vanbanid": doc_id,
                "document_title": title,
                "chunk_count": len(doc_rows),
                "chunks": build_chunk_tree(doc_rows)
            })

        summary = {
            "total_chunks": total_chunks,
            "unique_documents": unique_documents,
            "document_stats": document_stats
        }
        return {
            "status": "success",
            "message": f"Đã lấy {total_chunks} chunks từ {unique_documents} văn bản",
            "summary": summary,
            "data": hierarchical_data
        }

    except Exception as e:
        logger.error(f"[API] Error in view_all_document_chunks: {e}")
        raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}")
|
|
def build_chunk_tree(chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Nest a flat list of chunk rows into parent/child trees.

    A chunk whose ``cha`` is ``None`` is a root. Any other chunk becomes a
    child of the chunk whose ``id`` equals its ``cha``. Children keep the
    order in which they appear in ``chunks``. Chunks whose parent id is not
    present in ``chunks`` cannot be reached from a root, so they are left out
    of the result; a warning is logged for them.

    Args:
        chunks: Flat list of chunk dicts; every dict must have an ``'id'`` key.

    Returns:
        One node per distinct root id. Each node holds selected chunk fields,
        the raw row under ``"data"``, and its nested ``"children"`` nodes.

    Raises:
        KeyError: if a chunk has no ``'id'`` key.
    """
    if not chunks:
        return []

    def _preview(item: Dict[str, Any], limit: int) -> str:
        # content may be missing or None; logging must never fail the request.
        return str(item.get('content') or '')[:limit]

    # Single pass: count roots/children and index child ids by parent id, so
    # building the tree is O(n) instead of rescanning every chunk per node.
    children_by_parent: Dict[Any, List[Any]] = {}
    root_count = 0
    for chunk in chunks:
        parent_id = chunk.get('cha')
        if parent_id is None:
            root_count += 1
            logger.debug(f"[TREE] Root chunk: {_preview(chunk, 100)}")
        else:
            children_by_parent.setdefault(parent_id, []).append(chunk['id'])
            logger.debug(f"[TREE] Child chunk: {_preview(chunk, 100)} -> parent: {parent_id}")
    child_count = len(chunks) - root_count

    logger.info(f"[TREE] Found {root_count} root chunks and {child_count} child chunks from {len(chunks)} total chunks")

    chunks_by_id = {chunk['id']: chunk for chunk in chunks}

    missing_parents = [pid for pid in children_by_parent if pid not in chunks_by_id]
    if missing_parents:
        orphaned = sum(len(children_by_parent[pid]) for pid in missing_parents)
        logger.warning(
            f"[TREE] {orphaned} chunk(s) reference {len(missing_parents)} parent id(s) "
            f"not present in this batch; they (and their descendants) are omitted"
        )

    def build_node(chunk_id: Any, ancestors: frozenset = frozenset()) -> Dict[str, Any]:
        """Build the node for ``chunk_id`` together with its whole subtree."""
        chunk = chunks_by_id[chunk_id]
        node = {
            "id": chunk_id,
            "content": chunk.get('content', ''),
            "vanbanid": chunk.get('vanbanid'),
            "cha": chunk.get('cha'),
            "document_title": chunk.get('document_title', ''),
            "article_number": chunk.get('article_number'),
            "article_title": chunk.get('article_title', ''),
            "clause_number": chunk.get('clause_number', ''),
            "sub_clause_letter": chunk.get('sub_clause_letter', ''),
            "context_summary": chunk.get('context_summary', ''),
            "data": chunk,
            "children": []
        }
        # Guard against bad data (e.g. duplicated ids forming a self/cyclic
        # link), which would otherwise recurse until RecursionError.
        path = ancestors | {chunk_id}
        for child_id in children_by_parent.get(chunk_id, ()):
            if child_id in path:
                logger.warning(f"[TREE] Skipping cyclic parent link {child_id!r} -> {chunk_id!r}")
                continue
            node["children"].append(build_node(child_id, path))

        # str() so non-string ids (e.g. integers) don't break the slice.
        logger.debug(f"[TREE] Node {str(chunk_id)[:8]}... has {len(node['children'])} children")
        return node

    root_chunks = []
    processed_ids = set()
    for chunk in chunks:
        if chunk.get('cha') is None and chunk['id'] not in processed_ids:
            root_chunks.append(build_node(chunk['id']))
            processed_ids.add(chunk['id'])
            logger.info(f"[TREE] Added root node: {_preview(chunk, 100)}")

    logger.info(f"[TREE] Built tree with {len(root_chunks)} root nodes from {len(chunks)} total chunks")
    logger.info(f"[TREE] Root chunks: {[_preview(node, 50) for node in root_chunks]}")
    return root_chunks
|
|
@app.get("/api/document-chunks/status")
@timing_decorator_async
async def get_document_chunks_status():
    """Report which ``.txt`` documents are available in the ``data`` directory.

    Returns:
        dict: status, the data directory name, the sorted list of ``.txt``
        regular files in it (empty if the directory does not exist), their
        count and a summary message.

    Raises:
        HTTPException: 500 carrying the underlying error text on failure.
    """
    try:
        logger.info("[API] Getting document chunks status")

        data_dir = "data"
        txt_files = []
        if os.path.isdir(data_dir):
            # Sorted so the listing is deterministic (os.listdir order is
            # arbitrary); directories named *.txt are excluded.
            txt_files = sorted(
                name for name in os.listdir(data_dir)
                if name.endswith('.txt') and os.path.isfile(os.path.join(data_dir, name))
            )

        return {
            "status": "success",
            "data_directory": data_dir,
            "available_files": txt_files,
            "file_count": len(txt_files),
            "message": f"Tìm thấy {len(txt_files)} file .txt trong thư mục data"
        }

    except Exception as e:
        logger.error(f"[API] Error in get_document_chunks_status: {e}")
        raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}") from e
|
|
if __name__ == "__main__":
    # Development entry point (e.g. `python -m app.main`).
    import uvicorn

    logger.info("[STARTUP] Bắt đầu chạy uvicorn server...")
    # Pass the already-initialised app object rather than the "app.main:app"
    # import string: when this file runs as __main__, the import string makes
    # uvicorn import the module a second time as app.main, re-running all of
    # the startup code above (settings, logging, every client, validation).
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=port
    )