from flask import Blueprint, render_template, request, jsonify, send_from_directory, redirect, url_for, flash from flask_login import login_user, logout_user, login_required, current_user from werkzeug.utils import secure_filename from app.database import db, UploadedFile, User, ChatSession, ChatMessage, DocumentChunk, ParentChunk, SystemConfig from app.vector_db import get_vector_db from app.gemini_client import get_gemini_client import requests import os from datetime import datetime import uuid import re import json main_bp = Blueprint('main', __name__) def admin_required(f): """관리자 권한이 필요한 데코레이터""" from functools import wraps @wraps(f) @login_required def decorated_function(*args, **kwargs): if not current_user.is_admin: # API 요청인 경우 JSON 응답 반환 if request.path.startswith('/api/'): return jsonify({'error': '관리자 권한이 필요합니다.'}), 403 flash('관리자 권한이 필요합니다.', 'error') return redirect(url_for('main.index')) return f(*args, **kwargs) return decorated_function # Ollama 기본 URL (환경 변수로 설정 가능) OLLAMA_BASE_URL = os.getenv('OLLAMA_BASE_URL', 'http://localhost:11434') # 업로드 설정 UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'uploads') ALLOWED_EXTENSIONS = {'txt', 'md', 'pdf', 'docx', 'epub'} # 업로드 폴더 경로 출력 (디버깅용) print(f"[업로드 설정] 업로드 폴더 경로: {UPLOAD_FOLDER}") print(f"[업로드 설정] 업로드 폴더 존재 여부: {os.path.exists(UPLOAD_FOLDER)}") def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def ensure_upload_folder(): """업로드 폴더가 없으면 생성""" try: if not os.path.exists(UPLOAD_FOLDER): print(f"업로드 폴더 생성 중: {UPLOAD_FOLDER}") os.makedirs(UPLOAD_FOLDER, exist_ok=True) if not os.path.exists(UPLOAD_FOLDER): raise Exception(f'업로드 폴더를 생성할 수 없습니다: {UPLOAD_FOLDER}') # 폴더 쓰기 권한 확인 test_file = os.path.join(UPLOAD_FOLDER, '.write_test') try: with open(test_file, 'w') as f: f.write('test') os.remove(test_file) print(f"업로드 폴더 쓰기 권한 확인 완료: {UPLOAD_FOLDER}") except PermissionError as e: raise Exception(f'업로드 폴더에 쓰기 권한이 없습니다: {UPLOAD_FOLDER} - {str(e)}') except Exception as e: raise Exception(f'업로드 폴더 쓰기 테스트 실패: {UPLOAD_FOLDER} - {str(e)}') except Exception as e: print(f"업로드 폴더 생성 오류: {str(e)}") import traceback traceback.print_exc() raise def split_text_into_chunks(text, min_chunk_size=200, max_chunk_size=1000, overlap=150): """의미 기반 텍스트 청킹 (문장과 문단 경계를 고려하여 분할)""" if not text or len(text.strip()) == 0: return [] # 1단계: 문단 단위로 분할 (빈 줄 기준) paragraphs = re.split(r'\n\s*\n', text.strip()) paragraphs = [p.strip() for p in paragraphs if p.strip()] if not paragraphs: return [] # 2단계: 각 문단을 문장 단위로 분할 # 문장 종결 기호: . ! ? (한글과 영문 모두 지원) # 구두점 뒤에 공백이나 줄바꿈이 오는 경우 문장 종료로 간주 sentence_pattern = r'([.!?]+)(?=\s+|$)' all_sentences = [] for para in paragraphs: # 문장 분리 (구두점 포함) parts = re.split(sentence_pattern, para) combined_sentences = [] current_sentence = "" for i, part in enumerate(parts): if part.strip(): if re.match(r'^[.!?]+$', part): # 구두점인 경우 현재 문장에 추가하고 문장 완성 current_sentence += part if current_sentence.strip(): combined_sentences.append(current_sentence.strip()) current_sentence = "" else: # 텍스트인 경우 현재 문장에 추가 current_sentence += part # 마지막 문장 처리 (구두점이 없는 경우) if current_sentence.strip(): combined_sentences.append(current_sentence.strip()) # 문장이 하나도 없는 경우 (구두점이 전혀 없는 문단) if not combined_sentences and para.strip(): combined_sentences.append(para.strip()) all_sentences.extend(combined_sentences) if not all_sentences: # 문장 분리가 안 되는 경우 원본 텍스트를 그대로 반환 return [text] if text.strip() else [] # 3단계: 문장들을 모아서 의미 있는 청크 생성 chunks = [] current_chunk = [] current_size = 0 for sentence in all_sentences: sentence_size = len(sentence) # 현재 청크에 문장 추가 시 최대 크기를 초과하는 경우 if current_size + sentence_size > max_chunk_size and current_chunk: # 현재 청크 저장 (줄바꿈 유지) chunk_text = '\n'.join(current_chunk) if len(chunk_text.strip()) >= min_chunk_size: chunks.append(chunk_text) else: # 최소 크기 미만이면 다음 청크와 병합 (오버랩 효과) if chunks: chunks[-1] = chunks[-1] + '\n' + chunk_text else: chunks.append(chunk_text) # 오버랩을 위한 문장 유지 (마지막 몇 문장을 다음 청크에 포함) overlap_sentences = [] overlap_size = 0 for s in reversed(current_chunk): if overlap_size + len(s) <= overlap: overlap_sentences.insert(0, s) overlap_size += len(s) + 1 # 줄바꿈 포함 else: break current_chunk = overlap_sentences + [sentence] current_size = overlap_size + sentence_size else: # 현재 청크에 문장 추가 current_chunk.append(sentence) current_size += sentence_size + 1 # 줄바꿈 포함 # 마지막 청크 추가 if current_chunk: chunk_text = '\n'.join(current_chunk) if chunks and len(chunk_text.strip()) < min_chunk_size: # 최소 크기 미만이면 이전 청크와 병합 chunks[-1] = chunks[-1] + '\n' + chunk_text else: chunks.append(chunk_text) # 빈 청크 제거 및 최소 크기 미만 청크 처리 final_chunks = [] for chunk in chunks: chunk = chunk.strip() if chunk and len(chunk) >= min_chunk_size: final_chunks.append(chunk) elif chunk: # 최소 크기 미만 청크는 이전 청크와 병합 if final_chunks: final_chunks[-1] = final_chunks[-1] + '\n' + chunk else: final_chunks.append(chunk) return final_chunks if final_chunks else [text] if text.strip() else [] def extract_chapter_number(text): """텍스트에서 챕터 번호 추출""" # 다양한 챕터 패턴 매칭 patterns = [ r'제\s*(\d+)\s*장', # 제1장, 제 1 장 r'제\s*(\d+)\s*화', # 제1화 r'Chapter\s*(\d+)', # Chapter 1 r'CHAPTER\s*(\d+)', # CHAPTER 1 r'Ch\.\s*(\d+)', # Ch. 1 r'(\d+)\s*장', # 1장 r'(\d+)\s*화', # 1화 r'CHAPTER\s*(\d+)', # CHAPTER 1 r'chap\.\s*(\d+)', # chap. 1 r'ch\s*(\d+)', # ch 1 r'(\d+)\s*章', # 1章 ] # 텍스트의 처음 500자만 검사 (챕터 정보는 보통 앞부분에 있음) search_text = text[:500] for pattern in patterns: match = re.search(pattern, search_text, re.IGNORECASE) if match: try: chapter_num = int(match.group(1)) return chapter_num except: continue return None def split_content_by_episodes(content): """원본 웹소설을 #작품설명, #1화, #2화 등으로 분할 Returns: list: [(section_type, section_title, section_content, metadata), ...] section_type: '작품설명' or '화' section_title: '작품설명' or '1화', '2화', ... metadata: {'chapter': '#작품설명'} or {'chapter': '1화'} """ if not content or len(content.strip()) == 0: return [] sections = [] # #작품설명, #1화, #2화 등의 패턴 찾기 # 패턴: #작품설명, #1화, #2화, #10화 등 episode_pattern = r'^#\s*(작품설명|\d+화)' lines = content.split('\n') current_section_type = None current_section_title = None current_section_content = [] current_section_start_line = 0 for i, line in enumerate(lines): # 줄 시작 부분에서 #작품설명 또는 #n화 패턴 찾기 match = re.match(episode_pattern, line.strip()) if match: # 이전 섹션 저장 if current_section_type and current_section_content: section_content = '\n'.join(current_section_content).strip() if section_content: # 메타데이터 생성 if current_section_type == '작품설명': metadata = {'chapter': '#작품설명'} else: metadata = {'chapter': current_section_title} sections.append(( current_section_type, current_section_title, section_content, metadata )) # 새 섹션 시작 section_title = match.group(1) if section_title == '작품설명': current_section_type = '작품설명' current_section_title = '작품설명' else: current_section_type = '화' current_section_title = section_title # '1화', '2화' 등 current_section_content = [line] # 헤더 라인 포함 current_section_start_line = i else: # 현재 섹션에 내용 추가 if current_section_content is not None: current_section_content.append(line) # 마지막 섹션 저장 if current_section_type and current_section_content: section_content = '\n'.join(current_section_content).strip() if section_content: # 메타데이터 생성 if current_section_type == '작품설명': metadata = {'chapter': '#작품설명'} else: metadata = {'chapter': current_section_title} sections.append(( current_section_type, current_section_title, section_content, metadata )) # 섹션이 하나도 없으면 전체를 하나의 섹션으로 처리 if not sections: sections.append(( '기타', '전체', content.strip(), {'chapter': None} )) return sections def extract_metadata_with_ai(chunk_content, full_content=None, parent_chunk=None, model_name=None): """AI를 사용하여 청크의 메타데이터 추출 (화자, 등장인물, 시간적 배경, 인물 관계) Args: chunk_content: 분석할 청크 내용 full_content: 원본 웹소설 전체 내용 (인물 관계 파악용) parent_chunk: Parent Chunk 객체 (선택사항) model_name: 사용할 AI 모델명 """ try: # 원본 웹소설 전체 내용을 참조하여 인물 관계 파악 full_content_preview = "" if full_content: # 전체 내용이 너무 길면 앞부분과 뒷부분 일부만 사용 (최대 20000자) if len(full_content) > 20000: full_content_preview = full_content[:10000] + "\n... (중간 생략) ...\n" + full_content[-10000:] else: full_content_preview = full_content # 프롬프트 생성 prompt = f"""다음 웹소설 텍스트를 분석하여 아래 정보를 JSON 형식으로만 응답하세요. 원본 웹소설 전체 내용 (참고용): {full_content_preview[:50000] if full_content_preview else "없음"} 분석할 청크 텍스트: {chunk_content[:2000]} 다음 형식으로만 응답하세요 (JSON 형식): {{ "pov": "화자/시점을 설명하세요 (예: 1인칭 주인공, 3인칭 전지적 작가 등)", "characters": ["등장인물1", "등장인물2"], "time_background": "시간적 배경 설명 (예: 과거 회상, 현재 시점, 미래 등)", "character_relationships": [ {{ "character1": "인물1", "character2": "인물2", "relationship": "현재 시점에서의 관계 설명 (예: 연인, 적, 친구, 가족 등)" }} ] }} character_relationships는 이 청크에 등장하는 인물들 간의 현재 관계를 원본 웹소설 전체 내용을 참고하여 파악한 것입니다. 응답은 오직 JSON 형식만 사용하고, 다른 설명은 포함하지 마세요.""" # 모델명이 없으면 기본값 사용 (Gemini 우선 시도) if not model_name: # Gemini 시도 try: gemini_client = get_gemini_client() if gemini_client.is_configured(): result = gemini_client.generate_response( prompt=prompt, model_name="gemini-1.5-flash", temperature=0.3, max_output_tokens=500 ) if not result['error'] and result.get('response'): response_text = result['response'].strip() # JSON 추출 json_match = re.search(r'\{.*\}', response_text, re.DOTALL) if json_match: metadata = json.loads(json_match.group(0)) return metadata except: pass # 모델명이 있거나 Gemini 실패 시 해당 모델 사용 if model_name: model_name_lower = model_name.lower().strip() is_gemini = model_name_lower.startswith('gemini:') or model_name_lower.startswith('gemini-') if is_gemini: gemini_model_name = model_name.strip() if gemini_model_name.lower().startswith('gemini:'): gemini_model_name = gemini_model_name.split(':', 1)[1].strip() gemini_client = get_gemini_client() if gemini_client.is_configured(): result = gemini_client.generate_response( prompt=prompt, model_name=gemini_model_name, temperature=0.3, max_output_tokens=500 ) if not result['error'] and result.get('response'): response_text = result['response'].strip() json_match = re.search(r'\{.*\}', response_text, re.DOTALL) if json_match: metadata = json.loads(json_match.group(0)) return metadata else: # Ollama API 호출 try: ollama_response = requests.post( f'{OLLAMA_BASE_URL}/api/generate', json={ 'model': model_name, 'prompt': prompt, 'stream': False, 'options': { 'temperature': 0.3, 'num_predict': 500 } }, timeout=30 ) if ollama_response.status_code == 200: response_data = ollama_response.json() response_text = response_data.get('response', '').strip() json_match = re.search(r'\{.*\}', response_text, re.DOTALL) if json_match: metadata = json.loads(json_match.group(0)) return metadata except: pass # AI 추출 실패 시 기본값 반환 return { "pov": None, "characters": [], "time_background": None, "character_relationships": [] } except Exception as e: print(f"[메타데이터 추출] 오류: {str(e)}") return { "pov": None, "characters": [], "time_background": None, "character_relationships": [] } def extract_chunk_metadata(chunk_content, full_content=None, chunk_index=None, file_id=None, model_name=None): """청크의 메타데이터 추출 (화자, 등장인물, 시간적 배경, 인물 관계) Args: chunk_content: 분석할 청크 내용 full_content: 원본 웹소설 전체 내용 (인물 관계 파악용) chunk_index: 청크 인덱스 file_id: 파일 ID model_name: 사용할 AI 모델명 """ metadata = { "pov": None, "characters": [], "time_background": None, "character_relationships": [] } # AI를 사용한 메타데이터 추출 (화자, 등장인물, 시간적 배경, 인물 관계) # Parent Chunk가 있으면 참조 parent_chunk = None if file_id: try: parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() except: pass # 원본 웹소설 전체 내용을 참조하여 메타데이터 추출 ai_metadata = extract_metadata_with_ai(chunk_content, full_content, parent_chunk, model_name) if ai_metadata: metadata["pov"] = ai_metadata.get("pov") metadata["characters"] = ai_metadata.get("characters", []) metadata["time_background"] = ai_metadata.get("time_background") metadata["character_relationships"] = ai_metadata.get("character_relationships", []) return metadata def create_chunks_for_file(file_id, content): """파일 내용을 섹션별로 분할하여 의미 기반 청크로 저장 (벡터 DB 포함) 섹션 분할 규칙: - #작품설명부터 #1화까지: '작품설명' 섹션, 메타데이터에 #작품설명 추가 - #n화부터 #n+1화까지: 'n화' 섹션, 메타데이터에 회차 정보(n화) 추가 Args: file_id: 파일 ID content: 파일 내용 """ try: print(f"[청크 생성] 파일 ID {file_id}에 대한 청크 생성 시작") print(f"[청크 생성] 원본 텍스트 길이: {len(content)}자") # 파일 정보 가져오기 (모델명 등) uploaded_file = UploadedFile.query.get(file_id) model_name = uploaded_file.model_name if uploaded_file else None # 벡터 DB 매니저 가져오기 vector_db = get_vector_db() # 기존 청크 삭제 (DB + 벡터 DB) existing_chunks = DocumentChunk.query.filter_by(file_id=file_id).all() if existing_chunks: print(f"[청크 생성] 기존 청크 {len(existing_chunks)}개 삭제 중...") # 벡터 DB에서 삭제 vector_db.delete_chunks_by_file_id(file_id) # DB에서 삭제 DocumentChunk.query.filter_by(file_id=file_id).delete() db.session.commit() # 원본 웹소설을 섹션별로 분할 (#작품설명, #1화, #2화 등) sections = split_content_by_episodes(content) print(f"[청크 생성] 섹션 분할 완료: {len(sections)}개 섹션") for i, (section_type, section_title, section_content, section_metadata) in enumerate(sections): print(f"[청크 생성] 섹션 {i+1}: {section_title} ({len(section_content)}자)") if len(sections) == 0: print(f"[청크 생성] 경고: 섹션이 생성되지 않았습니다.") return 0 # 각 섹션별로 청크 생성 및 저장 saved_count = 0 vector_saved_count = 0 global_chunk_index = 0 # 전체 청크 인덱스 for section_idx, (section_type, section_title, section_content, section_metadata) in enumerate(sections): print(f"[청크 생성] 섹션 '{section_title}' 처리 중... ({len(section_content)}자)") # 각 섹션을 의미 기반 청킹 (문장과 문단 경계를 고려하여 분할) # min_chunk_size: 최소 200자, max_chunk_size: 최대 1000자, overlap: 150자 section_chunks = split_text_into_chunks(section_content, min_chunk_size=200, max_chunk_size=1000, overlap=150) print(f"[청크 생성] 섹션 '{section_title}' 분할된 청크 수: {len(section_chunks)}개") # 각 청크를 데이터베이스와 벡터 DB에 저장 for chunk_idx, chunk_content in enumerate(section_chunks): try: # 섹션 메타데이터를 기본으로 사용 (chapter 정보 포함) chunk_metadata = section_metadata.copy() # DB에 청크 저장 (섹션 메타데이터 포함) chunk = DocumentChunk( file_id=file_id, chunk_index=global_chunk_index, content=chunk_content, chunk_metadata=json.dumps(chunk_metadata, ensure_ascii=False) # 섹션 메타데이터 저장 ) db.session.add(chunk) db.session.flush() # ID 생성 # 벡터 DB에 청크 추가 if vector_db.add_chunk( chunk_id=chunk.id, chunk_content=chunk_content, file_id=file_id, chunk_index=global_chunk_index ): vector_saved_count += 1 saved_count += 1 global_chunk_index += 1 # 진행 상황 출력 (10개마다) if saved_count % 10 == 0: print(f"[청크 생성] 진행 중: {saved_count}개 청크 저장 중... (DB: {saved_count}, 벡터 DB: {vector_saved_count})") except Exception as e: print(f"[청크 생성] 경고: 청크 {global_chunk_index} 저장 중 오류: {str(e)}") import traceback traceback.print_exc() continue db.session.commit() print(f"[청크 생성] 완료: {saved_count}개 청크가 데이터베이스에 저장되었습니다. (벡터 DB: {vector_saved_count}개)") # 저장 확인 verified_count = DocumentChunk.query.filter_by(file_id=file_id).count() if verified_count != saved_count: print(f"[청크 생성] 경고: 저장된 청크 수({saved_count})와 확인된 청크 수({verified_count})가 일치하지 않습니다.") else: print(f"[청크 생성] 검증 완료: {verified_count}개 청크가 정상적으로 저장되었습니다.") return saved_count except Exception as e: db.session.rollback() print(f"[청크 생성] 오류: {str(e)}") import traceback traceback.print_exc() return 0 def create_parent_chunk_with_ai(file_id, content, model_name): """AI를 사용하여 Parent Chunk 생성 (웹소설 분석)""" try: print(f"[Parent Chunk 생성] 파일 ID {file_id}에 대한 Parent Chunk 생성 시작") print(f"[Parent Chunk 생성] 사용 모델: {model_name}") print(f"[Parent Chunk 생성] 원본 텍스트 길이: {len(content)}자") # 모델명이 None이거나 빈 문자열인 경우 처리 if not model_name or not model_name.strip(): print(f"[Parent Chunk 생성] ❌ 오류: 모델명이 제공되지 않았습니다.") return None # 텍스트가 너무 길면 일부만 사용 (최대 50000자) content_preview = content[:50000] if len(content) > 50000 else content if len(content) > 50000: print(f"[Parent Chunk 생성] 텍스트가 길어 일부만 사용: {len(content_preview)}자 (전체: {len(content)}자)") # 분석 프롬프트 생성 analysis_prompt = f"""다음 웹소설 텍스트를 분석하여 다음 항목들을 작성해주세요. 각 항목은 명확하고 구체적으로 작성해주세요. 텍스트 내용: {content_preview} 위 텍스트를 분석하여 다음 형식으로 답변해주세요: ## 세계관 설명 [세계관에 대한 상세한 설명을 작성하세요. 배경, 설정, 규칙 등을 포함하세요.] ## 주요 캐릭터 분석 [주요 등장인물들의 이름, 역할, 성격, 특징 등을 분석하여 작성하세요. 각 캐릭터별로 구분하여 작성하세요.] ## 주요 스토리 분석 [전체적인 스토리 흐름, 주요 사건, 갈등 구조 등을 분석하여 작성하세요.] ## 주요 에피소드 분석 [중요한 에피소드나 챕터별 주요 내용을 분석하여 작성하세요. 시간 순서대로 정리하면 좋습니다.] ## 기타 [위 카테고리에 포함되지 않지만 중요한 정보나 특징 등을 작성하세요.] 각 항목을 명확하게 구분하여 작성해주세요.""" # 모델 타입 확인 (Gemini 또는 Ollama) # Gemini 모델명 형식: "gemini:모델명" 또는 "gemini-1.5-flash" (접두사 없는 경우도 지원) model_name_lower = model_name.lower().strip() is_gemini = model_name_lower.startswith('gemini:') or model_name_lower.startswith('gemini-') print(f"[Parent Chunk 생성] 모델 타입 확인: is_gemini={is_gemini}, model_name={model_name}") if is_gemini: # Gemini API 호출 # 모델명에서 "gemini:" 접두사 제거 (대소문자 구분 없이) gemini_model_name = model_name.strip() if gemini_model_name.lower().startswith('gemini:'): gemini_model_name = gemini_model_name.split(':', 1)[1].strip() # "gemini-"로 시작하는 경우 (예: "gemini-1.5-flash") 그대로 사용 print(f"[Parent Chunk 생성] Gemini API에 분석 요청 전송 중... (모델: {gemini_model_name})") print(f"[Parent Chunk 생성] 원본 모델명: {model_name} -> Gemini 모델명: {gemini_model_name}") gemini_client = get_gemini_client() if not gemini_client.is_configured(): print(f"[Parent Chunk 생성] ❌ 오류: Gemini API 키가 설정되지 않았습니다.") print(f"[Parent Chunk 생성] 디버그: Gemini 클라이언트 상태 확인 중...") # API 키 상태 다시 확인 from app.gemini_client import get_gemini_api_key api_key = get_gemini_api_key() if api_key: print(f"[Parent Chunk 생성] 디버그: API 키는 존재하지만 클라이언트가 설정되지 않았습니다. (길이: {len(api_key)})") else: print(f"[Parent Chunk 생성] 디버그: API 키가 데이터베이스에 없습니다.") return None print(f"[Parent Chunk 생성] Gemini API 키 확인 완료. API 호출 시작...") result = gemini_client.generate_response( prompt=analysis_prompt, model_name=gemini_model_name, temperature=0.7, max_output_tokens=8192 ) if result['error']: print(f"[Parent Chunk 생성] ❌ 오류: Gemini API 호출 실패 - {result['error']}") print(f"[Parent Chunk 생성] 디버그: result 객체 내용: {result}") return None if not result.get('response'): print(f"[Parent Chunk 생성] ❌ 오류: Gemini API 응답이 비어있습니다.") print(f"[Parent Chunk 생성] 디버그: result 객체 내용: {result}") return None analysis_result = result['response'] print(f"[Parent Chunk 생성] Gemini API 응답 수신 성공: {len(analysis_result)}자") else: # Ollama API 호출 print(f"[Parent Chunk 생성] Ollama API에 분석 요청 전송 중... (모델: {model_name})") try: ollama_response = requests.post( f'{OLLAMA_BASE_URL}/api/chat', json={ 'model': model_name, 'messages': [ { 'role': 'user', 'content': analysis_prompt } ], 'stream': False }, timeout=300 # 5분 타임아웃 ) if ollama_response.status_code != 200: error_detail = ollama_response.text if ollama_response.text else '상세 정보 없음' if ollama_response.status_code == 404: error_msg = f'Ollama API 오류 404: 모델 "{model_name}"을(를) 찾을 수 없습니다. 모델이 Ollama에 설치되어 있는지 확인하세요.' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") print(f"[Parent Chunk 생성] 디버그: 만약 Gemini 모델을 사용하려면 모델명이 'gemini:' 또는 'gemini-'로 시작해야 합니다.") else: error_msg = f'Ollama API 오류: {ollama_response.status_code} - {error_detail[:200]}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") return None response_data = ollama_response.json() analysis_result = response_data.get('message', {}).get('content', '') print(f"[Parent Chunk 생성] Ollama API 응답 수신 성공: {len(analysis_result)}자") except requests.exceptions.RequestException as e: print(f"[Parent Chunk 생성] ❌ Ollama API 연결 오류: {str(e)}") print(f"[Parent Chunk 생성] 디버그: Ollama URL: {OLLAMA_BASE_URL}") raise if not analysis_result: print(f"[Parent Chunk 생성] ⚠️ 경고: 분석 결과가 비어있습니다.") return None print(f"[Parent Chunk 생성] 분석 결과 수신 완료: {len(analysis_result)}자") # 분석 결과 파싱 world_view = "" characters = "" story = "" episodes = "" others = "" # 각 섹션 추출 sections = { 'world_view': ['## 세계관 설명', '## 세계관', '세계관 설명'], 'characters': ['## 주요 캐릭터 분석', '## 주요 캐릭터', '주요 캐릭터 분석', '## 캐릭터'], 'story': ['## 주요 스토리 분석', '## 주요 스토리', '주요 스토리 분석', '## 스토리'], 'episodes': ['## 주요 에피소드 분석', '## 주요 에피소드', '주요 에피소드 분석', '## 에피소드'], 'others': ['## 기타', '기타'] } lines = analysis_result.split('\n') current_section = None current_content = [] for line in lines: line_stripped = line.strip() # 섹션 헤더 확인 section_found = False for section_key, section_headers in sections.items(): for header in section_headers: if header in line_stripped: # 이전 섹션 저장 if current_section: if current_section == 'world_view': world_view = '\n'.join(current_content).strip() elif current_section == 'characters': characters = '\n'.join(current_content).strip() elif current_section == 'story': story = '\n'.join(current_content).strip() elif current_section == 'episodes': episodes = '\n'.join(current_content).strip() elif current_section == 'others': others = '\n'.join(current_content).strip() current_section = section_key current_content = [] section_found = True break if section_found: break if not section_found and current_section: # 현재 섹션에 내용 추가 if line_stripped and not line_stripped.startswith('#'): current_content.append(line) # 마지막 섹션 저장 if current_section: if current_section == 'world_view': world_view = '\n'.join(current_content).strip() elif current_section == 'characters': characters = '\n'.join(current_content).strip() elif current_section == 'story': story = '\n'.join(current_content).strip() elif current_section == 'episodes': episodes = '\n'.join(current_content).strip() elif current_section == 'others': others = '\n'.join(current_content).strip() # 파싱 실패 시 전체 내용을 "기타"에 저장 if not world_view and not characters and not story and not episodes: print(f"[Parent Chunk 생성] 경고: 섹션 파싱 실패. 전체 내용을 '기타'에 저장합니다.") others = analysis_result.strip() # 기존 Parent Chunk 삭제 (있으면) existing_parent = ParentChunk.query.filter_by(file_id=file_id).first() if existing_parent: db.session.delete(existing_parent) db.session.commit() print(f"[Parent Chunk 생성] 기존 Parent Chunk 삭제 완료") # Parent Chunk 생성 및 저장 parent_chunk = ParentChunk( file_id=file_id, world_view=world_view if world_view else None, characters=characters if characters else None, story=story if story else None, episodes=episodes if episodes else None, others=others if others else None ) db.session.add(parent_chunk) db.session.commit() print(f"[Parent Chunk 생성] ✅ 완료: Parent Chunk가 생성되었습니다.") print(f"[Parent Chunk 생성] - 세계관: {len(world_view)}자") print(f"[Parent Chunk 생성] - 캐릭터: {len(characters)}자") print(f"[Parent Chunk 생성] - 스토리: {len(story)}자") print(f"[Parent Chunk 생성] - 에피소드: {len(episodes)}자") print(f"[Parent Chunk 생성] - 기타: {len(others)}자") return parent_chunk except requests.exceptions.RequestException as e: error_msg = f'Ollama API 연결 오류: {str(e)}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") import traceback traceback.print_exc() return None except Exception as e: db.session.rollback() error_msg = f'Parent Chunk 생성 중 오류: {str(e)}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") import traceback traceback.print_exc() return None def get_parent_chunks_for_files(file_ids): """파일 ID 목록에 대한 Parent Chunk 조회 (문맥 파악용)""" try: if not file_ids: return [] parent_chunks = [] for file_id in file_ids: parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() if parent_chunk: parent_chunks.append(parent_chunk) return parent_chunks except Exception as e: print(f"[Parent Chunk 조회] 오류: {str(e)}") return [] def search_relevant_chunks(query, file_ids=None, model_name=None, top_k=5, min_score=1): """ 질문과 관련된 청크 검색 (벡터 검색 + Re-ranking) 1. 벡터 검색으로 초기 30개 문서 검색 2. Cross-Encoder로 리랭킹 3. 상위 top_k개 반환 (기본 5개) """ try: # 벡터 DB 매니저 가져오기 vector_db = get_vector_db() # 파일 ID 확장 (이어서 업로드된 파일 포함) expanded_file_ids = None if file_ids: expanded_file_ids = list(file_ids) for file_id in file_ids: # 원본 파일인 경우 이어서 업로드된 파일들도 포함 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() expanded_file_ids.extend([child.id for child in child_files]) # 원본 파일이 선택된 경우, 이어서 업로드된 파일들도 포함 parent_files = UploadedFile.query.filter(UploadedFile.id.in_(file_ids), UploadedFile.parent_file_id.is_(None)).all() for parent_file in parent_files: child_files = UploadedFile.query.filter_by(parent_file_id=parent_file.id).all() expanded_file_ids.extend([child.id for child in child_files]) # 모델 필터링이 필요한 경우 파일 ID 필터링 if model_name and expanded_file_ids: filtered_files = UploadedFile.query.filter( UploadedFile.id.in_(expanded_file_ids), UploadedFile.model_name == model_name ).all() expanded_file_ids = [f.id for f in filtered_files] elif model_name and not expanded_file_ids: # 파일 ID가 없으면 모델 이름으로만 필터링 filtered_files = UploadedFile.query.filter_by(model_name=model_name).all() expanded_file_ids = [f.id for f in filtered_files] # 1단계: 벡터 검색으로 초기 30개 문서 검색 print(f"[벡터 검색] 쿼리: {query[:50]}..., 파일 ID: {expanded_file_ids if expanded_file_ids else '모든 파일'}") vector_results = vector_db.search_chunks( query=query, file_ids=expanded_file_ids, top_k=30 ) if not vector_results: print(f"[벡터 검색] 결과 없음, 키워드 기반 검색으로 대체") # 벡터 검색 결과가 없으면 기존 키워드 기반 검색으로 대체 return search_relevant_chunks_fallback(query, file_ids, model_name, top_k, min_score) # 2단계: Cross-Encoder로 리랭킹 print(f"[리랭킹] {len(vector_results)}개 청크에 대한 리랭킹 시작...") reranked_chunks = vector_db.rerank_chunks( query=query, chunks=vector_results, top_k=top_k ) # 3단계: DB에서 청크 객체 가져오기 final_chunks = [] for reranked in reranked_chunks: chunk_id = reranked['chunk_id'] chunk = DocumentChunk.query.get(chunk_id) if chunk: final_chunks.append(chunk) print(f"[벡터 검색 + 리랭킹] 최종 {len(final_chunks)}개 청크 반환") return final_chunks except Exception as e: print(f"[벡터 검색] 오류: {str(e)}") import traceback traceback.print_exc() # 오류 시 기존 키워드 기반 검색으로 대체 print(f"[벡터 검색] 키워드 기반 검색으로 대체") return search_relevant_chunks_fallback(query, file_ids, model_name, top_k, min_score) def search_relevant_chunks_fallback(query, file_ids=None, model_name=None, top_k=25, min_score=1): """기존 키워드 기반 검색 (Fallback)""" try: # 검색 쿼리 준비 - 한글과 영문 단어 모두 추출 query_words = set(re.findall(r'[가-힣]+|\w+', query.lower())) if not query_words: return [] # 청크 조회 query_obj = DocumentChunk.query.join(UploadedFile) if file_ids: # 선택된 파일 ID와 그 파일에 이어서 업로드된 모든 파일 ID 포함 expanded_file_ids = list(file_ids) for file_id in file_ids: # 원본 파일인 경우 이어서 업로드된 파일들도 포함 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() expanded_file_ids.extend([child.id for child in child_files]) # 원본 파일이 선택된 경우, 이어서 업로드된 파일들도 포함 parent_files = UploadedFile.query.filter(UploadedFile.id.in_(file_ids), UploadedFile.parent_file_id.is_(None)).all() for parent_file in parent_files: child_files = UploadedFile.query.filter_by(parent_file_id=parent_file.id).all() expanded_file_ids.extend([child.id for child in child_files]) query_obj = query_obj.filter(UploadedFile.id.in_(expanded_file_ids)) if model_name: query_obj = query_obj.filter(UploadedFile.model_name == model_name) all_chunks = query_obj.all() if not all_chunks: return [] # 각 청크의 관련도 점수 계산 (개선된 알고리즘) scored_chunks = [] for chunk in all_chunks: chunk_content_lower = chunk.content.lower() chunk_words = set(re.findall(r'[가-힣]+|\w+', chunk_content_lower)) # 1. 공통 단어 수 (기본 점수) common_words = query_words & chunk_words base_score = len(common_words) # 2. 쿼리 단어의 빈도 가중치 (중요한 단어가 더 많이 나타날수록 높은 점수) frequency_score = 0 for word in query_words: frequency_score += chunk_content_lower.count(word) # 3. 쿼리 단어 비율 (청크에서 쿼리 단어가 차지하는 비율) if len(chunk_words) > 0: ratio_score = len(common_words) / len(chunk_words) * 10 else: ratio_score = 0 # 최종 점수 계산 (가중치 적용) final_score = base_score * 2 + frequency_score * 0.5 + ratio_score # 최소 점수 이상인 청크만 포함 if final_score >= min_score: scored_chunks.append((final_score, chunk)) # 점수 순으로 정렬하고 상위 k개 선택 scored_chunks.sort(key=lambda x: x[0], reverse=True) # top_k개 선택 top_chunks = [chunk for score, chunk in scored_chunks[:top_k]] return top_chunks except Exception as e: print(f"[키워드 검색] 오류: {str(e)}") import traceback traceback.print_exc() return [] @main_bp.route('/login', methods=['GET', 'POST']) def login(): """로그인 페이지""" if current_user.is_authenticated: # 관리자인 경우 관리자 페이지로 리다이렉트 if current_user.is_admin: return redirect(url_for('main.admin')) return redirect(url_for('main.index')) if request.method == 'POST': username = request.form.get('username', '').strip() password = request.form.get('password', '') if not username or not password: flash('사용자명과 비밀번호를 입력해주세요.', 'error') return render_template('login.html') user = User.query.filter_by(username=username).first() if user and user.check_password(password) and user.is_active: login_user(user) user.last_login = datetime.utcnow() db.session.commit() next_page = request.args.get('next') # 관리자인 경우 관리자 페이지로 리다이렉트 if user.is_admin: return redirect(next_page) if next_page else redirect(url_for('main.admin')) return redirect(next_page) if next_page else redirect(url_for('main.index')) else: flash('사용자명 또는 비밀번호가 올바르지 않습니다.', 'error') return render_template('login.html') @main_bp.route('/logout') @login_required def logout(): """로그아웃""" logout_user() flash('로그아웃되었습니다.', 'info') return redirect(url_for('main.login')) @main_bp.route('/') @login_required def index(): return render_template('index.html') @main_bp.route('/webnovels') @login_required def webnovels(): """업로드된 웹소설 목록 페이지""" return render_template('webnovels.html') @main_bp.route('/admin') @admin_required def admin(): """관리자 페이지""" users = User.query.order_by(User.created_at.desc()).all() return render_template('admin.html', users=users) @main_bp.route('/admin/messages') @admin_required def admin_messages(): """관리자 메시지 확인 페이지""" return render_template('admin_messages.html') @main_bp.route('/admin/webnovels') @admin_required def admin_webnovels(): """웹소설 관리 페이지""" return render_template('admin_webnovels.html') @main_bp.route('/admin/prompts') @admin_required def admin_prompts(): """프롬프트 관리 페이지""" return render_template('admin_prompts.html') @main_bp.route('/admin/files') @admin_required def admin_files(): """파일 목록 관리 페이지""" return render_template('admin_files.html') @main_bp.route('/api/admin/users', methods=['GET']) @admin_required def get_users(): """사용자 목록 API""" try: users = User.query.order_by(User.created_at.desc()).all() return jsonify({ 'users': [user.to_dict() for user in users] }), 200 except Exception as e: return jsonify({'error': f'사용자 목록 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/users', methods=['POST']) @admin_required def create_user(): """사용자 생성 API""" try: data = request.json username = data.get('username', '').strip() nickname = data.get('nickname', '').strip() password = data.get('password', '') is_admin = data.get('is_admin', False) if not username or not password: return jsonify({'error': '사용자명과 비밀번호를 입력해주세요.'}), 400 if User.query.filter_by(username=username).first(): return jsonify({'error': '이미 존재하는 사용자명입니다.'}), 400 user = User(username=username, nickname=nickname if nickname else None, is_admin=is_admin, is_active=True) user.set_password(password) db.session.add(user) db.session.commit() return jsonify({ 'message': '사용자가 성공적으로 생성되었습니다.', 'user': user.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'사용자 생성 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/users/', methods=['PUT']) @admin_required def update_user(user_id): """사용자 정보 수정 API""" try: user = User.query.get_or_404(user_id) data = request.json # 자기 자신의 관리자 권한을 제거하는 것은 방지 if user_id == current_user.id and data.get('is_admin') == False: return jsonify({'error': '자기 자신의 관리자 권한을 제거할 수 없습니다.'}), 400 if 'username' in data: new_username = data['username'].strip() if new_username != user.username: if User.query.filter_by(username=new_username).first(): return jsonify({'error': '이미 존재하는 사용자명입니다.'}), 400 user.username = new_username if 'nickname' in data: user.nickname = data['nickname'].strip() if data['nickname'] else None if 'password' in data and data['password']: user.set_password(data['password']) if 'is_admin' in data: user.is_admin = data['is_admin'] if 'is_active' in data: user.is_active = data['is_active'] db.session.commit() return jsonify({ 'message': '사용자 정보가 성공적으로 수정되었습니다.', 'user': user.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'사용자 정보 수정 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/messages', methods=['GET']) @admin_required def get_all_messages(): """전체 메시지 조회 (관리자용)""" try: user_id = request.args.get('user_id', type=int) session_id = request.args.get('session_id', type=int) page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) query = ChatMessage.query.join(ChatSession) if user_id: query = query.filter(ChatSession.user_id == user_id) if session_id: query = query.filter(ChatMessage.session_id == session_id) messages = query.order_by(ChatMessage.created_at.desc())\ .paginate(page=page, per_page=per_page, error_out=False) return jsonify({ 'messages': [msg.to_dict() for msg in messages.items], 'total': messages.total, 'pages': messages.pages, 'current_page': page }), 200 except Exception as e: return jsonify({'error': f'메시지 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/sessions', methods=['GET']) @admin_required def get_all_sessions(): """전체 대화 세션 조회 (관리자용)""" try: user_id = request.args.get('user_id', type=int) page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) query = ChatSession.query if user_id: query = query.filter(ChatSession.user_id == user_id) sessions = query.order_by(ChatSession.updated_at.desc())\ .paginate(page=page, per_page=per_page, error_out=False) sessions_data = [] for session in sessions.items: session_dict = session.to_dict() session_dict['username'] = session.user.username if session.user else 'Unknown' session_dict['nickname'] = session.user.nickname if session.user else None sessions_data.append(session_dict) return jsonify({ 'sessions': sessions_data, 'total': sessions.total, 'pages': sessions.pages, 'current_page': page }), 200 except Exception as e: return jsonify({'error': f'대화 세션 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/users/', methods=['DELETE']) @admin_required def delete_user(user_id): """사용자 삭제 API""" try: user = User.query.get_or_404(user_id) # 자기 자신을 삭제하는 것은 방지 if user_id == current_user.id: return jsonify({'error': '자기 자신을 삭제할 수 없습니다.'}), 400 db.session.delete(user) db.session.commit() return jsonify({'message': '사용자가 성공적으로 삭제되었습니다.'}), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'사용자 삭제 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/gemini-api-key', methods=['GET']) @admin_required def get_gemini_api_key(): """Gemini API 키 조회""" try: # SystemConfig에서 API 키 가져오기 (테이블이 없으면 빈 문자열 반환) api_key = SystemConfig.get_config('gemini_api_key', '') # 보안을 위해 마스킹된 값 반환 (처음 8자만 표시) masked_key = api_key[:8] + '...' if api_key and len(api_key) > 8 else '' return jsonify({ 'has_api_key': bool(api_key), 'masked_key': masked_key }), 200 except Exception as e: print(f"[Gemini API 키 조회] 오류: {e}") import traceback traceback.print_exc() return jsonify({'error': f'API 키 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/gemini-api-key', methods=['POST']) @admin_required def set_gemini_api_key(): """Gemini API 키 저장/업데이트""" try: if not request.is_json: return jsonify({'error': 'Content-Type이 application/json이 아닙니다.'}), 400 data = request.json if not data: return jsonify({'error': '요청 데이터가 없습니다.'}), 400 api_key = data.get('api_key', '').strip() if not api_key: return jsonify({'error': 'API 키를 입력해주세요.'}), 400 # API 키 저장 (SystemConfig.set_config 내부에서 테이블 생성 처리) SystemConfig.set_config( key='gemini_api_key', value=api_key, description='Google Gemini API 키' ) # Gemini 클라이언트에 API 키 재로드 알림 try: from app.gemini_client import reset_gemini_client reset_gemini_client() print(f"[Gemini] API 키가 업데이트되어 클라이언트가 재로드되었습니다.") except Exception as e: print(f"[Gemini] API 키 재로드 실패: {e}") return jsonify({ 'message': 'Gemini API 키가 성공적으로 저장되었습니다.', 'has_api_key': True }), 200 except Exception as e: db.session.rollback() print(f"[Gemini API 키 저장] 오류: {e}") import traceback traceback.print_exc() return jsonify({'error': f'API 키 저장 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/ollama/models', methods=['GET']) @login_required def get_ollama_models(): """Ollama 및 Gemini에서 사용 가능한 모델 목록 가져오기 (로컬 AI 모델은 학습된 웹소설이 있는 모델만 표시)""" try: all_models = [] # 1. Ollama 모델 목록 가져오기 (학습된 웹소설이 있는 모델만 필터링) try: response = requests.get(f'{OLLAMA_BASE_URL}/api/tags', timeout=5) if response.status_code == 200: data = response.json() ollama_models_raw = [model['name'] for model in data.get('models', [])] # 각 Ollama 모델에 대해 학습된 웹소설이 있는지 확인 filtered_ollama_models = [] for model_name in ollama_models_raw: # 해당 모델로 학습된 원본 파일이 있는지 확인 (parent_file_id가 None인 파일만) file_count = UploadedFile.query.filter_by( model_name=model_name, parent_file_id=None ).count() if file_count > 0: filtered_ollama_models.append({'name': model_name, 'type': 'ollama'}) print(f"[모델 목록] Ollama 모델 '{model_name}' - 학습된 웹소설 {file_count}개") else: print(f"[모델 목록] Ollama 모델 '{model_name}' - 학습된 웹소설 없음, 목록에서 제외") all_models.extend(filtered_ollama_models) print(f"[모델 목록] Ollama 모델 {len(filtered_ollama_models)}개 추가 (전체 {len(ollama_models_raw)}개 중 {len(filtered_ollama_models)}개 필터링됨)") except Exception as e: print(f"[모델 목록] Ollama 모델 목록 조회 실패: {e}") # 2. Gemini 모델 목록 가져오기 (학습된 웹소설이 있는 모델만 필터링) try: gemini_client = get_gemini_client() if gemini_client.is_configured(): gemini_models = gemini_client.get_available_models() # 각 Gemini 모델에 대해 학습된 웹소설이 있는지 확인 filtered_gemini_models = [] for model_name in gemini_models: full_model_name = f'gemini:{model_name}' # 해당 모델로 학습된 원본 파일이 있는지 확인 (parent_file_id가 None인 파일만) file_count = UploadedFile.query.filter_by( model_name=full_model_name, parent_file_id=None ).count() if file_count > 0: filtered_gemini_models.append({'name': full_model_name, 'type': 'gemini'}) print(f"[모델 목록] Gemini 모델 '{full_model_name}' - 학습된 웹소설 {file_count}개") else: print(f"[모델 목록] Gemini 모델 '{full_model_name}' - 학습된 웹소설 없음, 목록에서 제외") all_models.extend(filtered_gemini_models) print(f"[모델 목록] Gemini 모델 {len(filtered_gemini_models)}개 추가 (전체 {len(gemini_models)}개 중 {len(filtered_gemini_models)}개 필터링됨)") else: print(f"[모델 목록] Gemini API 키가 설정되지 않아 Gemini 모델을 불러올 수 없습니다.") except Exception as e: print(f"[모델 목록] Gemini 모델 목록 조회 실패: {e}") if all_models: return jsonify({'models': all_models}) else: return jsonify({'error': '사용 가능한 모델이 없습니다. Ollama가 실행 중인지, 또는 Gemini API 키가 설정되었는지 확인하세요.', 'models': []}), 500 except Exception as e: return jsonify({'error': f'모델 목록을 가져오는 중 오류가 발생했습니다: {str(e)}', 'models': []}), 500 @main_bp.route('/api/admin/prompts', methods=['GET']) @admin_required def get_system_prompt(): """시스템 프롬프트 가져오기""" try: prompt = SystemConfig.get_config('system_prompt', '') return jsonify({'prompt': prompt}), 200 except Exception as e: return jsonify({'error': f'프롬프트를 가져오는 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/prompts', methods=['POST']) @admin_required def save_system_prompt(): """시스템 프롬프트 저장""" try: data = request.json prompt = data.get('prompt', '').strip() SystemConfig.set_config( key='system_prompt', value=prompt, description='질문할 때 자동으로 붙이는 시스템 프롬프트' ) return jsonify({ 'message': '프롬프트가 성공적으로 저장되었습니다.', 'prompt': prompt }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'프롬프트 저장 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/ollama/models', methods=['GET']) @admin_required def get_all_ollama_models(): """관리자용: Ollama 및 Gemini에서 사용 가능한 모든 모델 목록 가져오기 (필터링 없이 전체 목록)""" try: all_models = [] # 1. Ollama 모델 목록 가져오기 (필터링 없이 전체 목록) try: response = requests.get(f'{OLLAMA_BASE_URL}/api/tags', timeout=5) if response.status_code == 200: data = response.json() ollama_models_raw = [model['name'] for model in data.get('models', [])] # 필터링 없이 모든 Ollama 모델 추가 for model_name in ollama_models_raw: # 각 모델의 학습된 웹소설 개수 확인 (정보 제공용) file_count = UploadedFile.query.filter_by( model_name=model_name, parent_file_id=None ).count() all_models.append({ 'name': model_name, 'type': 'ollama', 'file_count': file_count # 정보 제공용 }) print(f"[관리자 모델 목록] Ollama 모델 '{model_name}' - 학습된 웹소설 {file_count}개") print(f"[관리자 모델 목록] Ollama 모델 {len(ollama_models_raw)}개 추가") except Exception as e: print(f"[관리자 모델 목록] Ollama 모델 목록 조회 실패: {e}") # 2. Gemini 모델 목록 가져오기 (필터링 없이 전체 목록) try: gemini_client = get_gemini_client() if gemini_client.is_configured(): gemini_models = gemini_client.get_available_models() # 필터링 없이 모든 Gemini 모델 추가 for model_name in gemini_models: full_model_name = f'gemini:{model_name}' # 각 모델의 학습된 웹소설 개수 확인 (정보 제공용) file_count = UploadedFile.query.filter_by( model_name=full_model_name, parent_file_id=None ).count() all_models.append({ 'name': full_model_name, 'type': 'gemini', 'file_count': file_count # 정보 제공용 }) print(f"[관리자 모델 목록] Gemini 모델 '{full_model_name}' - 학습된 웹소설 {file_count}개") print(f"[관리자 모델 목록] Gemini 모델 {len(gemini_models)}개 추가") else: print(f"[관리자 모델 목록] Gemini API 키가 설정되지 않아 Gemini 모델을 불러올 수 없습니다.") except Exception as e: print(f"[관리자 모델 목록] Gemini 모델 목록 조회 실패: {e}") if all_models: return jsonify({'models': all_models}) else: return jsonify({'error': '사용 가능한 모델이 없습니다. Ollama가 실행 중인지, 또는 Gemini API 키가 설정되었는지 확인하세요.', 'models': []}), 500 except Exception as e: return jsonify({'error': f'모델 목록을 가져오는 중 오류가 발생했습니다: {str(e)}', 'models': []}), 500 @main_bp.route('/api/chat', methods=['POST']) @login_required def chat(): """채팅 API 엔드포인트""" try: data = request.json message = data.get('message', '') model = data.get('model', '') file_ids = [int(fid) for fid in data.get('file_ids', []) if fid] # 선택한 웹소설 파일 ID 목록 session_id = data.get('session_id', None) # 대화 세션 ID (정수로 변환) if not message: return jsonify({'error': '메시지가 필요합니다.'}), 400 # 모델이 선택된 경우 Ollama 사용 if model: try: # RAG: 질문과 관련된 청크 검색 context = "" use_rag = True # RAG 사용 여부 if use_rag: print(f"\n[RAG 검색] 모델: {model}, 질문: {message[:50]}...") print(f"[RAG 검색] 선택된 파일 ID: {file_ids if file_ids else '없음 (모든 파일 검색)'}") # 1단계: Parent Chunk로 문맥 파악 parent_chunks = [] if file_ids: print(f"[RAG 검색 1단계] Parent Chunk 조회 시작...") parent_chunks = get_parent_chunks_for_files(file_ids) print(f"[RAG 검색 1단계] Parent Chunk 조회 완료: {len(parent_chunks)}개 파일") # 2단계: 벡터 검색 + 리랭킹으로 Child Chunk 정밀 검색 print(f"[RAG 검색 2단계] 벡터 검색 + 리랭킹 시작...") relevant_chunks = search_relevant_chunks( query=message, file_ids=file_ids if file_ids else None, model_name=model, top_k=5, # 리랭킹 후 상위 5개만 선택 min_score=0.5 # 최소 점수 임계값 ) print(f"[RAG 검색 2단계] 벡터 검색 + 리랭킹 완료: {len(relevant_chunks)}개 청크 (상위 5개)") # 컨텍스트 구성 context_parts = [] # Parent Chunk 정보 추가 (문맥 파악용) if parent_chunks: parent_context_sections = [] for parent_chunk in parent_chunks: file = parent_chunk.file file_info = f"\n=== {file.original_filename} 전체 개요 ===\n" sections = [] if parent_chunk.world_view: sections.append(f"[세계관]\n{parent_chunk.world_view}") if parent_chunk.characters: sections.append(f"[주요 캐릭터]\n{parent_chunk.characters}") if parent_chunk.story: sections.append(f"[주요 스토리]\n{parent_chunk.story}") if parent_chunk.episodes: sections.append(f"[주요 에피소드]\n{parent_chunk.episodes}") if parent_chunk.others: sections.append(f"[기타 정보]\n{parent_chunk.others}") if sections: parent_context_sections.append(file_info + "\n\n".join(sections)) if parent_context_sections: parent_context = "\n\n".join(parent_context_sections) context_parts.append(f"다음은 웹소설의 전체적인 문맥과 개요입니다:\n\n{parent_context}") print(f"[RAG 검색] Parent Chunk 컨텍스트 추가: {len(parent_context)}자") # Child Chunk 정보 추가 (정밀 검색 결과) if relevant_chunks: child_context_parts = [] seen_files = set() for chunk in relevant_chunks: file = chunk.file if file.original_filename not in seen_files: seen_files.add(file.original_filename) print(f"[RAG 검색] 사용된 파일: {file.original_filename} (모델: {file.model_name})") child_context_parts.append(f"[{file.original_filename} - 청크 {chunk.chunk_index + 1}]\n{chunk.content}") if child_context_parts: # 컨텍스트 길이 확인 및 최적화 full_child_context = "\n\n".join(child_context_parts) child_context_length = len(full_child_context) # Child Chunk 컨텍스트가 너무 길면 일부만 사용 (최대 15000자) if child_context_length > 15000: truncated_parts = [] current_length = 0 for part in child_context_parts: if current_length + len(part) > 15000: break truncated_parts.append(part) current_length += len(part) full_child_context = "\n\n".join(truncated_parts) print(f"[RAG 검색] Child Chunk 컨텍스트 길이 조절: {child_context_length}자 → {len(full_child_context)}자") context_parts.append(f"다음은 질문과 관련된 웹소설의 구체적인 내용입니다 (정밀 검색 결과, 총 {len(relevant_chunks)}개 청크):\n\n{full_child_context}") print(f"[RAG 검색] Child Chunk 컨텍스트 추가: {len(full_child_context)}자") # 최종 컨텍스트 구성 if context_parts: full_context = "\n\n" + "\n\n---\n\n".join(context_parts) + "\n\n" # Parent Chunk와 Child Chunk 모두 있는 경우 if parent_chunks and relevant_chunks: context = f"""다음은 질문에 답하기 위한 웹소설 정보입니다: {full_context} 위 정보를 참고하여 답변해주세요: - 먼저 전체적인 문맥(Parent Chunk)을 이해하여 웹소설의 배경과 설정을 파악하세요. - 그 다음 구체적인 내용(Child Chunk)을 통해 질문에 대한 정확한 답변을 제공하세요. - 웹소설의 맥락과 스토리를 고려하여 일관성 있는 답변을 작성하세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ elif parent_chunks: # Parent Chunk만 있는 경우 context = f"""다음은 웹소설의 전체적인 문맥과 개요입니다: {full_context} 위 정보를 참고하여 질문에 답변해주세요. 웹소설의 배경과 설정을 고려하여 답변하세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ else: # Child Chunk만 있는 경우 context = f"""다음은 질문과 관련된 웹소설의 구체적인 내용입니다: {full_context} 위 내용을 충분히 참고하여 다음 질문에 정확하고 상세하게 답변해주세요. 웹소설의 맥락과 스토리를 고려하여 답변해주세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ context += message print(f"[RAG 검색] 최종 컨텍스트 생성 완료 (Parent Chunk: {len(parent_chunks)}개, Child Chunk: {len(relevant_chunks)}개, 총 {len(context)}자)") else: # RAG 검색 결과가 없으면 기존 방식 사용 print(f"[RAG 검색] 관련 청크를 찾지 못했습니다. 전체 파일 내용 사용") use_rag = False # RAG 검색 결과가 없거나 비활성화된 경우 기존 방식 사용 if not context and not use_rag: if file_ids: # 선택한 파일 ID와 이어서 업로드된 파일들도 포함 expanded_file_ids = list(file_ids) for file_id in file_ids: # 원본 파일인 경우 이어서 업로드된 파일들도 포함 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() expanded_file_ids.extend([child.id for child in child_files]) uploaded_files = UploadedFile.query.filter( UploadedFile.id.in_(expanded_file_ids), UploadedFile.model_name == model ).all() print(f"[파일 사용] 선택된 파일 ID로 조회 (이어서 업로드 포함): {len(uploaded_files)}개 파일") else: # 파일 ID가 없으면 해당 모델의 모든 파일 사용 (원본 및 이어서 업로드 포함) uploaded_files = UploadedFile.query.filter_by(model_name=model).all() print(f"[파일 사용] 모델 '{model}'의 모든 파일 사용: {len(uploaded_files)}개 파일") if uploaded_files: print(f"[파일 사용] 사용되는 파일 목록:") for f in uploaded_files: is_child = f.parent_file_id is not None prefix = " └─ " if is_child else " - " print(f"{prefix}{f.original_filename} (모델: {f.model_name})") context_parts = [] for file in uploaded_files: try: if os.path.exists(file.file_path): encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: file_content = f.read() except UnicodeDecodeError: with open(file.file_path, 'r', encoding='cp949') as f: file_content = f.read() # 파일 내용이 너무 길면 일부만 사용 (최대 20000자로 증가) if len(file_content) > 20000: file_content = file_content[:20000] + "..." context_parts.append(f"[{file.original_filename}]\n{file_content}") except Exception as e: print(f"파일 읽기 오류 ({file.original_filename}): {str(e)}") continue if context_parts: context = "\n\n".join(context_parts) context = f"""다음은 학습된 웹소설 내용입니다: {context} 위 내용을 참고하여 다음 질문에 답변해주세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ # 시스템 프롬프트 가져오기 system_prompt = SystemConfig.get_config('system_prompt', '').strip() # 프롬프트 구성 (시스템 프롬프트 + 컨텍스트 + 사용자 메시지) prompt_parts = [] if system_prompt: prompt_parts.append(system_prompt) if context: prompt_parts.append(context) prompt_parts.append(message) full_prompt = "\n\n".join(prompt_parts) if system_prompt: print(f"[프롬프트] 시스템 프롬프트 적용: {len(system_prompt)}자") # 모델 타입 확인 (Gemini 또는 Ollama) is_gemini = model.startswith('gemini:') if is_gemini: # Gemini API 호출 gemini_model_name = model.replace('gemini:', '') print(f"[Gemini] 모델: {gemini_model_name}, 질문: {message[:50]}...") gemini_client = get_gemini_client() if not gemini_client.is_configured(): return jsonify({'error': 'Gemini API 키가 설정되지 않았습니다. GEMINI_API_KEY 환경 변수를 설정하세요.'}), 500 result = gemini_client.generate_response( prompt=full_prompt, model_name=gemini_model_name, temperature=0.7, max_output_tokens=8192 ) if result['error']: return jsonify({'error': result['error']}), 500 response_text = result['response'] else: # Ollama API 호출 ollama_response = requests.post( f'{OLLAMA_BASE_URL}/api/generate', json={ 'model': model, 'prompt': full_prompt, 'stream': False }, timeout=120 # 파일이 많을 수 있으므로 타임아웃 증가 ) if ollama_response.status_code != 200: # 오류 상세 정보 가져오기 try: error_detail = ollama_response.json().get('error', ollama_response.text[:200]) except: error_detail = ollama_response.text[:200] if ollama_response.text else '상세 정보 없음' if ollama_response.status_code == 404: error_msg = f'모델 "{model}"을(를) 찾을 수 없습니다. 모델이 Ollama에 설치되어 있는지 확인하세요. (오류: {error_detail})' else: error_msg = f'Ollama 서버 오류: {ollama_response.status_code} (오류: {error_detail})' return jsonify({'error': error_msg}), ollama_response.status_code ollama_data = ollama_response.json() response_text = ollama_data.get('response', '응답을 생성할 수 없습니다.') # 대화 세션에 메시지 저장 (Gemini와 Ollama 공통) session_id = data.get('session_id') session_dict = None if session_id: try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first() if session: # 사용자 메시지가 이미 저장되어 있는지 확인 (중복 방지) # 가장 최근 메시지를 확인하여 중복 저장 방지 latest_user_msg = ChatMessage.query.filter_by( session_id=session_id, role='user' ).order_by(ChatMessage.created_at.desc()).first() # 최근 10초 이내에 같은 내용의 메시지가 없으면 저장 should_save = True if latest_user_msg: time_diff = (datetime.utcnow() - latest_user_msg.created_at).total_seconds() if latest_user_msg.content == message and time_diff < 10: should_save = False print(f"[중복 방지] 최근 {time_diff:.2f}초 전에 같은 메시지가 저장되어 있습니다. 저장을 건너뜁니다.") if should_save: user_msg = ChatMessage( session_id=session_id, role='user', content=message ) db.session.add(user_msg) print(f"[메시지 저장] 사용자 메시지 저장: {message[:50]}...") # 세션 제목 업데이트 (첫 사용자 메시지인 경우) title_needs_update = ( not session.title or session.title.strip() == '' or session.title == '새 대화' ) if title_needs_update and message.strip(): # 메시지 내용을 제목으로 사용 (최대 30자) title = message.strip()[:30] if len(message.strip()) > 30: title += '...' session.title = title print(f"[세션 제목] 업데이트: '{title}' (원본 길이: {len(message.strip())}자)") elif title_needs_update: print(f"[세션 제목] 메시지가 비어있어 제목을 업데이트하지 않습니다.") else: print(f"[메시지 저장] 중복 메시지로 인해 저장을 건너뜁니다.") # AI 응답 저장 ai_msg = ChatMessage( session_id=session_id, role='ai', content=response_text ) db.session.add(ai_msg) session.updated_at = datetime.utcnow() db.session.commit() # 세션 정보를 응답에 포함 (제목 업데이트 반영) session_dict = session.to_dict() except Exception as e: print(f"메시지 저장 오류: {str(e)}") db.session.rollback() session_dict = None response_data = {'response': response_text, 'session_id': session_id} if session_dict: response_data['session'] = session_dict return jsonify(response_data) except requests.exceptions.ConnectionError: return jsonify({'error': 'Ollama 서버에 연결할 수 없습니다. Ollama가 실행 중인지 확인하세요.'}), 503 except requests.exceptions.Timeout: return jsonify({'error': '응답 시간이 초과되었습니다. 더 짧은 메시지를 시도해보세요.'}), 504 except Exception as e: return jsonify({'error': f'Ollama 통신 중 오류가 발생했습니다: {str(e)}'}), 500 else: # 모델이 선택되지 않은 경우 기본 응답 response_text = f"안녕하세요! '{message}'에 대한 답변을 준비 중입니다.\n\n좌측 하단에서 로컬 AI 모델을 선택하면 더 정확한 답변을 제공할 수 있습니다." return jsonify({'response': response_text}) except Exception as e: return jsonify({'error': f'채팅 처리 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/upload', methods=['POST']) @login_required def upload_file(): """웹소설 파일 업로드""" import sys import traceback # 모든 출력을 즉시 플러시하여 로그가 바로 보이도록 def log_print(*args, **kwargs): from datetime import datetime timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] print(f"[{timestamp}]", *args, **kwargs) sys.stdout.flush() try: log_print(f"\n{'='*60}") log_print(f"=== 파일 업로드 요청 시작 ===") log_print(f"요청 URL: {request.url}") log_print(f"요청 메서드: {request.method}") log_print(f"Content-Type: {request.content_type}") log_print(f"Content-Length: {request.content_length}") log_print(f"Remote Address: {request.remote_addr}") log_print(f"Headers: {dict(request.headers)}") log_print(f"Form 데이터 키: {list(request.form.keys())}") log_print(f"Files 키: {list(request.files.keys())}") log_print(f"사용자: {current_user.username if current_user and current_user.is_authenticated else 'None'}") log_print(f"사용자 인증 상태: {current_user.is_authenticated if current_user else False}") log_print(f"{'='*60}\n") # 업로드 폴더 확인 및 생성 try: ensure_upload_folder() log_print(f"[1/8] 업로드 폴더 확인 완료: {UPLOAD_FOLDER}") except Exception as e: error_msg = f'업로드 폴더를 준비할 수 없습니다: {str(e)}' log_print(f"[ERROR] {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'folder_check'}), 500 if 'file' not in request.files: error_msg = '파일이 없습니다.' log_print(f"[ERROR] {error_msg}") log_print(f"사용 가능한 키: {list(request.files.keys())}") return jsonify({'error': error_msg, 'step': 'file_check'}), 400 file = request.files['file'] model_name = request.form.get('model_name', '').strip() parent_file_id = request.form.get('parent_file_id', None) # 이어서 업로드할 경우 원본 파일 ID log_print(f"[2/8] 파일 수신: {file.filename if file else 'None'}") log_print(f"[2/8] 모델명: {model_name if model_name else 'None (비어있음)'}") log_print(f"[2/8] 이어서 업로드: {parent_file_id if parent_file_id else '아니오'}") if file.filename == '': error_msg = '파일명이 없습니다.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'filename_check'}), 400 # 모델명 검증 if not model_name: error_msg = 'AI 모델을 선택해주세요.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'model_check'}), 400 # parent_file_id 검증 (이어서 업로드인 경우) parent_file = None if parent_file_id: try: parent_file_id = int(parent_file_id) parent_file = UploadedFile.query.filter_by( id=parent_file_id, uploaded_by=current_user.id ).first() if not parent_file: error_msg = '원본 파일을 찾을 수 없습니다.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'parent_file_check'}), 404 # 같은 모델인지 확인 if parent_file.model_name != model_name: error_msg = '같은 모델의 파일에만 이어서 업로드할 수 있습니다.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'model_mismatch'}), 400 log_print(f"[이어서 업로드] 원본 파일: {parent_file.original_filename} (ID: {parent_file_id})") except (ValueError, TypeError): parent_file_id = None log_print(f"[경고] 잘못된 parent_file_id: {parent_file_id}") log_print(f"[3/8] 업로드 시도: {file.filename}, 모델: {model_name}") if not allowed_file(file.filename): error_msg = f'허용되지 않은 파일 형식입니다. 허용 형식: {", ".join(ALLOWED_EXTENSIONS)}' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'file_type_check'}), 400 log_print(f"[4/8] 파일 형식 확인 완료: {file.filename}") # 파일 크기 확인 (Content-Length 헤더 사용) file_size = 0 try: # Content-Length 헤더 확인 if request.content_length: file_size = request.content_length print(f"Content-Length로 파일 크기 확인: {file_size} bytes") else: # Content-Length가 없으면 파일 스트림에서 크기 확인 시도 try: # 파일 스트림의 현재 위치 저장 current_pos = file.tell() # 파일 끝으로 이동 file.seek(0, os.SEEK_END) file_size = file.tell() # 원래 위치로 복원 file.seek(current_pos, os.SEEK_SET) print(f"파일 스트림으로 크기 확인: {file_size} bytes") except (AttributeError, IOError, OSError) as e: print(f"파일 크기 확인 실패 (저장 후 확인): {str(e)}") file_size = 0 # 저장 후 확인하도록 0으로 설정 except Exception as e: print(f"파일 크기 확인 오류: {str(e)}") file_size = 0 # 저장 후 확인하도록 0으로 설정 # 파일 크기 사전 체크 (가능한 경우에만) if file_size > 0: if file_size > 100 * 1024 * 1024: # 100MB print(f"파일 크기 초과: {file_size} bytes") return jsonify({'error': '파일 크기가 너무 큽니다. 최대 100MB까지 업로드 가능합니다.'}), 400 if file_size == 0: print("빈 파일 업로드 시도") return jsonify({'error': '빈 파일은 업로드할 수 없습니다.'}), 400 # 안전한 파일명 생성 original_filename = file.filename filename = secure_filename(original_filename) if not filename: return jsonify({'error': '유효하지 않은 파일명입니다.'}), 400 unique_filename = f"{uuid.uuid4().hex}_{filename}" file_path = os.path.join(UPLOAD_FOLDER, unique_filename) # 파일 저장 try: log_print(f"[6/8] 파일 저장 시도: {file_path}") file.save(file_path) log_print(f"[6/8] 파일 저장 완료: {file_path}") except IOError as e: error_msg = f'파일 저장 중 오류가 발생했습니다: {str(e)}' log_print(f"[ERROR] 파일 저장 IOError: {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'file_save'}), 500 except PermissionError as e: error_msg = f'파일 저장 권한 오류: {str(e)}' log_print(f"[ERROR] 파일 저장 PermissionError: {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'file_save_permission'}), 500 except Exception as e: error_msg = f'파일 저장 실패: {str(e)}' log_print(f"[ERROR] 파일 저장 Exception: {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'file_save'}), 500 # 저장된 파일 크기 확인 if not os.path.exists(file_path): error_msg = '파일이 저장되지 않았습니다.' print(f"파일 존재 확인 실패: {file_path}") return jsonify({'error': error_msg}), 500 saved_file_size = os.path.getsize(file_path) if saved_file_size == 0: os.remove(file_path) # 빈 파일 삭제 error_msg = '파일이 제대로 저장되지 않았습니다.' print(f"빈 파일 삭제: {file_path}") return jsonify({'error': error_msg}), 500 print(f"저장된 파일 크기: {saved_file_size} bytes") # 데이터베이스에 저장 try: log_print(f"[7/8] 데이터베이스 저장 시도: {original_filename}") uploaded_file = UploadedFile( filename=unique_filename, original_filename=original_filename, file_path=file_path, file_size=saved_file_size, model_name=model_name, # 이미 검증됨 uploaded_by=current_user.id, parent_file_id=parent_file_id if parent_file else None # 이어서 업로드인 경우 ) db.session.add(uploaded_file) db.session.flush() # ID를 얻기 위해 flush log_print(f"[7/8] 데이터베이스 flush 완료, 파일 ID: {uploaded_file.id}") # 텍스트 파일인 경우 청크로 분할하여 저장 (RAG용) if original_filename.lower().endswith(('.txt', '.md')): try: log_print(f"[7/8] 청크 생성 시작: {original_filename}") log_print(f"[7/8] 파일 ID: {uploaded_file.id}") # 파일 내용 읽기 encoding = 'utf-8' try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() log_print(f"[7/8] UTF-8 인코딩으로 파일 읽기 성공: {len(content)}자") except UnicodeDecodeError: log_print(f"[7/8] UTF-8 인코딩 실패, CP949 시도: {original_filename}") with open(file_path, 'r', encoding='cp949') as f: content = f.read() log_print(f"[7/8] CP949 인코딩으로 파일 읽기 성공: {len(content)}자") # 1. Parent Chunk 생성 (AI 분석) - 먼저 생성 try: log_print(f"[7/8] Parent Chunk 생성 시작 (AI 분석)...") parent_chunk = create_parent_chunk_with_ai(uploaded_file.id, content, model_name) if parent_chunk: log_print(f"[7/8] ✅ Parent Chunk 생성 완료: {original_filename}") print(f"Parent Chunk가 생성되었습니다: {original_filename}") else: log_print(f"[7/8] ⚠️ 경고: Parent Chunk 생성 실패: {original_filename}") print(f"경고: Parent Chunk 생성에 실패했습니다: {original_filename}") except Exception as parent_chunk_error: # Parent Chunk 생성 실패해도 업로드는 계속 진행 log_print(f"[7/8] ⚠️ 경고: Parent Chunk 생성 중 예외 발생: {str(parent_chunk_error)}") print(f"경고: Parent Chunk 생성 중 오류가 발생했습니다: {original_filename}") import traceback traceback.print_exc() # 2. Child Chunk 생성 및 저장 (섹션별 분할) log_print(f"[8/8] Child Chunk 생성 함수 호출 중...") chunk_count = create_chunks_for_file(uploaded_file.id, content) if chunk_count > 0: log_print(f"[8/8] ✅ 성공: 파일 {original_filename}을 {chunk_count}개의 청크로 분할했습니다.") print(f"파일 {original_filename}을 {chunk_count}개의 청크로 분할했습니다.") else: log_print(f"[8/8] ⚠️ 경고: 청크가 생성되지 않았습니다. (파일이 너무 짧거나 비어있을 수 있습니다.)") print(f"경고: 파일 {original_filename}에 대한 청크가 생성되지 않았습니다.") except Exception as e: error_msg = f"청크 생성 중 오류: {str(e)}" log_print(f"[7/8] ❌ 오류: {error_msg}") print(error_msg) import traceback traceback.print_exc() # 청크 생성 실패해도 파일 업로드는 계속 진행 (경고만 표시) log_print(f"[7/8] ⚠️ 경고: 청크 생성 실패했지만 파일 업로드는 계속 진행합니다.") # 최종 청크 개수 확인 및 저장 chunk_count = 0 if original_filename.lower().endswith(('.txt', '.md')): chunk_count = DocumentChunk.query.filter_by(file_id=uploaded_file.id).count() log_print(f"[8/8] 최종 청크 개수 확인: {chunk_count}개") db.session.commit() log_print(f"[8/8] 데이터베이스 커밋 완료: {original_filename}") log_print(f"[8/8] 연결된 모델: {model_name}") log_print(f"[8/8] 생성된 청크 수: {chunk_count}") # 학습 상태 요약 if chunk_count > 0: log_print(f"[8/8] ✅ AI 학습 준비 완료: {chunk_count}개 청크가 저장되어 RAG 검색에 사용 가능합니다.") else: log_print(f"[8/8] ⚠️ 경고: 청크가 생성되지 않아 RAG 검색이 불가능합니다.") log_print(f"{'='*60}") log_print(f"=== 파일 업로드 성공 ===") log_print(f"{'='*60}\n") log_print(f"[8/8] 업로드 완료 - 파일: {original_filename}, 모델: {model_name}, 크기: {saved_file_size} bytes") return jsonify({ 'message': f'파일이 성공적으로 업로드되었습니다. (모델: {model_name})', 'file': uploaded_file.to_dict(), 'model_name': model_name, 'chunk_count': chunk_count }), 200 except Exception as e: db.session.rollback() error_msg = f'데이터베이스 저장 중 오류가 발생했습니다: {str(e)}' log_print(f"[ERROR] 데이터베이스 저장 오류: {error_msg}") traceback.print_exc() # 데이터베이스 저장 실패 시 파일도 삭제 if 'file_path' in locals() and os.path.exists(file_path): try: os.remove(file_path) log_print(f"오류로 인한 파일 삭제: {file_path}") except Exception as del_e: log_print(f"파일 삭제 실패: {str(del_e)}") return jsonify({'error': error_msg, 'step': 'database_save'}), 500 except Exception as e: db.session.rollback() error_msg = str(e) error_type = type(e).__name__ log_print(f"\n{'='*60}") log_print(f"=== 업로드 처리 중 예외 발생 ===") log_print(f"예외 타입: {error_type}") log_print(f"에러 메시지: {error_msg}") traceback.print_exc() log_print(f"{'='*60}\n") # 파일 크기 초과 오류 처리 if '413' in error_msg or 'Request Entity Too Large' in error_msg or error_type == 'RequestEntityTooLarge': return jsonify({'error': '파일 크기가 너무 큽니다. 최대 100MB까지 업로드 가능합니다.', 'step': 'file_size'}), 413 return jsonify({'error': f'파일 업로드 중 오류가 발생했습니다: {error_type}: {error_msg}', 'step': 'exception'}), 500 @main_bp.route('/api/files', methods=['GET']) @login_required def get_files(): """업로드된 파일 목록 조회""" try: model_name = request.args.get('model_name', None) # 원본 파일만 조회 (parent_file_id가 None인 파일) # 모든 사용자가 업로드된 모든 파일을 볼 수 있음 query = UploadedFile.query.filter_by(parent_file_id=None) print(f"[파일 조회] 모든 파일 조회 (사용자: {current_user.username})") # 모델 필터링 전 전체 파일 수 확인 total_before_filter = query.count() print(f"[파일 조회] 필터링 전 파일 수: {total_before_filter}개") if model_name: query = query.filter_by(model_name=model_name) print(f"[파일 조회] 모델 '{model_name}' 필터링") files = query.order_by(UploadedFile.uploaded_at.desc()).all() # 필터링 후 파일 수와 모델명 확인 print(f"[파일 조회] 필터링 후 파일 수: {len(files)}개") if len(files) > 0: print(f"[파일 조회] 첫 번째 파일 모델명: {files[0].model_name}") else: # 필터링 결과가 없을 때 실제 존재하는 모델명 확인 all_files = UploadedFile.query.filter_by(parent_file_id=None).all() unique_models = set(f.model_name for f in all_files if f.model_name) print(f"[파일 조회] 데이터베이스에 존재하는 모델명 목록: {list(unique_models)}") # 각 원본 파일에 대해 이어서 업로드된 파일도 포함 files_with_children = [] for file in files: file_dict = file.to_dict() # 청크 개수 추가 chunk_count = DocumentChunk.query.filter_by(file_id=file.id).count() file_dict['chunk_count'] = chunk_count # Parent Chunk 존재 여부 확인 has_parent_chunk = ParentChunk.query.filter_by(file_id=file.id).first() is not None file_dict['has_parent_chunk'] = has_parent_chunk # 이어서 업로드된 파일들도 조회 child_files = UploadedFile.query.filter_by(parent_file_id=file.id).order_by(UploadedFile.uploaded_at.asc()).all() child_files_dict = [] for child in child_files: child_dict = child.to_dict() child_chunk_count = DocumentChunk.query.filter_by(file_id=child.id).count() child_dict['chunk_count'] = child_chunk_count # Child 파일도 Parent Chunk 확인 child_has_parent_chunk = ParentChunk.query.filter_by(file_id=child.id).first() is not None child_dict['has_parent_chunk'] = child_has_parent_chunk child_files_dict.append(child_dict) file_dict['child_files'] = child_files_dict files_with_children.append(file_dict) # 모델별 통계 정보 추가 (원본 파일만 카운트) model_stats = {} if not model_name: # 모든 모델의 통계 (원본 파일만) # 모든 사용자가 모든 파일을 볼 수 있음 all_files = UploadedFile.query.filter_by(parent_file_id=None).all() for file in all_files: model = file.model_name or '미지정' if model not in model_stats: model_stats[model] = {'count': 0, 'total_size': 0} model_stats[model]['count'] += 1 model_stats[model]['total_size'] += file.file_size else: # 특정 모델의 통계 model_stats[model_name] = { 'count': len(files), 'total_size': sum(f.file_size for f in files) } print(f"[파일 조회] 조회된 원본 파일 수: {len(files)}개") return jsonify({ 'files': files_with_children, 'model_stats': model_stats, 'filtered_model': model_name }), 200 except Exception as e: return jsonify({'error': f'파일 목록 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//chunks', methods=['GET']) @login_required def get_file_chunks(file_id): """파일의 청크 정보 조회 (학습 상태 확인용)""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 chunks = DocumentChunk.query.filter_by(file_id=file_id).order_by(DocumentChunk.chunk_index.asc()).all() total_chunks = len(chunks) # 샘플 청크 (처음 3개) sample_chunks = [] for chunk in chunks[:3]: sample_chunks.append({ 'index': chunk.chunk_index, 'content_preview': chunk.content[:100] + '...' if len(chunk.content) > 100 else chunk.content, 'content_length': len(chunk.content) }) return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'model_name': file.model_name, 'total_chunks': total_chunks, 'sample_chunks': sample_chunks, 'learning_status': 'ready' if total_chunks > 0 else 'not_ready', 'message': f'{total_chunks}개 청크가 저장되어 RAG 검색에 사용 가능합니다.' if total_chunks > 0 else '청크가 생성되지 않아 RAG 검색이 불가능합니다.' }), 200 except Exception as e: return jsonify({'error': f'청크 정보 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//chunks/all', methods=['GET']) @login_required def get_all_file_chunks(file_id): """파일의 모든 청크 목록과 내용 조회 (관리자용)""" try: # 관리자는 모든 파일 조회 가능 if current_user.is_admin: file = UploadedFile.query.get(file_id) else: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 chunks = DocumentChunk.query.filter_by(file_id=file_id).order_by(DocumentChunk.chunk_index.asc()).all() chunks_data = [] for chunk in chunks: chunk_dict = { 'id': chunk.id, 'chunk_index': chunk.chunk_index, 'content': chunk.content, 'content_length': len(chunk.content), 'created_at': chunk.created_at.isoformat() if chunk.created_at else None } # 메타데이터 파싱 if chunk.chunk_metadata: try: metadata = json.loads(chunk.chunk_metadata) chunk_dict['metadata'] = metadata except: chunk_dict['metadata'] = None else: chunk_dict['metadata'] = None chunks_data.append(chunk_dict) return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'model_name': file.model_name, 'total_chunks': len(chunks_data), 'chunks': chunks_data }), 200 except Exception as e: return jsonify({'error': f'청크 목록 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//parent-chunk', methods=['GET']) @login_required def get_file_parent_chunk(file_id): """파일의 Parent Chunk 조회""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() if not parent_chunk: return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'has_parent_chunk': False, 'message': 'Parent Chunk가 생성되지 않았습니다.' }), 200 return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'has_parent_chunk': True, 'parent_chunk': parent_chunk.to_dict(), 'message': 'Parent Chunk가 존재합니다.' }), 200 except Exception as e: return jsonify({'error': f'Parent Chunk 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//parent-chunk', methods=['POST']) @login_required def create_file_parent_chunk(file_id): """파일의 Parent Chunk 수동 생성 (재생성)""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 # 모델명 확인 if not file.model_name: return jsonify({'error': '파일에 연결된 AI 모델이 없습니다. Parent Chunk를 생성할 수 없습니다.'}), 400 # 파일이 텍스트 파일인지 확인 if not file.original_filename.lower().endswith(('.txt', '.md')): return jsonify({'error': 'Parent Chunk는 텍스트 파일(.txt, .md)에만 생성할 수 있습니다.'}), 400 # 파일 내용 읽기 try: encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: with open(file.file_path, 'r', encoding='cp949') as f: content = f.read() except Exception as e: return jsonify({'error': f'파일을 읽을 수 없습니다: {str(e)}'}), 500 if not content or len(content.strip()) == 0: return jsonify({'error': '파일 내용이 비어있습니다.'}), 400 # Parent Chunk 생성 print(f"[Parent Chunk 수동 생성] 파일 ID {file_id}에 대한 Parent Chunk 생성 시작") print(f"[Parent Chunk 수동 생성] 모델명: {file.model_name}") print(f"[Parent Chunk 수동 생성] 파일명: {file.original_filename}") parent_chunk = create_parent_chunk_with_ai(file_id, content, file.model_name) if parent_chunk: return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'has_parent_chunk': True, 'parent_chunk': parent_chunk.to_dict(), 'message': 'Parent Chunk가 성공적으로 생성되었습니다.' }), 200 else: return jsonify({ 'error': 'Parent Chunk 생성에 실패했습니다. 서버 로그를 확인하세요.', 'file_id': file_id, 'filename': file.original_filename }), 500 except Exception as e: return jsonify({'error': f'Parent Chunk 생성 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//metadata', methods=['POST']) @login_required def create_file_metadata(file_id): """파일의 모든 청크에 메타데이터 생성 (수동 생성)""" try: file = UploadedFile.query.get_or_404(file_id) # 권한 확인 if not current_user.is_admin and file.uploaded_by != current_user.id: return jsonify({'error': '권한이 없습니다.'}), 403 # 모델명 확인 if not file.model_name: return jsonify({'error': '파일에 연결된 AI 모델이 없습니다. 메타데이터를 생성할 수 없습니다.'}), 400 # 텍스트 파일만 가능 if not file.original_filename.lower().endswith(('.txt', '.md')): return jsonify({'error': '메타데이터는 텍스트 파일(.txt, .md)에만 생성할 수 있습니다.'}), 400 # 파일 내용 읽기 encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: with open(file.file_path, 'r', encoding='cp949') as f: content = f.read() # 모든 청크 가져오기 chunks = DocumentChunk.query.filter_by(file_id=file_id).order_by(DocumentChunk.chunk_index).all() if not chunks: return jsonify({'error': '청크가 없습니다. 먼저 파일을 업로드하세요.'}), 400 print(f"[메타데이터 생성] 파일 ID {file_id}에 대한 메타데이터 생성 시작") print(f"[메타데이터 생성] 모델명: {file.model_name}") print(f"[메타데이터 생성] 파일명: {file.original_filename}") print(f"[메타데이터 생성] 청크 개수: {len(chunks)}개") # 각 청크에 메타데이터 생성 success_count = 0 fail_count = 0 for chunk in chunks: try: # 기존 메타데이터 읽기 existing_metadata = {} if chunk.chunk_metadata: try: existing_metadata = json.loads(chunk.chunk_metadata) except: existing_metadata = {} # 새 메타데이터 추출 new_metadata = extract_chunk_metadata( chunk_content=chunk.content, full_content=content, # 원본 웹소설 전체 내용 참조 chunk_index=chunk.chunk_index, file_id=file_id, model_name=file.model_name ) # 기존 메타데이터와 새 메타데이터 병합 (새 메타데이터가 우선) # 기존 메타데이터의 모든 필드를 유지하되, 새로 추출한 필드로 업데이트 # chapter 필드는 파일 업로드 시 추가된 회차 정보이므로 유지 merged_metadata = existing_metadata.copy() for key, value in new_metadata.items(): if value is not None and value != []: # 리스트인 경우 중복 제거 후 병합 if isinstance(value, list) and isinstance(merged_metadata.get(key), list): merged_list = merged_metadata.get(key, []).copy() for item in value: if item not in merged_list: merged_list.append(item) merged_metadata[key] = merged_list else: merged_metadata[key] = value # 메타데이터를 JSON 문자열로 변환 metadata_json = json.dumps(merged_metadata, ensure_ascii=False) if merged_metadata else None # 청크에 메타데이터 저장 chunk.chunk_metadata = metadata_json success_count += 1 # 진행 상황 출력 (10개마다) if (success_count + fail_count) % 10 == 0: print(f"[메타데이터 생성] 진행 중: {success_count + fail_count}/{len(chunks)}개 청크 처리 중...") except Exception as e: print(f"[메타데이터 생성] 경고: 청크 {chunk.chunk_index} 메타데이터 생성 실패: {str(e)}") fail_count += 1 continue # 데이터베이스 커밋 db.session.commit() print(f"[메타데이터 생성] 완료: {success_count}개 성공, {fail_count}개 실패") return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'total_chunks': len(chunks), 'success_count': success_count, 'fail_count': fail_count, 'message': f'메타데이터 생성이 완료되었습니다. (성공: {success_count}개, 실패: {fail_count}개)' }), 200 except Exception as e: db.session.rollback() print(f"[메타데이터 생성] 오류: {str(e)}") import traceback traceback.print_exc() return jsonify({'error': f'메타데이터 생성 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files/', methods=['DELETE']) @login_required def delete_file(file_id): """업로드된 파일 삭제 (연관된 모든 파일도 함께 삭제)""" try: file = UploadedFile.query.get_or_404(file_id) # 원본 파일인 경우 (parent_file_id가 None인 경우) # 이어서 업로드된 모든 파일도 함께 삭제 files_to_delete = [] if file.parent_file_id is None: # 원본 파일이면, 이어서 업로드된 모든 파일도 찾아서 삭제 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() files_to_delete = [file] + child_files print(f"[파일 삭제] 원본 파일 삭제: {file.original_filename}, 연관 파일 {len(child_files)}개도 함께 삭제") else: # 이어서 업로드된 파일이면 원본 파일도 함께 삭제 parent_file = UploadedFile.query.get(file.parent_file_id) if parent_file: # 원본 파일과 모든 연관 파일 삭제 all_child_files = UploadedFile.query.filter_by(parent_file_id=file.parent_file_id).all() files_to_delete = [parent_file] + all_child_files print(f"[파일 삭제] 이어서 업로드된 파일 삭제: {file.original_filename}, 원본 및 연관 파일 {len(all_child_files)}개도 함께 삭제") else: files_to_delete = [file] deleted_count = 0 deleted_files = [] for file_to_delete in files_to_delete: try: # 파일 시스템에서 삭제 if os.path.exists(file_to_delete.file_path): os.remove(file_to_delete.file_path) print(f"[파일 삭제] 파일 시스템에서 삭제: {file_to_delete.file_path}") # 관련 Child Chunk (DocumentChunk) 삭제 child_chunk_count = DocumentChunk.query.filter_by(file_id=file_to_delete.id).count() if child_chunk_count > 0: DocumentChunk.query.filter_by(file_id=file_to_delete.id).delete() print(f"[파일 삭제] Child Chunk {child_chunk_count}개 삭제 완료") # 벡터 DB에서도 해당 파일의 청크 삭제 try: vector_db = get_vector_db() vector_db.delete_chunks_by_file_id(file_to_delete.id) print(f"[파일 삭제] 벡터 DB에서 청크 삭제 완료") except Exception as vector_e: print(f"[파일 삭제] 벡터 DB 삭제 오류 (무시): {str(vector_e)}") # 관련 Parent Chunk 삭제 parent_chunk = ParentChunk.query.filter_by(file_id=file_to_delete.id).first() if parent_chunk: db.session.delete(parent_chunk) print(f"[파일 삭제] Parent Chunk 삭제 완료") deleted_files.append(file_to_delete.original_filename) db.session.delete(file_to_delete) deleted_count += 1 print(f"[파일 삭제] 데이터베이스에서 파일 삭제 완료: {file_to_delete.original_filename}") except Exception as e: print(f"[파일 삭제 오류] {file_to_delete.original_filename}: {str(e)}") import traceback traceback.print_exc() db.session.commit() message = f'파일이 성공적으로 삭제되었습니다.' if deleted_count > 1: message = f'파일 {deleted_count}개가 성공적으로 삭제되었습니다. (원본 및 연관 파일 포함)' return jsonify({ 'message': message, 'deleted_count': deleted_count, 'deleted_files': deleted_files }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'파일 삭제 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//content', methods=['GET']) @login_required def get_file_content(file_id): """업로드된 파일 내용 조회""" try: file = UploadedFile.query.get_or_404(file_id) if not os.path.exists(file.file_path): return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 # 텍스트 파일 읽기 encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: # UTF-8로 읽을 수 없으면 다른 인코딩 시도 with open(file.file_path, 'r', encoding='cp949') as f: content = f.read() return jsonify({ 'content': content, 'filename': file.original_filename }), 200 except Exception as e: return jsonify({'error': f'파일 내용 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions', methods=['GET']) @login_required def get_chat_sessions(): """사용자의 대화 세션 목록 조회 (최근 20개만 표시)""" try: sessions = ChatSession.query.filter_by(user_id=current_user.id)\ .order_by(ChatSession.updated_at.desc())\ .limit(20).all() return jsonify({ 'sessions': [session.to_dict() for session in sessions] }), 200 except Exception as e: return jsonify({'error': f'대화 세션 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions', methods=['POST']) @login_required def create_chat_session(): """새 대화 세션 생성""" try: data = request.json title = data.get('title', '새 대화') model_name = data.get('model_name', None) session = ChatSession( user_id=current_user.id, title=title, model_name=model_name ) db.session.add(session) db.session.commit() return jsonify({ 'message': '대화 세션이 생성되었습니다.', 'session': session.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'대화 세션 생성 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions/', methods=['GET']) @login_required def get_chat_session(session_id): """대화 세션 상세 조회 (메시지 포함)""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() session_dict = session.to_dict() session_dict['messages'] = [msg.to_dict() for msg in session.messages] return jsonify({'session': session_dict}), 200 except Exception as e: return jsonify({'error': f'대화 세션 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions/', methods=['PUT']) @login_required def update_chat_session(session_id): """대화 세션 수정 (제목 등)""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() data = request.json if 'title' in data: session.title = data['title'] session.updated_at = datetime.utcnow() db.session.commit() return jsonify({ 'message': '대화 세션이 수정되었습니다.', 'session': session.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'대화 세션 수정 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions/', methods=['DELETE']) @login_required def delete_chat_session(session_id): """대화 세션 삭제""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() db.session.delete(session) db.session.commit() return jsonify({'message': '대화 세션이 삭제되었습니다.'}), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'대화 세션 삭제 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions//messages', methods=['POST']) @login_required def add_chat_message(session_id): """대화 메시지 추가""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() data = request.json role = data.get('role', 'user') content = data.get('content', '') if not content: return jsonify({'error': '메시지 내용이 필요합니다.'}), 400 message = ChatMessage( session_id=session_id, role=role, content=content ) db.session.add(message) # 세션 제목 업데이트 (첫 사용자 메시지인 경우) if not session.title or session.title == '새 대화': if role == 'user': title = content[:30] + '...' if len(content) > 30 else content session.title = title session.updated_at = datetime.utcnow() db.session.commit() return jsonify({ 'message': '메시지가 추가되었습니다.', 'chat_message': message.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'메시지 추가 중 오류가 발생했습니다: {str(e)}'}), 500