""" RAG 검색 챗봇 웹 애플리케이션 """ import os import json import logging import tempfile import threading from flask import Flask, request, jsonify, render_template, send_from_directory from werkzeug.utils import secure_filename from dotenv import load_dotenv # 로거 설정 logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) # 환경 변수 로드 load_dotenv() # 로컬 모듈 임포트 from utils.vito_stt import VitoSTT from utils.llm_client import DeepSeekLLM from utils.document_processor import DocumentProcessor from retrieval.vector_retriever import VectorRetriever from retrieval.reranker import ReRanker # Flask 앱 초기화 app = Flask(__name__) # 최대 파일 크기 설정 (10MB) app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024 app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads') app.config['DATA_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data') app.config['INDEX_PATH'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data', 'index') # 업로드 폴더가 없으면 생성 os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) os.makedirs(app.config['DATA_FOLDER'], exist_ok=True) os.makedirs(app.config['INDEX_PATH'], exist_ok=True) # 허용되는 오디오 파일 확장자 ALLOWED_AUDIO_EXTENSIONS = {'mp3', 'wav', 'ogg', 'm4a'} # 허용되는 문서 파일 확장자 ALLOWED_DOC_EXTENSIONS = {'txt', 'md', 'pdf', 'docx', 'csv'} # DeepSeek LLM 클라이언트 초기화 llm_client = DeepSeekLLM() # VITO STT 클라이언트 초기화 stt_client = VitoSTT() # 전역 검색기 객체와 재순위화 검색기 객체 base_retriever = None retriever = None # 앱 초기화 상태 app_ready = False def allowed_audio_file(filename): """파일이 허용된 오디오 확장자를 가지는지 확인""" return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_AUDIO_EXTENSIONS def allowed_doc_file(filename): """파일이 허용된 문서 확장자를 가지는지 확인""" return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_DOC_EXTENSIONS def init_retriever(): """검색기 객체 초기화 또는 로드""" global base_retriever, retriever index_path = app.config['INDEX_PATH'] # 기존 인덱스가 있는지 확인 if os.path.exists(os.path.join(index_path, "documents.json")): try: logger.info(f"기존 벡터 인덱스를 '{index_path}'에서 로드합니다...") base_retriever = VectorRetriever.load(index_path) logger.info(f"{len(base_retriever.documents)}개 문서가 로드되었습니다.") except Exception as e: logger.error(f"인덱스 로드 중 오류 발생: {e}") logger.info("새 검색기를 초기화합니다...") base_retriever = VectorRetriever() else: logger.info("기존 인덱스를 찾을 수 없어 새 검색기를 초기화합니다...") base_retriever = VectorRetriever() # 데이터 폴더의 문서 로드 data_path = app.config['DATA_FOLDER'] if not base_retriever.documents and os.path.exists(data_path): logger.info(f"{data_path}에서 문서를 로드합니다...") docs = DocumentProcessor.load_documents_from_directory( data_path, extensions=[".txt", ".md", ".csv"], recursive=True ) if docs: logger.info(f"{len(docs)}개 문서를 검색기에 추가합니다...") base_retriever.add_documents(docs) # 인덱스 저장 logger.info(f"검색기 상태를 '{index_path}'에 저장합니다...") try: base_retriever.save(index_path) logger.info("인덱스 저장 완료") except Exception as e: logger.error(f"인덱스 저장 중 오류 발생: {e}") # 재순위화 검색기 초기화 logger.info("재순위화 검색기를 초기화합니다...") # 자체 구현된 재순위화 함수 - 간단한 TF-IDF 기반 점수 재계산 def custom_rerank_fn(query, results): """간단한 자체 구현 재순위화 함수""" # 쿼리 단어 분석 query_terms = set(query.lower().split()) # 결과 재점수화 for result in results: if "text" in result: text = result["text"].lower() # 간단한 TF 기반 점수 계산 term_freq = sum(1 for term in query_terms if term in text) # 길이 정규화 normalized_score = term_freq / (len(text.split()) + 1) * 10 # 기존 임베딩 점수와 새 점수 결합 result["rerank_score"] = result.get("score", 0) * 0.7 + normalized_score * 0.3 else: # 텍스트가 없는 경우 원래 점수 유지 result["rerank_score"] = result.get("score", 0) # 재점수화된 결과 정렬 results.sort(key=lambda x: x.get("rerank_score", 0), reverse=True) return results # 재순위화 검색기 객체 생성 (CrossEncoder 대신 사용자 정의 함수 사용) retriever = ReRanker( base_retriever=base_retriever, rerank_fn=custom_rerank_fn, rerank_field="text" ) logger.info("재순위화 검색기 초기화 완료") return retriever # 비동기 초기화 함수 def background_init(): """백그라운드에서 검색기 초기화 수행""" global app_ready, retriever try: logger.info("백그라운드 초기화 시작") retriever = init_retriever() app_ready = True logger.info("앱 초기화 완료") except Exception as e: logger.error(f"앱 초기화 중 오류 발생: {e}", exc_info=True) app_ready = False # 백그라운드 스레드에서 초기화 시작 init_thread = threading.Thread(target=background_init) init_thread.daemon = True init_thread.start() @app.route('/') def index(): """메인 페이지""" if not app_ready: return render_template('loading.html') return render_template('index.html') @app.route('/api/status') def app_status(): """앱 초기화 상태 확인 API""" return jsonify({"ready": app_ready}) @app.route('/api/chat', methods=['POST']) def chat(): """텍스트 기반 챗봇 API""" global retriever, app_ready # 앱 준비 상태 확인 if not app_ready: return jsonify({"error": "앱이 아직 초기화 중입니다. 잠시 후 다시 시도해주세요."}), 503 try: data = request.get_json() if not data or 'query' not in data: return jsonify({"error": "쿼리가 제공되지 않았습니다."}), 400 query = data['query'] logger.info(f"쿼리 수신: {query}") # RAG 검색 수행 (재순위화 적용) search_results = retriever.search(query, top_k=5, first_stage_k=20) # 검색 결과에서 컨텍스트 추출 context = DocumentProcessor.prepare_rag_context(search_results, field="text") if not context: logger.warning("검색 결과가 없습니다.") return jsonify({ "answer": "죄송합니다. 관련 정보를 찾을 수 없습니다.", "sources": [] }) # LLM에 질의 answer = llm_client.rag_generate(query, context) # 소스 정보 추출 sources = [] for result in search_results: if "source" in result: source_info = { "source": result.get("source", "Unknown"), "score": result.get("rerank_score", result.get("score", 0)) } sources.append(source_info) return jsonify({ "answer": answer, "sources": sources }) except Exception as e: logger.error(f"채팅 처리 중 오류 발생: {e}", exc_info=True) return jsonify({"error": f"처리 중 오류가 발생했습니다: {str(e)}"}), 500 @app.route('/api/voice', methods=['POST']) def voice_chat(): """ 음성 챗 API 엔드포인트: 오디오 파일을 받아 텍스트로 변환하고, 질문에 대한 응답과 소스를 반환 Returns: JSON 응답: - transcription: 인식된 텍스트 - answer: LLM에서 생성한 응답 - sources: 검색된 문서 소스 (리스트) - error: 오류 발생 시 오류 메시지 - details: 오류 상세 정보 (선택적) """ logger.info("음성 챗 요청 수신") # 오디오 파일 확인 if 'audio' not in request.files: logger.error("오디오 파일이 제공되지 않음") return jsonify({"error": "오디오 파일이 제공되지 않았습니다."}), 400 audio_file = request.files['audio'] logger.info(f"수신된 파일: {audio_file.filename}") try: # 오디오 파일 읽기 with audio_file.stream as f: audio_bytes = f.read() # 음성인식 (VitoSTT) stt = VitoSTT() stt_result = stt.transcribe_audio(audio_bytes, language="ko") if not stt_result["success"]: logger.error(f"음성인식 실패: {stt_result['error']}") return jsonify({ "error": stt_result["error"], "details": stt_result.get("details", "") }), 500 transcription = stt_result["text"] if not transcription: logger.warning("음성인식 결과가 비어있습니다.") return jsonify({"error": "음성에서 텍스트를 인식하지 못했습니다."}), 400 logger.info(f"음성인식 성공: {transcription[:50]}...") # 검색기 호출: 인식된 텍스트를 쿼리로 사용 sources = retriever.search(transcription) if not sources: logger.warning("검색된 소스가 없습니다.") sources = [] # 소스 문서 내용을 컨텍스트로 준비 context = "\n".join([doc["text"] for doc in sources]) logger.info(f"검색된 소스 수: {len(sources)}") # LLM 호출: 질문과 컨텍스트를 바탕으로 응답 생성 prompt = f"질문: {transcription}\n\n컨텍스트:\n{context}\n\n답변:" answer = llm_client.generate(prompt) if not answer: logger.error("LLM 응답 생성 실패") return jsonify({"error": "응답 생성에 실패했습니다."}), 500 logger.info(f"LLM 응답 생성 성공: {answer[:50]}...") # 응답 반환 return jsonify({ "transcription": transcription, "answer": answer, "sources": sources # [{ "text": "...", "metadata": {...} }, ...] }) except Exception as e: logger.error(f"음성 챗 처리 중 오류 발생: {str(e)}", exc_info=True) return jsonify({ "error": "음성 처리 중 내부 오류 발생", "details": str(e) }), 500 @app.route('/api/upload', methods=['POST']) def upload_document(): """지식베이스 문서 업로드 API""" global base_retriever, retriever, app_ready # 앱 준비 상태 확인 if not app_ready: return jsonify({"error": "앱이 아직 초기화 중입니다. 잠시 후 다시 시도해주세요."}), 503 try: # 파일이 요청에 포함되어 있는지 확인 if 'document' not in request.files: return jsonify({"error": "문서 파일이 제공되지 않았습니다."}), 400 doc_file = request.files['document'] logger.info(f"받은 파일명: {doc_file.filename}") # 파일명이 비어있는지 확인 if doc_file.filename == '': return jsonify({"error": "선택된 파일이 없습니다."}), 400 # 파일 형식 확인 if not allowed_doc_file(doc_file.filename): logger.error(f"허용되지 않는 파일 형식: {doc_file.filename}") return jsonify({"error": "허용되지 않는 파일 형식입니다. 현재 허용된 파일 형식: {}".format(', '.join(ALLOWED_DOC_EXTENSIONS))}), 400 # 파일명 보안 처리 filename = secure_filename(doc_file.filename) # 데이터 폴더에 저장 filepath = os.path.join(app.config['DATA_FOLDER'], filename) doc_file.save(filepath) logger.info(f"문서가 저장되었습니다: {filepath}") # 문서 처리 try: # 먼저 UTF-8로 시도 try: with open(filepath, 'r', encoding='utf-8') as f: content = f.read() except UnicodeDecodeError: # UTF-8로 실패하면 CP949(한국어 Windows 기본 인코딩)로 시도 logger.info(f"UTF-8 디코딩 실패, CP949로 시도: {filename}") with open(filepath, 'r', encoding='cp949') as f: content = f.read() # 메타데이터 생성 metadata = { "source": filename, "filename": filename, "filetype": filename.rsplit('.', 1)[1].lower(), "filepath": filepath } # 문서 청크 생성 docs = DocumentProcessor.text_to_documents( content, metadata=metadata, chunk_size=512, chunk_overlap=50 ) if docs: logger.info(f"{len(docs)}개 문서 청크를 검색기에 추가합니다...") base_retriever.add_documents(docs) # 인덱스 저장 logger.info(f"검색기 상태를 저장합니다...") index_path = app.config['INDEX_PATH'] try: base_retriever.save(index_path) logger.info("인덱스 저장 완료") except Exception as e: logger.error(f"인덱스 저장 중 오류 발생: {e}") return jsonify({"error": f"인덱스 저장 중 오류: {str(e)}"}), 500 return jsonify({ "success": True, "message": f"파일 '{filename}'가 성공적으로 업로드되고 {len(docs)}개 청크가 추가되었습니다." }) else: logger.warning(f"파일 '{filename}'에서 처리할 문서가 없습니다.") return jsonify({ "warning": True, "message": f"파일 '{filename}'이 저장되었지만 처리할 내용이 없습니다." }) except Exception as e: logger.error(f"문서 '{filename}' 처리 중 오류 발생: {e}", exc_info=True) return jsonify({"error": f"문서 처리 중 오류: {str(e)}"}), 500 except Exception as e: logger.error(f"파일 업로드 중 오류 발생: {e}", exc_info=True) return jsonify({"error": f"파일 업로드 중 오류: {str(e)}"}), 500 @app.route('/api/documents', methods=['GET']) def list_documents(): """지식베이스 문서 목록 API""" global base_retriever, retriever, app_ready # 앱 준비 상태 확인 if not app_ready: return jsonify({"error": "앱이 아직 초기화 중입니다. 잠시 후 다시 시도해주세요."}), 503 try: # 문서 소스 목록 생성 sources = {} if base_retriever and base_retriever.documents: for doc in base_retriever.documents: source = doc.get("source", "unknown") if source in sources: sources[source]["chunks"] += 1 else: sources[source] = { "filename": doc.get("filename", source), "chunks": 1, "filetype": doc.get("filetype", "unknown") } # 목록 형식으로 변환 documents = [] for source, info in sources.items(): documents.append({ "source": source, "filename": info["filename"], "chunks": info["chunks"], "filetype": info["filetype"] }) # 청크 수로 정렬 documents.sort(key=lambda x: x["chunks"], reverse=True) return jsonify({ "documents": documents, "total_documents": len(documents), "total_chunks": sum(doc["chunks"] for doc in documents) }) except Exception as e: logger.error(f"문서 목록 조회 중 오류 발생: {e}", exc_info=True) return jsonify({"error": f"문서 목록 조회 중 오류: {str(e)}"}), 500 # 정적 파일 서빙 @app.route('/static/') def send_static(path): return send_from_directory('static', path) if __name__ == '__main__': app.run(debug=False, host='0.0.0.0', port=5000)