import argparse import csv import json import pandas as pd import requests from datetime import datetime from pathlib import Path from io import StringIO def get_user_df(input_path: str, top_n: int) -> pd.DataFrame: df = pd.read_csv(input_path, dtype={'종목코드': str}) df['매입금액'] = df['매입주가'] * df['보유주식수'] df_sorted = df.sort_values(by='매입금액', ascending=False) df_top = df_sorted.head(top_n) df_top = df_top[['종목코드', '종목명', '매입주가', '보유주식수']] return df_top def request_post(base_url: str, service: str, req_body: dict) -> dict: try: url = f'{base_url}/{service}' res = requests.post( url=url, json=req_body ) res.raise_for_status() except Exception as e: print(f'[ERROR] {service} / 에러 발생: {str(e)}') res_body = res.json() if res_body['success']: return res_body['data'] def update_user_with_stock_price(json_data, res_data): """ res_data 로 받은 stock_price 정보를 기반으로 json_data["user"] 항목에 현재주가, 수익률, 현재자산을 업데이트 """ def normalize(code): return str(code).strip().upper() # user 목록 가져오기 user_list = json_data.get("user", []) if not isinstance(user_list, list): print("[WARN] user 데이터 구조가 list 아님 → 업데이트 불가") return # 종목 가격 dict(normalized) stock_price_map = {normalize(k): v for k, v in res_data.items()} # user 각 항목 업데이트 for user_item in user_list: raw_code = user_item.get("종목코드") if not raw_code: continue code = normalize(raw_code) # stock_price 에 해당 종목이 존재하는지 확인 prices = stock_price_map.get(code) if not prices: print(f"[WARN] {code} 종목의 주가 데이터 없음 → user 추가정보 스킵") continue # 최신 종가 (list 마지막) last_row = prices[-1] try: current_price = float(last_row.get("Close")) buy_price = float(user_item.get("매입주가")) qty = float(user_item.get("보유주식수")) except Exception: print(f"[WARN] {code} 값 변환 오류 → user 추가정보 스킵") continue # 수익률 계산 profit_rate = round(((current_price - buy_price) / buy_price) * 100, 2) current_asset = round(current_price * qty, 2) # 업데이트 user_item["현재주가"] = str(current_price) user_item["수익률"] = f"{profit_rate}%" user_item["현재자산"] = str(current_asset) # 저장 json_data["user"] = user_list return json_data def collect_data(region, base_url: str, portfolio: pd.DataFrame): json_data = { 'user': [], 'similar_investors': {}, 'investment_company': {}, 'industry_info': [], 'theme_info': [], 'stock_price': {}, 'stock_news': {}, } # ------------------------------ # 유사 투자자 # ------------------------------ service = 'similar_investors' req_body = { 'csv_text': portfolio.to_csv(index=False), 'region': region[0] if isinstance(region, list) else region, 'top': 5 } print(f"[INFO] 유사 투자자 {req_body['top']}개 수집 시작..") res_data = request_post(base_url, service, req_body) if not res_data: print(f'[ERROR] 유사 투자자 수집 실패..') return None print(f'[INFO] 유사 투자자 수집 완료:', res_data) json_data[service] = res_data # user 키 저장 used_user_csv = portfolio.to_csv(index=False) f = StringIO(used_user_csv) reader = csv.reader(f) rows = list(reader) if not rows or len(rows) < 2: print("[WARN] portfolio CSV 내용이 비어 있어 user 데이터 저장을 생략합니다.") json_data["user"] = [] else: headers = rows[0] user_list = [dict(zip(headers, cols)) for cols in rows[1:]] json_data["user"] = user_list # 산업, 테마 요청시 투자자 종목도 포함 si_names = list(set([d['NAME'] for rows in res_data.values() for d in rows])) si_companies = list(set([d['COMPANY'] for rows in res_data.values() for d in rows])) # ------------------------------ # 유사 투자회사 설명 # ------------------------------ print("[INFO] 투자회사 name 목록:", si_names) service = "investment_company" for name in si_names: print(f"[INFO] {name} 설명 수집 시작..") req_body = {"name": name} res_data = request_post(base_url, service, req_body) if not res_data: print("[ERROR] 설명 수집 실패:", name) json_data[service][name] = { "error": True, "detail": "request_post failed" } continue print("[INFO] 설명 수집 완료:", res_data) json_data[service][name] = res_data if not json_data[service]: print(f'[ERROR] 유사 투자자 수집 실패..') return None print() # ------------------------------ # 산업정보 # ------------------------------ service = 'industry_info' stock = portfolio['종목코드'].to_list() + si_companies req_body = { 'stock': stock } print(f'[INFO] 산업정보 {",".join(stock)} 수집 시작..') res_data = request_post(base_url, service, req_body) if not res_data: print(f'[ERROR] 산업정보 수집 실패..') return None # 중복 제거 seen = set() unique_data = [d for d in res_data if not (d['stock'] in seen or seen.add(d['stock']))] print(f'[INFO] 산업정보 수집 완료:', unique_data) print() json_data[service] = unique_data # ------------------------------ # 테마정보 # ------------------------------ service = 'theme_info' stock = portfolio['종목코드'].to_list() + si_companies req_body = { 'stock': stock } print(f'[INFO] 테마정보 {",".join(stock)} 수집 시작..') res_data = request_post(base_url, service, req_body) if not res_data: print(f'[ERROR] 테마정보 수집 실패..') return None # 중복 제거 seen = set() unique_data = [d for d in res_data if not (d['stock'] in seen or seen.add(d['stock']))] print(f'[INFO] 테마정보 수집 완료:', unique_data) print() json_data[service] = unique_data # ------------------------------ # 종목 가격 # ------------------------------ service = 'stock_price' stock = ','.join(portfolio['종목코드']) req_body = {'stock': stock} print(f'[INFO] 종목가격 {stock} 수집 시작..') res_data = request_post(base_url, service, req_body) if not res_data: print(f'[ERROR] 종목가격 수집 실패..') return None print(f'[INFO] 종목가격 수집 완료:', res_data) print() json_data[service] = res_data json_data = update_user_with_stock_price(json_data, res_data) # ------------------------------ # 종목 뉴스 # ------------------------------ service = 'stock_news' for stock in portfolio['종목코드']: print(f'[INFO] 종목뉴스 {stock} 수집 시작..') req_body = { 'stock': stock, 'period': 7 } res_data = request_post(base_url, service, req_body) if res_data: print(f'[INFO] 종목뉴스 {stock} 수집 완료:', res_data) json_data[service] |= res_data else: print(f'[ERROR] 종목뉴스 {stock} 수집 실패..') if not json_data[service]: print(f'[ERROR] 종목뉴스 수집 실패..') return None print() return json_data def save_results(output_root: str, user_csv: str, output_json: dict, report: str): output_dir = Path(output_root) / datetime.now().strftime('%y%m%d_%H%M%S') output_dir.mkdir(parents=True, exist_ok=True) # ------------------------------ # 유저 포트폴리오 저장 # ------------------------------ user_path = output_dir / 'user.csv' with open(user_path, 'w') as f: f.write(user_csv) print(f'[INFO] 유저 포트폴리오 저장 완료 -> {user_path}') # ------------------------------ # 데이터 저장 # ------------------------------ output_path = output_dir / 'output.json' with open(output_path, 'w') as f: json.dump(output_json, f, ensure_ascii=False, indent=2) print(f'[INFO] 데이터 저장 완료 -> {output_path}') # ------------------------------ # 리포트 저장 # ------------------------------ report_path = output_dir / 'report.txt' with open(report_path, 'w') as f: f.write(report) print(f'[INFO] 리포트 저장 완료 -> {report_path}') def main(): parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--server', type=str, default='http://localhost:8080', help='서버 주소') parser.add_argument("--region", choices=['ko', 'us'], required=True, help="지역 선택: ko(한국) 또는 us(미국)") parser.add_argument('--user', type=str, default='user.csv', help='유저 포트폴리오 csv 파일 경로') parser.add_argument('--user_name', type=str, default='이서준', help='투자자 이름') parser.add_argument('--output', type=str, default='results', help='결과 경로') args = parser.parse_args() # ------------------------------ # 데이터 수집 # ------------------------------ user_df = get_user_df(args.user, top_n=5) json_data = collect_data(args.region, args.server, user_df) if not json_data: print(f'[ERROR] 데이터 수집 실패.. 보고서 생성 종료.') return # ------------------------------ # 보고서 생성 # ------------------------------ service = 'report' req_body = { "date": datetime.now().strftime("%Y-%m-%d"), # 리포트 생성 날짜 (기본: 오늘 날짜) 'user_name': args.user_name, 'csv_text': user_df.to_csv(index=False), 'json_text': json_data } print(f'[INFO] 유저 [{args.user_name}] 보고서 생성 시작..') report_data = request_post(args.server, service, req_body) if not report_data: print(f'[ERROR] 보고서 생성 실패..') return report = report_data['report'] # ------------------------------ # 결과 저장 # ------------------------------ save_results(args.output, user_df.to_csv(index=False), json_data, report) if __name__ == '__main__': main()