|
|
| import os |
| import re |
| import csv |
| import uuid |
| import html |
| import json |
| import fitz |
| from pathlib import Path |
| from datetime import datetime |
| from typing import TypedDict, Literal |
|
|
| import streamlit as st |
| import streamlit.components.v1 as components |
| from openai import OpenAI |
|
|
| from langchain_community.document_loaders import PyMuPDFLoader |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from langchain_openai import OpenAIEmbeddings, ChatOpenAI |
| from langchain_community.vectorstores import FAISS |
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_core.output_parsers import StrOutputParser |
| from langchain_core.documents import Document |
|
|
| from langgraph.graph import StateGraph, END |
|
|
|
|
| |
| |
| |
| # Configure the Streamlit page: browser-tab title, icon, and wide layout. |
| # NOTE(review): the non-ASCII literals in this file look mis-encoded (mojibake) - confirm the file's encoding. |
| st.set_page_config( |
| page_title="AIVLE νμ΅λμ°λ―Έ", |
| page_icon="π€", |
| layout="wide" |
| ) |
|
|
| # Directory that contains this script; every asset/data path below is built from it. |
| BASE_DIR = Path(__file__).resolve().parent |
| 
| 
| # Candidate locations of the bundled PDF, checked in order. |
| PDF_CANDIDATES = [ |
| BASE_DIR / "AIVLE_School_λ°±μ_ν΅ν©λ³Έ.pdf", |
| ] |
|
|
| # First candidate that exists on disk; falls back to the first candidate when none exists, |
| # so later `PDF_PATH.exists()` checks can report a concrete path. |
| PDF_PATH = next((path for path in PDF_CANDIDATES if path.exists()), PDF_CANDIDATES[0]) |
| # Where uploaded files are written by save_uploaded_files(). |
| UPLOAD_DIR = BASE_DIR / "uploads" |
| # Where generate_tts() writes its .mp3 files. |
| TEMP_DIR = BASE_DIR / "temp" |
| # Optional images; the UI falls back to HTML/text when these files are missing. |
| IMAGE_BANNER_PATH = BASE_DIR / "images" / "μ΄λ―Έμ§.png" |
| LOGO_PATH = BASE_DIR / "images" / "μμ΄λΈνμ΅λμ°λ―Έ λ‘κ³ .png" |
|
|
| # Create the working directories on first run (no error if they already exist). |
| UPLOAD_DIR.mkdir(exist_ok=True) |
| TEMP_DIR.mkdir(exist_ok=True) |
|
|
| # Shared OpenAI client used by generate_tts() and transcribe_audio(). |
| client = OpenAI() |
|
|
|
|
| |
| |
| |
| # Inject the app-wide CSS (hero box, sidebar cards, chat bubbles, assistant cards). |
| # unsafe_allow_html=True is required so Streamlit emits the <style> element as raw HTML. |
| st.markdown(""" |
| <style> |
| .main .block-container { |
| padding-top: 2rem; |
| max-width: 1100px; |
| } |
| 
| .hero-box { |
| background: #d9f3f2; |
| border-radius: 22px; |
| padding: 34px 38px; |
| margin-bottom: 28px; |
| display: flex; |
| justify-content: space-between; |
| align-items: center; |
| } |
| 
| .hero-title { |
| font-size: 30px; |
| font-weight: 800; |
| color: #111827; |
| margin-bottom: 10px; |
| } |
| 
| .hero-sub { |
| font-size: 15px; |
| color: #6b7280; |
| } |
| 
| .robot { |
| font-size: 72px; |
| } |
| 
| .section-title { |
| font-size: 17px; |
| font-weight: 800; |
| margin: 20px 0 12px 0; |
| } |
| 
| .login-lock-box { |
| background: #f9fafb; |
| border: 1px solid #e5e7eb; |
| border-radius: 18px; |
| padding: 36px; |
| text-align: center; |
| margin-top: 80px; |
| } |
| 
| .login-lock-title { |
| font-size: 26px; |
| font-weight: 800; |
| margin-bottom: 10px; |
| } |
| 
| .login-lock-sub { |
| color: #6b7280; |
| font-size: 15px; |
| } |
| 
| section[data-testid="stSidebar"] { |
| background-color: #f8fafc; |
| } |
| 
| .sidebar-title { |
| font-size: 22px; |
| font-weight: 900; |
| color: #111827; |
| margin-bottom: 4px; |
| } |
| 
| .sidebar-subtitle { |
| font-size: 13px; |
| color: #6b7280; |
| line-height: 1.5; |
| margin-bottom: 18px; |
| } |
| 
| .account-card { |
| background: #ecfdf5; |
| border: 1px solid #d1fae5; |
| border-radius: 16px; |
| padding: 14px 16px; |
| margin-bottom: 12px; |
| } |
| 
| .account-name { |
| font-size: 15px; |
| font-weight: 800; |
| color: #065f46; |
| } |
| 
| .account-status { |
| font-size: 12px; |
| color: #059669; |
| margin-top: 4px; |
| } |
| 
| .sidebar-section-title { |
| font-size: 13px; |
| font-weight: 800; |
| color: #374151; |
| margin-top: 22px; |
| margin-bottom: 8px; |
| } |
| 
| .history-caption { |
| font-size: 11px; |
| color: #9ca3af; |
| margin-top: -6px; |
| margin-bottom: 8px; |
| } |
| 
| .sidebar-help { |
| font-size: 12px; |
| color: #9ca3af; |
| line-height: 1.5; |
| } |
| 
| .chat-wrap { |
| border-top: 1px solid #e5e7eb; |
| margin-top: 28px; |
| padding-top: 26px; |
| } |
| 
| .user-bubble { |
| background: #d9f3f2; |
| padding: 14px 18px; |
| border-radius: 18px; |
| width: fit-content; |
| max-width: 70%; |
| margin-left: auto; |
| margin-bottom: 14px; |
| font-weight: 500; |
| line-height: 1.6; |
| word-break: break-word; |
| } |
| 
| .assistant-card { |
| background: white; |
| border: 1px solid #e5e7eb; |
| border-radius: 18px; |
| padding: 20px; |
| margin-top: 8px; |
| margin-bottom: 8px; |
| max-width: 780px; |
| box-shadow: 0 2px 10px rgba(0,0,0,0.04); |
| line-height: 1.7; |
| word-break: break-word; |
| } |
| 
| .assistant-name { |
| font-size: 14px; |
| color: #6b7280; |
| font-weight: 700; |
| margin-bottom: 8px; |
| } |
| 
| .message-tools { |
| display: flex; |
| gap: 8px; |
| margin-bottom: 18px; |
| } |
| </style> |
| """, unsafe_allow_html=True) |
|
|
|
|
| |
| |
| |
def init_session_state():
    """Seed ``st.session_state`` with every key the app reads.

    Keys that already exist are left untouched, so values survive Streamlit
    reruns; only missing keys receive their default.
    """
    # NOTE(review): the non-ASCII question strings look mis-encoded in the
    # source; they are reproduced exactly as found.
    defaults = {
        "logged_in": False,
        "user_id": "",
        "user_name": "",
        "page": "home",
        "chats": {},
        "current_chat_id": None,
        "recommended_questions": [
            "μΆμ μΈμ μμ²μ μ΄λ»κ² νλμ?",
            "κ°μ λ€μ보기λ₯Ό ν μ μλμ?",
            "νλ ¨μ₯λ €κΈμ μΈμ μ§κΈλλμ?",
            "κ°μΈ ν¬νΈν΄λ¦¬μ€λ‘ νμ©ν μ μλ λ²μκ° μ΄λ»κ² λλμ?",
        ],
        "rag_upload_signature": None,
        "retriever": None,
        "rag_chain": None,
        "format_docs": None,
    }

    # The dict is rebuilt on every call, so the mutable defaults are never
    # shared between sessions.
    for key, default in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default


# Run on every script execution, before anything reads the session state.
init_session_state()
|
|
|
|
| |
| |
| |
def create_new_chat():
    """Register an empty chat under a fresh UUID, make it current, and show the home page."""
    new_id = str(uuid.uuid4())
    started_at = datetime.now().strftime("%Y-%m-%d %H:%M")

    # The placeholder title is what update_chat_title() later looks for.
    st.session_state.chats[new_id] = dict(
        title="μ λν",
        created_at=started_at,
        messages=[],
    )
    st.session_state.current_chat_id = new_id
    st.session_state.page = "home"
|
|
|
|
def get_current_messages():
    """Return the message list of the active chat, creating a chat first if none is selected.

    The returned list is the one stored in session state, so appending to it
    updates the chat in place.
    """
    session = st.session_state
    if session.current_chat_id is None:
        create_new_chat()

    active_chat = session.chats[session.current_chat_id]
    return active_chat["messages"]
|
|
|
|
def update_chat_title(question):
    """Replace the placeholder title of the current chat with the (trimmed) question.

    Does nothing when no chat is selected or when the chat already has a
    non-placeholder title. Titles longer than 24 characters are cut to 24
    characters followed by ``...``.
    """
    active_id = st.session_state.current_chat_id
    if active_id is None:
        return

    chat = st.session_state.chats[active_id]
    if chat["title"] != "μ λν":
        return

    stripped = question.strip()
    chat["title"] = stripped if len(stripped) <= 24 else stripped[:24] + "..."
|
|
|
|
def delete_chat(chat_id):
    """Remove a chat and return to the home page.

    If the removed chat was the active one, the most recently inserted
    remaining chat becomes active; when none remain, a new empty chat is
    created.
    """
    chats = st.session_state.chats
    chats.pop(chat_id, None)

    if st.session_state.current_chat_id == chat_id:
        if not chats:
            create_new_chat()
        else:
            # Dicts preserve insertion order, so the last key is the newest chat.
            st.session_state.current_chat_id = next(reversed(chats))

    st.session_state.page = "home"
|
|
|
|
| |
| |
| |
def safe_filename(filename):
    """Return ``filename`` with every character outside the allow-list replaced by ``_``.

    ASCII letters, digits, ``_``, ``.`` and ``-`` are always kept.
    """
    # NOTE(review): the non-ASCII part of this character class looks
    # mis-encoded in the source; it is reproduced exactly as found.
    disallowed = re.compile(r"[^κ°-ν£a-zA-Z0-9_.-]")
    return disallowed.sub("_", filename)
|
|
|
|
def load_txt_file(file_path):
    """Read a text file into a single-element list of ``Document``.

    The file is decoded as UTF-8 first and as cp949 if that fails; a cp949
    decoding error propagates to the caller. The document's ``source``
    metadata is the file path as a string.
    """
    try:
        content = file_path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        content = file_path.read_text(encoding="cp949")

    document = Document(page_content=content, metadata={"source": str(file_path)})
    return [document]
|
|
|
|
def load_csv_file(file_path):
    """Read a CSV file into a single-element list of ``Document``.

    Each CSV row becomes one text line with its cells joined by ``" | "``.
    The file is decoded as UTF-8 first and as cp949 if that fails; a cp949
    decoding error propagates to the caller. The document's ``source``
    metadata is the file path as a string.
    """

    def _read_rows(encoding):
        # newline="" lets the csv module handle embedded line breaks itself.
        with open(file_path, "r", encoding=encoding, newline="") as handle:
            return [" | ".join(row) for row in csv.reader(handle)]

    # Decoding happens while the file is read, not when it is opened, so the
    # whole read must sit inside the ``try`` for the cp949 fallback to work.
    try:
        rows = _read_rows("utf-8")
    except UnicodeDecodeError:
        rows = _read_rows("cp949")

    return [
        Document(
            page_content="\n".join(rows),
            metadata={"source": str(file_path)},
        )
    ]
|
|
|
|
def load_pdf_file(file_path):
    """Load a PDF through ``PyMuPDFLoader`` and return whatever its ``load()`` returns."""
    pdf_loader = PyMuPDFLoader(str(file_path))
    loaded_docs = pdf_loader.load()
    return loaded_docs
|
|
|
|
| def load_faq_from_pdf(pdf_path): |
| """ |
| Extract FAQ entries from the tables of the PDF at ``pdf_path``. |
| 
| Pages are scanned in order. Table extraction starts at the first page whose text |
| matches one of the FAQ-section markers and stops once a page matches the |
| end-of-section pattern. Rows whose first cell is not in ``valid_categories`` are |
| skipped; single-cell rows are appended to the previous entry's answer. |
| 
| Returns a list of dicts with ``category``, ``question`` and ``answer`` keys, with |
| duplicate questions removed (first occurrence kept). Returns ``[]`` and shows a |
| Streamlit warning when the file is missing or extraction raises. |
| """ |
| if not pdf_path.exists(): |
| st.warning(f"ν΅ν© λ°±μ PDF νμΌμ μ°Ύμ μ μμ΅λλ€: {pdf_path}") |
| return [] |
|
|
| faq_data = [] |
|
|
| 
| 
| # Allow-list of category labels; a table row is only kept when its first cell, |
| # after normalize_category(), is one of these. |
| valid_categories = { |
| "λͺ¨μ§/μ λ°", |
| "μΆμ", |
| "κ°μ", |
| "KDT", |
| "AIVLE-EDU", |
| "κ΅μ‘μ₯", |
| "κΈ°ν", |
| "μ½λ© νμ΅", |
| "λ
ΈνΈλΆ", |
| "κ΅λ―Όμ·¨μ
μ λ", |
| "μ·¨μ
μ§μ/μ±μ©μ°κ³", |
| "νμ΅", |
| "νλ‘μ νΈ", |
| } |
|
|
| # Flatten a table cell to one line: None -> "", newlines -> spaces, whitespace runs collapsed. |
| def clean_text(text): |
| if text is None: |
| return "" |
|
|
| text = str(text).replace("\n", " ") |
| text = re.sub(r"\s+", " ", text) |
| return text.strip() |
|
|
| # Clean a category cell and rewrite two known label variants to their canonical form. |
| def normalize_category(text): |
| category = clean_text(text) |
|
|
| 
| # NOTE(review): both arguments of the first replace render identically here; |
| # presumably they differ only in whitespace - confirm against the real file. |
| category = category.replace("κ΅λ―Όμ·¨μ
μ λ", "κ΅λ―Όμ·¨μ
μ λ") |
| category = category.replace("μ·¨μ
μ§μ/ μ±μ©μ°κ³", "μ·¨μ
μ§μ/μ±μ©μ°κ³") |
|
|
| return category |
|
|
| try: |
| doc = fitz.open(str(pdf_path)) |
|
|
| # in_faq_section turns True at the first marker page and stays True afterwards. |
| in_faq_section = False |
| # Most recently accepted entry; continuation rows are appended to its answer. |
| last_item = None |
|
|
| for page in doc: |
| page_text = page.get_text("text") |
|
|
| 
| # Section start: either the first marker string, or the chapter-heading regex on a |
| # page after index 5 whose text does not contain the third marker string. |
| if ( |
| "FAQ λ μ§μ μ κΆκΈμ¦" in page_text |
| or ( |
| re.search(r"μ \s*4\s*μ₯\.?\s*μμ£Ό 묻λ μ§λ¬Έ", page_text) |
| and "λͺ© μ°¨" not in page_text |
| and page.number > 5 |
| ) |
| ): |
| in_faq_section = True |
|
|
| if not in_faq_section: |
| continue |
|
|
| tables = page.find_tables() |
|
|
| for table in tables: |
| rows = table.extract() |
|
|
| for row in rows: |
| 
| 
| # Drop cells that are empty after cleaning, so `cells` holds only non-empty text. |
| cells = [clean_text(cell) for cell in row if clean_text(cell)] |
|
|
| if not cells: |
| continue |
|
|
| 
| # Skip a row that exactly equals the three-column header. |
| if cells == ["ꡬλΆ", "μ§λ¬Έ", "λ΅λ³"]: |
| continue |
|
|
| 
| 
| # A single-cell row is treated as the continuation of the previous answer, |
| # unless it starts with "KT AIVLE". |
| if len(cells) == 1: |
| if last_item and not cells[0].startswith("KT AIVLE"): |
| last_item["answer"] = f"{last_item['answer']}\n{cells[0]}".strip() |
| continue |
|
|
| if len(cells) < 3: |
| continue |
|
|
| category = normalize_category(cells[0]) |
| question = clean_text(cells[1]) |
| answer = clean_text(cells[2]) |
|
|
| if category not in valid_categories: |
| continue |
|
|
| if not question or not answer: |
| continue |
|
|
| # Second guard against header text appearing in the question/answer columns. |
| if question == "μ§λ¬Έ" or answer == "λ΅λ³": |
| continue |
|
|
| item = { |
| "category": category, |
| "question": question, |
| "answer": answer |
| } |
|
|
| faq_data.append(item) |
| last_item = item |
|
|
| 
| 
| 
| # Stop scanning once a page matches the end-of-section heading regex. |
| if re.search(r"μ \s*5\s*μ₯\.?\s*μ λ°°", page_text): |
| break |
|
|
| # NOTE(review): not reached when an exception is raised above, so the PDF handle |
| # is then left to garbage collection - consider try/finally. |
| doc.close() |
|
|
| except Exception as e: |
| st.warning(f"ν΅ν© λ°±μμμ FAQ ν μΆμΆ μ€ μ€λ₯κ° λ°μνμ΅λλ€: {e}") |
| return [] |
|
|
| 
| # De-duplicate by whitespace-normalized question text, keeping the first occurrence. |
| unique_faq_data = [] |
| seen_questions = set() |
|
|
| for item in faq_data: |
| normalized_question = re.sub(r"\s+", " ", item["question"]).strip() |
|
|
| if normalized_question in seen_questions: |
| continue |
|
|
| seen_questions.add(normalized_question) |
| unique_faq_data.append(item) |
|
|
| return unique_faq_data |
|
|
|
|
def save_uploaded_files(uploaded_files):
    """Write each uploaded file into ``UPLOAD_DIR`` and return the written paths.

    File names are sanitized with ``safe_filename``; an existing file with the
    same sanitized name is overwritten. ``None`` or an empty sequence yields
    an empty list.
    """
    stored_paths = []

    for upload in uploaded_files or ():
        destination = UPLOAD_DIR / safe_filename(upload.name)
        with open(destination, "wb") as out:
            out.write(upload.getbuffer())
        stored_paths.append(destination)

    return stored_paths
|
|
|
|
def load_all_documents(uploaded_files):
    """Collect documents from the bundled PDF plus any uploaded PDF/TXT/CSV files.

    A Streamlit warning is shown when the bundled PDF is missing. Uploaded
    files are first written to disk, then loaded according to their
    lower-cased suffix; files with any other suffix are ignored.
    """
    collected = []

    if not PDF_PATH.exists():
        st.warning(f"κΈ°λ³Έ λ°±μ PDFλ₯Ό μ°Ύμ μ μμ΅λλ€: {PDF_PATH}")
    else:
        collected.extend(load_pdf_file(PDF_PATH))

    loader_for_suffix = {
        ".pdf": load_pdf_file,
        ".txt": load_txt_file,
        ".csv": load_csv_file,
    }

    for saved_path in save_uploaded_files(uploaded_files):
        loader = loader_for_suffix.get(saved_path.suffix.lower())
        if loader is not None:
            collected.extend(loader(saved_path))

    return collected
|
|
|
|
def get_upload_signature(uploaded_files):
    """Return a string identifying the current upload set by file name and size.

    Yields ``"no_upload"`` for ``None`` or an empty sequence; otherwise the
    ``name:size`` pairs joined with ``|`` in the given order.
    """
    if uploaded_files:
        pairs = (f"{item.name}:{item.size}" for item in uploaded_files)
        return "|".join(pairs)

    return "no_upload"
|
|
|
|
| # Parse the FAQ entries from the bundled PDF; this module-level call runs on every script execution. |
| # The result is read by build_faq_retriever() and render_faq_page(). |
| FAQ_DATA = load_faq_from_pdf(PDF_PATH) |
|
|
| # Warn in the UI when nothing was parsed (missing PDF, extraction error, or no matching rows). |
| if not FAQ_DATA: |
| st.warning("ν΅ν© λ°±μμμ FAQ λ°μ΄ν°λ₯Ό λΆλ¬μ€μ§ λͺ»νμ΅λλ€. FAQ ν ꡬ쑰λ₯Ό νμΈνμΈμ.") |
|
|
|
|
| |
| |
| |
| # Build the retrieval pipeline over the bundled PDF plus `uploaded_files`. |
| # Returns a (retriever, chain, format_docs) tuple: |
| #   retriever   - FAISS retriever returning the 8 nearest chunks, |
| #   chain       - prompt | gpt-4o-mini (temperature 0) | string parser, expecting |
| #                 "context" and "question" inputs, |
| #   format_docs - helper that joins document texts into one context string. |
| # Shows an error and stops the Streamlit script when no document could be loaded. |
| def build_rag(uploaded_files): |
| docs = load_all_documents(uploaded_files) |
|
|
| if not docs: |
| st.error("RAGμ μ¬μ©ν λ¬Έμκ° μμ΅λλ€. κΈ°λ³Έ λ°±μ PDF λλ μ
λ‘λ νμΌμ νμΈνμΈμ.") |
| st.stop() |
|
|
| # Split into chunks of up to 1200 characters with 200 characters of overlap, |
| # trying the separators in the listed order. |
| splitter = RecursiveCharacterTextSplitter( |
| chunk_size=1200, |
| chunk_overlap=200, |
| separators=["\n\n", "\n", ".", " ", ""] |
| ) |
|
|
| chunks = splitter.split_documents(docs) |
| # Embed every chunk and index it in an in-memory FAISS store. |
| embedding = OpenAIEmbeddings(model="text-embedding-3-small") |
| vectorstore = FAISS.from_documents(chunks, embedding) |
|
|
| retriever = vectorstore.as_retriever( |
| search_kwargs={"k": 8} |
| ) |
|
|
| llm = ChatOpenAI( |
| model="gpt-4o-mini", |
| temperature=0 |
| ) |
|
|
| # Answering prompt; {context} and {question} are filled in by the caller of the chain. |
| prompt = ChatPromptTemplate.from_template(""" |
| λΉμ μ AIVLE School λ°±μ, FAQ, μ¬μ©μκ° μ
λ‘λν μλ£λ₯Ό κΈ°λ°μΌλ‘ λ΅λ³νλ νμ΅λμ°λ―Έ μ±λ΄μ
λλ€. |
| 
| κ·μΉ: |
| 1. λ°λμ [λ¬Έμ λ΄μ©]μ κ·Όκ±°ν΄μ λ΅λ³νμΈμ. |
| 2. λ¬Έμμμ νμΈλμ§ μλ λ΄μ©μ "λ¬Έμμμ νμΈλμ§ μμ΅λλ€."λΌκ³ λ΅νμΈμ. |
| 3. νκ΅μ΄λ‘ μΉμ νκ³ κ°κ²°νκ² λ΅λ³νμΈμ. |
| 4. μ§λ¬Έκ³Ό μ§μ κ΄λ ¨λ λ΄μ©λ§ λ΅λ³νμΈμ. |
| 
| [λ¬Έμ λ΄μ©] |
| {context} |
| 
| [μ§λ¬Έ] |
| {question} |
| """) |
|
|
| chain = prompt | llm | StrOutputParser() |
|
|
| # Join the retrieved documents' text with blank lines to form the prompt context. |
| def format_docs(docs): |
| return "\n\n".join(doc.page_content for doc in docs) |
|
|
| return retriever, chain, format_docs |
|
|
|
|
| # Build the RAG pipeline on first use and rebuild it whenever the upload set changes. |
| # The upload set is identified by get_upload_signature() (file names and sizes). |
| # Results are cached in st.session_state so reruns with unchanged uploads skip the rebuild. |
| def ensure_rag_ready(uploaded_files): |
| current_signature = get_upload_signature(uploaded_files) |
|
|
| if ( |
| st.session_state.retriever is None |
| or st.session_state.rag_chain is None |
| or st.session_state.rag_upload_signature != current_signature |
| ): |
| with st.spinner("λ¬Έμλ₯Ό μ½κ³ AI κ²μ μΈλ±μ€λ₯Ό μ€λΉνλ μ€μ
λλ€..."): |
| retriever, rag_chain, format_docs = build_rag(uploaded_files) |
|
|
| # Store the pipeline together with the signature it was built for. |
| st.session_state.retriever = retriever |
| st.session_state.rag_chain = rag_chain |
| st.session_state.format_docs = format_docs |
| st.session_state.rag_upload_signature = current_signature |
|
|
|
|
| |
| |
| |
@st.cache_resource
def build_faq_retriever():
    """Build a FAISS retriever over the FAQ questions, or return ``None`` when there are none.

    Each FAQ question is embedded as the document text; its category and
    answer travel along as metadata. The retriever returns the 4 nearest
    questions. The result is cached by ``st.cache_resource``.
    """
    question_docs = [
        Document(
            page_content=entry["question"],
            metadata={"category": entry["category"], "answer": entry["answer"]},
        )
        for entry in FAQ_DATA
    ]

    if not question_docs:
        return None

    embedder = OpenAIEmbeddings(model="text-embedding-3-small")
    store = FAISS.from_documents(question_docs, embedder)
    return store.as_retriever(search_kwargs={"k": 4})
|
|
|
|
def recommend_questions(user_question):
    """Return up to four distinct FAQ questions similar to ``user_question``.

    Returns ``[]`` when no FAQ retriever is available. Any exception is
    reported as a Streamlit warning and also yields ``[]``, so a failed
    recommendation never breaks the chat flow.
    """
    try:
        retriever = build_faq_retriever()
        if retriever is None:
            return []

        matches = retriever.invoke(user_question)
        stripped = (match.page_content.strip() for match in matches)
        # dict.fromkeys removes duplicates while keeping retrieval order.
        distinct = list(dict.fromkeys(text for text in stripped if text))
        return distinct[:4]

    except Exception as e:
        st.warning(f"μΆμ²μ§λ¬Έ μμ± μ€ μ€λ₯κ° λ°μνμ΅λλ€: {e}")
        return []
|
|
|
|
| |
| |
| |
# State dictionary passed between the LangGraph nodes of the question graph:
#   question - the user's question text
#   route    - "aivle" or "general", written by classify_question_node
#   context  - retrieved document text used for the answer ("" on the general route)
#   answer   - the generated answer text
ChatState = TypedDict(
    "ChatState",
    {
        "question": str,
        "route": str,
        "context": str,
        "answer": str,
    },
)
|
|
|
|
def classify_question_node(state: ChatState) -> ChatState:
    """Classify the question with the LLM and store the branch in ``state["route"]``.

    The model is asked to answer with a single word. The route becomes
    ``"aivle"`` when the upper-cased reply contains ``AIVLE`` and
    ``"general"`` for any other reply. The same ``state`` object is mutated
    and returned.
    """
    # NOTE(review): the non-ASCII prompt text looks mis-encoded in the source;
    # it is reproduced exactly as found, including its line breaks.
    router_prompt = ChatPromptTemplate.from_template("""
λΉμ μ μ¬μ©μ μ§λ¬Έμ λΆλ₯νλ λΌμ°ν°μ
λλ€.

μλ κΈ°μ€μ λ°λΌ μ§λ¬Έμ λ°λμ λ μ€ νλλ‘λ§ λΆλ₯νμΈμ.

[AIVLE]
- KT AIVLE School, μμ΄λΈμ€μΏ¨, μμ΄λΈλ¬ κ΄λ ¨ μ§λ¬Έ
- μΆμ, κ²°μ, μ§κ°, μ‘°ν΄, μΈμΆ, μΆμ μΈμ κ΄λ ¨ μ§λ¬Έ
- κ°μ, λ€μ보기, κ΅μ‘ μΌμ , 체ν¬μΈ/체ν¬μμ κ΄λ ¨ μ§λ¬Έ
- KDT, νλ ¨μ₯λ €κΈ, λ΄μΌλ°°μμΉ΄λ, κ΅λ―Όμ·¨μ
μ§μμ λ κ΄λ ¨ μ§λ¬Έ
- κ΅μ‘μ₯, λ
ΈνΈλΆ, μ½λ©λ§μ€ν°μ€, μ½λ© νμ΅ νλ«νΌ κ΄λ ¨ μ§λ¬Έ
- μ±μ©μ°κ³, ν¬νΈν΄λ¦¬μ€, AX μ±λ¦°μ§ λ± μμ΄λΈμ€μΏ¨ μ λ κ΄λ ¨ μ§λ¬Έ
- μ¬μ©μκ° μ
λ‘λν λ°±μ/FAQ/λ¬Έμμμ λ΅ν΄μΌ ν μ§λ¬Έ

[GENERAL]
- μμ 무κ΄ν μΌλ° μ§μ, μ½λ©, μν, μμ, 건κ°, λ¬Έμ μμ±, λ²μ, μλ΄ μ§λ¬Έ
- AIVLE λ¬Έμ κ·Όκ±°κ° νμ μλ μΌλ° μ§λ¬Έ

μΆλ ₯ κ·μΉ:
- AIVLE κ΄λ ¨μ΄λ©΄ AIVLE
- μΌλ° μ§λ¬Έμ΄λ©΄ GENERAL
- λ€λ₯Έ μ€λͺ
μμ΄ λ¨μ΄ νλλ§ μΆλ ₯

[μ¬μ©μ μ§λ¬Έ]
{question}
""")

    router_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    router_chain = router_prompt | router_llm | StrOutputParser()

    verdict = router_chain.invoke({"question": state["question"]}).strip().upper()

    state["route"] = "aivle" if "AIVLE" in verdict else "general"
    return state
|
|
|
|
def route_condition(state: ChatState) -> Literal["aivle", "general"]:
    """Return the branch name stored in ``state["route"]`` for the conditional edge."""
    chosen_route = state["route"]
    return chosen_route
|
|
|
|
def aivle_rag_node(state: ChatState) -> ChatState:
    """Answer the question from retrieved documents using the session's RAG pipeline.

    Reads ``retriever``, ``format_docs`` and ``rag_chain`` from
    ``st.session_state``, then writes the retrieved context and the generated
    answer into ``state``, which is mutated and returned.
    """
    session = st.session_state
    query = state["question"]

    retrieved = session.retriever.invoke(query)
    context_text = session.format_docs(retrieved)
    generated = session.rag_chain.invoke({"context": context_text, "question": query})

    # Written only after the chain succeeded, so a failure leaves state untouched.
    state["context"] = context_text
    state["answer"] = generated
    return state
|
|
|
|
def general_llm_node(state: ChatState) -> ChatState:
    """Answer the question directly with the LLM, without document retrieval.

    Uses ``gpt-4o-mini`` at temperature 0.3. Sets ``state["context"]`` to an
    empty string and ``state["answer"]`` to the model's reply; the same
    ``state`` object is mutated and returned.
    """
    # NOTE(review): the non-ASCII prompt text looks mis-encoded in the source;
    # it is reproduced exactly as found, including its line breaks.
    assistant_prompt = ChatPromptTemplate.from_template("""
λΉμ μ μΉμ ν AI νμ΅ λμ°λ―Έμ
λλ€.
μ¬μ©μμ μ§λ¬Έμ λν΄ νκ΅μ΄λ‘ μμ°μ€λ½κ³ μ΄ν΄νκΈ° μ½κ² λ΅λ³νμΈμ.

[μ¬μ©μ μ§λ¬Έ]
{question}
""")

    assistant_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
    reply = (assistant_prompt | assistant_llm | StrOutputParser()).invoke(
        {"question": state["question"]}
    )

    state["context"] = ""
    state["answer"] = reply
    return state
|
|
|
|
def build_question_graph():
    """Assemble and compile the question-answering graph.

    Flow: ``classify`` runs first; ``route_condition`` then sends the state to
    ``aivle_rag`` or ``general_llm``, each of which ends the graph.
    """
    workflow = StateGraph(ChatState)

    node_handlers = (
        ("classify", classify_question_node),
        ("aivle_rag", aivle_rag_node),
        ("general_llm", general_llm_node),
    )
    for node_name, handler in node_handlers:
        workflow.add_node(node_name, handler)

    workflow.set_entry_point("classify")

    # Map the value returned by route_condition to the node that handles it.
    branch_targets = {"aivle": "aivle_rag", "general": "general_llm"}
    workflow.add_conditional_edges("classify", route_condition, branch_targets)

    for answering_node in ("aivle_rag", "general_llm"):
        workflow.add_edge(answering_node, END)

    return workflow.compile()
|
|
|
|
def run_langgraph_answer(question):
    """Run ``question`` through a freshly built question graph and return the final state."""
    initial_state = {
        "question": question,
        "route": "",
        "context": "",
        "answer": "",
    }
    return build_question_graph().invoke(initial_state)
|
|
|
|
| |
| |
| |
def copy_button(text, key):
    """Render a small HTML button that copies ``text`` to the clipboard.

    Args:
        text: The string to place on the clipboard when the button is clicked.
        key: Accepted for call-site symmetry with other widgets; not used here.
    """
    # json.dumps produces a valid JavaScript string literal. html.escape then
    # makes that literal safe inside an HTML attribute: without it, any quote,
    # "&" or "<" in the message would end the attribute early or inject markup.
    # The browser decodes the entities before running the handler, so the
    # copied text is unchanged.
    js_literal = html.escape(json.dumps(text, ensure_ascii=False), quote=True)

    # NOTE(review): the button label looks mis-encoded in the source; it is
    # reproduced exactly as found.
    components.html(
        f"""
        <button onclick="navigator.clipboard.writeText({js_literal})"
        style="
        margin-top:4px;
        padding:6px 12px;
        border-radius:8px;
        border:1px solid #d1d5db;
        background:white;
        cursor:pointer;
        font-size:13px;
        ">
        π λ³΅μ¬
        </button>
        """,
        height=40
    )
|
|
|
|
def generate_tts(text, filename):
    """Synthesize ``text`` to ``TEMP_DIR/<filename>.mp3`` and return that path as a string.

    Uses the module-level OpenAI client with model ``gpt-4o-mini-tts`` and
    voice ``nova``. An existing file with the same name is overwritten.
    """
    output_path = str(TEMP_DIR / f"{filename}.mp3")

    speech = client.audio.speech.create(
        model="gpt-4o-mini-tts",
        voice="nova",
        input=text,
    )
    speech.stream_to_file(output_path)

    return output_path
|
|
|
|
def transcribe_audio(audio_file):
    """Transcribe ``audio_file`` with the ``whisper-1`` model and return the recognized text."""
    result = client.audio.transcriptions.create(
        file=audio_file,
        model="whisper-1",
    )
    return result.text
|
|
|
|
def render_text_for_html(text):
    """HTML-escape ``text`` and turn each newline into a ``<br>`` tag."""
    escaped = html.escape(text)
    return "<br>".join(escaped.split("\n"))
|
|
|
|
def answer_question(question):
    """Record the question, generate an answer, and append it to the current chat.

    The answer is prefixed with a label naming the route that produced it.
    Recommended questions are refreshed from the FAQ on the ``aivle`` route
    and cleared on any other route.
    """
    history = get_current_messages()
    history.append({"role": "user", "content": question})
    update_chat_title(question)

    outcome = run_langgraph_answer(question)
    reply = outcome["answer"]
    used_documents = outcome["route"] == "aivle"

    # NOTE(review): the label text looks mis-encoded in the source; it is
    # reproduced exactly as found.
    label = "π FAQ/λ°±μ κΈ°λ° λ΅λ³" if used_documents else "π¬ μΌλ° AI λ΅λ³"
    history.append({"role": "assistant", "content": f"{label}\n\n{reply}"})

    st.session_state.recommended_questions = (
        recommend_questions(question) if used_documents else []
    )
|
|
|
|
| |
| |
| |
| # Sidebar: logo, account (login/logout), navigation, file upload, and chat history. |
| # `uploaded_files` is defined here because ensure_rag_ready() reads it after the sidebar has rendered. |
| uploaded_files = None |
|
|
| with st.sidebar: |
| # Show the logo image when present, otherwise a text title. |
| if LOGO_PATH.exists(): |
| st.image(str(LOGO_PATH), width=240) |
| else: |
| st.markdown('<div class="sidebar-title">AIVLE νμ΅λμ°λ―Έ</div>', unsafe_allow_html=True) |
|
|
| st.markdown( |
| '<div class="sidebar-subtitle">λ°±μμ μ
λ‘λ μλ£λ₯Ό κΈ°λ°μΌλ‘<br>νμ΅ μ§λ¬Έμ λ΅λ³νλ AI μ±λ΄μ
λλ€.</div>', |
| unsafe_allow_html=True |
| ) |
|
|
| st.markdown('<div class="sidebar-section-title">π€ κ³μ </div>', unsafe_allow_html=True) |
|
|
| if not st.session_state.logged_in: |
| # NOTE(review): login only checks that all three fields are non-empty; |
| # the password is never verified against anything. |
| with st.form("login_form", clear_on_submit=False): |
| login_name = st.text_input("μ΄λ¦", placeholder="μ΄λ¦μ μ
λ ₯νμΈμ") |
| login_id = st.text_input("μμ΄λ", placeholder="μμ΄λλ₯Ό μ
λ ₯νμΈμ") |
| login_pw = st.text_input("λΉλ°λ²νΈ", type="password", placeholder="λΉλ°λ²νΈλ₯Ό μ
λ ₯νμΈμ") |
|
|
| login_btn = st.form_submit_button("λ‘κ·ΈμΈ", use_container_width=True) |
|
|
| if login_btn: |
| if login_name.strip() and login_id.strip() and login_pw.strip(): |
| st.session_state.logged_in = True |
| st.session_state.user_name = login_name.strip() |
| st.session_state.user_id = login_id.strip() |
|
|
| # Make sure a chat exists before the main page renders. |
| if st.session_state.current_chat_id is None: |
| create_new_chat() |
|
|
| st.rerun() |
| else: |
| st.warning("μ΄λ¦, μμ΄λ, λΉλ°λ²νΈλ₯Ό λͺ¨λ μ
λ ₯νμΈμ.") |
|
|
| else: |
| # The user name is HTML-escaped because it is placed into raw HTML. |
| st.markdown( |
| f""" |
| <div class="account-card"> |
| <div class="account-name">{html.escape(st.session_state.user_name)}λ</div> |
| <div class="account-status">λ‘κ·ΈμΈ μ€</div> |
| </div> |
| """, |
| unsafe_allow_html=True |
| ) |
|
|
| # Logout clears the identity fields; the chats stay in session state. |
| if st.button("λ‘κ·Έμμ", use_container_width=True): |
| st.session_state.logged_in = False |
| st.session_state.user_name = "" |
| st.session_state.user_id = "" |
| st.session_state.page = "home" |
| st.rerun() |
|
|
| st.divider() |
|
|
| # Navigation buttons are disabled until the user has logged in. |
| if st.button("οΌ μ λν μμ", use_container_width=True, disabled=not st.session_state.logged_in): |
| create_new_chat() |
| st.rerun() |
|
|
| col_home, col_faq = st.columns(2) |
|
|
| with col_home: |
| if st.button("π ν", use_container_width=True, disabled=not st.session_state.logged_in): |
| st.session_state.page = "home" |
| st.rerun() |
|
|
| with col_faq: |
| if st.button("β FAQ", use_container_width=True, disabled=not st.session_state.logged_in): |
| st.session_state.page = "faq" |
| st.rerun() |
|
|
| st.markdown('<div class="sidebar-section-title">π μλ£ μΆκ°</div>', unsafe_allow_html=True) |
|
|
| # Uploads are limited to pdf/txt/csv, the three suffixes load_all_documents() handles. |
| with st.expander("νμΌ μ
λ‘λ", expanded=False): |
| uploaded_files = st.file_uploader( |
| "PDF, TXT, CSV νμΌμ μΆκ°ν μ μμ΅λλ€.", |
| type=["pdf", "txt", "csv"], |
| accept_multiple_files=True, |
| disabled=not st.session_state.logged_in |
| ) |
|
|
| if uploaded_files: |
| st.success(f"{len(uploaded_files)}κ° νμΌμ΄ μΆκ°λμμ΅λλ€.") |
| else: |
| st.markdown( |
| '<div class="sidebar-help">μ
λ‘λν νμΌμ κΈ°μ‘΄ λ°±μμ ν¨κ» AI λ΅λ³μ νμ©λ©λλ€.</div>', |
| unsafe_allow_html=True |
| ) |
|
|
| st.divider() |
|
|
| st.markdown('<div class="sidebar-section-title">π¬ λν κΈ°λ‘</div>', unsafe_allow_html=True) |
|
|
| if not st.session_state.logged_in: |
| st.caption("λ‘κ·ΈμΈ ν λν κΈ°λ‘μ μ¬μ©ν μ μμ΅λλ€.") |
|
|
| elif st.session_state.chats: |
| # Reverse insertion order so the newest chat is listed first. |
| chat_items = list(st.session_state.chats.items())[::-1] |
|
|
| for chat_id, chat_info in chat_items: |
| title = chat_info["title"] |
| created_at = chat_info["created_at"] |
|
|
| col_chat, col_delete = st.columns([5, 1]) |
|
|
| # Widget keys include the chat id so each row's buttons are unique. |
| with col_chat: |
| if st.button(f"π¬ {title}", key=f"chat_{chat_id}", use_container_width=True): |
| st.session_state.current_chat_id = chat_id |
| st.session_state.page = "home" |
| st.rerun() |
|
|
| with col_delete: |
| if st.button("π", key=f"delete_{chat_id}", use_container_width=True): |
| delete_chat(chat_id) |
| st.rerun() |
|
|
| st.markdown( |
| f"<div class='history-caption'>{created_at}</div>", |
| unsafe_allow_html=True |
| ) |
|
|
| else: |
| st.caption("μμ§ λν κΈ°λ‘μ΄ μμ΅λλ€.") |
|
|
|
|
| |
| |
| |
| # Login gate: when nobody is logged in, show a notice and stop the script here, |
| # so nothing below (RAG setup, page rendering) runs. |
| if not st.session_state.logged_in: |
| st.markdown(""" |
| <div class="login-lock-box"> |
| <div class="login-lock-title">π λ‘κ·ΈμΈμ΄ νμν©λλ€</div> |
| <div class="login-lock-sub"> |
| μ¬μ΄λλ°μμ μ΄λ¦, μμ΄λ, λΉλ°λ²νΈλ₯Ό μ
λ ₯ν΄μΌ<br> |
| μ±λ΄, FAQ, νμΌ μ
λ‘λ κΈ°λ₯μ μ¬μ©ν μ μμ΅λλ€. |
| </div> |
| </div> |
| """, unsafe_allow_html=True) |
|
|
| st.stop() |
|
|
|
|
| 
| 
| 
| # Build the RAG pipeline on first use, or rebuild it when the uploaded file set changed. |
| ensure_rag_ready(uploaded_files) |
|
|
|
|
| |
| |
| |
| # Render the FAQ page: one tab for all entries plus one tab per category (sorted), |
| # with each entry shown as an expander whose body is the answer. |
| def render_faq_page(): |
| st.markdown("## β FAQ") |
| st.caption("μμ£Ό 묻λ μ§λ¬Έμ μΉ΄ν
κ³ λ¦¬λ³λ‘ νμΈν μ μμ΅λλ€.") |
|
|
| # Nothing to show when FAQ parsing produced no entries. |
| if not FAQ_DATA: |
| st.warning("ν΅ν© λ°±μμμ FAQ λ°μ΄ν°λ₯Ό λΆλ¬μ€μ§ λͺ»νμ΅λλ€. FAQ ν ꡬ쑰λ₯Ό νμΈνμΈμ.") |
| return |
|
|
| # The first tab is the "all entries" tab; the rest are the distinct categories, sorted. |
| categories = ["μ 체"] + sorted(list(set(item["category"] for item in FAQ_DATA))) |
| selected_tabs = st.tabs(categories) |
|
|
| for idx, tab in enumerate(selected_tabs): |
| category = categories[idx] |
|
|
| with tab: |
| # The "all entries" tab lists everything; other tabs filter by category. |
| if category == "μ 체": |
| items = FAQ_DATA |
| else: |
| items = [ |
| item for item in FAQ_DATA |
| if item["category"] == category |
| ] |
|
|
| if not items: |
| st.info("ν΄λΉ μΉ΄ν
κ³ λ¦¬μ FAQκ° μμ΅λλ€.") |
| continue |
|
|
| for item in items: |
| with st.expander(f"[{item['category']}] {item['question']}"): |
| st.write(item["answer"]) |
|
|
|
|
| |
| |
| |
| # Render the home/chat page: banner, recommended questions, message history with |
| # copy and text-to-speech tools, and the text/voice inputs. |
| def render_home_page(): |
| # Use the banner image when present, otherwise an HTML hero box greeting the user. |
| if IMAGE_BANNER_PATH.exists(): |
| st.image(str(IMAGE_BANNER_PATH), use_container_width=True) |
| else: |
| st.markdown(f""" |
| <div class="hero-box"> |
| <div> |
| <div class="hero-title">μλ
νμΈμ, {html.escape(st.session_state.user_name)}λ! π<br>무μμ λμλ릴κΉμ?</div> |
| <div class="hero-sub">AIVLE λ°±μμ μ
λ‘λν μλ£λ₯Ό κΈ°λ°μΌλ‘ λ΅λ³λ릴κ²μ.</div> |
| </div> |
| <div class="robot">π€</div> |
| </div> |
| """, unsafe_allow_html=True) |
|
|
| 
| 
| 
| st.markdown('<div class="section-title">μΆμ² μ§λ¬Έ</div>', unsafe_allow_html=True) |
|
|
| rec_questions = st.session_state.recommended_questions |
|
|
| # Recommended questions are laid out in two columns; clicking one submits it as a question. |
| if rec_questions: |
| cols = st.columns(2) |
|
|
| for i, q in enumerate(rec_questions): |
| with cols[i % 2]: |
| if st.button(q, key=f"rec_{i}_{q}", use_container_width=True): |
| answer_question(q) |
| st.rerun() |
| else: |
| st.caption("AIVLE κ΄λ ¨ μ§λ¬Έμ μ
λ ₯νλ©΄ FAQ κΈ°λ° μΆμ² μ§λ¬Έμ΄ νμλ©λλ€.") |
|
|
| 
| 
| 
| messages = get_current_messages() |
|
|
| # NOTE(review): this opening <div> and the closing one below are emitted by separate |
| # st.markdown calls; presumably they do not wrap the widgets rendered in between - confirm. |
| st.markdown('<div class="chat-wrap">', unsafe_allow_html=True) |
|
|
| # Empty chat: show a static greeting card from the assistant. |
| if not messages: |
| st.markdown(""" |
| <div style="min-height: 420px;"> |
| <div class="assistant-name">π€ AIVLE λμ°λ―Έ</div> |
| <div class="assistant-card"> |
| μλ
νμΈμ! AIVLE λ°±μμ FAQ, μ
λ‘λν λ¬Έμλ₯Ό κΈ°λ°μΌλ‘ μ§λ¬Έμ λ΅λ³λ릴κ²μ.<br> |
| μΌλ° μ§λ¬Έμ λ¬Έμ κ²μ μμ΄ μΌλ° AI λ΅λ³μΌλ‘ μλ΄ν©λλ€. |
| </div> |
| </div> |
| """, unsafe_allow_html=True) |
|
|
| for idx, msg in enumerate(messages): |
| # Message text is HTML-escaped before it is placed into raw HTML. |
| content_html = render_text_for_html(msg["content"]) |
|
|
| if msg["role"] == "user": |
| st.markdown( |
| f'<div class="user-bubble">{content_html}</div>', |
| unsafe_allow_html=True |
| ) |
| else: |
| st.markdown( |
| f""" |
| <div class="assistant-name">π€ AIVLE λμ°λ―Έ</div> |
| <div class="assistant-card">{content_html}</div> |
| """, |
| unsafe_allow_html=True |
| ) |
|
|
| # Per-answer tools: copy to clipboard and text-to-speech. |
| col_copy, col_tts = st.columns([1, 5]) |
|
|
| with col_copy: |
| copy_button(msg["content"], key=f"copy_{idx}") |
|
|
| with col_tts: |
| if st.button("π μμ±μΌλ‘ λ£κΈ°", key=f"tts_{idx}"): |
| with st.spinner("μμ±μ μμ±νλ μ€μ
λλ€..."): |
| # The file name combines the message index and chat id. |
| audio_path = generate_tts(msg["content"], f"audio_{idx}_{st.session_state.current_chat_id}") |
| st.audio(audio_path) |
|
|
| st.markdown('</div>', unsafe_allow_html=True) |
|
|
| 
| 
| 
| col_text, col_audio = st.columns([5, 1]) |
|
|
| with col_text: |
| user_input = st.chat_input("λ©μμ§λ₯Ό μ
λ ₯νμΈμ...") |
|
|
| with col_audio: |
| audio_file = st.audio_input( |
| "π€ μμ± μ§λ¬Έ", |
| label_visibility="collapsed" |
| ) |
|
|
| # Typed question: answer it, then rerun so the new messages are rendered. |
| if user_input: |
| answer_question(user_input) |
| st.rerun() |
|
|
| # Voice question: transcribe first, then handle it like a typed question. |
| # NOTE(review): presumably st.audio_input keeps its recording across st.rerun(), |
| # which would re-submit the same audio on every rerun - confirm and de-duplicate if so. |
| if audio_file is not None: |
| with st.spinner("μμ±μ λΆμνλ μ€μ
λλ€..."): |
| question = transcribe_audio(audio_file) |
|
|
| answer_question(question) |
| st.rerun() |
|
|
|
|
| |
| |
| |
# Render the page selected in the sidebar; any value other than "faq" shows the home/chat page.
(render_faq_page if st.session_state.page == "faq" else render_home_page)()
|
|