Spaces:
Running
Running
| from transformers import PreTrainedTokenizerFast, BartForConditionalGeneration, AutoTokenizer, AutoModel, AutoModelForSequenceClassification | |
| from konlpy.tag import Komoran | |
| from keybert import KeyBERT | |
| import textwrap | |
| import os | |
| import requests | |
| import torch | |
| import pandas as pd | |
| import torch.nn.functional as F | |
| from transformers import BertTokenizer, BertForSequenceClassification | |
# Load the list of listed (public) companies.
def load_company_list(file_path='상장법인목록.xls'):
    """Return every company name from the KRX listed-corporation file.

    NOTE(review): pandas.read_html() is used on a .xls path — the KRX
    download is presumably an HTML table despite its extension; confirm
    before switching to read_excel().
    """
    tables = pd.read_html(file_path)
    listing = tables[0]
    names = listing['회사명'].dropna()
    return names.tolist()
# --- KoBART summarization model --------------------------------------------
# Tokenizer and model are loaded once at import time from the same checkpoint.
_KOBART_CHECKPOINT = "gogamza/kobart-summarization"
summary_tokenizer = PreTrainedTokenizerFast.from_pretrained(_KOBART_CHECKPOINT)
summary_model = BartForConditionalGeneration.from_pretrained(_KOBART_CHECKPOINT)
def summarize_kobart(text):
    """Summarize Korean text with KoBART and return the decoded summary string.

    The input is truncated to 512 tokens to stay within the encoder's limit;
    output length is governed by max_new_tokens / min_new_tokens.
    """
    encoded = summary_tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=512,  # cap source length to the model's positional limit
    )
    # BART takes no token_type_ids; discard them if the tokenizer produced any.
    encoded.pop("token_type_ids", None)

    generated = summary_model.generate(
        **encoded,
        max_new_tokens=160,   # manage output length via new-token budget
        min_new_tokens=100,
        num_beams=4,
        repetition_penalty=2.5,
        no_repeat_ngram_size=4,
        early_stopping=True,
    )
    return summary_tokenizer.decode(generated[0], skip_special_tokens=True)
# --- KoBERT embedder for keyword extraction --------------------------------
class KoBERTEmbedding:
    """Minimal KeyBERT-compatible embedding backend around a KoBERT encoder.

    KeyBERT only requires an object exposing ``encode``; this wraps a
    HuggingFace model/tokenizer pair and returns [CLS] embeddings.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def encode(self, documents, **kwargs):
        """Embed one string or a list of strings; returns an (N, H) ndarray."""
        docs = [documents] if isinstance(documents, str) else documents
        batch = self.tokenizer(docs, padding=True, truncation=True, return_tensors="pt")
        with torch.no_grad():
            hidden = self.model(**batch).last_hidden_state
        # The [CLS] (first-position) vector stands in for the whole document.
        return hidden[:, 0, :].numpy()
# --- KoBERT keyword-extraction pipeline ------------------------------------
keyword_model_name = "skt/kobert-base-v1"
# use_fast=False kept from the original setup — KoBERT presumably has no
# fast-tokenizer implementation; verify before changing.
keyword_tokenizer = AutoTokenizer.from_pretrained(keyword_model_name, use_fast=False)
keyword_model = AutoModel.from_pretrained(keyword_model_name)
# KeyBERT accepts any object with an encode() method as its embedding model.
kobert_embedder = KoBERTEmbedding(keyword_model, keyword_tokenizer)
kw_model = KeyBERT(model=kobert_embedder)

# Local cache path for the Korean stopword list (downloaded on first use).
STOPWORDS_FILE = "stopwords-ko.txt"
# --- Sentiment-analysis model (financial-domain Korean BERT) ---------------
sentiment_model_name = "snunlp/KR-FinBert-SC"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the checkpoint ONCE. The original code loaded the identical tokenizer
# and model twice (as bert_* and sentiment_*), doubling download/memory cost.
sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_model_name)
sentiment_model = sentiment_model.to(device)

# Backward-compatible aliases. analyze_sentiment() historically called
# bert_model, which was never moved to `device` even though its inputs were —
# a crash on GPU machines. Aliasing both names to the single device-placed
# instance fixes that while keeping every public name intact.
bert_tokenizer = sentiment_tokenizer
bert_model = sentiment_model
def analyze_sentiment(text):
    """Classify the sentiment of Korean financial text.

    Returns a dict of softmax probabilities for the three classes
    (``negative`` / ``neutral`` / ``positive``), each rounded to 4 decimals.
    """
    inputs = sentiment_tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=512,  # guard against over-long articles
    ).to(device)

    # Inference only — no gradients. Use sentiment_model (the instance that
    # was moved to `device`) so weights and inputs share a device; the old
    # code called a CPU-resident model with device-moved inputs.
    with torch.no_grad():
        logits = sentiment_model(**inputs).logits

    probs = F.softmax(logits, dim=1)[0]

    # Class order assumed to be [negative, neutral, positive] for
    # snunlp/KR-FinBert-SC — TODO confirm against the model's id2label map.
    return {
        "negative": round(float(probs[0]), 4),
        "neutral": round(float(probs[1]), 4),
        "positive": round(float(probs[2]), 4),
    }
def get_or_download_stopwords():
    """Return the Korean stopword list, downloading and caching it on first use.

    The list is cached at STOPWORDS_FILE; later calls read the local copy
    instead of hitting the network.
    """
    # 1. Serve from the local cache when it exists.
    if os.path.exists(STOPWORDS_FILE):
        with open(STOPWORDS_FILE, "r", encoding="utf-8") as f:
            return [line.strip() for line in f]

    # 2. First run: fetch the list, verify the response, persist it.
    url = "https://raw.githubusercontent.com/stopwords-iso/stopwords-ko/master/stopwords-ko.txt"
    response = requests.get(url, timeout=10)   # don't hang forever on network issues
    response.raise_for_status()                # never cache an error page as stopwords
    with open(STOPWORDS_FILE, "w", encoding="utf-8") as f:
        f.write(response.text)
    # Strip entries so first-run output matches what the cached-file path returns.
    return [line.strip() for line in response.text.splitlines()]
# Shared NLP resources built once at import time.
korean_stopwords = get_or_download_stopwords()

# Morphological analyzer; per the original notes, Komoran captures compound
# nouns comparatively well.
komoran = Komoran()
def remove_stopwords(text, stopwords):
    """Extract nouns from ``text``, dropping stopwords and 1-char tokens.

    Returns the surviving nouns joined by single spaces.
    """
    # Komoran's noun extraction is used because it captures compound nouns well.
    nouns = komoran.nouns(text)
    # Convert once to a set: O(1) membership per token instead of scanning the
    # whole stopword list for every noun.
    stopword_set = set(stopwords)
    kept = [word for word in nouns if word not in stopword_set and len(word) > 1]
    return " ".join(kept)
def resultKeyword(content):
    """Summarize an article and extract its top weighted keywords.

    Returns ``{"summary": <80-col wrapped summary>, "keyword": [{"word",
    "score"}, ...]}`` where detected listed-company names get a score boost.
    """
    companies = load_company_list()

    # Summarize first; the summary is reused below for company detection.
    summary = summarize_kobart(content)
    wrapped = textwrap.fill(summary, width=80)  # wrap at 80 columns

    # Noun-only, stopword-free views of the summary and the full article.
    summary_nouns = remove_stopwords(summary, korean_stopwords)
    content_nouns = remove_stopwords(content, korean_stopwords)

    # Candidate keywords come from the full article body.
    raw_keywords = kw_model.extract_keywords(
        content_nouns,
        keyphrase_ngram_range=(1, 2),  # allow two-word compound phrases
        stop_words=None,
        top_n=5,
    )

    # Listed companies mentioned in the summary get preferential treatment.
    summary_tokens = set(summary_nouns.split())
    found_companies = [name for name in companies if name in summary_tokens]

    # Boost matched company keywords by 0.3; keep other scores as-is.
    scored = {
        kw: score + 0.3 if kw in found_companies else score
        for kw, score in raw_keywords
    }
    # Force-insert companies that KeyBERT missed entirely.
    for company in found_companies:
        scored.setdefault(company, 0.9)

    # Keep the five highest-scoring keywords.
    ranked = sorted(scored.items(), key=lambda item: item[1], reverse=True)[:5]

    return {
        "summary": wrapped,
        "keyword": [{"word": kw, "score": float(f"{s:.4f}")} for kw, s in ranked],
    }