import streamlit as st
import json
import os
import uuid
import glob
from datetime import datetime
import numpy as np
import platform
import networkx as nx
import plotly.graph_objects as go
from sklearn.metrics.pairwise import cosine_similarity
import plotly
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
from sklearn.manifold import TSNE
import warnings
warnings.filterwarnings('ignore')
# --- (이전 코드는 동일) ---
# 페이지 설정
st.set_page_config(
page_title="한국어 단어 의미 네트워크 시각화",
page_icon="🔤",
layout="wide"
)
# 폴더 경로 설정
DATA_FOLDER = 'data'
UPLOAD_FOLDER = 'uploads'
# 폴더 생성
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
# 세션 상태 초기화
if 'model' not in st.session_state:
st.session_state.model = None
if 'embeddings_cache' not in st.session_state:
st.session_state.embeddings_cache = {}
if 'graph_cache' not in st.session_state:
st.session_state.graph_cache = {}
if 'data_files' not in st.session_state:
st.session_state.data_files = {}
if 'selected_files' not in st.session_state:
st.session_state.selected_files = [] # 리스트로 초기화
if 'threshold' not in st.session_state:
st.session_state.threshold = 0.7
if 'generate_clicked' not in st.session_state:
st.session_state.generate_clicked = False
if 'fig' not in st.session_state:
st.session_state.fig = None
# --- (함수 정의 부분은 동일: set_korean_font, load_words_from_json, ...) ---
# --- 한글 폰트 설정 함수 ---
def set_korean_font():
"""
현재 운영체제에 맞는 한글 폰트를 matplotlib 및 Plotly용으로 설정 시도하고,
Plotly에서 사용할 폰트 이름을 반환합니다.
"""
system_name = platform.system()
plotly_font_name = None # Plotly에서 사용할 폰트 이름
# Matplotlib 폰트 설정
if system_name == "Windows":
font_name = "Malgun Gothic"
plotly_font_name = "Malgun Gothic"
elif system_name == "Darwin": # MacOS
font_name = "AppleGothic"
plotly_font_name = "AppleGothic"
elif system_name == "Linux":
# Linux에서 선호하는 한글 폰트 경로 또는 이름 설정
font_path = "/usr/share/fonts/truetype/nanum/NanumGothic.ttf"
plotly_font_name_linux = "NanumGothic" # Plotly는 폰트 '이름'을 주로 사용
if os.path.exists(font_path):
prop = fm.FontProperties(fname=font_path)
fm.fontManager.addfont(font_path) # 시스템에 폰트 추가 (필요할 수 있음)
font_name = prop.get_name()
plotly_font_name = plotly_font_name_linux
else:
# 시스템에서 'Nanum' 포함 폰트 찾기 시도
try:
available_fonts = [f.name for f in fm.fontManager.ttflist]
nanum_fonts = [name for name in available_fonts if 'Nanum' in name]
if nanum_fonts:
font_name = nanum_fonts[0]
# Plotly에서 사용할 이름도 비슷하게 설정 (정확한 이름은 시스템마다 다를 수 있음)
plotly_font_name = font_name if 'Nanum' in font_name else plotly_font_name_linux
else:
# 다른 OS 폰트 시도 (Linux에서 드물지만)
if "Malgun Gothic" in available_fonts:
font_name = "Malgun Gothic"
plotly_font_name = "Malgun Gothic"
elif "AppleGothic" in available_fonts:
font_name = "AppleGothic"
plotly_font_name = "AppleGothic"
else:
font_name = None
except Exception as e:
print(f"Linux font search error: {e}")
font_name = None
if not font_name:
font_name = None
plotly_font_name = None # Plotly도 기본값 사용
else: # 기타 OS
font_name = None
plotly_font_name = None
# Matplotlib 폰트 설정 적용
if font_name:
try:
plt.rc('font', family=font_name)
plt.rc('axes', unicode_minus=False)
print(f"Matplotlib font set to: {font_name}")
except Exception as e:
print(f"Failed to set Matplotlib font '{font_name}': {e}")
plt.rcdefaults()
plt.rc('axes', unicode_minus=False)
else:
print("No suitable Korean font found for Matplotlib. Using default.")
plt.rcdefaults()
plt.rc('axes', unicode_minus=False)
if not plotly_font_name:
plotly_font_name = 'sans-serif' # Plotly 기본값 지정
print(f"Plotly font name to use: {plotly_font_name}")
return plotly_font_name # Plotly에서 사용할 폰트 이름 반환
# --- 데이터 로드 함수 ---
def load_words_from_json(filepath):
""" JSON 파일에서 'word' 필드만 리스트로 로드합니다. """
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
# data가 리스트 형태라고 가정
if isinstance(data, list):
words = [item.get('word', '') for item in data if isinstance(item, dict) and item.get('word')] # dict 형태이고 'word' 키가 있는지 확인
# 빈 문자열 제거
words = [word for word in words if word]
if not words:
st.warning(f"경고: 파일 '{os.path.basename(filepath)}'에서 'word' 키를 가진 유효한 데이터를 찾을 수 없습니다.")
return None
return words
else:
st.error(f"오류: 파일 '{os.path.basename(filepath)}'의 최상위 형식이 리스트가 아닙니다.")
return None
except FileNotFoundError:
st.error(f"오류: 파일 '{filepath}'를 찾을 수 없습니다.")
return None
except json.JSONDecodeError as e:
st.error(f"오류: 파일 '{os.path.basename(filepath)}'의 JSON 형식이 잘못되었습니다. 오류: {e}")
return None
except Exception as e:
st.error(f"'{os.path.basename(filepath)}' 데이터 로딩 중 오류 발생: {e}")
return None
def scan_data_files():
"""데이터 폴더에서 사용 가능한 모든 JSON 파일을 스캔하고 정보를 반환합니다."""
data_files = {}
# 기본 데이터 폴더 스캔
try:
for file_path in glob.glob(os.path.join(DATA_FOLDER, '*.json')):
file_id = f"default_{os.path.basename(file_path)}" # 고유 ID 생성 방식 변경
file_name = os.path.basename(file_path)
words = load_words_from_json(file_path)
if words: # words가 None이 아니고 비어있지 않은 경우
data_files[file_id] = {
'path': file_path,
'name': file_name,
'word_count': len(words),
'type': 'default',
'sample_words': words[:5] # 샘플 단어 수 조정 가능
}
except Exception as e:
st.error(f"기본 데이터 폴더 스캔 중 오류: {e}")
# 업로드 폴더 스캔
try:
for file_path in glob.glob(os.path.join(UPLOAD_FOLDER, '*.json')):
file_id = f"uploaded_{os.path.basename(file_path)}" # 고유 ID 생성 방식 변경
file_name = os.path.basename(file_path)
words = load_words_from_json(file_path)
if words: # words가 None이 아니고 비어있지 않은 경우
data_files[file_id] = {
'path': file_path,
'name': file_name,
'word_count': len(words),
'type': 'uploaded',
'sample_words': words[:5] # 샘플 단어 수 조정 가능
}
except Exception as e:
st.error(f"업로드 폴더 스캔 중 오류: {e}")
return data_files
def merge_word_lists(file_ids):
"""선택된 파일들에서 단어를 로드하고 중복 제거하여 병합합니다."""
all_words = []
if not file_ids:
return []
# data_files 상태가 최신인지 확인 (업로드/삭제 후 필요할 수 있음)
current_data_files = st.session_state.get('data_files', {})
for file_id in file_ids:
if file_id in current_data_files:
file_path = current_data_files[file_id]['path']
words = load_words_from_json(file_path)
if words:
all_words.extend(words)
else:
st.warning(f"선택된 파일 ID '{file_id}'를 현재 파일 목록에서 찾을 수 없습니다. 목록을 새로고침합니다.")
# 파일 목록을 다시 스캔하고 재시도 (선택적)
st.session_state.data_files = scan_data_files()
if file_id in st.session_state.data_files:
words = load_words_from_json(st.session_state.data_files[file_id]['path'])
if words: all_words.extend(words)
else:
st.error(f"파일 '{file_id}'를 여전히 찾을 수 없습니다.")
# 중복 제거 및 정렬
unique_words = sorted(list(set(all_words)))
return unique_words
def encode_words(words, normalize=True):
"""단어 목록을 임베딩으로 변환합니다. (개선된 TF-IDF 스타일 임베딩)"""
if not words:
return np.array([])
embeddings = []
# 전체 단어에 나타나는 모든 고유 문자로 어휘 구성
unique_chars = set(char for word in words for char in word)
char_to_idx = {char: i for i, char in enumerate(sorted(list(unique_chars)))}
dim = len(char_to_idx)
if dim == 0: # 단어가 아예 없는 경우
return np.array([])
for word in words:
embed = np.zeros(dim)
word_len = len(word)
if word_len == 0: # 빈 문자열 처리
embeddings.append(embed)
continue
# TF (Term Frequency): 단어 내 문자 빈도
tf = {}
for char in word:
if char in char_to_idx:
tf[char] = tf.get(char, 0) + 1
for char, count in tf.items():
if char in char_to_idx:
# TF 계산 (여기서는 단순 빈도 사용, 필요시 log 스케일링 등 적용 가능)
embed[char_to_idx[char]] = count / word_len # 단어 길이로 정규화
# L2 정규화 (Cosine Similarity를 위해 유용)
if normalize:
norm = np.linalg.norm(embed)
if norm > 0:
embed = embed / norm
embeddings.append(embed)
return np.array(embeddings)
def generate_graph(file_ids, similarity_threshold=0.7):
"""여러 파일에서 단어를 로드하고 그래프를 생성합니다."""
if not file_ids:
st.error("그래프를 생성할 파일이 선택되지 않았습니다.")
return None
# 캐시 키 생성 (파일 ID 리스트와 임계값 조합, 순서 보장)
cache_key = f"{'-'.join(sorted(file_ids))}_{similarity_threshold}"
if cache_key in st.session_state.graph_cache:
# 캐시된 결과 반환
return st.session_state.graph_cache[cache_key]
# 한글 폰트 설정
plotly_font = set_korean_font()
# 선택된 파일들에서 단어 로드 및 병합
word_list = merge_word_lists(file_ids)
if not word_list:
st.error("선택된 파일에서 유효한 단어를 로드할 수 없습니다.")
return None
if len(word_list) < 2:
st.warning("그래프를 생성하려면 최소 2개 이상의 고유 단어가 필요합니다.")
return None
# 임베딩 생성
embeddings = None
with st.spinner('단어 임베딩 생성 중...'):
# 캐시 확인 (파일 ID 기반)
embedding_cache_key = '-'.join(sorted(file_ids))
if embedding_cache_key in st.session_state.embeddings_cache:
word_list_cached, embeddings = st.session_state.embeddings_cache[embedding_cache_key]
# 캐시된 단어 목록과 현재 단어 목록이 다르면 재생성
if sorted(word_list_cached) != sorted(word_list):
embeddings = encode_words(word_list, normalize=True)
st.session_state.embeddings_cache[embedding_cache_key] = (word_list, embeddings)
else:
embeddings = encode_words(word_list, normalize=True)
st.session_state.embeddings_cache[embedding_cache_key] = (word_list, embeddings)
if embeddings is None or embeddings.shape[0] == 0 or embeddings.shape[1] == 0:
st.error("단어 임베딩 생성에 실패했습니다.")
return None
# 3D 좌표 생성 - t-SNE 사용
embeddings_3d = None
with st.spinner('단어 좌표 계산 중 (t-SNE)...'):
# t-SNE 파라미터 설정 (데이터 크기에 따라 동적 조절)
n_samples = embeddings.shape[0]
# perplexity는 n_samples - 1 보다 작아야 함
effective_perplexity = min(30, max(5, n_samples - 1)) # 최소 5, 최대 30 또는 샘플수-1
# 반복 횟수
max_iter = max(250, min(1000, n_samples * 5)) # 샘플 수에 따라 조절하되 최소/최대값 설정
# 학습률
learning_rate = max(10, min(200, n_samples / 12)) if n_samples > 12 else 'auto' # 샘플 수 기반, 너무 작으면 auto
if n_samples <= 3: # t-SNE는 최소 4개 샘플 권장
st.warning(f"t-SNE는 최소 4개의 단어가 필요합니다 (현재 {n_samples}개). PCA를 사용합니다.")
from sklearn.decomposition import PCA
pca = PCA(n_components=min(3, n_samples), random_state=42) # 최대 3차원 또는 샘플 수
embeddings_3d_pca = pca.fit_transform(embeddings)
# 3차원으로 맞추기 (부족하면 0으로 채움)
embeddings_3d = np.zeros((n_samples, 3))
embeddings_3d[:, :embeddings_3d_pca.shape[1]] = embeddings_3d_pca
else:
try:
# max_iter 변수 동적 계산 및 할당
max_iter = max(250, min(1000, n_samples * 5)) # <--- 이 줄을 실제 코드로 추가/활성화
tsne = TSNE(n_components=3, random_state=42,
perplexity=effective_perplexity,
n_iter=max_iter, # 이제 정의된 max_iter 사용
init='pca',
learning_rate=learning_rate,
n_jobs=-1)
embeddings_3d = tsne.fit_transform(embeddings)
except Exception as e:
st.error(f"t-SNE 실행 중 오류 발생: {e}. PCA로 대체합니다.")
from sklearn.decomposition import PCA
pca = PCA(n_components=3, random_state=42)
embeddings_3d = pca.fit_transform(embeddings)
if embeddings_3d is None:
st.error("단어 좌표 생성에 실패했습니다.")
return None
# 유사도 계산 및 엣지 정의
edges = []
edge_weights = []
with st.spinner('단어 간 유사도 계산 및 연결(엣지) 생성 중...'):
# 유사도 행렬 계산
similarity_matrix = cosine_similarity(embeddings)
# 임계값 이상인 엣지만 추가
for i in range(n_samples):
for j in range(i + 1, n_samples): # 중복 및 자기 자신 연결 방지
similarity = similarity_matrix[i, j]
if similarity >= similarity_threshold: # 등호 포함 (임계값과 같아도 연결)
edges.append((word_list[i], word_list[j]))
edge_weights.append(similarity)
# NetworkX 그래프 생성
G = nx.Graph()
# 노드 추가 (단어와 3D 좌표)
for i, word in enumerate(word_list):
G.add_node(word, pos=(embeddings_3d[i, 0], embeddings_3d[i, 1], embeddings_3d[i, 2]))
# 엣지와 가중치 추가
for edge, weight in zip(edges, edge_weights):
# self-loop 방지 (이론상 위 로직에서 발생 안 함)
if edge[0] != edge[1]:
G.add_edge(edge[0], edge[1], weight=weight)
# Plotly 그래프 생성
edge_x, edge_y, edge_z = [], [], []
if G.number_of_edges() > 0:
for edge in G.edges():
try:
pos0 = G.nodes[edge[0]]['pos']
pos1 = G.nodes[edge[1]]['pos']
edge_x.extend([pos0[0], pos1[0], None]) # None은 선 끊기
edge_y.extend([pos0[1], pos1[1], None])
edge_z.extend([pos0[2], pos1[2], None])
except KeyError as e:
st.warning(f"엣지 생성 중 노드 키 오류: {e}. 해당 엣지를 건너<0xEB><0x84>니다.")
continue # 문제가 있는 엣지는 건너뜀
# 엣지 트레이스
edge_trace = go.Scatter3d(
x=edge_x, y=edge_y, z=edge_z,
mode='lines',
line=dict(width=1, color='#888'),
hoverinfo='none' # 엣지에는 호버 정보 없음
)
# 노드 좌표 및 텍스트 정보
node_x, node_y, node_z, node_text = [], [], [], []
node_adjacencies = [] # 연결 수 (degree)
node_hover_text = [] # 호버 텍스트
nodes_data = []
for node in G.nodes():
try:
pos = G.nodes[node]['pos']
degree = G.degree(node) # 노드의 연결 수 계산
nodes_data.append({
'x': pos[0], 'y': pos[1], 'z': pos[2],
'text': node,
'degree': degree,
'hover_text': f'{node}
연결 수: {degree}'
})
except KeyError:
st.warning(f"노드 '{node}' 처리 중 'pos' 키 오류. 해당 노드를 건너<0xEB><0x84>니다.")
continue # 위치 정보 없는 노드 건너뜀
# 노드 데이터가 있을 경우에만 처리
if nodes_data:
# 노드 크기를 연결 수에 따라 조절 (예시: 로그 스케일링)
degrees = np.array([data['degree'] for data in nodes_data])
# 로그 스케일링 적용 (0인 경우 대비 +1), 최대/최소 크기 제한
node_sizes = np.log1p(degrees) * 3 + 6 # 기본 크기 6, 연결 많을수록 커짐
node_sizes = np.clip(node_sizes, 5, 20) # 최소 5, 최대 20
# 노드 데이터 분리
node_x = [data['x'] for data in nodes_data]
node_y = [data['y'] for data in nodes_data]
node_z = [data['z'] for data in nodes_data]
node_text = [data['text'] for data in nodes_data]
node_hover_text = [data['hover_text'] for data in nodes_data]
# 노드 트레이스
node_trace = go.Scatter3d(
x=node_x, y=node_y, z=node_z,
mode='markers+text', # 마커와 텍스트 함께 표시
text=node_text, # 노드 위에 표시될 텍스트
hovertext=node_hover_text, # 마우스 올렸을 때 표시될 텍스트
hoverinfo='text', # 호버 시 hovertext만 표시
textposition='top center', # 텍스트 위치
textfont=dict(
size=10,
color='black',
family=plotly_font # 설정된 한글 폰트 사용
),
marker=dict(
size=node_sizes, # 연결 수에 따라 크기 조절된 리스트
color=node_z, # Z축 값으로 색상 매핑
colorscale='Viridis', # 색상 스케일
opacity=0.9,
colorbar=dict(thickness=15, title='Node Depth (Z)', xanchor='left', titleside='right')
)
)
else:
# 노드 데이터가 없으면 빈 트레이스 생성
node_trace = go.Scatter3d(x=[], y=[], z=[], mode='markers')
# 사용된 파일 이름 목록 생성
file_names_used = []
if 'data_files' in st.session_state:
file_names_used = [st.session_state.data_files[fid]['name'] for fid in file_ids if fid in st.session_state.data_files]
file_info_str = ", ".join(file_names_used) if file_names_used else "알 수 없음"
# 레이아웃 설정
layout = go.Layout(
title=dict(
text=f'어휘 의미 유사성 기반 3D 그래프
Threshold: {similarity_threshold:.2f} | 데이터: {file_info_str}',
font=dict(size=16, family=plotly_font),
x=0.5, # 제목 중앙 정렬
xanchor='center'
),
showlegend=False, # 범례 숨김
margin=dict(l=10, r=10, b=10, t=80), # 여백 조절 (제목 공간 확보)
scene=dict(
xaxis=dict(
title='TSNE-1', showticklabels=False, # 축 눈금 숨김
backgroundcolor="rgb(240, 240, 240)", gridcolor="white", zerolinecolor="white"
),
yaxis=dict(
title='TSNE-2', showticklabels=False,
backgroundcolor="rgb(240, 240, 240)", gridcolor="white", zerolinecolor="white"
),
zaxis=dict(
title='TSNE-3', showticklabels=False,
backgroundcolor="rgb(240, 240, 240)", gridcolor="white", zerolinecolor="white"
),
aspectratio=dict(x=1, y=1, z=0.8), # 가로세로비 조절
camera=dict(
eye=dict(x=1.2, y=1.2, z=0.8) # 초기 카메라 시점
)
),
# 호버 모드 설정 (가장 가까운 데이터 포인트 또는 통합)
hovermode='closest'
)
# Figure 객체 생성
fig = go.Figure(data=[edge_trace, node_trace], layout=layout)
# 결과 캐시 저장
st.session_state.graph_cache[cache_key] = fig
return fig
def handle_uploaded_file(uploaded_file):
"""업로드된 파일을 처리하고 데이터 파일 목록에 추가합니다."""
if uploaded_file is not None:
# 파일명 안전 처리 (uuid 사용 권장) 및 저장 경로
# original_name = uploaded_file.name
unique_id = str(uuid.uuid4()) # 고유 ID 생성
# file_extension = os.path.splitext(original_name)[1]
# file_name = f"{unique_id}{file_extension}" # 고유 ID로 파일명 생성
file_name = f"{unique_id}_{uploaded_file.name}" # 원본 이름 일부 포함 (선택적)
file_path = os.path.join(UPLOAD_FOLDER, file_name)
try:
# 파일 저장
with open(file_path, 'wb') as f:
f.write(uploaded_file.getbuffer())
st.info(f"파일 '{uploaded_file.name}' ({file_name}) 저장 완료. 내용 검증 중...")
# 업로드된 파일 검증 (단어 로드 시도)
words = load_words_from_json(file_path)
if words is None or not words : # 로드 실패 또는 빈 리스트
try:
os.remove(file_path) # 유효하지 않으면 파일 삭제
st.error(f"업로드된 파일 '{uploaded_file.name}'에서 유효한 'word' 데이터를 찾을 수 없습니다. 파일 형식(UTF-8 인코딩 JSON 배열, 각 객체에 'word' 키)을 확인해주세요. 파일이 삭제되었습니다.")
except OSError as e:
st.error(f"유효하지 않은 파일을 삭제하는 중 오류 발생: {e}")
return None # 실패 시 None 반환
st.success(f"파일 '{uploaded_file.name}' 검증 완료. {len(words)}개의 단어를 찾았습니다.")
# 데이터 파일 다시 스캔하여 새 파일 정보 포함 (세션 상태 업데이트)
st.session_state.data_files = scan_data_files()
# 새 파일에 해당하는 file_id 찾기 (scan_data_files에서 생성된 ID 사용)
new_file_id = f"uploaded_{file_name}" # scan_data_files와 동일한 로직으로 ID 생성
if new_file_id in st.session_state.data_files:
return new_file_id # 성공 시 파일 ID 반환
else:
st.error("파일 목록 업데이트 후에도 새 파일 ID를 찾지 못했습니다.")
return None
except Exception as e:
st.error(f"파일 업로드 및 처리 중 오류 발생: {e}")
# 오류 발생 시 업로드된 파일 삭제 시도
try:
if os.path.exists(file_path):
os.remove(file_path)
except OSError as del_e:
st.warning(f"오류 발생 후 파일 삭제 실패: {del_e}")
return None # 실패 시 None 반환
def delete_file(file_id):
"""파일을 삭제합니다."""
if file_id not in st.session_state.get('data_files', {}):
st.error('삭제할 파일을 찾을 수 없습니다.')
return False
file_info = st.session_state.data_files[file_id]
# 업로드된 파일만 삭제 허용
if file_info.get('type') != 'uploaded':
st.error('기본 데이터 파일은 삭제할 수 없습니다.')
return False
file_path = file_info.get('path')
file_name = file_info.get('name', '알 수 없음')
if not file_path:
st.error(f"파일 '{file_name}'의 경로 정보가 없습니다.")
return False
try:
# 파일 시스템에서 파일 삭제
if os.path.exists(file_path):
os.remove(file_path)
st.info(f"파일 시스템에서 '{file_name}' 삭제 완료.")
else:
st.warning(f"파일 시스템에 '{file_name}'({file_path})이(가) 이미 존재하지 않습니다.")
# 세션 상태에서 파일 정보 제거
del st.session_state.data_files[file_id]
# 관련 캐시 항목 삭제 (그래프, 임베딩)
keys_to_remove_graph = [k for k in st.session_state.graph_cache if file_id in k]
for key in keys_to_remove_graph:
del st.session_state.graph_cache[key]
keys_to_remove_embed = [k for k in st.session_state.embeddings_cache if file_id in k]
for key in keys_to_remove_embed:
del st.session_state.embeddings_cache[key]
# 현재 선택된 파일 목록에서도 제거
if file_id in st.session_state.selected_files:
st.session_state.selected_files.remove(file_id)
st.success(f"파일 '{file_name}' 관련 정보 및 캐시가 삭제되었습니다.")
return True
except Exception as e:
st.error(f"파일 삭제 중 오류 발생: {e}")
return False
def clear_cache():
"""그래프 및 임베딩 캐시를 초기화합니다."""
st.session_state.graph_cache = {}
st.session_state.embeddings_cache = {}
st.session_state.fig = None # 현재 표시중인 그래프도 초기화
st.success('그래프 및 임베딩 캐시가 초기화되었습니다.')
# st.experimental_rerun() # 캐시 클리어 후 UI 갱신
# --- 앱 실행 시작 ---
# 데이터 파일 스캔 (앱 시작 시 또는 필요 시)
if 'data_files' not in st.session_state or not st.session_state.data_files:
st.session_state.data_files = scan_data_files()
# 타이틀 및 소개
st.title('한국어 단어 의미 네트워크 시각화')
st.markdown("""
이 도구는 제공된 JSON 파일에서 한국어 단어 목록을 읽어들여, 단어 간의 의미적 유사성(여기서는 문자 구성 기반 유사성)을 계산하고,
그 관계를 인터랙티브한 3D 네트워크 그래프로 시각화합니다.
""")
# --- 사이드바 설정 ---
st.sidebar.title('⚙️ 설정 및 제어')
# 임계값 설정
threshold = st.sidebar.slider(
'유사도 임계값 (Similarity Threshold)',
min_value=0.1,
max_value=0.95, # 최대값 약간 늘림
value=st.session_state.threshold,
step=0.05,
help='이 값보다 유사도가 높은 단어들만 연결선(엣지)으로 이어집니다. 값이 높을수록 연결이 더 엄격해집니다.'
)
# 슬라이더 값이 변경되면 세션 상태 업데이트 (콜백 사용이 더 효율적일 수 있음)
if threshold != st.session_state.threshold:
st.session_state.threshold = threshold
st.session_state.fig = None # 임계값 변경 시 현재 그래프 초기화 (재생성 필요 알림)
st.session_state.generate_clicked = False # 클릭 상태도 리셋
st.sidebar.divider()
# 파일 업로드
st.sidebar.header('📄 파일 업로드')
uploaded_file = st.sidebar.file_uploader(
"JSON 파일 업로드",
type=['json'],
help="단어 목록이 포함된 JSON 파일을 업로드하세요. 형식: [{'word': '단어1'}, {'word': '단어2'}, ...]"
)
if uploaded_file is not None:
# 업로드 버튼 대신 파일이 있으면 바로 처리 시도 (사용자 경험 개선)
# if st.sidebar.button('업로드 처리', key='upload_button'): # 버튼 제거
with st.spinner("업로드된 파일 처리 중..."):
new_file_id = handle_uploaded_file(uploaded_file)
if new_file_id:
st.sidebar.success(f"파일 '{uploaded_file.name}' 업로드 및 처리 완료!")
# 새로 업로드된 파일을 자동으로 선택 목록에 추가하고 선택 상태로 만듦
if new_file_id not in st.session_state.selected_files:
st.session_state.selected_files.append(new_file_id)
# 스크립트 재실행하여 UI 업데이트
# st.experimental_rerun()
else:
# handle_uploaded_file 내부에서 오류 메시지 표시됨
pass
# 업로드 위젯 초기화를 위해 None 할당 (선택적)
# uploaded_file = None # 이렇게 하면 파일 선택 창이 다시 나타남, 필요에 따라 조절
st.sidebar.divider()
# 파일 선택 영역
st.sidebar.header('🗂️ 데이터 파일 선택')
if st.session_state.data_files:
# 사용할 파일 선택 체크박스
st.sidebar.markdown("**사용할 파일을 선택하세요 (다중 선택 가능):**")
# 선택 상태 관리를 위한 임시 리스트
selected_files_temp = []
# 파일 목록 정렬 (이름순)
sorted_file_ids = sorted(st.session_state.data_files.keys(), key=lambda fid: st.session_state.data_files[fid]['name'])
# 각 파일에 대한 체크박스 및 정보 표시
for file_id in sorted_file_ids:
if file_id not in st.session_state.data_files: continue # 삭제된 경우 건너뛰기
file_info = st.session_state.data_files[file_id]
file_label = f"{file_info['name']} ({file_info['word_count']} 단어)"
file_type_tag = "[기본]" if file_info['type'] == 'default' else "[업로드]"
label_full = f"{file_label} {file_type_tag}"
# 현재 파일이 선택되었는지 확인 (세션 상태 기준)
is_selected = file_id in st.session_state.selected_files
# 체크박스 생성
checkbox_key = f"cb_{file_id}" # 고유 키
# 체크박스 값 변경 시 콜백 사용 대신, 루프 후 비교 방식으로 처리
if st.sidebar.checkbox(label_full, value=is_selected, key=checkbox_key):
# 체크된 경우 임시 리스트에 추가
selected_files_temp.append(file_id)
# 샘플 단어 및 삭제 버튼 (업로드된 파일에만)
with st.sidebar.expander("파일 정보 보기", expanded=False):
st.markdown(f"**샘플 단어:** `{'`, `'.join(file_info['sample_words'])}`")
if file_info['type'] == 'uploaded':
delete_button_key = f"del_{file_id}"
if st.button('🗑️ 이 파일 삭제', key=delete_button_key, help=f"'{file_info['name']}' 파일을 영구적으로 삭제합니다."):
with st.spinner(f"'{file_info['name']}' 삭제 중..."):
if delete_file(file_id):
# 삭제 성공 시, selected_files_temp에서도 제거 (필수)
if file_id in selected_files_temp:
selected_files_temp.remove(file_id)
# data_files 상태가 변경되었으므로 재실행 필요
# st.experimental_rerun()
else:
st.error("파일 삭제에 실패했습니다.")
# st.sidebar.markdown("---") # 구분선 제거 또는 스타일 조정
# --- 중요: 선택 상태 업데이트 ---
# 현재 체크박스 상태(selected_files_temp)와 세션 상태(st.session_state.selected_files)가 다를 때만 업데이트
# 순서에 상관없이 비교하기 위해 정렬 후 비교
if sorted(selected_files_temp) != sorted(st.session_state.selected_files):
st.session_state.selected_files = selected_files_temp
st.session_state.fig = None # 파일 선택 변경 시 그래프 초기화
st.session_state.generate_clicked = False # 클릭 상태도 리셋
# 선택 변경 시 바로 재실행하여 UI 반영 (선택적이지만 사용자 경험 개선)
# st.experimental_rerun()
st.sidebar.divider()
# 그래프 생성 버튼
# 선택된 파일이 있을 때만 활성화
if st.session_state.selected_files:
if st.sidebar.button('📊 그래프 생성/업데이트', key='generate_button', type="primary"):
# 버튼 클릭 시, generate_clicked 플래그 설정
# 선택된 파일이 있는지 다시 한번 확인 (혹시 모를 동시성 문제 방지)
if st.session_state.selected_files:
st.session_state.generate_clicked = True
# 여기서 st.experimental_rerun() 호출 제거! 버튼 클릭 시 자동으로 재실행됨
else:
st.sidebar.warning('그래프를 생성할 파일을 먼저 선택해주세요.')
st.session_state.generate_clicked = False # 만약을 위해 리셋
else:
st.sidebar.warning('그래프를 생성하려면 최소 1개 이상의 파일을 선택해주세요.')
else:
st.sidebar.warning('사용 가능한 데이터 파일이 없습니다. 파일을 업로드하거나 `data` 폴더에 JSON 파일을 추가하세요.')
# 캐시 초기화 버튼 (항상 표시)
if st.sidebar.button('🔄 캐시 초기화', key='clear_cache_button'):
clear_cache()
# --- 메인 콘텐츠 영역 ---
st.header("📈 3D 단어 네트워크 시각화")
# 그래프 표시 로직
# 1. 선택된 파일이 있어야 함
# 2. '그래프 생성' 버튼이 클릭되었거나 (generate_clicked == True)
# 3. 이미 생성된 그래프가 세션 상태에 있어야 함 (st.session_state.fig is not None)
if st.session_state.selected_files:
# 그래프를 생성해야 하는 조건 : 버튼 클릭 플래그가 True 이거나, 임계값/파일선택 변경으로 fig가 None이 된 경우
should_generate_graph = st.session_state.generate_clicked or \
(st.session_state.fig is None and st.session_state.selected_files) # 파일 선택 후 fig가 없을 때
if should_generate_graph:
with st.spinner('그래프 생성 중... 잠시만 기다려주세요.'):
try:
# generate_graph 함수 호출
fig = generate_graph(st.session_state.selected_files, st.session_state.threshold)
# 성공적으로 생성되면 세션 상태에 저장
st.session_state.fig = fig
# 생성 완료 후 클릭 플래그 리셋
st.session_state.generate_clicked = False
except Exception as e:
st.error(f"그래프 생성 중 오류 발생: {e}")
st.session_state.fig = None # 오류 발생 시 fig 초기화
st.session_state.generate_clicked = False # 플래그 리셋
# 생성된 그래프가 세션 상태에 있으면 표시
if st.session_state.get('fig') is not None:
st.plotly_chart(st.session_state.fig, use_container_width=True)
# 현재 그래프 정보 표시
try:
selected_file_names = [st.session_state.data_files[fid]['name'] for fid in st.session_state.selected_files if fid in st.session_state.data_files]
total_word_count = sum(st.session_state.data_files[fid]['word_count'] for fid in st.session_state.selected_files if fid in st.session_state.data_files)
# 실제 그래프의 노드/엣지 수 가져오기 (fig 객체 분석 필요)
num_nodes = len(st.session_state.fig.data[1].x) if len(st.session_state.fig.data) > 1 and hasattr(st.session_state.fig.data[1], 'x') else 0
num_edges = len(st.session_state.fig.data[0].x) // 3 if len(st.session_state.fig.data) > 0 and hasattr(st.session_state.fig.data[0], 'x') and st.session_state.fig.data[0].x else 0
st.info(f"""
**현재 그래프 정보**
- **데이터 파일:** {', '.join(selected_file_names)}
- **고유 단어 수 (노드):** {num_nodes} 개
- **연결선 수 (엣지):** {num_edges} 개 (유사도 ≥ {st.session_state.threshold:.2f})
""")
except Exception as info_e:
st.warning(f"그래프 정보 표시 중 오류: {info_e}")
# 사용 설명
with st.expander("💡 그래프 조작 방법"):
st.markdown("""
- **확대/축소:** 마우스 휠 스크롤 또는 터치스크린에서 두 손가락 사용
- **회전:** 마우스 왼쪽 버튼 누른 상태로 드래그
- **이동 (Pan):** 마우스 오른쪽 버튼 누른 상태로 드래그 또는 Shift + 왼쪽 버튼 드래그
- **단어 정보 확인:** 마우스 커서를 단어(마커) 위에 올리면 단어 이름과 연결된 다른 단어의 수를 볼 수 있습니다.
- **툴바 사용:** 그래프 우측 상단의 툴바 아이콘을 사용하여 다양한 보기 옵션(다운로드, 확대/축소 영역 지정 등)을 활용할 수 있습니다.
""")
elif not should_generate_graph and not st.session_state.selected_files:
st.info("👈 사이드바에서 분석할 데이터 파일을 선택해주세요.")
elif not should_generate_graph and st.session_state.selected_files and st.session_state.fig is None:
# 파일은 선택했지만 아직 생성 버튼 안 누름 or 생성 실패
st.info("👈 사이드바에서 '📊 그래프 생성/업데이트' 버튼을 클릭하여 시각화를 시작하세요.")
elif not st.session_state.data_files:
st.warning("표시할 데이터 파일이 없습니다. 파일을 업로드하거나 `data` 폴더에 유효한 JSON 파일을 추가하세요.")
else:
# data_files는 있지만 selected_files가 없는 경우
st.info("👈 사이드바에서 분석할 데이터 파일을 선택해주세요.")
# --- 하단 정보 섹션 ---
st.divider()
with st.expander("ℹ️ 이 시각화 도구에 대하여"):
st.markdown("""
이 도구는 다음과 같은 과정을 통해 한국어 단어 네트워크를 시각화합니다:
1. **데이터 로딩:** 사용자가 제공한 JSON 파일에서 'word' 필드를 가진 단어 목록을 추출합니다.
2. **단어 임베딩:** 각 단어를 고차원 벡터 공간에 표현합니다. 현재는 **문자 구성 기반 TF-IDF 스타일 임베딩**을 사용하여, 단어를 이루는 문자들의 빈도를 기반으로 벡터를 생성합니다. (추후 Word2Vec, FastText 등 사전 훈련된 모델 사용 가능)
3. **차원 축소:** 고차원 임베딩 벡터를 시각화 가능한 3차원 공간으로 축소합니다. **t-SNE(t-Distributed Stochastic Neighbor Embedding)** 알고리즘을 사용하여 복잡한 데이터 구조를 유지하면서 차원을 줄입니다. (단어 수가 적을 경우 PCA 사용)
4. **유사도 계산:** 3D 공간으로 축소하기 전의 원본 임베딩 벡터 간의 **코사인 유사도(Cosine Similarity)**를 계산하여 단어 쌍의 의미적(여기서는 구성적) 유사성을 측정합니다.
5. **그래프 생성:** 설정된 **유사도 임계값(Threshold)** 이상인 단어 쌍들을 연결선(엣지)으로 이어 네트워크 그래프를 구성합니다. 각 단어는 노드(점)로 표시됩니다.
6. **3D 시각화:** **Plotly 라이브러리**를 사용하여 생성된 네트워크 그래프를 인터랙티브한 3D 공간에 시각화합니다. 노드의 위치는 t-SNE 결과 좌표를 따르며, 색상이나 크기는 Z축 값이나 연결 수(degree) 등을 반영할 수 있습니다.
이를 통해 단어들이 서로 얼마나 유사한지에 따라 군집을 이루거나 연결되는 패턴을 시각적으로 탐색할 수 있습니다.
""")
with st.expander("📋 JSON 파일 형식 안내"):
st.markdown("""
업로드하거나 `data` 폴더에 넣는 JSON 파일은 **UTF-8 인코딩**이어야 하며, 다음과 같은 형식을 따라야 합니다:
```json
[
{
"word": "학교"
},
{
"word": "선생님"
},
{
"word": "학생"
},
{
"word": "교실"
},
{
"word": "컴퓨터",
"description": "이 필드는 무시됩니다"
}
]
```
- 파일의 최상위 구조는 **배열(List)**이어야 합니다 (`[...]`).
- 배열의 각 요소는 **객체(Dictionary)**여야 합니다 (`{...}`).
- 각 객체는 반드시 `"word"`라는 키를 포함해야 하며, 그 값은 분석할 **한국어 단어 문자열**이어야 합니다.
- `"word"` 외의 다른 키가 있어도 무방하나, 현재 버전에서는 사용되지 않고 무시됩니다.
- 파일 인코딩이 UTF-8이 아닌 경우 한글이 깨지거나 오류가 발생할 수 있습니다.
""")