busan_data_navigator / utils /data_utils.py
atoye1's picture
fix caching decorator
9352255
import streamlit as st
from datetime import datetime
import pandas as pd
import os
import json
import uuid
import pickle
from tqdm import tqdm
from typing import Union, Dict, List, Any
import pandas as pd
from openai import OpenAI
from lida_ko.components.summarizer import Summarizer
from pydantic.dataclasses import dataclass
# Folder, relative to the current working directory, that holds the dataset
# files; file paths are built as os.path.join(os.getcwd(), DIR, filename).
DIR: str = "_csv_data"
@dataclass
class Metadata:
    """Schema for one dataset's catalogue metadata.

    Validated by the ``dataclass`` decorator imported from pydantic. Fields
    annotated ``X | None`` accept ``None`` but are still required arguments,
    since no field declares a default.
    """

    title: str
    description: str
    keywords: List[str]
    timestamp: str
    # In the metadata JSON these entries are dicts: the helpers in this module
    # read ``file_data[0]['filename']``. Plain strings are still accepted so
    # existing callers that pass strings keep validating.
    file_data: List[Union[str, Dict[str, Any]]] | None
    organization: str | None
    department: str | None
    phone: str | None
    update_interval: str | None
    updated_at: str | None
    next_update_at: str
    cost: str | None
    serving_type: str | None
    # Stored as text; the raw metadata's value is parsed with safe_int when
    # load_datasets ranks datasets by downloads.
    download_count: str | None
    permission_scope: str | None
    augmentation: Dict | None
def title_to_id(title: str, path: str = './data/title_to_id.json') -> str:
    """Look up the dataset id registered for a dataset title.

    Args:
        title: Dataset title; used as a key of the JSON mapping.
        path: JSON file mapping titles to ids. Defaults to the file the app
            has always used.

    Returns:
        The id stored under ``title``.

    Raises:
        KeyError: If ``title`` is not in the mapping.
        FileNotFoundError: If ``path`` does not exist.
    """
    # Decode explicitly as UTF-8: titles may contain non-ASCII (e.g. Korean)
    # text, and the platform default encoding is not UTF-8 everywhere.
    with open(path, 'r', encoding='utf-8') as f:
        mapping = json.load(f)
    return mapping[title]
def id_to_metadata(data_id: str, path: str = './data/id_to_metadata.json') -> dict:
    """Return the metadata dict stored for a dataset id.

    Args:
        data_id: Dataset id; used as a key of the JSON mapping.
        path: JSON file mapping ids to metadata dicts. Defaults to the file
            the app has always used.

    Returns:
        The metadata value stored under ``data_id``.

    Raises:
        KeyError: If ``data_id`` is not in the mapping.
        FileNotFoundError: If ``path`` does not exist.
    """
    # Decode explicitly as UTF-8 so non-ASCII metadata reads the same on
    # every platform.
    with open(path, 'r', encoding='utf-8') as f:
        mapping = json.load(f)
    return mapping[data_id]
def title_to_filename(title: str) -> str:
    """Return the absolute path of the first data file of a dataset.

    The title is resolved to an id with ``title_to_id``, the id to its
    metadata with ``id_to_metadata``, and the ``'filename'`` of the first
    ``file_data`` entry is joined onto ``<cwd>/DIR``.
    """
    first_entry = id_to_metadata(title_to_id(title))['file_data'][0]
    return os.path.join(os.getcwd(), DIR, first_entry['filename'])
def title_to_df(title) -> pd.DataFrame:
    """Load the first data file of the dataset named ``title``.

    The file located by ``title_to_filename`` is always parsed with
    ``pd.read_csv``.
    """
    return pd.read_csv(title_to_filename(title))
def safe_int(value: str) -> int:
    """Convert ``value`` with ``int()``, returning 0 if that fails.

    Only ``ValueError`` and ``TypeError`` are treated as failures (e.g. a
    non-numeric string or ``None``).
    """
    try:
        converted = int(value)
    except (TypeError, ValueError):
        converted = 0
    return converted
@st.cache_data
def load_datasets(dataframe=True, top_n = None) -> Union[Dict, pd.DataFrame]:
    """Return dataset metadata ordered by download count, most downloaded first.

    The result is memoized by ``st.cache_data``.

    Args:
        dataframe: If True (default), return a DataFrame with one row per
            dataset id (the id -> metadata mapping transposed). Otherwise
            return the ordered id -> metadata dict.
        top_n: Keep only the first ``top_n`` datasets. ``None`` or any other
            falsy value (including 0) keeps all of them.

    Returns:
        A DataFrame or dict, depending on ``dataframe``.
    """
    with open("./data/id_to_metadata.json", encoding="utf-8") as f:
        metadata = json.load(f)
    # Rank (id, metadata) pairs by numeric download count. A missing or
    # non-numeric count ranks as 0 (safe_int) instead of raising KeyError.
    ranked = sorted(
        metadata.items(),
        key=lambda item: safe_int(item[1].get("download_count")),
        reverse=True,
    )
    if top_n:
        ranked = ranked[:top_n]
    # dicts keep insertion order, so the ranking survives the conversion.
    ranked_dict = dict(ranked)
    if not dataframe:
        return ranked_dict
    return pd.DataFrame(ranked_dict).T
def save_session_cache(session_data: Dict, cache_dir: str = './data/session_cache') -> str:
    """Pickle a snapshot of the session state and return its new id.

    The ``lida_ko`` and ``selected_goal_object`` entries are left out of the
    snapshot. The caller's mapping is not modified, and the entries do not
    have to be present.

    Args:
        session_data: Mapping of session keys to values to persist.
        cache_dir: Directory for the ``<session_id>.pkl`` files; created if
            missing. Defaults to the directory the app has always used.

    Returns:
        The 8-character session id (prefix of a uuid4 string).
    """
    # TODO: implement serialization/deserialization for the selected goal
    # object so it can be restored too.
    excluded = ("lida_ko", "selected_goal_object")
    # Build a filtered copy instead of deleting from the caller's mapping
    # (which may be live session state still in use).
    snapshot = {k: v for k, v in session_data.items() if k not in excluded}
    session_id = str(uuid.uuid4())[:8]
    os.makedirs(cache_dir, exist_ok=True)
    with open(os.path.join(cache_dir, f'{session_id}.pkl'), 'wb') as f:
        pickle.dump(snapshot, f)
    return session_id
def load_session_cache(session_id: str, cache_dir: str = './data/session_cache') -> Dict:
    """Load a session snapshot written by ``save_session_cache``.

    Args:
        session_id: Id returned by ``save_session_cache``.
        cache_dir: Directory holding the ``<session_id>.pkl`` files.
            Defaults to the directory the app has always used.

    Returns:
        The unpickled session data, or ``{}`` when the id is not purely
        alphanumeric or no snapshot file exists for it.
    """
    sid = str(session_id)
    # Saved ids are the first 8 characters of a uuid4 string (hex digits
    # only). Rejecting anything else keeps ids such as "../x" from resolving
    # to a file outside cache_dir if the id ever comes from user input.
    if not sid.isalnum():
        return {}
    path = os.path.join(cache_dir, f'{sid}.pkl')
    if not os.path.exists(path):
        return {}
    # SECURITY: unpickling can execute arbitrary code. Only load files this
    # app wrote itself; never point cache_dir at user-supplied content.
    with open(path, 'rb') as f:
        return pickle.load(f)
def add_column_metadata(data: Dict[str, Dict[str, Any]]) -> Dict:
    """Attach per-column summaries to each dataset's metadata and save them.

    For every dataset whose first file (``file_data[0]['filename']`` under
    ``<cwd>/DIR``) ends in ``.csv`` or ``.json``, the file is loaded with
    pandas and ``Summarizer().get_column_properties(df, 5)`` is stored under
    the ``'column_properties'`` key. Datasets with no files, or whose first
    file has any other extension, are left unchanged. The result, with
    datetime values converted to ISO-8601 strings, is written to
    ``./data/data_with_column_metadata.json``.

    Args:
        data: Mapping of dataset id -> metadata dict. Each dict is expected
            to carry a ``'file_data'`` list whose entries have a
            ``'filename'`` key. It is not modified.

    Returns:
        The augmented copy of ``data`` that was written to disk.
    """
    import copy  # local import: only this batch job needs it

    def _json_safe(value):
        """Recursively replace datetimes with ISO strings (containers updated in place)."""
        if isinstance(value, dict):
            for key, item in value.items():
                value[key] = _json_safe(item)
        elif isinstance(value, list):
            for index, item in enumerate(value):
                value[index] = _json_safe(item)
        elif isinstance(value, datetime):
            return value.isoformat()
        return value

    # Deep copy so neither the added key nor the in-place timestamp conversion
    # below leaks into the caller's nested dicts.
    _data = copy.deepcopy(data)
    summarizer = Summarizer()
    for metadata in tqdm(_data.values(), desc='Adding column'):
        file_data = metadata.get('file_data')
        if not file_data:  # None or empty: nothing to summarize
            continue
        filepath = os.path.join(os.getcwd(), DIR, file_data[0]['filename'])
        lowered = filepath.lower()
        if lowered.endswith('.csv'):
            df = pd.read_csv(filepath)
        elif lowered.endswith('.json'):
            df = pd.read_json(filepath)
        else:
            # Unsupported format: skip rather than reuse the previous
            # iteration's DataFrame (or hit NameError on the first one).
            continue
        metadata['column_properties'] = summarizer.get_column_properties(df, 5)

    # Convert before opening the output file so a conversion error cannot
    # leave a truncated file behind.
    converted_data = _json_safe(_data)
    with open('./data/data_with_column_metadata.json', 'w', encoding='utf-8') as f:
        json.dump(converted_data, f, ensure_ascii=False, indent=4)
    return converted_data
def augment_data_with_llm(data: Dict[str, Dict[str, Any]]) -> Dict:
    """Ask an LLM for insights and likely user questions about each dataset.

    Entries already in the on-disk cache
    ``./data/id_to_metadata_col_aug.json`` are skipped. Each other entry is
    sent as JSON to ``gpt-3.5-turbo-16k``, whose reply must be JSON with
    ``"insights"`` and ``"questions"`` lists. A copy of the metadata with
    those two keys added is then stored in the cache. The cache is re-saved
    after every 10 successful additions and once more at the end.

    Entries whose request or reply parsing fails are reported and NOT
    cached, so a later run retries them.

    Args:
        data: Mapping of dataset id -> JSON-serializable metadata dict.
            It is not modified.

    Returns:
        The cache (dataset id -> augmented metadata) as saved to disk.
    """
    # NOTE: an earlier instruction asked the model to escape commas as "\,".
    # That is not a valid JSON escape, so replies following it made json.loads
    # fail; it was replaced with a "JSON only" instruction.
    SYSTEM_PROMPT = """
당신은 부산광역시의 공공 데이터를 분석하고 중요한 인사이트를 생성하는 AI입니다. 주어진 데이터 사양을 면밀히 검사하고, 중요한 인사이트 10개와 사람들이 데이터에 대해 가장 알고 싶어할 질문 10개를 생성하는 것이 주요 임무입니다. 다음 지침을 신중히 따르세요:
1. 데이터 분석: 제공된 데이터 사양을 자세히 검사합니다.
2. 인사이트 생성: 데이터에서 가장 중요한 인사이트 10개를 식별하고 명확하게 설명합니다. 이러한 인사이트는 데이터의 중요한 패턴, 트렌드, 이상치 또는 주요 정보를 반영해야 합니다.
3. 질문 예측: 사람들이 데이터에 대해 가장 알고 싶어할 만한 질문 10개를 예측하고 나열합니다. 이 질문들은 데이터의 맥락에 맞고 일반적인 사용자 관심사나 우려를 반영해야 합니다.
출력은 아래의 JSON 형식을 엄격히 따라야 합니다.
JSON 이외의 텍스트는 출력하지 마세요.
{
"insights": [insight1, insight2, ...],
"questions": [question1, question2, ...]
}
"""
    cache_path = './data/id_to_metadata_col_aug.json'

    def save_cache(cache, filepath=cache_path):
        """Write the cache to ``filepath`` as UTF-8 JSON and report its size."""
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(cache, f, ensure_ascii=False, indent=4)
        print(f"Saved {len(cache)} data")

    client = OpenAI(
        api_key=os.getenv("OPENAI_API_KEY")
    )
    # The cache is written as UTF-8 with ensure_ascii=False, so read it back
    # as UTF-8 too rather than relying on the platform default encoding.
    try:
        with open(cache_path, 'r', encoding='utf-8') as f:
            cache = json.load(f)
    except FileNotFoundError:
        cache = {}

    total_token = 0
    skipped = 0
    added = 0
    with tqdm(total=len(data)) as pbar:
        for k, v in data.items():
            if k in cache:
                # tqdm.write keeps messages from breaking the progress bar.
                tqdm.write(f"Skipping {k}")
                skipped += 1
            else:
                try:
                    USER_PROMPT = (
                        "여기 데이터의 세부 사항이 있어 지시를 충실히 따라줘\n\n\n"
                        f"{json.dumps(v, ensure_ascii=False, indent=4)}"
                    )
                    response = client.chat.completions.create(
                        model='gpt-3.5-turbo-16k',
                        messages=[
                            {"role": "system", "content": SYSTEM_PROMPT},
                            {"role": "user", "content": USER_PROMPT},
                        ],
                    )
                    total_token += int(response.usage.total_tokens)
                    result_json = json.loads(response.choices[0].message.content)
                    # Build a new record instead of mutating the caller's dict.
                    record = dict(v)
                    record['insights'] = result_json['insights']
                    record['questions'] = result_json['questions']
                except Exception as e:
                    # Best effort per entry: report it and move on. It stays
                    # uncached, so the next run retries it.
                    tqdm.write(f"Error processing key {k}: {e}")
                else:
                    # Cache only complete results: cached keys are skipped on
                    # later runs.
                    cache[k] = record
                    added += 1
                    if added % 10 == 0:
                        save_cache(cache)
            pbar.set_postfix({'total_token': total_token,
                              'skipped': skipped,})
            pbar.update(1)
    save_cache(cache)
    return cache
def convert_timestamps_to_strings(data):
    """Replace every ``datetime`` inside ``data`` with its ISO-8601 string.

    Dicts and lists are walked recursively and updated in place, and the
    same container object is returned. Datetimes stored directly in lists
    (e.g. ``{"dates": [datetime(...)]}``) are converted too. A bare
    ``datetime`` argument returns its ISO string. Any other value is
    returned unchanged.

    >>> convert_timestamps_to_strings({"t": [datetime(2024, 1, 2)]})
    {'t': ['2024-01-02T00:00:00']}
    """
    if isinstance(data, datetime):
        return data.isoformat()
    if isinstance(data, dict):
        # Reassigning existing keys does not change the dict's size, so it
        # is safe during iteration.
        for key, value in data.items():
            data[key] = convert_timestamps_to_strings(value)
    elif isinstance(data, list):
        # Store the recursive result back: this is what converts datetimes
        # that sit directly inside a list.
        for index, item in enumerate(data):
            data[index] = convert_timestamps_to_strings(item)
    return data
def dict_to_string(data):
    """Serialize ``data`` as pretty-printed JSON (2-space indent, non-ASCII kept).

    ``data`` is first passed through ``convert_timestamps_to_strings``, which
    may update it in place, so that datetime values become strings.
    """
    json_ready = convert_timestamps_to_strings(data)
    return json.dumps(json_ready, indent=2, ensure_ascii=False)