|
|
|
|
|
|
|
|
""" |
|
|
Compute metrics for information cascades: sentiment score, sentiment
deviation, contextual deviation and perplexity.

This script processes information_cascade.json and
information_cascade_original_posts.json and computes the following metrics:

1. Sentiment score
2. Sentiment deviation
3. Contextual deviation
4. Perplexity

Usage (on the cloud machine):
    python compute_cascade_metrics.py \
        --input_cascade information_cascade.json \
        --input_original information_cascade_original_posts.json \
        --output output_with_metrics.json \
        --bert_model bert-base-chinese \
        --sentiment_model <sentiment_model_path> \
        --perplexity_model <perplexity_model_path> \
        --batch_size 32
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import json |
|
|
import numpy as np |
|
|
import torch |
|
|
from typing import Dict, List, Any, Optional, Tuple |
|
|
from tqdm import tqdm |
|
|
from transformers import AutoModel, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForCausalLM |
|
|
import os |
|
|
|
|
|
|
|
|
class CascadeMetricsComputer:
    """Compute per-node metrics for an information cascade.

    For the root post, every comment and every repost of a cascade, this
    class produces an embedding (the first-token hidden state of the BERT
    model), a sentiment score and a perplexity value. Comments and reposts
    additionally receive their contextual deviation and sentiment deviation
    relative to the root post.
    """

    # Keyword lexicons for the heuristic sentiment fallback. Defined once at
    # class level so they are not rebuilt for every text.
    _POSITIVE_WORDS = ('好', '棒', '赞', '喜欢', '支持', '👍', '❤️', '😊', '😄')
    _NEGATIVE_WORDS = ('差', '坏', '讨厌', '反对', '👎', '😢', '😠', '😡')

    def __init__(
        self,
        bert_model_name: str = 'bert-base-chinese',
        sentiment_model_name: Optional[str] = None,
        perplexity_model_name: Optional[str] = None,
        device: Optional[str] = None,
        batch_size: int = 32,
        max_length: int = 512
    ):
        """Load the models and store the batching configuration.

        Args:
            bert_model_name: Name or path of the encoder used for embeddings
                (and therefore for contextual deviation).
            sentiment_model_name: Name or path of a sequence-classification
                model used for sentiment scores. If falsy, a keyword
                heuristic is used instead.
            perplexity_model_name: Name or path of a causal language model
                used for perplexity. If falsy, a lexical-repetition proxy is
                used instead.
            device: 'cuda' or 'cpu'. If None, CUDA is chosen when available.
            batch_size: Number of texts per forward pass for the embedding
                and sentiment models.
            max_length: Maximum token length; longer inputs are truncated.
        """
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'

        self.device = device
        self.batch_size = batch_size
        self.max_length = max_length

        print(f"正在加载BERT模型: {bert_model_name}")
        self.bert_tokenizer = AutoTokenizer.from_pretrained(bert_model_name)
        self.bert_model = AutoModel.from_pretrained(bert_model_name)
        self.bert_model.to(device)
        self.bert_model.eval()
        print(f"BERT模型已加载到设备: {device}")

        if sentiment_model_name:
            print(f"正在加载情感分析模型: {sentiment_model_name}")
            self.sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name)
            self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_model_name)
            self.sentiment_model.to(device)
            self.sentiment_model.eval()
            print(f"情感分析模型已加载到设备: {device}")
        else:
            self.sentiment_tokenizer = None
            self.sentiment_model = None
            print("未提供情感分析模型,将使用简化的情感计算方法")

        if perplexity_model_name:
            print(f"正在加载困惑度模型: {perplexity_model_name}")
            self.perplexity_tokenizer = AutoTokenizer.from_pretrained(perplexity_model_name)
            self.perplexity_model = AutoModelForCausalLM.from_pretrained(perplexity_model_name)
            self.perplexity_model.to(device)
            self.perplexity_model.eval()
            print(f"困惑度模型已加载到设备: {device}")
        else:
            self.perplexity_tokenizer = None
            self.perplexity_model = None
            print("未提供困惑度模型,将使用简化的困惑度计算方法")

    def compute_embeddings(self, texts: List[str]) -> np.ndarray:
        """Encode texts with the BERT model, one vector per text.

        Args:
            texts: Texts to encode. Empty/None entries are replaced by the
                literal string "[PAD]" before tokenization.

        Returns:
            Array of shape [num_texts, hidden_size] holding the hidden state
            at sequence position 0 for every text. For an empty input an
            array with zero rows is returned.
        """
        if not texts:
            # np.vstack([]) raises ValueError, so short-circuit here.
            config = getattr(self.bert_model, 'config', None)
            hidden_size = getattr(config, 'hidden_size', 0) or 0
            return np.empty((0, hidden_size), dtype=np.float32)

        embeddings = []
        with torch.no_grad():
            for start in range(0, len(texts), self.batch_size):
                batch_texts = [
                    text if text else "[PAD]"
                    for text in texts[start:start + self.batch_size]
                ]
                inputs = self.bert_tokenizer(
                    batch_texts,
                    return_tensors='pt',
                    padding=True,
                    truncation=True,
                    max_length=self.max_length
                ).to(self.device)
                outputs = self.bert_model(**inputs)
                # Position 0 of the last hidden state serves as the
                # sentence-level representation.
                embeddings.append(outputs.last_hidden_state[:, 0, :].cpu().numpy())

        return np.vstack(embeddings)

    def compute_sentiment_scores(self, texts: List[str]) -> List[float]:
        """Compute one sentiment score per text.

        When no sentiment model was loaded, the keyword heuristic
        `_compute_sentiment_simple` is used.

        Args:
            texts: Texts to score. Empty/None entries are replaced by the
                literal string "[PAD]" before tokenization.

        Returns:
            One score per text. For a two-class model the score is
            P(class 1) - P(class 0), i.e. in [-1, 1]; for any other number
            of classes it is P(class 0), i.e. in [0, 1].
        """
        if self.sentiment_model is None:
            return self._compute_sentiment_simple(texts)

        sentiment_scores: List[float] = []
        with torch.no_grad():
            for start in range(0, len(texts), self.batch_size):
                batch_texts = [
                    text if text else "[PAD]"
                    for text in texts[start:start + self.batch_size]
                ]
                inputs = self.sentiment_tokenizer(
                    batch_texts,
                    return_tensors='pt',
                    padding=True,
                    truncation=True,
                    max_length=self.max_length
                ).to(self.device)
                probs = torch.softmax(self.sentiment_model(**inputs).logits, dim=-1)

                if probs.shape[1] == 2:
                    # NOTE(review): presumably class 1 is "positive" and
                    # class 0 "negative" - confirm against the model's
                    # label mapping.
                    batch_scores = (probs[:, 1] - probs[:, 0]).cpu().numpy().tolist()
                else:
                    # NOTE(review): which sentiment class index 0 denotes is
                    # model-specific - confirm against the model's labels.
                    batch_scores = probs[:, 0].cpu().numpy().tolist()

                sentiment_scores.extend(batch_scores)

        return sentiment_scores

    def _compute_sentiment_simple(self, texts: List[str]) -> List[float]:
        """Heuristic sentiment score based on keyword presence.

        Each lexicon entry that occurs in the text counts once (repeated
        occurrences are not counted again). The score is
        (positive hits - negative hits) / character length, clipped to
        [-1, 1].

        Args:
            texts: Texts to score.

        Returns:
            One plain Python float per text; 0.0 for empty/None texts.
        """
        scores: List[float] = []
        for text in texts:
            if not text:
                scores.append(0.0)
                continue

            positive_count = sum(1 for word in self._POSITIVE_WORDS if word in text)
            negative_count = sum(1 for word in self._NEGATIVE_WORDS if word in text)

            # `text` is non-empty here, so the length is at least 1.
            raw_score = (positive_count - negative_count) / len(text)
            scores.append(float(max(-1.0, min(1.0, raw_score))))

        return scores

    def compute_perplexity(self, texts: List[str]) -> List[float]:
        """Compute one perplexity value per text.

        When no language model was loaded, the lexical-repetition proxy
        `_compute_perplexity_simple` is used. Otherwise each text is scored
        on its own as exp(loss) of the causal language model.

        Args:
            texts: Texts to score.

        Returns:
            One value per text. 0.0 is used as a sentinel for texts that
            cannot be scored: empty/None texts and texts that tokenize to
            fewer than two tokens.
        """
        if self.perplexity_model is None:
            return self._compute_perplexity_simple(texts)

        perplexities: List[float] = []
        with torch.no_grad():
            for text in texts:
                if not text:
                    perplexities.append(0.0)
                    continue

                inputs = self.perplexity_tokenizer(
                    text,
                    return_tensors='pt',
                    truncation=True,
                    max_length=self.max_length
                ).to(self.device)

                # A causal LM predicts token t+1 from token t, so a sequence
                # with a single token has no prediction target and its mean
                # loss is NaN. Use the same sentinel as for empty text.
                if inputs['input_ids'].shape[1] < 2:
                    perplexities.append(0.0)
                    continue

                outputs = self.perplexity_model(**inputs, labels=inputs['input_ids'])
                perplexities.append(torch.exp(outputs.loss).item())

        return perplexities

    @staticmethod
    def _is_cjk(char: str) -> bool:
        """Return True if `char` is a CJK unified ideograph (basic or ext. A)."""
        return '\u4e00' <= char <= '\u9fff' or '\u3400' <= char <= '\u4dbf'

    @classmethod
    def _lexical_tokens(cls, text: str) -> List[str]:
        """Split text into whitespace-delimited tokens, one token per CJK char.

        Chinese text is normally written without spaces, so plain
        `str.split()` would return the whole sentence as a single token.
        Text without CJK characters is tokenized exactly like `str.split()`.
        """
        tokens: List[str] = []
        for chunk in text.split():
            run: List[str] = []
            for char in chunk:
                if cls._is_cjk(char):
                    if run:
                        tokens.append(''.join(run))
                        run = []
                    tokens.append(char)
                else:
                    run.append(char)
            if run:
                tokens.append(''.join(run))
        return tokens

    def _compute_perplexity_simple(self, texts: List[str]) -> List[float]:
        """Perplexity proxy based on lexical repetition.

        The proxy is 1 - (unique tokens / total tokens), where tokens are
        whitespace-delimited words and individual CJK characters.

        Args:
            texts: Texts to score.

        Returns:
            One value in [0, 1) per text; 0.0 for empty/None texts and for
            texts that contain no tokens.
        """
        perplexities: List[float] = []
        for text in texts:
            if not text:
                perplexities.append(0.0)
                continue

            tokens = self._lexical_tokens(text)
            if tokens:
                perplexities.append(1.0 - (len(set(tokens)) / len(tokens)))
            else:
                perplexities.append(0.0)

        return perplexities

    def compute_cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Compute the cosine similarity of two vectors.

        Args:
            vec1: First vector.
            vec2: Second vector.

        Returns:
            Cosine similarity in [-1, 1]; 0.0 if either vector has zero norm.
        """
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(vec1, vec2) / (norm1 * norm2))

    def compute_contextual_deviation(self, root_embedding: np.ndarray, current_embedding: np.ndarray) -> float:
        """Compute the contextual deviation of a text from the root post.

        Defined as 1 - cosine similarity of the two embeddings.

        Args:
            root_embedding: Embedding of the root post.
            current_embedding: Embedding of the current text.

        Returns:
            Deviation in [0, 2]; higher means further from the root post.
            (1.0 is returned when either embedding has zero norm, because
            the similarity is then defined as 0.)
        """
        return 1.0 - self.compute_cosine_similarity(root_embedding, current_embedding)

    def compute_sentiment_deviation(self, root_sentiment: float, current_sentiment: float) -> float:
        """Compute the sentiment deviation of a text from the root post.

        Defined as |current sentiment - root sentiment|.

        Args:
            root_sentiment: Sentiment score of the root post.
            current_sentiment: Sentiment score of the current text.

        Returns:
            Absolute difference of the two scores (in [0, 2] when scores
            lie in [-1, 1]).
        """
        return abs(current_sentiment - root_sentiment)

    def process_cascade(self, cascade: Dict[str, Any]) -> Dict[str, Any]:
        """Compute all metrics for one cascade and store them on its nodes.

        Texts are scored in this order: the root post (`post_info.content`),
        every node of `comment_tree` (`content`), then every node of
        `repost_chain` (`forward_text` + `comment_content`). The root post
        receives `embedding`, `sentiment_score` and `perplexity`; comments
        and reposts additionally receive `contextual_deviation` and
        `sentiment_deviation` relative to the root post.

        Args:
            cascade: Cascade dictionary; it is modified in place.

        Returns:
            The same `cascade` object with metrics attached.
        """
        # Make sure the root metrics end up inside the cascade: with a plain
        # `.get('post_info', {})` they would be written to a throwaway dict
        # whenever the key is missing.
        post_info = cascade.get('post_info')
        if not isinstance(post_info, dict):
            post_info = {}
            cascade['post_info'] = post_info

        # Tolerate JSON null for the optional containers.
        comment_tree = cascade.get('comment_tree') or {}
        repost_chain = cascade.get('repost_chain') or []

        comment_nodes = list(comment_tree.values())
        child_nodes = comment_nodes + list(repost_chain)

        # texts[0] is the root post; texts[1:] line up with child_nodes.
        texts: List[str] = [post_info.get('content') or '']
        texts.extend(node.get('content') or '' for node in comment_nodes)
        texts.extend(
            (node.get('forward_text') or '') + (node.get('comment_content') or '')
            for node in repost_chain
        )

        embeddings = self.compute_embeddings(texts)
        sentiment_scores = self.compute_sentiment_scores(texts)
        perplexities = self.compute_perplexity(texts)

        root_embedding = embeddings[0]
        root_sentiment = sentiment_scores[0]

        post_info['embedding'] = root_embedding.tolist()
        post_info['sentiment_score'] = root_sentiment
        post_info['perplexity'] = perplexities[0]

        for idx, node in enumerate(child_nodes, start=1):
            node['embedding'] = embeddings[idx].tolist()
            node['sentiment_score'] = sentiment_scores[idx]
            node['perplexity'] = perplexities[idx]
            node['contextual_deviation'] = self.compute_contextual_deviation(
                root_embedding, embeddings[idx]
            )
            node['sentiment_deviation'] = self.compute_sentiment_deviation(
                root_sentiment, sentiment_scores[idx]
            )

        return cascade
|
|
|
|
|
|
|
|
def load_json_file(file_path: str) -> Dict[str, Any]:
    """Load a JSON file and report how many entries it contains.

    The whole file is parsed into memory with `json.load`.

    Args:
        file_path: Path of the JSON file (read as UTF-8).

    Returns:
        The parsed JSON document. This is normally a dict with a
        'cascades' list; documents with a different top-level shape are
        returned unchanged instead of raising while printing the summary.
    """
    print(f"正在加载JSON文件: {file_path}")
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if isinstance(data, dict):
        # `or []` also covers an explicit JSON null under 'cascades'.
        print(f"已加载 {len(data.get('cascades') or [])} 个级联")
    elif isinstance(data, list):
        # A top-level list has no 'cascades' key; report its length rather
        # than failing with AttributeError after the file was already read.
        print(f"已加载 {len(data)} 条记录")

    return data
|
|
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the arguments, loads the cascade file, computes the metrics for
    every cascade (optionally only the first `--max_cascades`) and writes
    the annotated data to `--output` as UTF-8 JSON. A cascade that raises
    during processing is reported with a traceback and skipped; the
    remaining cascades are still processed.
    """
    # Local import: only needed for per-cascade error reports.
    import traceback

    parser = argparse.ArgumentParser(
        description='计算信息级联的指标:情感得分、情感deviation、contextual deviation、perplexity'
    )
    parser.add_argument(
        '--input_cascade',
        type=str,
        required=True,
        help='输入级联JSON文件路径 (information_cascade.json)'
    )
    parser.add_argument(
        '--input_original',
        type=str,
        default=None,
        help='输入原帖JSON文件路径 (information_cascade_original_posts.json),可选'
    )
    parser.add_argument(
        '--output',
        type=str,
        required=True,
        help='输出JSON文件路径'
    )
    parser.add_argument(
        '--bert_model',
        type=str,
        default='bert-base-chinese',
        help='BERT模型名称或路径(用于计算语义向量)'
    )
    parser.add_argument(
        '--sentiment_model',
        type=str,
        default=None,
        help='情感分析模型名称或路径(可选)'
    )
    parser.add_argument(
        '--perplexity_model',
        type=str,
        default=None,
        help='语言模型名称或路径(用于计算困惑度,可选)'
    )
    parser.add_argument(
        '--batch_size',
        type=int,
        default=32,
        help='批处理大小'
    )
    parser.add_argument(
        '--max_length',
        type=int,
        default=512,
        help='最大序列长度'
    )
    parser.add_argument(
        '--device',
        type=str,
        default=None,
        help='计算设备(cuda/cpu),如果为None则自动选择'
    )
    parser.add_argument(
        '--max_cascades',
        type=int,
        default=None,
        help='最大处理级联数量(用于测试,None表示处理所有)'
    )

    args = parser.parse_args()

    cascade_data = load_json_file(args.input_cascade)

    if args.input_original:
        # The original-posts file is loaded (so a bad path or malformed file
        # is reported early) but its content is not merged into the output.
        load_json_file(args.input_original)

    # Ensure the output directory exists before the expensive model work, so
    # a long run cannot fail at the final write because of a missing folder.
    os.makedirs(os.path.dirname(os.path.abspath(args.output)), exist_ok=True)

    print("\n初始化指标计算器...")
    computer = CascadeMetricsComputer(
        bert_model_name=args.bert_model,
        sentiment_model_name=args.sentiment_model,
        perplexity_model_name=args.perplexity_model,
        device=args.device,
        batch_size=args.batch_size,
        max_length=args.max_length
    )

    cascades = cascade_data.get('cascades', [])
    total_cascades = len(cascades)
    # Compare against None so that an explicit 0 means "process nothing"
    # instead of silently falling back to "process everything".
    if args.max_cascades is not None:
        cascades = cascades[:args.max_cascades]

    print(f"\n开始处理 {len(cascades)}/{total_cascades} 个级联...")
    processed_count = 0
    for idx, cascade in enumerate(tqdm(cascades, desc="处理级联")):
        try:
            # `cascades` is a prefix of cascade_data['cascades'], so `idx`
            # addresses the same element in both lists.
            cascade_data['cascades'][idx] = computer.process_cascade(cascade)
            processed_count += 1
        except Exception as e:
            # Top-level boundary: report the failure and keep going so one
            # bad cascade does not abort the whole run.
            print(f"\n处理级联 {idx} 时出错: {e}")
            traceback.print_exc()
            continue

    print(f"\n成功处理 {processed_count}/{len(cascades)} 个级联")

    print(f"\n正在保存结果到: {args.output}")
    with open(args.output, 'w', encoding='utf-8') as f:
        json.dump(cascade_data, f, ensure_ascii=False, indent=2)

    print(f"✅ 完成!结果已保存到: {args.output}")
|
|
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script, not when
# the module is imported.
if __name__ == '__main__':
    main()
|
|
|