papuabaratdaya / app.py
LPDA0's picture
Update app.py
ea6bfbc verified
# ===================================================================
# LOAD LIBRARY & ENV
# ===================================================================
from flask import Flask, render_template, request, jsonify, session
import psycopg2, redis, os, hashlib, json, time, requests
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.messages import HumanMessage, AIMessage
from dotenv import load_dotenv
load_dotenv()
# ===================================================================
# FLASK APP CONFIG
# ===================================================================
app = Flask(__name__)
# Prefer a stable secret key from the environment. A key regenerated on every
# start invalidates all existing session cookies (chat history, CAPTCHA flag)
# on restart and differs between worker processes.
_configured_secret_key = os.getenv("FLASK_SECRET_KEY")
if _configured_secret_key:
    app.secret_key = _configured_secret_key
else:
    app.secret_key = os.urandom(24)
    print("⚠️ Variabel FLASK_SECRET_KEY tidak ditemukan. Menggunakan secret key acak; "
          "sesi akan hilang saat aplikasi di-restart.", flush=True)
# os.getenv returns a string, and any non-empty string (including "False" or
# "0") is truthy, so parse the flag into a real boolean before it is used.
FLASK_DEBUG = (os.getenv("FLASK_DEBUG") or "").strip().lower() in ("1", "true", "yes", "on")
# ===================================================================
# FUNGSI UNTUK MENDAPATKAN IP PENGGUNA
# ===================================================================
def get_user_ip():
    """Return the client IP address for the current request.

    Uses the first comma-separated entry of the ``X-Forwarded-For`` header
    when that header is present and non-empty; otherwise falls back to
    ``request.remote_addr``.
    """
    # NOTE(review): X-Forwarded-For is sent by the client/proxy chain; confirm
    # the deployment sits behind a proxy that sets it reliably.
    proxy_chain = request.headers.get('X-Forwarded-For')
    if not proxy_chain:
        return request.remote_addr
    first_hop, _, _ = proxy_chain.partition(',')
    return first_hop.strip()
# ===================================================================
# REDIS CONFIG (MENGGUNAKAN URL)
# ===================================================================
# Redis backs both the response cache and the custom rate limiter. The app
# keeps running without it; REDIS_ENABLED tells the rest of the module whether
# redis_client may be used.
REDIS_URL = os.getenv("REDIS_URL")
REDIS_ENABLED = False
# Bound up front so the name always exists, even when no URL is configured.
redis_client = None
if REDIS_URL:
    try:
        redis_client = redis.Redis.from_url(
            REDIS_URL,
            decode_responses=True,
        )
        # Fail fast: only enable Redis features if the server actually answers.
        redis_client.ping()
        REDIS_ENABLED = True
        print("✅ Redis (via URL) aktif dan siap digunakan", flush=True)
    except Exception as e:
        print(f"⚠️ Gagal terhubung ke Redis: {e}. Fitur cache dan rate-limit kustom nonaktif.", flush=True)
else:
    print("⚠️ Variabel REDIS_URL tidak ditemukan di .env. Fitur cache dan rate-limit kustom nonaktif.", flush=True)
# Cache lifetime in seconds. Falls back to one hour when the variable is unset
# or empty instead of crashing at import time with int(None).
CACHE_EXPIRATION_SECONDS = int(os.getenv("CACHE_EXPIRATION_SECONDS") or 3600)
# ===================================================================
# RATE LIMIT CONFIG (CUSTOM DARI .ENV)
# ===================================================================
# Rate-limit thresholds, overridable through the environment. Each value falls
# back to a default when its variable is unset or empty, so the app can still
# start (for example when Redis, the only consumer of these limits, is off).
# Maximum number of rate-checked requests across all clients per hour.
GLOBAL_LIMIT_PER_HOUR = int(os.getenv("GLOBAL_LIMIT_PER_HOUR") or 1000)
# More than IP_BURST_LIMIT requests from one IP within IP_BURST_SECONDS
# triggers a CAPTCHA challenge.
IP_BURST_LIMIT = int(os.getenv("IP_BURST_LIMIT") or 5)
IP_BURST_SECONDS = int(os.getenv("IP_BURST_SECONDS") or 10)
# Hard per-IP cap on requests per 60-second window.
IP_MINUTE_LIMIT = int(os.getenv("IP_MINUTE_LIMIT") or 20)
# ===================================================================
# DATABASE CONFIG (DIGANTI DENGAN NEON POSTGRESQL)
# ===================================================================
# Connection string for the PostgreSQL database used for conversation logs.
NEON_DATABASE_URL = os.getenv("NEON_DATABASE_URL")


def init_neon_db():
    """Create the ``percakapan`` log table if it does not exist yet.

    Does nothing except print a warning when ``NEON_DATABASE_URL`` is not set.
    Any database error is caught and printed, so a failure here never
    propagates to the caller.
    """
    if not NEON_DATABASE_URL:
        print("⚠️ Variabel NEON_DATABASE_URL tidak ditemukan. Logging ke database nonaktif.", flush=True)
        return
    try:
        conn = psycopg2.connect(NEON_DATABASE_URL)
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS percakapan (
                        id SERIAL PRIMARY KEY,
                        ip TEXT,
                        sender TEXT,
                        message TEXT,
                        timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
                    )
                """)
            conn.commit()
        finally:
            # Always release the connection, including when execute/commit fails.
            conn.close()
        print("✅ Koneksi ke Neon DB berhasil dan tabel siap.", flush=True)
    except Exception as e:
        print(f"⚠️ Gagal terhubung atau inisialisasi Neon DB: {e}", flush=True)
def log_message(ip, sender, message):
    """Insert one conversation entry into the ``percakapan`` table.

    Args:
        ip: Client IP address stored with the entry.
        sender: Who produced the message (callers pass ``"user"`` or ``"bot"``).
        message: The message text.

    Logging is best-effort: it is skipped when ``NEON_DATABASE_URL`` is not
    set, and any database error is printed instead of raised.
    """
    if not NEON_DATABASE_URL:
        return
    try:
        conn = psycopg2.connect(NEON_DATABASE_URL)
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO percakapan (ip, sender, message) VALUES (%s, %s, %s)",
                    (ip, sender, message),
                )
            conn.commit()
        finally:
            # Always release the connection, including when the insert fails.
            conn.close()
    except Exception as e:
        print(f"⚠️ Gagal simpan log ke Neon DB: {e}", flush=True)


# Run the table setup once, when the module is imported.
init_neon_db()
# ===================================================================
# LLM & EMBEDDING CONFIG
# ===================================================================
# Model, credential and retrieval settings, all read from the environment.
# NOTE(review): os.getenv returns None for a missing variable, so float()/int()
# below raise TypeError at import time when those variables are unset.
GEMINI_MODEL = os.getenv("GEMINI_MODEL")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL")
LLM_TEMPERATURE = float(os.getenv("LLM_TEMPERATURE"))
RETRIEVER_SEARCH_K = int(os.getenv("RETRIEVER_SEARCH_K"))
VECTORSTORE_DIRECTORY = os.getenv("VECTORSTORE_DIRECTORY")

# Vector store persisted on disk, queried by similarity for the top-k documents.
_embedding_function = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
vectorstore = Chroma(
    persist_directory=VECTORSTORE_DIRECTORY,
    embedding_function=_embedding_function,
)
_search_kwargs = {"k": RETRIEVER_SEARCH_K}
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs=_search_kwargs,
)
print(f"✅ Vectorstore siap: {EMBEDDING_MODEL}", flush=True)

# Chat model used by both the question-rewriting and the answering step.
_llm_options = {
    "model": GEMINI_MODEL,
    "google_api_key": GOOGLE_API_KEY,
    "temperature": LLM_TEMPERATURE,
    "convert_system_message_to_human": True,
}
llm = ChatGoogleGenerativeAI(**_llm_options)
print(f"✅ LLM siap: {GEMINI_MODEL}", flush=True)
# ===================================================================
# RECAPTCHA CONFIG
# ===================================================================
# Site key is handed to the page template; secret key is used server-side only.
RECAPTCHA_SITE_KEY = os.getenv("RECAPTCHA_SITE_KEY")
RECAPTCHA_SECRET_KEY = os.getenv("RECAPTCHA_SECRET_KEY")


def verify_recaptcha(token, ip):
    """Check a reCAPTCHA response token against Google's siteverify endpoint.

    Args:
        token: The reCAPTCHA response token submitted by the client.
        ip: Client IP address, forwarded as ``remoteip``.

    Returns:
        ``True`` when no secret key is configured (verification disabled) or
        when the service reports success; ``False`` when it reports failure
        or when the request/response handling fails for any reason.
    """
    if not RECAPTCHA_SECRET_KEY:
        return True
    try:
        r = requests.post(
            "https://www.google.com/recaptcha/api/siteverify",
            data={"secret": RECAPTCHA_SECRET_KEY, "response": token, "remoteip": ip},
            # Without a timeout a stalled connection would block this worker
            # indefinitely.
            timeout=10,
        )
        return bool(r.json().get("success", False))
    except Exception as e:
        # Fail closed, but leave a trace so outages are distinguishable from
        # genuinely rejected tokens.
        print(f"⚠️ Gagal memverifikasi reCAPTCHA: {e}", flush=True)
        return False
# ===================================================================
# CUSTOM RATE LIMIT FUNCTION
# ===================================================================
def check_custom_rate_limit(ip):
    """Apply the Redis-backed rate limits to one request from ``ip``.

    The checks run in this order and the first one that trips decides the
    result:

    1. per-IP counter with a 60-second expiry (``IP_MINUTE_LIMIT``),
    2. global counter with a one-hour expiry (``GLOBAL_LIMIT_PER_HOUR``),
    3. per-IP sliding window of ``IP_BURST_SECONDS`` seconds
       (``IP_BURST_LIMIT``).

    Args:
        ip: Client IP address used to build the per-IP Redis keys.

    Returns:
        ``"rate_limited_ip"``, ``"rate_limited_global"``,
        ``"captcha_required"`` or ``"ok"``. ``"ok"`` is also returned when
        Redis is disabled or when any Redis call fails (fail-open).
    """
    if not REDIS_ENABLED:
        return "ok"
    try:
        # 1. Per-IP limit per minute; the expiry is set by the first hit.
        minute_key = f"ip_minute_limit:{ip}"
        count_minute = redis_client.incr(minute_key)
        if count_minute == 1:
            redis_client.expire(minute_key, 60)
        if count_minute > IP_MINUTE_LIMIT:
            return "rate_limited_ip"

        # 2. Global limit per hour.
        global_key = "global_rate_limit"
        count_global = redis_client.incr(global_key)
        if count_global == 1:
            redis_client.expire(global_key, 3600)
        if count_global > GLOBAL_LIMIT_PER_HOUR:
            return "rate_limited_global"

        # 3. Per-IP burst window that triggers the CAPTCHA. Entries older than
        #    the window are dropped, then this request is recorded.
        ip_key = f"ip_rate_limit:{ip}"
        now = time.time()
        redis_client.zremrangebyscore(ip_key, 0, now - IP_BURST_SECONDS)
        # Random suffix keeps members unique when two hits share a timestamp.
        unique_member = f"{now}:{os.urandom(4).hex()}"
        redis_client.zadd(ip_key, {unique_member: now})
        burst_count = redis_client.zcard(ip_key)
        # Let the whole set expire once the window has passed; otherwise one
        # key per visiting IP would stay in Redis forever.
        redis_client.expire(ip_key, IP_BURST_SECONDS)
        if burst_count > IP_BURST_LIMIT:
            return "captcha_required"
        return "ok"
    except Exception as e:
        print(f"⚠️ Error saat memeriksa rate limit Redis: {e}", flush=True)
        return "ok"
# ===================================================================
# SETUP RAG CHAIN
# ===================================================================
# Step 1: prompt that asks the model to rewrite the latest question into a
# standalone one using the chat history, without answering it.
contextualize_q_system_prompt = (
    "Anda adalah asisten untuk tugas reformulasi pertanyaan. "
    "Rumuskan pertanyaan yang dapat berdiri sendiri berdasarkan riwayat percakapan. "
    "JANGAN menjawab pertanyaan, hanya reformulasi jika diperlukan."
)
contextualize_q_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)
history_aware_retriever = create_history_aware_retriever(
    llm,
    retriever,
    contextualize_q_prompt,
)

# Step 2: answering prompt; the retrieved documents fill the {context} slot.
qa_system_prompt = (
    "Anda adalah layanan publik digital asisten KPU Provinsi Papua Barat Daya bernama LPDA. "
    "Gunakan semua informasi dalam data yang Anda miliki untuk menjawab dengan akurat. "
    "Jangan katakan data berasal dari sumber lain. "
    "Fokus Anda adalah pada seluruh data yang dikelola oleh KPU Provinsi Papua Barat Daya. "
    "Jika data berupa angka, sebutkan angkanya dengan akurat. "
    "Jika tidak tahu jawabannya, katakan belum memiliki datanya. "
    "Berikan jawaban singkat, sopan, netral, dan akurat berdasarkan pertanyaan. "
    "{context}"
)
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", qa_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)

# Step 3: combine the retriever and the answering chain into the RAG chain
# that the /get endpoint invokes.
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
rag_chain = create_retrieval_chain(
    history_aware_retriever,
    question_answer_chain,
)
print("✅ RAG Chain siap digunakan", flush=True)
# ===================================================================
# MIDDLEWARE UNTUK RATE LIMIT (PENGECEKAN DI AWAL) - BARU
# ===================================================================
@app.before_request
def rate_limit_before_request():
    """Enforce the custom rate limits before the ``get_response`` view runs.

    Requests for any other endpoint pass through untouched. Returning a
    response here short-circuits the request; returning ``None`` lets Flask
    continue to the view.

    The hard limits (per-IP and global) apply to every request. Only the
    CAPTCHA challenge is skipped when the request already carries a
    ``captcha_token``, because the view verifies that token itself. Skipping
    all checks for such requests would let any client bypass rate limiting
    simply by adding an arbitrary ``captcha_token`` parameter.
    """
    # This check only applies to the 'get_response' endpoint (/get).
    if request.endpoint != 'get_response':
        return None

    user_ip = get_user_ip()
    status = check_custom_rate_limit(user_ip)

    if status == "rate_limited_ip":
        return jsonify({"error": "Maaf, Anda telah melebihi batas permintaan. Silakan tunggu beberapa saat sebelum mencoba lagi."}), 429
    if status == "rate_limited_global":
        return jsonify({"error": "Mohon maaf, sistem sedang menerima banyak permintaan. Silakan coba lagi nanti."}), 429
    if status == "captcha_required":
        if request.args.get('captcha_token'):
            # The client is answering a challenge; do not issue another one.
            return None
        session['captcha_required'] = True
        return jsonify({"captcha_required": True, "reason": "Rate limit triggered"})
    return None
# ===================================================================
# FLASK ROUTES
# ===================================================================
@app.route('/')
def home():
    """Serve the landing page rendered from ``index.html``."""
    landing_page = render_template('index.html')
    return landing_page
@app.route('/lpda')
def lpda():
    """Serve the chat page rendered from ``lpda.html``.

    The template receives the reCAPTCHA site key and a flag telling it
    whether this session is currently marked as requiring a CAPTCHA.
    """
    template_context = {
        "recaptcha_site_key": RECAPTCHA_SITE_KEY,
        "captcha_on_load": session.get('captcha_required', False),
    }
    return render_template('lpda.html', **template_context)
@app.route('/get', methods=['GET'])
def get_response():
    """Answer one chat message through the RAG chain.

    Query parameters:
        msg: The user's message (required, must not be blank).
        captcha_token: Optional reCAPTCHA response token.

    Returns:
        On success, the answer as a JSON string. Otherwise a JSON object:
        ``{"captcha_required": True, ...}`` when the session is flagged and no
        token was sent, or ``{"error": ...}`` with status 400 (failed CAPTCHA
        or blank message) or 500 (chain not initialised).

    Side effects: clears the session CAPTCHA flag and the per-IP burst counter
    after a successful CAPTCHA, reads/writes the Redis answer cache, appends
    the exchange to the session chat history, and logs both messages.
    """
    if not rag_chain:
        return jsonify({"error": "RAG Chain tidak terinisialisasi."}), 500

    user_message = request.args.get('msg')
    user_ip = get_user_ip()
    captcha_token = request.args.get('captcha_token')

    # Session CAPTCHA flag (still needed when the page has been reloaded).
    if session.get('captcha_required') and not captcha_token:
        return jsonify({"captcha_required": True, "reason": "Session flagged"})

    # Verify the CAPTCHA token when one was sent.
    if captcha_token:
        if not verify_recaptcha(captcha_token, user_ip):
            return jsonify({"error": "Verifikasi CAPTCHA gagal."}), 400
        session.pop('captcha_required', None)
        if REDIS_ENABLED:
            try:
                # Reset the burst counter after a successful CAPTCHA.
                redis_client.delete(f"ip_rate_limit:{user_ip}")
            except Exception as e:
                print(f"⚠️ Gagal reset burst limit untuk IP {user_ip}: {e}", flush=True)

    # Reject a missing or blank message instead of sending None to the chain.
    # Done after the CAPTCHA step so a valid token still clears the flag.
    if not user_message or not user_message.strip():
        return jsonify({"error": "Pesan tidak boleh kosong."}), 400

    # Chat history.
    # NOTE(review): the history lives in the Flask session; if the default
    # cookie-based session is in use, long conversations may exceed the
    # browser's cookie size limit. Confirm the session backend.
    chat_history_from_session = session.get("chat_history", [])
    langchain_history = [
        HumanMessage(content=m["message"]) if m["sender"] == "user" else AIMessage(content=m["message"])
        for m in chat_history_from_session
    ]

    # Cache key: the message combined with a hash of the preceding history.
    context_hash = hashlib.sha256(json.dumps(chat_history_from_session, sort_keys=True).encode()).hexdigest()
    cache_key = hashlib.sha256(f"{user_message}:{context_hash}".encode()).hexdigest()

    # Cache lookup.
    bot_response = None
    if REDIS_ENABLED:
        try:
            cached = redis_client.get(cache_key)
            if cached:
                bot_response = cached
        except Exception as e:
            print(f"⚠️ Gagal mengambil dari cache Redis: {e}", flush=True)

    # Query the RAG chain on a cache miss.
    if not bot_response:
        chain_succeeded = True
        try:
            res = rag_chain.invoke({"input": user_message, "chat_history": langchain_history})
            bot_response = (res.get("answer") or "").strip()
        except Exception as e:
            print(f"Error invoking RAG chain: {e}", flush=True)
            bot_response = "Sistem sedang sibuk, coba lagi nanti."
            chain_succeeded = False

        # Replace an empty or "no data" answer with a pointer to the official
        # site.
        # NOTE(review): this string contains HTML, so the client presumably
        # renders answers as HTML; confirm it escapes untrusted text.
        if not bot_response or "belum memiliki data" in bot_response.lower():
            bot_response = (
                "Mohon maaf data untuk pertanyaan tersebut belum tersedia dalam sistem. "
                "Silakan cek situs resmi KPU Papua Barat Daya untuk informasi lebih lengkap: "
                "<a href='https://papuabaratdaya.kpu.go.id/' target='_blank' "
                "style='color:#fafbfc !important; text-decoration: underline;'>"
                "https://papuabaratdaya.kpu.go.id/</a>."
            )

        # Cache only real answers: caching the transient "busy" message would
        # replay the failure to every identical request until it expires.
        if REDIS_ENABLED and chain_succeeded:
            try:
                redis_client.setex(cache_key, CACHE_EXPIRATION_SECONDS, bot_response)
            except Exception as e:
                print(f"⚠️ Gagal menyimpan ke cache Redis: {e}", flush=True)

    # Store in the session and log.
    session["chat_history"] = chat_history_from_session + [
        {"sender": "user", "message": user_message},
        {"sender": "bot", "message": bot_response},
    ]
    session.modified = True
    log_message(user_ip, "user", user_message)
    log_message(user_ip, "bot", bot_response)
    return jsonify(bot_response)
@app.route('/load_history', methods=['GET'])
def load_history():
    """Return the chat history stored in the session as a JSON list.

    An empty list is returned when the session has no history yet.
    """
    stored_history = session.get("chat_history", [])
    return jsonify(stored_history)
@app.route('/clear_history', methods=['POST'])
def clear_history():
    """Remove the chat history from the session and confirm with JSON.

    Succeeds whether or not any history was stored.
    """
    session.pop("chat_history", None)
    confirmation = {"status": "success", "message": "Riwayat percakapan dihapus"}
    return jsonify(confirmation)
# ===================================================================
# RUN FLASK
# ===================================================================
if __name__ == '__main__':
    # Only when executed as a script: announce the local URL, then start the
    # Flask server with the configured debug setting.
    startup_banner = "🚀 Sistem siap di http://127.0.0.1:5000/"
    print(startup_banner, flush=True)
    app.run(debug=FLASK_DEBUG)