|
|
from typing import Union, NamedTuple |
|
|
from collections.abc import Callable |
|
|
import io,os,re,sys,math,time,uuid,ctypes,pickle,random,shutil,string,urllib,decimal,datetime,itertools,traceback,collections,statistics |
|
|
import numpy as np, pandas as pd |
|
|
import plotly.express as px |
|
|
import huggingface_hub |
|
|
|
|
|
import sklearn |
|
|
from sklearn import svm, neighbors, naive_bayes, neural_network, tree, ensemble, linear_model, discriminant_analysis, gaussian_process, manifold, cluster |
|
|
|
|
|
|
|
|
os.makedirs(".temp", exist_ok=True)  # local scratch dir; CSVs are written here before upload (see HF_tools.save_dataframe_to_hf)
|
|
|
|
|
# CSS injected into the Streamlit page: removes the decoration header and the
# popup menu button at the top, plus the "hosted with Streamlit" link.
STYLE_CORRECTION = " ".join([
    "<style>",
    "header[data-testid='stHeader'] { display:none }",
    "div[data-testid='stSidebarHeader'] { display:none }",
    "div[data-testid='stAppViewBlockContainer'] { padding:1em }",
    "div[data-testid='collapsedControl'] { background-color:#EEE }",
    # BUG FIX: a trailing comma was missing here, so Python's implicit string
    # concatenation glued this rule to "</style>" into a single list element.
    "a[href='https://streamlit.io/cloud'] { display:none }",
    "</style>",
])
|
|
|
|
|
|
|
|
def pandas_info(df: pd.DataFrame) -> Union[pd.DataFrame, str]:
    """Capture ``df.info()`` output and re-shape it into a DataFrame.

    Falls back to returning the raw info text when the parse fails
    (the failure is printed, not raised).
    """
    buffer = io.StringIO()
    df.info(buf=buffer)
    str_info = buffer.getvalue()
    try:
        lines = str_info.splitlines()
        # Line 3 is the column header row: ['#', 'Column', 'Non-Null', 'Count', 'Dtype'];
        # lines 5..-3 are the per-column data rows (the tail holds dtypes/memory usage).
        header = lines[3].split()
        rows = [line.split() for line in lines[5:-2]]
        info_df = pd.DataFrame(rows, columns=header)
        # The header splits 'Non-Null Count' into two tokens; the 'Count' column
        # ends up holding the literal 'non-null' text, so drop it and restore the name.
        info_df = info_df.drop('Count', axis=1)
        info_df = info_df.rename(columns={'Non-Null': 'Non-Null Count'})
        return info_df
    except Exception as ex:
        print(ex)
        return str_info
|
|
|
|
|
|
|
|
def pandas_random_dataframe(n_cols: int = 15, n_rows: int = 100) -> pd.DataFrame:
    """Create a DataFrame of standard-normal random values (useful for debugging)."""
    column_names = [f"col {idx}" for idx in range(n_cols)]
    data = np.random.randn(n_rows, n_cols)
    return pd.DataFrame(data, columns=column_names)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HfRepo(NamedTuple):
    """Coordinates of a Hugging Face repo plus a lazy token supplier."""
    # Repo identifier, e.g. "owner/name"
    repo_id: str
    # Repo kind; used to build paths like "{repo_type}s/{repo_id}" in list_files_hf,
    # so expected values are "model" / "dataset" / "space"
    repo_type: str
    # Called each time a token is needed, so the secret is not stored in the tuple
    get_token: Callable[[], str]
|
|
|
|
|
class HF_tools:
    """Hugging Face Hub helpers: list repos, save/load ';'-separated CSV dataframes,
    and enumerate CSV/ZIP files in a repo."""

    @staticmethod
    def list_models_spaces(get_token: Callable[[], str], author='f64k'):
        """List the author's models and datasets as plain dicts.

        Returns a list of {"id", "type", "private", "tags"} dicts
        (datasets first, then models).
        """
        api = huggingface_hub.HfApi(token=get_token())
        models = api.list_models(author=author)
        datasets = api.list_datasets(author=author)
        lstResult = list(datasets) + list(models)
        lstResult = [{"id": i.id, "type": type(i).__name__, "private": i.private, "tags": i.tags} for i in lstResult]
        return lstResult

    @staticmethod
    def save_dataframe_to_hf(repo: HfRepo, dfToSave: pd.DataFrame, new_filename: str, remote_subdir: str) -> Union[huggingface_hub.CommitInfo, Exception]:
        """Write *dfToSave* to .temp/<new_filename> as ';'-separated CSV and upload it.

        Returns the CommitInfo on success; on failure the exception object is
        returned (not re-raised) so callers can display it.
        """
        try:
            local_filename = os.path.join(".temp", new_filename)
            dfToSave.to_csv(local_filename, index=False, sep=";", encoding="utf-8")
            apiHF = huggingface_hub.HfApi(token=repo.get_token())
            path_in_repo = os.path.basename(local_filename)
            if remote_subdir:
                path_in_repo = f"{remote_subdir}/{path_in_repo}"
            commit_info = apiHF.upload_file(path_or_fileobj=local_filename, path_in_repo=path_in_repo, repo_id=repo.repo_id, repo_type=repo.repo_type)
            return commit_info
        except Exception as exSave:
            return exSave

    @staticmethod
    def load_dataframes_from_hf(repo: HfRepo, lstCsvFiles: Union[list[str], None] = None) -> dict[str, pd.DataFrame]:
        """Download and parse each CSV/ZIP file; return {filename: DataFrame}.

        Best-effort: files that fail to download are silently skipped.
        (Was: mutable default arg `[]` and a set-literal return annotation.)
        """
        if lstCsvFiles is None:
            lstCsvFiles = []
        dict_res = {}
        for fl_name in lstCsvFiles:
            try:
                file_loaded = huggingface_hub.hf_hub_download(filename=fl_name, repo_id=repo.repo_id, repo_type=repo.repo_type, token=repo.get_token())
            except Exception:  # narrowed from bare except; keep the skip-on-failure behavior
                file_loaded = ""
            if os.path.exists(file_loaded):
                compress = "zip" if file_loaded.lower().endswith("zip") else None
                df_loaded = pd.read_csv(file_loaded, sep=";", encoding="utf-8", compression=compress)
                dict_res[fl_name] = df_loaded
        return dict_res

    @staticmethod
    def list_files_hf(repo: HfRepo) -> list[str]:
        """List CSV and ZIP files (at any nesting level) in the HF repo.

        Paths are returned relative to the repo root.
        """
        fs = huggingface_hub.HfFileSystem(token=repo.get_token(), use_listings_cache=False)
        path_hf = f"{repo.repo_type}s/{repo.repo_id}/"
        lstGlob = fs.glob(path_hf + "**")
        # endswith accepts a tuple: one call instead of an `or` chain
        lstNames = [fname.replace(path_hf, "") for fname in lstGlob if fname.lower().endswith((".csv", ".zip"))]
        return lstNames
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
RANDOM_STATE=11  # fixed seed for reproducible classifier training (see CreateDictClassifiers_BestForXYZ)
|
|
|
|
|
class XYZV_tools:
    """Tools for data in the XYZV format: X/Y/Z coordinates plus a sparse V value column."""

    @staticmethod
    def df_process_v_column(df: pd.DataFrame) -> pd.DataFrame:
        """Derive convenience columns from V, plus a time-like column T.

        Adds: T (former index), Vis (0/1 presence flag), Vfloat (V parsed as
        float, decimal comma accepted, NaN -> 0.0), Vsign (-1/0/1) and
        Vposneg ("n"/"o"/"p"). Returns a new DataFrame (input not mutated).
        """
        df = df.reset_index()
        df.rename(columns={"index": "T"}, inplace=True)
        # str(v) == "nan" detects missing values for both float NaN and the string "nan"
        df["Vis"] = df.V.map(lambda v: 0 if str(v) == "nan" else 1).astype(int)
        df["Vfloat"] = df.V.map(lambda v: 0 if str(v) == "nan" else str(v).replace(',', '.')).astype(float)
        df["Vsign"] = df.Vfloat.map(lambda v: -1 if v < 0 else 1 if v > 0 else 0).astype(int)
        df["Vposneg"] = df.Vfloat.map(lambda v: "n" if v < 0 else "p" if v > 0 else "o").astype(str)
        return df

    @staticmethod
    def CreateDictClassifiers_BestForXYZ():
        """Return {name: unfitted classifier} of models that work best for XYZ data."""
        dictFastTree = {
            "DecisionTreeClassifier": tree.DecisionTreeClassifier(random_state=RANDOM_STATE),
        }
        return {**dictFastTree}

    @staticmethod
    def GetClassifier(lstDfOriginal, nHystorySteps):
        """Train a decision tree to predict Vis from X/Y/Z plus lag columns.

        Concatenates the frames (with nHystorySteps lagged X/Y/Z columns added)
        into one training set, fits, and stores train-set predictions in
        column "predict_Vis".
        Returns (fitted classifier, training DataFrame with predictions, fit seconds).
        """
        nShift = nHystorySteps
        classifierName = "DecisionTreeClassifier"
        colsVectorInp = ["X", "Y", "Z"]
        fieldY = "Vis"
        lstDataFrames = XYZV_tools.MakeHystoryColumns(lstDfOriginal, nShift)
        df_train = pd.concat(lstDataFrames)
        # Names of lagged feature columns: X-1, Y-1, Z-1, X-2, ...
        lstColsShift = [f"{c}-{i}" for i in range(1, nShift + 1) for c in colsVectorInp]
        colsVectorInpAll = colsVectorInp + lstColsShift
        y_train = df_train[fieldY]
        x_train_vect = df_train[colsVectorInpAll]
        dictClassifiers = XYZV_tools.CreateDictClassifiers_BestForXYZ()
        classifierObject = dictClassifiers[classifierName]
        start2 = time.time()
        classifierObject.fit(x_train_vect, y_train)
        time_elapsed = time.time() - start2
        # FIX: predict on the DataFrame itself, not .values — predicting on a bare
        # ndarray after fitting on a DataFrame triggers sklearn's feature-name warning.
        y_pred = classifierObject.predict(x_train_vect)
        df_train[f"predict_{fieldY}"] = y_pred
        return (classifierObject, df_train, time_elapsed)

    @staticmethod
    def MakeHystoryColumns(lstDfOriginal, nShift):
        """Return copies of the frames with lag columns '{X|Y|Z}-i' for i=1..nShift.

        Lags are per-frame shifts of X/Y/Z with NaN filled by 0; the originals
        are not mutated.
        """
        lstDataframesShifted = [df.copy() for df in lstDfOriginal]
        lstColsShift = []
        for i in range(1, nShift + 1):
            cols = ["X", "Y", "Z"]
            for c in cols:
                for dfShift in lstDataframesShifted:
                    dfShift[f'{c}-{i}'] = dfShift[c].shift(i).fillna(0)
                lstColsShift.append(lstDataframesShifted[0].columns[-1])
        print(lstColsShift)  # debug: names of the lag columns just created
        return lstDataframesShifted

    @staticmethod
    def plotly_xyzv_scatter_gray(df3D):
        """3D scatter of X/Y/Z colored by Vposneg: gray "o", dark red "p", dark blue "n"."""
        color_discrete_map = dict(o='rgb(230,230,230)', p='rgb(90,1,1)', n='rgb(1,1,90)')
        fig = px.scatter_3d(df3D, x='X', y='Y', z='Z', color="Vposneg", opacity=0.4, height=800, color_discrete_map=color_discrete_map)
        fig.update_scenes(
            xaxis={"gridcolor": "rgba(30, 0, 0, 0.2)", "color": "rgb(100, 0, 0)", "showbackground": False},
            yaxis={"gridcolor": "rgba(0, 30, 0, 0.2)", "color": "rgb(0, 100, 0)", "showbackground": False},
            zaxis={"gridcolor": "rgba(0, 0, 30, 0.2)", "color": "rgb(0, 0, 100)", "showbackground": False})
        fig.update_traces(marker_size=3)
        return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): disabled scaling experiment kept for reference. It references
# `df3D`, which is not defined at module level, so it would raise NameError if
# the guards were flipped to True — presumably it was pasted from a notebook.
if False:
    if False:
        # Standardize X/Y/Z to zero mean / unit variance
        scaler = sklearn.preprocessing.StandardScaler()
        scale_columns = ["X","Y","Z"]
        scaledData = scaler.fit_transform(df3D[scale_columns])
        if False:
            # Optional second pass: scale each sample to unit norm
            scaler2 = sklearn.preprocessing.Normalizer()
            scaledData = scaler2.fit_transform(scaledData)
        df3D_Scaled = pd.DataFrame(data=scaledData, columns=scale_columns)
        df3D_Scaled["Vposneg"] = df3D["Vposneg"]
        df3D = df3D_Scaled
|
|
|
|
|
|
|
|
|
|
|
|