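# Financial RAG chatbot: upload a PDF financial report, build a hybrid
# BM25 + FAISS index over it, and answer questions with a small Gemma model
# behind a Gradio UI.
#
# Assumed dependencies (the source does not pin them; adjust as needed):
#   pip install gradio langchain langchain-community faiss-cpu rank_bm25 \
#       sentence-transformers transformers accelerate torch pypdf
# Run with `python app.py` (assumed filename) and open the printed local URL.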
import gradio as gr
import os
import re
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
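

# The embedding model, tokenizer, and LLM are created once at startup and
# shared by every callback below; nothing here is per-request.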
@torch.no_grad()
def load_models():
    print("Loading models (this only happens once)...")

    # Cache downloads locally so restarts don't re-fetch the weights.
    cache_dir = "./model_cache"
    os.makedirs(cache_dir, exist_ok=True)
    os.environ['SENTENCE_TRANSFORMERS_HOME'] = cache_dir

    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        cache_folder=cache_dir
    )

    # Gemma weights are gated on the Hugging Face Hub, so a token is usually required.
    hf_token = os.getenv("HUGGING_FACE_HUB_TOKEN")

    tokenizer = AutoTokenizer.from_pretrained("google/gemma-3-270m-it", cache_dir=cache_dir, token=hf_token)
    llm = AutoModelForCausalLM.from_pretrained(
        "google/gemma-3-270m-it",
        cache_dir=cache_dir,
        device_map="auto",  # GPU if available, otherwise CPU
        torch_dtype=torch.bfloat16,
        token=hf_token
    )
    print("Models loaded successfully.")
    return embeddings, tokenizer, llm


# Loaded once at import time; every Gradio callback reuses these objects.
embeddings, tokenizer, llm = load_models()
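
# Mutable pipeline state: filled in by process_document(), read by get_rag_response().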
rag_pipeline = {"retriever": None, "all_chunks": None}
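

# Upload callback: parse the PDF, chunk it, and build the hybrid retriever.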
def process_document(uploaded_file):
    if uploaded_file is None:
        return "Please upload a file first.", gr.update(interactive=False)

    try:
        file_path = uploaded_file.name

        # Load the PDF and split it into overlapping chunks.
        loader = PyPDFLoader(file_path)
        docs = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
        chunks = text_splitter.split_documents(docs)

        rag_pipeline["all_chunks"] = chunks

        # Hybrid retrieval: dense vectors (FAISS) plus lexical matching (BM25), weighted equally.
        faiss_db = FAISS.from_documents(chunks, embeddings)
        faiss_retriever = faiss_db.as_retriever(search_kwargs={"k": 10})
        bm25_retriever = BM25Retriever.from_documents(chunks)
        bm25_retriever.k = 10

        rag_pipeline["retriever"] = EnsembleRetriever(
            retrievers=[bm25_retriever, faiss_retriever],
            weights=[0.5, 0.5]
        )

        return f"File '{os.path.basename(file_path)}' processed successfully! You can now ask questions.", gr.update(interactive=True)
    except Exception as e:
        return f"Error while processing the file: {str(e)}", gr.update(interactive=False)
def get_rag_response(query, chat_history):
    if rag_pipeline["retriever"] is None:
        # Return values must match the (msg, chatbot) outputs wired up below.
        chat_history.append((query, "No document has been processed yet. Please upload a file first."))
        return "", chat_history

    query_original = query
    query_lower = query_original.lower()
    final_answer = ""
    found_source = "No specific source"

    # Queries containing these phrases take the deterministic lane.
    # ("jumlah aset lancar" is Indonesian for "total current assets".)
    priority_keywords = ["jumlah aset lancar"]
    use_smart_lane = any(keyword in query_lower for keyword in priority_keywords)

    if use_smart_lane:
        year_match = re.search(r'\b(202[34])\b', query_lower)
        target_year = year_match.group(1) if year_match else "2024"
        for chunk in rag_pipeline["all_chunks"]:
            lines = chunk.page_content.split('\n')
            for line in lines:
                if any(keyword in line.lower() for keyword in priority_keywords):
                    numbers = re.findall(r'(\d{1,3}(?:[.,]\d{3})*)', line)
                    if len(numbers) >= 2:
                        # Assumes the statement lists the current year (2024) first.
                        value_2024 = numbers[0]
                        value_2023 = numbers[1]
                        value = value_2024 if target_year == "2024" else value_2023
                        final_answer = f"Total current assets for {target_year} are **{value}**."
                        found_source = f"Source: page {chunk.metadata.get('page', 'NA')}"
                        break
            if final_answer:
                break

    if not final_answer:
        # Fallback: retrieve the top chunks and let the LLM answer from that context.
        retrieved_docs = rag_pipeline["retriever"].invoke(query_original)
        clean_context = "\n\n".join([doc.page_content for doc in retrieved_docs[:3]])
        found_source = ", ".join(list(set([f"Page {doc.metadata.get('page', 'NA')}" for doc in retrieved_docs[:3]])))

        chat_template = [
            {"role": "system", "content": "You are a meticulous financial analyst AI. Answer questions based only on the text provided."},
            {"role": "user", "content": f"From the TEXT below, find the answer to the question '{query_original}'.\n\nTEXT:\n{clean_context}\n\nANSWER:"}
        ]
        final_prompt = tokenizer.apply_chat_template(chat_template, tokenize=False, add_generation_prompt=True)
        inputs = tokenizer(final_prompt, return_tensors="pt").to(llm.device)
        outputs = llm.generate(**inputs, max_new_tokens=250, do_sample=False, pad_token_id=tokenizer.eos_token_id)
        # Decode only the newly generated tokens, not the echoed prompt.
        input_length = inputs.input_ids.shape[1]
        generated_tokens = outputs[0, input_length:]
        final_answer = tokenizer.decode(generated_tokens, skip_special_tokens=True)

    full_response = f"{final_answer}\n\n*{found_source}*"
    chat_history.append((query, full_response))
    return "", chat_history
with gr.Blocks() as demo:
    gr.Markdown("# 📊 Financial RAG Chatbot")

    with gr.Row():
        with gr.Column(scale=1):
            file_output = gr.Textbox(label="Document Status", interactive=False)
            upload_button = gr.UploadButton("Click to Upload a PDF", file_types=[".pdf"])
            ask_button = gr.Button("Ask", interactive=False)

        with gr.Column(scale=4):
            chatbot = gr.Chatbot(label="Chat")  # history is a list of (user, assistant) tuples
            msg = gr.Textbox(label="Type Your Question Here...")

    # Event wiring: upload builds the index; Enter in the textbox or the "Ask"
    # button triggers a response, which clears the textbox and updates the chat.
    upload_button.upload(process_document, upload_button, [file_output, ask_button])
    msg.submit(get_rag_response, [msg, chatbot], [msg, chatbot])
    ask_button.click(get_rag_response, [msg, chatbot], [msg, chatbot])


if __name__ == "__main__":
    demo.launch()