Leesn465 commited on
Commit
1097a00
·
verified ·
1 Parent(s): 7ff0b61

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +332 -83
main.py CHANGED
@@ -4,10 +4,7 @@ import uvicorn
4
  from pydantic import BaseModel
5
  import requests
6
  from bs4 import BeautifulSoup as bs
7
- import mysql.connector
8
  import os
9
- import google.genai as genai
10
- import json
11
  from util.keywordExtract import *
12
  from typing import Optional,List, Dict, Any, Union
13
  import pandas as pd
@@ -26,6 +23,11 @@ from datetime import datetime, timedelta
26
  from starlette.concurrency import run_in_threadpool
27
  import FinanceDataReader as fdr
28
  from groq import Groq
 
 
 
 
 
29
 
30
  app = FastAPI()
31
 
@@ -44,6 +46,77 @@ else:
44
  groq_client = Groq(api_key=API_KEY)
45
  logger.info(":white_check_mark: Groq API 설정 완료 (환경 변수 사용)")
46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  # ---------------------------------------
48
  # 입력/출력 모델
49
  # ---------------------------------------
@@ -154,7 +227,16 @@ def groq_use(text_content: Any) -> str:
154
 
155
  # 프롬프트 구성 (불필요한 특수문자 제거 및 슬라이싱)
156
  clean_text = text_for_ai[:500].replace('\n', ' ')
157
- prompt = f"상장사 이름을 '회사명' 형식으로 하나만 답해줘: {clean_text}"
 
 
 
 
 
 
 
 
 
158
 
159
  try:
160
  chat_completion = groq_client.chat.completions.create(
@@ -281,104 +363,189 @@ def step_predict(inp: PredictInput):
281
 
282
  return {"prediction": pred_label, "prob": prob}
283
 
 
 
284
  # ---------------------------------------
285
  # 호환용: 기존 parse-news (한방 요청) - 유지
286
  # ---------------------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
287
  @app.post("/ai/parse-news")
288
  def parse_news(req: NewsRequest):
289
- url = req.url.strip()
290
  try:
291
- meta = parse_article_all(url)
292
-
293
- # 키워드/요약(기존 resultKeyword 사용)
294
- rk = resultKeyword(meta["content"])
295
- targetCompany = groq_use(rk) # 텍스트 변환은 f-string 내부에서 처리됨
296
-
297
- # 감정(기존 로직)
298
- s = analyze_sentiment(meta["content"])
299
- pos, neg, neu = s["positive"], s["negative"], s["neutral"]
300
- print("부정:", neg)
301
- print("중립:", neu)
302
- print("긍정:", pos)
303
-
304
- reduced_net = neu / 2
305
- remaining = neu - reduced_net
306
- total_non_neu = neg + pos
307
- if total_non_neu > 0:
308
- neg += remaining * (neg / total_non_neu)
309
- pos += remaining * (pos / total_non_neu)
310
- else:
311
- neg += remaining / 2
312
- pos += remaining / 2
313
- neu = reduced_net
314
-
315
- max_label = max([("부정", neg), ("중립", neu), ("긍정", pos)], key=lambda x: x[1])[0]
316
- if max_label == "긍정":
317
- if pos >= 0.9: sentiment_label = f"매우 긍정 ({pos*100:.1f}%)"
318
- elif pos >= 0.6: sentiment_label = f"긍정 ({pos*100:.1f}%)"
319
- else: sentiment_label = f"약한 긍정 ({pos*100:.1f}%)"
320
- elif max_label == "부정":
321
- if neg >= 0.9: sentiment_label = f"매우 부정 ({neg*100:.1f}%)"
322
- elif neg >= 0.6: sentiment_label = f"부정 ({neg*100:.1f}%)"
323
- else: sentiment_label = f"약한 부정 ({neg*100:.1f}%)"
324
- else:
325
- sentiment_label = f"중립 ({neu*100:.1f}%)"
326
-
327
- # 예측
328
- summary_text = rk.get("summary") or summarize(meta["content"])
329
- _, keywords_2nd = extract_keywords(summary_text)
330
- clean_keywords = [kw for kw, _ in keywords_2nd]
331
- keyword_vec = embed_keywords(clean_keywords)
332
- input_vec = torch.tensor(keyword_vec, dtype=torch.float32).unsqueeze(0)
333
- model = SimpleClassifier(input_vec.shape[1])
334
- model.load_state_dict(torch.load("news_model.pt", map_location="cpu"))
335
- model.eval()
336
- with torch.no_grad():
337
- prob = model(input_vec).item()
338
- prediction_label = '📈 상승 (1)' if prob >= 0.5 else '📉 하락 (0)'
339
-
340
- return {
341
- **meta,
342
- "message": "뉴스 파싱 및 저장 완료",
343
- "summary": rk["summary"],
344
- "keyword": rk["keyword"],
345
- "company": targetCompany,
346
- "sentiment": sentiment_label,
347
- "sentiment_value": sentiment_label,
348
- "prediction": prediction_label,
349
- "prob": prob,
350
- }
351
-
352
- except requests.exceptions.RequestException as e:
353
- traceback.print_exc()
354
- raise HTTPException(status_code=500, detail=f"서버 오류: {e}")
355
  except Exception as e:
356
  traceback.print_exc()
357
  raise HTTPException(status_code=500, detail=f"서버 오류: {e}")
358
 
 
359
  # ---------------------------------------
360
  # 주가 데이터 (기존 유지)
361
  # ---------------------------------------
362
  krx_listings: pd.DataFrame = None
363
  us_listings: pd.DataFrame = None
364
 
365
-
366
  @app.on_event("startup")
367
  async def load_initial_data():
368
  global krx_listings, us_listings
369
- logger.info("✅ 서버 시작: 초기 데이터 로딩을 시작합니다...")
370
- try:
371
- krx_listings = await run_in_threadpool(fdr.StockListing, 'KRX')
372
- logger.info("📊 한국 상장 기업 목록 로딩 완료.")
373
- nasdaq = await run_in_threadpool(fdr.StockListing, 'NASDAQ')
374
- nyse = await run_in_threadpool(fdr.StockListing, 'NYSE')
375
- amex = await run_in_threadpool(fdr.StockListing, 'AMEX')
376
- us_listings = pd.concat([nasdaq, nyse, amex], ignore_index=True)
377
- logger.info("📊 미국 상장 기업 목록 로딩 완료.")
378
- logger.info("🌐 번역기 초기화 완료.")
379
- logger.info("✅ 초기 데이터 로딩 성공.")
380
- except Exception as e:
381
- logger.error(f"🚨 초기 데이터 로딩 오류: {e}", exc_info=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
382
 
383
  def get_stock_info(company_name: str) -> Dict[str, str] | None:
384
  try:
@@ -436,6 +603,88 @@ async def get_stock_data_by_name(company_name: str = Query(..., description="조
436
  prices_df['Date'] = prices_df['Date'].dt.strftime('%Y-%m-%d')
437
  return prices_df.to_dict(orient='records')
438
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
439
  # ---------------------------------------
440
  # 실행
441
  # ---------------------------------------
 
4
  from pydantic import BaseModel
5
  import requests
6
  from bs4 import BeautifulSoup as bs
 
7
  import os
 
 
8
  from util.keywordExtract import *
9
  from typing import Optional,List, Dict, Any, Union
10
  import pandas as pd
 
23
  from starlette.concurrency import run_in_threadpool
24
  import FinanceDataReader as fdr
25
  from groq import Groq
26
+ import asyncio
27
+ import json
28
+ from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
29
+ import ssl
30
+
31
 
32
  app = FastAPI()
33
 
 
46
  groq_client = Groq(api_key=API_KEY)
47
  logger.info(":white_check_mark: Groq API 설정 완료 (환경 변수 사용)")
48
 
49
+ KAFKA_BOOTSTRAP = os.getenv(
50
+ "KAFKA_BOOTSTRAP",
51
+ "newsnake-kafka-lsm71103186-f353.i.aivencloud.com:11897"
52
+ )
53
+
54
+ KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "news-analyze")
55
+ KAFKA_GROUP_ID = os.getenv("KAFKA_GROUP_ID", "ai-analyzer-group")
56
+ KAFKA_PROGRESS_TOPIC = os.getenv("KAFKA_PROGRESS_TOPIC", "analysis-progress")
57
+ KAFKA_DONE_TOPIC = os.getenv("KAFKA_DONE_TOPIC", "analysis-done")
58
+
59
+ KAFKA_CA_FILE = os.getenv("KAFKA_CA_FILE", "ca.pem")
60
+ KAFKA_CERT_FILE = os.getenv("KAFKA_CERT_FILE", "service.cert")
61
+ KAFKA_KEY_FILE = os.getenv("KAFKA_KEY_FILE", "service.key")
62
+
63
+ producer = None
64
+ consumer = None
65
+ consumer_task = None
66
+
67
+ def build_ssl_context():
68
+ ctx = ssl.create_default_context(cafile=KAFKA_CA_FILE)
69
+ ctx.load_cert_chain(certfile=KAFKA_CERT_FILE, keyfile=KAFKA_KEY_FILE)
70
+ return ctx
71
+
72
+ SSL_CONTEXT = build_ssl_context()
73
+
74
+ @app.on_event("startup")
75
+ async def start_kafka():
76
+ global producer, consumer, consumer_task
77
+
78
+ producer = AIOKafkaProducer(
79
+ bootstrap_servers=KAFKA_BOOTSTRAP,
80
+ security_protocol="SSL",
81
+ ssl_context=SSL_CONTEXT,
82
+ )
83
+ await producer.start()
84
+ logger.info("[KAFKA] producer started (SSL)")
85
+
86
+ consumer = AIOKafkaConsumer(
87
+ KAFKA_TOPIC,
88
+ bootstrap_servers=KAFKA_BOOTSTRAP,
89
+ group_id=KAFKA_GROUP_ID,
90
+ enable_auto_commit=True,
91
+ auto_offset_reset="latest",
92
+ security_protocol="SSL",
93
+ ssl_context=SSL_CONTEXT,
94
+ )
95
+ await consumer.start()
96
+ logger.info("[KAFKA] consumer started (SSL)")
97
+
98
+ consumer_task = asyncio.create_task(consume_loop())
99
+
100
+ @app.on_event("shutdown")
101
+ async def stop_kafka():
102
+ global producer, consumer, consumer_task
103
+ if consumer_task:
104
+ consumer_task.cancel()
105
+ try:
106
+ await consumer_task
107
+ except asyncio.CancelledError:
108
+ pass
109
+
110
+ if consumer:
111
+ await consumer.stop()
112
+ logger.info("[KAFKA] consumer stopped")
113
+
114
+ if producer:
115
+ await producer.stop()
116
+ logger.info("[KAFKA] producer stopped")
117
+
118
+
119
+
120
  # ---------------------------------------
121
  # 입력/출력 모델
122
  # ---------------------------------------
 
227
 
228
  # 프롬프트 구성 (불필요한 특수문자 제거 및 슬라이싱)
229
  clean_text = text_for_ai[:500].replace('\n', ' ')
230
+ prompt = f'''제공되는 뉴스 본문을 읽고, 뉴스와 가장 연관성이 높은 기업
231
+ 현재 주식 시장(KOSPI, KOSDAQ 등)에 상장된 기업의 이름 하나만로 출력해줘.
232
+ [제약 사항]
233
+ 뉴스 본문과 가장 연관이 된 회사일 것
234
+ 꼭 하나의 회사를 추출할 것
235
+ 없음이라고 표시하지 말 것
236
+ 상장되지 않은 일반 단체, 정부 기관, 비상장사는 제외할 것.
237
+ FinanceDataReader 이 라이브러리에 존재하는 회사만 추출할 것.
238
+ 설명 없이 회사 이름만 나열할 것.
239
+ 뉴스에 언급된 맥락상 '기업'임이 확실한 것만 포함할 것 : {clean_text}'''
240
 
241
  try:
242
  chat_completion = groq_client.chat.completions.create(
 
363
 
364
  return {"prediction": pred_label, "prob": prob}
365
 
366
+
367
+
368
  # ---------------------------------------
369
  # 호환용: 기존 parse-news (한방 요청) - 유지
370
  # ---------------------------------------
371
+ def analyze_news_sync(
372
+ url: str,
373
+ user_id: str | None = None,
374
+ progress_cb=None, # ✅ 추가
375
+ ) -> Dict[str, Any]:
376
+
377
+ def emit(percent: int, stage: str, message: str):
378
+ if progress_cb:
379
+ try:
380
+ progress_cb(percent, stage, message)
381
+ except Exception:
382
+ pass
383
+
384
+ emit(0, "START", "분석 시작")
385
+
386
+ # 1) 기사 파싱
387
+ emit(5, "PARSING", "뉴스 파싱 중...")
388
+ meta = parse_article_all(url)
389
+ emit(15, "PARSING", "뉴스 파싱 완료")
390
+
391
+ # 2) 요약/키워드(1차) (네가 원래 하던 resultKeyword)
392
+ emit(25, "SUMMARY", "요약/키워드 생성 중...")
393
+ rk = resultKeyword(meta["content"])
394
+ emit(35, "SUMMARY", "요약/키워드 생성 완료")
395
+
396
+ # 3) 회사 추론
397
+ emit(45, "COMPANY", "관련 회사 분석 중...")
398
+ targetCompany = groq_use(rk)
399
+ emit(55, "COMPANY", "관련 회사 분석 완료")
400
+
401
+ # 4) 감성 분석
402
+ emit(65, "SENTIMENT", "감정 분석 중...")
403
+ s = analyze_sentiment(meta["content"])
404
+ emit(75, "SENTIMENT", "감정 분석 완료")
405
+
406
+ # (원래 감성 후처리 로직 그대로)
407
+ pos, neg, neu = s["positive"], s["negative"], s["neutral"]
408
+
409
+ reduced_net = neu / 2
410
+ remaining = neu - reduced_net
411
+ total_non_neu = neg + pos
412
+ if total_non_neu > 0:
413
+ neg += remaining * (neg / total_non_neu)
414
+ pos += remaining * (pos / total_non_neu)
415
+ else:
416
+ neg += remaining / 2
417
+ pos += remaining / 2
418
+ neu = reduced_net # ✅ 원래 코드에 있었던 거 유지해야 함
419
+
420
+ max_label = max([("부정", neg), ("중립", neu), ("긍정", pos)], key=lambda x: x[1])[0]
421
+ if max_label == "긍정":
422
+ if pos >= 0.9:
423
+ sentiment_label = f"매우 긍정 ({pos*100:.1f}%)"
424
+ elif pos >= 0.6:
425
+ sentiment_label = f"긍정 ({pos*100:.1f}%)"
426
+ else:
427
+ sentiment_label = f"약한 긍정 ({pos*100:.1f}%)"
428
+ elif max_label == "부정":
429
+ if neg >= 0.9:
430
+ sentiment_label = f"매우 부정 ({neg*100:.1f}%)"
431
+ elif neg >= 0.6:
432
+ sentiment_label = f"부정 ({neg*100:.1f}%)"
433
+ else:
434
+ sentiment_label = f"약한 부정 ({neg*100:.1f}%)"
435
+ else:
436
+ sentiment_label = f"중립 ({neu*100:.1f}%)"
437
+
438
+ # 5) (네 원래 코드 유지) summary_text / keywords_2nd / clean_keywords
439
+ emit(82, "KEYWORDS", "키워드 추출(2차) 중...")
440
+ summary_text = rk.get("summary") or summarize(meta["content"])
441
+ _, keywords_2nd = extract_keywords(summary_text)
442
+ clean_keywords = [kw for kw, _ in keywords_2nd]
443
+ emit(88, "KEYWORDS", "키워드 추출 완료")
444
+
445
+ # 6) 임베딩 + 예측
446
+ emit(92, "PREDICT", "주가 예측 중...")
447
+ keyword_vec = embed_keywords(clean_keywords)
448
+ input_vec = torch.tensor(keyword_vec, dtype=torch.float32).unsqueeze(0)
449
+
450
+ model = SimpleClassifier(input_vec.shape[1])
451
+ model.load_state_dict(torch.load("news_model.pt", map_location="cpu"))
452
+ model.eval()
453
+ with torch.no_grad():
454
+ prob = model(input_vec).item()
455
+ prediction_label = "📈 상승 (1)" if prob >= 0.5 else "📉 하락 (0)"
456
+ emit(98, "PREDICT", "주가 예측 완료")
457
+
458
+ emit(100, "DONE", "분석 완료")
459
+
460
+ # ✅ 리턴 키는 “원래 네 함수랑 최대한 동일하게”
461
+ return {
462
+ **meta,
463
+ "message": "뉴스 파싱 및 저장 완료",
464
+ "summary": rk.get("summary"), # 원래: rk["summary"]
465
+ "keyword": rk.get("keyword"), # 원래: rk["keyword"]
466
+ "company": targetCompany,
467
+ "sentiment": sentiment_label,
468
+ "sentiment_value": sentiment_label,
469
+ "prediction": prediction_label,
470
+ "prob": prob,
471
+ "userId": user_id,
472
+ }
473
+
474
+
475
+
476
  @app.post("/ai/parse-news")
477
  def parse_news(req: NewsRequest):
 
478
  try:
479
+ return analyze_news_sync(req.url.strip(), user_id=req.id)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
480
  except Exception as e:
481
  traceback.print_exc()
482
  raise HTTPException(status_code=500, detail=f"서버 오류: {e}")
483
 
484
+
485
  # ---------------------------------------
486
  # 주가 데이터 (기존 유지)
487
  # ---------------------------------------
488
  krx_listings: pd.DataFrame = None
489
  us_listings: pd.DataFrame = None
490
 
 
491
  @app.on_event("startup")
492
  async def load_initial_data():
493
  global krx_listings, us_listings
494
+ file_path_kr = "krx_listings.csv"
495
+ file_path_ns = "nas_listings.csv"
496
+
497
+ # --- 1. 한국 시장 로딩 ---
498
+ if os.path.exists(file_path_kr):
499
+ # dtype={'Code': str} 설정해야 '005930'이 '5930'이 되지 않습니다.
500
+ krx_listings = pd.read_csv(file_path_kr, dtype={'Code': str})
501
+ logger.info("💾 로컬 파일에서 KRX 목록을 불러왔습니다.")
502
+ else:
503
+ try:
504
+ krx_listings = await run_in_threadpool(fdr.StockListing, 'KRX')
505
+ # 한글 깨짐 방지를 위해 utf-8-sig 권장
506
+ krx_listings.to_csv(file_path_kr, index=False, encoding='utf-8-sig')
507
+ logger.info("📊 KRX 데이터를 새로 받아 저장했습니다.")
508
+ except Exception as e:
509
+ logger.error(f"🚨 KRX 데이터 로딩 실패: {e}")
510
+ krx_listings = pd.DataFrame(columns=['Code', 'Name']) # 빈 데이터프레임 할당
511
+
512
+ # --- 2. 미국 시장 로딩 ---
513
+ if os.path.exists(file_path_ns):
514
+ us_listings = pd.read_csv(file_path_ns, dtype={'Symbol': str})
515
+ logger.info("💾 로컬 파일에서 US 목록을 불러왔습니다.")
516
+ else:
517
+ try:
518
+ # 여러 시장 데이터를 합칠 때 에러 방지
519
+ nasdaq = await run_in_threadpool(fdr.StockListing, 'NASDAQ')
520
+ nyse = await run_in_threadpool(fdr.StockListing, 'NYSE')
521
+ amex = await run_in_threadpool(fdr.StockListing, 'AMEX')
522
+
523
+ us_listings = pd.concat([nasdaq, nyse, amex], ignore_index=True)
524
+ us_listings.to_csv(file_path_ns, index=False, encoding='utf-8-sig')
525
+ logger.info("📊 미국 상장 기업 목록 로딩 완료.")
526
+ except Exception as e:
527
+ logger.error(f"🚨 미국 상장사 로딩 실패: {e}")
528
+ us_listings = pd.DataFrame(columns=['Symbol', 'Name'])
529
+
530
+
531
+ async def produce_progress(analysis_id: str, user_id: str | None, percent: int, stage: str, message: str):
532
+ if not producer:
533
+ return
534
+
535
+ payload = {
536
+ "analysisId": analysis_id,
537
+ "userId": user_id,
538
+ "percent": percent,
539
+ "stage": stage,
540
+ "message": message,
541
+ }
542
+ data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
543
+
544
+ await producer.send_and_wait(
545
+ KAFKA_PROGRESS_TOPIC,
546
+ key=analysis_id.encode("utf-8"),
547
+ value=data
548
+ )
549
 
550
  def get_stock_info(company_name: str) -> Dict[str, str] | None:
551
  try:
 
603
  prices_df['Date'] = prices_df['Date'].dt.strftime('%Y-%m-%d')
604
  return prices_df.to_dict(orient='records')
605
 
606
+
607
+ # ---------------------------------------
608
+ # Kafka Consumer 루프 추가
609
+ # ---------------------------------------
610
+
611
+ async def consume_loop():
612
+ global consumer
613
+ logger.info(f"[KAFKA] started topic={KAFKA_TOPIC} group={KAFKA_GROUP_ID} bootstrap={KAFKA_BOOTSTRAP}")
614
+
615
+ try:
616
+ # ��� 현재 이벤트 루프를 미리 잡아둠 (threadpool 콜백에서 필요)
617
+ loop = asyncio.get_running_loop()
618
+
619
+ async for msg in consumer:
620
+ key = msg.key.decode() if msg.key else None
621
+ raw = msg.value.decode() if msg.value else ""
622
+
623
+ try:
624
+ payload = json.loads(raw)
625
+ except Exception:
626
+ logger.warning(f"[KAFKA] invalid json: {raw}")
627
+ continue
628
+
629
+ analysis_id = payload.get("analysisId")
630
+ url = (payload.get("url") or "").strip()
631
+ user_id = payload.get("userId")
632
+
633
+ if not analysis_id or not url:
634
+ logger.warning(f"[KAFKA] missing analysisId/url payload={payload}")
635
+ continue
636
+
637
+ logger.info(f"[KAFKA] consume key={key} analysisId={analysis_id} url={url}")
638
+
639
+ # ✅ (선택) 여기서 0%를 한 번 보내도 되지만,
640
+ # analyze_news_sync 내부에서도 0%를 emit 하게 만들었으면 중복될 수 있음.
641
+ # await produce_progress(analysis_id, user_id, 0, "START", "분석 시작")
642
+
643
+ # ✅ threadpool에서 호출될 progress 콜백
644
+ def progress_cb(percent: int, stage: str, message: str):
645
+ # threadpool(다른 스레드) -> 현재 이벤트루프에서 코루틴 실행
646
+ fut = asyncio.run_coroutine_threadsafe(
647
+ produce_progress(analysis_id, user_id, percent, stage, message),
648
+ loop
649
+ )
650
+ # (선택) progress 전송 실패 로그 보고 싶으면:
651
+ try:
652
+ fut.result(timeout=2)
653
+ except Exception as e:
654
+ logger.warning(f"[KAFKA] progress send failed: {e}")
655
+
656
+ try:
657
+ # ✅ 무거운 작업 → threadpool (progress_cb 전달!)
658
+ result = await run_in_threadpool(analyze_news_sync, url, user_id, progress_cb)
659
+
660
+ logger.info(f"[KAFKA] done analysisId={analysis_id} title={result.get('title')}")
661
+
662
+ # ✅ analyze_news_sync가 100% DONE까지 emit 한다면 여기서 100% 또 보낼 필요 없음
663
+ # await produce_progress(analysis_id, user_id, 100, "DONE", "분석 완료")
664
+
665
+ # ✅ DONE 이벤트는 analysis-done 토픽으로 (이건 그대로 유지)
666
+ if producer:
667
+ done_payload = json.dumps({
668
+ "analysisId": analysis_id,
669
+ "userId": user_id,
670
+ "result": result
671
+ }, ensure_ascii=False).encode("utf-8")
672
+
673
+ await producer.send_and_wait(
674
+ KAFKA_DONE_TOPIC,
675
+ key=analysis_id.encode("utf-8"),
676
+ value=done_payload
677
+ )
678
+ logger.info(f"[KAFKA] produced done analysisId={analysis_id}")
679
+
680
+ except Exception as e:
681
+ logger.error(f"[KAFKA] analysis failed analysisId={analysis_id}: {repr(e)}", exc_info=True)
682
+ await produce_progress(analysis_id, user_id, 100, "ERROR", f"오류: {type(e).__name__}")
683
+
684
+ finally:
685
+ logger.info("[KAFKA] stopped")
686
+
687
+
688
  # ---------------------------------------
689
  # 실행
690
  # ---------------------------------------