import streamlit as st from datetime import datetime import pandas as pd import os import json import uuid import pickle from tqdm import tqdm from typing import Union, Dict, List, Any import pandas as pd from openai import OpenAI from lida_ko.components.summarizer import Summarizer from pydantic.dataclasses import dataclass DIR = "_csv_data" @dataclass class Metadata: title: str description: str keywords: List[str] timestamp: str file_data: List[str] | None organization: str | None department: str | None phone: str | None update_interval: str | None updated_at: str | None next_update_at: str cost : str | None serving_type: str | None download_count: str | None permission_scope: str| None augmentation: Dict | None def title_to_id(title: str) -> str: with open('./data/title_to_id.json', 'r') as f: title_to_id = json.load(f) return title_to_id[title] def id_to_metadata(data_id: str) -> dict: with open('./data/id_to_metadata.json', 'r') as f: id_to_metadata = json.load(f) return id_to_metadata[data_id] def title_to_filename(title: str) -> str: data_id = title_to_id(title) metadata = id_to_metadata(data_id) file_data = metadata['file_data'] file_name = os.path.join(os.getcwd(), DIR, file_data[0]['filename']) return file_name def title_to_df(title) -> pd.DataFrame: filename = title_to_filename(title) df = pd.read_csv(filename) return df def safe_int(value: str) -> int: """10진수 정수로 변환하고, 변환할 수 없는 경우 0을 반환합니다.""" try: return int(value) except (ValueError, TypeError): return 0 # 메타데이터를 다운로드 순으로 정렬하고, 다운로드 상위 10인 데이터셋을 반환합니다. @st.cache_data def load_datasets(dataframe=True, top_n = None) -> Dict: with open("./data/id_to_metadata.json") as f: metadata = json.load(f) # metadata.items()로 (key, value) 쌍을 가져와서 value["download_count"]로 정렬 sorted_metadata = sorted(metadata.items(), key=lambda item: safe_int(item[1]["download_count"]), reverse=True) # 상위 10개 항목을 선택 if top_n: top_n_metadata = sorted_metadata[:top_n] else: top_n_metadata = sorted_metadata # 키와 값 형태로 출력하기 위해 리스트를 딕셔너리로 변환 top_n_metadata_dict = {k: v for k, v in top_n_metadata} if not dataframe: return top_n_metadata_dict return pd.DataFrame(top_n_metadata_dict).T def save_session_cache(session_data: Dict) -> str: del session_data["lida_ko"] del session_data["selected_goal_object"] # implement goal serial and deseiralization session_id = str(uuid.uuid4())[:8] with open(f'./data/session_cache/{session_id}.pkl', 'wb') as f: pickle.dump(session_data, f) return session_id def load_session_cache(session_id: str) -> Dict: if os.path.exists(f'./data/session_cache/{session_id}.pkl'): with open(f'./data/session_cache/{session_id}.pkl', 'rb') as f: session_data = pickle.load(f) return session_data return {} def add_column_metadata(data: Dict[str, Metadata]) -> Dict: _data = data.copy() summarizer = Summarizer() for data_id, metadata in tqdm(_data.items(), desc=f'Adding column'): if len(metadata['file_data']) == 0: continue filepath = os.getcwd() + "/" + "_csv_data/" + metadata['file_data'][0]['filename'] if filepath.lower().endswith('.csv'): df = pd.read_csv(filepath) elif filepath.lower().endswith('.json'): df = pd.read_json(filepath) column_properties = summarizer.get_column_properties(df, 5) metadata['column_properties'] = column_properties def convert_timestamps_to_strings(data: Union[Dict[str, Any], list, Any]) -> Union[Dict[str, Any], list, Any]: if isinstance(data, dict): for key, value in data.items(): data[key] = convert_timestamps_to_strings(value) elif isinstance(data, list): for i in range(len(data)): data[i] = convert_timestamps_to_strings(data[i]) elif isinstance(data, datetime): return data.isoformat() return data with open('./data/data_with_column_metadata.json', 'w', encoding='utf-8') as f: converted_data = convert_timestamps_to_strings(_data) json.dump(converted_data, f, ensure_ascii=False, indent=4) def augment_data_with_llm(data: Dict[str, Metadata]) -> Dict: SYSTEM_PROMPT = """ 당신은 부산광역시의 공공 데이터를 분석하고 중요한 인사이트를 생성하는 AI입니다. 주어진 데이터 사양을 면밀히 검사하고, 중요한 인사이트 10개와 사람들이 데이터에 대해 가장 알고 싶어할 질문 10개를 생성하는 것이 주요 임무입니다. 다음 지침을 신중히 따르세요: 1. 데이터 분석: 제공된 데이터 사양을 자세히 검사합니다. 2. 인사이트 생성: 데이터에서 가장 중요한 인사이트 10개를 식별하고 명확하게 설명합니다. 이러한 인사이트는 데이터의 중요한 패턴, 트렌드, 이상치 또는 주요 정보를 반영해야 합니다. 3. 질문 예측: 사람들이 데이터에 대해 가장 알고 싶어할 만한 질문 10개를 예측하고 나열합니다. 이 질문들은 데이터의 맥락에 맞고 일반적인 사용자 관심사나 우려를 반영해야 합니다. 출력은 아래의 JSON 형식을 엄격히 따라야 합니다. 출력 내용에 ,가 들어가는 경우 \\,로 이스케이프 처리해야 합니다. { "insights": [insight1, insight2, ...], "questions": [question1, question2, ...] } """ def save_cache(cache, filepath='./data/id_to_metadata_col_aug.json'): with open(filepath, 'w', encoding='utf-8') as f: json.dump(cache, f, ensure_ascii=False, indent=4) print(f"Saved {len(cache)} data") _data = data.copy() count = 0 client = OpenAI( api_key=os.getenv("OPENAI_API_KEY") ) try: with open('./data/id_to_metadata_col_aug.json', 'r') as f: cache = json.load(f) except FileNotFoundError: cache = {} total_token = 0 skipped = 0 with tqdm(total=len(_data)) as pbar: for k, v in _data.items(): if k in cache: print(f"Skipping {k}") skipped += 1 pbar.set_postfix({'total_token': total_token, 'skipped': skipped,}) pbar.update(1) count += 1 continue cache[k] = v count += 1 metadata_to_str = "" USER_PROMPT = f"""여기 데이터의 세부 사항이 있어 지시를 충실히 따라줘\n\n {json.dumps(v, ensure_ascii=False, indent=4)}""" try: response = client.chat.completions.create( model='gpt-3.5-turbo-16k', messages=[ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": USER_PROMPT} ] ) result = response.choices[0].message.content total_token += int(response.usage.total_tokens) pbar.set_postfix({'total_token': total_token, 'skipped': skipped,}) pbar.update(1) result_json = json.loads(result) v['insights'] = result_json['insights'] v['questions'] = result_json['questions'] except Exception as e: print(f"Error processing key {k}: {e}") continue # print('Token usage', response.usage) # print() if count % 10 == 0: save_cache(cache) save_cache(cache) def convert_timestamps_to_strings(data): if isinstance(data, dict): for key, value in data.items(): if isinstance(value, dict): convert_timestamps_to_strings(value) elif isinstance(value, list): for item in value: convert_timestamps_to_strings(item) elif isinstance(value, datetime): data[key] = value.isoformat() elif isinstance(value, (datetime,)): data[key] = value.isoformat() elif isinstance(data, list): for item in data: convert_timestamps_to_strings(item) return data def dict_to_string(data): # Convert all datetime objects to strings data = convert_timestamps_to_strings(data) # Convert the dictionary to a JSON string return json.dumps(data, ensure_ascii=False, indent=2)