|
|
|
|
|
|
|
|
import torch |
|
|
import cryptography |
|
|
import cryptography.fernet |
|
|
from flopth import flopth |
|
|
import huggingface_hub |
|
|
import huggingface_hub.hf_api |
|
|
import gradio |
|
|
import openai |
|
|
|
|
|
import json |
|
|
import requests |
|
|
import time |
|
|
import os |
|
|
import random |
|
|
import re |
|
|
import sys |
|
|
import psutil |
|
|
import threading |
|
|
import socket |
|
|
import PIL |
|
|
import pandas |
|
|
import matplotlib |
|
|
import numpy |
|
|
import importlib.metadata |
|
|
import types |
|
|
import cpuinfo |
|
|
import pynvml |
|
|
import pathlib |
|
|
import re |
|
|
import subprocess |
|
|
|
|
|
class Pluto_Happy(object): |
|
|
""" |
|
|
The Pluto projects starts with fun AI hackings and become a part of my |
|
|
first book "Data Augmentation with Python" with Packt Publishing. |
|
|
In particular, Pluto_Happy is a clean and lite kernel of a simple class, |
|
|
and using @add_module decoractor to add in specific methods to be a new class, |
|
|
such as Pluto_HFace with a lot more function on HuggingFace, LLM and Transformers. |
|
|
Args: |
|
|
name (str): the display name, e.g. "Hanna the seeker" |
|
|
Returns: |
|
|
(object): the class instance. |
|
|
""" |
|
|
|
|
|
|
|
|
def __init__(self, name="Pluto",*args, **kwargs): |
|
|
super(Pluto_Happy, self).__init__(*args, **kwargs) |
|
|
self.author = "Duc Haba" |
|
|
self.name = name |
|
|
self._ph() |
|
|
self._pp("Hello from class", str(self.__class__) + " Class: " + str(self.__class__.__name__)) |
|
|
self._pp("Code name", self.name) |
|
|
self._pp("Author is", self.author) |
|
|
self._ph() |
|
|
|
|
|
|
|
|
self._huggingface_crkey="gAAAAABkduT-XeiYtD41bzjLtwsLCe9y1FbHH6wZkOZwvLwCrgmOtNsFUPWVqMVG8MumazFhiUZy91mWEnLDLCFw3eKNWtOboIyON6yu4lctn6RCQ4Y9nJvx8wPyOnkzt7dm5OISgFcm" |
|
|
self._gpt_crkey="'gAAAAABkgiYGQY8ef5y192LpNgrAAZVCP3bo2za9iWSZzkyOJtc6wykLwGjFjxKFpsNryMgEhCATJSonslooNSBJFM3OcnVBz4jj_lyXPQABOCsOWqZm6W9nrZYTZkJ0uWAAGJV2B8uzQ13QZgI7VCZ12j8Q7WfrIg=='" |
|
|
self._fkey="your_key_goes_here" |
|
|
self._github_crkey="gAAAAABksjLYjRoFxZDDW5RgBN_uvm6pqDP128S2qOEfv9PgVL8fwdtXzWvCeMHwnGcibAky5cGs3XNxMH4VgbaPBA3I_CPRp3bRK3TMNU4HGRKxbdMnJ7U04IkVSdcMn8o86z3yhcSn" |
|
|
self._kaggle_crkey="gAAAAABksjOOU2a-BtZ4NV8BkmFhBzqjix7XL9DsKPrua7OaMc7t8QKGw_3Ut5wyv4NL4FHX74JFEEbmpVbsPINN7LcqLtewuyF0o0P9461PY9qLBAGy6Wr7PyE0qwDogQoDGJ1UJgPn" |
|
|
|
|
|
self.fname_id = 0 |
|
|
self.dname_img = "img_colab/" |
|
|
self.flops_per_sec_gcolab_cpu = 4887694725 |
|
|
self.flops_per_sec_gcolab_gpu = 6365360673 |
|
|
self.fname_requirements = './pluto_happy/requirements.txt' |
|
|
|
|
|
self.color_primary = '#2780e3' |
|
|
self.color_secondary = '#373a3c' |
|
|
self.color_success = '#3fb618' |
|
|
self.color_info = '#9954bb' |
|
|
self.color_warning = '#ff7518' |
|
|
self.color_danger = '#ff0039' |
|
|
self.color_mid_gray = '#495057' |
|
|
self._xkeyfile = '.xoxo' |
|
|
return |
|
|
|
|
|
|
|
|
def _pp(self, a, b,is_print=True): |
|
|
|
|
|
""" |
|
|
Pretty print output name-value line |
|
|
Args: |
|
|
a (str) : |
|
|
b (str) : |
|
|
is_print (bool): whether to print the header or footer lines to console or return a str. |
|
|
Returns: |
|
|
y : None or output as (str) |
|
|
""" |
|
|
|
|
|
x = f'{"%34s" % str(a)} : {str(b)}' |
|
|
y = None |
|
|
if (is_print): |
|
|
print(x) |
|
|
else: |
|
|
y = x |
|
|
return y |
|
|
|
|
|
|
|
|
def _ph(self,is_print=True): |
|
|
""" |
|
|
Pretty prints the header or footer lines. |
|
|
Args: |
|
|
is_print (bool): whether to print the header or footer lines to console or return a str. |
|
|
Return: |
|
|
y : None or output as (str) |
|
|
""" |
|
|
x = f'{"-"*34} : {"-"*34}' |
|
|
y = None |
|
|
if (is_print): |
|
|
print(x) |
|
|
else: |
|
|
y = x |
|
|
return y |
|
|
|
|
|
|
|
|
def fetch_hface_files(self, |
|
|
hf_names, |
|
|
hf_space="duchaba/monty", |
|
|
local_dir="/content/"): |
|
|
""" |
|
|
Given a list of huggingface file names, download them from the provided huggingface space. |
|
|
Args: |
|
|
hf_names: (list) list of huggingface file names to download |
|
|
hf_space: (str) huggingface space to download from. |
|
|
local_dir: (str) local directory to store the files. |
|
|
Returns: |
|
|
status: (bool) True if download was successful, False otherwise. |
|
|
""" |
|
|
status = True |
|
|
|
|
|
try: |
|
|
for f in hf_names: |
|
|
lo = local_dir + f |
|
|
huggingface_hub.hf_hub_download(repo_id=hf_space, |
|
|
filename=f, |
|
|
use_auth_token=True, |
|
|
repo_type=huggingface_hub.REPO_TYPE_SPACE, |
|
|
force_filename=lo) |
|
|
except: |
|
|
self._pp("*Error", f) |
|
|
status = False |
|
|
return status |
|
|
|
|
|
|
|
|
def push_hface_files(self, |
|
|
hf_names, |
|
|
hf_space="duchaba/skin_cancer_diagnose", |
|
|
local_dir="/content/"): |
|
|
|
|
|
|
|
|
""" |
|
|
Pushes files to huggingface space. |
|
|
The function takes a list of file names as a |
|
|
paramater and pushes to the provided huggingface space. |
|
|
Args: |
|
|
hf_names: list(of strings), list of file names to be pushed. |
|
|
hf_space: (str), the huggingface space to push to. |
|
|
local_dir: (str), the local directory where the files |
|
|
are stored. |
|
|
Returns: |
|
|
status: (bool) True if successfully pushed else False. |
|
|
""" |
|
|
status = True |
|
|
try: |
|
|
for f in hf_names: |
|
|
lo = local_dir + f |
|
|
huggingface_hub.upload_file( |
|
|
path_or_fileobj=lo, |
|
|
path_in_repo=f, |
|
|
repo_id=hf_space, |
|
|
repo_type=huggingface_hub.REPO_TYPE_SPACE) |
|
|
except Exception as e: |
|
|
self._pp("*Error", e) |
|
|
status = False |
|
|
return status |
|
|
|
|
|
|
|
|
def push_hface_folder(self, |
|
|
hf_folder, |
|
|
hf_space_id, |
|
|
hf_dest_folder=None): |
|
|
|
|
|
""" |
|
|
This function pushes the folder to huggingface space. |
|
|
Args: |
|
|
hf_folder: (str). The path to the folder to push. |
|
|
hf_space_id: (str). The space id to push the folder to. |
|
|
hf_dest_folder: (str). The destination folder in the space. If not specified, |
|
|
the folder name will be used as the destination folder. |
|
|
Returns: |
|
|
status: (bool) True if the folder is pushed successfully, otherwise False. |
|
|
""" |
|
|
|
|
|
status = True |
|
|
try: |
|
|
api = huggingface_hub.HfApi() |
|
|
api.upload_folder(folder_path=hf_folder, |
|
|
repo_id=hf_space_id, |
|
|
path_in_repo=hf_dest_folder, |
|
|
repo_type="space") |
|
|
except Exception as e: |
|
|
self._pp("*Error: ",e) |
|
|
status = False |
|
|
return status |
|
|
|
|
|
|
|
|
def restart_hface_periodically(self): |
|
|
|
|
|
""" |
|
|
This function restarts the huggingface space automatically in random |
|
|
periodically. |
|
|
Args: |
|
|
None |
|
|
Returns: |
|
|
None |
|
|
""" |
|
|
|
|
|
while True: |
|
|
random_time = random.randint(15800, 21600) |
|
|
time.sleep(random_time) |
|
|
os.execl(sys.executable, sys.executable, *sys.argv) |
|
|
return |
|
|
|
|
|
|
|
|
def login_hface(self, key=None): |
|
|
|
|
|
""" |
|
|
Log into HuggingFace. |
|
|
Args: |
|
|
key: (str, optional) If key is set, this key will be used to log in, |
|
|
otherwise the key will be decrypted from the key file. |
|
|
Returns: |
|
|
None |
|
|
""" |
|
|
|
|
|
if (key is None): |
|
|
x = self._decrypt_it(self._huggingface_crkey) |
|
|
else: |
|
|
x = key |
|
|
huggingface_hub.login(x, add_to_git_credential=True) |
|
|
self._ph() |
|
|
return |
|
|
|
|
|
|
|
|
def fetch_info_system(self): |
|
|
|
|
|
""" |
|
|
Fetches system information, such as CPU usage and memory usage. |
|
|
Args: |
|
|
None. |
|
|
Returns: |
|
|
s: (str) A string containing the system information. |
|
|
""" |
|
|
|
|
|
s='' |
|
|
|
|
|
cpu_usage = psutil.cpu_percent() |
|
|
|
|
|
mem = psutil.virtual_memory() |
|
|
|
|
|
mem_total_gb = mem.total / (1024 ** 3) |
|
|
mem_available_gb = mem.available / (1024 ** 3) |
|
|
mem_used_gb = mem.used / (1024 ** 3) |
|
|
|
|
|
s += f"Total memory: {mem_total_gb:.2f} GB\n" |
|
|
s += f"Available memory: {mem_available_gb:.2f} GB\n" |
|
|
|
|
|
s += f"Memory usage: {mem_used_gb/mem_total_gb:.2f}%\n" |
|
|
try: |
|
|
cpu_info = cpuinfo.get_cpu_info() |
|
|
s += f'CPU type: {cpu_info["brand_raw"]}, arch: {cpu_info["arch"]}\n' |
|
|
s += f'Number of CPU cores: {cpu_info["count"]}\n' |
|
|
s += f"CPU usage: {cpu_usage}%\n" |
|
|
s += f'Python version: {cpu_info["python_version"]}' |
|
|
except Exception as e: |
|
|
s += f'CPU type: Not accessible, Error: {e}' |
|
|
return s |
|
|
|
|
|
|
|
|
def fetch_info_gpu(self): |
|
|
|
|
|
""" |
|
|
Function to fetch GPU RAM info |
|
|
Args: |
|
|
None. |
|
|
Returns: |
|
|
s: (str) GPU RAM info in human readable format. |
|
|
""" |
|
|
|
|
|
s='' |
|
|
mtotal = 0 |
|
|
mfree = 0 |
|
|
try: |
|
|
nvml_handle = pynvml.nvmlInit() |
|
|
devices = pynvml.nvmlDeviceGetCount() |
|
|
for i in range(devices): |
|
|
device = pynvml.nvmlDeviceGetHandleByIndex(i) |
|
|
memory_info = pynvml.nvmlDeviceGetMemoryInfo(device) |
|
|
mtotal += memory_info.total |
|
|
mfree += memory_info.free |
|
|
mtotal = mtotal / 1024**3 |
|
|
mfree = mfree / 1024**3 |
|
|
|
|
|
s += f'GPU type: {torch.cuda.get_device_name(0)}\n' |
|
|
s += f'GPU ready staus: {torch.cuda.is_available()}\n' |
|
|
s += f'Number of GPUs: {devices}\n' |
|
|
s += f'Total Memory: {mtotal:.2f} GB\n' |
|
|
s += f'Free Memory: {mfree:.2f} GB\n' |
|
|
s += f'GPU allocated RAM: {round(torch.cuda.memory_allocated(0)/1024**3,2)} GB\n' |
|
|
s += f'GPU reserved RAM {round(torch.cuda.memory_reserved(0)/1024**3,2)} GB\n' |
|
|
except Exception as e: |
|
|
s += f'**Warning, No GPU: {e}' |
|
|
return s |
|
|
|
|
|
|
|
|
def fetch_info_host_ip(self): |
|
|
""" |
|
|
Function to fetch current host name and ip address |
|
|
Args: |
|
|
None. |
|
|
Returns: |
|
|
s: (str) host name and ip info in human readable format. |
|
|
""" |
|
|
s='' |
|
|
try: |
|
|
hostname = socket.gethostname() |
|
|
ip_address = socket.gethostbyname(hostname) |
|
|
s += f"Hostname: {hostname}\n" |
|
|
s += f"IP Address: {ip_address}\n" |
|
|
except Exception as e: |
|
|
s += f"**Warning, No hostname: {e}" |
|
|
return s |
|
|
|
|
|
|
|
|
def fetch_file_names(self,directory, file_extension=None): |
|
|
""" |
|
|
This function gets all the filenames with a given extension. |
|
|
Args: |
|
|
directory (str): |
|
|
directory path to scan for files in. |
|
|
file_extension (list): |
|
|
file extension to look for or "None" (default) to get all files. |
|
|
Returns: |
|
|
filenames (list): |
|
|
list of strings containing the filenames with the given extension. |
|
|
""" |
|
|
filenames = [] |
|
|
for (root, subFolders, files) in os.walk(directory): |
|
|
for fname in files: |
|
|
if (file_extension is None): |
|
|
filenames.append(os.path.join(root, fname)) |
|
|
else: |
|
|
for ext in file_extension: |
|
|
if fname.endswith(ext): |
|
|
filenames.append(os.path.join(root, fname)) |
|
|
return filenames |
|
|
|
|
|
|
|
|
def _fetch_crypt(self,has_new_key=False): |
|
|
|
|
|
""" |
|
|
This function fetches the crypto key from the file or from the |
|
|
variable created previously in the class. |
|
|
Args: |
|
|
has_new_key (bool): |
|
|
is_generate flag to indicate whether the key should be |
|
|
use as-is or fetch from the file. |
|
|
Returns: |
|
|
s (str): |
|
|
string value containing the crypto key. |
|
|
""" |
|
|
if self._fkey == 'your_key_goes_here': |
|
|
raise Exception('Cryto Key is not correct!') |
|
|
|
|
|
s=self._fkey[::-1] |
|
|
if (has_new_key): |
|
|
s=open(self._xkeyfile, "rb").read() |
|
|
self._fkey = s[::-1] |
|
|
return s |
|
|
|
|
|
|
|
|
def gen_key(self): |
|
|
""" |
|
|
This function generates a new cryto key and saves it to a file |
|
|
Args: |
|
|
None |
|
|
Returns: |
|
|
(str) crypto key |
|
|
""" |
|
|
|
|
|
key = cryptography.fernet.Fernet.generate_key() |
|
|
with open(self._xkeyfile, "wb") as key_file: |
|
|
key_file.write(key[::-1]) |
|
|
return key |
|
|
|
|
|
|
|
|
def decrypt_it(self, x): |
|
|
""" |
|
|
Decrypts the encrypted string using the stored crypto key. |
|
|
Args: |
|
|
x: (str) to be decrypted. |
|
|
Returns: |
|
|
x: (str) decrypted version of x. |
|
|
""" |
|
|
y = self._fetch_crypt() |
|
|
f = cryptography.fernet.Fernet(y) |
|
|
m = f.decrypt(x) |
|
|
return m.decode() |
|
|
|
|
|
|
|
|
def encrypt_it(self, x): |
|
|
""" |
|
|
encrypt message |
|
|
Args: |
|
|
x (str): message to encrypt |
|
|
Returns: |
|
|
str: encrypted message |
|
|
""" |
|
|
|
|
|
key = self._fetch_crypt() |
|
|
p = x.encode() |
|
|
f = cryptography.fernet.Fernet(key) |
|
|
y = f.encrypt(p) |
|
|
return y |
|
|
|
|
|
|
|
|
def _fetch_lib_import(self): |
|
|
|
|
|
""" |
|
|
This function fetches all the imported libraries that are installed. |
|
|
Args: |
|
|
None |
|
|
Returns: |
|
|
x (list): |
|
|
list of strings containing the name of the imported libraries. |
|
|
""" |
|
|
|
|
|
x = [] |
|
|
for name, val in globals().items(): |
|
|
if isinstance(val, types.ModuleType): |
|
|
x.append(val.__name__) |
|
|
x.sort() |
|
|
return x |
|
|
|
|
|
|
|
|
def _fetch_lib_version(self,lib_name): |
|
|
|
|
|
""" |
|
|
This function fetches the version of the imported libraries. |
|
|
Args: |
|
|
lib_name (list): |
|
|
list of strings containing the name of the imported libraries. |
|
|
Returns: |
|
|
val (list): |
|
|
list of strings containing the version of the imported libraries. |
|
|
""" |
|
|
|
|
|
val = [] |
|
|
for x in lib_name: |
|
|
try: |
|
|
y = importlib.metadata.version(x) |
|
|
val.append(f'{x}=={y}') |
|
|
except Exception as e: |
|
|
val.append(f'|{x}==unknown_*or_system') |
|
|
val.sort() |
|
|
return val |
|
|
|
|
|
|
|
|
def fetch_info_lib_import(self): |
|
|
""" |
|
|
This function fetches all the imported libraries name and version that are installed. |
|
|
Args: |
|
|
None |
|
|
Returns: |
|
|
x (list): |
|
|
list of strings containing the name and version of the imported libraries. |
|
|
""" |
|
|
x = self._fetch_lib_version(self._fetch_lib_import()) |
|
|
return x |
|
|
|
|
|
|
|
|
def write_file(self,fname, in_data): |
|
|
|
|
|
""" |
|
|
Write a file to local or cloud diskspace or append to it if it already exists. |
|
|
Args: |
|
|
fname (str): The name of the file to write. |
|
|
in_data (list): The |
|
|
This is a utility function that writes a file to disk. |
|
|
The file name and text to write are passed in as arguments. |
|
|
The file is created, the text is written to it, and then the file is closed. |
|
|
Args: |
|
|
fname (str): The name of the file to write. |
|
|
in_data (list): The text to write to the file. |
|
|
Returns: |
|
|
None |
|
|
""" |
|
|
|
|
|
if os.path.isfile(fname): |
|
|
f = open(fname, "a") |
|
|
else: |
|
|
f = open(fname, "w") |
|
|
f.writelines("\n".join(in_data)) |
|
|
f.close() |
|
|
return |
|
|
|
|
|
|
|
|
def fetch_info_flops(self,model, input_shape=(1, 3, 224, 224), device="cpu", max_epoch=1): |
|
|
|
|
|
""" |
|
|
Calculates the number of floating point operations (FLOPs). |
|
|
Args: |
|
|
model (torch.nn.Module): neural network model. |
|
|
input_shape (tuple): input tensor size. |
|
|
device (str): device to perform computation on. |
|
|
max_epoch (int): number of times |
|
|
Returns: |
|
|
(float): number of FLOPs, average from epoch, default is 1 epoch. |
|
|
(float): elapsed seconds |
|
|
(list): of string for a friendly human readable output |
|
|
""" |
|
|
|
|
|
ttm_input = torch.rand(input_shape, dtype=torch.float32, device=device) |
|
|
|
|
|
tstart = time.time() |
|
|
for i in range(max_epoch): |
|
|
flops, params = flopth(model, inputs=(ttm_input,), bare_number=True) |
|
|
tend = time.time() |
|
|
etime = (tend - tstart)/max_epoch |
|
|
|
|
|
|
|
|
valstr = [] |
|
|
valstr.append(f'Tensors device: {device}') |
|
|
valstr.append(f'flops: {flops:,}') |
|
|
valstr.append(f'params: {params:,}') |
|
|
valstr.append(f'epoch: {max_epoch}') |
|
|
valstr.append(f'sec: {etime}') |
|
|
|
|
|
x = flops/etime |
|
|
y = (x/10**15)*86400 |
|
|
valstr.append(f'Flops/s: {x:,}') |
|
|
valstr.append(f'PetaFlops/s: {x/10**15}') |
|
|
valstr.append(f'PetaFlops/day: {y}') |
|
|
valstr.append(f'1 PetaFlopsDay (on this system will take): {round(1/y, 2):,.2f} days') |
|
|
return flops, etime, valstr |
|
|
|
|
|
def print_petaflops(self): |
|
|
|
|
|
""" |
|
|
Prints the flops and peta-flops-day calculation. |
|
|
**WARING**: This method will break/interfer with Stable Diffusion use of LoRA. |
|
|
I can't debug why yet. |
|
|
Args: |
|
|
None |
|
|
Returns: |
|
|
None |
|
|
""" |
|
|
self._pp('Model', 'TTM, Tiny Torch Model on: CPU') |
|
|
mtoy = TTM() |
|
|
|
|
|
dev = torch.device("cuda:0") |
|
|
a,b,c = self.fetch_info_flops(mtoy) |
|
|
y = round((a/b)/self.flops_per_sec_gcolab_cpu * 100, 2) |
|
|
self._pp('Flops', f'{a:,} flops') |
|
|
self._pp('Total elapse time', f'{b:,} seconds') |
|
|
self._pp('Flops compared', f'{y:,}% of Google Colab Pro') |
|
|
for i, val in enumerate(c): |
|
|
self._pp(f'Info {i}', val) |
|
|
self._ph() |
|
|
|
|
|
try: |
|
|
self._pp('Model', 'TTM, Tiny Torch Model on: GPU') |
|
|
dev = torch.device("cuda:0") |
|
|
a2,b2,c2 = self.fetch_info_flops(mtoy, device=dev) |
|
|
y2 = round((a2/b2)/self.flops_per_sec_gcolab_gpu * 100, 2) |
|
|
self._pp('Flops', f'{a2:,} flops') |
|
|
self._pp('Total elapse time', f'{b2:,} seconds') |
|
|
self._pp('Flops compared', f'{y2:,}% of Google Colab Pro') |
|
|
d2 = round(((a2/b2)/(a/b))*100, 2) |
|
|
self._pp('Flops GPU compared', f'{d2:,}% of CPU (or {round(d2-100,2):,}% faster)') |
|
|
for i, val in enumerate(c2): |
|
|
self._pp(f'Info {i}', val) |
|
|
except Exception as e: |
|
|
self._pp('Error', e) |
|
|
self._ph() |
|
|
return |
|
|
|
|
|
|
|
|
def fetch_installed_libraries(self): |
|
|
""" |
|
|
Retrieves and prints the names and versions of Python libraries installed by the user, |
|
|
excluding the standard libraries. |
|
|
Args: |
|
|
----- |
|
|
None |
|
|
Returns: |
|
|
-------- |
|
|
dictionary: (dict) |
|
|
A dictionary where keys are the names of the libraries and values are their respective versions. |
|
|
Examples: |
|
|
--------- |
|
|
libraries = get_installed_libraries() |
|
|
for name, version in libraries.items(): |
|
|
print(f"{name}: {version}") |
|
|
""" |
|
|
|
|
|
|
|
|
result = subprocess.run(['pip', 'freeze'], stdout=subprocess.PIPE) |
|
|
|
|
|
|
|
|
packages = result.stdout.decode('utf-8').splitlines() |
|
|
|
|
|
|
|
|
installed_libraries = {} |
|
|
for package in packages: |
|
|
try: |
|
|
name, version = package.split('==') |
|
|
installed_libraries[name] = version |
|
|
except Exception as e: |
|
|
|
|
|
pass |
|
|
return installed_libraries |
|
|
|
|
|
|
|
|
def fetch_match_file_dict(self, file_path, reference_dict): |
|
|
""" |
|
|
Reads a file from the disk, creates an array with each line as an item, |
|
|
and checks if each line exists as a key in the provided dictionary. If it exists, |
|
|
the associated value from the dictionary is also returned. |
|
|
Parameters: |
|
|
----------- |
|
|
file_path: str |
|
|
Path to the file to be read. |
|
|
reference_dict: dict |
|
|
Dictionary against which the file content (each line) will be checked. |
|
|
Returns: |
|
|
-------- |
|
|
dict: |
|
|
A dictionary where keys are the lines from the file and values are either |
|
|
the associated values from the reference dictionary or None if the key |
|
|
doesn't exist in the dictionary. |
|
|
Raises: |
|
|
------- |
|
|
FileNotFoundError: |
|
|
If the provided file path does not exist. |
|
|
""" |
|
|
|
|
|
if not os.path.exists(file_path): |
|
|
raise FileNotFoundError(f"The file at {file_path} does not exist.") |
|
|
|
|
|
with open(file_path, 'r') as file: |
|
|
lines = file.readlines() |
|
|
|
|
|
|
|
|
|
|
|
results = {line.strip(): reference_dict.get(line.strip().replace('_', '-'), None) for line in lines} |
|
|
|
|
|
return results |
|
|
|
|
|
def print_info_self(self): |
|
|
|
|
|
""" |
|
|
Prints information about the model/myself. |
|
|
Args: |
|
|
None |
|
|
Returns: |
|
|
None |
|
|
""" |
|
|
|
|
|
self._ph() |
|
|
self._pp("Hello, I am", self.name) |
|
|
self._pp("I will display", "Python, Jupyter, and system info.") |
|
|
self._pp("For complete doc type", "help(pluto) ...or help(your_object_name)") |
|
|
self._pp('.','.') |
|
|
self._pp("...", "Β―\_(γ)_/Β―") |
|
|
self._ph() |
|
|
|
|
|
self._pp('System', 'Info') |
|
|
x = self.fetch_info_system() |
|
|
print(x) |
|
|
self._ph() |
|
|
|
|
|
self._pp('GPU', 'Info') |
|
|
x = self.fetch_info_gpu() |
|
|
print(x) |
|
|
self._ph() |
|
|
|
|
|
self._pp('Installed lib from', self.fname_requirements) |
|
|
self._ph() |
|
|
x = self.fetch_match_file_dict(self.fname_requirements, self.fetch_installed_libraries()) |
|
|
for item, value in x.items(): |
|
|
self._pp(f'{item} version', value) |
|
|
self._ph() |
|
|
self._pp('Standard lib from', 'System') |
|
|
self._ph() |
|
|
self._pp('matplotlib version', matplotlib.__version__) |
|
|
self._pp('numpy version', numpy.__version__) |
|
|
self._pp('pandas version',pandas.__version__) |
|
|
self._pp('PIL version', PIL.__version__) |
|
|
self._pp('torch version', torch.__version__) |
|
|
self._ph() |
|
|
|
|
|
self._pp('Host', 'Info') |
|
|
x = self.fetch_info_host_ip() |
|
|
print(x) |
|
|
self._ph() |
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
class TTM(torch.nn.Module): |
|
|
|
|
|
""" |
|
|
Tiny Torch Model (TTM) |
|
|
This is a toy model consisting of four convolutional layers. |
|
|
Args: |
|
|
input_shape (tuple): input tensor size. |
|
|
Returns: |
|
|
(tensor): output of the model. |
|
|
""" |
|
|
|
|
|
def __init__(self, input_shape=(1, 3, 224, 224)): |
|
|
super(TTM, self).__init__() |
|
|
self.conv1 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1) |
|
|
self.conv2 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1) |
|
|
self.conv3 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1) |
|
|
self.conv4 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1) |
|
|
|
|
|
def forward(self, x1): |
|
|
x1 = self.conv1(x1) |
|
|
x1 = self.conv2(x1) |
|
|
x1 = self.conv3(x1) |
|
|
x1 = self.conv4(x1) |
|
|
return x1 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import functools |
|
|
def add_method(cls): |
|
|
def decorator(func): |
|
|
@functools.wraps(func) |
|
|
def wrapper(*args, **kwargs): |
|
|
return func(*args, **kwargs) |
|
|
setattr(cls, func.__name__, wrapper) |
|
|
return func |
|
|
return decorator |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
monty = Pluto_Happy('Monty, Monty Said!') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fname = 'toxic_data.csv' |
|
|
monty.df_toxic_data = pandas.read_csv(fname) |
|
|
|
|
|
|
|
|
import os |
|
|
monty._openai_key=os.getenv('openai_key') |
|
|
monty._github_key=os.getenv('github_key') |
|
|
monty._huggingface_key=os.getenv('huggingface_key') |
|
|
monty._kaggle_key=os.getenv('kaggle_key') |
|
|
|
|
|
|
|
|
@add_method(Pluto_Happy) |
|
|
|
|
|
def _fetch_moderate_engine(self): |
|
|
self.ai_client = openai.OpenAI(api_key=self._openai_key) |
|
|
self.text_model = "text-moderation-latest" |
|
|
return |
|
|
|
|
|
@add_method(Pluto_Happy) |
|
|
|
|
|
def _censor_me(self, p, safer=0.0005): |
|
|
self._fetch_moderate_engine() |
|
|
resp_orig = self.ai_client.moderations.create(input=p, model=self.text_model) |
|
|
resp_dict = resp_orig.model_dump() |
|
|
|
|
|
v1 = resp_dict["results"][0]["category_scores"] |
|
|
v1 = {key: value if value is not None else 0 for key, value in v1.items()} |
|
|
print(f'resp_dic: {resp_dict}') |
|
|
print(f'v1: {v1}') |
|
|
|
|
|
max_key = max(v1, key=v1.get) |
|
|
max_value = v1[max_key] |
|
|
sum_value = sum(v1.values()) |
|
|
|
|
|
v1["is_safer_flagged"] = False |
|
|
if (max_value >= safer): |
|
|
v1["is_safer_flagged"] = True |
|
|
v1["is_flagged"] = resp_dict["results"][0]["flagged"] |
|
|
v1['max_key'] = max_key |
|
|
v1['max_value'] = max_value |
|
|
v1['sum_value'] = sum_value |
|
|
v1['safer_value'] = safer |
|
|
v1['message'] = p |
|
|
return v1 |
|
|
|
|
|
@add_method(Pluto_Happy) |
|
|
def _draw_censor(self,data): |
|
|
self._color_mid_gray = '#6c757d' |
|
|
exp = (0.01, 0.01) |
|
|
x = [data['max_value'], (1-data['max_value'])] |
|
|
title=f"\nUnsafe: {data['max_key']}: {(data['max_value']*100):.2f}% Confidence\n" |
|
|
lab = [data['max_key'], 'Other 13 categories'] |
|
|
if (data['is_flagged']): |
|
|
col=[self.color_danger, self.color_mid_gray] |
|
|
elif (data['is_safer_flagged']): |
|
|
col=[self.color_warning, self.color_mid_gray] |
|
|
lab = ['Relative Score:\n'+data['max_key'], 'Other 13 categories'] |
|
|
title=f"\nPersonal Unsafe: {data['max_key']}: {(data['max_value']*100):.2f}% Confidence\n" |
|
|
else: |
|
|
col=[self.color_mid_gray, self.color_success] |
|
|
lab = ['False Negative:\n'+data['max_key'], 'Other 13 categories'] |
|
|
title='\nSafe Message\n' |
|
|
canvas = self._draw_donut(x, lab, col, exp,title) |
|
|
return canvas |
|
|
|
|
|
@add_method(Pluto_Happy) |
|
|
def _draw_donut(self,data,labels,col, exp,title): |
|
|
|
|
|
|
|
|
|
|
|
canvas, pic = matplotlib.pyplot.subplots() |
|
|
pic.pie(data, explode=exp, |
|
|
labels=labels, |
|
|
colors=col, |
|
|
autopct='%1.1f%%', |
|
|
startangle=90, |
|
|
textprops={'color':'#0a0a0a'}) |
|
|
|
|
|
|
|
|
centre_circle = matplotlib.pyplot.Circle((0,0),0.45,fc=col[0],linewidth=2, ec='white') |
|
|
canvas = matplotlib.pyplot.gcf() |
|
|
canvas.gca().add_artist(centre_circle) |
|
|
|
|
|
|
|
|
pic.axis('equal') |
|
|
pic.set_title(title) |
|
|
canvas.tight_layout() |
|
|
|
|
|
return canvas |
|
|
|
|
|
@add_method(Pluto_Happy) |
|
|
|
|
|
def fetch_toxicity_level(self, msg, safer): |
|
|
|
|
|
yjson = self._censor_me(msg,safer) |
|
|
_canvas = self._draw_censor(yjson) |
|
|
_yjson = json.dumps(yjson, indent=4) |
|
|
return (_canvas, _yjson) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import random |
|
|
def say_hello(val): |
|
|
return f"Hello: {val}" |
|
|
def say_toxic(): |
|
|
return f"I am toxic" |
|
|
def fetch_toxic_tweets(maxi=2): |
|
|
sample_df = monty.df_toxic_data.sample(maxi) |
|
|
is_true = random.choice([True, False]) |
|
|
c1 = "more_toxic" |
|
|
if is_true: |
|
|
c1 = "less_toxic" |
|
|
toxic1 = sample_df[c1].iloc[0] |
|
|
|
|
|
return sample_df.to_html(index=False), toxic1 |
|
|
|
|
|
|
|
|
|
|
|
in1 = gradio.Textbox(lines=3, label="Enter Text:") |
|
|
in2 = gradio.Slider(0.005, .1, value=0.02, step=.005,label="Personalize Safer Value: (larger value is less safe)") |
|
|
out1 = gradio.Plot(label="Output:") |
|
|
out2 = gradio.HTML(label="Real-world Toxic Posts/Tweets: *WARNING") |
|
|
out3 = gradio.Textbox(lines=5, label="Output JSON:") |
|
|
but1 = gradio.Button("Measure 14 Toxicity", variant="primary",size="sm") |
|
|
but2 = gradio.Button("Fetch Toxic Text", variant="stop", size="sm") |
|
|
|
|
|
txt1 = """ |
|
|
# π Welcome To The Friendly Text Moderation |
|
|
### Identify 14 categories of text toxicity. |
|
|
> This NLP (Natural Language Processing) AI demonstration aims to prevent profanity, vulgarity, hate speech, violence, sexism, and other offensive language. |
|
|
>It is **not an act of censorship**, as the final UI (User Interface) will give the reader, but not a young reader, the option to click on a label to read the toxic message. |
|
|
>The goal is to create a safer and more respectful environment for you, your colleages, and your family. |
|
|
> This NLP app is 1 of 3 hands-on courses, ["AI Solution Architect," from ELVTR and Duc Haba](https://elvtr.com/course/ai-solution-architect?utm_source=instructor&utm_campaign=AISA&utm_content=linkedin). |
|
|
--- |
|
|
### π΄ Helpful Instruction: |
|
|
1. Enter your [harmful] message in the input box. |
|
|
2. Click the "Measure 14 Toxicity" button. |
|
|
3. View the result on the Donut plot. |
|
|
4. (**Optional**) Click on the "Fetch Real World Toxic Dataset" below. |
|
|
5. There are additional options and notes below. |
|
|
""" |
|
|
txt2 = """ |
|
|
## π» Author and Developer Notes: |
|
|
--- |
|
|
- The demo uses the cutting-edge (2024) AI Natural Language Processing (NLP) model from OpenAI. |
|
|
- This NLP app is 1 of 3 hands-on apps from the ["AI Solution Architect," from ELVTR and Duc Haba](https://elvtr.com/course/ai-solution-architect?utm_source=instructor&utm_campaign=AISA&utm_content=linkedin). |
|
|
- It is not a Generative (GenAI) model, such as Google Gemini or GPT-4. |
|
|
- The NLP understands the message context, nuance, innuendo, and not just swear words. |
|
|
- We **challenge you** to trick it, i.e., write a toxic tweet or post, but our AI thinks it is safe. If you win, please send us your message. |
|
|
- The 14 toxicity categories are as follows: |
|
|
1. harassment |
|
|
2. harassment threatening |
|
|
3. harassment instructions |
|
|
4. hate |
|
|
5. hate threatening |
|
|
6. hate instructions |
|
|
7. self harm |
|
|
8. self harm instructions |
|
|
9. self harm intent |
|
|
10. self harm minor |
|
|
11. sexual |
|
|
12. sexual minors |
|
|
13. violence |
|
|
14. violence graphic |
|
|
- If the NLP model classifies the message as "safe," you can still limit the level of toxicity by using the "Personal Safe" slider. |
|
|
- The smaller the personal-safe value, the stricter the limitation. It means that if you're a young or sensitive adult, you should choose a lower personal-safe value, less than 0.02, to ensure you're not exposed to harmful content. |
|
|
- The color of the donut plot is as follows: |
|
|
- Red is an "unsafe" message by the NLP model |
|
|
- Green is a "safe" message |
|
|
- Yellow is an "unsafe" message by your toxicity level |
|
|
- The **"confidence"** score refers to the confidence level in detecting a particular type of toxicity among the 14 tracked types. For instance, if the confidence score is 90%, it indicates a 90% chance that the toxicity detected is of that particular type. In comparison, the remaining 13 toxicities collectively have a 10% chance of being the detected toxicity. Conversely, if the confidence score is 3%, it could indicate any toxicity. It's worth noting that the Red, Green, or Yellow safety levels do not influence the confidence score. |
|
|
- The real-world dataset is from the Jigsaw Rate Severity of Toxic Comments on Kaggle. It has 30,108 records. |
|
|
- Citation: |
|
|
- Ian Kivlichan, Jeffrey Sorensen, Lucas Dixon, Lucy Vasserman, Meghan Graham, Tin Acosta, Walter Reade. (2021). Jigsaw Rate Severity of Toxic Comments . Kaggle. https://kaggle.com/competitions/jigsaw-toxic-severity-rating |
|
|
- The intent is to share with Duc's friends and colleagues, but for those with nefarious intent, this Text Moderation model is governed by the GNU 3.0 License: https://www.gnu.org/licenses/gpl-3.0.en.html |
|
|
- Author: Copyright (C), 2024 **[Duc Haba](https://linkedin.com/in/duchaba)** |
|
|
--- |
|
|
# π "AI Solution Architect" Course by ELVTR |
|
|
>Welcome to the fascinating world of AI and natural language processing (NLP). This NLP model is a part of one of three hands-on application. In our journey together, we will explore the [AI Solution Architect](https://elvtr.com/course/ai-solution-architect?utm_source=instructor&utm_campaign=AISA&utm_content=linkedin) course, meticulously crafted by ELVTR in collaboration with Duc Haba. This course is intended to serve as your gateway into the dynamic and constantly evolving field of AI Solution Architect, providing you with a comprehensive understanding of its complexities and applications. |
|
|
>An AI Solution Architect (AISA) is a mastermind who possesses a deep understanding of the complex technicalities of AI and knows how to creatively integrate them into real-world solutions. They bridge the gap between theoretical AI models and practical, effective applications. AISA works as a strategist to design AI systems that align with business objectives and technical requirements. They delve into algorithms, data structures, and computational theories to translate them into tangible, impactful AI solutions that have the potential to revolutionize industries. |
|
|
> π [Sign up for the course today](https://elvtr.com/course/ai-solution-architect?utm_source=instructor&utm_campaign=AISA&utm_content=linkedin), and I will see you in class. |
|
|
- An article about this NLP Text Moderation will be coming soon. |
|
|
""" |
|
|
txt3 = """ |
|
|
## π₯ WARNING: WARNING: |
|
|
--- |
|
|
- The following button will retrieve **real-world** offensive posts from Twitter and customer reviews from consumer companies. |
|
|
- The button will display four toxic messages at a time. **Click again** for four more randomly selected postings/tweets. |
|
|
- They contain **profanity, vulgarity, hate, violence, sexism, and other offensive language.** |
|
|
- After you fetch the toxic messages, Click on the **"Measure 14 Toxicity" button**. |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
with gradio.Blocks() as gradio_app: |
|
|
|
|
|
gradio.Markdown(txt1) |
|
|
|
|
|
|
|
|
with gradio.Row(): |
|
|
|
|
|
with gradio.Column(scale=1): |
|
|
|
|
|
in1.render() |
|
|
in2.render() |
|
|
but1.render() |
|
|
out3.render() |
|
|
but1.click(monty.fetch_toxicity_level, inputs=[in1, in2], outputs=[out1,out3]) |
|
|
|
|
|
with gradio.Column(scale=2): |
|
|
out1.render() |
|
|
|
|
|
|
|
|
with gradio.Row(): |
|
|
gradio.Markdown(txt3) |
|
|
|
|
|
|
|
|
with gradio.Row(): |
|
|
with gradio.Column(scale=1): |
|
|
but2.render() |
|
|
but2.click(fetch_toxic_tweets, inputs=None, outputs=[out2, in1]) |
|
|
with gradio.Column(scale=2): |
|
|
out2.render() |
|
|
|
|
|
|
|
|
with gradio.Row(): |
|
|
gradio.Markdown(txt2) |
|
|
|
|
|
|
|
|
|
|
|
gradio_app.launch() |