# app4.py (增强版) import gradio as gr import torch import numpy as np import pandas as pd import plotly.graph_objects as go import plotly.express as px from plotly.subplots import make_subplots import networkx as nx from datetime import datetime import json import os from collections import defaultdict from utils.data_generator import IPEcosystemGenerator from model.HGT import HGT, HGTLinkPredictor, HGTNodeClassifier, HGTPatentValuePredictor, HGTCollaborationRecommender from model.HeteroGNN import HeteroGNN, LinkPredictor, NodeClassifier # app_enhanced.py (数据科学增强版) import gradio as gr import torch import numpy as np import pandas as pd import plotly.graph_objects as go import plotly.express as px from plotly.subplots import make_subplots import networkx as nx from datetime import datetime from collections import defaultdict # 数据科学库 from sklearn.decomposition import PCA from sklearn.manifold import TSNE from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering from sklearn.metrics import silhouette_score, davies_bouldin_score from sklearn.preprocessing import StandardScaler from scipy.stats import pearsonr, spearmanr from scipy.cluster.hierarchy import dendrogram, linkage import umap # 原有导入 from utils.data_generator import IPEcosystemGenerator from model.HGT import HGT, HGTLinkPredictor, HGTNodeClassifier, HGTPatentValuePredictor, HGTCollaborationRecommender from model.HeteroGNN import HeteroGNN, LinkPredictor, NodeClassifier # 全局变量 current_data = None loaded_models = {} training_history = defaultdict(list) embedding_cache = {} # 缓存节点嵌入 # ==================== 数据科学分析模块 ==================== def compute_node_embeddings(force_recompute=False): """计算并缓存节点嵌入""" global current_data, loaded_models, embedding_cache if current_data is None: return None, "❌ 请先加载数据集!" 
# 如果已缓存且不强制重新计算,直接返回 if embedding_cache and not force_recompute: return embedding_cache, "✅ 使用缓存的嵌入" device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') data = current_data.to(device) # 尝试使用已训练的模型 model = None if 'link_prediction' in loaded_models: model = loaded_models['link_prediction']['model'] elif 'node_classification' in loaded_models: model = loaded_models['node_classification']['model'] elif 'patent_value' in loaded_models: model = loaded_models['patent_value']['model'] if model is None: # 如果没有训练好的模型,使用原始特征 embeddings = {} for node_type in data.node_types: embeddings[node_type] = data[node_type].x.cpu().numpy() embedding_cache = embeddings return embeddings, "⚠️ 使用原始特征(建议先训练模型)" # 使用模型计算嵌入 model.eval() with torch.no_grad(): x_dict = model(data.x_dict, data.edge_index_dict) embeddings = {k: v.cpu().numpy() for k, v in x_dict.items()} embedding_cache = embeddings return embeddings, "✅ 使用模型嵌入" def perform_pca_analysis(node_type, n_components=2): """执行PCA降维分析""" embeddings, status = compute_node_embeddings() if embeddings is None: return status, None, None, None if node_type not in embeddings: return f"❌ 无效的节点类型: {node_type}", None, None, None X = embeddings[node_type] # 标准化 scaler = StandardScaler() X_scaled = scaler.fit_transform(X) # PCA pca = PCA(n_components=min(n_components, X.shape[1])) X_pca = pca.fit_transform(X_scaled) # 解释方差比例 explained_var = pca.explained_variance_ratio_ cumsum_var = np.cumsum(explained_var) # 生成报告 report = f""" ## 📊 PCA降维分析报告 **节点类型**: {node_type} **原始维度**: {X.shape[1]} **降维后维度**: {n_components} **样本数量**: {X.shape[0]} ### 主成分解释方差 """ for i, var in enumerate(explained_var[:10]): # 显示前10个 report += f"- **PC{i + 1}**: {var * 100:.2f}%\n" report += f"\n**前{n_components}个主成分累计解释方差**: {cumsum_var[n_components - 1] * 100:.2f}%\n" # 可视化1: 解释方差 fig1 = make_subplots( rows=1, cols=2, subplot_titles=('主成分解释方差', '累计解释方差'), specs=[[{'type': 'bar'}, {'type': 'scatter'}]] ) fig1.add_trace( go.Bar( x=[f'PC{i + 1}' for i in 
range(len(explained_var))], y=explained_var * 100, marker=dict(color=explained_var, colorscale='Viridis'), name='解释方差' ), row=1, col=1 ) fig1.add_trace( go.Scatter( x=[f'PC{i + 1}' for i in range(len(cumsum_var))], y=cumsum_var * 100, mode='lines+markers', marker=dict(size=8, color='#FF6B6B'), line=dict(width=3), name='累计方差' ), row=1, col=2 ) fig1.update_xaxes(title_text="主成分", row=1, col=1) fig1.update_xaxes(title_text="主成分", row=1, col=2) fig1.update_yaxes(title_text="解释方差 (%)", row=1, col=1) fig1.update_yaxes(title_text="累计解释方差 (%)", row=1, col=2) fig1.update_layout( title="PCA方差分析", template="plotly_white", height=400, showlegend=False ) # 可视化2: PCA投影 if n_components >= 2: # 获取标签(如果有) labels = None label_names = None if node_type == 'company' and hasattr(current_data['company'], 'industry'): labels = current_data['company'].industry.cpu().numpy() label_names = ['金融科技', '生物医药', '人工智能', '半导体', '新能源', '电子商务', '物流科技', '智能制造'] if n_components == 2: fig2 = go.Figure() if labels is not None: for label_id in np.unique(labels): mask = labels == label_id fig2.add_trace(go.Scatter( x=X_pca[mask, 0], y=X_pca[mask, 1], mode='markers', name=label_names[label_id] if label_names else f'类别{label_id}', marker=dict(size=6, opacity=0.7), text=[f'{node_type}-{i}' for i in np.where(mask)[0]], hoverinfo='text' )) else: fig2.add_trace(go.Scatter( x=X_pca[:, 0], y=X_pca[:, 1], mode='markers', marker=dict(size=6, color=X_pca[:, 0], colorscale='Viridis', opacity=0.7), text=[f'{node_type}-{i}' for i in range(len(X_pca))], hoverinfo='text' )) fig2.update_layout( title=f"{node_type.upper()} - PCA 2D投影", xaxis_title=f"PC1 ({explained_var[0] * 100:.1f}%)", yaxis_title=f"PC2 ({explained_var[1] * 100:.1f}%)", template="plotly_white", height=600 ) else: # 3D fig2 = go.Figure() if labels is not None: for label_id in np.unique(labels): mask = labels == label_id fig2.add_trace(go.Scatter3d( x=X_pca[mask, 0], y=X_pca[mask, 1], z=X_pca[mask, 2], mode='markers', name=label_names[label_id] if 
label_names else f'类别{label_id}', marker=dict(size=4, opacity=0.7), text=[f'{node_type}-{i}' for i in np.where(mask)[0]], hoverinfo='text' )) else: fig2.add_trace(go.Scatter3d( x=X_pca[:, 0], y=X_pca[:, 1], z=X_pca[:, 2], mode='markers', marker=dict(size=4, color=X_pca[:, 0], colorscale='Viridis', opacity=0.7), text=[f'{node_type}-{i}' for i in range(len(X_pca))], hoverinfo='text' )) fig2.update_layout( title=f"{node_type.upper()} - PCA 3D投影", scene=dict( xaxis_title=f"PC1 ({explained_var[0] * 100:.1f}%)", yaxis_title=f"PC2 ({explained_var[1] * 100:.1f}%)", zaxis_title=f"PC3 ({explained_var[2] * 100:.1f}%)" ), template="plotly_white", height=600 ) else: fig2 = None # 主成分载荷分析 components_df = pd.DataFrame( pca.components_[:5].T, # 前5个主成分 columns=[f'PC{i + 1}' for i in range(min(5, pca.n_components_))], index=[f'Feature{i + 1}' for i in range(X.shape[1])] ) components_df['Abs_Max'] = components_df.abs().max(axis=1) components_df = components_df.sort_values('Abs_Max', ascending=False).head(10) components_df = components_df.drop('Abs_Max', axis=1) return report, fig1, fig2, components_df def perform_tsne_analysis(node_type, n_components=2, perplexity=30, n_iter=1000): """执行t-SNE降维分析""" embeddings, status = compute_node_embeddings() if embeddings is None: return status, None, None if node_type not in embeddings: return f"❌ 无效的节点类型: {node_type}", None, None X = embeddings[node_type] # 限制样本数量(t-SNE计算较慢) max_samples = 2000 if len(X) > max_samples: indices = np.random.choice(len(X), max_samples, replace=False) X = X[indices] sampled = True else: indices = np.arange(len(X)) sampled = False # 标准化 scaler = StandardScaler() X_scaled = scaler.fit_transform(X) # t-SNE tsne = TSNE( n_components=n_components, perplexity=min(perplexity, len(X) - 1), max_iter=n_iter, random_state=42 ) X_tsne = tsne.fit_transform(X_scaled) # 生成报告 report = f""" ## 🎯 t-SNE降维分析报告 **节点类型**: {node_type} **原始维度**: {X.shape[1]} **降维后维度**: {n_components} **样本数量**: {len(X)} {'(采样)' if sampled else ''} **困惑度**: 
{perplexity} **迭代次数**: {n_iter} ### t-SNE参数说明 - **困惑度(Perplexity)**: 平衡局部和全局结构,通常在5-50之间 - **迭代次数**: 更多迭代可能得到更好的结果,但计算时间更长 ### 应用场景 t-SNE特别适合: - 可视化高维数据的聚类结构 - 发现数据中的模式和分组 - 识别异常值和离群点 """ # 获取标签 labels = None label_names = None if node_type == 'company' and hasattr(current_data['company'], 'industry'): labels = current_data['company'].industry.cpu().numpy()[indices] label_names = ['金融科技', '生物医药', '人工智能', '半导体', '新能源', '电子商务', '物流科技', '智能制造'] # 可视化 if n_components == 2: fig = go.Figure() if labels is not None: for label_id in np.unique(labels): mask = labels == label_id fig.add_trace(go.Scatter( x=X_tsne[mask, 0], y=X_tsne[mask, 1], mode='markers', name=label_names[label_id] if label_names else f'类别{label_id}', marker=dict(size=8, opacity=0.7), text=[f'{node_type}-{indices[i]}' for i in np.where(mask)[0]], hoverinfo='text' )) else: fig.add_trace(go.Scatter( x=X_tsne[:, 0], y=X_tsne[:, 1], mode='markers', marker=dict( size=8, color=np.arange(len(X_tsne)), colorscale='Viridis', opacity=0.7 ), text=[f'{node_type}-{i}' for i in indices], hoverinfo='text' )) fig.update_layout( title=f"{node_type.upper()} - t-SNE 2D可视化", xaxis_title="t-SNE 1", yaxis_title="t-SNE 2", template="plotly_white", height=600 ) else: # 3D fig = go.Figure() if labels is not None: for label_id in np.unique(labels): mask = labels == label_id fig.add_trace(go.Scatter3d( x=X_tsne[mask, 0], y=X_tsne[mask, 1], z=X_tsne[mask, 2], mode='markers', name=label_names[label_id] if label_names else f'类别{label_id}', marker=dict(size=5, opacity=0.7), text=[f'{node_type}-{indices[i]}' for i in np.where(mask)[0]], hoverinfo='text' )) else: fig.add_trace(go.Scatter3d( x=X_tsne[:, 0], y=X_tsne[:, 1], z=X_tsne[:, 2], mode='markers', marker=dict( size=5, color=np.arange(len(X_tsne)), colorscale='Viridis', opacity=0.7 ), text=[f'{node_type}-{i}' for i in indices], hoverinfo='text' )) fig.update_layout( title=f"{node_type.upper()} - t-SNE 3D可视化", scene=dict( xaxis_title="t-SNE 1", yaxis_title="t-SNE 2", 
zaxis_title="t-SNE 3" ), template="plotly_white", height=600 ) # 如果有标签,计算聚类指标 metrics_df = None if labels is not None: try: silhouette = silhouette_score(X_tsne, labels) davies_bouldin = davies_bouldin_score(X_tsne, labels) metrics_df = pd.DataFrame({ '指标': ['轮廓系数', 'Davies-Bouldin指数'], '数值': [f'{silhouette:.4f}', f'{davies_bouldin:.4f}'], '说明': [ '[-1, 1],越接近1越好', '越小越好,表示聚类紧密且分离' ] }) report += f"\n### 聚类质量评估\n\n" report += f"- **轮廓系数**: {silhouette:.4f}\n" report += f"- **Davies-Bouldin指数**: {davies_bouldin:.4f}\n" except: pass return report, fig, metrics_df def perform_clustering_analysis(node_type, method='kmeans', n_clusters=5): """执行聚类分析""" embeddings, status = compute_node_embeddings() if embeddings is None: return status, None, None, None if node_type not in embeddings: return f"❌ 无效的节点类型: {node_type}", None, None, None X = embeddings[node_type] # 标准化 scaler = StandardScaler() X_scaled = scaler.fit_transform(X) # 降维到2D用于可视化 pca = PCA(n_components=2) X_2d = pca.fit_transform(X_scaled) # 执行聚类 if method == 'kmeans': clusterer = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) labels = clusterer.fit_predict(X_scaled) centers_2d = pca.transform(clusterer.cluster_centers_) elif method == 'dbscan': clusterer = DBSCAN(eps=0.5, min_samples=5) labels = clusterer.fit_predict(X_scaled) n_clusters = len(set(labels)) - (1 if -1 in labels else 0) centers_2d = None elif method == 'hierarchical': clusterer = AgglomerativeClustering(n_clusters=n_clusters) labels = clusterer.fit_predict(X_scaled) centers_2d = None # 计算聚类指标 if len(set(labels)) > 1: silhouette = silhouette_score(X_scaled, labels) davies_bouldin = davies_bouldin_score(X_scaled, labels) else: silhouette = 0 davies_bouldin = 0 # 生成报告 report = f""" ## 🎯 聚类分析报告 **节点类型**: {node_type} **聚类方法**: {method.upper()} **聚类数量**: {len(set(labels))} **样本数量**: {len(X)} ### 聚类质量指标 - **轮廓系数**: {silhouette:.4f} - **Davies-Bouldin指数**: {davies_bouldin:.4f} ### 各簇样本分布 """ cluster_counts = 
pd.Series(labels).value_counts().sort_index() for cluster_id, count in cluster_counts.items(): cluster_name = f"簇 {cluster_id}" if cluster_id != -1 else "噪声点" report += f"- **{cluster_name}**: {count} 个样本 ({count / len(labels) * 100:.1f}%)\n" # 可视化 fig = go.Figure() for cluster_id in sorted(set(labels)): mask = labels == cluster_id cluster_name = f"簇 {cluster_id}" if cluster_id != -1 else "噪声点" fig.add_trace(go.Scatter( x=X_2d[mask, 0], y=X_2d[mask, 1], mode='markers', name=cluster_name, marker=dict(size=8, opacity=0.7), text=[f'{node_type}-{i}
簇: {cluster_id}' for i in np.where(mask)[0]], hoverinfo='text' )) # 添加聚类中心(如果有) if centers_2d is not None: fig.add_trace(go.Scatter( x=centers_2d[:, 0], y=centers_2d[:, 1], mode='markers', name='聚类中心', marker=dict( size=15, symbol='x', color='red', line=dict(width=2, color='darkred') ) )) fig.update_layout( title=f"{node_type.upper()} - {method.upper()} 聚类结果 (PCA投影)", xaxis_title=f"PC1 ({pca.explained_variance_ratio_[0] * 100:.1f}%)", yaxis_title=f"PC2 ({pca.explained_variance_ratio_[1] * 100:.1f}%)", template="plotly_white", height=600 ) # 聚类统计表 stats_data = [] for cluster_id in sorted(set(labels)): if cluster_id == -1: continue mask = labels == cluster_id cluster_samples = X[mask] stats_data.append({ '簇ID': cluster_id, '样本数': mask.sum(), '占比': f'{mask.sum() / len(labels) * 100:.1f}%', '平均值': f'{cluster_samples.mean():.4f}', '标准差': f'{cluster_samples.std():.4f}' }) stats_df = pd.DataFrame(stats_data) return report, fig, stats_df def perform_correlation_analysis(node_type): """执行特征相关性分析""" if current_data is None: return "❌ 请先加载数据集!", None, None X = current_data[node_type].x.cpu().numpy() # 计算相关系数矩阵 corr_matrix = np.corrcoef(X.T) # 特征名称 feature_names = [f'F{i + 1}' for i in range(X.shape[1])] # 热力图 fig1 = go.Figure(data=go.Heatmap( z=corr_matrix, x=feature_names, y=feature_names, colorscale='RdBu', zmid=0, text=np.round(corr_matrix, 2), texttemplate='%{text}', textfont={"size": 8}, colorbar=dict(title="相关系数") )) fig1.update_layout( title=f"{node_type.upper()} - 特征相关性热力图", template="plotly_white", height=600, width=700 ) # 找出高相关性特征对 high_corr = [] for i in range(len(corr_matrix)): for j in range(i + 1, len(corr_matrix)): if abs(corr_matrix[i, j]) > 0.7: high_corr.append({ '特征1': feature_names[i], '特征2': feature_names[j], '相关系数': f'{corr_matrix[i, j]:.4f}', '类型': '正相关' if corr_matrix[i, j] > 0 else '负相关' }) high_corr_df = pd.DataFrame(high_corr) if high_corr else pd.DataFrame({ '提示': ['未发现高相关性特征对(|r| > 0.7)'] }) # 特征分布可视化 fig2 = make_subplots( rows=2, cols=2, 
subplot_titles=[f'{feature_names[i]}分布' for i in range(min(4, len(feature_names)))] ) for idx in range(min(4, X.shape[1])): row = idx // 2 + 1 col = idx % 2 + 1 fig2.add_trace( go.Histogram( x=X[:, idx], name=feature_names[idx], nbinsx=50, marker=dict(color=px.colors.qualitative.Set3[idx]) ), row=row, col=col ) fig2.update_layout( title=f"{node_type.upper()} - 特征分布", template="plotly_white", height=500, showlegend=False ) # 生成报告 report = f""" ## 📊 特征相关性分析报告 **节点类型**: {node_type} **特征数量**: {X.shape[1]} **样本数量**: {X.shape[0]} ### 相关性摘要 - **平均相关系数**: {np.mean(np.abs(corr_matrix[np.triu_indices_from(corr_matrix, k=1)])):.4f} - **最大相关系数**: {np.max(corr_matrix[np.triu_indices_from(corr_matrix, k=1)]):.4f} - **最小相关系数**: {np.min(corr_matrix[np.triu_indices_from(corr_matrix, k=1)]):.4f} - **高相关特征对数量**: {len(high_corr)} ### 建议 """ if len(high_corr) > 0: report += "⚠️ 发现高相关性特征,可能存在冗余,建议考虑特征选择或降维。\n" else: report += "✅ 特征之间相关性较低,特征独立性良好。\n" return report, fig1, high_corr_df, fig2 def generate_statistics_dashboard(): """生成统计分析仪表板""" if current_data is None: return "❌ 请先加载数据集!", None # 收集统计信息 stats = {} for node_type in current_data.node_types: X = current_data[node_type].x.cpu().numpy() stats[node_type] = { 'count': len(X), 'features': X.shape[1], 'mean': X.mean(axis=0), 'std': X.std(axis=0), 'min': X.min(axis=0), 'max': X.max(axis=0), 'median': np.median(X, axis=0) } # 创建仪表板 fig = make_subplots( rows=2, cols=2, subplot_titles=( '各节点类型数量分布', '平均特征维度', '特征均值分布', '特征标准差分布' ), specs=[ [{'type': 'bar'}, {'type': 'bar'}], [{'type': 'box'}, {'type': 'box'}] ] ) # 1. 节点数量 fig.add_trace( go.Bar( x=list(stats.keys()), y=[s['count'] for s in stats.values()], marker=dict(color=px.colors.qualitative.Set2), text=[s['count'] for s in stats.values()], textposition='outside' ), row=1, col=1 ) # 2. 
特征维度 fig.add_trace( go.Bar( x=list(stats.keys()), y=[s['features'] for s in stats.values()], marker=dict(color=px.colors.qualitative.Set3), text=[s['features'] for s in stats.values()], textposition='outside' ), row=1, col=2 ) # 3. 特征均值分布 for node_type, stat in stats.items(): fig.add_trace( go.Box( y=stat['mean'], name=node_type, boxmean='sd' ), row=2, col=1 ) # 4. 特征标准差分布 for node_type, stat in stats.items(): fig.add_trace( go.Box( y=stat['std'], name=node_type, boxmean='sd' ), row=2, col=2 ) fig.update_xaxes(title_text="节点类型", row=1, col=1) fig.update_xaxes(title_text="节点类型", row=1, col=2) fig.update_yaxes(title_text="数量", row=1, col=1) fig.update_yaxes(title_text="特征维度", row=1, col=2) fig.update_yaxes(title_text="特征均值", row=2, col=1) fig.update_yaxes(title_text="特征标准差", row=2, col=2) fig.update_layout( title="📊 数据集统计分析仪表板", template="plotly_white", height=800, showlegend=True ) # 生成详细报告 report = """ ## 📊 数据集统计分析报告 """ for node_type, stat in stats.items(): report += f""" ### {node_type.upper()} - **样本数量**: {stat['count']:,} - **特征维度**: {stat['features']} - **特征均值范围**: [{stat['mean'].min():.4f}, {stat['mean'].max():.4f}] - **特征标准差范围**: [{stat['std'].min():.4f}, {stat['std'].max():.4f}] - **特征值范围**: [{stat['min'].min():.4f}, {stat['max'].max():.4f}] """ # 整体统计 total_nodes = sum(s['count'] for s in stats.values()) total_edges = sum(current_data[et].num_edges for et in current_data.edge_types) report += f""" ### 整体概览 - **总节点数**: {total_nodes:,} - **总边数**: {total_edges:,} - **网络密度**: {total_edges / (total_nodes * (total_nodes - 1)) * 100:.6f}% - **平均度**: {total_edges * 2 / total_nodes:.2f} **生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} """ return report, fig # 全局变量 current_data = None loaded_models = {} training_history = defaultdict(list) # 产业关键词映射 INDUSTRY_KEYWORDS = { '金融科技': ['金融', '支付', '区块链', '数字货币', '银行', '保险', '证券', '投资', 'fintech'], '生物医药': ['医药', '生物', '制药', '医疗', '健康', '诊断', '治疗', '基因', '蛋白质'], '人工智能': ['AI', '机器学习', '深度学习', '神经网络', '计算机视觉', 
'NLP', '智能', '算法'], '半导体': ['芯片', '集成电路', '半导体', '晶圆', 'IC', '处理器', '传感器'], '新能源': ['太阳能', '风能', '电池', '储能', '充电', '新能源', '清洁能源'], '电子商务': ['电商', '网购', '在线', '平台', '零售', 'O2O', '跨境'], '物流科技': ['物流', '配送', '仓储', '供应链', '运输', '快递'], '智能制造': ['制造', '工业', '自动化', '机器人', '数控', '智能工厂', '工业4.0'] } # ==================== 数据管理模块 ==================== def generate_dataset(dataset_size, n_companies, n_patents, n_trademarks, n_persons, n_institutions, time_span): """生成数据集""" global current_data try: generator = IPEcosystemGenerator(seed=42) current_data = generator.generate( n_companies=n_companies, n_patents=n_patents, n_trademarks=n_trademarks, n_persons=n_persons, n_institutions=n_institutions, time_span_years=time_span ) # 保存数据 os.makedirs('data', exist_ok=True) torch.save(current_data, f'data/custom_{dataset_size}.pt') stats = get_dataset_stats() details = get_detailed_stats() return "✅ 数据集生成成功!", stats, details except Exception as e: return f"❌ 生成失败: {str(e)}", None, None def load_dataset(dataset_size): """加载已保存的数据集""" global current_data try: current_data = IPEcosystemGenerator.load_data(dataset_size, 'data') stats = get_dataset_stats() details = get_detailed_stats() return f"✅ 成功加载 {dataset_size} 数据集", stats, details except Exception as e: return f"❌ 加载失败: {str(e)}", None, None def get_dataset_stats(): """获取数据集统计信息""" if current_data is None: return pd.DataFrame({"提示": ["请先生成或加载数据集"]}) stats_data = [] # 节点统计 for node_type in current_data.node_types: stats_data.append({ "类型": "节点", "名称": node_type, "数量": current_data[node_type].num_nodes, "特征维度": current_data[node_type].num_features }) # 边统计 for edge_type in current_data.edge_types: src, rel, dst = edge_type stats_data.append({ "类型": "关系", "名称": f"{src} → {dst} ({rel})", "数量": current_data[edge_type].num_edges, "特征维度": "-" }) return pd.DataFrame(stats_data) def get_detailed_stats(): """获取详细统计信息""" if current_data is None: return "请先加载数据集" report = f""" ## 📊 数据集详细统计报告 ### 节点概览 """ total_nodes = 
sum(current_data[ntype].num_nodes for ntype in current_data.node_types) report += f"- **总节点数**: {total_nodes:,}\n\n" for node_type in current_data.node_types: num_nodes = current_data[node_type].num_nodes features = current_data[node_type].num_features percentage = (num_nodes / total_nodes) * 100 report += f" - **{node_type}**: {num_nodes:,} 个 ({percentage:.1f}%) - {features}维特征\n" report += "\n### 关系概览\n" total_edges = sum(current_data[etype].num_edges for etype in current_data.edge_types) report += f"- **总关系数**: {total_edges:,}\n\n" # 按关系类型分组 edge_groups = {} for edge_type in current_data.edge_types: src, rel, dst = edge_type if rel not in edge_groups: edge_groups[rel] = [] edge_groups[rel].append((src, dst, current_data[edge_type].num_edges)) for rel, edges in edge_groups.items(): report += f" **{rel}**:\n" for src, dst, count in edges: report += f" - {src} → {dst}: {count:,} 条边\n" report += "\n" # 数据密度分析 report += "### 数据质量指标\n" # 企业-专利密度 if ('company', 'owns', 'patent') in current_data.edge_types: n_companies = current_data['company'].num_nodes n_patents = current_data['patent'].num_nodes n_edges = current_data['company', 'owns', 'patent'].num_edges density = n_edges / (n_companies * n_patents) * 100 avg_patents_per_company = n_edges / n_companies report += f"- **企业-专利密度**: {density:.4f}%\n" report += f"- **平均每企业专利数**: {avg_patents_per_company:.1f}\n" # 专利引用密度 if ('patent', 'cites', 'patent') in current_data.edge_types: n_citations = current_data['patent', 'cites', 'patent'].num_edges avg_citations = n_citations / n_patents if n_patents > 0 else 0 report += f"- **平均每专利引用数**: {avg_citations:.1f}\n" report += f"\n**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" return report def visualize_network_overview(): """网络概览可视化""" if current_data is None: return None # 创建节点统计柱状图 node_counts = {ntype: current_data[ntype].num_nodes for ntype in current_data.node_types} fig = go.Figure() fig.add_trace(go.Bar( x=list(node_counts.keys()), 
y=list(node_counts.values()), marker=dict( color=['#FF6B6B', '#4ECDC4', '#45B7D1', '#FFA07A', '#98D8C8'], line=dict(color='white', width=2) ), text=list(node_counts.values()), textposition='outside', )) fig.update_layout( title="📊 节点类型分布统计", xaxis_title="节点类型", yaxis_title="数量", template="plotly_white", height=400, font=dict(size=14) ) return fig def visualize_edge_distribution(): """边类型分布可视化""" if current_data is None: return None edge_counts = {} for edge_type in current_data.edge_types: src, rel, dst = edge_type edge_counts[f"{src}→{dst}"] = current_data[edge_type].num_edges # 创建饼图 fig = go.Figure(data=[go.Pie( labels=list(edge_counts.keys()), values=list(edge_counts.values()), hole=0.3, marker=dict(colors=px.colors.qualitative.Set3), textinfo='label+percent', textfont=dict(size=12) )]) fig.update_layout( title="🔗 关系类型分布", template="plotly_white", height=500, font=dict(size=13) ) return fig def visualize_network_graph(node_limit=100): """网络图可视化(使用NetworkX和Plotly)""" if current_data is None: return None # 创建NetworkX图 G = nx.Graph() # 添加企业节点(限制数量以提高性能) n_companies = min(node_limit, current_data['company'].num_nodes) for i in range(n_companies): G.add_node(f"C{i}", node_type='company', size=10, color='#FF6B6B') # 添加专利节点 n_patents = min(node_limit, current_data['patent'].num_nodes) for i in range(n_patents): G.add_node(f"P{i}", node_type='patent', size=5, color='#4ECDC4') # 添加企业-专利边 edge_index = current_data['company', 'owns', 'patent'].edge_index for i in range(min(500, edge_index.size(1))): company_idx = edge_index[0, i].item() patent_idx = edge_index[1, i].item() if company_idx < n_companies and patent_idx < n_patents: G.add_edge(f"C{company_idx}", f"P{patent_idx}") # 使用Spring布局 pos = nx.spring_layout(G, k=0.5, iterations=50) # 创建边的trace edge_trace = go.Scatter( x=[], y=[], line=dict(width=0.5, color='#888'), hoverinfo='none', mode='lines' ) for edge in G.edges(): x0, y0 = pos[edge[0]] x1, y1 = pos[edge[1]] edge_trace['x'] += tuple([x0, x1, None]) edge_trace['y'] 
+= tuple([y0, y1, None]) # 创建节点的trace company_trace = go.Scatter( x=[], y=[], mode='markers', name='企业', marker=dict(size=10, color='#FF6B6B', line=dict(width=2, color='white')), text=[], hoverinfo='text' ) patent_trace = go.Scatter( x=[], y=[], mode='markers', name='专利', marker=dict(size=6, color='#4ECDC4', line=dict(width=1, color='white')), text=[], hoverinfo='text' ) for node in G.nodes(): x, y = pos[node] if node.startswith('C'): company_trace['x'] += tuple([x]) company_trace['y'] += tuple([y]) company_trace['text'] += tuple([f'企业 {node}']) else: patent_trace['x'] += tuple([x]) patent_trace['y'] += tuple([y]) patent_trace['text'] += tuple([f'专利 {node}']) # 创建图形 fig = go.Figure(data=[edge_trace, company_trace, patent_trace]) fig.update_layout( title=f"🌐 知识产权生态网络图谱 (显示前{node_limit}个节点)", showlegend=True, hovermode='closest', template='plotly_white', xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), height=600, font=dict(size=12) ) return fig # ==================== 模型训练模块 ==================== def train_model(task_type, model_type, epochs, hidden_channels, learning_rate, n_heads, num_layers): """统一的模型训练接口""" global current_data, loaded_models, training_history if current_data is None: return "❌ 请先加载数据集!", None, None device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') try: training_history[task_type] = [] if task_type == "链接预测": model, predictor, history = train_link_prediction_task( current_data, model_type, epochs, hidden_channels, learning_rate, n_heads, num_layers, device ) loaded_models['link_prediction'] = {'model': model, 'predictor': predictor} elif task_type == "节点分类": model, classifier, history = train_node_classification_task( current_data, model_type, epochs, hidden_channels, learning_rate, n_heads, num_layers, device ) loaded_models['node_classification'] = {'model': model, 'classifier': classifier} elif task_type == "专利价值评估": model, value_predictor, 
history = train_patent_value_task( current_data, epochs, hidden_channels, learning_rate, n_heads, num_layers, device ) loaded_models['patent_value'] = {'model': model, 'predictor': value_predictor} elif task_type == "企业合作推荐": model, collab_recommender, history = train_collaboration_task( current_data, epochs, hidden_channels, learning_rate, n_heads, num_layers, device ) loaded_models['collaboration'] = {'model': model, 'recommender': collab_recommender} training_history[task_type] = history # 生成训练曲线 fig = plot_training_curves(history, task_type) # 生成训练报告 report = generate_training_report(history, task_type) return f"✅ {task_type}模型训练完成!", fig, report except Exception as e: return f"❌ 训练失败: {str(e)}", None, None def train_link_prediction_task(data, model_type, epochs, hidden_channels, lr, n_heads, num_layers, device): """链接预测训练任务""" edge_type = ('company', 'owns', 'patent') edge_index = data[edge_type].edge_index # 数据划分 num_edges = edge_index.size(1) perm = torch.randperm(num_edges) train_size = int(0.8 * num_edges) train_edge_index = edge_index[:, perm[:train_size]] val_edge_index = edge_index[:, perm[train_size:]] # 初始化模型 if model_type == "HGT": model = HGT( hidden_channels=hidden_channels, out_channels=hidden_channels, num_layers=num_layers, n_heads=n_heads, dropout=0.2, metadata=data.metadata() ).to(device) predictor = HGTLinkPredictor(hidden_channels, hidden_channels // 2, 2).to(device) else: model = HeteroGNN(hidden_channels, num_layers, data.metadata()).to(device) predictor = LinkPredictor(hidden_channels).to(device) optimizer = torch.optim.Adam(list(model.parameters()) + list(predictor.parameters()), lr=lr) data = data.to(device) src_type, _, dst_type = edge_type history = {'epoch': [], 'train_loss': [], 'val_auc': [], 'val_ap': []} for epoch in range(epochs): model.train() predictor.train() optimizer.zero_grad() x_dict = model(data.x_dict, data.edge_index_dict) pos_pred = predictor(x_dict[src_type], x_dict[dst_type], train_edge_index) # 负采样 neg_edge_index = 
negative_sampling(train_edge_index, data[src_type].num_nodes, data[dst_type].num_nodes, train_edge_index.size(1)).to(device) neg_pred = predictor(x_dict[src_type], x_dict[dst_type], neg_edge_index) loss = torch.nn.functional.binary_cross_entropy_with_logits( torch.cat([pos_pred, neg_pred]), torch.cat([torch.ones_like(pos_pred), torch.zeros_like(neg_pred)]) ) loss.backward() optimizer.step() # 验证 if epoch % 5 == 0: model.eval() predictor.eval() with torch.no_grad(): x_dict = model(data.x_dict, data.edge_index_dict) val_pos_pred = predictor(x_dict[src_type], x_dict[dst_type], val_edge_index) val_neg_edge_index = negative_sampling(val_edge_index, data[src_type].num_nodes, data[dst_type].num_nodes, val_edge_index.size(1)).to(device) val_neg_pred = predictor(x_dict[src_type], x_dict[dst_type], val_neg_edge_index) preds = torch.cat([val_pos_pred, val_neg_pred]).sigmoid().cpu().numpy() labels = np.concatenate([np.ones(val_pos_pred.size(0)), np.zeros(val_neg_pred.size(0))]) from sklearn.metrics import roc_auc_score, average_precision_score val_auc = roc_auc_score(labels, preds) val_ap = average_precision_score(labels, preds) history['epoch'].append(epoch) history['train_loss'].append(loss.item()) history['val_auc'].append(val_auc) history['val_ap'].append(val_ap) return model, predictor, history def train_node_classification_task(data, model_type, epochs, hidden_channels, lr, n_heads, num_layers, device): """节点分类训练任务""" node_type = 'company' target = 'industry' labels = data[node_type][target] num_classes = labels.max().item() + 1 num_nodes = data[node_type].num_nodes # 数据划分 perm = torch.randperm(num_nodes) train_size = int(0.6 * num_nodes) val_size = int(0.2 * num_nodes) train_mask = torch.zeros(num_nodes, dtype=torch.bool) val_mask = torch.zeros(num_nodes, dtype=torch.bool) train_mask[perm[:train_size]] = True val_mask[perm[train_size:train_size + val_size]] = True # 初始化模型 if model_type == "HGT": model = HGT( hidden_channels=hidden_channels, out_channels=hidden_channels, 
num_layers=num_layers, n_heads=n_heads, dropout=0.2, metadata=data.metadata() ).to(device) classifier = HGTNodeClassifier(hidden_channels, num_classes, hidden_channels // 2, 2).to(device) else: model = HeteroGNN(hidden_channels, num_layers, data.metadata()).to(device) classifier = NodeClassifier(hidden_channels, num_classes).to(device) optimizer = torch.optim.Adam(list(model.parameters()) + list(classifier.parameters()), lr=lr) data = data.to(device) labels = labels.to(device) train_mask = train_mask.to(device) val_mask = val_mask.to(device) history = {'epoch': [], 'train_loss': [], 'train_acc': [], 'val_acc': []} for epoch in range(epochs): model.train() classifier.train() optimizer.zero_grad() x_dict = model(data.x_dict, data.edge_index_dict) out = classifier(x_dict[node_type]) loss = torch.nn.functional.cross_entropy(out[train_mask], labels[train_mask]) loss.backward() optimizer.step() if epoch % 5 == 0: model.eval() classifier.eval() with torch.no_grad(): x_dict = model(data.x_dict, data.edge_index_dict) out = classifier(x_dict[node_type]) pred = out.argmax(dim=1) train_acc = (pred[train_mask] == labels[train_mask]).float().mean().item() val_acc = (pred[val_mask] == labels[val_mask]).float().mean().item() history['epoch'].append(epoch) history['train_loss'].append(loss.item()) history['train_acc'].append(train_acc) history['val_acc'].append(val_acc) return model, classifier, history def train_patent_value_task(data, epochs, hidden_channels, lr, n_heads, num_layers, device): """专利价值评估训练任务""" data = data.to(device) patent_features = data['patent'].x value_labels = (patent_features[:, 1] * 30 + patent_features[:, 3] * 25 + patent_features[:, 4] * 20 + patent_features[:, 6] * 15 + torch.rand(len(patent_features), device=device) * 10) value_labels = torch.clamp(value_labels, 0, 100) num_patents = data['patent'].num_nodes perm = torch.randperm(num_patents) train_size = int(0.6 * num_patents) val_size = int(0.2 * num_patents) train_mask = torch.zeros(num_patents, 
dtype=torch.bool)
    val_mask = torch.zeros(num_patents, dtype=torch.bool)
    train_mask[perm[:train_size]] = True
    val_mask[perm[train_size:train_size + val_size]] = True
    model = HGT(
        hidden_channels=hidden_channels,
        out_channels=hidden_channels,
        num_layers=num_layers,
        n_heads=n_heads,
        dropout=0.2,
        metadata=data.metadata()
    ).to(device)
    value_predictor = HGTPatentValuePredictor(hidden_channels, hidden_channels, 2).to(device)
    # weight_decay adds L2 regularisation for the regression setting.
    optimizer = torch.optim.Adam(
        list(model.parameters()) + list(value_predictor.parameters()),
        lr=lr, weight_decay=1e-4
    )
    train_mask = train_mask.to(device)
    val_mask = val_mask.to(device)
    history = {'epoch': [], 'train_loss': [], 'val_mae': [], 'val_rmse': []}
    for epoch in range(epochs):
        model.train()
        value_predictor.train()
        optimizer.zero_grad()
        x_dict = model(data.x_dict, data.edge_index_dict)
        pred_values = value_predictor(x_dict['patent']).squeeze()
        loss = torch.nn.functional.mse_loss(pred_values[train_mask], value_labels[train_mask])
        loss.backward()
        # Gradient clipping stabilises MSE training on the joint parameter set.
        torch.nn.utils.clip_grad_norm_(
            list(model.parameters()) + list(value_predictor.parameters()), 1.0
        )
        optimizer.step()
        # Validation MAE/RMSE every 5 epochs.
        if epoch % 5 == 0:
            model.eval()
            value_predictor.eval()
            with torch.no_grad():
                x_dict = model(data.x_dict, data.edge_index_dict)
                pred_values = value_predictor(x_dict['patent']).squeeze()
                val_mae = torch.nn.functional.l1_loss(
                    pred_values[val_mask], value_labels[val_mask]
                ).item()
                val_rmse = torch.sqrt(
                    torch.nn.functional.mse_loss(pred_values[val_mask], value_labels[val_mask])
                ).item()
                history['epoch'].append(epoch)
                history['train_loss'].append(loss.item())
                history['val_mae'].append(val_mae)
                history['val_rmse'].append(val_rmse)

    return model, value_predictor, history


def train_collaboration_task(data, epochs, hidden_channels, lr, n_heads, num_layers, device):
    """Company-collaboration recommendation training task (HGT encoder only).

    Existing company-cooperates-company edges are positives; an equal number
    of non-edges are sampled as negatives.  Edges are split 60/20 train/val
    and the recommender head is trained with BCE on its success probability.

    Returns:
        (model, collab_recommender, history) with val AUC/AP every 5 epochs.
    """
    data = data.to(device)
    existing_edges = data['company', 'cooperates', 'company'].edge_index
    num_companies = data['company'].num_nodes
    # Transpose to (E, 2) row-per-edge layout for sampling/splitting.
    pos_edges = existing_edges.t()
    neg_edges = negative_sampling_collab(pos_edges,
num_companies, len(pos_edges)) all_edges = torch.cat([pos_edges, neg_edges], dim=0) labels = torch.cat([torch.ones(len(pos_edges)), torch.zeros(len(neg_edges))]) num_edges = len(all_edges) perm = torch.randperm(num_edges) train_size = int(0.6 * num_edges) val_size = int(0.2 * num_edges) train_edges = all_edges[perm[:train_size]].to(device) val_edges = all_edges[perm[train_size:train_size + val_size]].to(device) train_labels = labels[perm[:train_size]].to(device) val_labels = labels[perm[train_size:train_size + val_size]].to(device) model = HGT( hidden_channels=hidden_channels, out_channels=hidden_channels, num_layers=num_layers, n_heads=n_heads, dropout=0.2, metadata=data.metadata() ).to(device) collab_recommender = HGTCollaborationRecommender(hidden_channels, hidden_channels, 2).to(device) optimizer = torch.optim.Adam(list(model.parameters()) + list(collab_recommender.parameters()), lr=lr) history = {'epoch': [], 'train_loss': [], 'val_auc': [], 'val_ap': []} for epoch in range(epochs): model.train() collab_recommender.train() optimizer.zero_grad() x_dict = model(data.x_dict, data.edge_index_dict) results = collab_recommender(x_dict['company'], x_dict['company'], train_edges.t()) loss = torch.nn.functional.binary_cross_entropy(results['success_probability'], train_labels) loss.backward() optimizer.step() if epoch % 5 == 0: model.eval() collab_recommender.eval() with torch.no_grad(): x_dict = model(data.x_dict, data.edge_index_dict) val_results = collab_recommender(x_dict['company'], x_dict['company'], val_edges.t()) val_pred = val_results['success_probability'] from sklearn.metrics import roc_auc_score, average_precision_score val_auc = roc_auc_score(val_labels.cpu().numpy(), val_pred.cpu().numpy()) val_ap = average_precision_score(val_labels.cpu().numpy(), val_pred.cpu().numpy()) history['epoch'].append(epoch) history['train_loss'].append(loss.item()) history['val_auc'].append(val_auc) history['val_ap'].append(val_ap) return model, collab_recommender, history def 
negative_sampling(edge_index, num_nodes_src, num_nodes_dst, num_neg): """负采样""" neg_edges = [] while len(neg_edges) < num_neg: src = torch.randint(0, num_nodes_src, (num_neg,)) dst = torch.randint(0, num_nodes_dst, (num_neg,)) neg = torch.stack([src, dst]) neg_edges.append(neg) if len(neg_edges) * num_neg >= num_neg: break return torch.cat(neg_edges, dim=1)[:, :num_neg] def negative_sampling_collab(pos_edges, num_nodes, num_neg): """合作推荐负采样""" device = pos_edges.device neg_edges = [] existing_set = set(map(tuple, pos_edges.tolist())) while len(neg_edges) < num_neg: src = torch.randint(0, num_nodes, (num_neg * 2,), device=device) dst = torch.randint(0, num_nodes, (num_neg * 2,), device=device) mask = src != dst candidates = torch.stack([src[mask], dst[mask]], dim=1) for edge in candidates: edge_tuple = tuple(edge.tolist()) reverse_tuple = tuple(edge.flip(0).tolist()) if edge_tuple not in existing_set and reverse_tuple not in existing_set: neg_edges.append(edge) if len(neg_edges) >= num_neg: break if len(neg_edges) >= num_neg: break return torch.stack(neg_edges[:num_neg]) def plot_training_curves(history, task_type): """绘制训练曲线""" fig = make_subplots(rows=1, cols=2, subplot_titles=('训练损失', '验证指标')) fig.add_trace( go.Scatter(x=history['epoch'], y=history['train_loss'], mode='lines+markers', name='训练损失', line=dict(color='#FF6B6B', width=2)), row=1, col=1 ) if 'val_auc' in history: fig.add_trace( go.Scatter(x=history['epoch'], y=history['val_auc'], mode='lines+markers', name='验证AUC', line=dict(color='#4ECDC4', width=2)), row=1, col=2 ) if 'val_ap' in history: fig.add_trace( go.Scatter(x=history['epoch'], y=history['val_ap'], mode='lines+markers', name='验证AP', line=dict(color='#95E1D3', width=2)), row=1, col=2 ) elif 'val_acc' in history: fig.add_trace( go.Scatter(x=history['epoch'], y=history['train_acc'], mode='lines+markers', name='训练准确率', line=dict(color='#4ECDC4', width=2)), row=1, col=2 ) fig.add_trace( go.Scatter(x=history['epoch'], y=history['val_acc'], 
mode='lines+markers', name='验证准确率',
                       line=dict(color='#95E1D3', width=2)),
            row=1, col=2
        )
    elif 'val_mae' in history:
        fig.add_trace(
            go.Scatter(x=history['epoch'], y=history['val_mae'], mode='lines+markers',
                       name='验证MAE', line=dict(color='#4ECDC4', width=2)),
            row=1, col=2
        )
        fig.add_trace(
            go.Scatter(x=history['epoch'], y=history['val_rmse'], mode='lines+markers',
                       name='验证RMSE', line=dict(color='#95E1D3', width=2)),
            row=1, col=2
        )
    fig.update_xaxes(title_text="训练轮次", row=1, col=1)
    fig.update_xaxes(title_text="训练轮次", row=1, col=2)
    fig.update_yaxes(title_text="损失值", row=1, col=1)
    fig.update_layout(
        title=f"📈 {task_type} - 训练过程监控",
        template="plotly_white",
        height=400,
        showlegend=True,
        font=dict(size=12)
    )
    return fig


def generate_training_report(history, task_type):
    """Build a Markdown training summary from a history dict.

    Reports final loss plus best/final validation metrics; which metrics
    appear depends on the keys present (AUC/AP, accuracy, or MAE).
    """
    if not history or 'epoch' not in history:
        return "训练历史为空"
    report = f"""
## 📋 {task_type} 训练报告

**训练配置**
- 总训练轮次: {history['epoch'][-1] + 1}
- 最终训练损失: {history['train_loss'][-1]:.4f}

**性能指标**
"""
    if 'val_auc' in history:
        report += f"- 最佳验证AUC: {max(history['val_auc']):.4f}\n"
        report += f"- 最终验证AUC: {history['val_auc'][-1]:.4f}\n"
        if 'val_ap' in history:
            report += f"- 最佳验证AP: {max(history['val_ap']):.4f}\n"
    elif 'val_acc' in history:
        report += f"- 最佳验证准确率: {max(history['val_acc']):.4f}\n"
        report += f"- 最终验证准确率: {history['val_acc'][-1]:.4f}\n"
    elif 'val_mae' in history:
        # Lower is better for MAE, hence min().
        report += f"- 最佳验证MAE: {min(history['val_mae']):.4f}\n"
        report += f"- 最终验证MAE: {history['val_mae'][-1]:.4f}\n"
    report += f"\n**训练完成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
    return report


# ==================== 推理预测模块 ====================

def predict_link(company_id, num_predictions):
    """Predict the top-N patents most likely linked to a given company.

    Scores every (company, patent) candidate pair with the trained link
    predictor and returns a DataFrame of the top `num_predictions` plus a
    bar chart.  On missing data/model, returns (error-string, None).
    """
    global current_data, loaded_models
    if current_data is None or 'link_prediction' not in loaded_models:
        return "❌ 请先加载数据并训练链接预测模型!", None
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    data = current_data.to(device)
    model = loaded_models['link_prediction']['model']
    predictor = loaded_models['link_prediction']['predictor']
    model.eval()
    predictor.eval()
    with torch.no_grad():
        x_dict = model(data.x_dict, data.edge_index_dict)
        num_patents = data['patent'].num_nodes
        # Clamp the requested id into the valid company range.
        company_idx = min(company_id, data['company'].num_nodes - 1)
        # One candidate edge per patent, all from the same company.
        edge_candidates = torch.stack([
            torch.full((num_patents,), company_idx, dtype=torch.long),
            torch.arange(num_patents)
        ]).to(device)
        scores = predictor(
            x_dict['company'], x_dict['patent'], edge_candidates
        ).sigmoid().cpu().numpy()
    top_indices = np.argsort(scores)[-num_predictions:][::-1]
    results = []
    for idx in top_indices:
        results.append({
            "专利ID": f"P-{idx}",
            "预测得分": f"{scores[idx]:.4f}",
            "置信度": f"{scores[idx] * 100:.2f}%"
        })
    df = pd.DataFrame(results)
    fig = go.Figure(data=[
        go.Bar(x=[r["专利ID"] for r in results],
               y=[float(r["预测得分"]) for r in results],
               marker=dict(color=[float(r["预测得分"]) for r in results],
                           colorscale='Viridis'))
    ])
    fig.update_layout(
        title=f"企业 C-{company_idx} 的专利关联预测 (Top-{num_predictions})",
        xaxis_title="专利ID",
        yaxis_title="预测得分",
        template="plotly_white",
        height=400
    )
    return df, fig


# ==================== 新功能1: 专利价值排行榜 ====================

def get_patent_value_leaderboard(top_n=50):
    """Rank all patents by predicted value and build a leaderboard + charts.

    Requires the 'patent_value' model; returns (report_md, DataFrame, figure),
    or (error-string, None, None) when prerequisites are missing.
    """
    global current_data, loaded_models
    if current_data is None or 'patent_value' not in loaded_models:
        return "❌ 请先加载数据并训练专利价值评估模型!", None, None
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    data = current_data.to(device)
    model = loaded_models['patent_value']['model']
    value_predictor = loaded_models['patent_value']['predictor']
    model.eval()
    value_predictor.eval()
    with torch.no_grad():
        x_dict = model(data.x_dict, data.edge_index_dict)
        # return_aux=True additionally yields grant/renewal probabilities.
        values, grant_probs, renewal_probs = value_predictor(x_dict['patent'], return_aux=True)
        values = values.squeeze().cpu().numpy()
        grant_probs = grant_probs.squeeze().cpu().numpy()
        renewal_probs = renewal_probs.squeeze().cpu().numpy()
    # Sort descending by predicted value.
    top_indices = np.argsort(values)[-top_n:][::-1]
    # Build the leaderboard rows.
    leaderboard = []
    for rank, idx in enumerate(top_indices, 1):
patent_features = data['patent'].x[idx].cpu().numpy()
        # Feature columns are de-normalised back to human-readable counts.
        # NOTE(review): scale factors (x30 / x50 / x5) assumed from the data
        # generator — confirm against utils.data_generator.
        leaderboard.append({
            "排名": rank,
            "专利ID": f"P-{idx}",
            "价值评分": f"{values[idx]:.2f}",
            "授权概率": f"{grant_probs[idx]*100:.1f}%",
            "续费概率": f"{renewal_probs[idx]*100:.1f}%",
            "权利要求数": int(patent_features[1] * 30),
            "引用数": int(patent_features[3] * 50),
            "技术宽度": int(patent_features[4] * 5)
        })
    df = pd.DataFrame(leaderboard)
    # Visualisation: only the top 20 go into the bar chart to stay readable.
    top_20 = leaderboard[:20]
    fig = make_subplots(
        rows=1, cols=2,
        subplot_titles=('Top 20 专利价值分布', '价值与授权概率关系'),
        specs=[[{'type': 'bar'}, {'type': 'scatter'}]]
    )
    # Bar chart of top-20 value scores.
    fig.add_trace(
        go.Bar(
            x=[item["专利ID"] for item in top_20],
            y=[float(item["价值评分"]) for item in top_20],
            marker=dict(
                color=[float(item["价值评分"]) for item in top_20],
                colorscale='Reds',
                showscale=True
            ),
            name='价值评分'
        ),
        row=1, col=1
    )
    # Scatter of value vs. grant probability over the full leaderboard.
    fig.add_trace(
        go.Scatter(
            x=[float(item["价值评分"]) for item in leaderboard],
            y=[float(item["授权概率"].rstrip('%')) for item in leaderboard],
            mode='markers',
            marker=dict(
                size=8,
                color=[float(item["价值评分"]) for item in leaderboard],
                colorscale='Viridis',
                showscale=True
            ),
            text=[item["专利ID"] for item in leaderboard],
            name='专利分布'
        ),
        row=1, col=2
    )
    fig.update_xaxes(title_text="专利ID", tickangle=45, row=1, col=1)
    fig.update_xaxes(title_text="价值评分", row=1, col=2)
    fig.update_yaxes(title_text="价值评分", row=1, col=1)
    fig.update_yaxes(title_text="授权概率 (%)", row=1, col=2)
    fig.update_layout(
        title=f"💎 专利价值排行榜 (Top {top_n})",
        template="plotly_white",
        height=500,
        showlegend=False
    )
    # Summary statistics for the Markdown report.
    avg_value = np.mean(values)
    top10_avg = np.mean([float(item["价值评分"]) for item in leaderboard[:10]])
    report = f"""
## 📊 专利价值排行榜分析

**整体概况**
- 总专利数量: {len(values):,}
- 平均价值评分: {avg_value:.2f}
- Top 10 平均评分: {top10_avg:.2f}
- 最高价值: {values[top_indices[0]]:.2f} (P-{top_indices[0]})

**价值分布**
- 高价值专利 (≥80分): {np.sum(values >= 80)} 个 ({np.sum(values >= 80)/len(values)*100:.1f}%)
- 中等价值 (60-80分): {np.sum((values >= 60) & (values < 80))} 个
- 一般价值 (<60分): {np.sum(values < 60)} 个

**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
    return report, df, fig


# ==================== 新功能2: 基于描述的产业分类与专利推荐 ====================

def classify_and_recommend_by_description(description, num_patents=10):
    """Infer an industry from a free-text description and recommend patents.

    Industry inference is keyword matching against the module-level
    INDUSTRY_KEYWORDS table (not the trained classifier).  Patents held by
    companies of the inferred industry are then ranked by the patent-value
    model when available, otherwise sampled at random.

    Returns:
        (classification_report_md, recommendations_df, patent_fig, prob_fig),
        with None placeholders on early-exit error paths.
    """
    global current_data, loaded_models
    if current_data is None or 'node_classification' not in loaded_models:
        return "❌ 请先加载数据并训练节点分类模型!", None, None, None
    # Keyword-based industry scoring (case-insensitive substring match).
    description_lower = description.lower()
    industry_scores = {}
    for industry, keywords in INDUSTRY_KEYWORDS.items():
        score = sum(1 for keyword in keywords if keyword.lower() in description_lower)
        industry_scores[industry] = score
    # Bail out when no keyword matched at all.
    if max(industry_scores.values()) == 0:
        return "❌ 无法从描述中识别产业类型,请提供更多产业相关关键词", None, None, None
    predicted_industry = max(industry_scores, key=industry_scores.get)
    industries_list = ['金融科技', '生物医药', '人工智能', '半导体', '新能源', '电子商务', '物流科技', '智能制造']
    industry_idx = industries_list.index(predicted_industry)
    # Classification report (Markdown).
    classification_report = f"""
## 🏷️ 产业分类结果

**输入描述**: {description}

**预测产业**: **{predicted_industry}**

**匹配关键词**:
"""
    matched_keywords = [kw for kw in INDUSTRY_KEYWORDS[predicted_industry] if kw.lower() in description_lower]
    # Ternary covers the whole concatenation: fallback text is used when no keyword matched.
    classification_report += "- " + ", ".join(matched_keywords) if matched_keywords else "- (基于语义分析)"
    classification_report += "\n\n**产业概率分布**:\n"
    total_score = sum(industry_scores.values())
    for ind, score in sorted(industry_scores.items(), key=lambda x: x[1], reverse=True):
        prob = score / total_score * 100 if total_score > 0 else 0
        classification_report += f"- {ind}: {prob:.1f}%\n"
    # Recommend patents from the predicted industry.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    data = current_data.to(device)
    # Companies belonging to the predicted industry.
    company_industries = data['company'].industry.cpu().numpy()
    industry_companies = np.where(company_industries == industry_idx)[0]
    if len(industry_companies) == 0:
        return classification_report, None, None, None
    # Collect the set of patents owned by those companies.
    edge_index = data['company', 'owns', 'patent'].edge_index.cpu().numpy()
    industry_patents = set()
    for company_idx in industry_companies:
        patent_mask = edge_index[0] == company_idx
        industry_patents.update(edge_index[1][patent_mask].tolist())
    industry_patents = list(industry_patents)
    if len(industry_patents) == 0:
        return classification_report, None, None, None
    # Rank by predicted value when the value model is available.
    if 'patent_value' in loaded_models:
        model = loaded_models['patent_value']['model']
        value_predictor = loaded_models['patent_value']['predictor']
        model.eval()
        value_predictor.eval()
        with torch.no_grad():
            x_dict = model(data.x_dict, data.edge_index_dict)
            all_values = value_predictor(x_dict['patent']).squeeze().cpu().numpy()
        # Keep (patent_idx, value) pairs restricted to this industry's patents.
        patent_values = [(idx, all_values[idx]) for idx in industry_patents]
        patent_values.sort(key=lambda x: x[1], reverse=True)
        top_patents = patent_values[:num_patents]
    else:
        # No value model: random selection with a sentinel value of 0.
        top_patents = [(idx, 0) for idx in np.random.choice(industry_patents,
                                                            min(num_patents, len(industry_patents)),
                                                            replace=False)]
    # Build the recommendation table.
    recommendations = []
    for rank, (patent_idx, value) in enumerate(top_patents, 1):
        patent_features = data['patent'].x[patent_idx].cpu().numpy()
        recommendations.append({
            "排名": rank,
            "专利ID": f"P-{patent_idx}",
            "价值评分": f"{value:.2f}" if value > 0 else "N/A",
            "权利要求数": int(patent_features[1] * 30),
            "引用数": int(patent_features[3] * 50),
            "技术宽度": int(patent_features[4] * 5),
            "授权状态": "已授权" if patent_features[6] > 0.5 else "未授权"
        })
    df = pd.DataFrame(recommendations)
    fig = go.Figure()
    # NOTE(review): `value` here leaks from the loop above (last row's value) —
    # it acts as a proxy for "value model was used"; consider an explicit flag.
    if value > 0:
        fig.add_trace(go.Bar(
            x=[r["专利ID"] for r in recommendations],
            y=[float(r["价值评分"]) for r in recommendations],
            marker=dict(
                color=[float(r["价值评分"]) for r in recommendations],
                colorscale='Greens'
            ),
            name='价值评分'
        ))
        fig.update_layout(yaxis_title="价值评分")
    else:
        fig.add_trace(go.Bar(
            x=[r["专利ID"] for r in recommendations],
            y=[r["引用数"] for r in recommendations],
            marker=dict(color='#4ECDC4'),
            name='引用数'
        ))
        fig.update_layout(yaxis_title="引用数")
    fig.update_layout(
        title=f"📚 {predicted_industry} 产业推荐专利 (Top {num_patents})",
        xaxis_title="专利ID",
        template="plotly_white",
        height=400
    )
    # Industry-match bar chart, highlighting the predicted industry.
    prob_fig = go.Figure(data=[
        go.Bar(
x=list(industry_scores.keys()),
            y=list(industry_scores.values()),
            marker=dict(
                color=['#FF6B6B' if ind == predicted_industry else '#E0E0E0'
                       for ind in industry_scores.keys()]
            )
        )
    ])
    prob_fig.update_layout(
        title="🎯 产业匹配度分析",
        xaxis_title="产业类别",
        yaxis_title="匹配得分",
        template="plotly_white",
        height=350
    )
    return classification_report, df, fig, prob_fig


# ==================== 新功能3: 实体详情查看 ====================

def view_entity_details(entity_type, entity_id):
    """Show a detail report, relation table and ego-network for one entity.

    Args:
        entity_type: display label, mapped to an internal node type.
        entity_id: numeric id (string input accepted and converted).

    Returns:
        (report_md, relations_df_or_None, network_fig), or error placeholders.
    """
    global current_data
    if current_data is None:
        return "❌ 请先加载数据集!", None, None
    try:
        entity_id = int(entity_id)
    except:
        return "❌ 请输入有效的ID数字", None, None
    # Map UI label -> heterograph node type.
    type_mapping = {
        "企业 (Company)": "company",
        "专利 (Patent)": "patent",
        "商标 (Trademark)": "trademark",
        "人员 (Person)": "person",
        "机构 (Institution)": "institution"
    }
    node_type = type_mapping.get(entity_type)
    if node_type is None:
        return "❌ 无效的实体类型", None, None
    if entity_id >= current_data[node_type].num_nodes or entity_id < 0:
        return f"❌ ID超出范围 (0-{current_data[node_type].num_nodes-1})", None, None
    # Raw (normalised) feature vector for this node.
    features = current_data[node_type].x[entity_id].cpu().numpy()
    # Report header.
    report = f"""
## 📋 {entity_type} 详细信息

**ID**: {node_type.upper()}-{entity_id}

### 基本特征
"""
    # Per-type feature rendering.  NOTE(review): the de-normalisation factors
    # below are assumed to mirror utils.data_generator — confirm there.
    if node_type == "company":
        industries = ['金融科技', '生物医药', '人工智能', '半导体', '新能源', '电子商务', '物流科技', '智能制造']
        districts = ['中环', '湾仔', '尖沙咀', '观塘', '荃湾', '科学园', '数码港', '将军澳工业邨']
        industry_idx = current_data['company'].industry[entity_id].item()
        report += f"""
- **企业规模**: {int(features[0] * 500)} 人
- **成立年限**: {features[1] * 30:.1f} 年
- **研发投入比例**: {features[2] * 0.3 * 100:.1f}%
- **国际化程度**: {features[3] * 100:.1f}%
- **创新能力评分**: {features[4] * 100:.1f}/100
- **年营收**: {np.expm1(features[5] * 10):.1f} 百万港币
- **所属产业**: {industries[industry_idx]}
- **所在地区**: {districts[int(features[7] * len(districts))]}
"""
    elif node_type == "patent":
        report += f"""
- **申请年份**: {2015 + int(features[0] * 10)}
- **权利要求数**: {int(features[1] * 30)}
- **发明人数**: {int(features[2] * 10)}
- **引用数**: {int(features[3] * 50)}
- **技术宽度**: {int(features[4] * 5)}
- **价值评分**: {features[5] * 100:.1f}/100
- **授权状态**: {'✅ 已授权' if features[6] > 0.5 else '⏳ 未授权'}
- **IPC编码**: {int(features[7] * 100)}
"""
    elif node_type == "trademark":
        report += f"""
- **注册年份**: {2015 + int(features[0] * 10)}
- **商标类别**: {int(features[1] * 45) + 1}
- **续展次数**: {int(features[2] * 3)}
- **商标类型**: {['文字', '图形', '组合'][int(features[3] * 2)]}
- **知名度评分**: {features[4] * 100:.1f}/100
- **争议记录**: {int(features[5] * 10)}
"""
    elif node_type == "person":
        report += f"""
- **学历**: {['本科', '硕士', '博士'][int(features[0] * 2)]}
- **工作年限**: {features[1] * 40:.1f} 年
- **专利发明数**: {int(features[2] * 50)}
- **技术领域数**: {int(features[3] * 5)}
- **H指数**: {int(features[4] * 50)}
- **跨界合作能力**: {features[5] * 100:.1f}%
"""
    elif node_type == "institution":
        inst_types = ['大学', '研究所', '孵化器', '政府实验室']
        report += f"""
- **机构类型**: {inst_types[int(features[0] * 4)]}
- **建立年限**: {features[1] * 100:.1f} 年
- **研究人员数**: {int(np.expm1(features[2] * np.log1p(1000)))}
- **年度专利产出**: {int(features[3] * 100)}
- **国际排名**: Top {int(1/features[4])}
- **产学研合作数**: {int(features[5] * 50)}
"""
    # Relationship summary: scan every edge type in both directions.
    report += "\n### 关系网络\n"
    relationships = []
    for edge_type in current_data.edge_types:
        src_type, rel, dst_type = edge_type
        if src_type == node_type:
            edge_index = current_data[edge_type].edge_index.cpu().numpy()
            mask = edge_index[0] == entity_id
            related_ids = edge_index[1][mask]
            if len(related_ids) > 0:
                relationships.append({
                    "关系": f"{rel} → {dst_type}",
                    "数量": len(related_ids),
                    "示例": f"{dst_type.upper()}-{related_ids[0]}" if len(related_ids) > 0 else "N/A"
                })
        if dst_type == node_type:
            edge_index = current_data[edge_type].edge_index.cpu().numpy()
            mask = edge_index[1] == entity_id
            related_ids = edge_index[0][mask]
            if len(related_ids) > 0:
                relationships.append({
                    "关系": f"{src_type} → {rel}",
                    "数量": len(related_ids),
                    "示例": f"{src_type.upper()}-{related_ids[0]}" if len(related_ids) > 0 else "N/A"
                })
    if relationships:
        rel_df = pd.DataFrame(relationships)
        report += f"\n{rel_df.to_markdown(index=False)}\n"
    else:
        report += "- 暂无关系数据\n"
    # Ego-network visualisation around the selected entity.
    G = nx.Graph()
    G.add_node(f"{node_type}-{entity_id}", node_type='center', color='#FF6B6B', size=20)
    # Cap neighbours per edge type to keep the figure legible.
    max_neighbors = 20
    for edge_type in current_data.edge_types:
        src_type, rel, dst_type = edge_type
        if src_type == node_type:
            edge_index = current_data[edge_type].edge_index.cpu().numpy()
            mask = edge_index[0] == entity_id
            related_ids = edge_index[1][mask][:max_neighbors]
            for rid in related_ids:
                G.add_node(f"{dst_type}-{rid}", node_type=dst_type, color='#4ECDC4', size=10)
                G.add_edge(f"{node_type}-{entity_id}", f"{dst_type}-{rid}")
        if dst_type == node_type:
            edge_index = current_data[edge_type].edge_index.cpu().numpy()
            mask = edge_index[1] == entity_id
            related_ids = edge_index[0][mask][:max_neighbors]
            for rid in related_ids:
                G.add_node(f"{src_type}-{rid}", node_type=src_type, color='#45B7D1', size=10)
                G.add_edge(f"{src_type}-{rid}", f"{node_type}-{entity_id}")
    # Spring layout + manual Plotly edge/node traces.
    pos = nx.spring_layout(G, k=1, iterations=50)
    edge_trace = go.Scatter(
        x=[], y=[],
        line=dict(width=0.5, color='#888'),
        hoverinfo='none',
        mode='lines'
    )
    for edge in G.edges():
        x0, y0 = pos[edge[0]]
        x1, y1 = pos[edge[1]]
        # None separates line segments within a single trace.
        edge_trace['x'] += tuple([x0, x1, None])
        edge_trace['y'] += tuple([y0, y1, None])
    node_trace = go.Scatter(
        x=[], y=[],
        mode='markers+text',
        text=[],
        textposition="top center",
        marker=dict(size=[], color=[], line=dict(width=2, color='white')),
        hoverinfo='text'
    )
    for node in G.nodes():
        x, y = pos[node]
        node_trace['x'] += tuple([x])
        node_trace['y'] += tuple([y])
        node_data = G.nodes[node]
        node_trace['marker']['size'] += tuple([node_data.get('size', 10)])
        node_trace['marker']['color'] += tuple([node_data.get('color', '#888')])
        node_trace['text'] += tuple([node.split('-')[0]])
    fig = go.Figure(data=[edge_trace, node_trace])
    fig.update_layout(
        title=f"🌐 {node_type.upper()}-{entity_id} 关系网络图谱",
        showlegend=False,
        hovermode='closest',
        template='plotly_white',
        xaxis=dict(showgrid=False, zeroline=False,
showticklabels=False),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        height=500
    )
    # rel_df only exists when relationships were found; the conditional
    # expression short-circuits, so this is safe.
    return report, rel_df if relationships else None, fig


# ==================== 继续其他预测函数 ====================

def predict_node_class(company_id):
    """Predict a company's industry class and return (DataFrame, bar chart).

    Uses the trained node-classification model; probabilities come from a
    softmax over the classifier logits for the selected company.
    """
    global current_data, loaded_models
    if current_data is None or 'node_classification' not in loaded_models:
        return "❌ 请先加载数据并训练节点分类模型!", None
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    data = current_data.to(device)
    model = loaded_models['node_classification']['model']
    classifier = loaded_models['node_classification']['classifier']
    model.eval()
    classifier.eval()
    industries = ['金融科技', '生物医药', '人工智能', '半导体', '新能源', '电子商务', '物流科技', '智能制造']
    # Clamp the requested id into the valid range.
    company_idx = min(company_id, data['company'].num_nodes - 1)
    with torch.no_grad():
        x_dict = model(data.x_dict, data.edge_index_dict)
        out = classifier(x_dict['company'])
        probs = torch.softmax(out[company_idx], dim=0).cpu().numpy()
        pred_class = probs.argmax()
    results = []
    for i, prob in enumerate(probs):
        results.append({
            "产业类别": industries[i] if i < len(industries) else f"产业{i}",
            "预测概率": f"{prob:.4f}",
            "百分比": f"{prob * 100:.2f}%"
        })
    # NOTE(review): sorting on the formatted string column works here only
    # because all values share the fixed "%.4f" format below 10.
    df = pd.DataFrame(results).sort_values("预测概率", ascending=False)
    fig = go.Figure(data=[
        go.Bar(x=[r["产业类别"] for r in results],
               y=[float(r["预测概率"]) for r in results],
               marker=dict(color=['#FF6B6B' if i == pred_class else '#E0E0E0'
                                  for i in range(len(results))]))
    ])
    fig.update_layout(
        title=f"企业 C-{company_idx} 的产业分类预测",
        xaxis_title="产业类别",
        yaxis_title="预测概率",
        template="plotly_white",
        height=400
    )
    return df, fig


def predict_patent_value(patent_id):
    """Evaluate one patent's value and return (report_md, figure, feature_df).

    Uses the trained patent-value model with auxiliary grant/renewal
    probability outputs.
    """
    global current_data, loaded_models
    if current_data is None or 'patent_value' not in loaded_models:
        return "❌ 请先加载数据并训练专利价值评估模型!", None, None
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    data = current_data.to(device)
    model = loaded_models['patent_value']['model']
    value_predictor = loaded_models['patent_value']['predictor']
    model.eval()
value_predictor.eval()
    # Clamp the requested id into the valid range.
    patent_idx = min(patent_id, data['patent'].num_nodes - 1)
    with torch.no_grad():
        x_dict = model(data.x_dict, data.edge_index_dict)
        # return_aux=True also yields grant/renewal probabilities.
        value, grant_prob, renewal_prob = value_predictor(x_dict['patent'], return_aux=True)
        value_score = value[patent_idx].item()
        grant_p = grant_prob[patent_idx].item()
        renewal_p = renewal_prob[patent_idx].item()
    report = f"""
## 💎 专利价值评估报告

**专利ID**: P-{patent_idx}

### 核心指标
- **综合价值评分**: {value_score:.2f} / 100
- **授权概率**: {grant_p * 100:.2f}%
- **续费概率**: {renewal_p * 100:.2f}%

### 价值等级
"""
    # Tiered verdict text based on the value score.
    if value_score >= 80:
        report += "🌟 **高价值专利** - 建议重点保护和商业化开发"
    elif value_score >= 60:
        report += "⭐ **中等价值专利** - 具有一定商业潜力"
    else:
        report += "📄 **一般专利** - 基础性专利,价值有限"
    fig = go.Figure()
    # Gauge on the left half of the figure.
    fig.add_trace(go.Indicator(
        mode="gauge+number+delta",
        value=value_score,
        domain={'x': [0, 0.5], 'y': [0, 1]},
        title={'text': "综合价值评分"},
        gauge={
            'axis': {'range': [None, 100]},
            'bar': {'color': "#4ECDC4"},
            'steps': [
                {'range': [0, 40], 'color': "#FFE5E5"},
                {'range': [40, 70], 'color': "#FFF5CC"},
                {'range': [70, 100], 'color': "#E8F5E9"}
            ],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': 80
            }
        }
    ))
    # Probability bars on the right half.
    fig.add_trace(go.Bar(
        x=['授权概率', '续费概率'],
        y=[grant_p * 100, renewal_p * 100],
        marker=dict(color=['#FF6B6B', '#45B7D1']),
        text=[f"{grant_p * 100:.1f}%", f"{renewal_p * 100:.1f}%"],
        textposition='outside'
    ))
    fig.update_layout(
        title=f"专利 P-{patent_idx} 价值分析",
        template="plotly_white",
        height=400,
        xaxis={'domain': [0.6, 1]},
        yaxis={'domain': [0, 1], 'title': '概率 (%)'}
    )
    # Raw normalised feature table for reference.
    patent_features = data['patent'].x[patent_idx].cpu().numpy()
    feature_names = ['申请年份', '权利要求数', '发明人数', '引用数', '技术宽度', '现有价值', '授权状态', 'IPC编码']
    feature_df = pd.DataFrame({
        '特征': feature_names,
        '数值': [f"{val:.3f}" for val in patent_features]
    })
    return report, fig, feature_df


def recommend_collaboration(company_id, num_recommendations):
    """Recommend collaboration partners for a company.

    Scores every other company with the trained collaboration recommender
    and returns (DataFrame, radar chart of the top-5 candidates).
    """
    global current_data, loaded_models
    if current_data is None or 'collaboration' not in loaded_models:
        return "❌ 请先加载数据并训练合作推荐模型!", None
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    data = current_data.to(device)
    model = loaded_models['collaboration']['model']
    collab_recommender = loaded_models['collaboration']['recommender']
    model.eval()
    collab_recommender.eval()
    company_idx = min(company_id, data['company'].num_nodes - 1)
    num_companies = data['company'].num_nodes
    with torch.no_grad():
        x_dict = model(data.x_dict, data.edge_index_dict)
        # Candidate edges: the target company paired with every company.
        edge_candidates = torch.stack([
            torch.full((num_companies,), company_idx, dtype=torch.long),
            torch.arange(num_companies)
        ]).to(device)
        results = collab_recommender(x_dict['company'], x_dict['company'], edge_candidates)
        success_probs = results['success_probability'].cpu().numpy()
        tech_sims = results['tech_similarity'].cpu().numpy()
        market_sims = results['market_similarity'].cpu().numpy()
        complements = results['complementarity'].cpu().numpy()
    # Exclude self-pairing before ranking.
    success_probs[company_idx] = 0
    top_indices = np.argsort(success_probs)[-num_recommendations:][::-1]
    recommendations = []
    for idx in top_indices:
        recommendations.append({
            "合作企业ID": f"C-{idx}",
            "成功概率": f"{success_probs[idx]:.4f}",
            "技术相似度": f"{tech_sims[idx]:.4f}",
            "市场相似度": f"{market_sims[idx]:.4f}",
            "互补性": f"{complements[idx]:.4f}"
        })
    df = pd.DataFrame(recommendations)
    # Radar chart for the top-5 candidates only.
    fig = go.Figure()
    for i, rec in enumerate(recommendations[:5]):
        fig.add_trace(go.Scatterpolar(
            r=[float(rec["成功概率"]), float(rec["技术相似度"]),
               float(rec["市场相似度"]), float(rec["互补性"])],
            theta=['成功概率', '技术相似度', '市场相似度', '互补性'],
            fill='toself',
            name=rec["合作企业ID"]
        ))
    fig.update_layout(
        polar=dict(radialaxis=dict(visible=True, range=[0, 1])),
        title=f"企业 C-{company_idx} 的合作推荐分析 (Top-{min(5, num_recommendations)})",
        template="plotly_white",
        height=500
    )
    return df, fig


# ==================== Gradio界面构建 ====================

def build_gradio_app():
    """构建增强版Gradio应用"""
    with gr.Blocks(title="香港知识产权生态网络分析系统 (数据科学增强版)", theme=gr.themes.Soft()) as app:
        gr.Markdown("""
    # 🏙️ 香港知识产权生态网络分析系统
    ### 基于异构图神经网络的智能分析平台 - 数据科学增强版 v3.0
    """)
        with \
gr.Tabs(): # ========== 新增: 数据科学分析Tab ========== with gr.Tab("📊 数据管理"): gr.Markdown("## 数据集生成与加载") with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 快速加载") dataset_size = gr.Dropdown( choices=["test", "medium", "large"], value="medium", label="选择数据集规模" ) load_btn = gr.Button("📂 加载数据集", variant="primary") gr.Markdown("### 自定义生成") with gr.Accordion("高级配置", open=False): custom_size = gr.Textbox(value="custom", label="数据集名称") n_companies = gr.Slider(100, 2000, value=500, step=100, label="企业数量") n_patents = gr.Slider(500, 15000, value=3000, step=500, label="专利数量") n_trademarks = gr.Slider(200, 8000, value=1500, step=200, label="商标数量") n_persons = gr.Slider(300, 10000, value=2000, step=300, label="人员数量") n_institutions = gr.Slider(20, 200, value=50, step=10, label="机构数量") time_span = gr.Slider(5, 20, value=10, step=1, label="时间跨度(年)") generate_btn = gr.Button("🔨 生成数据集", variant="secondary") with gr.Column(scale=2): data_status = gr.Textbox(label="状态信息", interactive=False) data_stats = gr.Dataframe(label="数据集统计") with gr.Row(): detailed_stats = gr.Markdown(label="详细统计报告") with gr.Row(): node_chart = gr.Plot(label="节点分布") edge_chart = gr.Plot(label="关系分布") with gr.Row(): network_viz = gr.Plot(label="网络图谱可视化") # 绑定事件 load_btn.click( load_dataset, inputs=[dataset_size], outputs=[data_status, data_stats, detailed_stats] ).then( lambda: [visualize_network_overview(), visualize_edge_distribution(), visualize_network_graph()], outputs=[node_chart, edge_chart, network_viz] ) generate_btn.click( generate_dataset, inputs=[custom_size, n_companies, n_patents, n_trademarks, n_persons, n_institutions, time_span], outputs=[data_status, data_stats, detailed_stats] ).then( lambda: [visualize_network_overview(), visualize_edge_distribution(), visualize_network_graph()], outputs=[node_chart, edge_chart, network_viz] ) with gr.Tab("🔬 数据科学分析"): gr.Markdown("## 高级数据分析与可视化") with gr.Tabs(): # PCA分析 with gr.Tab("📐 PCA降维"): gr.Markdown("### 主成分分析 (Principal Component Analysis)") with 
gr.Row(): with gr.Column(scale=1): pca_node_type = gr.Dropdown( choices=['company', 'patent', 'trademark', 'person', 'institution'], value='company', label="选择节点类型" ) pca_n_components = gr.Slider( 2, 10, value=2, step=1, label="降维目标维度" ) pca_run_btn = gr.Button("🚀 执行PCA分析", variant="primary", size="lg") gr.Markdown(""" **PCA说明**: - 线性降维方法 - 保留最大方差 - 适合数据预处理 """) with gr.Column(scale=2): pca_report = gr.Markdown(label="分析报告") with gr.Row(): pca_variance_plot = gr.Plot(label="方差分析") pca_projection_plot = gr.Plot(label="PCA投影") pca_components_table = gr.Dataframe(label="主成分载荷 (Top 10特征)") pca_run_btn.click( perform_pca_analysis, inputs=[pca_node_type, pca_n_components], outputs=[pca_report, pca_variance_plot, pca_projection_plot, pca_components_table] ) # t-SNE分析 with gr.Tab("🎯 t-SNE降维"): gr.Markdown("### t-分布随机邻域嵌入 (t-SNE)") with gr.Row(): with gr.Column(scale=1): tsne_node_type = gr.Dropdown( choices=['company', 'patent', 'trademark', 'person', 'institution'], value='company', label="选择节点类型" ) tsne_n_components = gr.Radio( choices=[2, 3], value=2, label="降维维度" ) tsne_perplexity = gr.Slider( 5, 50, value=30, step=5, label="困惑度 (Perplexity)" ) tsne_n_iter = gr.Slider( 250, 2000, value=1000, step=250, label="迭代次数" ) tsne_run_btn = gr.Button("🚀 执行t-SNE分析", variant="primary", size="lg") gr.Markdown(""" **t-SNE说明**: - 非线性降维 - 保留局部结构 - 适合聚类可视化 ⚠️ 计算较慢,大数据集会自动采样 """) with gr.Column(scale=2): tsne_report = gr.Markdown(label="分析报告") tsne_plot = gr.Plot(label="t-SNE可视化") tsne_metrics = gr.Dataframe(label="聚类质量指标") tsne_run_btn.click( perform_tsne_analysis, inputs=[tsne_node_type, tsne_n_components, tsne_perplexity, tsne_n_iter], outputs=[tsne_report, tsne_plot, tsne_metrics] ) # 聚类分析 with gr.Tab("🎯 聚类分析"): gr.Markdown("### 无监督聚类") with gr.Row(): with gr.Column(scale=1): cluster_node_type = gr.Dropdown( choices=['company', 'patent', 'trademark', 'person', 'institution'], value='company', label="选择节点类型" ) cluster_method = gr.Dropdown( choices=['kmeans', 'dbscan', 
'hierarchical'], value='kmeans', label="聚类方法" ) cluster_n_clusters = gr.Slider( 2, 20, value=5, step=1, label="聚类数量 (K-means/层次聚类)" ) cluster_run_btn = gr.Button("🚀 执行聚类分析", variant="primary", size="lg") gr.Markdown(""" **聚类方法**: - **K-means**: 快速,需指定K值 - **DBSCAN**: 基于密度,自动确定簇数 - **层次聚类**: 层次结构,需指定K值 """) with gr.Column(scale=2): cluster_report = gr.Markdown(label="聚类报告") cluster_plot = gr.Plot(label="聚类可视化") cluster_stats = gr.Dataframe(label="聚类统计") cluster_run_btn.click( perform_clustering_analysis, inputs=[cluster_node_type, cluster_method, cluster_n_clusters], outputs=[cluster_report, cluster_plot, cluster_stats] ) # 相关性分析 with gr.Tab("📊 相关性分析"): gr.Markdown("### 特征相关性分析") with gr.Row(): with gr.Column(scale=1): corr_node_type = gr.Dropdown( choices=['company', 'patent', 'trademark', 'person', 'institution'], value='company', label="选择节点类型" ) corr_run_btn = gr.Button("🚀 分析特征相关性", variant="primary", size="lg") with gr.Column(scale=2): corr_report = gr.Markdown(label="相关性报告") with gr.Row(): corr_heatmap = gr.Plot(label="相关性热力图") corr_dist_plot = gr.Plot(label="特征分布") corr_high_table = gr.Dataframe(label="高相关性特征对") corr_run_btn.click( perform_correlation_analysis, inputs=[corr_node_type], outputs=[corr_report, corr_heatmap, corr_high_table, corr_dist_plot] ) # 统计仪表板 with gr.Tab("📈 统计仪表板"): gr.Markdown("### 数据集整体统计概览") stats_run_btn = gr.Button("📊 生成统计仪表板", variant="primary", size="lg") stats_report = gr.Markdown(label="统计报告") stats_dashboard = gr.Plot(label="统计仪表板") stats_run_btn.click( generate_statistics_dashboard, inputs=[], outputs=[stats_report, stats_dashboard] ) # ========== Tab 2: 模型训练 ========== with gr.Tab("🎯 模型训练"): gr.Markdown("## 异构图神经网络模型训练") with gr.Row(): with gr.Column(scale=1): task_type = gr.Dropdown( choices=["链接预测", "节点分类", "专利价值评估", "企业合作推荐"], value="链接预测", label="选择任务类型" ) model_type = gr.Dropdown( choices=["HGT", "HeteroGNN"], value="HGT", label="选择模型" ) gr.Markdown("### 训练参数") epochs = gr.Slider(10, 200, value=50, step=10, label="训练轮次") 
hidden_channels = gr.Slider(32, 256, value=64, step=32, label="隐藏层维度") learning_rate = gr.Slider(0.0001, 0.01, value=0.001, step=0.0001, label="学习率") n_heads = gr.Slider(2, 16, value=8, step=2, label="注意力头数(HGT)") num_layers = gr.Slider(1, 5, value=3, step=1, label="网络层数") train_btn = gr.Button("🚀 开始训练", variant="primary", size="lg") with gr.Column(scale=2): train_status = gr.Textbox(label="训练状态", interactive=False) train_curves = gr.Plot(label="训练曲线") train_report = gr.Markdown(label="训练报告") train_btn.click( train_model, inputs=[task_type, model_type, epochs, hidden_channels, learning_rate, n_heads, num_layers], outputs=[train_status, train_curves, train_report] ) # ========== Tab 3: 链接预测 ========== with gr.Tab("🔗 链接预测"): gr.Markdown("## 企业-专利关系预测") with gr.Row(): with gr.Column(scale=1): link_company_id = gr.Number(value=0, label="企业ID", precision=0) link_num_pred = gr.Slider(5, 50, value=10, step=5, label="预测数量") link_predict_btn = gr.Button("🔍 预测关联专利", variant="primary") with gr.Column(scale=2): link_results = gr.Dataframe(label="预测结果") link_viz = gr.Plot(label="预测可视化") link_predict_btn.click( predict_link, inputs=[link_company_id, link_num_pred], outputs=[link_results, link_viz] ) # ========== Tab 4: 产业分类与专利推荐 (新功能2) ========== with gr.Tab("🏷️ 智能产业分析"): gr.Markdown("## 基于描述的产业分类与专利推荐") with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 方式1: 查看现有企业") class_company_id = gr.Number(value=0, label="企业ID", precision=0) class_predict_btn = gr.Button("🔍 分析企业产业", variant="secondary") with gr.Column(scale=2): class_results = gr.Dataframe(label="分类结果") class_viz = gr.Plot(label="概率分布") gr.Markdown("---") with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 方式2: 输入业务描述") business_desc = gr.Textbox( label="业务描述", placeholder="例如: 我们公司专注于区块链技术和数字支付解决方案...", lines=5 ) num_patent_rec = gr.Slider(5, 30, value=10, step=5, label="推荐专利数量") classify_btn = gr.Button("🎯 分析并推荐专利", variant="primary") with gr.Column(scale=2): classification_report = 
gr.Markdown(label="产业分类报告") patent_recommendations = gr.Dataframe(label="推荐专利列表") with gr.Row(): industry_prob_viz = gr.Plot(label="产业匹配度") patent_rec_viz = gr.Plot(label="推荐专利分析") # 绑定事件 class_predict_btn.click( predict_node_class, inputs=[class_company_id], outputs=[class_results, class_viz] ) classify_btn.click( classify_and_recommend_by_description, inputs=[business_desc, num_patent_rec], outputs=[classification_report, patent_recommendations, patent_rec_viz, industry_prob_viz] ) # ========== Tab 5: 专利价值评估 (增强版 - 新功能1) ========== with gr.Tab("💎 专利价值"): gr.Markdown("## 专利价值智能评估") with gr.Tabs(): with gr.Tab("📋 价值排行榜"): gr.Markdown("### 全局专利价值排行榜") with gr.Row(): leaderboard_top_n = gr.Slider(10, 100, value=50, step=10, label="显示Top N专利") leaderboard_btn = gr.Button("📊 生成排行榜", variant="primary", size="lg") leaderboard_report = gr.Markdown(label="排行榜分析报告") leaderboard_table = gr.Dataframe(label="专利价值排行榜") leaderboard_viz = gr.Plot(label="排行榜可视化") leaderboard_btn.click( get_patent_value_leaderboard, inputs=[leaderboard_top_n], outputs=[leaderboard_report, leaderboard_table, leaderboard_viz] ) with gr.Tab("🔍 单个专利评估"): gr.Markdown("### 查询指定专利的详细价值") with gr.Row(): with gr.Column(scale=1): value_patent_id = gr.Number(value=0, label="专利ID", precision=0) value_predict_btn = gr.Button("📊 评估专利价值", variant="primary") with gr.Column(scale=2): value_report = gr.Markdown(label="评估报告") value_viz = gr.Plot(label="价值分析") value_features = gr.Dataframe(label="专利特征") value_predict_btn.click( predict_patent_value, inputs=[value_patent_id], outputs=[value_report, value_viz, value_features] ) # ========== Tab 6: 合作推荐 ========== with gr.Tab("🤝 合作推荐"): gr.Markdown("## 企业合作伙伴智能推荐") with gr.Row(): with gr.Column(scale=1): collab_company_id = gr.Number(value=0, label="企业ID", precision=0) collab_num_rec = gr.Slider(5, 20, value=10, step=5, label="推荐数量") collab_recommend_btn = gr.Button("🎯 推荐合作伙伴", variant="primary") with gr.Column(scale=2): collab_results = gr.Dataframe(label="推荐结果") 
collab_viz = gr.Plot(label="多维分析") collab_recommend_btn.click( recommend_collaboration, inputs=[collab_company_id, collab_num_rec], outputs=[collab_results, collab_viz] ) # ========== Tab 7: 实体详情查看 (新功能4) ========== with gr.Tab("🔎 实体详情"): gr.Markdown("## 查看任意实体的详细信息") with gr.Row(): with gr.Column(scale=1): entity_type_select = gr.Dropdown( choices=[ "企业 (Company)", "专利 (Patent)", "商标 (Trademark)", "人员 (Person)", "机构 (Institution)" ], value="企业 (Company)", label="实体类型" ) entity_id_input = gr.Number(value=0, label="实体ID", precision=0) view_details_btn = gr.Button("🔍 查看详情", variant="primary", size="lg") with gr.Column(scale=2): entity_details_report = gr.Markdown(label="实体详细信息") entity_relations_table = gr.Dataframe(label="关系统计") with gr.Row(): entity_network_viz = gr.Plot(label="关系网络图谱") view_details_btn.click( view_entity_details, inputs=[entity_type_select, entity_id_input], outputs=[entity_details_report, entity_relations_table, entity_network_viz] ) with gr.Tab("ℹ️ 系统信息"): gr.Markdown(""" ## 📚 系统说明 (v3.0 - 数据科学增强版) ### 🆕 最新更新 (v3.0) 1. **PCA降维分析** - 主成分分析,查看方差解释和特征重要性 2. **t-SNE可视化** - 非线性降维,发现数据聚类结构 3. **聚类分析** - K-means、DBSCAN、层次聚类 4. **相关性分析** - 特征相关性热力图和高相关特征识别 5. **统计仪表板** - 数据集全局统计概览 ### 🔬 数据科学功能 - **降维**: PCA、t-SNE支持2D/3D可视化 - **聚类**: 多种聚类算法,自动计算聚类质量指标 - **统计**: 相关性分析、分布分析、质量评估 - **可视化**: 交互式图表,支持缩放、悬停查看详情 ### 📊 核心功能模块 1. **数据管理** - 数据生成、加载、统计 2. **数据科学分析** - PCA、t-SNE、聚类、相关性 (NEW!) 3. **模型训练** - HGT/HeteroGNN多任务训练 4. **链接预测** - 企业-专利关系预测 5. **智能产业分析** - 产业分类+专利推荐 6. **专利价值** - 排行榜+单个评估 7. **合作推荐** - 多维度企业合作分析 8. **实体详情** - 完整实体信息查看 ### 🛠️ 技术栈 - **深度学习**: PyTorch, PyTorch Geometric - **图模型**: HGT, HeteroGNN - **数据科学**: Scikit-learn, UMAP - **可视化**: Plotly, NetworkX - **界面**: Gradio ### 📖 使用指南 1. **数据准备**: 在"数据管理"加载数据集 2. **数据探索**: 在"数据科学分析"进行降维、聚类等分析 3. **模型训练**: 在"模型训练"训练所需任务模型 4. 
**应用分析**: 在各功能标签页进行预测和推荐 ### 📈 典型工作流 ``` 加载数据 → 数据分析 (PCA/t-SNE) → 模型训练 → 业务预测 ↓ ↓ ↓ ↓ 统计分析 发现模式 优化模型 决策支持 ``` ### 🔗 更新日志 **v3.0** (2025-10-27) - ✨ 新增完整的数据科学分析模块 - 📊 支持PCA、t-SNE降维可视化 - 🎯 支持多种聚类算法 - 📈 新增特征相关性分析 - 📉 新增统计分析仪表板 - 🎨 优化可视化效果和交互体验 **v2.0** (2025-10-26) - 专利价值排行榜 - 智能产业分析 - 实体详情查看 **v1.0** (2025-10-25) - 基础图神经网络模型 - 链接预测和节点分类 --- **开发团队**: Math3836 Team | **版本**: v3.0 | **日期**: 2025-10-27 💡 **提示**: 数据科学分析功能计算密集,大数据集可能需要较长时间 """) gr.Markdown(""" --- 🎓 **学习建议**: - 🔬 先使用"数据科学分析"探索数据特性 - 📊 通过PCA了解特征重要性 - 🎯 用t-SNE发现数据聚类模式 - 📈 结合聚类分析验证模型效果 - 🔗 最后应用训练好的模型进行预测 """) return app # ==================== 启动应用 ========== if __name__ == "__main__": os.makedirs('data', exist_ok=True) os.makedirs('checkpoints', exist_ok=True) app = build_gradio_app() app.launch( server_name="0.0.0.0", server_port=7860, share=True, show_error=True )