from flask import Blueprint, render_template, request, jsonify, send_from_directory, redirect, url_for, flash, send_file from flask_login import login_user, logout_user, login_required, current_user from werkzeug.utils import secure_filename from app.database import db, UploadedFile, User, ChatSession, ChatMessage, DocumentChunk, ParentChunk, SystemConfig, EpisodeAnalysis, GraphEntity, GraphRelationship, GraphEvent from app.vector_db import get_vector_db from app.gemini_client import get_gemini_client import requests import os from datetime import datetime import uuid import re import json main_bp = Blueprint('main', __name__) def admin_required(f): """관리자 권한이 필요한 데코레이터""" from functools import wraps @wraps(f) @login_required def decorated_function(*args, **kwargs): if not current_user.is_admin: # API 요청인 경우 JSON 응답 반환 if request.path.startswith('/api/'): return jsonify({'error': '관리자 권한이 필요합니다.'}), 403 flash('관리자 권한이 필요합니다.', 'error') return redirect(url_for('main.index')) return f(*args, **kwargs) return decorated_function # Ollama 기본 URL (환경 변수로 설정 가능) OLLAMA_BASE_URL = os.getenv('OLLAMA_BASE_URL', 'http://localhost:11434') def get_model_token_limit(model_name, default_tokens=2000, token_type='output'): """모델별 토큰 수 제한 가져오기 (하위 호환성을 위해 기본값은 출력 토큰) Args: model_name: AI 모델명 (예: "gemini-2.0-flash-exp", "gemini:gemini-2.0-flash-exp", "gemma2:9b") default_tokens: 기본 토큰 수 (설정이 없을 때 사용) token_type: 'input' 또는 'output' (기본값: 'output') Returns: 토큰 수 (정수) """ return get_model_token_limit_by_type(model_name, default_tokens, token_type) def get_model_token_limit_by_type(model_name, default_tokens=2000, token_type='output'): """모델별 토큰 수 제한 가져오기 (입력/출력/Parent Chunk 구분) Args: model_name: AI 모델명 (예: "gemini-2.0-flash-exp", "gemini:gemini-2.0-flash-exp", "gemma2:9b") default_tokens: 기본 토큰 수 (설정이 없을 때 사용) token_type: 'input', 'output', 또는 'parent_chunk' Returns: 토큰 수 (정수) """ if not model_name: return default_tokens try: from app.database import SystemConfig # 여러 형식의 모델명을 시도 # 1. 원본 모델명 그대로 # 2. Gemini 모델의 경우 "gemini:" 접두사 추가/제거 버전 # 3. Ollama 모델의 경우 그대로 model_name_clean = model_name.strip() possible_keys = [model_name_clean] # Gemini 모델 처리 if model_name_clean.startswith('gemini:'): # "gemini:gemini-2.0-flash-exp" -> "gemini:gemini-2.0-flash-exp" (그대로) # 또는 "gemini-2.0-flash-exp" (접두사 제거) possible_keys.append(model_name_clean.replace('gemini:', '', 1)) elif model_name_clean.startswith('gemini-'): # "gemini-2.0-flash-exp" -> "gemini:gemini-2.0-flash-exp" (접두사 추가) possible_keys.append(f'gemini:{model_name_clean}') # 각 가능한 키를 시도 for key in possible_keys: # 새로운 형식: model_token_input_{model_name}, model_token_output_{model_name}, model_token_parent_chunk_{model_name} config_key = f"model_token_{token_type}_{key}" token_value = SystemConfig.get_config(config_key) if token_value: try: token_int = int(token_value) print(f"[get_model_token_limit_by_type] 모델 '{model_name}'의 {token_type} 토큰 수 {token_int} 사용 (키: {config_key})") return token_int except (ValueError, TypeError): continue # 하위 호환성: 기존 형식 model_token_{model_name}도 확인 (출력 토큰으로 간주) if token_type == 'output': old_config_key = f"model_token_{key}" token_value = SystemConfig.get_config(old_config_key) if token_value: try: token_int = int(token_value) print(f"[get_model_token_limit_by_type] 모델 '{model_name}'의 출력 토큰 수 {token_int} 사용 (기존 키: {old_config_key})") return token_int except (ValueError, TypeError): continue # 설정이 없으면 기본값 사용 print(f"[get_model_token_limit_by_type] 모델 '{model_name}'의 {token_type} 토큰 수 설정이 없어 기본값 {default_tokens} 사용") except Exception as e: print(f"[get_model_token_limit_by_type] 오류: {e}") return default_tokens # 업로드 설정 UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'uploads') def save_system_token_usage(model_name, input_tokens=None, output_tokens=None, task_type='file_processing'): """시스템 사용 토큰 저장 (파일 업로드, 분석 등) Args: model_name: 사용된 AI 모델명 input_tokens: 입력 토큰 수 output_tokens: 출력 토큰 수 task_type: 작업 유형 ('parent_chunk', 'episode_analysis', 'graph_extraction', 'metadata_extraction', 'file_processing') """ try: # 토큰이 모두 None이거나 0이어도 저장 (통계 목적) print(f"[시스템 토큰 저장] 호출됨 - 작업: {task_type}, 모델: {model_name}, 입력: {input_tokens}, 출력: {output_tokens}") system_message = ChatMessage( session_id=None, # 시스템 사용은 세션이 없음 role='ai', content=f'시스템 작업: {task_type}', input_tokens=input_tokens, output_tokens=output_tokens, model_name=model_name, usage_type='system' ) db.session.add(system_message) db.session.commit() print(f"[시스템 토큰 저장] 성공 - {task_type} - 모델: {model_name}, 입력: {input_tokens}, 출력: {output_tokens}, 메시지 ID: {system_message.id}") except Exception as e: print(f"[시스템 토큰 저장] 오류: {str(e)}") import traceback traceback.print_exc() db.session.rollback() ALLOWED_EXTENSIONS = {'txt', 'md', 'pdf', 'docx', 'epub'} # 업로드 폴더 경로 출력 (디버깅용) print(f"[업로드 설정] 업로드 폴더 경로: {UPLOAD_FOLDER}") print(f"[업로드 설정] 업로드 폴더 존재 여부: {os.path.exists(UPLOAD_FOLDER)}") def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def ensure_upload_folder(): """업로드 폴더가 없으면 생성""" try: if not os.path.exists(UPLOAD_FOLDER): print(f"업로드 폴더 생성 중: {UPLOAD_FOLDER}") os.makedirs(UPLOAD_FOLDER, exist_ok=True) if not os.path.exists(UPLOAD_FOLDER): raise Exception(f'업로드 폴더를 생성할 수 없습니다: {UPLOAD_FOLDER}') # 폴더 쓰기 권한 확인 test_file = os.path.join(UPLOAD_FOLDER, '.write_test') try: with open(test_file, 'w') as f: f.write('test') os.remove(test_file) print(f"업로드 폴더 쓰기 권한 확인 완료: {UPLOAD_FOLDER}") except PermissionError as e: raise Exception(f'업로드 폴더에 쓰기 권한이 없습니다: {UPLOAD_FOLDER} - {str(e)}') except Exception as e: raise Exception(f'업로드 폴더 쓰기 테스트 실패: {UPLOAD_FOLDER} - {str(e)}') except Exception as e: print(f"업로드 폴더 생성 오류: {str(e)}") import traceback traceback.print_exc() raise def split_text_into_chunks(text, min_chunk_size=200, max_chunk_size=1000, overlap=150): """의미 기반 텍스트 청킹 (문장과 문단 경계를 고려하여 분할)""" if not text or len(text.strip()) == 0: return [] # 1단계: 문단 단위로 분할 (빈 줄 기준) paragraphs = re.split(r'\n\s*\n', text.strip()) paragraphs = [p.strip() for p in paragraphs if p.strip()] if not paragraphs: return [] # 2단계: 각 문단을 문장 단위로 분할 # 문장 종결 기호: . ! ? (한글과 영문 모두 지원) # 구두점 뒤에 공백이나 줄바꿈이 오는 경우 문장 종료로 간주 sentence_pattern = r'([.!?]+)(?=\s+|$)' all_sentences = [] for para in paragraphs: # 문장 분리 (구두점 포함) parts = re.split(sentence_pattern, para) combined_sentences = [] current_sentence = "" for i, part in enumerate(parts): if part.strip(): if re.match(r'^[.!?]+$', part): # 구두점인 경우 현재 문장에 추가하고 문장 완성 current_sentence += part if current_sentence.strip(): combined_sentences.append(current_sentence.strip()) current_sentence = "" else: # 텍스트인 경우 현재 문장에 추가 current_sentence += part # 마지막 문장 처리 (구두점이 없는 경우) if current_sentence.strip(): combined_sentences.append(current_sentence.strip()) # 문장이 하나도 없는 경우 (구두점이 전혀 없는 문단) if not combined_sentences and para.strip(): combined_sentences.append(para.strip()) all_sentences.extend(combined_sentences) if not all_sentences: # 문장 분리가 안 되는 경우 원본 텍스트를 그대로 반환 return [text] if text.strip() else [] # 3단계: 문장들을 모아서 의미 있는 청크 생성 chunks = [] current_chunk = [] current_size = 0 for sentence in all_sentences: sentence_size = len(sentence) # 현재 청크에 문장 추가 시 최대 크기를 초과하는 경우 if current_size + sentence_size > max_chunk_size and current_chunk: # 현재 청크 저장 (줄바꿈 유지) chunk_text = '\n'.join(current_chunk) if len(chunk_text.strip()) >= min_chunk_size: chunks.append(chunk_text) else: # 최소 크기 미만이면 다음 청크와 병합 (오버랩 효과) if chunks: chunks[-1] = chunks[-1] + '\n' + chunk_text else: chunks.append(chunk_text) # 오버랩을 위한 문장 유지 (마지막 몇 문장을 다음 청크에 포함) overlap_sentences = [] overlap_size = 0 for s in reversed(current_chunk): if overlap_size + len(s) <= overlap: overlap_sentences.insert(0, s) overlap_size += len(s) + 1 # 줄바꿈 포함 else: break current_chunk = overlap_sentences + [sentence] current_size = overlap_size + sentence_size else: # 현재 청크에 문장 추가 current_chunk.append(sentence) current_size += sentence_size + 1 # 줄바꿈 포함 # 마지막 청크 추가 if current_chunk: chunk_text = '\n'.join(current_chunk) if chunks and len(chunk_text.strip()) < min_chunk_size: # 최소 크기 미만이면 이전 청크와 병합 chunks[-1] = chunks[-1] + '\n' + chunk_text else: chunks.append(chunk_text) # 빈 청크 제거 및 최소 크기 미만 청크 처리 final_chunks = [] for chunk in chunks: chunk = chunk.strip() if chunk and len(chunk) >= min_chunk_size: final_chunks.append(chunk) elif chunk: # 최소 크기 미만 청크는 이전 청크와 병합 if final_chunks: final_chunks[-1] = final_chunks[-1] + '\n' + chunk else: final_chunks.append(chunk) return final_chunks if final_chunks else [text] if text.strip() else [] def extract_chapter_number(text): """텍스트에서 챕터 번호 추출""" # 다양한 챕터 패턴 매칭 patterns = [ r'제\s*(\d+)\s*장', # 제1장, 제 1 장 r'제\s*(\d+)\s*화', # 제1화 r'Chapter\s*(\d+)', # Chapter 1 r'CHAPTER\s*(\d+)', # CHAPTER 1 r'Ch\.\s*(\d+)', # Ch. 1 r'(\d+)\s*장', # 1장 r'(\d+)\s*화', # 1화 r'CHAPTER\s*(\d+)', # CHAPTER 1 r'chap\.\s*(\d+)', # chap. 1 r'ch\s*(\d+)', # ch 1 r'(\d+)\s*章', # 1章 ] # 텍스트의 처음 500자만 검사 (챕터 정보는 보통 앞부분에 있음) search_text = text[:500] for pattern in patterns: match = re.search(pattern, search_text, re.IGNORECASE) if match: try: chapter_num = int(match.group(1)) return chapter_num except: continue return None def split_content_by_episodes(content): """원본 웹소설을 #작품설명, #1화, #2화 등으로 분할 Returns: list: [(section_type, section_title, section_content, metadata), ...] section_type: '작품설명' or '화' section_title: '작품설명' or '1화', '2화', ... metadata: {'chapter': '#작품설명'} or {'chapter': '1화'} """ if not content or len(content.strip()) == 0: return [] sections = [] # #작품설명, #1화, #2화 등의 패턴 찾기 # 패턴: #작품설명, #1화, #2화, #10화 등 episode_pattern = r'^#\s*(작품설명|\d+화)' lines = content.split('\n') current_section_type = None current_section_title = None current_section_content = [] current_section_start_line = 0 for i, line in enumerate(lines): # 줄 시작 부분에서 #작품설명 또는 #n화 패턴 찾기 match = re.match(episode_pattern, line.strip()) if match: # 이전 섹션 저장 if current_section_type and current_section_content: section_content = '\n'.join(current_section_content).strip() if section_content: # 메타데이터 생성 if current_section_type == '작품설명': metadata = {'chapter': '#작품설명'} else: metadata = {'chapter': current_section_title} sections.append(( current_section_type, current_section_title, section_content, metadata )) # 새 섹션 시작 section_title = match.group(1) if section_title == '작품설명': current_section_type = '작품설명' current_section_title = '작품설명' else: current_section_type = '화' current_section_title = section_title # '1화', '2화' 등 current_section_content = [line] # 헤더 라인 포함 current_section_start_line = i else: # 현재 섹션에 내용 추가 if current_section_content is not None: current_section_content.append(line) # 마지막 섹션 저장 if current_section_type and current_section_content: section_content = '\n'.join(current_section_content).strip() if section_content: # 메타데이터 생성 if current_section_type == '작품설명': metadata = {'chapter': '#작품설명'} else: metadata = {'chapter': current_section_title} sections.append(( current_section_type, current_section_title, section_content, metadata )) # 섹션이 하나도 없으면 전체를 하나의 섹션으로 처리 if not sections: sections.append(( '기타', '전체', content.strip(), {'chapter': None} )) return sections def extract_metadata_with_ai(chunk_content, full_content=None, parent_chunk=None, model_name=None): """AI를 사용하여 청크의 메타데이터 추출 (화자, 등장인물, 시간적 배경, 인물 관계) Args: chunk_content: 분석할 청크 내용 full_content: 원본 웹소설 전체 내용 (인물 관계 파악용) parent_chunk: Parent Chunk 객체 (선택사항) model_name: 사용할 AI 모델명 """ try: # 원본 웹소설 전체 내용을 참조하여 인물 관계 파악 full_content_preview = "" if full_content: # 전체 내용이 너무 길면 앞부분과 뒷부분 일부만 사용 (최대 20000자) if len(full_content) > 20000: full_content_preview = full_content[:10000] + "\n... (중간 생략) ...\n" + full_content[-10000:] else: full_content_preview = full_content # 프롬프트 생성 prompt = f"""다음 웹소설 텍스트를 분석하여 아래 정보를 JSON 형식으로만 응답하세요. 원본 웹소설 전체 내용 (참고용): {full_content_preview[:50000] if full_content_preview else "없음"} 분석할 청크 텍스트: {chunk_content[:2000]} 다음 형식으로만 응답하세요 (JSON 형식): {{ "pov": "화자/시점을 설명하세요 (예: 1인칭 주인공, 3인칭 전지적 작가 등)", "characters": ["등장인물1", "등장인물2"], "time_background": "시간적 배경 설명 (예: 과거 회상, 현재 시점, 미래 등)", "character_relationships": [ {{ "character1": "인물1", "character2": "인물2", "relationship": "현재 시점에서의 관계 설명 (예: 연인, 적, 친구, 가족 등)" }} ] }} character_relationships는 이 청크에 등장하는 인물들 간의 현재 관계를 원본 웹소설 전체 내용을 참고하여 파악한 것입니다. 응답은 오직 JSON 형식만 사용하고, 다른 설명은 포함하지 마세요.""" # 모델명이 없으면 기본값 사용 (Gemini 우선 시도) if not model_name: # Gemini 시도 try: gemini_client = get_gemini_client() if gemini_client.is_configured(): result = gemini_client.generate_response( prompt=prompt, model_name="gemini-1.5-flash", temperature=0.3, max_output_tokens=get_model_token_limit(model_name or "gemini-1.5-flash", 500) # 저장된 토큰 수 사용 ) if not result['error'] and result.get('response'): response_text = result['response'].strip() # JSON 추출 json_match = re.search(r'\{.*\}', response_text, re.DOTALL) if json_match: metadata = json.loads(json_match.group(0)) return metadata except: pass # 모델명이 있거나 Gemini 실패 시 해당 모델 사용 if model_name: model_name_lower = model_name.lower().strip() is_gemini = model_name_lower.startswith('gemini:') or model_name_lower.startswith('gemini-') if is_gemini: gemini_model_name = model_name.strip() if gemini_model_name.lower().startswith('gemini:'): gemini_model_name = gemini_model_name.split(':', 1)[1].strip() gemini_client = get_gemini_client() if gemini_client.is_configured(): result = gemini_client.generate_response( prompt=prompt, model_name=gemini_model_name, temperature=0.3, max_output_tokens=get_model_token_limit(model_name or "gemini-1.5-flash", 500) # 저장된 토큰 수 사용 ) if not result['error'] and result.get('response'): response_text = result['response'].strip() json_match = re.search(r'\{.*\}', response_text, re.DOTALL) if json_match: metadata = json.loads(json_match.group(0)) return metadata else: # Ollama API 호출 try: # 입력 토큰 수를 num_ctx로 사용 num_ctx = get_model_token_limit_by_type(model_name, 100000, 'input') ollama_response = requests.post( f'{OLLAMA_BASE_URL}/api/generate', json={ 'model': model_name, 'prompt': prompt, 'stream': False, 'options': { 'temperature': 0.3, 'num_predict': get_model_token_limit(model_name, 500), # 저장된 토큰 수 사용 'num_ctx': num_ctx # 입력 토큰 수를 컨텍스트 윈도우로 사용 } }, timeout=120 # 2분 타임아웃 ) if ollama_response.status_code == 200: response_data = ollama_response.json() response_text = response_data.get('response', '').strip() json_match = re.search(r'\{.*\}', response_text, re.DOTALL) if json_match: metadata = json.loads(json_match.group(0)) return metadata except: pass # AI 추출 실패 시 기본값 반환 return { "pov": None, "characters": [], "time_background": None, "character_relationships": [] } except Exception as e: print(f"[메타데이터 추출] 오류: {str(e)}") return { "pov": None, "characters": [], "time_background": None, "character_relationships": [] } def extract_chunk_metadata(chunk_content, full_content=None, chunk_index=None, file_id=None, model_name=None): """청크의 메타데이터 추출 (화자, 등장인물, 시간적 배경, 인물 관계) Args: chunk_content: 분석할 청크 내용 full_content: 원본 웹소설 전체 내용 (인물 관계 파악용) chunk_index: 청크 인덱스 file_id: 파일 ID model_name: 사용할 AI 모델명 """ metadata = { "pov": None, "characters": [], "time_background": None, "character_relationships": [] } # AI를 사용한 메타데이터 추출 (화자, 등장인물, 시간적 배경, 인물 관계) # Parent Chunk가 있으면 참조 parent_chunk = None if file_id: try: parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() except: pass # 원본 웹소설 전체 내용을 참조하여 메타데이터 추출 ai_metadata = extract_metadata_with_ai(chunk_content, full_content, parent_chunk, model_name) if ai_metadata: metadata["pov"] = ai_metadata.get("pov") metadata["characters"] = ai_metadata.get("characters", []) metadata["time_background"] = ai_metadata.get("time_background") metadata["character_relationships"] = ai_metadata.get("character_relationships", []) return metadata def analyze_episode(episode_content, episode_title, full_content=None, parent_chunk=None, model_name=None): """회차별 분석 (주요 스토리, 등장인물, 인물 관계 변화, 기타) Args: episode_content: 분석할 회차 내용 episode_title: 회차 제목 (예: '1화', '2화') full_content: 원본 웹소설 전체 내용 (참고용) parent_chunk: Parent Chunk 객체 (선택사항) model_name: 사용할 AI 모델명 Returns: 분석 결과 텍스트 (하나의 텍스트로 이어서 저장) """ try: # 원본 웹소설 전체 내용을 참조 full_content_preview = "" if full_content: # 전체 내용이 너무 길면 앞부분과 뒷부분 일부만 사용 (최대 30000자) if len(full_content) > 30000: full_content_preview = full_content[:15000] + "\n... (중간 생략) ...\n" + full_content[-15000:] else: full_content_preview = full_content # Parent Chunk 정보 추가 parent_info = "" if parent_chunk: parent_info = f""" 작품 전체 정보: - 세계관: {parent_chunk.world_view or '없음'} - 주요 캐릭터: {parent_chunk.characters or '없음'} - 주요 스토리: {parent_chunk.story or '없음'} """ # 프롬프트 생성 prompt = f"""다음 웹소설의 {episode_title} 회차를 분석하여 아래 항목들을 하나의 텍스트로 이어서 작성해주세요. {parent_info} 원본 웹소설 전체 내용 (참고용): {full_content_preview[:50000] if full_content_preview else "없음"} 분석할 회차 내용 ({episode_title}): {episode_content[:10000] if len(episode_content) > 10000 else episode_content} 다음 형식으로 분석 결과를 작성해주세요 (하나의 텍스트로 이어서 작성): ## {episode_title} 주요 스토리 분석 [이 회차에서 일어난 주요 사건과 스토리 전개를 상세히 분석해주세요] ## {episode_title} 주요 등장 인물 분석 [이 회차에 등장한 주요 인물들과 그들의 역할, 행동, 특징을 분석해주세요] ## 인물과 인물간의 관계 변화 [이 회차에서 인물들 간의 관계가 어떻게 변화했는지, 새로운 관계가 형성되었는지 등을 분석해주세요] ## {episode_title} 인물 외모 분석 [이 회차에 등장한 인물들의 외모, 체형, 얼굴 특징, 신체적 특징 등을 상세히 분석해주세요. 특히 새로 등장한 인물이나 외모가 변경된 인물에 대해 자세히 설명해주세요] ## {episode_title} 인물 의복 분석 [이 회차에 등장한 인물들이 착용한 의복, 복장, 액세서리 등을 상세히 분석해주세요. 의복의 스타일, 색상, 특징, 상황에 맞는 복장인지 등을 분석해주세요] ## {episode_title} 배경 분석 [이 회차의 배경이 되는 장소, 환경, 시간대, 분위기 등을 상세히 분석해주세요. 장소의 특징, 분위기, 시간적 배경, 날씨, 계절 등을 포함하여 분석해주세요] ## 기타 [이 회차의 특별한 점, 중요 사건, 떡밥, 복선 등 기타 중요한 내용을 분석해주세요] 응답은 위 형식을 그대로 유지하면서 각 항목에 대한 상세한 분석 내용을 작성해주세요.""" # 모델명이 없으면 기본값 사용 (Gemini 우선 시도) if not model_name: # Gemini 시도 try: gemini_client = get_gemini_client() if gemini_client.is_configured(): result = gemini_client.generate_response( prompt=prompt, model_name="gemini-1.5-flash", temperature=0.5, max_output_tokens=get_model_token_limit("gemini-1.5-flash", 3000) # 저장된 토큰 수 사용 ) if not result['error'] and result.get('response'): # 시스템 사용 토큰 저장 (토큰이 None이어도 저장) print(f"[회차 분석] 토큰 정보 확인 - 입력: {result.get('input_tokens')}, 출력: {result.get('output_tokens')}") save_system_token_usage( model_name="gemini-1.5-flash", input_tokens=result.get('input_tokens'), output_tokens=result.get('output_tokens'), task_type='episode_analysis' ) return result['response'].strip() except Exception as e: print(f"[회차 분석] Gemini 기본 모델 오류: {str(e)}") # 모델명이 있거나 Gemini 실패 시 해당 모델 사용 if model_name: model_name_lower = model_name.lower().strip() is_gemini = model_name_lower.startswith('gemini:') or model_name_lower.startswith('gemini-') if is_gemini: gemini_model_name = model_name.strip() if gemini_model_name.lower().startswith('gemini:'): gemini_model_name = gemini_model_name.split(':', 1)[1].strip() gemini_client = get_gemini_client() if gemini_client.is_configured(): result = gemini_client.generate_response( prompt=prompt, model_name=gemini_model_name, temperature=0.5, max_output_tokens=get_model_token_limit(model_name, 3000) # 저장된 토큰 수 사용 ) if not result['error'] and result.get('response'): # 시스템 사용 토큰 저장 (토큰이 None이어도 저장) print(f"[회차 분석] 토큰 정보 확인 - 입력: {result.get('input_tokens')}, 출력: {result.get('output_tokens')}") save_system_token_usage( model_name=gemini_model_name, input_tokens=result.get('input_tokens'), output_tokens=result.get('output_tokens'), task_type='episode_analysis' ) return result['response'].strip() else: # Ollama API 호출 try: # 입력 토큰 수를 num_ctx로 사용 num_ctx = get_model_token_limit_by_type(model_name, 100000, 'input') ollama_response = requests.post( f'{OLLAMA_BASE_URL}/api/generate', json={ 'model': model_name, 'prompt': prompt, 'stream': False, 'options': { 'temperature': 0.5, 'num_predict': get_model_token_limit(model_name, 3000), # 저장된 토큰 수 사용 'num_ctx': num_ctx # 입력 토큰 수를 컨텍스트 윈도우로 사용 } }, timeout=300 # 5분 타임아웃 (회차 분석은 시간이 오래 걸릴 수 있음) ) if ollama_response.status_code == 200: response_data = ollama_response.json() # 시스템 사용 토큰 저장 (토큰이 None이어도 저장) ollama_input_tokens = response_data.get('prompt_eval_count') ollama_output_tokens = response_data.get('eval_count') print(f"[회차 분석] 토큰 정보 확인 - 입력: {ollama_input_tokens}, 출력: {ollama_output_tokens}") save_system_token_usage( model_name=model_name, input_tokens=ollama_input_tokens, output_tokens=ollama_output_tokens, task_type='episode_analysis' ) return response_data.get('response', '').strip() except requests.exceptions.Timeout: print(f"[회차 분석] Ollama 타임아웃: 요청 시간이 초과되었습니다. (5분)") print(f"[회차 분석] 회차 내용이 너무 길거나 모델 응답이 느릴 수 있습니다.") except requests.exceptions.ConnectionError: print(f"[회차 분석] Ollama 연결 오류: Ollama 서버에 연결할 수 없습니다.") except Exception as e: print(f"[회차 분석] Ollama 오류: {str(e)}") # AI 분석 실패 시 기본값 반환 return f"## {episode_title} 분석\n분석을 완료할 수 없었습니다." except Exception as e: print(f"[회차 분석] 오류: {str(e)}") import traceback traceback.print_exc() return f"## {episode_title} 분석\n분석 중 오류가 발생했습니다: {str(e)}" def extract_graph_from_episode(episode_content, episode_title, file_id, full_content=None, parent_chunk=None, model_name=None): """회차별 Graph Extraction (엔티티와 관계 추출) Args: episode_content: 분석할 회차 내용 episode_title: 회차 제목 (예: '1화', '2화') file_id: 파일 ID full_content: 원본 웹소설 전체 내용 (참고용) parent_chunk: Parent Chunk 객체 (선택사항) model_name: 사용할 AI 모델명 Returns: 추출 성공 여부 (bool) """ try: print(f"[Graph Extraction] '{episode_title}' Graph Extraction 시작...") # Parent Chunk 정보 추가 parent_info = "" if parent_chunk: parent_info = f""" 작품 전체 정보: - 세계관: {parent_chunk.world_view or '없음'} - 주요 캐릭터: {parent_chunk.characters or '없음'} - 주요 스토리: {parent_chunk.story or '없음'} """ # Graph Extraction 프롬프트 생성 from app.prompts.graph_extraction import get_graph_extraction_prompt prompt = get_graph_extraction_prompt( episode_content=episode_content, episode_title=episode_title, full_content=full_content, parent_chunk_info=parent_info, max_length=10000 ) # 모델명이 없으면 기본값 사용 (Gemini 우선 시도) response_text = None if not model_name: # Gemini 시도 try: gemini_client = get_gemini_client() if gemini_client.is_configured(): result = gemini_client.generate_response( prompt=prompt, model_name="gemini-1.5-flash", temperature=0.3, max_output_tokens=3000 ) if not result['error'] and result.get('response'): response_text = result['response'].strip() # 시스템 사용 토큰 저장 (토큰이 None이어도 저장) print(f"[Graph Extraction] 토큰 정보 확인 - 입력: {result.get('input_tokens')}, 출력: {result.get('output_tokens')}") save_system_token_usage( model_name="gemini-1.5-flash", input_tokens=result.get('input_tokens'), output_tokens=result.get('output_tokens'), task_type='graph_extraction' ) except Exception as e: print(f"[Graph Extraction] Gemini 기본 모델 오류: {str(e)}") # 모델명이 있거나 Gemini 실패 시 해당 모델 사용 if not response_text and model_name: model_name_lower = model_name.lower().strip() is_gemini = model_name_lower.startswith('gemini:') or model_name_lower.startswith('gemini-') if is_gemini: gemini_model_name = model_name.strip() if gemini_model_name.lower().startswith('gemini:'): gemini_model_name = gemini_model_name.split(':', 1)[1].strip() gemini_client = get_gemini_client() if gemini_client.is_configured(): result = gemini_client.generate_response( prompt=prompt, model_name=gemini_model_name, temperature=0.3, max_output_tokens=3000 ) if not result['error'] and result.get('response'): response_text = result['response'].strip() # 시스템 사용 토큰 저장 (토큰이 None이어도 저장) print(f"[Graph Extraction] 토큰 정보 확인 - 입력: {result.get('input_tokens')}, 출력: {result.get('output_tokens')}") save_system_token_usage( model_name=gemini_model_name, input_tokens=result.get('input_tokens'), output_tokens=result.get('output_tokens'), task_type='graph_extraction' ) else: # Ollama API 호출 try: # 입력 토큰 수를 num_ctx로 사용 num_ctx = get_model_token_limit_by_type(model_name, 100000, 'input') ollama_response = requests.post( f'{OLLAMA_BASE_URL}/api/generate', json={ 'model': model_name, 'prompt': prompt, 'stream': False, 'options': { 'temperature': 0.3, 'num_predict': 3000, 'num_ctx': num_ctx # 입력 토큰 수를 컨텍스트 윈도우로 사용 } }, timeout=300 # 5분 타임아웃 ) if ollama_response.status_code == 200: response_data = ollama_response.json() response_text = response_data.get('response', '').strip() # 시스템 사용 토큰 저장 (토큰이 None이어도 저장) ollama_input_tokens = response_data.get('prompt_eval_count') ollama_output_tokens = response_data.get('eval_count') print(f"[Graph Extraction] 토큰 정보 확인 - 입력: {ollama_input_tokens}, 출력: {ollama_output_tokens}") save_system_token_usage( model_name=model_name, input_tokens=ollama_input_tokens, output_tokens=ollama_output_tokens, task_type='graph_extraction' ) except requests.exceptions.Timeout: print(f"[Graph Extraction] Ollama 타임아웃: 요청 시간이 초과되었습니다. (5분)") except requests.exceptions.ConnectionError: print(f"[Graph Extraction] Ollama 연결 오류: Ollama 서버에 연결할 수 없습니다.") except Exception as e: print(f"[Graph Extraction] Ollama 오류: {str(e)}") if not response_text: print(f"[Graph Extraction] '{episode_title}' Graph Extraction 실패: 응답 없음") return False # JSON 추출 json_match = re.search(r'\{.*\}', response_text, re.DOTALL) if not json_match: print(f"[Graph Extraction] '{episode_title}' Graph Extraction 실패: JSON 형식이 아닙니다") print(f"[Graph Extraction] 응답 일부: {response_text[:500]}") return False try: graph_data = json.loads(json_match.group(0)) except json.JSONDecodeError as e: print(f"[Graph Extraction] '{episode_title}' JSON 파싱 오류: {str(e)}") print(f"[Graph Extraction] 응답 일부: {response_text[:500]}") return False # 기존 Graph 데이터 삭제 (같은 회차의 기존 데이터) GraphEntity.query.filter_by(file_id=file_id, episode_title=episode_title).delete() GraphRelationship.query.filter_by(file_id=file_id, episode_title=episode_title).delete() GraphEvent.query.filter_by(file_id=file_id, episode_title=episode_title).delete() db.session.commit() # 데이터베이스에 저장 saved_count = 0 # 엔티티 저장 entities = graph_data.get('entities', {}) # 인물 저장 characters = entities.get('characters', []) for char in characters: if char.get('name'): entity = GraphEntity( file_id=file_id, episode_title=episode_title, entity_name=char.get('name', ''), entity_type='character', description=char.get('description'), role=char.get('role'), category=None ) db.session.add(entity) saved_count += 1 # 장소 저장 locations = entities.get('locations', []) for loc in locations: if loc.get('name'): entity = GraphEntity( file_id=file_id, episode_title=episode_title, entity_name=loc.get('name', ''), entity_type='location', description=loc.get('description'), role=None, category=loc.get('category') ) db.session.add(entity) saved_count += 1 # 관계 저장 relationships = graph_data.get('relationships', []) for rel in relationships: if rel.get('source') and rel.get('target'): relationship = GraphRelationship( file_id=file_id, episode_title=episode_title, source=rel.get('source', ''), target=rel.get('target', ''), relationship_type=rel.get('type', ''), description=rel.get('description'), event=rel.get('event') ) db.session.add(relationship) saved_count += 1 # 사건 저장 events = graph_data.get('events', []) for event in events: if event.get('name') or event.get('description'): participants = event.get('participants', []) participants_json = json.dumps(participants, ensure_ascii=False) if participants else None graph_event = GraphEvent( file_id=file_id, episode_title=episode_title, event_name=event.get('name', ''), description=event.get('description', ''), participants=participants_json, location=event.get('location'), significance=event.get('significance') ) db.session.add(graph_event) saved_count += 1 db.session.commit() print(f"[Graph Extraction] '{episode_title}' Graph Extraction 완료: {saved_count}개 항목 저장") return True except Exception as e: print(f"[Graph Extraction] '{episode_title}' Graph Extraction 오류: {str(e)}") import traceback traceback.print_exc() db.session.rollback() return False def create_chunks_for_file(file_id, content, skip_episode_analysis=False, skip_graph_extraction=False): """파일 내용을 섹션별로 분할하여 의미 기반 청크로 저장 (벡터 DB 포함) 섹션 분할 규칙: - #작품설명부터 #1화까지: '작품설명' 섹션, 메타데이터에 #작품설명 추가 - #n화부터 #n+1화까지: 'n화' 섹션, 메타데이터에 회차 정보(n화) 추가 Args: file_id: 파일 ID content: 파일 내용 skip_episode_analysis: 회차 분석 건너뛰기 (기본값: False) skip_graph_extraction: Graph Extraction 건너뛰기 (기본값: False) """ try: print(f"[청크 생성] 파일 ID {file_id}에 대한 청크 생성 시작") print(f"[청크 생성] 원본 텍스트 길이: {len(content)}자") # 파일 정보 가져오기 (모델명 등) uploaded_file = UploadedFile.query.get(file_id) model_name = uploaded_file.model_name if uploaded_file else None # 벡터 DB 매니저 가져오기 vector_db = get_vector_db() # 기존 청크 삭제 (DB + 벡터 DB) existing_chunks = DocumentChunk.query.filter_by(file_id=file_id).all() if existing_chunks: print(f"[청크 생성] 기존 청크 {len(existing_chunks)}개 삭제 중...") # 벡터 DB에서 삭제 vector_db.delete_chunks_by_file_id(file_id) # DB에서 삭제 DocumentChunk.query.filter_by(file_id=file_id).delete() db.session.commit() # 원본 웹소설을 섹션별로 분할 (#작품설명, #1화, #2화 등) sections = split_content_by_episodes(content) print(f"[청크 생성] 섹션 분할 완료: {len(sections)}개 섹션") for i, (section_type, section_title, section_content, section_metadata) in enumerate(sections): print(f"[청크 생성] 섹션 {i+1}: {section_title} ({len(section_content)}자)") if len(sections) == 0: print(f"[청크 생성] 경고: 섹션이 생성되지 않았습니다.") return 0 # 기존 회차 분석 삭제 existing_analyses = EpisodeAnalysis.query.filter_by(file_id=file_id).all() if existing_analyses: print(f"[회차 분석] 기존 회차 분석 {len(existing_analyses)}개 삭제 중...") for analysis in existing_analyses: db.session.delete(analysis) db.session.commit() # '#작품설명'을 제외한 각 회차 분석 episode_sections = [s for s in sections if s[0] != '작품설명'] # section_type이 '작품설명'이 아닌 것만 if episode_sections and model_name and not skip_episode_analysis: print(f"[회차 분석] {len(episode_sections)}개 회차 분석 시작...") # Parent Chunk 가져오기 parent_chunk = None try: parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() except: pass # 각 회차 분석 결과를 하나의 텍스트로 이어서 저장 all_analyses = [] for section_type, section_title, section_content, section_metadata in episode_sections: try: print(f"[회차 분석] '{section_title}' 분석 중...") analysis_result = analyze_episode( episode_content=section_content, episode_title=section_title, full_content=content, parent_chunk=parent_chunk, model_name=model_name ) if analysis_result: all_analyses.append(f"\n\n{analysis_result}") print(f"[회차 분석] '{section_title}' 분석 완료") else: print(f"[회차 분석] '{section_title}' 분석 실패 (결과 없음)") except Exception as e: print(f"[회차 분석] '{section_title}' 분석 중 오류: {str(e)}") import traceback traceback.print_exc() continue # 모든 회차 분석 결과를 하나의 텍스트로 이어서 저장 if all_analyses: combined_analysis = "\n".join(all_analyses).strip() # 하나의 통합 분석으로 저장 (나눠서 저장하지 않고 하나에 이어서 저장) episode_analysis = EpisodeAnalysis( file_id=file_id, episode_title="전체 회차 통합 분석", analysis_content=combined_analysis # 모든 회차 분석을 하나의 텍스트로 저장 ) db.session.add(episode_analysis) db.session.commit() print(f"[회차 분석] 완료: {len(episode_sections)}개 회차 분석 결과를 하나의 텍스트로 저장") else: print(f"[회차 분석] 경고: 분석 결과가 없습니다.") # 회차별 Graph Extraction 실행 (회차 분석 성공 여부와 관계없이 실행) if episode_sections and model_name and not skip_graph_extraction: print(f"[Graph Extraction] 회차별 Graph Extraction 시작...") # Parent Chunk 가져오기 (회차 분석 블록 밖에서도 사용 가능하도록) parent_chunk = None try: parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() except: pass graph_extraction_success_count = 0 for section_type, section_title, section_content, section_metadata in episode_sections: try: print(f"[Graph Extraction] '{section_title}' Graph Extraction 중...") success = extract_graph_from_episode( episode_content=section_content, episode_title=section_title, file_id=file_id, full_content=content, parent_chunk=parent_chunk, model_name=model_name ) if success: graph_extraction_success_count += 1 print(f"[Graph Extraction] '{section_title}' Graph Extraction 완료") else: print(f"[Graph Extraction] '{section_title}' Graph Extraction 실패") except Exception as e: print(f"[Graph Extraction] '{section_title}' Graph Extraction 중 오류: {str(e)}") import traceback traceback.print_exc() continue print(f"[Graph Extraction] 완료: {graph_extraction_success_count}/{len(episode_sections)}개 회차 Graph Extraction 성공") else: if not model_name: print(f"[회차 분석] 모델명이 없어 회차 분석을 건너뜁니다.") elif not episode_sections: print(f"[회차 분석] 분석할 회차가 없습니다.") # 각 섹션별로 청크 생성 및 저장 saved_count = 0 vector_saved_count = 0 global_chunk_index = 0 # 전체 청크 인덱스 for section_idx, (section_type, section_title, section_content, section_metadata) in enumerate(sections): print(f"[청크 생성] 섹션 '{section_title}' 처리 중... ({len(section_content)}자)") # 각 섹션을 의미 기반 청킹 (문장과 문단 경계를 고려하여 분할) # min_chunk_size: 최소 200자, max_chunk_size: 최대 1000자, overlap: 150자 section_chunks = split_text_into_chunks(section_content, min_chunk_size=200, max_chunk_size=1000, overlap=150) print(f"[청크 생성] 섹션 '{section_title}' 분할된 청크 수: {len(section_chunks)}개") # 각 청크를 데이터베이스와 벡터 DB에 저장 for chunk_idx, chunk_content in enumerate(section_chunks): try: # 섹션 메타데이터를 기본으로 사용 (chapter 정보 포함) chunk_metadata = section_metadata.copy() # DB에 청크 저장 (섹션 메타데이터 포함) chunk = DocumentChunk( file_id=file_id, chunk_index=global_chunk_index, content=chunk_content, chunk_metadata=json.dumps(chunk_metadata, ensure_ascii=False) # 섹션 메타데이터 저장 ) db.session.add(chunk) db.session.flush() # ID 생성 # 벡터 DB에 청크 추가 if vector_db.add_chunk( chunk_id=chunk.id, chunk_content=chunk_content, file_id=file_id, chunk_index=global_chunk_index ): vector_saved_count += 1 saved_count += 1 global_chunk_index += 1 # 진행 상황 출력 (10개마다) if saved_count % 10 == 0: print(f"[청크 생성] 진행 중: {saved_count}개 청크 저장 중... (DB: {saved_count}, 벡터 DB: {vector_saved_count})") except Exception as e: print(f"[청크 생성] 경고: 청크 {global_chunk_index} 저장 중 오류: {str(e)}") import traceback traceback.print_exc() continue db.session.commit() print(f"[청크 생성] 완료: {saved_count}개 청크가 데이터베이스에 저장되었습니다. (벡터 DB: {vector_saved_count}개)") # 저장 확인 verified_count = DocumentChunk.query.filter_by(file_id=file_id).count() if verified_count != saved_count: print(f"[청크 생성] 경고: 저장된 청크 수({saved_count})와 확인된 청크 수({verified_count})가 일치하지 않습니다.") else: print(f"[청크 생성] 검증 완료: {verified_count}개 청크가 정상적으로 저장되었습니다.") return saved_count except Exception as e: db.session.rollback() print(f"[청크 생성] 오류: {str(e)}") import traceback traceback.print_exc() return 0 def create_parent_chunk_with_ai(file_id, content, model_name): """AI를 사용하여 Parent Chunk 생성 (웹소설 분석)""" try: print(f"[Parent Chunk 생성] 파일 ID {file_id}에 대한 Parent Chunk 생성 시작") print(f"[Parent Chunk 생성] 사용 모델: {model_name}") print(f"[Parent Chunk 생성] 원본 텍스트 길이: {len(content)}자") # 모델명이 None이거나 빈 문자열인 경우 처리 if not model_name or not model_name.strip(): print(f"[Parent Chunk 생성] ❌ 오류: 모델명이 제공되지 않았습니다.") return None # 텍스트가 너무 길면 일부만 사용 (최대 50000자) content_preview = content[:50000] if len(content) > 50000 else content if len(content) > 50000: print(f"[Parent Chunk 생성] 텍스트가 길어 일부만 사용: {len(content_preview)}자 (전체: {len(content)}자)") # 분석 프롬프트 생성 analysis_prompt = f"""다음 웹소설 텍스트를 분석하여 다음 항목들을 작성해주세요. 각 항목은 명확하고 구체적으로 작성해주세요. 텍스트 내용: {content_preview} 위 텍스트를 분석하여 다음 형식으로 답변해주세요: ## 세계관 설명 [세계관에 대한 상세한 설명을 작성하세요. 배경, 설정, 규칙 등을 포함하세요.] ## 주요 캐릭터 분석 [주요 등장인물들의 이름, 역할, 성격, 특징 등을 분석하여 작성하세요. 각 캐릭터별로 구분하여 작성하세요.] ## 주요 스토리 분석 [전체적인 스토리 흐름, 주요 사건, 갈등 구조 등을 분석하여 작성하세요.] ## 주요 에피소드 분석 [중요한 에피소드나 챕터별 주요 내용을 분석하여 작성하세요. 시간 순서대로 정리하면 좋습니다.] ## 기타 [위 카테고리에 포함되지 않지만 중요한 정보나 특징 등을 작성하세요.] 각 항목을 명확하게 구분하여 작성해주세요.""" # 모델 타입 확인 (Gemini 또는 Ollama) # Gemini 모델명 형식: "gemini:모델명" 또는 "gemini-1.5-flash" (접두사 없는 경우도 지원) model_name_lower = model_name.lower().strip() is_gemini = model_name_lower.startswith('gemini:') or model_name_lower.startswith('gemini-') print(f"[Parent Chunk 생성] 모델 타입 확인: is_gemini={is_gemini}, model_name={model_name}") if is_gemini: # Gemini API 호출 # 모델명에서 "gemini:" 접두사 제거 (대소문자 구분 없이) gemini_model_name = model_name.strip() if gemini_model_name.lower().startswith('gemini:'): gemini_model_name = gemini_model_name.split(':', 1)[1].strip() # "gemini-"로 시작하는 경우 (예: "gemini-1.5-flash") 그대로 사용 print(f"[Parent Chunk 생성] Gemini API에 분석 요청 전송 중... (모델: {gemini_model_name})") print(f"[Parent Chunk 생성] 원본 모델명: {model_name} -> Gemini 모델명: {gemini_model_name}") gemini_client = get_gemini_client() if not gemini_client.is_configured(): print(f"[Parent Chunk 생성] ❌ 오류: Gemini API 키가 설정되지 않았습니다.") print(f"[Parent Chunk 생성] 디버그: Gemini 클라이언트 상태 확인 중...") # API 키 상태 다시 확인 from app.gemini_client import get_gemini_api_key api_key = get_gemini_api_key() if api_key: print(f"[Parent Chunk 생성] 디버그: API 키는 존재하지만 클라이언트가 설정되지 않았습니다. (길이: {len(api_key)})") else: print(f"[Parent Chunk 생성] 디버그: API 키가 데이터베이스에 없습니다.") return None print(f"[Parent Chunk 생성] Gemini API 키 확인 완료. API 호출 시작...") result = gemini_client.generate_response( prompt=analysis_prompt, model_name=gemini_model_name, temperature=0.7, max_output_tokens=get_model_token_limit_by_type(model_name or "gemini-1.5-flash", 8192, 'parent_chunk') # Parent Chunk 전용 토큰 수 사용 ) if result['error']: print(f"[Parent Chunk 생성] ❌ 오류: Gemini API 호출 실패 - {result['error']}") print(f"[Parent Chunk 생성] 디버그: result 객체 내용: {result}") return None if not result.get('response'): print(f"[Parent Chunk 생성] ❌ 오류: Gemini API 응답이 비어있습니다.") print(f"[Parent Chunk 생성] 디버그: result 객체 내용: {result}") return None analysis_result = result['response'] print(f"[Parent Chunk 생성] Gemini API 응답 수신 성공: {len(analysis_result)}자") # 시스템 사용 토큰 저장 (토큰이 None이어도 저장) gemini_input_tokens = result.get('input_tokens') gemini_output_tokens = result.get('output_tokens') print(f"[Parent Chunk 생성] 토큰 정보 확인 - 입력: {gemini_input_tokens}, 출력: {gemini_output_tokens}") save_system_token_usage( model_name=gemini_model_name, input_tokens=gemini_input_tokens, output_tokens=gemini_output_tokens, task_type='parent_chunk' ) else: # Ollama API 호출 print(f"[Parent Chunk 생성] Ollama API에 분석 요청 전송 중... (모델: {model_name})") try: # 입력 토큰 수를 num_ctx로 사용 num_ctx = get_model_token_limit_by_type(model_name, 100000, 'input') ollama_response = requests.post( f'{OLLAMA_BASE_URL}/api/chat', json={ 'model': model_name, 'messages': [ { 'role': 'user', 'content': analysis_prompt } ], 'stream': False, 'options': { 'num_ctx': num_ctx # 입력 토큰 수를 컨텍스트 윈도우로 사용 } }, timeout=300 # 5분 타임아웃 ) if ollama_response.status_code != 200: error_detail = ollama_response.text if ollama_response.text else '상세 정보 없음' if ollama_response.status_code == 404: error_msg = f'Ollama API 오류 404: 모델 "{model_name}"을(를) 찾을 수 없습니다. 모델이 Ollama에 설치되어 있는지 확인하세요.' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") print(f"[Parent Chunk 생성] 디버그: 만약 Gemini 모델을 사용하려면 모델명이 'gemini:' 또는 'gemini-'로 시작해야 합니다.") else: error_msg = f'Ollama API 오류: {ollama_response.status_code} - {error_detail[:200]}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") return None response_data = ollama_response.json() analysis_result = response_data.get('message', {}).get('content', '') print(f"[Parent Chunk 생성] Ollama API 응답 수신 성공: {len(analysis_result)}자") # 시스템 사용 토큰 저장 (토큰이 None이어도 저장) ollama_input_tokens = response_data.get('prompt_eval_count') ollama_output_tokens = response_data.get('eval_count') print(f"[Parent Chunk 생성] 토큰 정보 확인 - 입력: {ollama_input_tokens}, 출력: {ollama_output_tokens}") save_system_token_usage( model_name=model_name, input_tokens=ollama_input_tokens, output_tokens=ollama_output_tokens, task_type='parent_chunk' ) except requests.exceptions.Timeout: print(f"[Parent Chunk 생성] ❌ Ollama 타임아웃: 요청 시간이 초과되었습니다. (5분)") print(f"[Parent Chunk 생성] 파일이 너무 크거나 모델 응답이 느릴 수 있습니다.") return None except requests.exceptions.ConnectionError: print(f"[Parent Chunk 생성] ❌ Ollama 연결 오류: Ollama 서버에 연결할 수 없습니다.") print(f"[Parent Chunk 생성] 디버그: Ollama URL: {OLLAMA_BASE_URL}") return None except requests.exceptions.RequestException as e: print(f"[Parent Chunk 생성] ❌ Ollama API 오류: {str(e)}") print(f"[Parent Chunk 생성] 디버그: Ollama URL: {OLLAMA_BASE_URL}") return None if not analysis_result: print(f"[Parent Chunk 생성] ⚠️ 경고: 분석 결과가 비어있습니다.") return None print(f"[Parent Chunk 생성] 분석 결과 수신 완료: {len(analysis_result)}자") # 분석 결과 파싱 world_view = "" characters = "" story = "" episodes = "" others = "" # 각 섹션 추출 sections = { 'world_view': ['## 세계관 설명', '## 세계관', '세계관 설명'], 'characters': ['## 주요 캐릭터 분석', '## 주요 캐릭터', '주요 캐릭터 분석', '## 캐릭터'], 'story': ['## 주요 스토리 분석', '## 주요 스토리', '주요 스토리 분석', '## 스토리'], 'episodes': ['## 주요 에피소드 분석', '## 주요 에피소드', '주요 에피소드 분석', '## 에피소드'], 'others': ['## 기타', '기타'] } lines = analysis_result.split('\n') current_section = None current_content = [] for line in lines: line_stripped = line.strip() # 섹션 헤더 확인 section_found = False for section_key, section_headers in sections.items(): for header in section_headers: if header in line_stripped: # 이전 섹션 저장 if current_section: if current_section == 'world_view': world_view = '\n'.join(current_content).strip() elif current_section == 'characters': characters = '\n'.join(current_content).strip() elif current_section == 'story': story = '\n'.join(current_content).strip() elif current_section == 'episodes': episodes = '\n'.join(current_content).strip() elif current_section == 'others': others = '\n'.join(current_content).strip() current_section = section_key current_content = [] section_found = True break if section_found: break if not section_found and current_section: # 현재 섹션에 내용 추가 if line_stripped and not line_stripped.startswith('#'): current_content.append(line) # 마지막 섹션 저장 if current_section: if current_section == 'world_view': world_view = '\n'.join(current_content).strip() elif current_section == 'characters': characters = '\n'.join(current_content).strip() elif current_section == 'story': story = '\n'.join(current_content).strip() elif current_section == 'episodes': episodes = '\n'.join(current_content).strip() elif current_section == 'others': others = '\n'.join(current_content).strip() # 파싱 실패 시 전체 내용을 "기타"에 저장 if not world_view and not characters and not story and not episodes: print(f"[Parent Chunk 생성] 경고: 섹션 파싱 실패. 전체 내용을 '기타'에 저장합니다.") others = analysis_result.strip() # 기존 Parent Chunk 삭제 (있으면) existing_parent = ParentChunk.query.filter_by(file_id=file_id).first() if existing_parent: db.session.delete(existing_parent) db.session.commit() print(f"[Parent Chunk 생성] 기존 Parent Chunk 삭제 완료") # Parent Chunk 생성 및 저장 parent_chunk = ParentChunk( file_id=file_id, world_view=world_view if world_view else None, characters=characters if characters else None, story=story if story else None, episodes=episodes if episodes else None, others=others if others else None ) db.session.add(parent_chunk) db.session.commit() print(f"[Parent Chunk 생성] ✅ 완료: Parent Chunk가 생성되었습니다.") print(f"[Parent Chunk 생성] - 세계관: {len(world_view)}자") print(f"[Parent Chunk 생성] - 캐릭터: {len(characters)}자") print(f"[Parent Chunk 생성] - 스토리: {len(story)}자") print(f"[Parent Chunk 생성] - 에피소드: {len(episodes)}자") print(f"[Parent Chunk 생성] - 기타: {len(others)}자") return parent_chunk except requests.exceptions.RequestException as e: error_msg = f'Ollama API 연결 오류: {str(e)}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") import traceback traceback.print_exc() return None except Exception as e: db.session.rollback() error_msg = f'Parent Chunk 생성 중 오류: {str(e)}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") import traceback traceback.print_exc() return None def get_parent_chunks_for_files(file_ids): """파일 ID 목록에 대한 Parent Chunk 조회 (문맥 파악용)""" try: if not file_ids: return [] parent_chunks = [] for file_id in file_ids: parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() if parent_chunk: parent_chunks.append(parent_chunk) return parent_chunks except Exception as e: print(f"[Parent Chunk 조회] 오류: {str(e)}") return [] def get_episode_analyses_for_files(file_ids): """파일 ID 목록에 대한 회차별 분석(EpisodeAnalysis) 조회 (회차별 요약 참조용)""" try: if not file_ids: return [] episode_analyses = [] for file_id in file_ids: episode_analysis = EpisodeAnalysis.query.filter_by(file_id=file_id).first() if episode_analysis: episode_analyses.append(episode_analysis) return episode_analyses except Exception as e: print(f"[회차별 분석 조회] 오류: {str(e)}") return [] def get_relevant_graph_data(query, file_ids=None): """질문과 관련된 GraphRAG 데이터 조회 (엔티티, 관계, 사건) Args: query: 사용자 질문 file_ids: 파일 ID 목록 (None이면 모든 파일) Returns: dict: { 'entities': [...], 'relationships': [...], 'events': [...], 'episodes': [...] } """ try: if not file_ids: return { 'entities': [], 'relationships': [], 'events': [], 'episodes': [] } # 질문에서 키워드 추출 (한글 단어, 영문 단어) query_words = set(re.findall(r'[가-힣]+|\w+', query.lower())) # 파일 ID 확장 (이어서 업로드된 파일 포함) expanded_file_ids = list(file_ids) for file_id in file_ids: child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() expanded_file_ids.extend([child.id for child in child_files]) # 엔티티 검색 (인물, 장소 이름이 질문에 포함된 경우) entities = [] if query_words: # 엔티티 이름에 질문의 키워드가 포함된 경우 entity_query = GraphEntity.query.filter( GraphEntity.file_id.in_(expanded_file_ids) ) # 키워드 매칭 (엔티티 이름이나 설명에 포함) matching_entities = [] for entity in entity_query.all(): entity_name_lower = entity.entity_name.lower() entity_desc_lower = (entity.description or '').lower() # 엔티티 이름이나 설명에 질문 키워드가 포함되어 있는지 확인 if any(word in entity_name_lower or word in entity_desc_lower for word in query_words if len(word) > 1): matching_entities.append(entity) entities = matching_entities[:20] # 최대 20개 # 관계 검색 (관계의 주체나 대상이 질문에 포함된 경우) relationships = [] if query_words: relationship_query = GraphRelationship.query.filter( GraphRelationship.file_id.in_(expanded_file_ids) ) matching_relationships = [] for rel in relationship_query.all(): source_lower = rel.source.lower() target_lower = rel.target.lower() rel_type_lower = rel.relationship_type.lower() rel_desc_lower = (rel.description or '').lower() # 관계의 주체, 대상, 유형, 설명에 질문 키워드가 포함되어 있는지 확인 if any(word in source_lower or word in target_lower or word in rel_type_lower or word in rel_desc_lower for word in query_words if len(word) > 1): matching_relationships.append(rel) relationships = matching_relationships[:20] # 최대 20개 # 사건 검색 (사건 이름이나 설명에 질문 키워드가 포함된 경우) events = [] if query_words: event_query = GraphEvent.query.filter( GraphEvent.file_id.in_(expanded_file_ids) ) matching_events = [] for event in event_query.all(): event_name_lower = (event.event_name or '').lower() event_desc_lower = (event.description or '').lower() event_location_lower = (event.location or '').lower() # 사건 이름, 설명, 장소에 질문 키워드가 포함되어 있는지 확인 if any(word in event_name_lower or word in event_desc_lower or word in event_location_lower for word in query_words if len(word) > 1): matching_events.append(event) events = matching_events[:20] # 최대 20개 # 관련 회차 추출 episodes = set() for entity in entities: episodes.add(entity.episode_title) for rel in relationships: episodes.add(rel.episode_title) for event in events: episodes.add(event.episode_title) print(f"[GraphRAG 검색] 관련 데이터 발견: 엔티티 {len(entities)}개, 관계 {len(relationships)}개, 사건 {len(events)}개, 회차 {len(episodes)}개") return { 'entities': [e.to_dict() for e in entities], 'relationships': [r.to_dict() for r in relationships], 'events': [ev.to_dict() for ev in events], 'episodes': sorted(list(episodes)) } except Exception as e: print(f"[GraphRAG 검색] 오류: {str(e)}") import traceback traceback.print_exc() return { 'entities': [], 'relationships': [], 'events': [], 'episodes': [] } def search_relevant_chunks(query, file_ids=None, model_name=None, top_k=5, min_score=1): """ 질문과 관련된 청크 검색 (벡터 검색 + Re-ranking) 1. 벡터 검색으로 초기 30개 문서 검색 2. Cross-Encoder로 리랭킹 3. 상위 top_k개 반환 (기본 5개) """ try: # 벡터 DB 매니저 가져오기 vector_db = get_vector_db() # 파일 ID 확장 (이어서 업로드된 파일 포함) expanded_file_ids = None if file_ids: expanded_file_ids = list(file_ids) for file_id in file_ids: # 원본 파일인 경우 이어서 업로드된 파일들도 포함 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() expanded_file_ids.extend([child.id for child in child_files]) # 원본 파일이 선택된 경우, 이어서 업로드된 파일들도 포함 parent_files = UploadedFile.query.filter(UploadedFile.id.in_(file_ids), UploadedFile.parent_file_id.is_(None)).all() for parent_file in parent_files: child_files = UploadedFile.query.filter_by(parent_file_id=parent_file.id).all() expanded_file_ids.extend([child.id for child in child_files]) # 모델 필터링이 필요한 경우 파일 ID 필터링 if model_name and expanded_file_ids: filtered_files = UploadedFile.query.filter( UploadedFile.id.in_(expanded_file_ids), UploadedFile.model_name == model_name ).all() expanded_file_ids = [f.id for f in filtered_files] elif model_name and not expanded_file_ids: # 파일 ID가 없으면 모델 이름으로만 필터링 filtered_files = UploadedFile.query.filter_by(model_name=model_name).all() expanded_file_ids = [f.id for f in filtered_files] # 1단계: 벡터 검색으로 초기 30개 문서 검색 print(f"[벡터 검색] 쿼리: {query[:50]}..., 파일 ID: {expanded_file_ids if expanded_file_ids else '모든 파일'}") vector_results = vector_db.search_chunks( query=query, file_ids=expanded_file_ids, top_k=30 ) if not vector_results: print(f"[벡터 검색] 결과 없음, 키워드 기반 검색으로 대체") # 벡터 검색 결과가 없으면 기존 키워드 기반 검색으로 대체 return search_relevant_chunks_fallback(query, file_ids, model_name, top_k, min_score) # 2단계: Cross-Encoder로 리랭킹 print(f"[리랭킹] {len(vector_results)}개 청크에 대한 리랭킹 시작...") reranked_chunks = vector_db.rerank_chunks( query=query, chunks=vector_results, top_k=top_k ) # 3단계: DB에서 청크 객체 가져오기 final_chunks = [] for reranked in reranked_chunks: chunk_id = reranked['chunk_id'] chunk = DocumentChunk.query.get(chunk_id) if chunk: final_chunks.append(chunk) print(f"[벡터 검색 + 리랭킹] 최종 {len(final_chunks)}개 청크 반환") return final_chunks except Exception as e: print(f"[벡터 검색] 오류: {str(e)}") import traceback traceback.print_exc() # 오류 시 기존 키워드 기반 검색으로 대체 print(f"[벡터 검색] 키워드 기반 검색으로 대체") return search_relevant_chunks_fallback(query, file_ids, model_name, top_k, min_score) def search_relevant_chunks_fallback(query, file_ids=None, model_name=None, top_k=25, min_score=1): """기존 키워드 기반 검색 (Fallback)""" try: # 검색 쿼리 준비 - 한글과 영문 단어 모두 추출 query_words = set(re.findall(r'[가-힣]+|\w+', query.lower())) if not query_words: return [] # 청크 조회 query_obj = DocumentChunk.query.join(UploadedFile) if file_ids: # 선택된 파일 ID와 그 파일에 이어서 업로드된 모든 파일 ID 포함 expanded_file_ids = list(file_ids) for file_id in file_ids: # 원본 파일인 경우 이어서 업로드된 파일들도 포함 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() expanded_file_ids.extend([child.id for child in child_files]) # 원본 파일이 선택된 경우, 이어서 업로드된 파일들도 포함 parent_files = UploadedFile.query.filter(UploadedFile.id.in_(file_ids), UploadedFile.parent_file_id.is_(None)).all() for parent_file in parent_files: child_files = UploadedFile.query.filter_by(parent_file_id=parent_file.id).all() expanded_file_ids.extend([child.id for child in child_files]) query_obj = query_obj.filter(UploadedFile.id.in_(expanded_file_ids)) if model_name: query_obj = query_obj.filter(UploadedFile.model_name == model_name) all_chunks = query_obj.all() if not all_chunks: return [] # 각 청크의 관련도 점수 계산 (개선된 알고리즘) scored_chunks = [] for chunk in all_chunks: chunk_content_lower = chunk.content.lower() chunk_words = set(re.findall(r'[가-힣]+|\w+', chunk_content_lower)) # 1. 공통 단어 수 (기본 점수) common_words = query_words & chunk_words base_score = len(common_words) # 2. 쿼리 단어의 빈도 가중치 (중요한 단어가 더 많이 나타날수록 높은 점수) frequency_score = 0 for word in query_words: frequency_score += chunk_content_lower.count(word) # 3. 쿼리 단어 비율 (청크에서 쿼리 단어가 차지하는 비율) if len(chunk_words) > 0: ratio_score = len(common_words) / len(chunk_words) * 10 else: ratio_score = 0 # 최종 점수 계산 (가중치 적용) final_score = base_score * 2 + frequency_score * 0.5 + ratio_score # 최소 점수 이상인 청크만 포함 if final_score >= min_score: scored_chunks.append((final_score, chunk)) # 점수 순으로 정렬하고 상위 k개 선택 scored_chunks.sort(key=lambda x: x[0], reverse=True) # top_k개 선택 top_chunks = [chunk for score, chunk in scored_chunks[:top_k]] return top_chunks except Exception as e: print(f"[키워드 검색] 오류: {str(e)}") import traceback traceback.print_exc() return [] @main_bp.route('/login', methods=['GET', 'POST']) def login(): """로그인 페이지""" if current_user.is_authenticated: # 관리자인 경우 관리자 페이지로 리다이렉트 if current_user.is_admin: return redirect(url_for('main.admin')) return redirect(url_for('main.index')) if request.method == 'POST': username = request.form.get('username', '').strip() password = request.form.get('password', '') if not username or not password: flash('사용자명과 비밀번호를 입력해주세요.', 'error') return render_template('login.html') user = User.query.filter_by(username=username).first() if user and user.check_password(password) and user.is_active: login_user(user) user.last_login = datetime.utcnow() db.session.commit() next_page = request.args.get('next') # 관리자인 경우 관리자 페이지로 리다이렉트 if user.is_admin: return redirect(next_page) if next_page else redirect(url_for('main.admin')) return redirect(next_page) if next_page else redirect(url_for('main.index')) else: flash('사용자명 또는 비밀번호가 올바르지 않습니다.', 'error') return render_template('login.html') @main_bp.route('/logout') @login_required def logout(): """로그아웃""" logout_user() flash('로그아웃되었습니다.', 'info') return redirect(url_for('main.login')) @main_bp.route('/') @login_required def index(): return render_template('index.html') @main_bp.route('/webnovels') @login_required def webnovels(): """업로드된 웹소설 목록 페이지""" return render_template('webnovels.html') @main_bp.route('/admin') @admin_required def admin(): """관리자 페이지""" users = User.query.order_by(User.created_at.desc()).all() return render_template('admin.html', users=users) @main_bp.route('/admin/messages') @admin_required def admin_messages(): """관리자 메시지 확인 페이지""" return render_template('admin_messages.html') @main_bp.route('/admin/webnovels') @admin_required def admin_webnovels(): """웹소설 관리 페이지""" return render_template('admin_webnovels.html') @main_bp.route('/admin/prompts') @admin_required def admin_prompts(): """프롬프트 관리 페이지""" return render_template('admin_prompts.html') @main_bp.route('/admin/settings') @admin_required def admin_settings(): """AI 설정 관리 페이지 (API 키, 토큰 수)""" return render_template('admin_settings.html') @main_bp.route('/admin/files') @admin_required def admin_files(): """파일 목록 관리 페이지""" return render_template('admin_files.html') @main_bp.route('/admin/utils') @admin_required def admin_utils(): """유틸리티 관리 페이지""" return render_template('admin_utils.html') @main_bp.route('/admin/tokens') @admin_required def admin_tokens(): """토큰 사용량 통계 페이지""" return render_template('admin_tokens.html') def convert_episode_format(content): """다양한 회차 구분 방식을 #n화 형식으로 변환 지원하는 패턴: - @1, @2, @10 -> #1화, #2화, #10화 - @ 1, @ 2 -> #1화, #2화 - @1화, @2화 -> #1화, #2화 - @ 1화, @ 2화 -> #1화, #2화 - 1화, 2화, 10화 -> #1화, #2화, #10화 (앞에 기호 없이) - #01화, #010화 -> #1화, #10화 (앞의 0 제거) - 기타 유사한 패턴들 Args: content: 변환할 웹소설 내용 Returns: 변환된 내용 """ if not content: return content lines = content.split('\n') converted_lines = [] # 다양한 회차 패턴을 #n화 형식으로 변환 # 패턴: @숫자, @ 숫자, @숫자화, @ 숫자화, 숫자화 등 # 줄 시작 부분에서만 매칭하도록 ^ 사용 for line in lines: converted_line = line # 프롤로그 처리 (@프롤로그, #프롤로그 -> #0화 프롤로그) converted_line = re.sub(r'^[@#]\s*프롤로그\s*', '#0화 프롤로그', converted_line) # @숫자화 패턴 (공백 포함/미포함) converted_line = re.sub(r'^@\s*(\d+)\s*화\s*', r'#\1화', converted_line) # @숫자 패턴 (공백 포함/미포함) - 화가 없는 경우만 # 이미 화가 있는 경우는 위에서 처리되었으므로, 화가 없는 경우만 처리 if not re.search(r'^#\d+화', converted_line): converted_line = re.sub(r'^@\s*(\d+)(?!화)\s*', r'#\1화', converted_line) # 숫자화 패턴 (앞에 기호 없이) - 이미 #이 있는 경우는 제외 # 줄 시작에서 숫자로 시작하고 바로 "화"가 오는 경우 if not re.search(r'^#\d+화', converted_line): converted_line = re.sub(r'^(\d+)\s*화\s*', r'#\1화', converted_line) # #숫자 패턴 (화가 없는 경우) - #n -> #n화 if not re.search(r'^#\d+화', converted_line): converted_line = re.sub(r'^#\s*(\d+)(?!화)\s*', r'#\1화', converted_line) # #0n화 패턴에서 앞의 0 제거 (#01화 -> #1화, #010화 -> #10화) # 단, #0화는 그대로 유지 converted_line = re.sub(r'^#0+(\d+)화', r'#\1화', converted_line) converted_lines.append(converted_line) return '\n'.join(converted_lines) @main_bp.route('/api/admin/utils/detect-encoding', methods=['POST']) @admin_required def detect_file_encoding(): """파일 인코딩 감지 API""" try: if 'file' not in request.files: return jsonify({'error': '파일이 필요합니다.'}), 400 file = request.files['file'] if not file or not file.filename: return jsonify({'error': '파일이 필요합니다.'}), 400 file_content = file.read() file.seek(0) # 파일 포인터 리셋 # 여러 인코딩 시도하여 감지 encodings_to_try = ['utf-8', 'cp949', 'euc-kr', 'latin-1', 'utf-16', 'utf-16-le', 'utf-16-be'] detected_encoding = None detected_content = None for encoding in encodings_to_try: try: content = file_content.decode(encoding) detected_encoding = encoding detected_content = content break except (UnicodeDecodeError, UnicodeError): continue if not detected_encoding: return jsonify({ 'error': '파일 인코딩을 감지할 수 없습니다.', 'detected_encoding': None, 'confidence': 0 }), 400 # 간단한 신뢰도 계산 (UTF-8 BOM 확인, 일반적인 문자 범위 확인 등) confidence = 0.8 if detected_encoding == 'utf-8': # UTF-8 BOM 확인 if file_content.startswith(b'\xef\xbb\xbf'): confidence = 0.95 # 한글 문자 포함 여부 확인 if any(ord(char) >= 0xAC00 and ord(char) <= 0xD7A3 for char in detected_content[:1000]): confidence = 0.9 return jsonify({ 'success': True, 'detected_encoding': detected_encoding, 'confidence': confidence, 'preview': detected_content[:200] if detected_content else '' }), 200 except Exception as e: import traceback print(f"[인코딩 감지] 오류: {str(e)}") print(traceback.format_exc()) return jsonify({'error': f'인코딩 감지 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/utils/convert-episode-format', methods=['POST']) @admin_required def convert_episode_format_api(): """회차 구분 방식 변환 API (파일 업로드 또는 텍스트 입력)""" try: content = None original_filename = 'converted_file.txt' specified_encoding = None # 파일 업로드 확인 if 'file' in request.files: file = request.files['file'] if file and file.filename and file.filename != '': # 파일 읽기 try: # 요청에서 인코딩 정보 가져오기 specified_encoding = request.form.get('encoding', 'utf-8') file_content = file.read() file.seek(0) # 파일 포인터 리셋 (다시 읽을 수 있도록) # 지정된 인코딩으로 시도 try: content = file_content.decode(specified_encoding) except (UnicodeDecodeError, UnicodeError): # 지정된 인코딩 실패 시 자동 감지 시도 encodings_to_try = ['utf-8', 'cp949', 'euc-kr', 'latin-1'] for encoding in encodings_to_try: if encoding == specified_encoding: continue try: content = file_content.decode(encoding) specified_encoding = encoding # 성공한 인코딩으로 업데이트 break except (UnicodeDecodeError, UnicodeError): continue if not content: return jsonify({'error': f'파일 인코딩 오류: {specified_encoding} 및 자동 감지 실패'}), 500 original_filename = file.filename except Exception as e: import traceback print(f"[회차 변환] 파일 읽기 오류: {str(e)}") print(traceback.format_exc()) return jsonify({'error': f'파일 읽기 오류: {str(e)}'}), 500 # JSON 데이터에서 내용 가져오기 (파일이 없는 경우) if not content: try: data = request.get_json(silent=True) or {} content = data.get('content', '') original_filename = data.get('filename', 'converted_file.txt') except Exception as e: print(f"[회차 변환] JSON 파싱 오류: {str(e)}") return jsonify({'error': f'요청 데이터 파싱 오류: {str(e)}'}), 400 if not content or not content.strip(): return jsonify({'error': '파일을 업로드하거나 내용을 입력해주세요.'}), 400 # 변환 실행 try: converted_content = convert_episode_format(content) except Exception as e: import traceback print(f"[회차 변환] 변환 함수 오류: {str(e)}") print(traceback.format_exc()) return jsonify({'error': f'변환 실행 오류: {str(e)}'}), 500 # 임시 파일로 저장 try: temp_dir = os.path.join(os.getcwd(), 'uploads', 'temp') os.makedirs(temp_dir, exist_ok=True) # 원본 파일명에서 확장자 추출 if '.' in original_filename: name, ext = os.path.splitext(original_filename) converted_filename = f"{name}_converted{ext}" else: converted_filename = f"{original_filename}_converted.txt" # 안전한 파일명 생성 safe_filename = secure_filename(converted_filename) if not safe_filename: safe_filename = 'converted_file.txt' temp_file_path = os.path.join(temp_dir, f"{uuid.uuid4().hex}_{safe_filename}") # 변환된 내용을 임시 파일에 저장 with open(temp_file_path, 'w', encoding='utf-8') as f: f.write(converted_content) # 상대 경로 반환 (다운로드용) relative_path = os.path.relpath(temp_file_path, os.getcwd()) # Windows 경로를 URL 인코딩 relative_path = relative_path.replace('\\', '/') return jsonify({ 'success': True, 'message': '회차 구분 방식이 변환되었습니다.', 'converted_content': converted_content[:500] + '...' if len(converted_content) > 500 else converted_content, 'download_url': f'/api/admin/utils/download-converted-file?path={relative_path}', 'filename': safe_filename }), 200 except Exception as e: import traceback print(f"[회차 변환] 파일 저장 오류: {str(e)}") print(traceback.format_exc()) return jsonify({'error': f'파일 저장 오류: {str(e)}'}), 500 except Exception as e: import traceback print(f"[회차 변환] 전체 오류: {str(e)}") print(traceback.format_exc()) return jsonify({'error': f'변환 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/utils/download-converted-file', methods=['GET']) @admin_required def download_converted_file(): """변환된 파일 다운로드""" try: file_path = request.args.get('path') if not file_path: return jsonify({'error': '파일 경로가 필요합니다.'}), 400 # 보안: 상대 경로만 허용 if os.path.isabs(file_path) or '..' in file_path: return jsonify({'error': '잘못된 파일 경로입니다.'}), 400 full_path = os.path.join(os.getcwd(), file_path) # 파일 존재 확인 if not os.path.exists(full_path): return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 # 파일명 추출 filename = os.path.basename(full_path) # UUID 접두사 제거 if '_' in filename: filename = '_'.join(filename.split('_')[1:]) return send_file( full_path, as_attachment=True, download_name=filename, mimetype='text/plain; charset=utf-8' ) except Exception as e: return jsonify({'error': f'파일 다운로드 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/token-usage', methods=['GET']) @admin_required def get_token_usage(): """토큰 사용량 통계 API (날짜 범위, 모델별 필터링 지원)""" try: from datetime import datetime, timedelta from sqlalchemy import func, and_ # 필터 파라미터 가져오기 start_date = request.args.get('start_date') end_date = request.args.get('end_date') model_name = request.args.get('model_name') # 특정 모델 필터링 group_by = request.args.get('group_by', 'day') # 'day', 'week', 'month', 'model' # 기본 날짜 범위 설정 (최근 30일) if not end_date: end_date = datetime.utcnow() else: try: # ISO 형식 또는 YYYY-MM-DD 형식 파싱 if 'T' in end_date: end_date = datetime.fromisoformat(end_date.replace('Z', '+00:00')) else: # YYYY-MM-DD 형식인 경우 시간을 23:59:59로 설정 end_date = datetime.strptime(end_date, '%Y-%m-%d') end_date = end_date.replace(hour=23, minute=59, second=59) except Exception as e: print(f"[토큰 통계] 종료 날짜 파싱 오류: {end_date}, {str(e)}") end_date = datetime.utcnow() if not start_date: start_date = end_date - timedelta(days=30) else: try: # ISO 형식 또는 YYYY-MM-DD 형식 파싱 if 'T' in start_date: start_date = datetime.fromisoformat(start_date.replace('Z', '+00:00')) else: # YYYY-MM-DD 형식인 경우 시간을 00:00:00으로 설정 start_date = datetime.strptime(start_date, '%Y-%m-%d') start_date = start_date.replace(hour=0, minute=0, second=0) except Exception as e: print(f"[토큰 통계] 시작 날짜 파싱 오류: {start_date}, {str(e)}") start_date = end_date - timedelta(days=30) # 쿼리 구성 query = ChatMessage.query.filter( ChatMessage.role == 'ai', ChatMessage.created_at >= start_date, ChatMessage.created_at <= end_date ) # 모델 필터링 if model_name: query = query.filter(ChatMessage.model_name == model_name) # 토큰 정보가 있는 메시지만 조회 (선택적 필터) # 토큰 정보가 없는 메시지도 포함하려면 이 필터를 제거할 수 있음 # 하지만 통계 목적상 토큰 정보가 있는 메시지만 조회하는 것이 맞음 query = query.filter( db.or_( ChatMessage.input_tokens.isnot(None), ChatMessage.output_tokens.isnot(None) ) ) messages = query.all() print(f"[토큰 통계] 조회된 메시지 수: {len(messages)} (날짜 범위: {start_date} ~ {end_date})") # 사용자 사용과 시스템 사용 구분 user_messages = [msg for msg in messages if not msg.usage_type or msg.usage_type == 'user'] system_messages = [msg for msg in messages if msg.usage_type == 'system'] print(f"[토큰 통계] 사용자 사용: {len(user_messages)}개, 시스템 사용: {len(system_messages)}개") # 그룹별 집계 if group_by == 'day': # 일별 집계 daily_stats = {} for msg in messages: date_key = msg.created_at.date().isoformat() if date_key not in daily_stats: daily_stats[date_key] = { 'date': date_key, 'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'count': 0 } daily_stats[date_key]['input_tokens'] += msg.input_tokens or 0 daily_stats[date_key]['output_tokens'] += msg.output_tokens or 0 daily_stats[date_key]['total_tokens'] += (msg.input_tokens or 0) + (msg.output_tokens or 0) daily_stats[date_key]['count'] += 1 stats = sorted(daily_stats.values(), key=lambda x: x['date']) elif group_by == 'model': # 모델별 집계 (시스템 사용 포함) model_stats = {} for msg in messages: # 시스템 사용은 '시스템 사용'으로 표시 if msg.usage_type == 'system': model_key = '시스템 사용' else: model_key = msg.model_name or 'Unknown' if model_key not in model_stats: model_stats[model_key] = { 'model': model_key, 'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'count': 0 } model_stats[model_key]['input_tokens'] += msg.input_tokens or 0 model_stats[model_key]['output_tokens'] += msg.output_tokens or 0 model_stats[model_key]['total_tokens'] += (msg.input_tokens or 0) + (msg.output_tokens or 0) model_stats[model_key]['count'] += 1 stats = list(model_stats.values()) else: # 전체 집계 total_input = sum(msg.input_tokens or 0 for msg in messages) total_output = sum(msg.output_tokens or 0 for msg in messages) stats = [{ 'input_tokens': total_input, 'output_tokens': total_output, 'total_tokens': total_input + total_output, 'count': len(messages) }] # 사용 가능한 모델 목록 available_models = db.session.query( ChatMessage.model_name ).filter( ChatMessage.role == 'ai', ChatMessage.model_name.isnot(None) ).distinct().all() models = [m[0] for m in available_models if m[0]] # 사용자 사용과 시스템 사용 통계 user_total_input = sum(msg.input_tokens or 0 for msg in user_messages) user_total_output = sum(msg.output_tokens or 0 for msg in user_messages) system_total_input = sum(msg.input_tokens or 0 for msg in system_messages) system_total_output = sum(msg.output_tokens or 0 for msg in system_messages) return jsonify({ 'success': True, 'stats': stats, 'models': models, 'start_date': start_date.isoformat(), 'end_date': end_date.isoformat(), 'total_messages': len(messages), 'user_usage': { 'input_tokens': user_total_input, 'output_tokens': user_total_output, 'total_tokens': user_total_input + user_total_output, 'count': len(user_messages) }, 'system_usage': { 'input_tokens': system_total_input, 'output_tokens': system_total_output, 'total_tokens': system_total_input + system_total_output, 'count': len(system_messages) } }), 200 except Exception as e: import traceback print(f"[토큰 통계] 오류: {str(e)}") print(traceback.format_exc()) return jsonify({'error': f'토큰 통계 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/users', methods=['GET']) @admin_required def get_users(): """사용자 목록 API""" try: users = User.query.order_by(User.created_at.desc()).all() return jsonify({ 'users': [user.to_dict() for user in users] }), 200 except Exception as e: return jsonify({'error': f'사용자 목록 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/users', methods=['POST']) @admin_required def create_user(): """사용자 생성 API""" try: data = request.json username = data.get('username', '').strip() nickname = data.get('nickname', '').strip() password = data.get('password', '') is_admin = data.get('is_admin', False) if not username or not password: return jsonify({'error': '사용자명과 비밀번호를 입력해주세요.'}), 400 if User.query.filter_by(username=username).first(): return jsonify({'error': '이미 존재하는 사용자명입니다.'}), 400 user = User(username=username, nickname=nickname if nickname else None, is_admin=is_admin, is_active=True) user.set_password(password) db.session.add(user) db.session.commit() return jsonify({ 'message': '사용자가 성공적으로 생성되었습니다.', 'user': user.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'사용자 생성 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/users/', methods=['PUT']) @admin_required def update_user(user_id): """사용자 정보 수정 API""" try: user = User.query.get_or_404(user_id) data = request.json # 자기 자신의 관리자 권한을 제거하는 것은 방지 if user_id == current_user.id and data.get('is_admin') == False: return jsonify({'error': '자기 자신의 관리자 권한을 제거할 수 없습니다.'}), 400 if 'username' in data: new_username = data['username'].strip() if new_username != user.username: if User.query.filter_by(username=new_username).first(): return jsonify({'error': '이미 존재하는 사용자명입니다.'}), 400 user.username = new_username if 'nickname' in data: user.nickname = data['nickname'].strip() if data['nickname'] else None if 'password' in data and data['password']: user.set_password(data['password']) if 'is_admin' in data: user.is_admin = data['is_admin'] if 'is_active' in data: user.is_active = data['is_active'] db.session.commit() return jsonify({ 'message': '사용자 정보가 성공적으로 수정되었습니다.', 'user': user.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'사용자 정보 수정 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/messages', methods=['GET']) @admin_required def get_all_messages(): """전체 메시지 조회 (관리자용)""" try: user_id = request.args.get('user_id', type=int) session_id = request.args.get('session_id', type=int) message_id = request.args.get('message_id', type=int) page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) # user_id가 있는 경우에만 join 사용, 그 외에는 직접 조회 if user_id: # user_id로 필터링할 때는 join 필요 query = ChatMessage.query.join(ChatSession).filter(ChatSession.user_id == user_id) else: # user_id가 없으면 join 없이 직접 조회 query = ChatMessage.query if session_id: query = query.filter(ChatMessage.session_id == session_id) if message_id: query = query.filter(ChatMessage.id == message_id) messages = query.order_by(ChatMessage.created_at.desc())\ .paginate(page=page, per_page=per_page, error_out=False) return jsonify({ 'messages': [msg.to_dict() for msg in messages.items], 'total': messages.total, 'pages': messages.pages, 'current_page': page }), 200 except Exception as e: import traceback error_trace = traceback.format_exc() print(f"[메시지 조회] 오류: {str(e)}") print(error_trace) return jsonify({'error': f'메시지 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/sessions', methods=['GET']) @admin_required def get_all_sessions(): """전체 대화 세션 조회 (관리자용)""" try: user_id = request.args.get('user_id', type=int) page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) query = ChatSession.query if user_id: query = query.filter(ChatSession.user_id == user_id) sessions = query.order_by(ChatSession.updated_at.desc())\ .paginate(page=page, per_page=per_page, error_out=False) sessions_data = [] for session in sessions.items: try: session_dict = session.to_dict() # 사용자 정보 안전하게 가져오기 try: if session.user: session_dict['username'] = session.user.username if hasattr(session.user, 'username') else 'Unknown' session_dict['nickname'] = session.user.nickname if hasattr(session.user, 'nickname') else None else: # user_id로 직접 조회 시도 user = User.query.get(session.user_id) if session.user_id else None if user: session_dict['username'] = user.username session_dict['nickname'] = user.nickname else: session_dict['username'] = 'Unknown' session_dict['nickname'] = None except Exception as user_error: print(f"[세션 조회] 사용자 정보 조회 오류 (세션 ID: {session.id}): {str(user_error)}") session_dict['username'] = 'Unknown' session_dict['nickname'] = None sessions_data.append(session_dict) except Exception as session_error: print(f"[세션 조회] 세션 처리 오류 (세션 ID: {session.id if hasattr(session, 'id') else 'Unknown'}): {str(session_error)}") import traceback traceback.print_exc() continue # 문제가 있는 세션은 건너뛰고 계속 진행 return jsonify({ 'sessions': sessions_data, 'total': sessions.total, 'pages': sessions.pages, 'current_page': page }), 200 except Exception as e: import traceback error_trace = traceback.format_exc() print(f"[세션 조회] 전체 오류: {str(e)}") print(error_trace) return jsonify({'error': f'대화 세션 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/users/', methods=['DELETE']) @admin_required def delete_user(user_id): """사용자 삭제 API""" try: user = User.query.get_or_404(user_id) # 자기 자신을 삭제하는 것은 방지 if user_id == current_user.id: return jsonify({'error': '자기 자신을 삭제할 수 없습니다.'}), 400 db.session.delete(user) db.session.commit() return jsonify({'message': '사용자가 성공적으로 삭제되었습니다.'}), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'사용자 삭제 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/gemini-api-key', methods=['GET']) @admin_required def get_gemini_api_key(): """Gemini API 키 조회""" try: # SystemConfig에서 API 키 가져오기 (테이블이 없으면 빈 문자열 반환) api_key = SystemConfig.get_config('gemini_api_key', '') # 보안을 위해 마스킹된 값 반환 (처음 8자만 표시) masked_key = api_key[:8] + '...' if api_key and len(api_key) > 8 else '' return jsonify({ 'has_api_key': bool(api_key), 'masked_key': masked_key }), 200 except Exception as e: print(f"[Gemini API 키 조회] 오류: {e}") import traceback traceback.print_exc() return jsonify({'error': f'API 키 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/huggingface-token', methods=['GET']) @admin_required def get_huggingface_token(): """Hugging Face 토큰 조회""" try: from app.huggingface_client import get_huggingface_token token = get_huggingface_token() # 보안을 위해 마스킹된 값 반환 (처음 8자만 표시) masked_token = token[:8] + '...' if token and len(token) > 8 else '' return jsonify({ 'has_token': bool(token), 'masked_token': masked_token }), 200 except Exception as e: print(f"[Hugging Face 토큰 조회] 오류: {e}") import traceback traceback.print_exc() return jsonify({'error': f'토큰 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/model-tokens', methods=['GET']) @admin_required def get_model_tokens(): """모든 모델의 토큰 수 설정 조회 (입력/출력 분리)""" try: # Ollama 모델 목록 가져오기 ollama_models = [] try: response = requests.get(f'{OLLAMA_BASE_URL}/api/tags', timeout=5) if response.status_code == 200: data = response.json() ollama_models = [model['name'] for model in data.get('models', [])] except: pass # Gemini 모델 목록 가져오기 gemini_models = [] try: gemini_client = get_gemini_client() if gemini_client.is_configured(): gemini_models = gemini_client.get_available_models() gemini_models = [f'gemini:{m}' for m in gemini_models] except: pass # 모든 모델 목록 all_models = ollama_models + gemini_models # 각 모델의 토큰 수 설정 가져오기 (입력/출력/Parent Chunk 분리) model_input_tokens = {} model_output_tokens = {} model_parent_chunk_tokens = {} default_input_tokens = {} default_output_tokens = {} default_parent_chunk_tokens = {} # 모델별 기본값 결정 def get_default_token_for_model(model_name, token_type='output'): """모델별 기본 토큰 수 결정""" if not model_name: if token_type == 'parent_chunk': return 8192 return 2000 if token_type == 'output' else 100000 # Gemini 모델의 경우 if model_name.startswith('gemini:'): if token_type == 'parent_chunk': return 8192 # Parent Chunk 기본값 return 2000 if token_type == 'output' else 100000 # Gemini 입력 기본값은 더 큼 # Ollama 모델의 경우 if token_type == 'parent_chunk': return 8192 # Parent Chunk 기본값 return 2000 if token_type == 'output' else 100000 # Ollama 입력 기본값도 더 큼 for model_name in all_models: # 입력 토큰 설정 가져오기 input_config_key = f"model_token_input_{model_name}" input_token_value = SystemConfig.get_config(input_config_key) default_input_token = get_default_token_for_model(model_name, 'input') default_input_tokens[model_name] = default_input_token if input_token_value: try: model_input_tokens[model_name] = int(input_token_value) except (ValueError, TypeError): model_input_tokens[model_name] = None else: model_input_tokens[model_name] = None # 출력 토큰 설정 가져오기 output_config_key = f"model_token_output_{model_name}" output_token_value = SystemConfig.get_config(output_config_key) # 하위 호환성: 기존 형식도 확인 if not output_token_value: old_config_key = f"model_token_{model_name}" output_token_value = SystemConfig.get_config(old_config_key) default_output_token = get_default_token_for_model(model_name, 'output') default_output_tokens[model_name] = default_output_token if output_token_value: try: model_output_tokens[model_name] = int(output_token_value) except (ValueError, TypeError): model_output_tokens[model_name] = None else: model_output_tokens[model_name] = None # Parent Chunk 토큰 설정 가져오기 parent_chunk_config_key = f"model_token_parent_chunk_{model_name}" parent_chunk_token_value = SystemConfig.get_config(parent_chunk_config_key) default_parent_chunk_token = get_default_token_for_model(model_name, 'parent_chunk') default_parent_chunk_tokens[model_name] = default_parent_chunk_token if parent_chunk_token_value: try: model_parent_chunk_tokens[model_name] = int(parent_chunk_token_value) except (ValueError, TypeError): model_parent_chunk_tokens[model_name] = None else: model_parent_chunk_tokens[model_name] = None return jsonify({ 'models': all_models, 'input_tokens': model_input_tokens, 'output_tokens': model_output_tokens, 'parent_chunk_tokens': model_parent_chunk_tokens, 'default_input_tokens': default_input_tokens, 'default_output_tokens': default_output_tokens, 'default_parent_chunk_tokens': default_parent_chunk_tokens }), 200 except Exception as e: return jsonify({'error': f'토큰 수 설정 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/model-tokens', methods=['POST']) @admin_required def save_model_tokens(): """모델별 토큰 수 설정 저장 (입력/출력 분리, 또는 삭제)""" try: data = request.json model_name = data.get('model_name', '').strip() token_type = data.get('token_type', 'output').strip() # 'input' 또는 'output' tokens = data.get('tokens', None) if not model_name: return jsonify({'error': '모델명을 입력해주세요.'}), 400 if token_type not in ['input', 'output', 'parent_chunk']: return jsonify({'error': '토큰 타입은 "input", "output", 또는 "parent_chunk"이어야 합니다.'}), 400 # tokens가 None이면 설정 삭제 (기본값 사용) if tokens is None: try: config_key = f"model_token_{token_type}_{model_name}" config = SystemConfig.query.filter_by(key=config_key).first() if config: db.session.delete(config) db.session.commit() return jsonify({ 'message': f'{model_name} 모델의 {token_type} 토큰 수 설정이 삭제되었습니다. 기본값을 사용합니다.', 'model_name': model_name, 'token_type': token_type, 'tokens': None }), 200 else: # 하위 호환성: 기존 형식도 삭제 시도 (출력 토큰인 경우) if token_type == 'output': old_config_key = f"model_token_{model_name}" old_config = SystemConfig.query.filter_by(key=old_config_key).first() if old_config: db.session.delete(old_config) db.session.commit() return jsonify({ 'message': f'{model_name} 모델의 출력 토큰 수 설정이 삭제되었습니다. 기본값을 사용합니다.', 'model_name': model_name, 'token_type': token_type, 'tokens': None }), 200 return jsonify({ 'message': f'{model_name} 모델은 이미 기본값을 사용하고 있습니다.', 'model_name': model_name, 'token_type': token_type, 'tokens': None }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'설정 삭제 중 오류가 발생했습니다: {str(e)}'}), 500 try: tokens = int(tokens) if tokens < 1: return jsonify({'error': '토큰 수는 1 이상이어야 합니다.'}), 400 except (ValueError, TypeError): return jsonify({'error': '토큰 수는 정수여야 합니다.'}), 400 # SystemConfig에 저장 config_key = f"model_token_{token_type}_{model_name}" SystemConfig.set_config(config_key, str(tokens), f'{model_name} 모델 {token_type} 토큰 수 제한') return jsonify({ 'message': f'{model_name} 모델의 {token_type} 토큰 수가 {tokens}로 설정되었습니다.', 'model_name': model_name, 'token_type': token_type, 'tokens': tokens }), 200 except Exception as e: db.session.rollback() print(f"[토큰 수 저장] 오류: {e}") import traceback traceback.print_exc() return jsonify({'error': f'토큰 수 저장 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/gemini-api-key', methods=['POST']) @admin_required def set_gemini_api_key(): """Gemini API 키 저장/업데이트""" try: if not request.is_json: return jsonify({'error': 'Content-Type이 application/json이 아닙니다.'}), 400 data = request.json if not data: return jsonify({'error': '요청 데이터가 없습니다.'}), 400 api_key = data.get('api_key', '').strip() if not api_key: return jsonify({'error': 'API 키를 입력해주세요.'}), 400 # API 키 저장 (SystemConfig.set_config 내부에서 테이블 생성 처리) try: SystemConfig.set_config( key='gemini_api_key', value=api_key, description='Google Gemini API 키' ) # 저장 확인 saved_key = SystemConfig.get_config('gemini_api_key', '') if saved_key == api_key: print(f"[Gemini API 키 저장] 성공: 저장 확인됨 (길이: {len(api_key)}자)") else: print(f"[Gemini API 키 저장] 경고: 저장 후 확인 실패. 저장된 값: {saved_key[:20] if saved_key else 'None'}...") except Exception as save_error: print(f"[Gemini API 키 저장] 오류: {save_error}") import traceback traceback.print_exc() return jsonify({'error': f'API 키 저장 중 오류가 발생했습니다: {str(save_error)}'}), 500 # Gemini 클라이언트에 API 키 재로드 알림 try: from app.gemini_client import reset_gemini_client reset_gemini_client() print(f"[Gemini] API 키가 업데이트되어 클라이언트가 재로드되었습니다.") except Exception as e: print(f"[Gemini] API 키 재로드 실패: {e}") # 최종 확인: DB에서 실제로 저장되었는지 확인 final_check = SystemConfig.get_config('gemini_api_key', '') if not final_check: print(f"[Gemini API 키 저장] 경고: 저장 후 DB에서 조회 실패") return jsonify({ 'error': 'API 키 저장 후 확인에 실패했습니다. 데이터베이스 연결을 확인하세요.', 'saved': False }), 500 return jsonify({ 'message': 'Gemini API 키가 성공적으로 저장되었습니다.', 'has_api_key': True, 'masked_key': api_key[:8] + '...' if api_key and len(api_key) > 8 else '', 'saved': True, 'config_count': SystemConfig.query.count() # 현재 설정 개수 반환 }), 200 except Exception as e: db.session.rollback() print(f"[Gemini API 키 저장] 오류: {e}") import traceback traceback.print_exc() return jsonify({'error': f'API 키 저장 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/huggingface-token', methods=['POST']) @admin_required def set_huggingface_token(): """Hugging Face 토큰 저장/업데이트""" try: if not request.is_json: return jsonify({'error': 'Content-Type이 application/json이 아닙니다.'}), 400 data = request.json if not data: return jsonify({'error': '요청 데이터가 없습니다.'}), 400 token = data.get('token', '').strip() if not token: return jsonify({'error': '토큰을 입력해주세요.'}), 400 # 토큰 저장 (SystemConfig.set_config 내부에서 테이블 생성 처리) SystemConfig.set_config( key='huggingface_token', value=token, description='Hugging Face API 토큰' ) # Hugging Face 클라이언트에 토큰 재로드 알림 try: from app.huggingface_client import reset_huggingface_token reset_huggingface_token() print(f"[Hugging Face] 토큰이 업데이트되어 클라이언트가 재로드되었습니다.") except Exception as e: print(f"[Hugging Face] 토큰 재로드 실패: {e}") return jsonify({ 'message': 'Hugging Face 토큰이 성공적으로 저장되었습니다.', 'has_token': True }), 200 except Exception as e: db.session.rollback() print(f"[Hugging Face 토큰 저장] 오류: {e}") import traceback traceback.print_exc() return jsonify({'error': f'토큰 저장 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/ollama/models', methods=['GET']) @login_required def get_ollama_models(): """Ollama 및 Gemini에서 사용 가능한 모델 목록 가져오기 (로컬 AI 모델은 학습된 웹소설이 있는 모델만 표시)""" try: # 쿼리 파라미터로 all=true가 전달되면 모든 모델 반환 show_all = request.args.get('all', 'false').lower() == 'true' all_models = [] # 1. Ollama 모델 목록 가져오기 try: response = requests.get(f'{OLLAMA_BASE_URL}/api/tags', timeout=5) if response.status_code == 200: data = response.json() ollama_models_raw = [model['name'] for model in data.get('models', [])] if show_all: # 모든 Ollama 모델 반환 ollama_models = [{'name': model_name, 'type': 'ollama'} for model_name in ollama_models_raw] all_models.extend(ollama_models) print(f"[모델 목록] Ollama 모델 {len(ollama_models)}개 추가 (전체 목록)") else: # 학습된 웹소설이 있는 모델만 필터링 filtered_ollama_models = [] for model_name in ollama_models_raw: # 해당 모델로 학습된 원본 파일이 있는지 확인 (parent_file_id가 None인 파일만) file_count = UploadedFile.query.filter_by( model_name=model_name, parent_file_id=None ).count() if file_count > 0: filtered_ollama_models.append({'name': model_name, 'type': 'ollama'}) print(f"[모델 목록] Ollama 모델 '{model_name}' - 학습된 웹소설 {file_count}개") else: print(f"[모델 목록] Ollama 모델 '{model_name}' - 학습된 웹소설 없음, 목록에서 제외") all_models.extend(filtered_ollama_models) print(f"[모델 목록] Ollama 모델 {len(filtered_ollama_models)}개 추가 (전체 {len(ollama_models_raw)}개 중 {len(filtered_ollama_models)}개 필터링됨)") except Exception as e: print(f"[모델 목록] Ollama 모델 목록 조회 실패: {e}") # 2. Gemini 모델 목록 가져오기 try: gemini_client = get_gemini_client() if gemini_client.is_configured(): gemini_models = gemini_client.get_available_models() if show_all: # 모든 Gemini 모델 반환 gemini_models_list = [{'name': f'gemini:{model_name}', 'type': 'gemini'} for model_name in gemini_models] all_models.extend(gemini_models_list) print(f"[모델 목록] Gemini 모델 {len(gemini_models_list)}개 추가 (전체 목록)") else: # 학습된 웹소설이 있는 모델만 필터링 filtered_gemini_models = [] for model_name in gemini_models: full_model_name = f'gemini:{model_name}' # 해당 모델로 학습된 원본 파일이 있는지 확인 (parent_file_id가 None인 파일만) file_count = UploadedFile.query.filter_by( model_name=full_model_name, parent_file_id=None ).count() if file_count > 0: filtered_gemini_models.append({'name': full_model_name, 'type': 'gemini'}) print(f"[모델 목록] Gemini 모델 '{full_model_name}' - 학습된 웹소설 {file_count}개") else: print(f"[모델 목록] Gemini 모델 '{full_model_name}' - 학습된 웹소설 없음, 목록에서 제외") all_models.extend(filtered_gemini_models) print(f"[모델 목록] Gemini 모델 {len(filtered_gemini_models)}개 추가 (전체 {len(gemini_models)}개 중 {len(filtered_gemini_models)}개 필터링됨)") else: print(f"[모델 목록] Gemini API 키가 설정되지 않아 Gemini 모델을 불러올 수 없습니다.") except Exception as e: print(f"[모델 목록] Gemini 모델 목록 조회 실패: {e}") if all_models: return jsonify({'models': all_models}) else: return jsonify({'error': '사용 가능한 모델이 없습니다. Ollama가 실행 중인지, 또는 Gemini API 키가 설정되었는지 확인하세요.', 'models': []}), 500 except Exception as e: return jsonify({'error': f'모델 목록을 가져오는 중 오류가 발생했습니다: {str(e)}', 'models': []}), 500 @main_bp.route('/api/admin/prompts', methods=['GET']) @admin_required def get_system_prompt(): """시스템 프롬프트 가져오기""" try: prompt = SystemConfig.get_config('system_prompt', '') return jsonify({'prompt': prompt}), 200 except Exception as e: return jsonify({'error': f'프롬프트를 가져오는 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/prompts', methods=['POST']) @admin_required def save_system_prompt(): """시스템 프롬프트 저장""" try: data = request.json prompt = data.get('prompt', '').strip() SystemConfig.set_config( key='system_prompt', value=prompt, description='질문할 때 자동으로 붙이는 시스템 프롬프트' ) return jsonify({ 'message': '프롬프트가 성공적으로 저장되었습니다.', 'prompt': prompt }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'프롬프트 저장 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/database/status', methods=['GET']) @admin_required def get_database_status(): """데이터베이스 연결 상태 확인""" try: from flask import current_app from sqlalchemy import create_engine, text from datetime import datetime db_uri = current_app.config['SQLALCHEMY_DATABASE_URI'] is_postgresql = db_uri.startswith('postgresql://') or db_uri.startswith('postgres://') # 연결 정보 (보안을 위해 비밀번호 마스킹) if is_postgresql and '@' in db_uri: masked_uri = db_uri.split('@')[0].split('://')[0] + '://***@' + '@'.join(db_uri.split('@')[1:]) else: masked_uri = db_uri status = { 'connected': False, 'type': 'PostgreSQL' if is_postgresql else 'SQLite', 'uri_masked': masked_uri, 'version': None, 'error': None, 'test_query': None, 'table_count': 0, 'user_count': 0, 'config_count': 0 } # 연결 테스트 try: if is_postgresql: # PostgreSQL 연결 테스트 engine = create_engine( db_uri, pool_pre_ping=True, pool_recycle=300 ) with engine.connect() as conn: # 버전 확인 result = conn.execute(text("SELECT version()")) version = result.fetchone()[0] status['version'] = version[:100] # 처음 100자만 # 테이블 개수 확인 result = conn.execute(text(""" SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public' """)) status['table_count'] = result.fetchone()[0] # 사용자 개수 확인 result = conn.execute(text("SELECT COUNT(*) FROM \"user\"")) status['user_count'] = result.fetchone()[0] # 설정 개수 확인 result = conn.execute(text("SELECT COUNT(*) FROM system_config")) status['config_count'] = result.fetchone()[0] # 테스트 쿼리 result = conn.execute(text("SELECT NOW()")) test_time = result.fetchone()[0] status['test_query'] = f"현재 시간: {test_time}" status['connected'] = True else: # SQLite 연결 테스트 from sqlalchemy import inspect inspector = inspect(db.engine) tables = inspector.get_table_names() status['table_count'] = len(tables) # 사용자 개수 확인 user_count = User.query.count() status['user_count'] = user_count # 설정 개수 확인 config_count = SystemConfig.query.count() status['config_count'] = config_count # SQLite 버전 확인 with db.engine.connect() as conn: result = conn.execute(text("SELECT sqlite_version()")) version = result.fetchone()[0] status['version'] = f"SQLite {version}" # 테스트 쿼리 result = conn.execute(text("SELECT datetime('now')")) test_time = result.fetchone()[0] status['test_query'] = f"현재 시간: {test_time}" status['connected'] = True except Exception as e: status['error'] = str(e) status['connected'] = False return jsonify(status), 200 if status['connected'] else 500 except Exception as e: import traceback traceback.print_exc() return jsonify({ 'error': f'데이터베이스 상태 확인 중 오류가 발생했습니다: {str(e)}', 'connected': False }), 500 @main_bp.route('/api/admin/ollama/models', methods=['GET']) @admin_required def get_all_ollama_models(): """관리자용: Ollama 및 Gemini에서 사용 가능한 모든 모델 목록 가져오기 (필터링 없이 전체 목록)""" try: all_models = [] # 1. Ollama 모델 목록 가져오기 (필터링 없이 전체 목록) try: response = requests.get(f'{OLLAMA_BASE_URL}/api/tags', timeout=5) if response.status_code == 200: data = response.json() ollama_models_raw = [model['name'] for model in data.get('models', [])] # 필터링 없이 모든 Ollama 모델 추가 for model_name in ollama_models_raw: # 각 모델의 학습된 웹소설 개수 확인 (정보 제공용) file_count = UploadedFile.query.filter_by( model_name=model_name, parent_file_id=None ).count() all_models.append({ 'name': model_name, 'type': 'ollama', 'file_count': file_count # 정보 제공용 }) print(f"[관리자 모델 목록] Ollama 모델 '{model_name}' - 학습된 웹소설 {file_count}개") print(f"[관리자 모델 목록] Ollama 모델 {len(ollama_models_raw)}개 추가") except Exception as e: print(f"[관리자 모델 목록] Ollama 모델 목록 조회 실패: {e}") # 2. Gemini 모델 목록 가져오기 (필터링 없이 전체 목록) try: gemini_client = get_gemini_client() if gemini_client.is_configured(): gemini_models = gemini_client.get_available_models() # 필터링 없이 모든 Gemini 모델 추가 for model_name in gemini_models: full_model_name = f'gemini:{model_name}' # 각 모델의 학습된 웹소설 개수 확인 (정보 제공용) file_count = UploadedFile.query.filter_by( model_name=full_model_name, parent_file_id=None ).count() all_models.append({ 'name': full_model_name, 'type': 'gemini', 'file_count': file_count # 정보 제공용 }) print(f"[관리자 모델 목록] Gemini 모델 '{full_model_name}' - 학습된 웹소설 {file_count}개") print(f"[관리자 모델 목록] Gemini 모델 {len(gemini_models)}개 추가") else: print(f"[관리자 모델 목록] Gemini API 키가 설정되지 않아 Gemini 모델을 불러올 수 없습니다.") except Exception as e: print(f"[관리자 모델 목록] Gemini 모델 목록 조회 실패: {e}") if all_models: return jsonify({'models': all_models}) else: return jsonify({'error': '사용 가능한 모델이 없습니다. Ollama가 실행 중인지, 또는 Gemini API 키가 설정되었는지 확인하세요.', 'models': []}), 500 except Exception as e: return jsonify({'error': f'모델 목록을 가져오는 중 오류가 발생했습니다: {str(e)}', 'models': []}), 500 @main_bp.route('/api/admin/default-models', methods=['GET']) @admin_required def get_default_models(): """기본 AI 모델 설정 조회""" try: default_analysis_model = SystemConfig.get_config('default_analysis_model', '') default_answer_model = SystemConfig.get_config('default_answer_model', '') return jsonify({ 'success': True, 'default_analysis_model': default_analysis_model, 'default_answer_model': default_answer_model }), 200 except Exception as e: return jsonify({'error': f'기본 모델 설정 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/admin/default-models', methods=['POST']) @admin_required def set_default_models(): """기본 AI 모델 설정 저장""" try: data = request.json default_analysis_model = data.get('default_analysis_model', '').strip() default_answer_model = data.get('default_answer_model', '').strip() # 빈 문자열이면 설정 삭제 if default_analysis_model: SystemConfig.set_config('default_analysis_model', default_analysis_model, '기본 질문 분석용 AI 모델') else: # 설정 삭제 config = SystemConfig.query.filter_by(key='default_analysis_model').first() if config: db.session.delete(config) db.session.commit() if default_answer_model: SystemConfig.set_config('default_answer_model', default_answer_model, '기본 답변 생성용 AI 모델') else: # 설정 삭제 config = SystemConfig.query.filter_by(key='default_answer_model').first() if config: db.session.delete(config) db.session.commit() return jsonify({ 'success': True, 'message': '기본 AI 모델이 설정되었습니다.', 'default_analysis_model': default_analysis_model or None, 'default_answer_model': default_answer_model or None }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'기본 모델 설정 저장 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/default-models', methods=['GET']) @login_required def get_user_default_models(): """사용자용: 기본 AI 모델 설정 조회 (공개 API)""" try: default_analysis_model = SystemConfig.get_config('default_analysis_model', '') default_answer_model = SystemConfig.get_config('default_answer_model', '') return jsonify({ 'success': True, 'default_analysis_model': default_analysis_model, 'default_answer_model': default_answer_model }), 200 except Exception as e: return jsonify({'error': f'기본 모델 설정 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat', methods=['POST']) @login_required def chat(): """채팅 API 엔드포인트""" try: data = request.json message = data.get('message', '') # 하위 호환성을 위해 model도 확인 (기존 코드) analysis_model = data.get('analysis_model', data.get('model', '')) # 질문 분석용 모델 answer_model = data.get('answer_model', '') # 최종 답변용 모델 file_ids = [int(fid) for fid in data.get('file_ids', []) if fid] # 선택한 웹소설 파일 ID 목록 session_id = data.get('session_id', None) # 대화 세션 ID (정수로 변환) if not message: return jsonify({'error': '메시지가 필요합니다.'}), 400 # 답변용 모델이 없으면 분석용 모델 사용 (하위 호환성) if not answer_model: answer_model = analysis_model # 답변용 모델이 여전히 없으면 에러 반환 if not answer_model: return jsonify({'error': '답변을 생성할 AI 모델이 선택되지 않았습니다. "사용 가능한 AI 목록"에서 답변을 생성할 AI 모델을 선택해주세요.'}), 400 # 분석용 모델이 선택된 경우 RAG 검색 진행 if analysis_model: try: # RAG: 질문과 관련된 청크 검색 context = "" use_rag = True # RAG 사용 여부 if use_rag: print(f"\n[RAG 검색] 분석 모델: {analysis_model}, 답변 모델: {answer_model}, 질문: {message[:50]}...") print(f"[RAG 검색] 선택된 파일 ID: {file_ids if file_ids else '없음 (모든 파일 검색)'}") # 1단계: 회차별 분석(EpisodeAnalysis) 조회 (회차별 요약 참조용) episode_analyses = [] if file_ids: print(f"[RAG 검색 1단계] 회차별 분석 조회 시작...") episode_analyses = get_episode_analyses_for_files(file_ids) print(f"[RAG 검색 1단계] 회차별 분석 조회 완료: {len(episode_analyses)}개 파일") # 2단계: GraphRAG 데이터 조회 (엔티티, 관계, 사건) graph_data = None if file_ids: print(f"[RAG 검색 2단계] GraphRAG 데이터 조회 시작...") graph_data = get_relevant_graph_data( query=message, file_ids=file_ids ) print(f"[RAG 검색 2단계] GraphRAG 데이터 조회 완료: 엔티티 {len(graph_data['entities'])}개, 관계 {len(graph_data['relationships'])}개, 사건 {len(graph_data['events'])}개") # 3단계: 벡터 검색 + 리랭킹으로 Child Chunk 정밀 검색 (분석 모델 사용) print(f"[RAG 검색 3단계] 벡터 검색 + 리랭킹 시작 (분석 모델: {analysis_model})...") relevant_chunks = search_relevant_chunks( query=message, file_ids=file_ids if file_ids else None, model_name=analysis_model, # 질문 분석은 analysis_model 사용 top_k=5, # 리랭킹 후 상위 5개만 선택 min_score=0.5 # 최소 점수 임계값 ) print(f"[RAG 검색 3단계] 벡터 검색 + 리랭킹 완료: {len(relevant_chunks)}개 청크 (상위 5개)") # 컨텍스트 구성 context_parts = [] # GraphRAG 데이터 추가 (엔티티, 관계, 사건 정보) if graph_data and (graph_data['entities'] or graph_data['relationships'] or graph_data['events']): graph_context_parts = [] # 엔티티 정보 if graph_data['entities']: entity_sections = {} for entity in graph_data['entities']: episode = entity.get('episode_title', '기타') if episode not in entity_sections: entity_sections[episode] = {'characters': [], 'locations': []} if entity.get('entity_type') == 'character': entity_sections[episode]['characters'].append(entity) elif entity.get('entity_type') == 'location': entity_sections[episode]['locations'].append(entity) entity_text = "다음은 질문과 관련된 등장인물 및 장소 정보입니다:\n\n" for episode, entities in entity_sections.items(): entity_text += f"=== {episode} ===\n" if entities['characters']: entity_text += "인물:\n" for char in entities['characters']: entity_text += f"- {char.get('entity_name', '')}" if char.get('role'): entity_text += f" (역할: {char.get('role')})" if char.get('description'): entity_text += f": {char.get('description')}" entity_text += "\n" if entities['locations']: entity_text += "장소:\n" for loc in entities['locations']: entity_text += f"- {loc.get('entity_name', '')}" if loc.get('category'): entity_text += f" (유형: {loc.get('category')})" if loc.get('description'): entity_text += f": {loc.get('description')}" entity_text += "\n" entity_text += "\n" graph_context_parts.append(entity_text) # 관계 정보 if graph_data['relationships']: rel_sections = {} for rel in graph_data['relationships']: episode = rel.get('episode_title', '기타') if episode not in rel_sections: rel_sections[episode] = [] rel_sections[episode].append(rel) rel_text = "다음은 질문과 관련된 인물/장소 간의 관계 정보입니다:\n\n" for episode, rels in rel_sections.items(): rel_text += f"=== {episode} ===\n" for rel in rels: rel_text += f"- {rel.get('source', '')} → {rel.get('target', '')}" if rel.get('relationship_type'): rel_text += f" ({rel.get('relationship_type')})" if rel.get('description'): rel_text += f": {rel.get('description')}" if rel.get('event'): rel_text += f" [관련 사건: {rel.get('event')}]" rel_text += "\n" rel_text += "\n" graph_context_parts.append(rel_text) # 사건 정보 if graph_data['events']: event_sections = {} for event in graph_data['events']: episode = event.get('episode_title', '기타') if episode not in event_sections: event_sections[episode] = [] event_sections[episode].append(event) event_text = "다음은 질문과 관련된 주요 사건 정보입니다:\n\n" for episode, events in event_sections.items(): event_text += f"=== {episode} ===\n" for event in events: if event.get('event_name'): event_text += f"- {event.get('event_name')}\n" if event.get('description'): event_text += f" 설명: {event.get('description')}\n" if event.get('participants') and len(event.get('participants', [])) > 0: event_text += f" 관련 인물: {', '.join(event.get('participants', []))}\n" if event.get('location'): event_text += f" 장소: {event.get('location')}\n" if event.get('significance'): event_text += f" 중요도: {event.get('significance')}\n" event_text += "\n" graph_context_parts.append(event_text) if graph_context_parts: graph_context = "\n\n".join(graph_context_parts) context_parts.append(f"다음은 질문과 관련된 GraphRAG 데이터입니다 (엔티티, 관계, 사건 정보):\n\n{graph_context}") print(f"[RAG 검색] GraphRAG 컨텍스트 추가: {len(graph_context)}자") # 회차별 분석 정보 추가 (회차별 요약 참조용) if episode_analyses: episode_context_sections = [] for episode_analysis in episode_analyses: file = episode_analysis.file file_info = f"\n=== {file.original_filename} 회차별 분석 ===\n" if episode_analysis.analysis_content: episode_context_sections.append(file_info + episode_analysis.analysis_content) if episode_context_sections: episode_context = "\n\n".join(episode_context_sections) context_parts.append(f"다음은 웹소설의 회차별 상세 분석 내용입니다:\n\n{episode_context}") print(f"[RAG 검색] 회차별 분석 컨텍스트 추가: {len(episode_context)}자") # Child Chunk 정보 추가 (정밀 검색 결과) if relevant_chunks: child_context_parts = [] seen_files = set() for chunk in relevant_chunks: file = chunk.file if file.original_filename not in seen_files: seen_files.add(file.original_filename) print(f"[RAG 검색] 사용된 파일: {file.original_filename} (모델: {file.model_name})") child_context_parts.append(f"[{file.original_filename} - 청크 {chunk.chunk_index + 1}]\n{chunk.content}") if child_context_parts: # 컨텍스트 길이 확인 및 최적화 full_child_context = "\n\n".join(child_context_parts) child_context_length = len(full_child_context) # Child Chunk 컨텍스트가 너무 길면 일부만 사용 (최대 15000자) if child_context_length > 15000: truncated_parts = [] current_length = 0 for part in child_context_parts: if current_length + len(part) > 15000: break truncated_parts.append(part) current_length += len(part) full_child_context = "\n\n".join(truncated_parts) print(f"[RAG 검색] Child Chunk 컨텍스트 길이 조절: {child_context_length}자 → {len(full_child_context)}자") context_parts.append(f"다음은 질문과 관련된 웹소설의 구체적인 내용입니다 (정밀 검색 결과, 총 {len(relevant_chunks)}개 청크):\n\n{full_child_context}") print(f"[RAG 검색] Child Chunk 컨텍스트 추가: {len(full_child_context)}자") # 최종 컨텍스트 구성 if context_parts: full_context = "\n\n" + "\n\n---\n\n".join(context_parts) + "\n\n" # 회차별 분석, GraphRAG, Child Chunk 모두 있는 경우 has_graph = graph_data and (graph_data['entities'] or graph_data['relationships'] or graph_data['events']) if episode_analyses and has_graph and relevant_chunks: context = f"""다음은 질문에 답하기 위한 웹소설 정보입니다: {full_context} 위 정보를 참고하여 답변해주세요: - 먼저 GraphRAG 데이터(엔티티, 관계, 사건)를 확인하여 등장인물, 장소, 인물 간의 관계, 주요 사건을 파악하세요. - 그 다음 회차별 분석 내용을 이해하여 각 회차의 주요 스토리, 등장 인물, 인물 관계 변화를 파악하세요. - 마지막으로 구체적인 내용(Child Chunk)을 통해 질문에 대한 정확한 답변을 제공하세요. - 웹소설의 맥락과 스토리를 고려하여 일관성 있는 답변을 작성하세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ elif episode_analyses and relevant_chunks: context = f"""다음은 질문에 답하기 위한 웹소설 정보입니다: {full_context} 위 정보를 참고하여 답변해주세요: - 먼저 회차별 분석 내용을 이해하여 각 회차의 주요 스토리, 등장 인물, 인물 관계 변화를 파악하세요. - 그 다음 구체적인 내용(Child Chunk)을 통해 질문에 대한 정확한 답변을 제공하세요. - 웹소설의 맥락과 스토리를 고려하여 일관성 있는 답변을 작성하세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ elif has_graph and relevant_chunks: context = f"""다음은 질문에 답하기 위한 웹소설 정보입니다: {full_context} 위 정보를 참고하여 답변해주세요: - 먼저 GraphRAG 데이터(엔티티, 관계, 사건)를 확인하여 등장인물, 장소, 인물 간의 관계, 주요 사건을 파악하세요. - 그 다음 구체적인 내용(Child Chunk)을 통해 질문에 대한 정확한 답변을 제공하세요. - 웹소설의 맥락과 스토리를 고려하여 일관성 있는 답변을 작성하세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ elif episode_analyses and has_graph: # 회차별 분석과 GraphRAG만 있는 경우 context = f"""다음은 웹소설의 회차별 상세 분석 및 GraphRAG 데이터입니다: {full_context} 위 정보를 참고하여 질문에 답변해주세요: - GraphRAG 데이터(엔티티, 관계, 사건)를 확인하여 등장인물, 장소, 인물 간의 관계, 주요 사건을 파악하세요. - 회차별 분석 내용을 이해하여 각 회차의 주요 스토리, 등장 인물, 인물 관계 변화를 고려하여 답변하세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ elif episode_analyses: # 회차별 분석만 있는 경우 context = f"""다음은 웹소설의 회차별 상세 분석 내용입니다: {full_context} 위 정보를 참고하여 질문에 답변해주세요. 각 회차의 주요 스토리, 등장 인물, 인물 관계 변화를 고려하여 답변하세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ elif has_graph: # GraphRAG만 있는 경우 context = f"""다음은 질문과 관련된 GraphRAG 데이터입니다 (엔티티, 관계, 사건 정보): {full_context} 위 정보를 참고하여 질문에 답변해주세요: - GraphRAG 데이터를 확인하여 등장인물, 장소, 인물 간의 관계, 주요 사건을 파악하세요. - 웹소설의 맥락과 스토리를 고려하여 일관성 있는 답변을 작성하세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ else: # Child Chunk만 있는 경우 context = f"""다음은 질문과 관련된 웹소설의 구체적인 내용입니다: {full_context} 위 내용을 충분히 참고하여 다음 질문에 정확하고 상세하게 답변해주세요. 웹소설의 맥락과 스토리를 고려하여 답변해주세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ context += message graph_info = f", GraphRAG: {len(graph_data['entities']) if graph_data else 0}개 엔티티, {len(graph_data['relationships']) if graph_data else 0}개 관계, {len(graph_data['events']) if graph_data else 0}개 사건" if graph_data else "" print(f"[RAG 검색] 최종 컨텍스트 생성 완료 (회차별 분석: {len(episode_analyses)}개{graph_info}, Child Chunk: {len(relevant_chunks)}개, 총 {len(context)}자)") else: # RAG 검색 결과가 없으면 기존 방식 사용 print(f"[RAG 검색] 관련 청크를 찾지 못했습니다. 전체 파일 내용 사용") use_rag = False # RAG 검색 결과가 없거나 비활성화된 경우 기존 방식 사용 if not context and not use_rag: if file_ids: # 선택한 파일 ID와 이어서 업로드된 파일들도 포함 expanded_file_ids = list(file_ids) for file_id in file_ids: # 원본 파일인 경우 이어서 업로드된 파일들도 포함 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() expanded_file_ids.extend([child.id for child in child_files]) uploaded_files = UploadedFile.query.filter( UploadedFile.id.in_(expanded_file_ids), UploadedFile.model_name == analysis_model ).all() print(f"[파일 사용] 선택된 파일 ID로 조회 (이어서 업로드 포함): {len(uploaded_files)}개 파일") else: # 파일 ID가 없으면 해당 모델의 모든 파일 사용 (원본 및 이어서 업로드 포함) uploaded_files = UploadedFile.query.filter_by(model_name=analysis_model).all() print(f"[파일 사용] 모델 '{analysis_model}'의 모든 파일 사용: {len(uploaded_files)}개 파일") if uploaded_files: print(f"[파일 사용] 사용되는 파일 목록:") for f in uploaded_files: is_child = f.parent_file_id is not None prefix = " └─ " if is_child else " - " print(f"{prefix}{f.original_filename} (모델: {f.model_name})") context_parts = [] for file in uploaded_files: try: if os.path.exists(file.file_path): encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: file_content = f.read() except UnicodeDecodeError: with open(file.file_path, 'r', encoding='cp949') as f: file_content = f.read() # 파일 내용이 너무 길면 일부만 사용 (최대 20000자로 증가) if len(file_content) > 20000: file_content = file_content[:20000] + "..." context_parts.append(f"[{file.original_filename}]\n{file_content}") except Exception as e: print(f"파일 읽기 오류 ({file.original_filename}): {str(e)}") continue if context_parts: context = "\n\n".join(context_parts) context = f"""다음은 학습된 웹소설 내용입니다: {context} 위 내용을 참고하여 다음 질문에 답변해주세요. 중요: 질문에 답변할 때는 반드시 제공된 [소설 본문] 내의 내용을 근거로 해야 합니다. 답변의 각 문장 끝에는 참고한 본문의 문장을 [근거: "문장 내용..."] 형식으로 반드시 붙이세요. 근거를 찾을 수 없다면 "내용을 찾을 수 없습니다"라고 답하고 지어내지 마세요. 질문: """ # 시스템 프롬프트 가져오기 system_prompt = SystemConfig.get_config('system_prompt', '').strip() # 프롬프트 구성 (시스템 프롬프트 + 컨텍스트 + 사용자 메시지) prompt_parts = [] if system_prompt: prompt_parts.append(system_prompt) if context: prompt_parts.append(context) prompt_parts.append(message) full_prompt = "\n\n".join(prompt_parts) if system_prompt: print(f"[프롬프트] 시스템 프롬프트 적용: {len(system_prompt)}자") # 최종 답변 생성은 answer_model 사용 if not answer_model: return jsonify({'error': '답변용 모델이 선택되지 않았습니다.'}), 400 # 모델 타입 확인 (Gemini 또는 Ollama) is_gemini = answer_model.startswith('gemini:') # 토큰 정보 변수 초기화 gemini_input_tokens = None gemini_output_tokens = None gemini_model_used = None ollama_input_tokens = None ollama_output_tokens = None ollama_model_used = None print(f"[최종 답변 생성] 답변 모델: {answer_model}, 프롬프트 길이: {len(full_prompt)}자") if is_gemini: # Gemini API 호출 gemini_model_name = answer_model.replace('gemini:', '') print(f"[Gemini] 모델: {gemini_model_name}, 질문: {message[:50]}...") gemini_client = get_gemini_client() if not gemini_client.is_configured(): return jsonify({'error': 'Gemini API 키가 설정되지 않았습니다. GEMINI_API_KEY 환경 변수를 설정하세요.'}), 500 result = gemini_client.generate_response( prompt=full_prompt, model_name=gemini_model_name, temperature=0.7, max_output_tokens=get_model_token_limit(gemini_model_name or "gemini-1.5-flash", 8192) # 저장된 토큰 수 사용 ) if result['error']: return jsonify({'error': result['error']}), 500 response_text = result.get('response', '').strip() if not response_text: print(f"[채팅] Gemini 응답이 비어있습니다. result: {result}") response_text = '응답을 생성할 수 없었습니다. 다시 시도해주세요.' # 토큰 정보 추출 gemini_input_tokens = result.get('input_tokens') gemini_output_tokens = result.get('output_tokens') gemini_model_used = gemini_model_name else: # Ollama API 호출 # Ollama 서버 연결 확인 try: # 간단한 연결 테스트 test_response = requests.get(f'{OLLAMA_BASE_URL}/api/tags', timeout=5) if test_response.status_code != 200: return jsonify({'error': f'Ollama 서버에 연결할 수 없습니다. (상태 코드: {test_response.status_code}) Ollama가 실행 중인지 확인하세요. Ollama URL: {OLLAMA_BASE_URL}'}), 503 except requests.exceptions.ConnectionError: return jsonify({'error': f'Ollama 서버에 연결할 수 없습니다. Ollama가 실행 중인지 확인하세요. Ollama URL: {OLLAMA_BASE_URL}'}), 503 except Exception as e: return jsonify({'error': f'Ollama 서버 연결 확인 중 오류가 발생했습니다: {str(e)}. Ollama URL: {OLLAMA_BASE_URL}'}), 503 # 입력 토큰 수를 num_ctx로 사용 num_ctx = get_model_token_limit_by_type(answer_model, 100000, 'input') num_predict = get_model_token_limit_by_type(answer_model, 8192, 'output') ollama_response = requests.post( f'{OLLAMA_BASE_URL}/api/generate', json={ 'model': answer_model, # 답변 모델 사용 'prompt': full_prompt, 'stream': False, 'options': { 'num_ctx': num_ctx, # 입력 토큰 수를 컨텍스트 윈도우로 사용 'num_predict': num_predict # 출력 토큰 수 } }, timeout=120 # 파일이 많을 수 있으므로 타임아웃 증가 ) if ollama_response.status_code != 200: # 오류 상세 정보 가져오기 try: error_detail = ollama_response.json().get('error', ollama_response.text[:200]) except: error_detail = ollama_response.text[:200] if ollama_response.text else '상세 정보 없음' if ollama_response.status_code == 404: error_msg = f'모델 "{answer_model}"을(를) 찾을 수 없습니다. 모델이 Ollama에 설치되어 있는지 확인하세요. (오류: {error_detail})' else: error_msg = f'Ollama 서버 오류: {ollama_response.status_code} (오류: {error_detail})' return jsonify({'error': error_msg}), ollama_response.status_code ollama_data = ollama_response.json() response_text = ollama_data.get('response', '').strip() if not response_text: print(f"[채팅] Ollama 응답이 비어있습니다. ollama_data: {ollama_data}") response_text = '응답을 생성할 수 없었습니다. 다시 시도해주세요.' # Ollama 토큰 정보 추출 ollama_input_tokens = None ollama_output_tokens = None if 'prompt_eval_count' in ollama_data: ollama_input_tokens = ollama_data.get('prompt_eval_count') if 'eval_count' in ollama_data: ollama_output_tokens = ollama_data.get('eval_count') ollama_model_used = answer_model if ollama_input_tokens or ollama_output_tokens: print(f"[Ollama] 토큰 사용량: 입력={ollama_input_tokens}, 출력={ollama_output_tokens}") # 대화 세션에 메시지 저장 (Gemini와 Ollama 공통) session_id = data.get('session_id') session_dict = None if session_id: try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first() if session: # 사용자 메시지가 이미 저장되어 있는지 확인 (중복 방지) # 가장 최근 메시지를 확인하여 중복 저장 방지 latest_user_msg = ChatMessage.query.filter_by( session_id=session_id, role='user' ).order_by(ChatMessage.created_at.desc()).first() # 최근 10초 이내에 같은 내용의 메시지가 없으면 저장 should_save = True if latest_user_msg: time_diff = (datetime.utcnow() - latest_user_msg.created_at).total_seconds() if latest_user_msg.content == message and time_diff < 10: should_save = False print(f"[중복 방지] 최근 {time_diff:.2f}초 전에 같은 메시지가 저장되어 있습니다. 저장을 건너뜁니다.") if should_save: user_msg = ChatMessage( session_id=session_id, role='user', content=message ) db.session.add(user_msg) print(f"[메시지 저장] 사용자 메시지 저장: {message[:50]}...") # 세션 제목 업데이트 (첫 사용자 메시지인 경우) title_needs_update = ( not session.title or session.title.strip() == '' or session.title == '새 대화' ) if title_needs_update and message.strip(): # 메시지 내용을 제목으로 사용 (최대 30자) title = message.strip()[:30] if len(message.strip()) > 30: title += '...' session.title = title print(f"[세션 제목] 업데이트: '{title}' (원본 길이: {len(message.strip())}자)") elif title_needs_update: print(f"[세션 제목] 메시지가 비어있어 제목을 업데이트하지 않습니다.") else: print(f"[메시지 저장] 중복 메시지로 인해 저장을 건너뜁니다.") # AI 응답 저장 (토큰 정보 포함) # 토큰 정보 설정 (Gemini 또는 Ollama) input_tokens = gemini_input_tokens if is_gemini else ollama_input_tokens output_tokens = gemini_output_tokens if is_gemini else ollama_output_tokens model_used = gemini_model_used if is_gemini else ollama_model_used ai_msg = ChatMessage( session_id=session_id, role='ai', content=response_text, input_tokens=input_tokens, output_tokens=output_tokens, model_name=model_used ) db.session.add(ai_msg) if input_tokens or output_tokens: print(f"[메시지 저장] AI 메시지 저장 (모델: {model_used}, 입력 토큰: {input_tokens}, 출력 토큰: {output_tokens})") # 세션 모델 정보 업데이트 (첫 메시지인 경우 또는 변경된 경우) if not session.analysis_model or session.analysis_model != analysis_model: session.analysis_model = analysis_model if not session.answer_model or session.answer_model != answer_model: session.answer_model = answer_model # 하위 호환성을 위해 model_name도 업데이트 if not session.model_name: session.model_name = answer_model or analysis_model session.updated_at = datetime.utcnow() db.session.commit() # 세션 정보를 응답에 포함 (제목 업데이트 반영) session_dict = session.to_dict() except Exception as e: print(f"메시지 저장 오류: {str(e)}") db.session.rollback() session_dict = None # 응답이 비어있으면 기본 메시지 사용 if not response_text or not response_text.strip(): print(f"[채팅] 최종 응답이 비어있습니다. 기본 메시지를 사용합니다.") response_text = '응답을 생성할 수 없었습니다. 다시 시도해주세요.' print(f"[채팅] 최종 응답 길이: {len(response_text)}자, 미리보기: {response_text[:100]}...") response_data = {'response': response_text, 'session_id': session_id} if session_dict: response_data['session'] = session_dict return jsonify(response_data) except requests.exceptions.ConnectionError as e: error_msg = f'Ollama 서버에 연결할 수 없습니다. Ollama가 실행 중인지 확인하세요. (URL: {OLLAMA_BASE_URL})' print(f"[채팅] Ollama 연결 오류: {str(e)}") return jsonify({'error': error_msg}), 503 except requests.exceptions.Timeout: return jsonify({'error': '응답 시간이 초과되었습니다. 더 짧은 메시지를 시도해보세요.'}), 504 except Exception as e: error_msg = f'Ollama 통신 중 오류가 발생했습니다: {str(e)}' print(f"[채팅] Ollama 통신 오류 상세: {str(e)}") import traceback traceback.print_exc() return jsonify({'error': error_msg}), 500 else: # 모델이 선택되지 않은 경우 기본 응답 response_text = f"안녕하세요! '{message}'에 대한 답변을 준비 중입니다.\n\n좌측 하단에서 로컬 AI 모델을 선택하면 더 정확한 답변을 제공할 수 있습니다." return jsonify({'response': response_text}) except Exception as e: return jsonify({'error': f'채팅 처리 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/upload', methods=['POST']) @login_required def upload_file(): """웹소설 파일 업로드""" import sys import traceback # 모든 출력을 즉시 플러시하여 로그가 바로 보이도록 def log_print(*args, **kwargs): from datetime import datetime timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] print(f"[{timestamp}]", *args, **kwargs) sys.stdout.flush() try: log_print(f"\n{'='*60}") log_print(f"=== 파일 업로드 요청 시작 ===") log_print(f"요청 URL: {request.url}") log_print(f"요청 메서드: {request.method}") log_print(f"Content-Type: {request.content_type}") log_print(f"Content-Length: {request.content_length}") log_print(f"Remote Address: {request.remote_addr}") log_print(f"Headers: {dict(request.headers)}") log_print(f"Form 데이터 키: {list(request.form.keys())}") log_print(f"Files 키: {list(request.files.keys())}") log_print(f"사용자: {current_user.username if current_user and current_user.is_authenticated else 'None'}") log_print(f"사용자 인증 상태: {current_user.is_authenticated if current_user else False}") log_print(f"{'='*60}\n") # 업로드 폴더 확인 및 생성 try: ensure_upload_folder() log_print(f"[1/8] 업로드 폴더 확인 완료: {UPLOAD_FOLDER}") except Exception as e: error_msg = f'업로드 폴더를 준비할 수 없습니다: {str(e)}' log_print(f"[ERROR] {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'folder_check'}), 500 if 'file' not in request.files: error_msg = '파일이 없습니다.' log_print(f"[ERROR] {error_msg}") log_print(f"사용 가능한 키: {list(request.files.keys())}") return jsonify({'error': error_msg, 'step': 'file_check'}), 400 file = request.files['file'] model_name = request.form.get('model_name', '').strip() parent_file_id = request.form.get('parent_file_id', None) # 이어서 업로드할 경우 원본 파일 ID log_print(f"[2/8] 파일 수신: {file.filename if file else 'None'}") log_print(f"[2/8] 모델명: {model_name if model_name else 'None (비어있음)'}") log_print(f"[2/8] 이어서 업로드: {parent_file_id if parent_file_id else '아니오'}") if file.filename == '': error_msg = '파일명이 없습니다.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'filename_check'}), 400 # 모델명 검증 if not model_name: error_msg = 'AI 모델을 선택해주세요.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'model_check'}), 400 # parent_file_id 검증 (이어서 업로드인 경우) parent_file = None if parent_file_id: try: parent_file_id = int(parent_file_id) parent_file = UploadedFile.query.filter_by( id=parent_file_id, uploaded_by=current_user.id ).first() if not parent_file: error_msg = '원본 파일을 찾을 수 없습니다.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'parent_file_check'}), 404 # 같은 모델인지 확인 if parent_file.model_name != model_name: error_msg = '같은 모델의 파일에만 이어서 업로드할 수 있습니다.' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'model_mismatch'}), 400 log_print(f"[이어서 업로드] 원본 파일: {parent_file.original_filename} (ID: {parent_file_id})") except (ValueError, TypeError): parent_file_id = None log_print(f"[경고] 잘못된 parent_file_id: {parent_file_id}") log_print(f"[3/8] 업로드 시도: {file.filename}, 모델: {model_name}") if not allowed_file(file.filename): error_msg = f'허용되지 않은 파일 형식입니다. 허용 형식: {", ".join(ALLOWED_EXTENSIONS)}' log_print(f"[ERROR] {error_msg}") return jsonify({'error': error_msg, 'step': 'file_type_check'}), 400 log_print(f"[4/8] 파일 형식 확인 완료: {file.filename}") # 파일 크기 확인 (Content-Length 헤더 사용) file_size = 0 try: # Content-Length 헤더 확인 if request.content_length: file_size = request.content_length print(f"Content-Length로 파일 크기 확인: {file_size} bytes") else: # Content-Length가 없으면 파일 스트림에서 크기 확인 시도 try: # 파일 스트림의 현재 위치 저장 current_pos = file.tell() # 파일 끝으로 이동 file.seek(0, os.SEEK_END) file_size = file.tell() # 원래 위치로 복원 file.seek(current_pos, os.SEEK_SET) print(f"파일 스트림으로 크기 확인: {file_size} bytes") except (AttributeError, IOError, OSError) as e: print(f"파일 크기 확인 실패 (저장 후 확인): {str(e)}") file_size = 0 # 저장 후 확인하도록 0으로 설정 except Exception as e: print(f"파일 크기 확인 오류: {str(e)}") file_size = 0 # 저장 후 확인하도록 0으로 설정 # 파일 크기 사전 체크 (가능한 경우에만) if file_size > 0: if file_size > 100 * 1024 * 1024: # 100MB print(f"파일 크기 초과: {file_size} bytes") return jsonify({'error': '파일 크기가 너무 큽니다. 최대 100MB까지 업로드 가능합니다.'}), 400 if file_size == 0: print("빈 파일 업로드 시도") return jsonify({'error': '빈 파일은 업로드할 수 없습니다.'}), 400 # 안전한 파일명 생성 original_filename = file.filename filename = secure_filename(original_filename) if not filename: return jsonify({'error': '유효하지 않은 파일명입니다.'}), 400 unique_filename = f"{uuid.uuid4().hex}_{filename}" file_path = os.path.join(UPLOAD_FOLDER, unique_filename) # 파일 저장 try: log_print(f"[6/8] 파일 저장 시도: {file_path}") file.save(file_path) log_print(f"[6/8] 파일 저장 완료: {file_path}") except IOError as e: error_msg = f'파일 저장 중 오류가 발생했습니다: {str(e)}' log_print(f"[ERROR] 파일 저장 IOError: {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'file_save'}), 500 except PermissionError as e: error_msg = f'파일 저장 권한 오류: {str(e)}' log_print(f"[ERROR] 파일 저장 PermissionError: {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'file_save_permission'}), 500 except Exception as e: error_msg = f'파일 저장 실패: {str(e)}' log_print(f"[ERROR] 파일 저장 Exception: {error_msg}") traceback.print_exc() return jsonify({'error': error_msg, 'step': 'file_save'}), 500 # 저장된 파일 크기 확인 if not os.path.exists(file_path): error_msg = '파일이 저장되지 않았습니다.' print(f"파일 존재 확인 실패: {file_path}") return jsonify({'error': error_msg}), 500 saved_file_size = os.path.getsize(file_path) if saved_file_size == 0: os.remove(file_path) # 빈 파일 삭제 error_msg = '파일이 제대로 저장되지 않았습니다.' print(f"빈 파일 삭제: {file_path}") return jsonify({'error': error_msg}), 500 print(f"저장된 파일 크기: {saved_file_size} bytes") # 데이터베이스에 저장 try: log_print(f"[7/8] 데이터베이스 저장 시도: {original_filename}") uploaded_file = UploadedFile( filename=unique_filename, original_filename=original_filename, file_path=file_path, file_size=saved_file_size, model_name=model_name, # 이미 검증됨 is_public=False, # 기본값: 미공개 uploaded_by=current_user.id, parent_file_id=parent_file_id if parent_file else None # 이어서 업로드인 경우 ) db.session.add(uploaded_file) db.session.flush() # ID를 얻기 위해 flush log_print(f"[7/8] 데이터베이스 flush 완료, 파일 ID: {uploaded_file.id}") # 파일 저장만 완료 (청크 생성은 별도 API로 처리) db.session.commit() log_print(f"[8/8] 데이터베이스 커밋 완료: {original_filename}") log_print(f"[8/8] 연결된 모델: {model_name}") log_print(f"{'='*60}") log_print(f"=== 파일 업로드 완료 (처리 대기 중) ===") log_print(f"{'='*60}\n") log_print(f"[8/8] 업로드 완료 - 파일: {original_filename}, 모델: {model_name}, 크기: {saved_file_size} bytes") log_print(f"[8/8] 다음 단계: Parent Chunk 생성, Chunk 생성, 회차 분석, Graph Extraction을 별도로 진행합니다.") # 회차 수 계산 (섹션 분할 결과 기반) - 파일 읽기 필요 episode_count = 0 if original_filename.lower().endswith(('.txt', '.md')): try: encoding = 'utf-8' try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: with open(file_path, 'r', encoding='cp949') as f: content = f.read() sections = split_content_by_episodes(content) # '#작품설명'을 제외한 회차 수 episode_sections = [s for s in sections if s[0] != '작품설명'] episode_count = len(episode_sections) log_print(f"[8/8] 회차 수 계산: {episode_count}개 회차") except Exception as e: log_print(f"[8/8] 회차 수 계산 오류: {str(e)}") episode_count = 0 return jsonify({ 'message': f'파일이 성공적으로 업로드되었습니다. (모델: {model_name})', 'file': uploaded_file.to_dict(), 'model_name': model_name, 'file_id': uploaded_file.id, 'episode_count': episode_count, # 회차 수 추가 'needs_processing': original_filename.lower().endswith(('.txt', '.md')) # 처리 필요 여부 }), 200 except Exception as e: db.session.rollback() error_msg = f'데이터베이스 저장 중 오류가 발생했습니다: {str(e)}' log_print(f"[ERROR] 데이터베이스 저장 오류: {error_msg}") traceback.print_exc() # 데이터베이스 저장 실패 시 파일도 삭제 if 'file_path' in locals() and os.path.exists(file_path): try: os.remove(file_path) log_print(f"오류로 인한 파일 삭제: {file_path}") except Exception as del_e: log_print(f"파일 삭제 실패: {str(del_e)}") return jsonify({'error': error_msg, 'step': 'database_save'}), 500 except Exception as e: db.session.rollback() error_msg = str(e) error_type = type(e).__name__ log_print(f"\n{'='*60}") log_print(f"=== 업로드 처리 중 예외 발생 ===") log_print(f"예외 타입: {error_type}") log_print(f"에러 메시지: {error_msg}") traceback.print_exc() log_print(f"{'='*60}\n") # 파일 크기 초과 오류 처리 if '413' in error_msg or 'Request Entity Too Large' in error_msg or error_type == 'RequestEntityTooLarge': return jsonify({'error': '파일 크기가 너무 큽니다. 최대 100MB까지 업로드 가능합니다.', 'step': 'file_size'}), 413 return jsonify({'error': f'파일 업로드 중 오류가 발생했습니다: {error_type}: {error_msg}', 'step': 'exception'}), 500 @main_bp.route('/api/files', methods=['GET']) @login_required def get_files(): """업로드된 파일 목록 조회""" try: model_name = request.args.get('model_name', None) public_only = request.args.get('public_only', 'false').lower() == 'true' # 공개 파일만 조회 옵션 # 원본 파일만 조회 (parent_file_id가 None인 파일) # 관리자가 아닌 경우 공개 파일만 조회, 관리자는 모든 파일 조회 가능 if public_only or (not current_user.is_admin): query = UploadedFile.query.filter_by(parent_file_id=None, is_public=True) print(f"[파일 조회] 공개 파일만 조회 (사용자: {current_user.username}, 관리자: {current_user.is_admin})") else: query = UploadedFile.query.filter_by(parent_file_id=None) print(f"[파일 조회] 모든 파일 조회 (사용자: {current_user.username}, 관리자: {current_user.is_admin})") # 모델 필터링 전 전체 파일 수 확인 total_before_filter = query.count() print(f"[파일 조회] 필터링 전 파일 수: {total_before_filter}개") if model_name: query = query.filter_by(model_name=model_name) print(f"[파일 조회] 모델 '{model_name}' 필터링") files = query.order_by(UploadedFile.uploaded_at.desc()).all() # 필터링 후 파일 수와 모델명 확인 print(f"[파일 조회] 필터링 후 파일 수: {len(files)}개") if len(files) > 0: print(f"[파일 조회] 첫 번째 파일 모델명: {files[0].model_name}") else: # 필터링 결과가 없을 때 실제 존재하는 모델명 확인 all_files = UploadedFile.query.filter_by(parent_file_id=None).all() unique_models = set(f.model_name for f in all_files if f.model_name) print(f"[파일 조회] 데이터베이스에 존재하는 모델명 목록: {list(unique_models)}") # 각 원본 파일에 대해 이어서 업로드된 파일도 포함 files_with_children = [] for file in files: file_dict = file.to_dict() # 청크 개수 추가 chunk_count = DocumentChunk.query.filter_by(file_id=file.id).count() file_dict['chunk_count'] = chunk_count # Parent Chunk 존재 여부 확인 has_parent_chunk = ParentChunk.query.filter_by(file_id=file.id).first() is not None file_dict['has_parent_chunk'] = has_parent_chunk # 이어서 업로드된 파일들도 조회 child_files = UploadedFile.query.filter_by(parent_file_id=file.id).order_by(UploadedFile.uploaded_at.asc()).all() child_files_dict = [] for child in child_files: child_dict = child.to_dict() child_chunk_count = DocumentChunk.query.filter_by(file_id=child.id).count() child_dict['chunk_count'] = child_chunk_count # Child 파일도 Parent Chunk 확인 child_has_parent_chunk = ParentChunk.query.filter_by(file_id=child.id).first() is not None child_dict['has_parent_chunk'] = child_has_parent_chunk child_files_dict.append(child_dict) file_dict['child_files'] = child_files_dict files_with_children.append(file_dict) # 모델별 통계 정보 추가 (원본 파일만 카운트, 공개 파일만) model_stats = {} if not model_name: # 모든 모델의 통계 (원본 파일만, 공개 파일만) if public_only or (not current_user.is_admin): all_files = UploadedFile.query.filter_by(parent_file_id=None, is_public=True).all() else: all_files = UploadedFile.query.filter_by(parent_file_id=None).all() for file in all_files: model = file.model_name or '미지정' if model not in model_stats: model_stats[model] = {'count': 0, 'total_size': 0} model_stats[model]['count'] += 1 model_stats[model]['total_size'] += file.file_size else: # 특정 모델의 통계 model_stats[model_name] = { 'count': len(files), 'total_size': sum(f.file_size for f in files) } print(f"[파일 조회] 조회된 원본 파일 수: {len(files)}개") return jsonify({ 'files': files_with_children, 'model_stats': model_stats, 'filtered_model': model_name }), 200 except Exception as e: return jsonify({'error': f'파일 목록 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//chunks', methods=['GET']) @login_required def get_file_chunks(file_id): """파일의 청크 정보 조회 (학습 상태 확인용)""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 chunks = DocumentChunk.query.filter_by(file_id=file_id).order_by(DocumentChunk.chunk_index.asc()).all() total_chunks = len(chunks) # 샘플 청크 (처음 3개) sample_chunks = [] for chunk in chunks[:3]: sample_chunks.append({ 'index': chunk.chunk_index, 'content_preview': chunk.content[:100] + '...' if len(chunk.content) > 100 else chunk.content, 'content_length': len(chunk.content) }) return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'model_name': file.model_name, 'total_chunks': total_chunks, 'sample_chunks': sample_chunks, 'learning_status': 'ready' if total_chunks > 0 else 'not_ready', 'message': f'{total_chunks}개 청크가 저장되어 RAG 검색에 사용 가능합니다.' if total_chunks > 0 else '청크가 생성되지 않아 RAG 검색이 불가능합니다.' }), 200 except Exception as e: return jsonify({'error': f'청크 정보 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//chunks/all', methods=['GET']) @login_required def get_all_file_chunks(file_id): """파일의 모든 청크 목록과 내용 조회 (관리자용)""" try: # 관리자는 모든 파일 조회 가능 if current_user.is_admin: file = UploadedFile.query.get(file_id) else: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 chunks = DocumentChunk.query.filter_by(file_id=file_id).order_by(DocumentChunk.chunk_index.asc()).all() chunks_data = [] for chunk in chunks: chunk_dict = { 'id': chunk.id, 'chunk_index': chunk.chunk_index, 'content': chunk.content, 'content_length': len(chunk.content), 'created_at': chunk.created_at.isoformat() if chunk.created_at else None } # 메타데이터 파싱 if chunk.chunk_metadata: try: metadata = json.loads(chunk.chunk_metadata) chunk_dict['metadata'] = metadata except: chunk_dict['metadata'] = None else: chunk_dict['metadata'] = None chunks_data.append(chunk_dict) return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'model_name': file.model_name, 'total_chunks': len(chunks_data), 'chunks': chunks_data }), 200 except Exception as e: return jsonify({'error': f'청크 목록 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//summary', methods=['GET']) @login_required def get_file_summary(file_id): """파일의 요약 내용 조회 (Parent Chunk + Episode Analysis)""" try: print(f"[요약 조회] 파일 ID {file_id} 요약 내용 조회 요청 (사용자: {current_user.username})") # 모든 사용자가 모든 파일 조회 가능 (관리자 페이지와 동일) file = UploadedFile.query.get(file_id) if not file: print(f"[요약 조회] 파일을 찾을 수 없음: 파일 ID {file_id}") # 디버깅: 전체 파일 목록 확인 all_files = UploadedFile.query.all() print(f"[요약 조회] 데이터베이스에 존재하는 파일 ID 목록: {[f.id for f in all_files]}") return jsonify({'error': f'파일을 찾을 수 없습니다. (파일 ID: {file_id})'}), 404 parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() episode_analysis = EpisodeAnalysis.query.filter_by(file_id=file_id).first() return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'parent_chunk': parent_chunk.to_dict() if parent_chunk else None, 'episode_analysis': episode_analysis.to_dict() if episode_analysis else None, 'has_parent_chunk': parent_chunk is not None, 'has_episode_analysis': episode_analysis is not None }), 200 except Exception as e: return jsonify({'error': f'요약 내용 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//graph', methods=['GET']) @login_required def get_file_graph(file_id): """파일의 GraphRAG 데이터 조회 (엔티티, 관계, 사건)""" try: print(f"[GraphRAG 조회] 파일 ID {file_id} GraphRAG 데이터 조회 요청 (사용자: {current_user.username})") file = UploadedFile.query.get(file_id) if not file: print(f"[GraphRAG 조회] 파일을 찾을 수 없음: 파일 ID {file_id}") return jsonify({'error': f'파일을 찾을 수 없습니다. (파일 ID: {file_id})'}), 404 # 엔티티 조회 (회차별로 그룹화) entities = GraphEntity.query.filter_by(file_id=file_id).all() entities_by_episode = {} for entity in entities: episode = entity.episode_title if episode not in entities_by_episode: entities_by_episode[episode] = {'characters': [], 'locations': []} if entity.entity_type == 'character': entities_by_episode[episode]['characters'].append(entity.to_dict()) elif entity.entity_type == 'location': entities_by_episode[episode]['locations'].append(entity.to_dict()) # 관계 조회 (회차별로 그룹화) relationships = GraphRelationship.query.filter_by(file_id=file_id).all() relationships_by_episode = {} for rel in relationships: episode = rel.episode_title if episode not in relationships_by_episode: relationships_by_episode[episode] = [] relationships_by_episode[episode].append(rel.to_dict()) # 사건 조회 (회차별로 그룹화) events = GraphEvent.query.filter_by(file_id=file_id).all() events_by_episode = {} for event in events: episode = event.episode_title if episode not in events_by_episode: events_by_episode[episode] = [] events_by_episode[episode].append(event.to_dict()) # 통계 정보 total_entities = len(entities) total_relationships = len(relationships) total_events = len(events) episodes = list(set([e.episode_title for e in entities] + [r.episode_title for r in relationships] + [ev.episode_title for ev in events])) return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'statistics': { 'total_entities': total_entities, 'total_relationships': total_relationships, 'total_events': total_events, 'episodes_count': len(episodes) }, 'entities_by_episode': entities_by_episode, 'relationships_by_episode': relationships_by_episode, 'events_by_episode': events_by_episode, 'episodes': sorted(episodes) }), 200 except Exception as e: print(f"[GraphRAG 조회] 오류: {str(e)}") import traceback traceback.print_exc() return jsonify({'error': f'GraphRAG 데이터 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//parent-chunk', methods=['GET']) @login_required def get_file_parent_chunk(file_id): """파일의 Parent Chunk 조회""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() if not parent_chunk: return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'has_parent_chunk': False, 'message': 'Parent Chunk가 생성되지 않았습니다.' }), 200 return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'has_parent_chunk': True, 'parent_chunk': parent_chunk.to_dict(), 'message': 'Parent Chunk가 존재합니다.' }), 200 except Exception as e: return jsonify({'error': f'Parent Chunk 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//parent-chunk', methods=['POST']) @login_required def create_file_parent_chunk(file_id): """파일의 Parent Chunk 수동 생성 (재생성)""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 # 모델명 확인 if not file.model_name: return jsonify({'error': '파일에 연결된 AI 모델이 없습니다. Parent Chunk를 생성할 수 없습니다.'}), 400 # 파일이 텍스트 파일인지 확인 if not file.original_filename.lower().endswith(('.txt', '.md')): return jsonify({'error': 'Parent Chunk는 텍스트 파일(.txt, .md)에만 생성할 수 있습니다.'}), 400 # 파일 경로 확인 if not file.file_path or not os.path.exists(file.file_path): error_msg = f'파일 경로가 유효하지 않습니다: {file.file_path}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") return jsonify({'error': error_msg}), 500 # 파일 내용 읽기 try: encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: with open(file.file_path, 'r', encoding='cp949') as f: content = f.read() except FileNotFoundError: error_msg = f'파일을 찾을 수 없습니다: {file.file_path}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") return jsonify({'error': error_msg}), 500 except PermissionError: error_msg = f'파일 읽기 권한이 없습니다: {file.file_path}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") return jsonify({'error': error_msg}), 500 except Exception as e: error_msg = f'파일을 읽을 수 없습니다: {str(e)}' print(f"[Parent Chunk 생성] ❌ 오류: {error_msg}") import traceback traceback.print_exc() return jsonify({'error': error_msg}), 500 if not content or len(content.strip()) == 0: return jsonify({'error': '파일 내용이 비어있습니다.'}), 400 # Parent Chunk 생성 print(f"[Parent Chunk 수동 생성] 파일 ID {file_id}에 대한 Parent Chunk 생성 시작") print(f"[Parent Chunk 수동 생성] 모델명: {file.model_name}") print(f"[Parent Chunk 수동 생성] 파일명: {file.original_filename}") parent_chunk = create_parent_chunk_with_ai(file_id, content, file.model_name) if parent_chunk: return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'has_parent_chunk': True, 'parent_chunk': parent_chunk.to_dict(), 'message': 'Parent Chunk가 성공적으로 생성되었습니다.' }), 200 else: return jsonify({ 'error': 'Parent Chunk 생성에 실패했습니다. 서버 로그를 확인하세요.', 'file_id': file_id, 'filename': file.original_filename }), 500 except Exception as e: import traceback error_traceback = traceback.format_exc() error_msg = str(e) print(f"[Parent Chunk 생성] ❌ 예외 발생: {error_msg}") print(f"[Parent Chunk 생성] Traceback:\n{error_traceback}") return jsonify({ 'error': f'Parent Chunk 생성 중 오류가 발생했습니다: {error_msg}', 'file_id': file_id }), 500 @main_bp.route('/api/files//process/parent-chunk', methods=['POST']) @login_required def process_parent_chunk(file_id): """단계 1: Parent Chunk 생성""" return create_file_parent_chunk(file_id) @main_bp.route('/api/files//process/chunks', methods=['POST']) @login_required def process_chunks(file_id): """단계 2: Chunk 생성 (회차 분석, Graph Extraction 제외)""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 if not file.original_filename.lower().endswith(('.txt', '.md')): return jsonify({'error': 'Chunk는 텍스트 파일(.txt, .md)에만 생성할 수 있습니다.'}), 400 # 파일 내용 읽기 try: encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: with open(file.file_path, 'r', encoding='cp949') as f: content = f.read() except Exception as e: return jsonify({'error': f'파일을 읽을 수 없습니다: {str(e)}'}), 500 print(f"[단계 2: Chunk 생성] 파일 ID {file_id}에 대한 Chunk 생성 시작") chunk_count = create_chunks_for_file(file_id, content, skip_episode_analysis=True, skip_graph_extraction=True) return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'chunk_count': chunk_count, 'message': f'Chunk {chunk_count}개가 성공적으로 생성되었습니다.', 'step': 'chunks', 'completed': True }), 200 except Exception as e: return jsonify({'error': f'Chunk 생성 중 오류가 발생했습니다: {str(e)}', 'step': 'chunks'}), 500 @main_bp.route('/api/files//process/episode-analysis', methods=['POST']) @login_required def process_episode_analysis(file_id): """단계 3: 회차 분석""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 if not file.model_name: return jsonify({'error': '파일에 연결된 AI 모델이 없습니다.'}), 400 if not file.original_filename.lower().endswith(('.txt', '.md')): return jsonify({'error': '회차 분석은 텍스트 파일(.txt, .md)에만 가능합니다.'}), 400 # 파일 내용 읽기 try: encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: with open(file.file_path, 'r', encoding='cp949') as f: content = f.read() except Exception as e: return jsonify({'error': f'파일을 읽을 수 없습니다: {str(e)}'}), 500 # 섹션 분할 sections = split_content_by_episodes(content) episode_sections = [s for s in sections if s[0] != '작품설명'] if not episode_sections: return jsonify({'error': '분석할 회차가 없습니다.'}), 400 # Parent Chunk 가져오기 parent_chunk = None try: parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() except: pass # 기존 회차 분석 삭제 existing_analyses = EpisodeAnalysis.query.filter_by(file_id=file_id).all() if existing_analyses: for analysis in existing_analyses: db.session.delete(analysis) db.session.commit() print(f"[단계 3: 회차 분석] 파일 ID {file_id}에 대한 회차 분석 시작 ({len(episode_sections)}개 회차)") # 각 회차 분석 all_analyses = [] for section_type, section_title, section_content, section_metadata in episode_sections: try: print(f"[단계 3: 회차 분석] '{section_title}' 분석 중...") analysis_result = analyze_episode( episode_content=section_content, episode_title=section_title, full_content=content, parent_chunk=parent_chunk, model_name=file.model_name ) if analysis_result: all_analyses.append(f"\n\n{analysis_result}") print(f"[단계 3: 회차 분석] '{section_title}' 분석 완료") except Exception as e: print(f"[단계 3: 회차 분석] '{section_title}' 분석 중 오류: {str(e)}") continue # 모든 회차 분석 결과를 하나의 텍스트로 저장 if all_analyses: combined_analysis = "\n".join(all_analyses).strip() episode_analysis = EpisodeAnalysis( file_id=file_id, episode_title="전체 회차 통합 분석", analysis_content=combined_analysis ) db.session.add(episode_analysis) db.session.commit() # 회차 분석 성공 후 Graph Extraction 자동 실행 print(f"[단계 3: 회차 분석] Graph Extraction 자동 실행 시작...") graph_success_count = 0 for section_type, section_title, section_content, section_metadata in episode_sections: try: print(f"[단계 3: 회차 분석] '{section_title}' Graph Extraction 중...") success = extract_graph_from_episode( episode_content=section_content, episode_title=section_title, file_id=file_id, full_content=content, parent_chunk=parent_chunk, model_name=file.model_name ) if success: graph_success_count += 1 print(f"[단계 3: 회차 분석] '{section_title}' Graph Extraction 완료") except Exception as e: print(f"[단계 3: 회차 분석] '{section_title}' Graph Extraction 중 오류: {str(e)}") continue print(f"[단계 3: 회차 분석] Graph Extraction 완료: {graph_success_count}/{len(episode_sections)}개 회차 성공") return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'episode_count': len(episode_sections), 'graph_success_count': graph_success_count, 'message': f'{len(episode_sections)}개 회차 분석이 완료되었습니다. (Graph Extraction: {graph_success_count}/{len(episode_sections)}개 성공)', 'step': 'episode-analysis', 'completed': True }), 200 else: return jsonify({ 'error': '회차 분석 결과가 없습니다.', 'step': 'episode-analysis', 'completed': False }), 500 except Exception as e: db.session.rollback() return jsonify({'error': f'회차 분석 중 오류가 발생했습니다: {str(e)}', 'step': 'episode-analysis'}), 500 @main_bp.route('/api/files//process/graph', methods=['POST']) @login_required def process_graph(file_id): """단계 4: Graph Extraction""" try: file = UploadedFile.query.filter_by(id=file_id, uploaded_by=current_user.id).first() if not file: return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 if not file.model_name: return jsonify({'error': '파일에 연결된 AI 모델이 없습니다.'}), 400 if not file.original_filename.lower().endswith(('.txt', '.md')): return jsonify({'error': 'Graph Extraction은 텍스트 파일(.txt, .md)에만 가능합니다.'}), 400 # 파일 내용 읽기 try: encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: with open(file.file_path, 'r', encoding='cp949') as f: content = f.read() except Exception as e: return jsonify({'error': f'파일을 읽을 수 없습니다: {str(e)}'}), 500 # 섹션 분할 sections = split_content_by_episodes(content) episode_sections = [s for s in sections if s[0] != '작품설명'] if not episode_sections: return jsonify({'error': 'Graph Extraction할 회차가 없습니다.'}), 400 # Parent Chunk 가져오기 parent_chunk = None try: parent_chunk = ParentChunk.query.filter_by(file_id=file_id).first() except: pass print(f"[단계 4: Graph Extraction] 파일 ID {file_id}에 대한 Graph Extraction 시작 ({len(episode_sections)}개 회차)") # 각 회차 Graph Extraction success_count = 0 for section_type, section_title, section_content, section_metadata in episode_sections: try: print(f"[단계 4: Graph Extraction] '{section_title}' Graph Extraction 중...") success = extract_graph_from_episode( episode_content=section_content, episode_title=section_title, file_id=file_id, full_content=content, parent_chunk=parent_chunk, model_name=file.model_name ) if success: success_count += 1 print(f"[단계 4: Graph Extraction] '{section_title}' Graph Extraction 완료") except Exception as e: print(f"[단계 4: Graph Extraction] '{section_title}' Graph Extraction 중 오류: {str(e)}") continue return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'episode_count': len(episode_sections), 'success_count': success_count, 'message': f'{success_count}/{len(episode_sections)}개 회차 Graph Extraction이 완료되었습니다.', 'step': 'graph', 'completed': True }), 200 except Exception as e: return jsonify({'error': f'Graph Extraction 중 오류가 발생했습니다: {str(e)}', 'step': 'graph'}), 500 @main_bp.route('/api/files//metadata', methods=['POST']) @login_required def create_file_metadata(file_id): """파일의 모든 청크에 메타데이터 생성 (수동 생성)""" try: file = UploadedFile.query.get_or_404(file_id) # 권한 확인 if not current_user.is_admin and file.uploaded_by != current_user.id: return jsonify({'error': '권한이 없습니다.'}), 403 # 모델명 확인 if not file.model_name: return jsonify({'error': '파일에 연결된 AI 모델이 없습니다. 메타데이터를 생성할 수 없습니다.'}), 400 # 텍스트 파일만 가능 if not file.original_filename.lower().endswith(('.txt', '.md')): return jsonify({'error': '메타데이터는 텍스트 파일(.txt, .md)에만 생성할 수 있습니다.'}), 400 # 파일 내용 읽기 encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: with open(file.file_path, 'r', encoding='cp949') as f: content = f.read() # 모든 청크 가져오기 chunks = DocumentChunk.query.filter_by(file_id=file_id).order_by(DocumentChunk.chunk_index).all() if not chunks: return jsonify({'error': '청크가 없습니다. 먼저 파일을 업로드하세요.'}), 400 print(f"[메타데이터 생성] 파일 ID {file_id}에 대한 메타데이터 생성 시작") print(f"[메타데이터 생성] 모델명: {file.model_name}") print(f"[메타데이터 생성] 파일명: {file.original_filename}") print(f"[메타데이터 생성] 청크 개수: {len(chunks)}개") # 각 청크에 메타데이터 생성 success_count = 0 fail_count = 0 for chunk in chunks: try: # 기존 메타데이터 읽기 existing_metadata = {} if chunk.chunk_metadata: try: existing_metadata = json.loads(chunk.chunk_metadata) except: existing_metadata = {} # 새 메타데이터 추출 new_metadata = extract_chunk_metadata( chunk_content=chunk.content, full_content=content, # 원본 웹소설 전체 내용 참조 chunk_index=chunk.chunk_index, file_id=file_id, model_name=file.model_name ) # 기존 메타데이터와 새 메타데이터 병합 (새 메타데이터가 우선) # 기존 메타데이터의 모든 필드를 유지하되, 새로 추출한 필드로 업데이트 # chapter 필드는 파일 업로드 시 추가된 회차 정보이므로 유지 merged_metadata = existing_metadata.copy() for key, value in new_metadata.items(): if value is not None and value != []: # 리스트인 경우 중복 제거 후 병합 if isinstance(value, list) and isinstance(merged_metadata.get(key), list): merged_list = merged_metadata.get(key, []).copy() for item in value: if item not in merged_list: merged_list.append(item) merged_metadata[key] = merged_list else: merged_metadata[key] = value # 메타데이터를 JSON 문자열로 변환 metadata_json = json.dumps(merged_metadata, ensure_ascii=False) if merged_metadata else None # 청크에 메타데이터 저장 chunk.chunk_metadata = metadata_json success_count += 1 # 진행 상황 출력 (10개마다) if (success_count + fail_count) % 10 == 0: print(f"[메타데이터 생성] 진행 중: {success_count + fail_count}/{len(chunks)}개 청크 처리 중...") except Exception as e: print(f"[메타데이터 생성] 경고: 청크 {chunk.chunk_index} 메타데이터 생성 실패: {str(e)}") fail_count += 1 continue # 데이터베이스 커밋 db.session.commit() print(f"[메타데이터 생성] 완료: {success_count}개 성공, {fail_count}개 실패") return jsonify({ 'file_id': file_id, 'filename': file.original_filename, 'total_chunks': len(chunks), 'success_count': success_count, 'fail_count': fail_count, 'message': f'메타데이터 생성이 완료되었습니다. (성공: {success_count}개, 실패: {fail_count}개)' }), 200 except Exception as e: db.session.rollback() print(f"[메타데이터 생성] 오류: {str(e)}") import traceback traceback.print_exc() return jsonify({'error': f'메타데이터 생성 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files/', methods=['DELETE']) @login_required def delete_file(file_id): """업로드된 파일 삭제 (연관된 모든 파일도 함께 삭제)""" try: file = UploadedFile.query.get_or_404(file_id) # 원본 파일인 경우 (parent_file_id가 None인 경우) # 이어서 업로드된 모든 파일도 함께 삭제 files_to_delete = [] if file.parent_file_id is None: # 원본 파일이면, 이어서 업로드된 모든 파일도 찾아서 삭제 child_files = UploadedFile.query.filter_by(parent_file_id=file_id).all() files_to_delete = [file] + child_files print(f"[파일 삭제] 원본 파일 삭제: {file.original_filename}, 연관 파일 {len(child_files)}개도 함께 삭제") else: # 이어서 업로드된 파일이면 원본 파일도 함께 삭제 parent_file = UploadedFile.query.get(file.parent_file_id) if parent_file: # 원본 파일과 모든 연관 파일 삭제 all_child_files = UploadedFile.query.filter_by(parent_file_id=file.parent_file_id).all() files_to_delete = [parent_file] + all_child_files print(f"[파일 삭제] 이어서 업로드된 파일 삭제: {file.original_filename}, 원본 및 연관 파일 {len(all_child_files)}개도 함께 삭제") else: files_to_delete = [file] deleted_count = 0 deleted_files = [] for file_to_delete in files_to_delete: try: # 파일 시스템에서 삭제 if os.path.exists(file_to_delete.file_path): os.remove(file_to_delete.file_path) print(f"[파일 삭제] 파일 시스템에서 삭제: {file_to_delete.file_path}") # 관련 Child Chunk (DocumentChunk) 삭제 child_chunk_count = DocumentChunk.query.filter_by(file_id=file_to_delete.id).count() if child_chunk_count > 0: DocumentChunk.query.filter_by(file_id=file_to_delete.id).delete() print(f"[파일 삭제] Child Chunk {child_chunk_count}개 삭제 완료") # 벡터 DB에서도 해당 파일의 청크 삭제 try: vector_db = get_vector_db() vector_db.delete_chunks_by_file_id(file_to_delete.id) print(f"[파일 삭제] 벡터 DB에서 청크 삭제 완료") except Exception as vector_e: print(f"[파일 삭제] 벡터 DB 삭제 오류 (무시): {str(vector_e)}") # 관련 Parent Chunk 삭제 parent_chunk = ParentChunk.query.filter_by(file_id=file_to_delete.id).first() if parent_chunk: db.session.delete(parent_chunk) print(f"[파일 삭제] Parent Chunk 삭제 완료") # 관련 EpisodeAnalysis 삭제 episode_analysis_count = EpisodeAnalysis.query.filter_by(file_id=file_to_delete.id).count() if episode_analysis_count > 0: EpisodeAnalysis.query.filter_by(file_id=file_to_delete.id).delete() print(f"[파일 삭제] EpisodeAnalysis {episode_analysis_count}개 삭제 완료") # 관련 GraphRAG 데이터 삭제 (GraphEntity, GraphRelationship, GraphEvent) graph_entity_count = GraphEntity.query.filter_by(file_id=file_to_delete.id).count() if graph_entity_count > 0: GraphEntity.query.filter_by(file_id=file_to_delete.id).delete() print(f"[파일 삭제] GraphEntity {graph_entity_count}개 삭제 완료") graph_relationship_count = GraphRelationship.query.filter_by(file_id=file_to_delete.id).count() if graph_relationship_count > 0: GraphRelationship.query.filter_by(file_id=file_to_delete.id).delete() print(f"[파일 삭제] GraphRelationship {graph_relationship_count}개 삭제 완료") graph_event_count = GraphEvent.query.filter_by(file_id=file_to_delete.id).count() if graph_event_count > 0: GraphEvent.query.filter_by(file_id=file_to_delete.id).delete() print(f"[파일 삭제] GraphEvent {graph_event_count}개 삭제 완료") deleted_files.append(file_to_delete.original_filename) db.session.delete(file_to_delete) deleted_count += 1 print(f"[파일 삭제] 데이터베이스에서 파일 삭제 완료: {file_to_delete.original_filename}") except Exception as e: print(f"[파일 삭제 오류] {file_to_delete.original_filename}: {str(e)}") import traceback traceback.print_exc() db.session.commit() message = f'파일이 성공적으로 삭제되었습니다.' if deleted_count > 1: message = f'파일 {deleted_count}개가 성공적으로 삭제되었습니다. (원본 및 연관 파일 포함)' return jsonify({ 'message': message, 'deleted_count': deleted_count, 'deleted_files': deleted_files }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'파일 삭제 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//public', methods=['PUT']) @login_required @admin_required def toggle_file_public(file_id): """파일 공개 여부 변경 (관리자만 가능)""" try: file = UploadedFile.query.get_or_404(file_id) data = request.get_json() is_public = data.get('is_public', False) file.is_public = is_public db.session.commit() return jsonify({ 'message': f'파일이 {"공개" if is_public else "비공개"}로 설정되었습니다.', 'file': file.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'파일 공개 여부 변경 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/files//content', methods=['GET']) @login_required def get_file_content(file_id): """업로드된 파일 내용 조회""" try: file = UploadedFile.query.get_or_404(file_id) if not os.path.exists(file.file_path): return jsonify({'error': '파일을 찾을 수 없습니다.'}), 404 # 텍스트 파일 읽기 encoding = 'utf-8' try: with open(file.file_path, 'r', encoding=encoding) as f: content = f.read() except UnicodeDecodeError: # UTF-8로 읽을 수 없으면 다른 인코딩 시도 with open(file.file_path, 'r', encoding='cp949') as f: content = f.read() return jsonify({ 'content': content, 'filename': file.original_filename }), 200 except Exception as e: return jsonify({'error': f'파일 내용 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions', methods=['GET']) @login_required def get_chat_sessions(): """사용자의 대화 세션 목록 조회 (최근 20개만 표시)""" try: sessions = ChatSession.query.filter_by(user_id=current_user.id)\ .order_by(ChatSession.updated_at.desc())\ .limit(20).all() return jsonify({ 'sessions': [session.to_dict() for session in sessions] }), 200 except Exception as e: return jsonify({'error': f'대화 세션 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions', methods=['POST']) @login_required def create_chat_session(): """새 대화 세션 생성""" try: data = request.json title = data.get('title', '새 대화') model_name = data.get('model_name', None) # 하위 호환성 analysis_model = data.get('analysis_model', None) answer_model = data.get('answer_model', None) session = ChatSession( user_id=current_user.id, title=title, model_name=model_name, # 하위 호환성 analysis_model=analysis_model, answer_model=answer_model ) db.session.add(session) db.session.commit() return jsonify({ 'message': '대화 세션이 생성되었습니다.', 'session': session.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'대화 세션 생성 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions/', methods=['GET']) @login_required def get_chat_session(session_id): """대화 세션 상세 조회 (메시지 포함)""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() session_dict = session.to_dict() session_dict['messages'] = [msg.to_dict() for msg in session.messages] return jsonify({'session': session_dict}), 200 except Exception as e: return jsonify({'error': f'대화 세션 조회 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions/', methods=['PUT']) @login_required def update_chat_session(session_id): """대화 세션 수정 (제목 등)""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() data = request.json if 'title' in data: session.title = data['title'] session.updated_at = datetime.utcnow() db.session.commit() return jsonify({ 'message': '대화 세션이 수정되었습니다.', 'session': session.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'대화 세션 수정 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions/', methods=['DELETE']) @login_required def delete_chat_session(session_id): """대화 세션 삭제""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() db.session.delete(session) db.session.commit() return jsonify({'message': '대화 세션이 삭제되었습니다.'}), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'대화 세션 삭제 중 오류가 발생했습니다: {str(e)}'}), 500 @main_bp.route('/api/chat/sessions//messages', methods=['POST']) @login_required def add_chat_message(session_id): """대화 메시지 추가""" try: session = ChatSession.query.filter_by( id=session_id, user_id=current_user.id ).first_or_404() data = request.json role = data.get('role', 'user') content = data.get('content', '') if not content: return jsonify({'error': '메시지 내용이 필요합니다.'}), 400 message = ChatMessage( session_id=session_id, role=role, content=content ) db.session.add(message) # 세션 제목 업데이트 (첫 사용자 메시지인 경우) if not session.title or session.title == '새 대화': if role == 'user': title = content[:30] + '...' if len(content) > 30 else content session.title = title session.updated_at = datetime.utcnow() db.session.commit() return jsonify({ 'message': '메시지가 추가되었습니다.', 'chat_message': message.to_dict() }), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'메시지 추가 중 오류가 발생했습니다: {str(e)}'}), 500