import json
import os
from storage.azure_table import AzureTableStorage
from config import DOCS_INDEX_NAME
import logging
from dotenv import load_dotenv
import xmltodict
import requests
from utils.text_cleaner import strip_html_tags
from embeddings import get_embeddings_model
from datetime import datetime, timezone, timedelta
from email.utils import parsedate_to_datetime
import numpy as np
from sklearn.cluster import DBSCAN
from collections import defaultdict
from pymilvus import connections, Collection
import time
# Load environment variables (e.g. Zilliz Cloud credentials) from a local .env file.
load_dotenv()
# Shared application logger; handlers/levels are configured by the host application.
logger = logging.getLogger("backend")
def parse_rss_items(rss_dict: dict, source_name: str) -> list:
    """Extract the list of news items from a parsed RSS document.

    Supports both RDF/RSS 1.0 ("rdf:RDF" -> "item") and RSS 2.0
    ("rss" -> "channel" -> "item") layouts. A single item dict is
    wrapped in a list so callers always iterate a list.

    Robustness fix: a recognized container with a missing or empty
    "channel"/"item" (xmltodict yields None for empty elements) now
    returns [] instead of raising KeyError/AttributeError.

    :param rss_dict: output of ``xmltodict.parse`` for the feed
    :param source_name: display name of the feed, used for logging
    :return: list of item dicts (possibly empty)
    """
    logger.info(f"解析RSS源: {source_name}")
    if "rdf:RDF" in rss_dict:
        items = (rss_dict["rdf:RDF"] or {}).get("item")
    elif "rss" in rss_dict:
        channel = (rss_dict["rss"] or {}).get("channel") or {}
        items = channel.get("item")
    else:
        logger.error(f"无法识别的RSS格式: {source_name}")
        return []
    if items is None:
        # Recognized container but no <item> entries (empty or truncated feed).
        return []
    # Normalize: xmltodict returns a dict for a single item, a list for many.
    return items if isinstance(items, list) else [items]
def parse_news_date(item: dict) -> datetime:
    """Parse the publication date of a news item.

    Looks at "pubDate", "published" and "dc:date" in that order, and for
    each value tries, in order:
      1. RFC 2822 / email-style dates (``parsedate_to_datetime``),
      2. ISO 8601 (a trailing "Z" is normalized to "+00:00" first),
      3. a custom "%A %b %d %Y %H:%M:%S" layout seen in some feeds.

    Fix: the original used bare ``except:`` clauses, which also swallow
    SystemExit/KeyboardInterrupt; only the parsing errors these parsers
    actually raise are caught now.

    :param item: one RSS item dict
    :return: parsed datetime (may be naive), or None if nothing parses
    """
    logger.debug(f"解析新闻日期: {item}")
    parsers = (
        parsedate_to_datetime,
        lambda s: datetime.fromisoformat(s.replace("Z", "+00:00")),
        lambda s: datetime.strptime(s, "%A %b %d %Y %H:%M:%S"),
    )
    for field in ("pubDate", "published", "dc:date"):
        date_str = item.get(field)
        if not date_str:
            continue
        for parse in parsers:
            try:
                return parse(date_str)
            except (ValueError, TypeError):
                # This parser can't handle the format; try the next one.
                continue
    return None
def get_news_content(item: dict) -> str:
    """Return the item's title, or None when it is absent or empty."""
    title = item.get("title")
    return title if title else None
def get_recent_news_clusters():
    """Fetch world-news RSS feeds, embed recent headlines, cluster them.

    Pulls every configured feed, keeps items published within the last
    48 hours (note: some log strings still say "24 hours" — they are kept
    verbatim, but the actual window is 48h), embeds each title and groups
    the embeddings with DBSCAN (cosine distance, eps=0.4, min_samples=2).

    :return: {"clusters": [{"id", "size", "center"}, ...] sorted by size
             descending, "timestamp": "<UTC string>"} — or None when no
             recent news could be collected.
    """
    # RSS feed registry: display name -> feed URL.
    rss_sources = {
        "NYTimes": "https://rss.nytimes.com/services/xml/rss/nyt/World.xml",
        "CNN": "http://rss.cnn.com/rss/edition_world.rss",
        "FoxNews": "https://abcnews.go.com/abcnews/internationalheadlines",
        # europe
        "BBC": "https://feeds.bbci.co.uk/news/world/rss.xml",
        "Reuters": "https://rsshub.app/reuters/world",
        "DW": "http://rss.dw-world.de/rdf/rss-en-top",
        "Guardian": "https://www.theguardian.com/world/rss",
        "SkyNews": "https://feeds.skynews.com/feeds/rss/world.xml",
        "TheSun": "https://thesun.my/rss/world",
        # asia
        "Aljazeera": "http://www.aljazeera.com/xml/rss/all.xml",
        "TimesOfIndia": "https://timesofindia.indiatimes.com/rssfeeds/296589292.cms",
        "ChannelNewsAsia": "https://www.channelnewsasia.com/api/v1/rss-outbound-feed?_format=xml&category=6311",
        # other
        "GlobalNews": "https://globalnews.ca/world/feed/",
        "SMH": "https://www.smh.com.au/rss/world.xml",
        "Capi24": "https://feeds.capi24.com/v1/Search/articles/news24/World/rss",
        "IFPNews": "https://ifpnews.com/feed/",
        # Russia
        "MoscowTimes": "https://themoscowtimes.com/feeds/main.xml",
        # China
        "SCMP": "https://www.scmp.com/rss/91/feed",
        "ChinaNews": "https://www.chinanews.com.cn/rss/world.xml",
        "People": "http://www.people.com.cn/rss/world.xml",
        # source
        "AP News": "https://march42-rsshub.hf.space/apnews/api/apf-topnews",
        "CNBC": "https://march42-rsshub.hf.space/cnbc/rss",
        "Tass": "https://march42-rsshub.hf.space/tass/world",
        "Sputnik News": "https://march42-rsshub.hf.space/sputniknews/world",
        "Economist": "https://march42-rsshub.hf.space/economist/latest",
        "Straits Times": "https://march42-rsshub.hf.space/straitstimes/world",
        "Huanqiu": "https://march42-rsshub.hf.space/huanqiu/news/world",
        "Zaobao": "https://march42-rsshub.hf.space/zaobao/znews/world",
    }
    current_time = datetime.now(timezone.utc)
    embeddings_model = get_embeddings_model()
    recent_news = []  # items published within the last 48 hours
    for source_name, url in rss_sources.items():
        try:
            start_time = time.time()
            logger.info(f"开始处理源: {source_name}")
            # --- fetch the feed, timing the request ---
            request_start = time.time()
            response = requests.get(url, timeout=10)
            # Fix: without this, HTTP error pages were handed to the XML
            # parser; raising here routes them to the per-source handler.
            response.raise_for_status()
            request_time = time.time() - request_start
            logger.info(f"{source_name} 请求耗时: {request_time:.2f}秒")
            # --- parse the XML, timing it ---
            parse_start = time.time()
            rss_dict = xmltodict.parse(response.content)
            items = parse_rss_items(rss_dict, source_name)
            parse_time = time.time() - parse_start
            logger.info(f"{source_name} XML解析耗时: {parse_time:.2f}秒")
            # --- embed the recent items, timing it ---
            embed_start = time.time()
            processed_items = 0
            for item in items:
                content = get_news_content(item)
                if not content:
                    logger.debug(f"跳过:描述和标题都为空: {source_name}")
                    continue
                pub_date = parse_news_date(item)
                if not pub_date:
                    logger.debug(f"跳过:无法解析日期: {source_name}")
                    continue
                # Naive datetimes are interpreted in local time here —
                # NOTE(review): confirm feeds without a tz really mean local.
                pub_date = pub_date.astimezone(timezone.utc)
                # Keep only items published within the last 48 hours.
                if (current_time - pub_date).total_seconds() <= 48 * 3600:
                    recent_news.append(
                        {
                            "source": source_name,
                            "content": content,
                            "pub_date": pub_date,
                            "embedding": embeddings_model.embed_query(content),
                        }
                    )
                    processed_items += 1
            embed_time = time.time() - embed_start
            logger.info(
                f"{source_name} Embedding处理耗时: {embed_time:.2f}秒 (处理{processed_items}条新闻)"
            )
            logger.info(
                f"{source_name} 总处理耗时: {time.time() - start_time:.2f}秒 "
                f"[请求: {request_time:.2f}s, 解析: {parse_time:.2f}s, Embedding: {embed_time:.2f}s]"
            )
        except Exception as e:
            # A single bad feed must not abort the whole run.
            logger.error(f"处理新闻源时出错 {source_name}: {str(e)}")
            continue
    if not recent_news:
        # Fix: make the "nothing collected" result explicit for callers
        # (the original fell off the end and returned None implicitly).
        return None
    # --- cluster the embeddings ---
    embeddings = np.array([news["embedding"] for news in recent_news])
    clustering = DBSCAN(eps=0.4, min_samples=2, metric="cosine").fit(embeddings)
    # Group items by cluster label, dropping DBSCAN noise points (-1).
    clusters = defaultdict(list)
    for idx, label in enumerate(clustering.labels_):
        if label != -1:
            clusters[label].append(recent_news[idx])
    # Emit one centroid record per cluster, largest clusters first.
    clusters_output = []
    sorted_clusters = sorted(clusters.items(), key=lambda x: len(x[1]), reverse=True)
    for i, (cluster_id, news_list) in enumerate(sorted_clusters):
        cluster_embeddings = np.array([news["embedding"] for news in news_list])
        cluster_center = np.mean(cluster_embeddings, axis=0)
        clusters_output.append(
            {
                "id": i,  # sequential ID in size order, not the DBSCAN label
                "size": len(news_list),
                "center": cluster_center.tolist(),
            }
        )
    return {
        "clusters": clusters_output,
        "timestamp": current_time.strftime("%Y-%m-%d %H:%M:%S UTC"),
    }
async def analyze_news_clusters():
    """Match fresh news clusters against the vector store and persist results.

    Connects to Zilliz Cloud, builds clusters from recent RSS headlines,
    searches each cluster centroid for similar stored articles (published
    within the last two days, cosine similarity > 0.75) and writes the
    non-empty matches to Azure Table storage.

    :return: status dict — one of {"status": "success"|"no_clusters"|
             "no_results"|"error", ...}; never raises.
    """
    connected = False  # fix: only disconnect a connection we actually opened
    try:
        zilliz_uri = os.getenv("ZILLIZ_CLOUD_URI")
        zilliz_token = os.getenv("ZILLIZ_CLOUD_TOKEN")
        if not zilliz_uri or not zilliz_token:
            logger.error("缺少 Zilliz Cloud 配置信息")
            return {"status": "error", "message": "缺少必要的环境变量配置"}
        connections.connect(
            alias="default",
            uri=zilliz_uri,
            token=zilliz_token,
        )
        connected = True
        logger.info("连接到Zilliz Cloud成功")
        collection = Collection(DOCS_INDEX_NAME)
        collection.load()
        # Build clusters from the recent RSS headlines.
        clusters_data = get_recent_news_clusters()
        if not clusters_data:
            logger.warning("没有找到最近24小时内的新闻聚类")
            return {
                "status": "no_clusters",
                "message": "没有找到最近24小时内的新闻聚类",
            }
        azure_storage = AzureTableStorage()
        # Collect every cluster centroid vector.
        all_centers = [
            {
                "id": cluster["id"],
                "size": cluster["size"],
                "vector": cluster["center"],
            }
            for cluster in clusters_data["clusters"]
        ]
        all_cluster_results = []
        timestamp = clusters_data["timestamp"]
        logger.info(f"开始处理 {len(all_centers)} 个聚类")
        # Run one vector search per cluster centroid.
        for cluster in all_centers:
            logger.info(f"处理聚类 #{cluster['id']}, 大小: {cluster['size']}")
            search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
            current_time = datetime.now(timezone.utc)
            two_days_ago = current_time - timedelta(days=2)
            # Restrict matches to articles published within the last two days.
            time_filter = (
                f'publish_time >= "{two_days_ago.strftime("%Y-%m-%dT%H:%M:%S.000Z")}"'
            )
            results = collection.search(
                data=[cluster["vector"]],
                anns_field="embedding",
                param=search_params,
                limit=3,
                expr=time_filter,
                output_fields=["source"],
            )
            if results:
                cluster_result = {"size": cluster["size"], "articles": []}
                for hits in results:
                    for hit in hits:
                        similarity = hit.score
                        # Keep only strong matches.
                        if similarity > 0.75:
                            cluster_result["articles"].append(
                                {
                                    "url": hit.entity.get("source"),
                                    "similarity": round(similarity, 3),
                                }
                            )
                if cluster_result["articles"]:
                    all_cluster_results.append(cluster_result)
        # Persist only when at least one cluster produced matches.
        if all_cluster_results:
            entity = {
                # Fix: the timestamp format is "%Y-%m-%d %H:%M:%S UTC" — it
                # contains no "T", so split("T")[0] returned the whole string
                # and the partition key was never date-only. Split on the
                # space to get the intended "YYYY-MM-DD" partition key.
                "PartitionKey": timestamp.split(" ")[0],
                "RowKey": timestamp,
                "timestamp": timestamp,
                "clusters": json.dumps(all_cluster_results, ensure_ascii=False),
            }
            azure_storage.store_clusters(entity)
            logger.info(f"成功处理 {len(all_cluster_results)} 个聚类")
            return {
                "status": "success",
                "message": f"成功处理 {len(all_cluster_results)} 个聚类",
                "data": {"timestamp": timestamp, "clusters": all_cluster_results},
            }
        return {"status": "no_results", "message": "没有找到足够相似的新闻聚类"}
    except Exception as e:
        logger.error(f"分析聚类时出错: {str(e)}")
        import traceback

        logger.error(f"详细错误信息:\n{traceback.format_exc()}")
        return {"status": "error", "message": str(e)}
    finally:
        if connected:
            connections.disconnect("default")
if __name__ == "__main__":
    import asyncio

    try:
        # Fix: analyze_news_clusters is a coroutine function; calling it
        # directly only created a coroutine object that was never awaited.
        # asyncio.run actually executes it.
        asyncio.run(analyze_news_clusters())
    except Exception as e:
        logger.error(f"执行失败: {str(e)}")