Leesn465 commited on
Commit
e66222b
·
verified ·
1 Parent(s): 4485637

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +376 -113
main.py CHANGED
@@ -4,10 +4,7 @@ import uvicorn
4
  from pydantic import BaseModel
5
  import requests
6
  from bs4 import BeautifulSoup as bs
7
- import mysql.connector
8
  import os
9
- import google.genai as genai
10
- import json
11
  from util.keywordExtract import *
12
  from typing import Optional,List, Dict, Any, Union
13
  import pandas as pd
@@ -26,6 +23,11 @@ from datetime import datetime, timedelta
26
  from starlette.concurrency import run_in_threadpool
27
  import FinanceDataReader as fdr
28
  from groq import Groq
 
 
 
 
 
29
 
30
  app = FastAPI()
31
 
@@ -44,6 +46,77 @@ else:
44
  groq_client = Groq(api_key=API_KEY)
45
  logger.info(":white_check_mark: Groq API 설정 완료 (환경 변수 사용)")
46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  # ---------------------------------------
48
  # 입력/출력 모델
49
  # ---------------------------------------
@@ -154,7 +227,16 @@ def groq_use(text_content: Any) -> str:
154
 
155
  # 프롬프트 구성 (불필요한 특수문자 제거 및 슬라이싱)
156
  clean_text = text_for_ai[:500].replace('\n', ' ')
157
- prompt = f"상장사 이름을 '회사명' 형식으로 하나만 답해줘: {clean_text}"
 
 
 
 
 
 
 
 
 
158
 
159
  try:
160
  chat_completion = groq_client.chat.completions.create(
@@ -171,17 +253,6 @@ def groq_use(text_content: Any) -> str:
171
  logger.error(f"🚨 Groq 호출 실패: {repr(e)}")
172
  return "추출 실패"
173
 
174
- def get_stock_info(company_info: str) -> Dict[str, str] | None:
175
- try:
176
- if krx_listings is not None:
177
- # 포함 관계 확인
178
- for _, row in krx_listings.iterrows():
179
- if row['Name'] in company_info:
180
- return {"market": "KRX", "symbol": row['Code'], "name": row['Name']}
181
- except Exception as e:
182
- logger.error(f"주식 검색 에러: {e}")
183
- return None
184
-
185
 
186
  # ---------------------------------------
187
  # 1) 요약 단계
@@ -281,128 +352,221 @@ def step_predict(inp: PredictInput):
281
 
282
  return {"prediction": pred_label, "prob": prob}
283
 
 
 
284
  # ---------------------------------------
285
  # 호환용: 기존 parse-news (한방 요청) - 유지
286
  # ---------------------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
287
  @app.post("/ai/parse-news")
288
  def parse_news(req: NewsRequest):
289
- url = req.url.strip()
290
  try:
291
- meta = parse_article_all(url)
292
-
293
- # 키워드/요약(기존 resultKeyword 사용)
294
- rk = resultKeyword(meta["content"])
295
- targetCompany = groq_use(rk) # 텍스트 변환은 f-string 내부에서 처리됨
296
-
297
- # 감정(기존 로직)
298
- s = analyze_sentiment(meta["content"])
299
- pos, neg, neu = s["positive"], s["negative"], s["neutral"]
300
- print("부정:", neg)
301
- print("중립:", neu)
302
- print("긍정:", pos)
303
-
304
- reduced_net = neu / 2
305
- remaining = neu - reduced_net
306
- total_non_neu = neg + pos
307
- if total_non_neu > 0:
308
- neg += remaining * (neg / total_non_neu)
309
- pos += remaining * (pos / total_non_neu)
310
- else:
311
- neg += remaining / 2
312
- pos += remaining / 2
313
- neu = reduced_net
314
-
315
- max_label = max([("부정", neg), ("중립", neu), ("긍정", pos)], key=lambda x: x[1])[0]
316
- if max_label == "긍정":
317
- if pos >= 0.9: sentiment_label = f"매우 긍정 ({pos*100:.1f}%)"
318
- elif pos >= 0.6: sentiment_label = f"긍정 ({pos*100:.1f}%)"
319
- else: sentiment_label = f"약한 긍정 ({pos*100:.1f}%)"
320
- elif max_label == "부정":
321
- if neg >= 0.9: sentiment_label = f"매우 부정 ({neg*100:.1f}%)"
322
- elif neg >= 0.6: sentiment_label = f"부정 ({neg*100:.1f}%)"
323
- else: sentiment_label = f"약한 부정 ({neg*100:.1f}%)"
324
- else:
325
- sentiment_label = f"중립 ({neu*100:.1f}%)"
326
-
327
- # 예측
328
- summary_text = rk.get("summary") or summarize(meta["content"])
329
- _, keywords_2nd = extract_keywords(summary_text)
330
- clean_keywords = [kw for kw, _ in keywords_2nd]
331
- keyword_vec = embed_keywords(clean_keywords)
332
- input_vec = torch.tensor(keyword_vec, dtype=torch.float32).unsqueeze(0)
333
- model = SimpleClassifier(input_vec.shape[1])
334
- model.load_state_dict(torch.load("news_model.pt", map_location="cpu"))
335
- model.eval()
336
- with torch.no_grad():
337
- prob = model(input_vec).item()
338
- prediction_label = '📈 상승 (1)' if prob >= 0.5 else '📉 하락 (0)'
339
-
340
- return {
341
- **meta,
342
- "message": "뉴스 파싱 및 저장 완료",
343
- "summary": rk["summary"],
344
- "keyword": rk["keyword"],
345
- "company": targetCompany,
346
- "sentiment": sentiment_label,
347
- "sentiment_value": sentiment_label,
348
- "prediction": prediction_label,
349
- "prob": prob,
350
- }
351
-
352
- except requests.exceptions.RequestException as e:
353
- traceback.print_exc()
354
- raise HTTPException(status_code=500, detail=f"서버 오류: {e}")
355
  except Exception as e:
356
  traceback.print_exc()
357
  raise HTTPException(status_code=500, detail=f"서버 오류: {e}")
358
 
 
359
  # ---------------------------------------
360
  # 주가 데이터 (기존 유지)
361
  # ---------------------------------------
362
  krx_listings: pd.DataFrame = None
363
  us_listings: pd.DataFrame = None
364
 
365
-
366
  @app.on_event("startup")
367
  async def load_initial_data():
368
  global krx_listings, us_listings
369
- logger.info("✅ 서버 시작: 초기 데이터 로딩을 시작합니다...")
370
- try:
371
- krx_listings = await run_in_threadpool(fdr.StockListing, 'KRX')
372
- logger.info("📊 한국 상장 기업 목록 로딩 완료.")
373
- nasdaq = await run_in_threadpool(fdr.StockListing, 'NASDAQ')
374
- nyse = await run_in_threadpool(fdr.StockListing, 'NYSE')
375
- amex = await run_in_threadpool(fdr.StockListing, 'AMEX')
376
- us_listings = pd.concat([nasdaq, nyse, amex], ignore_index=True)
377
- logger.info("📊 미국 상장 기업 목록 로딩 완료.")
378
- logger.info("🌐 번역기 초기화 완료.")
379
- logger.info("✅ 초기 데이터 로딩 성공.")
380
- except Exception as e:
381
- logger.error(f"🚨 초기 데이터 로딩 오류: {e}", exc_info=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
382
 
383
  def get_stock_info(company_name: str) -> Dict[str, str] | None:
384
  try:
385
- # 1. 한국 주식(KRX) 검색
386
- if krx_listings is not None:
387
- kr_match = krx_listings[krx_listings['Name'].str.contains(company_name, case=False, na=False)]
388
- if not kr_match.empty:
389
- s = kr_match.iloc[0]
390
- return {"market": "KRX", "symbol": s['Code'], "name": s['Name']}
391
-
392
- # 2. 미국 주식 검색 (번역기 없이 이름으로 직접 검색)
393
- if us_listings is not None:
394
- # 영어 이름이 포함되어 있을 수 있으므로 대소문자 무시하고 검색
395
- us_match = us_listings[
396
- us_listings['Name'].str.contains(company_name, case=False, na=False) |
397
- us_listings['Symbol'].str.fullmatch(company_name, case=False)
398
- ]
399
- if not us_match.empty:
400
- s = us_match.iloc[0]
401
- return {"market": "US", "symbol": s['Symbol'], "name": s['Name']}
 
 
 
 
 
 
 
 
402
 
403
  except Exception as e:
404
- logger.error(f"주식 종목 검색 중 오류 발생: {e}")
405
-
406
  return None
407
 
408
 
@@ -436,6 +600,105 @@ async def get_stock_data_by_name(company_name: str = Query(..., description="조
436
  prices_df['Date'] = prices_df['Date'].dt.strftime('%Y-%m-%d')
437
  return prices_df.to_dict(orient='records')
438
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
439
  # ---------------------------------------
440
  # 실행
441
  # ---------------------------------------
 
4
  from pydantic import BaseModel
5
  import requests
6
  from bs4 import BeautifulSoup as bs
 
7
  import os
 
 
8
  from util.keywordExtract import *
9
  from typing import Optional,List, Dict, Any, Union
10
  import pandas as pd
 
23
  from starlette.concurrency import run_in_threadpool
24
  import FinanceDataReader as fdr
25
  from groq import Groq
26
+ import asyncio
27
+ import json
28
+ from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
29
+ import ssl
30
+
31
 
32
  app = FastAPI()
33
 
 
46
  groq_client = Groq(api_key=API_KEY)
47
  logger.info(":white_check_mark: Groq API 설정 완료 (환경 변수 사용)")
48
 
49
+ KAFKA_BOOTSTRAP = os.getenv(
50
+ "KAFKA_BOOTSTRAP",
51
+ "newsnake-kafka-lsm71103186-f353.i.aivencloud.com:11897"
52
+ )
53
+
54
+ KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "news-analyze")
55
+ KAFKA_GROUP_ID = os.getenv("KAFKA_GROUP_ID", "ai-analyzer-group")
56
+ KAFKA_PROGRESS_TOPIC = os.getenv("KAFKA_PROGRESS_TOPIC", "analysis-progress")
57
+ KAFKA_DONE_TOPIC = os.getenv("KAFKA_DONE_TOPIC", "analysis-done")
58
+
59
+ KAFKA_CA_FILE = os.getenv("KAFKA_CA_FILE", "ca.pem")
60
+ KAFKA_CERT_FILE = os.getenv("KAFKA_CERT_FILE", "service.cert")
61
+ KAFKA_KEY_FILE = os.getenv("KAFKA_KEY_FILE", "service.key")
62
+
63
+ producer = None
64
+ consumer = None
65
+ consumer_task = None
66
+
67
+ def build_ssl_context():
68
+ ctx = ssl.create_default_context(cafile=KAFKA_CA_FILE)
69
+ ctx.load_cert_chain(certfile=KAFKA_CERT_FILE, keyfile=KAFKA_KEY_FILE)
70
+ return ctx
71
+
72
+ SSL_CONTEXT = build_ssl_context()
73
+
74
+ @app.on_event("startup")
75
+ async def start_kafka():
76
+ global producer, consumer, consumer_task
77
+
78
+ producer = AIOKafkaProducer(
79
+ bootstrap_servers=KAFKA_BOOTSTRAP,
80
+ security_protocol="SSL",
81
+ ssl_context=SSL_CONTEXT,
82
+ )
83
+ await producer.start()
84
+ logger.info("[KAFKA] producer started (SSL)")
85
+
86
+ consumer = AIOKafkaConsumer(
87
+ KAFKA_TOPIC,
88
+ bootstrap_servers=KAFKA_BOOTSTRAP,
89
+ group_id=KAFKA_GROUP_ID,
90
+ enable_auto_commit=True,
91
+ auto_offset_reset="latest",
92
+ security_protocol="SSL",
93
+ ssl_context=SSL_CONTEXT,
94
+ )
95
+ await consumer.start()
96
+ logger.info("[KAFKA] consumer started (SSL)")
97
+
98
+ consumer_task = asyncio.create_task(consume_loop())
99
+
100
+ @app.on_event("shutdown")
101
+ async def stop_kafka():
102
+ global producer, consumer, consumer_task
103
+ if consumer_task:
104
+ consumer_task.cancel()
105
+ try:
106
+ await consumer_task
107
+ except asyncio.CancelledError:
108
+ pass
109
+
110
+ if consumer:
111
+ await consumer.stop()
112
+ logger.info("[KAFKA] consumer stopped")
113
+
114
+ if producer:
115
+ await producer.stop()
116
+ logger.info("[KAFKA] producer stopped")
117
+
118
+
119
+
120
  # ---------------------------------------
121
  # 입력/출력 모델
122
  # ---------------------------------------
 
227
 
228
  # 프롬프트 구성 (불필요한 특수문자 제거 및 슬라이싱)
229
  clean_text = text_for_ai[:500].replace('\n', ' ')
230
+ prompt = f'''제공되는 뉴스 본문을 읽고, 뉴스와 가장 연관성이 높은 기업
231
+ 현재 주식 시장(KOSPI, KOSDAQ 등)에 상장된 기업의 이름 하나만로 출력해줘.
232
+ [제약 사항]
233
+ 뉴스 본문과 가장 연관이 된 회사일 것
234
+ 꼭 하나의 회사를 추출할 것
235
+ 없음이라고 표시하지 말 것
236
+ 상장되지 않은 일반 단체, 정부 기관, 비상장사는 제외할 것.
237
+ FinanceDataReader 이 라이브러리에 존재하는 회사만 추출할 것.
238
+ 설명 없이 회사 이름만 나열할 것.
239
+ 뉴스에 언급된 맥락상 '기업'임이 확실한 것만 포함할 것 : {clean_text}'''
240
 
241
  try:
242
  chat_completion = groq_client.chat.completions.create(
 
253
  logger.error(f"🚨 Groq 호출 실패: {repr(e)}")
254
  return "추출 실패"
255
 
 
 
 
 
 
 
 
 
 
 
 
256
 
257
  # ---------------------------------------
258
  # 1) 요약 단계
 
352
 
353
  return {"prediction": pred_label, "prob": prob}
354
 
355
+
356
+
357
  # ---------------------------------------
358
  # 호환용: 기존 parse-news (한방 요청) - 유지
359
  # ---------------------------------------
360
+ def analyze_news_sync(
361
+ url: str,
362
+ user_id: str | None = None,
363
+ progress_cb=None, # ✅ 추가
364
+ ) -> Dict[str, Any]:
365
+
366
+ def emit(percent: int, stage: str, message: str):
367
+ if progress_cb:
368
+ try:
369
+ progress_cb(percent, stage, message)
370
+ except Exception:
371
+ pass
372
+
373
+ emit(0, "START", "분석 시작")
374
+
375
+ # 1) 기사 파싱
376
+ emit(5, "PARSING", "뉴스 파싱 중...")
377
+ meta = parse_article_all(url)
378
+ emit(15, "PARSING", "뉴스 파싱 완료")
379
+
380
+ # 2) 요약/키워드(1차) (네가 원래 하던 resultKeyword)
381
+ emit(25, "SUMMARY", "요약/키워드 생성 중...")
382
+ rk = resultKeyword(meta["content"])
383
+ emit(35, "SUMMARY", "요약/키워드 생성 완료")
384
+
385
+ # 3) 회사 추론
386
+ emit(45, "COMPANY", "관련 회사 분석 중...")
387
+ targetCompany = groq_use(rk)
388
+ emit(55, "COMPANY", "관련 회사 분석 완료")
389
+
390
+ # 4) 감성 분석
391
+ emit(65, "SENTIMENT", "감정 분석 중...")
392
+ s = analyze_sentiment(meta["content"])
393
+ emit(75, "SENTIMENT", "감정 분석 완료")
394
+
395
+ # (원래 감성 후처리 로직 그대로)
396
+ pos, neg, neu = s["positive"], s["negative"], s["neutral"]
397
+
398
+ reduced_net = neu / 2
399
+ remaining = neu - reduced_net
400
+ total_non_neu = neg + pos
401
+ if total_non_neu > 0:
402
+ neg += remaining * (neg / total_non_neu)
403
+ pos += remaining * (pos / total_non_neu)
404
+ else:
405
+ neg += remaining / 2
406
+ pos += remaining / 2
407
+ neu = reduced_net # ✅ 원래 코드에 있었던 거 유지해야 함
408
+
409
+ max_label = max([("부정", neg), ("중립", neu), ("긍정", pos)], key=lambda x: x[1])[0]
410
+ if max_label == "긍정":
411
+ if pos >= 0.9:
412
+ sentiment_label = f"매우 긍정 ({pos*100:.1f}%)"
413
+ elif pos >= 0.6:
414
+ sentiment_label = f"긍정 ({pos*100:.1f}%)"
415
+ else:
416
+ sentiment_label = f"약한 긍정 ({pos*100:.1f}%)"
417
+ elif max_label == "부정":
418
+ if neg >= 0.9:
419
+ sentiment_label = f"매우 부정 ({neg*100:.1f}%)"
420
+ elif neg >= 0.6:
421
+ sentiment_label = f"부정 ({neg*100:.1f}%)"
422
+ else:
423
+ sentiment_label = f"약한 부정 ({neg*100:.1f}%)"
424
+ else:
425
+ sentiment_label = f"중립 ({neu*100:.1f}%)"
426
+
427
+ # 5) (네 원래 코드 유지) summary_text / keywords_2nd / clean_keywords
428
+ emit(82, "KEYWORDS", "키워드 추출(2차) 중...")
429
+ summary_text = rk.get("summary") or summarize(meta["content"])
430
+ _, keywords_2nd = extract_keywords(summary_text)
431
+ clean_keywords = [kw for kw, _ in keywords_2nd]
432
+ emit(88, "KEYWORDS", "키워드 추출 완료")
433
+
434
+ # 6) 임베딩 + 예측
435
+ emit(92, "PREDICT", "주가 예측 중...")
436
+ keyword_vec = embed_keywords(clean_keywords)
437
+ input_vec = torch.tensor(keyword_vec, dtype=torch.float32).unsqueeze(0)
438
+
439
+ model = SimpleClassifier(input_vec.shape[1])
440
+ model.load_state_dict(torch.load("news_model.pt", map_location="cpu"))
441
+ model.eval()
442
+ with torch.no_grad():
443
+ prob = model(input_vec).item()
444
+ prediction_label = "📈 상승 (1)" if prob >= 0.5 else "📉 하락 (0)"
445
+ emit(98, "PREDICT", "주가 예측 완료")
446
+
447
+ emit(100, "DONE", "분석 완료")
448
+
449
+ # ✅ 리턴 키는 “원래 네 함수랑 최대한 동일하게”
450
+ return {
451
+ **meta,
452
+ "message": "뉴스 파싱 및 저장 완료",
453
+ "summary": rk.get("summary"), # 원래: rk["summary"]
454
+ "keyword": rk.get("keyword"), # 원래: rk["keyword"]
455
+ "company": targetCompany,
456
+ "sentiment": sentiment_label,
457
+ "sentiment_value": sentiment_label,
458
+ "prediction": prediction_label,
459
+ "prob": prob,
460
+ "userId": user_id,
461
+ }
462
+
463
+
464
+
465
  @app.post("/ai/parse-news")
466
  def parse_news(req: NewsRequest):
 
467
  try:
468
+ return analyze_news_sync(req.url.strip(), user_id=req.id)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
469
  except Exception as e:
470
  traceback.print_exc()
471
  raise HTTPException(status_code=500, detail=f"서버 오류: {e}")
472
 
473
+
474
  # ---------------------------------------
475
  # 주가 데이터 (기존 유지)
476
  # ---------------------------------------
477
  krx_listings: pd.DataFrame = None
478
  us_listings: pd.DataFrame = None
479
 
 
480
  @app.on_event("startup")
481
  async def load_initial_data():
482
  global krx_listings, us_listings
483
+ file_path_kr = "krx_listings.csv"
484
+ file_path_ns = "nas_listings.csv"
485
+
486
+ # --- 1. 한국 시장 로딩 ---
487
+ if os.path.exists(file_path_kr):
488
+ # dtype={'Code': str} 설정해야 '005930'이 '5930'이 되지 않습니다.
489
+ krx_listings = pd.read_csv(file_path_kr, dtype={'Code': str})
490
+ logger.info("💾 로컬 파일에서 KRX 목록을 불러왔습니다.")
491
+ else:
492
+ try:
493
+ krx_listings = await run_in_threadpool(fdr.StockListing, 'KRX')
494
+ # 한글 깨짐 방지를 위해 utf-8-sig 권장
495
+ krx_listings.to_csv(file_path_kr, index=False, encoding='utf-8-sig')
496
+ logger.info("📊 KRX 데이터를 새로 받아 저장했습니다.")
497
+ except Exception as e:
498
+ logger.error(f"🚨 KRX 데이터 로딩 실패: {e}")
499
+ krx_listings = pd.DataFrame(columns=['Code', 'Name']) # 빈 데이터프레임 할당
500
+
501
+ # --- 2. 미국 시장 로딩 ---
502
+ if os.path.exists(file_path_ns):
503
+ us_listings = pd.read_csv(file_path_ns, dtype={'Symbol': str})
504
+ logger.info("💾 로컬 파일에서 US 목록을 불러왔습니다.")
505
+ else:
506
+ try:
507
+ # 여러 시장 데이터를 합칠 때 에러 방지
508
+ nasdaq = await run_in_threadpool(fdr.StockListing, 'NASDAQ')
509
+ nyse = await run_in_threadpool(fdr.StockListing, 'NYSE')
510
+ amex = await run_in_threadpool(fdr.StockListing, 'AMEX')
511
+
512
+ us_listings = pd.concat([nasdaq, nyse, amex], ignore_index=True)
513
+ us_listings.to_csv(file_path_ns, index=False, encoding='utf-8-sig')
514
+ logger.info("📊 미국 상장 기업 목록 로딩 완료.")
515
+ except Exception as e:
516
+ logger.error(f"🚨 미국 상장사 로딩 실패: {e}")
517
+ us_listings = pd.DataFrame(columns=['Symbol', 'Name'])
518
+
519
+
520
+ async def produce_progress(analysis_id: str, user_id: str | None, percent: int, stage: str, message: str):
521
+ if not producer:
522
+ return
523
+
524
+ payload = {
525
+ "analysisId": analysis_id,
526
+ "userId": user_id,
527
+ "percent": percent,
528
+ "stage": stage,
529
+ "message": message,
530
+ }
531
+ data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
532
+
533
+ await producer.send_and_wait(
534
+ KAFKA_PROGRESS_TOPIC,
535
+ key=analysis_id.encode("utf-8"),
536
+ value=data
537
+ )
538
 
539
  def get_stock_info(company_name: str) -> Dict[str, str] | None:
540
  try:
541
+ name = (company_name or "").strip()
542
+ if not name:
543
+ return None
544
+
545
+ # 1) KRX
546
+ if krx_listings is not None and not krx_listings.empty:
547
+ # code 컬럼 자동 선택
548
+ code_col = 'Code' if 'Code' in krx_listings.columns else ('Symbol' if 'Symbol' in krx_listings.columns else None)
549
+ if code_col and 'Name' in krx_listings.columns:
550
+ kr_match = krx_listings[krx_listings['Name'].str.contains(name, case=False, na=False)]
551
+ if not kr_match.empty:
552
+ s = kr_match.iloc[0]
553
+ code = str(s[code_col]).zfill(6)
554
+ return {"market": "KRX", "symbol": code, "name": s['Name']}
555
+
556
+ # 2) US
557
+ if us_listings is not None and not us_listings.empty:
558
+ if 'Name' in us_listings.columns and 'Symbol' in us_listings.columns:
559
+ us_match = us_listings[
560
+ us_listings['Name'].str.contains(name, case=False, na=False) |
561
+ us_listings['Symbol'].str.fullmatch(name, case=False)
562
+ ]
563
+ if not us_match.empty:
564
+ s = us_match.iloc[0]
565
+ return {"market": "US", "symbol": str(s['Symbol']), "name": s['Name']}
566
 
567
  except Exception as e:
568
+ logger.error(f"주식 종목 검색 중 오류 발생: {repr(e)}", exc_info=True)
569
+
570
  return None
571
 
572
 
 
600
  prices_df['Date'] = prices_df['Date'].dt.strftime('%Y-%m-%d')
601
  return prices_df.to_dict(orient='records')
602
 
603
+
604
+ # ---------------------------------------
605
+ # Kafka Consumer 루프 추가
606
+ # ---------------------------------------
607
+
608
+ async def consume_loop():
609
+ global consumer
610
+ logger.info(f"[KAFKA] started topic={KAFKA_TOPIC} group={KAFKA_GROUP_ID} bootstrap={KAFKA_BOOTSTRAP}")
611
+
612
+ try:
613
+ # ✅ 현재 이벤트 루프를 미리 잡아둠 (threadpool 콜백에서 필요)
614
+ loop = asyncio.get_running_loop()
615
+
616
+ async for msg in consumer:
617
+ key = msg.key.decode() if msg.key else None
618
+ raw = msg.value.decode() if msg.value else ""
619
+
620
+ try:
621
+ payload = json.loads(raw)
622
+ except Exception:
623
+ logger.warning(f"[KAFKA] invalid json: {raw}")
624
+ continue
625
+
626
+ analysis_id = payload.get("analysisId")
627
+ url = (payload.get("url") or "").strip()
628
+ user_id = payload.get("userId")
629
+
630
+ if not analysis_id or not url:
631
+ logger.warning(f"[KAFKA] missing analysisId/url payload={payload}")
632
+ continue
633
+
634
+ logger.info(f"[KAFKA] consume key={key} analysisId={analysis_id} url={url}")
635
+
636
+ # ✅ (선택) 여기서 0%를 한 번 보내도 되지만,
637
+ # analyze_news_sync 내부에서도 0%를 emit 하게 만들었으면 중복될 수 있음.
638
+ # await produce_progress(analysis_id, user_id, 0, "START", "분석 시작")
639
+
640
+ # ✅ threadpool에서 호출될 progress 콜백
641
+ def progress_cb(percent: int, stage: str, message: str):
642
+ # threadpool(다른 스레드) -> 현재 이벤트루프에서 코루틴 실행
643
+ try:
644
+ asyncio.run_coroutine_threadsafe(
645
+ produce_progress(analysis_id, user_id, percent, stage, message),
646
+ loop
647
+ )
648
+
649
+ except Exception as e:
650
+ logger.warning(f"[KAFKA] progress send failed: {e}")
651
+
652
+ try:
653
+ # ✅ 무거운 작업 → threadpool (progress_cb 전달!)
654
+ result = await run_in_threadpool(analyze_news_sync, url, user_id, progress_cb)
655
+
656
+ logger.info(f"[KAFKA] done analysisId={analysis_id} title={result.get('title')}")
657
+
658
+ # ✅ analyze_news_sync가 100% DONE까지 emit 한다면 여기서 100% 또 보낼 필요 없음
659
+ # await produce_progress(analysis_id, user_id, 100, "DONE", "분석 완료")
660
+
661
+ # ✅ DONE 이벤트는 analysis-done 토픽으로 (이건 그대로 유지)
662
+ if producer:
663
+ done_payload = json.dumps({
664
+ "analysisId": analysis_id,
665
+ "userId": user_id,
666
+ "result": result
667
+ }, ensure_ascii=False).encode("utf-8")
668
+
669
+ logger.info("DONE topic=%s bytes=%d", KAFKA_DONE_TOPIC, len(done_payload))
670
+
671
+ await producer.send_and_wait(
672
+ KAFKA_DONE_TOPIC,
673
+ key=analysis_id.encode("utf-8"),
674
+ value=done_payload
675
+ )
676
+ logger.info(f"[KAFKA] produced done analysisId={analysis_id}")
677
+
678
+ except Exception as e:
679
+ logger.error(f"[KAFKA] analysis failed analysisId={analysis_id}: {repr(e)}", exc_info=True)
680
+ await produce_progress(analysis_id, user_id, 100, "ERROR", f"오류: {type(e).__name__}")
681
+ if producer:
682
+ err_payload = {
683
+ "analysisId": analysis_id,
684
+ "userId": user_id,
685
+ "result": {
686
+ "error": True,
687
+ "message": f"분석 실패: {type(e).__name__}",
688
+ "url": url,
689
+ }
690
+ }
691
+ await producer.send_and_wait(
692
+ KAFKA_DONE_TOPIC,
693
+ key=analysis_id.encode("utf-8"),
694
+ value=json.dumps(err_payload, ensure_ascii=False).encode("utf-8")
695
+ )
696
+ logger.info(f"[KAFKA] produced error-done analysisId={analysis_id}")
697
+
698
+ finally:
699
+ logger.info("[KAFKA] stopped")
700
+
701
+
702
  # ---------------------------------------
703
  # 실행
704
  # ---------------------------------------