|
|
import requests |
|
|
import PyPDF2 |
|
|
from io import BytesIO |
|
|
from openai import OpenAI |
|
|
import numpy as np |
|
|
from sklearn.feature_extraction.text import TfidfVectorizer |
|
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
import re |
|
|
from typing import List, Tuple |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
# OpenTyphoon API client (OpenAI-compatible endpoint).
# SECURITY: the API key was previously hard-coded here and committed to
# source control — that key must be considered leaked and should be revoked.
# Read the key from the environment instead.
import os

client = OpenAI(
    api_key=os.environ.get("TYPHOON_API_KEY", ""),
    base_url="https://api.opentyphoon.ai/v1"
)
|
|
|
|
|
class PDPAKnowledgeBase: |
|
|
def __init__(self, pdf_url: str): |
|
|
self.pdf_url = pdf_url |
|
|
self.chunks = [] |
|
|
|
|
|
self.vectorizer = TfidfVectorizer(stop_words='english', max_features=1000) |
|
|
self.chunk_vectors = None |
|
|
self.load_and_process_pdf() |
|
|
|
|
|
def download_pdf(self) -> bytes: |
|
|
"""Download PDF from GitHub URL""" |
|
|
print("📥 กำลังดาวน์โหลด PDPA PDF...") |
|
|
try: |
|
|
response = requests.get(self.pdf_url, timeout=30) |
|
|
response.raise_for_status() |
|
|
print("✅ ดาวน์โหลดสำเร็จ!") |
|
|
return response.content |
|
|
except Exception as e: |
|
|
print(f"❌ ไม่สามารถดาวน์โหลด PDF ได้: {e}") |
|
|
return None |
|
|
|
|
|
def extract_text_from_pdf(self, pdf_content: bytes) -> str: |
|
|
"""Extract text from PDF content""" |
|
|
print("📄 กำลังแยกข้อความจาก PDF...") |
|
|
try: |
|
|
pdf_file = BytesIO(pdf_content) |
|
|
pdf_reader = PyPDF2.PdfReader(pdf_file) |
|
|
|
|
|
text = "" |
|
|
for page_num, page in enumerate(pdf_reader.pages): |
|
|
try: |
|
|
page_text = page.extract_text() |
|
|
|
|
|
text += f"\n--- หน้า {page_num + 1} ---\n{page_text}\n" |
|
|
except Exception as e: |
|
|
print(f"⚠️ ไม่สามารถอ่านหน้า {page_num + 1}: {e}") |
|
|
continue |
|
|
|
|
|
print(f"✅ แยกข้อความสำเร็จ! จำนวน {len(pdf_reader.pages)} หน้า") |
|
|
return text |
|
|
except Exception as e: |
|
|
print(f"❌ ไม่สามารถแยกข้อความได้: {e}") |
|
|
return "" |
|
|
|
|
|
def chunk_text(self, text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]: |
|
|
"""Split text into overlapping chunks""" |
|
|
print("✂️ กำลังแบ่งข้อความเป็นส่วนๆ...") |
|
|
|
|
|
|
|
|
text = re.sub(r'\s+', ' ', text.strip()) |
|
|
|
|
|
chunks = [] |
|
|
start = 0 |
|
|
|
|
|
while start < len(text): |
|
|
end = start + chunk_size |
|
|
|
|
|
|
|
|
if end < len(text): |
|
|
|
|
|
for i in range(end, max(start + chunk_size - 200, start), -1): |
|
|
if text[i] in '.!?': |
|
|
end = i + 1 |
|
|
break |
|
|
|
|
|
chunk = text[start:end].strip() |
|
|
if chunk: |
|
|
chunks.append(chunk) |
|
|
|
|
|
start = end - overlap |
|
|
if start >= len(text): |
|
|
break |
|
|
|
|
|
print(f"✅ แบ่งเป็น {len(chunks)} ส่วน") |
|
|
return chunks |
|
|
|
|
|
def create_embeddings(self, chunks: List[str]): |
|
|
"""Create TF-IDF vectors for chunks""" |
|
|
print("🔢 กำลังสร้าง embeddings...") |
|
|
try: |
|
|
self.chunk_vectors = self.vectorizer.fit_transform(chunks) |
|
|
print("✅ สร้าง embeddings สำเร็จ!") |
|
|
except Exception as e: |
|
|
print(f"❌ ไม่สามารถสร้าง embeddings ได้: {e}") |
|
|
|
|
|
def load_and_process_pdf(self): |
|
|
"""Download and process the PDF""" |
|
|
pdf_content = self.download_pdf() |
|
|
if not pdf_content: |
|
|
return |
|
|
|
|
|
text = self.extract_text_from_pdf(pdf_content) |
|
|
if not text: |
|
|
return |
|
|
|
|
|
self.chunks = self.chunk_text(text) |
|
|
if self.chunks: |
|
|
self.create_embeddings(self.chunks) |
|
|
|
|
|
def search_relevant_chunks(self, query: str, top_k: int = 3) -> List[Tuple[str, float]]: |
|
|
"""Search for relevant chunks based on query""" |
|
|
if not self.chunks or self.chunk_vectors is None: |
|
|
return [] |
|
|
|
|
|
try: |
|
|
|
|
|
processed_query = ' '.join([word for word in self.vectorizer.build_tokenizer()(query) if word not in self.vectorizer.get_stop_words()]) |
|
|
if not processed_query: |
|
|
return [] |
|
|
|
|
|
|
|
|
query_vector = self.vectorizer.transform([query]) |
|
|
|
|
|
|
|
|
similarities = cosine_similarity(query_vector, self.chunk_vectors)[0] |
|
|
|
|
|
|
|
|
top_indices = np.argsort(similarities)[::-1][:top_k] |
|
|
|
|
|
results = [] |
|
|
for idx in top_indices: |
|
|
if similarities[idx] > 0.1: |
|
|
results.append((self.chunks[idx], similarities[idx])) |
|
|
|
|
|
return results |
|
|
except Exception as e: |
|
|
print(f"⚠️ ข้อผิดพลาดในการค้นหา: {e}") |
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# URL of the PDPA guideline PDF (raw file on GitHub).
pdf_url = "https://raw.githubusercontent.com/tiya1012/pdpa/main/PDPA_Guideline_v_1_merged.pdf"

# Build the knowledge base at import time. NOTE(review): this performs
# network I/O (PDF download) as a module-level side effect.
kb = PDPAKnowledgeBase(pdf_url)

# Fall back to a plain (non-RAG) chatbot if the document failed to load.
if not kb.chunks:
    print("❌ ไม่สามารถโหลดข้อมูล PDPA ได้ กำลังใช้โหมดปกติ...")
    use_rag = False
else:
    print(f"✅ โหลดข้อมูล PDPA สำเร็จ! มีข้อมูล {len(kb.chunks)} ส่วน")
    use_rag = True
|
|
|
|
|
def predict_chat(message: str, history: List[Tuple[str, str]]) -> str:
    """Generate a chatbot reply, optionally augmented with PDPA context (RAG).

    Args:
        message: The user's current input message.
        history: Previous conversation turns as (user_message, bot_message)
            tuples, as supplied by ``gr.ChatInterface``.

    Returns:
        The assistant's reply, or a Thai-language error string if the API
        call fails.
    """
    messages = [
        {"role": "system", "content": "คุณเป็นผู้เชี่ยวชาญด้าน PDPA ให้คำแนะนำเกี่ยวกับกฎหมาย PDPA โดยตอบเป็นภาษาไทยเท่านั้น หากมีข้อมูลอ้างอิงจากเอกสาร PDPA ให้อ้างอิงแหล่งที่มาด้วย"}
    ]

    # Replay prior turns. Gradio may pass None/empty for a pending bot
    # message; sending "content": None would be rejected by the API, so
    # skip falsy entries.
    for human_msg, ai_msg in history:
        if human_msg:
            messages.append({"role": "user", "content": human_msg})
        if ai_msg:
            messages.append({"role": "assistant", "content": ai_msg})

    # Retrieve supporting chunks from the knowledge base when RAG is active.
    context = ""
    if use_rag:
        relevant_chunks = kb.search_relevant_chunks(message)
        if relevant_chunks:
            context = "\n\nข้อมูลอ้างอิงจากเอกสาร PDPA:\n"
            for i, (chunk, score) in enumerate(relevant_chunks, 1):
                # Truncate each chunk to keep the prompt within budget.
                context += f"\n[อ้างอิง {i}] {chunk[:500]}...\n"

    # Append retrieved context plus a citation instruction to the user turn.
    enhanced_input = message
    if context:
        enhanced_input = f"{message}\n{context}\n\nโปรดตอบโดยอ้างอิงจากข้อมูลข้างต้นหากเกี่ยวข้อง และระบุแหล่งอ้างอิงด้วย"

    messages.append({"role": "user", "content": enhanced_input})

    try:
        completion = client.chat.completions.create(
            model="typhoon-v2.1-12b-instruct",
            messages=messages,
            temperature=0.7,
            max_tokens=2048,
            top_p=0.9,
            stream=False,
        )
        return completion.choices[0].message.content
    except Exception as e:
        # Surface the failure to the chat UI rather than crashing Gradio.
        return f"❌ เกิดข้อผิดพลาด: {e}. กรุณาลองใหม่อีกครั้ง."
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Entry point: build the Gradio chat UI and serve it.
    print("\n🚀 กำลังเริ่มต้น Gradio Chatbot...")

    # Custom CSS for the Gradio app: light page background, green header,
    # colored chat bubbles (green bot / blue user), rounded blue buttons,
    # and styled example buttons.
    custom_css = """
body {
    background-color: #f4f7f6; /* Light grey background */
    font-family: 'IBM Plex Sans Thai', 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.gradio-container {
    max-width: 900px;
    margin: 40px auto;
    box-shadow: 0 4px 15px rgba(0,0,0,0.1);
    border-radius: 15px;
    overflow: hidden;
}
.panel-header {
    background-color: #28a745; /* Green header */
    color: white;
    padding: 18px 25px;
    text-align: center;
    font-size: 1.6em;
    font-weight: bold;
    border-top-left-radius: 15px;
    border-top-right-radius: 15px;
}
.chat-message.bot {
    background-color: #e6ffed; /* Light green for bot messages */
    border-left: 5px solid #28a745;
    padding: 12px 18px;
    border-radius: 10px;
    margin-bottom: 10px;
    margin-right: 15%; /* Keep some space on the right */
    word-wrap: break-word; /* Ensure long words break */
}
.chat-message.user {
    background-color: #e0f2f7; /* Light blue for user messages */
    border-right: 5px solid #007bff;
    padding: 12px 18px;
    border-radius: 10px;
    margin-bottom: 10px;
    margin-left: 15%; /* Keep some space on the left */
    word-wrap: break-word; /* Ensure long words break */
}
.gr-button {
    background-color: #007bff; /* Blue buttons */
    color: white;
    border-radius: 8px;
    padding: 10px 18px;
    font-size: 1em;
    border: none;
    transition: background-color 0.3s ease;
}
.gr-button:hover {
    background-color: #0056b3;
}
.gr-textarea {
    border-radius: 8px;
    border: 1px solid #ced4da;
}
.gradio-container h1 {
    padding: 15px 0;
    margin-bottom: 0;
}
/* Style for the chatbot examples */
.gr-samples-container button {
    background-color: #f8f9fa;
    color: #495057;
    border: 1px solid #dee2e6;
    border-radius: 5px;
    padding: 8px 12px;
    margin: 5px;
    cursor: pointer;
    transition: all 0.2s ease-in-out;
}
.gr-samples-container button:hover {
    background-color: #e2e6ea;
    border-color: #dae0e5;
    color: #212529;
}
"""

    # Wire predict_chat into a web chat UI with Thai example prompts
    # about the PDPA.
    demo = gr.ChatInterface(
        fn=predict_chat,
        title="🤖 Typhoon PDPA RAG Expert Chatbot",
        description="คุณเป็นผู้เชี่ยวชาญด้าน PDPA ให้คำแนะนำเกี่ยวกับกฎหมาย PDPA โดยตอบเป็นภาษาไทยเท่านั้น หากมีข้อมูลอ้างอิงจากเอกสาร PDPA ให้อ้างอิงแหล่งที่มาด้วย",
        examples=[
            "PDPA คืออะไร?",
            "สิทธิของเจ้าของข้อมูลส่วนบุคคลมีอะไรบ้าง?",
            "ใครคือผู้ควบคุมข้อมูลส่วนบุคคล?",
            "กรณีใดบ้างที่สามารถเก็บรวบรวมข้อมูลส่วนบุคคลได้โดยไม่ต้องขอความยินยอม?",
            "การแจ้งวัตถุประสงค์ในการเก็บข้อมูลสำคัญอย่างไร?"
        ],
        chatbot=gr.Chatbot(height=500),
        theme=gr.themes.Soft(),
        css=custom_css,
    )

    # share=True additionally exposes a temporary public gradio.live URL.
    demo.launch(share=True)