from flask import Blueprint, render_template, request, jsonify, send_from_directory, redirect, url_for, flash from flask_login import login_user, logout_user, login_required, current_user from werkzeug.utils import secure_filename from app.database import db, UploadedFile, User, ChatSession, ChatMessage, DocumentChunk, ParentChunk, SystemConfig from app.vector_db import get_vector_db from app.gemini_client import get_gemini_client import requests import os from datetime import datetime import uuid import re import json main_bp = Blueprint('main', __name__) def admin_required(f): """관리자 권한이 필요한 데코레이터""" from functools import wraps @wraps(f) @login_required def decorated_function(*args, **kwargs): if not current_user.is_admin: # API 요청인 경우 JSON 응답 반환 if request.path.startswith('/api/'): return jsonify({'error': '관리자 권한이 필요합니다.'}), 403 flash('관리자 권한이 필요합니다.', 'error') return redirect(url_for('main.index')) return f(*args, **kwargs) return decorated_function # Ollama 기본 URL (환경 변수로 설정 가능) OLLAMA_BASE_URL = os.getenv('OLLAMA_BASE_URL', 'http://localhost:11434') # 업로드 설정 UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'uploads') ALLOWED_EXTENSIONS = {'txt', 'md', 'pdf', 'docx', 'epub'} # 업로드 폴더 경로 출력 (디버깅용) print(f"[업로드 설정] 업로드 폴더 경로: {UPLOAD_FOLDER}") print(f"[업로드 설정] 업로드 폴더 존재 여부: {os.path.exists(UPLOAD_FOLDER)}") def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def ensure_upload_folder(): """업로드 폴더가 없으면 생성""" try: if not os.path.exists(UPLOAD_FOLDER): print(f"업로드 폴더 생성 중: {UPLOAD_FOLDER}") os.makedirs(UPLOAD_FOLDER, exist_ok=True) if not os.path.exists(UPLOAD_FOLDER): raise Exception(f'업로드 폴더를 생성할 수 없습니다: {UPLOAD_FOLDER}') # 폴더 쓰기 권한 확인 test_file = os.path.join(UPLOAD_FOLDER, '.write_test') try: with open(test_file, 'w') as f: f.write('test') os.remove(test_file) print(f"업로드 폴더 쓰기 권한 확인 완료: {UPLOAD_FOLDER}") except PermissionError as e: raise Exception(f'업로드 폴더에 쓰기 권한이 없습니다: {UPLOAD_FOLDER} - {str(e)}') except Exception as e: raise Exception(f'업로드 폴더 쓰기 테스트 실패: {UPLOAD_FOLDER} - {str(e)}') except Exception as e: print(f"업로드 폴더 생성 오류: {str(e)}") import traceback traceback.print_exc() raise def split_text_into_chunks(text, min_chunk_size=200, max_chunk_size=1000, overlap=150): """의미 기반 텍스트 청킹 (문장과 문단 경계를 고려하여 분할)""" if not text or len(text.strip()) == 0: return [] # 1단계: 문단 단위로 분할 (빈 줄 기준) paragraphs = re.split(r'\n\s*\n', text.strip()) paragraphs = [p.strip() for p in paragraphs if p.strip()] if not paragraphs: return [] # 2단계: 각 문단을 문장 단위로 분할 # 문장 종결 기호: . ! ? (한글과 영문 모두 지원) # 구두점 뒤에 공백이나 줄바꿈이 오는 경우 문장 종료로 간주 sentence_pattern = r'([.!?]+)(?=\s+|$)' all_sentences = [] for para in paragraphs: # 문장 분리 (구두점 포함) parts = re.split(sentence_pattern, para) combined_sentences = [] current_sentence = "" for i, part in enumerate(parts): if part.strip(): if re.match(r'^[.!?]+$', part): # 구두점인 경우 현재 문장에 추가하고 문장 완성 current_sentence += part if current_sentence.strip(): combined_sentences.append(current_sentence.strip()) current_sentence = "" else: # 텍스트인 경우 현재 문장에 추가 current_sentence += part # 마지막 문장 처리 (구두점이 없는 경우) if current_sentence.strip(): combined_sentences.append(current_sentence.strip()) # 문장이 하나도 없는 경우 (구두점이 전혀 없는 문단) if not combined_sentences and para.strip(): combined_sentences.append(para.strip()) all_sentences.extend(combined_sentences) if not all_sentences: # 문장 분리가 안 되는 경우 원본 텍스트를 그대로 반환 return [text] if text.strip() else [] # 3단계: 문장들을 모아서 의미 있는 청크 생성 chunks = [] current_chunk = [] current_size = 0 for sentence in all_sentences: sentence_size = len(sentence) # 현재 청크에 문장 추가 시 최대 크기를 초과하는 경우 if current_size + sentence_size > max_chunk_size and current_chunk: # 현재 청크 저장 (줄바꿈 유지) chunk_text = '\n'.join(current_chunk) if len(chunk_text.strip()) >= min_chunk_size: chunks.append(chunk_text) else: # 최소 크기 미만이면 다음 청크와 병합 (오버랩 효과) if chunks: chunks[-1] = chunks[-1] + '\n' + chunk_text else: chunks.append(chunk_text) # 오버랩을 위한 문장 유지 (마지막 몇 문장을 다음 청크에 포함) overlap_sentences = [] overlap_size = 0 for s in reversed(current_chunk): if overlap_size + len(s) <= overlap: overlap_sentences.insert(0, s) overlap_size += len(s) + 1 # 줄바꿈 포함 else: break current_chunk = overlap_sentences + [sentence] current_size = overlap_size + sentence_size else: # 현재 청크에 문장 추가 current_chunk.append(sentence) current_size += sentence_size + 1 # 줄바꿈 포함 # 마지막 청크 추가 if current_chunk: chunk_text = '\n'.join(current_chunk) if chunks and len(chunk_text.strip()) < min_chunk_size: # 최소 크기 미만이면 이전 청크와 병합 chunks[-1] = chunks[-1] + '\n' + chunk_text else: chunks.append(chunk_text) # 빈 청크 제거 및 최소 크기 미만 청크 처리 final_chunks = [] for chunk in chunks: chunk = chunk.strip() if chunk and len(chunk) >= min_chunk_size: final_chunks.append(chunk) elif chunk: # 최소 크기 미만 청크는 이전 청크와 병합 if final_chunks: final_chunks[-1] = final_chunks[-1] + '\n' + chunk else: final_chunks.append(chunk) return final_chunks if final_chunks else [text] if text.strip() else [] def create_chunks_for_file(file_id, content): """파일 내용을 의미 기반 청크로 분할하여 저장 (벡터 DB 포함)""" try: print(f"[청크 생성] 파일 ID {file_id}에 대한 청크 생성 시작") print(f"[청크 생성] 원본 텍스트 길이: {len(content)}자") # 벡터 DB 매니저 가져오기 vector_db = get_vector_db() # 기존 청크 삭제 (DB + 벡터 DB) existing_chunks = DocumentChunk.query.filter_by(file_id=file_id).all() if existing_chunks: print(f"[청크 생성] 기존 청크 {len(existing_chunks)}개 삭제 중...") # 벡터 DB에서 삭제 vector_db.delete_chunks_by_file_id(file_id) # DB에서 삭제 DocumentChunk.query.filter_by(file_id=file_id).delete() db.session.commit() # 의미 기반 청킹 (문장과 문단 경계를 고려하여 분할) # min_chunk_size: 최소 200자, max_chunk_size: 최대 1000자, overlap: 150자 chunks = split_text_into_chunks(content, min_chunk_size=200, max_chunk_size=1000, overlap=150) print(f"[청크 생성] 분할된 청크 수: {len(chunks)}개") if len(chunks) == 0: print(f"[청크 생성] 경고: 청크가 생성되지 않았습니다. 텍스트가 너무 짧거나 비어있을 수 있습니다.") return 0 # 각 청크를 데이터베이스와 벡터 DB에 저장 saved_count = 0 vector_saved_count = 0 for idx, chunk_content in enumerate(chunks): try: # DB에 청크 저장 chunk = DocumentChunk( file_id=file_id, chunk_index=idx, content=chunk_content ) db.session.add(chunk) db.session.flush() # ID 생성 # 벡터 DB에 청크 추가 if vector_db.add_chunk( chunk_id=chunk.id, chunk_content=chunk_content, file_id=file_id, chunk_index=idx ): vector_saved_count += 1 saved_count += 1 # 진행 상황 출력 (10개마다) if (idx + 1) % 10 == 0: print(f"[청크 생성] 진행 중: {idx + 1}/{len(chunks)}개 청크 저장 중... (DB: {saved_count}, 벡터 DB: {vector_saved_count})") except Exception as e: print(f"[청크 생성] 경고: 청크 {idx} 저장 중 오류: {str(e)}") continue db.session.commit() print(f"[청크 생성] 완료: {saved_count}개 청크가 데이터베이스에 저장되었습니다. (벡터 DB: {vector_saved_count}개)") # 저장 확인 verified_count = DocumentChunk.query.filter_by(file_id=file_id).count() if verified_count != saved_count: print(f"[청크 생성] 경고: 저장된 청크 수({saved_count})와 확인된 청크 수({verified_count})가 일치하지 않습니다.") else: print(f"[청크 생성] 검증 완료: {verified_count}개 청크가 정상적으로 저장되었습니다.") return saved_count except Exception as e: db.session.rollback() print(f"[청크 생성] 오류: {str(e)}") import traceback traceback.print_exc() return 0 def create_parent_chunk_with_ai(file_id, content, model_name): """AI를 사용하여 Parent Chunk 생성 (웹소설 분석)""" try: print(f"[Parent Chunk 생성] 파일 ID {file_id}에 대한 Parent Chunk 생성 시작") print(f"[Parent Chunk 생성] 사용 모델: {model_name}") print(f"[Parent Chunk 생성] 원본 텍스트 길이: {len(content)}자") # 모델명이 None이거나 빈 문자열인 경우 처리 if not model_name or not model_name.strip(): print(f"[Parent Chunk 생성] ❌ 오류: 모델명이 제공되지 않았습니다.") return None # 텍스트가 너무 길면 일부만 사용 (최대 50000자) content_preview = content[:50000] if len(content) > 50000 else content if len(content) > 50000: print(f"[Parent Chunk 생성] 텍스트가 길어 일부만 사용: {len(content_preview)}자 (전체: {len(content)}자)") # 분석 프롬프트 생성 analysis_prompt = f"""다음 웹소설 텍스트를 분석하여 다음 항목들을 작성해주세요. 각 항목은 명확하고 구체적으로 작성해주세요. 텍스트 내용: {content_preview} 위 텍스트를 분석하여 다음 형식으로 답변해주세요: ## 세계관 설명 [세계관에 대한 상세한 설명을 작성하세요. 배경, 설정, 규칙 등을 포함하세요.] ## 주요 캐릭터 분석 [주요 등장인물들의 이름, 역할, 성격, 특징 등을 분석하여 작성하세요. 각 캐릭터별로 구분하여 작성하세요.] ## 주요 스토리 분석 [전체적인 스토리 흐름, 주요 사건, 갈등 구조 등을 분석하여 작성하세요.] ## 주요 에피소드 분석 [중요한 에피소드나 챕터별 주요 내용을 분석하여 작성하세요. 시간 순서대로 정리하면 좋습니다.] ## 기타 [위 카테고리에 포함되지 않지만 중요한 정보나 특징 등을 작성하세요.] 각 항목을 명확하게 구분하여 작성해주세요.""" # 모델 타입 확인 (Gemini 또는 Ollama) # Gemini 모델명 형식: "gemini:모델명" 또는 "gemini-1.5-flash" (접두사 없는 경우도 지원) model_name_lower = model_name.lower().strip() is_gemini = model_name_lower.startswith('gemini:') or model_name_lower.startswith('gemini-') print(f"[Parent Chunk 생성] 모델 타입 확인: is_gemini={is_gemini}, model_name={model_name}") if is_gemini: # Gemini API 호출 # 모델명에서 "gemini:" 접두사 제거 (대소문자 구분 없이) gemini_model_name = model_name.strip() if gemini_model_name.lower().startswith('gemini:'): gemini_model_name = gemini_model_name.split(':', 1)[1].strip() # "gemini-"로 시작하는 경우 (예: "gemini-1.5-flash") 그대로 사용 print(f"[Parent Chunk 생성] Gemini API에 분석 요청 전송 중... (모델: {gemini_model_name})") print(f"[Parent Chunk 생성] 원본 모델명: {model_name} -> Gemini 모델명: {gemini_model_name}") gemini_client = get_gemini_client() if not gemini_client.is_configured(): print(f"[Parent Chunk 생성] ❌ 오류: Gemini API 키가 설정되지 않았습니다.") print(f"[Parent Chunk 생성] 디버그: Gemini 클라이언트 상태 확인 중...") # API 키 상태 다시 확인 from app.gemini_client import get_gemini_api_key api_key = get_gemini_api_key() if api_key: print(f"[Parent Chunk 생성] 디버그: API 키는 존재하지만 클라이언트가 설정되지 않았습니다. (길이: {len(api_key)})") else: print(f"[Parent Chunk 생성] 디버그: API 키가 데이터베이스에 없습니다.") return None print(f"[Parent Chunk 생성] Gemini API 키 확인 완료. API 호출 시작...") result = gemini_client.generate_response( prompt=analysis_prompt, model_name=gemini_model_name, temperature=0.7, max_output_tokens=8192 ) if result['error']: print(f"[Parent Chunk 생성] ❌ 오류: Gemini API 호출 실패 - {result['error']}") print(f"[Parent Chunk 생성] 디버그: result 객체 내용: {result}") return None if not result.get('response'): print(f"[Parent Chunk 생성] ❌ 오류: Gemini API 응답이 비어있습니다.") print(f"[Parent Chunk 생성] 디버그: result 객체 내용: {result}") return None analysis_result = result['response'] print(f"[Parent Chunk 생성] Gemini API 응답 수신 성공: {len(analysis_result)}자") else: # Ollama API 호출 print(f"[Parent Chunk 생성] Ollama API에 분석 요청 전송 중... (모델: {model_name})") try: ollama_response = requests.post( f'{OLLAMA_BASE_URL}/api/chat', json={ 'model': model_name, 'messages': [ { 'role': 'user', 'content': analysis_prompt } ], 'stream': False }, timeout=300 # 5분 타임아웃 ) if ollama_response.status_code != 200: error_detail = ollama_response.text if ollama_response.text else '상세 정보 없음' if ollama_response.status_code == 404: error_msg = f'Ollama API 오류 404: 모델 "{model_name}"을(를) 찾을 수 없습니다. 모델이 Ollama에 설치되어 있는지 확인하세요.' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") print(f"[Parent Chunk 생성] 디버그: 만약 Gemini 모델을 사용하려면 모델명이 'gemini:' 또는 'gemini-'로 시작해야 합니다.") else: error_msg = f'Ollama API 오류: {ollama_response.status_code} - {error_detail[:200]}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") return None response_data = ollama_response.json() analysis_result = response_data.get('message', {}).get('content', '') print(f"[Parent Chunk 생성] Ollama API 응답 수신 성공: {len(analysis_result)}자") except requests.exceptions.RequestException as e: print(f"[Parent Chunk 생성] ❌ Ollama API 연결 오류: {str(e)}") print(f"[Parent Chunk 생성] 디버그: Ollama URL: {OLLAMA_BASE_URL}") raise if not analysis_result: print(f"[Parent Chunk 생성] ⚠️ 경고: 분석 결과가 비어있습니다.") return None print(f"[Parent Chunk 생성] 분석 결과 수신 완료: {len(analysis_result)}자") # 분석 결과 파싱 world_view = "" characters = "" story = "" episodes = "" others = "" # 각 섹션 추출 sections = { 'world_view': ['## 세계관 설명', '## 세계관', '세계관 설명'], 'characters': ['## 주요 캐릭터 분석', '## 주요 캐릭터', '주요 캐릭터 분석', '## 캐릭터'], 'story': ['## 주요 스토리 분석', '## 주요 스토리', '주요 스토리 분석', '## 스토리'], 'episodes': ['## 주요 에피소드 분석', '## 주요 에피소드', '주요 에피소드 분석', '## 에피소드'], 'others': ['## 기타', '기타'] } lines = analysis_result.split('\n') current_section = None current_content = [] for line in lines: line_stripped = line.strip() # 섹션 헤더 확인 section_found = False for section_key, section_headers in sections.items(): for header in section_headers: if header in line_stripped: # 이전 섹션 저장 if current_section: if current_section == 'world_view': world_view = '\n'.join(current_content).strip() elif current_section == 'characters': characters = '\n'.join(current_content).strip() elif current_section == 'story': story = '\n'.join(current_content).strip() elif current_section == 'episodes': episodes = '\n'.join(current_content).strip() elif current_section == 'others': others = '\n'.join(current_content).strip() current_section = section_key current_content = [] section_found = True break if section_found: break if not section_found and current_section: # 현재 섹션에 내용 추가 if line_stripped and not line_stripped.startswith('#'): current_content.append(line) # 마지막 섹션 저장 if current_section: if current_section == 'world_view': world_view = '\n'.join(current_content).strip() elif current_section == 'characters': characters = '\n'.join(current_content).strip() elif current_section == 'story': story = '\n'.join(current_content).strip() elif current_section == 'episodes': episodes = '\n'.join(current_content).strip() elif current_section == 'others': others = '\n'.join(current_content).strip() # 파싱 실패 시 전체 내용을 "기타"에 저장 if not world_view and not characters and not story and not episodes: print(f"[Parent Chunk 생성] 경고: 섹션 파싱 실패. 전체 내용을 '기타'에 저장합니다.") others = analysis_result.strip() # 기존 Parent Chunk 삭제 (있으면) existing_parent = ParentChunk.query.filter_by(file_id=file_id).first() if existing_parent: db.session.delete(existing_parent) db.session.commit() print(f"[Parent Chunk 생성] 기존 Parent Chunk 삭제 완료") # Parent Chunk 생성 및 저장 parent_chunk = ParentChunk( file_id=file_id, world_view=world_view if world_view else None, characters=characters if characters else None, story=story if story else None, episodes=episodes if episodes else None, others=others if others else None ) db.session.add(parent_chunk) db.session.commit() print(f"[Parent Chunk 생성] ✅ 완료: Parent Chunk가 생성되었습니다.") print(f"[Parent Chunk 생성] - 세계관: {len(world_view)}자") print(f"[Parent Chunk 생성] - 캐릭터: {len(characters)}자") print(f"[Parent Chunk 생성] - 스토리: {len(story)}자") print(f"[Parent Chunk 생성] - 에피소드: {len(episodes)}자") print(f"[Parent Chunk 생성] - 기타: {len(others)}자") return parent_chunk except requests.exceptions.RequestException as e: error_msg = f'Ollama API 연결 오류: {str(e)}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") import traceback traceback.print_exc() return None except Exception as e: db.session.rollback() error_msg = f'Parent Chunk 생성 중 오류: {str(e)}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") import traceback traceback.print_exc() return None def get_parent_chunks_for_files(file_ids): """파일 ID 목록에 대한 Parent Chunk 조회 (문맥 파악용)""" try: if not file_ids: return [] parent_chunks = [] for file_id in file_ids: parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() if parent_chunk: parent_chunks.append(parent_chunk) return parent_chunks except Exception as e: print(f"[Parent Chunk 조회] 오류: {str(e)}") return [] def search_relevant_chunks(query, file_ids=None, model_name=None, top_k=5, min_score=1): """ 질문과 관련된 청크 검색 (벡터 검색 + Re-ranking) 1. 벡터 검색으로 초기 30개 문서 검색 2. Cross-Encoder로 리랭킹 3. 상위 top_k개 반환 (기본 5개) """ try: # 벡터 DB 매니저 가져오기 vector_db = get_vector_db() # 파일 ID 확장 (이어서 업로드된 파일 포함) expanded_file_ids = None if file_ids: expanded_file_ids = list(file_ids) for file_id in file_ids: # 원본 파일인 경우 이어서 업로드된 파일들도 포함 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() expanded_file_ids.extend([child.id for child in child_files]) # 원본 파일이 선택된 경우, 이어서 업로드된 파일들도 포함 parent_files = UploadedFile.query.filter(UploadedFile.id.in_(file_ids), UploadedFile.parent_file_id.is_(None)).all() for parent_file in parent_files: child_files = UploadedFile.query.filter_by(parent_file_id=parent_file.id).all() expanded_file_ids.extend([child.id for child in child_files]) # 모델 필터링이 필요한 경우 파일 ID 필터링 if model_name and expanded_file_ids: filtered_files = UploadedFile.query.filter( UploadedFile.id.in_(expanded_file_ids), UploadedFile.model_name == model_name ).all() expanded_file_ids = [f.id for f in filtered_files] elif model_name and not expanded_file_ids: # 파일 ID가 없으면 모델 이름으로만 필터링 filtered_files = UploadedFile.query.filter_by(model_name=model_name).all() expanded_file_ids = [f.id for f in filtered_files] # 1단계: 벡터 검색으로 초기 30개 문서 검색 print(f"[벡터 검색] 쿼리: {query[:50]}..., 파일 ID: {expanded_file_ids if expanded_file_ids else '모든 파일'}") vector_results = vector_db.search_chunks( query=query, file_ids=expanded_file_ids, top_k=30 ) if not vector_results: print(f"[벡터 검색] 결과 없음, 키워드 기반 검색으로 대체") # 벡터 검색 결과가 없으면 기존 키워드 기반 검색으로 대체 return search_relevant_chunks_fallback(query, file_ids, model_name, top_k, min_score) # 2단계: Cross-Encoder로 리랭킹 print(f"[리랭킹] {len(vector_results)}개 청크에 대한 리랭킹 시작...") reranked_chunks = vector_db.rerank_chunks( query=query, chunks=vector_results, top_k=top_k ) # 3단계: DB에서 청크 객체 가져오기 final_chunks = [] for reranked in reranked_chunks: chunk_id = reranked['chunk_id'] chunk = DocumentChunk.query.get(chunk_id) if chunk: final_chunks.append(chunk) print(f"[벡터 검색 + 리랭킹] 최종 {len(final_chunks)}개 청크 반환") return final_chunks except Exception as e: print(f"[벡터 검색] 오류: {str(e)}") import traceback traceback.print_exc() # 오류 시 기존 키워드 기반 검색으로 대체 print(f"[벡터 검색] 키워드 기반 검색으로 대체") return search_relevant_chunks_fallback(query, file_ids, model_name, top_k, min_score) def search_relevant_chunks_fallback(query, file_ids=None, model_name=None, top_k=25, min_score=1): """기존 키워드 기반 검색 (Fallback)""" try: # 검색 쿼리 준비 - 한글과 영문 단어 모두 추출 query_words = set(re.findall(r'[가-힣]+|\w+', query.lower())) if not query_words: return [] # 청크 조회 query_obj = DocumentChunk.query.join(UploadedFile) if file_ids: # 선택된 파일 ID와 그 파일에 이어서 업로드된 모든 파일 ID 포함 expanded_file_ids = list(file_ids) for file_id in file_ids: # 원본 파일인 경우 이어서 업로드된 파일들도 포함 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() expanded_file_ids.extend([child.id for child in child_files]) # 원본 파일이 선택된 경우, 이어서 업로드된 파일들도 포함 parent_files = UploadedFile.query.filter(UploadedFile.id.in_(file_ids), UploadedFile.parent_file_id.is_(None)).all() for parent_file in parent_files: child_files = UploadedFile.query.filter_by(parent_file_id=parent_file.id).all() expanded_file_ids.extend([child.id for child in child_files]) query_obj = query_obj.filter(UploadedFile.id.in_(expanded_file_ids)) if model_name: query_obj = query_obj.filter(UploadedFile.model_name == model_name) all_chunks = query_obj.all() if not all_chunks: return [] # 각 청크의 관련도 점수 계산 (개선된 알고리즘) scored_chunks = [] for chunk in all_chunks: chunk_content_lower = chunk.content.lower() chunk_words = set(re.findall(r'[가-힣]+|\w+', chunk_content_lower)) # 1. 공통 단어 수 (기본 점수) common_words = query_words & chunk_words base_score = len(common_words) # 2. 쿼리 단어의 빈도 가중치 (중요한 단어가 더 많이 나타날수록 높은 점수) frequency_score = 0 for word in query_words: frequency_score += chunk_content_lower.count(word) # 3. 쿼리 단어 비율 (청크에서 쿼리 단어가 차지하는 비율) if len(chunk_words) > 0: ratio_score = len(common_words) / len(chunk_words) * 10 else: ratio_score = 0 # 최종 점수 계산 (가중치 적용) final_score = base_score * 2 + frequency_score * 0.5 + ratio_score # 최소 점수 이상인 청크만 포함 if final_score >= min_score: scored_chunks.append((final_score, chunk)) # 점수 순으로 정렬하고 상위 k개 선택 scored_chunks.sort(key=lambda x: x[0], reverse=True) # top_k개 선택 top_chunks = [chunk for score, chunk in scored_chunks[:top_k]] return top_chunks except Exception as e: print(f"[키워드 검색] 오류: {str(e)}") import traceback traceback.print_exc() return [] @main_bp.route('/login', methods=['GET', 'POST']) def login(): """로그인 페이지""" if current_user.is_authenticated: # 관리자인 경우 관리자 페이지로 리다이렉트 if current_user.is_admin: return redirect(url_for('main.admin')) return redirect(url_for('main.index')) if request.method == 'POST': username = request.form.get('username', '').strip() password = request.form.get('password', '') if not username or not password: flash('사용자명과 비밀번호를 입력해주세요.', 'error') return render_template('login.html') user = User.query.filter_by(username=username).first() if user and user.check_password(password) and user.is_active: login_user(user) user.last_login = datetime.utcnow() db.session.commit() next_page = request.args.get('next') # 관리자인 경우 관리자 페이지로 리다이렉트 if user.is_admin: return redirect(next_page) if next_page else redirect(url_for('main.admin')) return redirect(next_page) if next_page else redirect(url_for('main.index')) else: flash('사용자명 또는 비밀번호가 올바르지 않습니다.', 'error') return render_template('login.html') @main_bp.route('/logout') @login_required def logout(): """로그아웃""" logout_user() flash('로그아웃되었습니다.', 'info') return redirect(url_for('main.login')) @main_bp.route('/') @login_required def index(): return render_template('index.html') @main_bp.route('/admin') @admin_required def admin(): """관리자 페이지""" users = User.query.order_by(User.created_at.desc()).all() return render_template('admin.html', users=users) @main_bp.route('/admin/messages') @admin_required def admin_messages(): """관리자 메시지 확인 페이지""" return render_template('admin_messages.html') @main_bp.route('/admin/webnovels') @admin_required def admin_webnovels(): """웹소설 관리 페이지""" return render_template('admin_webnovels.html') @main_bp.route('/api/admin/users', methods=['GET']) @admin_required def get_users(): """사용자 목록 API""" try: users = User.query.order_by(User.created_at.desc()).all() return jsonify({ 'users': [user.to_dict() for user in users] }), 200 except Exception as e: return jsonify({'error': f'사용자 목록 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/users', methods=['POST']) @admin_required def create_user(): """사용자 생성 API""" try: data = request.json username = data.get('username', '').strip() nickname = data.get('nickname', '').strip() password = data.get('password', '') is_admin = data.get('is_admin', False) if not username or not password: return jsonify({'error': '사용자명과 비밀번호를 입력해주세요.'}), 400 if User.query.filter_by(username=username).first(): return jsonify({'error': '이미 존재하는 사용자명입니다.'}), 400 user = User(username=username, nickname=nickname if nickname else None, is_admin=is_admin, is_active=True) user.set_password(password) db.session.add(user) db.session.commit() return jsonify({ 'message': '사용자가 성공적으로 생성되었습니다.', 'user': user.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'사용자 생성 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/users/', methods=['PUT']) @admin_required def update_user(user_id): """사용자 정보 수정 API""" try: user = User.query.get_or_404(user_id) data = request.json # 자기 자신의 관리자 권한을 제거하는 것은 방지 if user_id == current_user.id and data.get('is_admin') == False: return jsonify({'error': '자기 자신의 관리자 권한을 제거할 수 없습니다.'}), 400 if 'username' in data: new_username = data['username'].strip() if new_username != user.username: if User.query.filter_by(username=new_username).first(): return jsonify({'error': '이미 존재하는 사용자명입니다.'}), 400 user.username = new_username if 'nickname' in data: user.nickname = data['nickname'].strip() if data['nickname'] else None if 'password' in data and data['password']: user.set_password(data['password']) if 'is_admin' in data: user.is_admin = data['is_admin'] if 'is_active' in data: user.is_active = data['is_active'] db.session.commit() return jsonify({ 'message': '사용자 정보가 성공적으로 수정되었습니다.', 'user': user.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'사용자 정보 수정 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/messages', methods=['GET']) @admin_required def get_all_messages(): """전체 메시지 조회 (관리자용)""" try: user_id = request.args.get('user_id', type=int) session_id = request.args.get('session_id', type=int) page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) query = ChatMessage.query.join(ChatSession) if user_id: query = query.filter(ChatSession.user_id == user_id) if session_id: query = query.filter(ChatMessage.session_id == session_id) messages = query.order_by(ChatMessage.created_at.desc())\ .paginate(page=page, per_page=per_page, error_out=False) return jsonify({ 'messages': [msg.to_dict() for msg in messages.items], 'total': messages.total, 'pages': messages.pages, 'current_page': page }), 200 except Exception as e: return jsonify({'error': f'메시지 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/sessions', methods=['GET']) @admin_required def get_all_sessions(): """전체 대화 세션 조회 (관리자용)""" try: user_id = request.args.get('user_id', type=int) page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) query = ChatSession.query if user_id: query = query.filter(ChatSession.user_id == user_id) sessions = query.order_by(ChatSession.updated_at.desc())\ .paginate(page=page, per_page=per_page, error_out=False) sessions_data = [] for session in sessions.items: session_dict = session.to_dict() session_dict['username'] = session.user.username if session.user else 'Unknown' session_dict['nickname'] = session.user.nickname if session.user else None sessions_data.append(session_dict) return jsonify({ 'sessions': sessions_data, 'total': sessions.total, 'pages': sessions.pages, 'current_page': page }), 200 except Exception as e: return jsonify({'error': f'대화 세션 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/users/', methods=['DELETE']) @admin_required def delete_user(user_id): """사용자 삭제 API""" try: user = User.query.get_or_404(user_id) # 자기 자신을 삭제하는 것은 방지 if user_id == current_user.id: return jsonify({'error': '자기 자신을 삭제할 수 없습니다.'}), 400 db.session.delete(user) db.session.commit() return jsonify({'message': '사용자가 성공적으로 삭제되었습니다.'}), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'사용자 삭제 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/gemini-api-key', methods=['GET']) @admin_required def get_gemini_api_key(): """Gemini API 키 조회""" try: # SystemConfig에서 API 키 가져오기 (테이블이 없으면 빈 문자열 반환) api_key = SystemConfig.get_config('gemini_api_key', '') # 보안을 위해 마스킹된 값 반환 (처음 8자만 표시) masked_key = api_key[:8] + '...' if api_key and len(api_key) > 8 else '' return jsonify({ 'has_api_key': bool(api_key), 'masked_key': masked_key }), 200 except Exception as e: print(f"[Gemini API 키 조회] 오류: {e}") import traceback traceback.print_exc() return jsonify({'error': f'API 키 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/gemini-api-key', methods=['POST']) @admin_required def set_gemini_api_key(): """Gemini API 키 저장/업데이트""" try: if not request.is_json: return jsonify({'error': 'Content-Type이 application/json이 아닙니다.'}), 400 data = request.json if not data: return jsonify({'error': '요청 데이터가 없습니다.'}), 400 api_key = data.get('api_key', '').strip() if not api_key: return jsonify({'error': 'API 키를 입력해주세요.'}), 400 # API 키 저장 (SystemConfig.set_config 내부에서 테이블 생성 처리) SystemConfig.set_config( key='gemini_api_key', value=api_key, description='Google Gemini API 키' ) # Gemini 클라이언트에 API 키 재로드 알림 try: from app.gemini_client import reset_gemini_client reset_gemini_client() print(f"[Gemini] API 키가 업데이트되어 클라이언트가 재로드되었습니다.") except Exception as e: print(f"[Gemini] API 키 재로드 실패: {e}") return jsonify({ 'message': 'Gemini API 키가 성공적으로 저장되었습니다.', 'has_api_key': True }), 200 except Exception as e: db.session.rollback() print(f"[Gemini API 키 저장] 오류: {e}") import traceback traceback.print_exc() return jsonify({'error': f'API 키 저장 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/ollama/models', methods=['GET']) @login_required def get_ollama_models(): """Ollama 및 Gemini에서 사용 가능한 모델 목록 가져오기""" try: all_models = [] # 1. Ollama 모델 목록 가져오기 try: response = requests.get(f'{OLLAMA_BASE_URL}/api/tags', timeout=5) if response.status_code == 200: data = response.json() ollama_models = [{'name': model['name'], 'type': 'ollama'} for model in data.get('models', [])] all_models.extend(ollama_models) print(f"[모델 목록] Ollama 모델 {len(ollama_models)}개 추가") except Exception as e: print(f"[모델 목록] Ollama 모델 목록 조회 실패: {e}") # 2. Gemini 모델 목록 가져오기 try: gemini_client = get_gemini_client() if gemini_client.is_configured(): gemini_models = gemini_client.get_available_models() gemini_model_list = [{'name': f'gemini:{model_name}', 'type': 'gemini'} for model_name in gemini_models] all_models.extend(gemini_model_list) print(f"[모델 목록] Gemini 모델 {len(gemini_model_list)}개 추가") else: print(f"[모델 목록] Gemini API 키가 설정되지 않아 Gemini 모델을 불러올 수 없습니다.") except Exception as e: print(f"[모델 목록] Gemini 모델 목록 조회 실패: {e}") if all_models: return jsonify({'models': all_models}) else: return jsonify({'error': '사용 가능한 모델이 없습니다. Ollama가 실행 중인지, 또는 Gemini API 키가 설정되었는지 확인하세요.', 'models': []}), 500 except Exception as e: return jsonify({'error': f'모델 목록을 가져오는 중 오류가 발생했습니다: {str(e)}', 'models': []}), 500 @main_bp.route('/api/chat', methods=['POST']) @login_required def chat(): """채팅 API 엔드포인트""" try: data = request.json message = data.get('message', '') model = data.get('model', '') file_ids = [int(fid) for fid in data.get('file_ids', []) if fid] # 선택한 웹소설 파일 ID 목록 session_id = data.get('session_id', None) # 대화 세션 ID (정수로 변환) if not message: return jsonify({'error': '메시지가 필요합니다.'}), 400 # 모델이 선택된 경우 Ollama 사용 if model: try: # RAG: 질문과 관련된 청크 검색 context = "" use_rag = True # RAG 사용 여부 if use_rag: print(f"\n[RAG 검색] 모델: {model}, 질문: {message[:50]}...") print(f"[RAG 검색] 선택된 파일 ID: {file_ids if file_ids else '없음 (모든 파일 검색)'}") # 1단계: Parent Chunk로 문맥 파악 parent_chunks = [] if file_ids: print(f"[RAG 검색 1단계] Parent Chunk 조회 시작...") parent_chunks = get_parent_chunks_for_files(file_ids) print(f"[RAG 검색 1단계] Parent Chunk 조회 완료: {len(parent_chunks)}개 파일") # 2단계: 벡터 검색 + 리랭킹으로 Child Chunk 정밀 검색 print(f"[RAG 검색 2단계] 벡터 검색 + 리랭킹 시작...") relevant_chunks = search_relevant_chunks( query=message, file_ids=file_ids if file_ids else None, model_name=model, top_k=5, # 리랭킹 후 상위 5개만 선택 min_score=0.5 # 최소 점수 임계값 ) print(f"[RAG 검색 2단계] 벡터 검색 + 리랭킹 완료: {len(relevant_chunks)}개 청크 (상위 5개)") # 컨텍스트 구성 context_parts = [] # Parent Chunk 정보 추가 (문맥 파악용) if parent_chunks: parent_context_sections = [] for parent_chunk in parent_chunks: file = parent_chunk.file file_info = f"\n=== {file.original_filename} 전체 개요 ===\n" sections = [] if parent_chunk.world_view: sections.append(f"[세계관]\n{parent_chunk.world_view}") if parent_chunk.characters: sections.append(f"[주요 캐릭터]\n{parent_chunk.characters}") if parent_chunk.story: sections.append(f"[주요 스토리]\n{parent_chunk.story}") if parent_chunk.episodes: sections.append(f"[주요 에피소드]\n{parent_chunk.episodes}") if parent_chunk.others: sections.append(f"[기타 정보]\n{parent_chunk.others}") if sections: parent_context_sections.append(file_info + "\n\n".join(sections)) if parent_context_sections: parent_context = "\n\n".join(parent_context_sections) context_parts.append(f"다음은 웹소설의 전체적인 문맥과 개요입니다:\n\n{parent_context}") print(f"[RAG 검색] Parent Chunk 컨텍스트 추가: {len(parent_context)}자") # Child Chunk 정보 추가 (정밀 검색 결과) if relevant_chunks: child_context_parts = [] seen_files = set() for chunk in relevant_chunks: file = chunk.file if file.original_filename not in seen_files: seen_files.add(file.original_filename) print(f"[RAG 검색] 사용된 파일: {file.original_filename} (모델: {file.model_name})") child_context_parts.append(f"[{file.original_filename} - 청크 {chunk.chunk_index + 1}]\n{chunk.content}") if child_context_parts: # 컨텍스트 길이 확인 및 최적화 full_child_context = "\n\n".join(child_context_parts) child_context_length = len(full_child_context) # Child Chunk 컨텍스트가 너무 길면 일부만 사용 (최대 15000자) if child_context_length > 15000: truncated_parts = [] current_length = 0 for part in child_context_parts: if current_length + len(part) > 15000: break truncated_parts.append(part) current_length += len(part) full_child_context = "\n\n".join(truncated_parts) print(f"[RAG 검색] Child Chunk 컨텍스트 길이 조절: {child_context_length}자 → {len(full_child_context)}자") context_parts.append(f"다음은 질문과 관련된 웹소설의 구체적인 내용입니다 (정밀 검색 결과, 총 {len(relevant_chunks)}개 청크):\n\n{full_child_context}") print(f"[RAG 검색] Child Chunk 컨텍스트 추가: {len(full_child_context)}자") # 최종 컨텍스트 구성 if context_parts: full_context = "\n\n" + "\n\n---\n\n".join(context_parts) + "\n\n" # Parent Chunk와 Child Chunk 모두 있는 경우 if parent_chunks and relevant_chunks: context = f"""다음은 질문에 답하기 위한 웹소설 정보입니다: {full_context} 위 정보를 참고하여 답변해주세요: - 먼저 전체적인 문맥(Parent Chunk)을 이해하여 웹소설의 배경과 설정을 파악하세요. - 그 다음 구체적인 내용(Child Chunk)을 통해 질문에 대한 정확한 답변을 제공하세요. - 웹소설의 맥락과 스토리를 고려하여 일관성 있는 답변을 작성하세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ elif parent_chunks: # Parent Chunk만 있는 경우 context = f"""다음은 웹소설의 전체적인 문맥과 개요입니다: {full_context} 위 정보를 참고하여 질문에 답변해주세요. 웹소설의 배경과 설정을 고려하여 답변하세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ else: # Child Chunk만 있는 경우 context = f"""다음은 질문과 관련된 웹소설의 구체적인 내용입니다: {full_context} 위 내용을 충분히 참고하여 다음 질문에 정확하고 상세하게 답변해주세요. 웹소설의 맥락과 스토리를 고려하여 답변해주세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ context += message print(f"[RAG 검색] 최종 컨텍스트 생성 완료 (Parent Chunk: {len(parent_chunks)}개, Child Chunk: {len(relevant_chunks)}개, 총 {len(context)}자)") else: # RAG 검색 결과가 없으면 기존 방식 사용 print(f"[RAG 검색] 관련 청크를 찾지 못했습니다. 전체 파일 내용 사용") use_rag = False # RAG 검색 결과가 없거나 비활성화된 경우 기존 방식 사용 if not context and not use_rag: if file_ids: # 선택한 파일 ID와 이어서 업로드된 파일들도 포함 expanded_file_ids = list(file_ids) for file_id in file_ids: # 원본 파일인 경우 이어서 업로드된 파일들도 포함 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() expanded_file_ids.extend([child.id for child in child_files]) uploaded_files = UploadedFile.query.filter( UploadedFile.id.in_(expanded_file_ids), UploadedFile.model_name == model ).all() print(f"[파일 사용] 선택된 파일 ID로 조회 (이어서 업로드 포함): {len(uploaded_files)}개 파일") else: # 파일 ID가 없으면 해당 모델의 모든 파일 사용 (원본 및 이어서 업로드 포함) uploaded_files = UploadedFile.query.filter_by(model_name=model).all() print(f"[파일 사용] 모델 '{model}'의 모든 파일 사용: {len(uploaded_files)}개 파일") if uploaded_files: print(f"[파일 사용] 사용되는 파일 목록:") for f in uploaded_files: is_child = f.parent_file_id is not None prefix = " └─ " if is_child else " - " print(f"{prefix}{f.original_filename} (모델: {f.model_name})") context_parts = [] for file in uploaded_files: try: if os.path.exists(file.file_path): encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: file_content = f.read() except UnicodeDecodeError: with open(file.file_path, 'r', encoding='cp949') as f: file_content = f.read() # 파일 내용이 너무 길면 일부만 사용 (최대 20000자로 증가) if len(file_content) > 20000: file_content = file_content[:20000] + "..." context_parts.append(f"[{file.original_filename}]\n{file_content}") except Exception as e: print(f"파일 읽기 오류 ({file.original_filename}): {str(e)}") continue if context_parts: context = "\n\n".join(context_parts) context = f"""다음은 학습된 웹소설 내용입니다: {context} 위 내용을 참고하여 다음 질문에 답변해주세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ # 프롬프트 구성 full_prompt = context + message if context else message # 모델 타입 확인 (Gemini 또는 Ollama) is_gemini = model.startswith('gemini:') if is_gemini: # Gemini API 호출 gemini_model_name = model.replace('gemini:', '') print(f"[Gemini] 모델: {gemini_model_name}, 질문: {message[:50]}...") gemini_client = get_gemini_client() if not gemini_client.is_configured(): return jsonify({'error': 'Gemini API 키가 설정되지 않았습니다. GEMINI_API_KEY 환경 변수를 설정하세요.'}), 500 result = gemini_client.generate_response( prompt=full_prompt, model_name=gemini_model_name, temperature=0.7, max_output_tokens=8192 ) if result['error']: return jsonify({'error': result['error']}), 500 response_text = result['response'] else: # Ollama API 호출 ollama_response = requests.post( f'{OLLAMA_BASE_URL}/api/generate', json={ 'model': model, 'prompt': full_prompt, 'stream': False }, timeout=120 # 파일이 많을 수 있으므로 타임아웃 증가 ) if ollama_response.status_code != 200: # 오류 상세 정보 가져오기 try: error_detail = ollama_response.json().get('error', ollama_response.text[:200]) except: error_detail = ollama_response.text[:200] if ollama_response.text else '상세 정보 없음' if ollama_response.status_code == 404: error_msg = f'모델 "{model}"을(를) 찾을 수 없습니다. 모델이 Ollama에 설치되어 있는지 확인하세요. (오류: {error_detail})' else: error_msg = f'Ollama 서버 오류: {ollama_response.status_code} (오류: {error_detail})' return jsonify({'error': error_msg}), ollama_response.status_code ollama_data = ollama_response.json() response_text = ollama_data.get('response', '응답을 생성할 수 없습니다.') # 대화 세션에 메시지 저장 (Gemini와 Ollama 공통) session_id = data.get('session_id') session_dict = None if session_id: try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first() if session: # 사용자 메시지가 이미 저장되어 있는지 확인 (중복 방지) # 가장 최근 메시지를 확인하여 중복 저장 방지 latest_user_msg = ChatMessage.query.filter_by( session_id=session_id, role='user' ).order_by(ChatMessage.created_at.desc()).first() # 최근 10초 이내에 같은 내용의 메시지가 없으면 저장 should_save = True if latest_user_msg: time_diff = (datetime.utcnow() - latest_user_msg.created_at).total_seconds() if latest_user_msg.content == message and time_diff < 10: should_save = False print(f"[중복 방지] 최근 {time_diff:.2f}초 전에 같은 메시지가 저장되어 있습니다. 저장을 건너뜁니다.") if should_save: user_msg = ChatMessage( session_id=session_id, role='user', content=message ) db.session.add(user_msg) print(f"[메시지 저장] 사용자 메시지 저장: {message[:50]}...") # 세션 제목 업데이트 (첫 사용자 메시지인 경우) title_needs_update = ( not session.title or session.title.strip() == '' or session.title == '새 대화' ) if title_needs_update and message.strip(): # 메시지 내용을 제목으로 사용 (최대 30자) title = message.strip()[:30] if len(message.strip()) > 30: title += '...' session.title = title print(f"[세션 제목] 업데이트: '{title}' (원본 길이: {len(message.strip())}자)") elif title_needs_update: print(f"[세션 제목] 메시지가 비어있어 제목을 업데이트하지 않습니다.") else: print(f"[메시지 저장] 중복 메시지로 인해 저장을 건너뜁니다.") # AI 응답 저장 ai_msg = ChatMessage( session_id=session_id, role='ai', content=response_text ) db.session.add(ai_msg) session.updated_at = datetime.utcnow() db.session.commit() # 세션 정보를 응답에 포함 (제목 업데이트 반영) session_dict = session.to_dict() except Exception as e: print(f"메시지 저장 오류: {str(e)}") db.session.rollback() session_dict = None response_data = {'response': response_text, 'session_id': session_id} if session_dict: response_data['session'] = session_dict return jsonify(response_data) except requests.exceptions.ConnectionError: return jsonify({'error': 'Ollama 서버에 연결할 수 없습니다. Ollama가 실행 중인지 확인하세요.'}), 503 except requests.exceptions.Timeout: return jsonify({'error': '응답 시간이 초과되었습니다. 더 짧은 메시지를 시도해보세요.'}), 504 except Exception as e: return jsonify({'error': f'Ollama 통신 중 오류가 발생했습니다: {str(e)}'}), 500 else: # 모델이 선택되지 않은 경우 기본 응답 response_text = f"안녕하세요! '{message}'에 대한 답변을 준비 중입니다.\n\n좌측 하단에서 로컬 AI 모델을 선택하면 더 정확한 답변을 제공할 수 있습니다." return jsonify({'response': response_text}) except Exception as e: return jsonify({'error': f'채팅 처리 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/upload', methods=['POST']) @login_required def upload_file(): """웹소설 파일 업로드""" import sys import traceback # 모든 출력을 즉시 플러시하여 로그가 바로 보이도록 def log_print(*args, **kwargs): print(*args, **kwargs) sys.stdout.flush() try: log_print(f"\n{'='*60}") log_print(f"=== 파일 업로드 요청 시작 ===") log_print(f"요청 메서드: {request.method}") log_print(f"Content-Type: {request.content_type}") log_print(f"Content-Length: {request.content_length}") log_print(f"Form 데이터 키: {list(request.form.keys())}") log_print(f"Files 키: {list(request.files.keys())}") log_print(f"사용자: {current_user.username if current_user else 'None'}") log_print(f"{'='*60}\n") # 업로드 폴더 확인 및 생성 try: ensure_upload_folder() log_print(f"[1/8] 업로드 폴더 확인 완료: {UPLOAD_FOLDER}") except Exception as e: error_msg = f'업로드 폴더를 준비할 수 없습니다: {str(e)}' log_print(f"[ERROR] {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'folder_check'}), 500 if 'file' not in request.files: error_msg = '파일이 없습니다.' log_print(f"[ERROR] {error_msg}") log_print(f"사용 가능한 키: {list(request.files.keys())}") return jsonify({'error': error_msg, 'step': 'file_check'}), 400 file = request.files['file'] model_name = request.form.get('model_name', '').strip() parent_file_id = request.form.get('parent_file_id', None) # 이어서 업로드할 경우 원본 파일 ID log_print(f"[2/8] 파일 수신: {file.filename if file else 'None'}") log_print(f"[2/8] 모델명: {model_name if model_name else 'None (비어있음)'}") log_print(f"[2/8] 이어서 업로드: {parent_file_id if parent_file_id else '아니오'}") if file.filename == '': error_msg = '파일명이 없습니다.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'filename_check'}), 400 # 모델명 검증 if not model_name: error_msg = 'AI 모델을 선택해주세요.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'model_check'}), 400 # parent_file_id 검증 (이어서 업로드인 경우) parent_file = None if parent_file_id: try: parent_file_id = int(parent_file_id) parent_file = UploadedFile.query.filter_by( id=parent_file_id, uploaded_by=current_user.id ).first() if not parent_file: error_msg = '원본 파일을 찾을 수 없습니다.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'parent_file_check'}), 404 # 같은 모델인지 확인 if parent_file.model_name != model_name: error_msg = '같은 모델의 파일에만 이어서 업로드할 수 있습니다.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'model_mismatch'}), 400 log_print(f"[이어서 업로드] 원본 파일: {parent_file.original_filename} (ID: {parent_file_id})") except (ValueError, TypeError): parent_file_id = None log_print(f"[경고] 잘못된 parent_file_id: {parent_file_id}") log_print(f"[3/8] 업로드 시도: {file.filename}, 모델: {model_name}") if not allowed_file(file.filename): error_msg = f'허용되지 않은 파일 형식입니다. 허용 형식: {", ".join(ALLOWED_EXTENSIONS)}' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'file_type_check'}), 400 log_print(f"[4/8] 파일 형식 확인 완료: {file.filename}") # 파일 크기 확인 (Content-Length 헤더 사용) file_size = 0 try: # Content-Length 헤더 확인 if request.content_length: file_size = request.content_length print(f"Content-Length로 파일 크기 확인: {file_size} bytes") else: # Content-Length가 없으면 파일 스트림에서 크기 확인 시도 try: # 파일 스트림의 현재 위치 저장 current_pos = file.tell() # 파일 끝으로 이동 file.seek(0, os.SEEK_END) file_size = file.tell() # 원래 위치로 복원 file.seek(current_pos, os.SEEK_SET) print(f"파일 스트림으로 크기 확인: {file_size} bytes") except (AttributeError, IOError, OSError) as e: print(f"파일 크기 확인 실패 (저장 후 확인): {str(e)}") file_size = 0 # 저장 후 확인하도록 0으로 설정 except Exception as e: print(f"파일 크기 확인 오류: {str(e)}") file_size = 0 # 저장 후 확인하도록 0으로 설정 # 파일 크기 사전 체크 (가능한 경우에만) if file_size > 0: if file_size > 100 * 1024 * 1024: # 100MB print(f"파일 크기 초과: {file_size} bytes") return jsonify({'error': '파일 크기가 너무 큽니다. 최대 100MB까지 업로드 가능합니다.'}), 400 if file_size == 0: print("빈 파일 업로드 시도") return jsonify({'error': '빈 파일은 업로드할 수 없습니다.'}), 400 # 안전한 파일명 생성 original_filename = file.filename filename = secure_filename(original_filename) if not filename: return jsonify({'error': '유효하지 않은 파일명입니다.'}), 400 unique_filename = f"{uuid.uuid4().hex}_{filename}" file_path = os.path.join(UPLOAD_FOLDER, unique_filename) # 파일 저장 try: log_print(f"[6/8] 파일 저장 시도: {file_path}") file.save(file_path) log_print(f"[6/8] 파일 저장 완료: {file_path}") except IOError as e: error_msg = f'파일 저장 중 오류가 발생했습니다: {str(e)}' log_print(f"[ERROR] 파일 저장 IOError: {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'file_save'}), 500 except PermissionError as e: error_msg = f'파일 저장 권한 오류: {str(e)}' log_print(f"[ERROR] 파일 저장 PermissionError: {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'file_save_permission'}), 500 except Exception as e: error_msg = f'파일 저장 실패: {str(e)}' log_print(f"[ERROR] 파일 저장 Exception: {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'file_save'}), 500 # 저장된 파일 크기 확인 if not os.path.exists(file_path): error_msg = '파일이 저장되지 않았습니다.' print(f"파일 존재 확인 실패: {file_path}") return jsonify({'error': error_msg}), 500 saved_file_size = os.path.getsize(file_path) if saved_file_size == 0: os.remove(file_path) # 빈 파일 삭제 error_msg = '파일이 제대로 저장되지 않았습니다.' print(f"빈 파일 삭제: {file_path}") return jsonify({'error': error_msg}), 500 print(f"저장된 파일 크기: {saved_file_size} bytes") # 데이터베이스에 저장 try: log_print(f"[7/8] 데이터베이스 저장 시도: {original_filename}") uploaded_file = UploadedFile( filename=unique_filename, original_filename=original_filename, file_path=file_path, file_size=saved_file_size, model_name=model_name, # 이미 검증됨 uploaded_by=current_user.id, parent_file_id=parent_file_id if parent_file else None # 이어서 업로드인 경우 ) db.session.add(uploaded_file) db.session.flush() # ID를 얻기 위해 flush log_print(f"[7/8] 데이터베이스 flush 완료, 파일 ID: {uploaded_file.id}") # 텍스트 파일인 경우 청크로 분할하여 저장 (RAG용) if original_filename.lower().endswith(('.txt', '.md')): try: log_print(f"[7/8] 청크 생성 시작: {original_filename}") log_print(f"[7/8] 파일 ID: {uploaded_file.id}") # 파일 내용 읽기 encoding = 'utf-8' try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() log_print(f"[7/8] UTF-8 인코딩으로 파일 읽기 성공: {len(content)}자") except UnicodeDecodeError: log_print(f"[7/8] UTF-8 인코딩 실패, CP949 시도: {original_filename}") with open(file_path, 'r', encoding='cp949') as f: content = f.read() log_print(f"[7/8] CP949 인코딩으로 파일 읽기 성공: {len(content)}자") # 청크 생성 및 저장 log_print(f"[7/8] 청크 생성 함수 호출 중...") chunk_count = create_chunks_for_file(uploaded_file.id, content) if chunk_count > 0: log_print(f"[7/8] ✅ 성공: 파일 {original_filename}을 {chunk_count}개의 청크로 분할했습니다.") print(f"파일 {original_filename}을 {chunk_count}개의 청크로 분할했습니다.") else: log_print(f"[7/8] ⚠️ 경고: 청크가 생성되지 않았습니다. (파일이 너무 짧거나 비어있을 수 있습니다.)") print(f"경고: 파일 {original_filename}에 대한 청크가 생성되지 않았습니다.") # Parent Chunk 생성 (AI 분석) log_print(f"[7/9] Parent Chunk 생성 시작 (AI 분석)...") parent_chunk = create_parent_chunk_with_ai(uploaded_file.id, content, model_name) if parent_chunk: log_print(f"[7/9] ✅ Parent Chunk 생성 완료: {original_filename}") print(f"Parent Chunk가 생성되었습니다: {original_filename}") else: log_print(f"[7/9] ⚠️ 경고: Parent Chunk 생성 실패: {original_filename}") print(f"경고: Parent Chunk 생성에 실패했습니다: {original_filename}") except Exception as e: error_msg = f"청크 생성 중 오류: {str(e)}" log_print(f"[7/8] ❌ 오류: {error_msg}") print(error_msg) import traceback traceback.print_exc() # 최종 청크 개수 확인 및 저장 chunk_count = 0 if original_filename.lower().endswith(('.txt', '.md')): chunk_count = DocumentChunk.query.filter_by(file_id=uploaded_file.id).count() log_print(f"[8/8] 최종 청크 개수 확인: {chunk_count}개") db.session.commit() log_print(f"[8/8] 데이터베이스 커밋 완료: {original_filename}") log_print(f"[8/8] 연결된 모델: {model_name}") log_print(f"[8/8] 생성된 청크 수: {chunk_count}") # 학습 상태 요약 if chunk_count > 0: log_print(f"[8/8] ✅ AI 학습 준비 완료: {chunk_count}개 청크가 저장되어 RAG 검색에 사용 가능합니다.") else: log_print(f"[8/8] ⚠️ 경고: 청크가 생성되지 않아 RAG 검색이 불가능합니다.") log_print(f"{'='*60}") log_print(f"=== 파일 업로드 성공 ===") log_print(f"{'='*60}\n") except Exception as e: db.session.rollback() error_msg = f'데이터베이스 저장 중 오류가 발생했습니다: {str(e)}' log_print(f"[ERROR] 데이터베이스 저장 오류: {error_msg}") traceback.print_exc() # 데이터베이스 저장 실패 시 파일도 삭제 if os.path.exists(file_path): try: os.remove(file_path) log_print(f"오류로 인한 파일 삭제: {file_path}") except Exception as del_e: log_print(f"파일 삭제 실패: {str(del_e)}") return jsonify({'error': error_msg, 'step': 'database_save'}), 500 log_print(f"[8/8] 업로드 완료 - 파일: {original_filename}, 모델: {model_name}, 크기: {saved_file_size} bytes") return jsonify({ 'message': f'파일이 성공적으로 업로드되었습니다. (모델: {model_name})', 'file': uploaded_file.to_dict(), 'model_name': model_name, 'chunk_count': chunk_count if 'chunk_count' in locals() else 0 }), 200 except Exception as e: db.session.rollback() error_msg = str(e) error_type = type(e).__name__ log_print(f"\n{'='*60}") log_print(f"=== 업로드 처리 중 예외 발생 ===") log_print(f"예외 타입: {error_type}") log_print(f"에러 메시지: {error_msg}") traceback.print_exc() log_print(f"{'='*60}\n") # 파일 크기 초과 오류 처리 if '413' in error_msg or 'Request Entity Too Large' in error_msg or error_type == 'RequestEntityTooLarge': return jsonify({'error': '파일 크기가 너무 큽니다. 최대 100MB까지 업로드 가능합니다.', 'step': 'file_size'}), 413 return jsonify({'error': f'파일 업로드 중 오류가 발생했습니다: {error_type}: {error_msg}', 'step': 'exception'}), 500 @main_bp.route('/api/files', methods=['GET']) @login_required def get_files(): """업로드된 파일 목록 조회""" try: model_name = request.args.get('model_name', None) # 원본 파일만 조회 (parent_file_id가 None인 파일) query = UploadedFile.query.filter_by(parent_file_id=None) if model_name: query = query.filter_by(model_name=model_name) print(f"[파일 조회] 모델 '{model_name}' 필터링") files = query.order_by(UploadedFile.uploaded_at.desc()).all() # 각 원본 파일에 대해 이어서 업로드된 파일도 포함 files_with_children = [] for file in files: file_dict = file.to_dict() # 청크 개수 추가 chunk_count = DocumentChunk.query.filter_by(file_id=file.id).count() file_dict['chunk_count'] = chunk_count # 이어서 업로드된 파일들도 조회 child_files = UploadedFile.query.filter_by(parent_file_id=file.id).order_by(UploadedFile.uploaded_at.asc()).all() child_files_dict = [] for child in child_files: child_dict = child.to_dict() child_chunk_count = DocumentChunk.query.filter_by(file_id=child.id).count() child_dict['chunk_count'] = child_chunk_count child_files_dict.append(child_dict) file_dict['child_files'] = child_files_dict files_with_children.append(file_dict) # 모델별 통계 정보 추가 (원본 파일만 카운트) model_stats = {} if not model_name: # 모든 모델의 통계 (원본 파일만) all_files = UploadedFile.query.filter_by(parent_file_id=None).all() for file in all_files: model = file.model_name or '미지정' if model not in model_stats: model_stats[model] = {'count': 0, 'total_size': 0} model_stats[model]['count'] += 1 model_stats[model]['total_size'] += file.file_size else: # 특정 모델의 통계 model_stats[model_name] = { 'count': len(files), 'total_size': sum(f.file_size for f in files) } print(f"[파일 조회] 조회된 원본 파일 수: {len(files)}개") return jsonify({ 'files': files_with_children, 'model_stats': model_stats, 'filtered_model': model_name }), 200 except Exception as e: return jsonify({'error': f'파일 목록 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//chunks', methods=['GET']) @login_required def get_file_chunks(file_id): """파일의 청크 정보 조회 (학습 상태 확인용)""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 chunks = DocumentChunk.query.filter_by(file_id=file_id).order_by(DocumentChunk.chunk_index.asc()).all() total_chunks = len(chunks) # 샘플 청크 (처음 3개) sample_chunks = [] for chunk in chunks[:3]: sample_chunks.append({ 'index': chunk.chunk_index, 'content_preview': chunk.content[:100] + '...' if len(chunk.content) > 100 else chunk.content, 'content_length': len(chunk.content) }) return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'model_name': file.model_name, 'total_chunks': total_chunks, 'sample_chunks': sample_chunks, 'learning_status': 'ready' if total_chunks > 0 else 'not_ready', 'message': f'{total_chunks}개 청크가 저장되어 RAG 검색에 사용 가능합니다.' if total_chunks > 0 else '청크가 생성되지 않아 RAG 검색이 불가능합니다.' }), 200 except Exception as e: return jsonify({'error': f'청크 정보 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//parent-chunk', methods=['GET']) @login_required def get_file_parent_chunk(file_id): """파일의 Parent Chunk 조회""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() if not parent_chunk: return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'has_parent_chunk': False, 'message': 'Parent Chunk가 생성되지 않았습니다.' }), 200 return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'has_parent_chunk': True, 'parent_chunk': parent_chunk.to_dict(), 'message': 'Parent Chunk가 존재합니다.' }), 200 except Exception as e: return jsonify({'error': f'Parent Chunk 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files/', methods=['DELETE']) @login_required def delete_file(file_id): """업로드된 파일 삭제 (연관된 모든 파일도 함께 삭제)""" try: file = UploadedFile.query.get_or_404(file_id) # 원본 파일인 경우 (parent_file_id가 None인 경우) # 이어서 업로드된 모든 파일도 함께 삭제 files_to_delete = [] if file.parent_file_id is None: # 원본 파일이면, 이어서 업로드된 모든 파일도 찾아서 삭제 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() files_to_delete = [file] + child_files print(f"[파일 삭제] 원본 파일 삭제: {file.original_filename}, 연관 파일 {len(child_files)}개도 함께 삭제") else: # 이어서 업로드된 파일이면 원본 파일도 함께 삭제 parent_file = UploadedFile.query.get(file.parent_file_id) if parent_file: # 원본 파일과 모든 연관 파일 삭제 all_child_files = UploadedFile.query.filter_by(parent_file_id=file.parent_file_id).all() files_to_delete = [parent_file] + all_child_files print(f"[파일 삭제] 이어서 업로드된 파일 삭제: {file.original_filename}, 원본 및 연관 파일 {len(all_child_files)}개도 함께 삭제") else: files_to_delete = [file] deleted_count = 0 deleted_files = [] for file_to_delete in files_to_delete: try: # 파일 시스템에서 삭제 if os.path.exists(file_to_delete.file_path): os.remove(file_to_delete.file_path) print(f"[파일 삭제] 파일 시스템에서 삭제: {file_to_delete.file_path}") # 관련 Child Chunk (DocumentChunk) 삭제 child_chunk_count = DocumentChunk.query.filter_by(file_id=file_to_delete.id).count() if child_chunk_count > 0: DocumentChunk.query.filter_by(file_id=file_to_delete.id).delete() print(f"[파일 삭제] Child Chunk {child_chunk_count}개 삭제 완료") # 벡터 DB에서도 해당 파일의 청크 삭제 try: vector_db = get_vector_db() vector_db.delete_chunks_by_file_id(file_to_delete.id) print(f"[파일 삭제] 벡터 DB에서 청크 삭제 완료") except Exception as vector_e: print(f"[파일 삭제] 벡터 DB 삭제 오류 (무시): {str(vector_e)}") # 관련 Parent Chunk 삭제 parent_chunk = ParentChunk.query.filter_by(file_id=file_to_delete.id).first() if parent_chunk: db.session.delete(parent_chunk) print(f"[파일 삭제] Parent Chunk 삭제 완료") deleted_files.append(file_to_delete.original_filename) db.session.delete(file_to_delete) deleted_count += 1 print(f"[파일 삭제] 데이터베이스에서 파일 삭제 완료: {file_to_delete.original_filename}") except Exception as e: print(f"[파일 삭제 오류] {file_to_delete.original_filename}: {str(e)}") import traceback traceback.print_exc() db.session.commit() message = f'파일이 성공적으로 삭제되었습니다.' if deleted_count > 1: message = f'파일 {deleted_count}개가 성공적으로 삭제되었습니다. (원본 및 연관 파일 포함)' return jsonify({ 'message': message, 'deleted_count': deleted_count, 'deleted_files': deleted_files }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'파일 삭제 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//content', methods=['GET']) @login_required def get_file_content(file_id): """업로드된 파일 내용 조회""" try: file = UploadedFile.query.get_or_404(file_id) if not os.path.exists(file.file_path): return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 # 텍스트 파일 읽기 encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: # UTF-8로 읽을 수 없으면 다른 인코딩 시도 with open(file.file_path, 'r', encoding='cp949') as f: content = f.read() return jsonify({ 'content': content, 'filename': file.original_filename }), 200 except Exception as e: return jsonify({'error': f'파일 내용 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions', methods=['GET']) @login_required def get_chat_sessions(): """사용자의 대화 세션 목록 조회 (최근 20개만 표시)""" try: sessions = ChatSession.query.filter_by(user_id=current_user.id)\ .order_by(ChatSession.updated_at.desc())\ .limit(20).all() return jsonify({ 'sessions': [session.to_dict() for session in sessions] }), 200 except Exception as e: return jsonify({'error': f'대화 세션 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions', methods=['POST']) @login_required def create_chat_session(): """새 대화 세션 생성""" try: data = request.json title = data.get('title', '새 대화') model_name = data.get('model_name', None) session = ChatSession( user_id=current_user.id, title=title, model_name=model_name ) db.session.add(session) db.session.commit() return jsonify({ 'message': '대화 세션이 생성되었습니다.', 'session': session.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'대화 세션 생성 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions/', methods=['GET']) @login_required def get_chat_session(session_id): """대화 세션 상세 조회 (메시지 포함)""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() session_dict = session.to_dict() session_dict['messages'] = [msg.to_dict() for msg in session.messages] return jsonify({'session': session_dict}), 200 except Exception as e: return jsonify({'error': f'대화 세션 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions/', methods=['PUT']) @login_required def update_chat_session(session_id): """대화 세션 수정 (제목 등)""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() data = request.json if 'title' in data: session.title = data['title'] session.updated_at = datetime.utcnow() db.session.commit() return jsonify({ 'message': '대화 세션이 수정되었습니다.', 'session': session.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'대화 세션 수정 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions/', methods=['DELETE']) @login_required def delete_chat_session(session_id): """대화 세션 삭제""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() db.session.delete(session) db.session.commit() return jsonify({'message': '대화 세션이 삭제되었습니다.'}), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'대화 세션 삭제 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions//messages', methods=['POST']) @login_required def add_chat_message(session_id): """대화 메시지 추가""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() data = request.json role = data.get('role', 'user') content = data.get('content', '') if not content: return jsonify({'error': '메시지 내용이 필요합니다.'}), 400 message = ChatMessage( session_id=session_id, role=role, content=content ) db.session.add(message) # 세션 제목 업데이트 (첫 사용자 메시지인 경우) if not session.title or session.title == '새 대화': if role == 'user': title = content[:30] + '...' if len(content) > 30 else content session.title = title session.updated_at = datetime.utcnow() db.session.commit() return jsonify({ 'message': '메시지가 추가되었습니다.', 'chat_message': message.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'메시지 추가 중 오류가 발생했습니다: {str(e)}'}), 500