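"""Zipf's-law statistics over a dataset's vocabulary counts.

Fits the observed word frequencies to a discrete power law with the
`powerlaw` package and records the fit (alpha, xmin, xmax, KS distance,
p-value) along with predicted Zipf counts per rank. A usage sketch
appears at the bottom of this file.
"""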
import logging
from pathlib import Path

import numpy as np
import pandas as pd
import powerlaw
import streamlit as st
from scipy.stats import ks_2samp
from scipy.stats import zipf as zipf_lib

from .dataset_utils import CNT, PROP
|
|
# Treat infinite values as NaN throughout (note: this option is deprecated
# in pandas 2.x).
pd.set_option("use_inf_as_na", True)
|
|
logs = logging.getLogger(__name__)
logs.setLevel(logging.INFO)
logs.propagate = False

if not logs.handlers:
    Path("./log_files").mkdir(exist_ok=True)

    # Send INFO and above to a log file.
    file = logging.FileHandler("./log_files/zipf.log")
    fileformat = logging.Formatter("%(asctime)s:%(message)s")
    file.setLevel(logging.INFO)
    file.setFormatter(fileformat)

    # Send WARNING and above to the console.
    stream = logging.StreamHandler()
    streamformat = logging.Formatter("[data_measurements_tool] %(message)s")
    stream.setLevel(logging.WARNING)
    stream.setFormatter(streamformat)

    logs.addHandler(file)
    logs.addHandler(stream)
|
|
|
|
class Zipf:
    def __init__(self, vocab_counts_df=pd.DataFrame()):
        self.vocab_counts_df = vocab_counts_df
        self.alpha = None
        self.xmin = None
        self.xmax = None
        self.fit = None
        self.ranked_words = {}
        self.uniq_counts = []
        self.uniq_ranks = []
        self.uniq_fit_counts = None
        self.term_df = None
        self.pvalue = None
        self.ks_test = None
        self.distance = None
        self.predicted_zipf_counts = None
        if not self.vocab_counts_df.empty:
            logs.info("Fitting based on input vocab counts.")
            self.calc_fit(vocab_counts_df)
            logs.info("Getting predicted counts.")
            self.predicted_zipf_counts = self.calc_zipf_counts(vocab_counts_df)
|
|
    def load(self, zipf_dict):
        self.set_xmin(zipf_dict["xmin"])
        self.set_xmax(zipf_dict["xmax"])
        self.set_alpha(zipf_dict["alpha"])
        self.set_ks_distance(zipf_dict["ks_distance"])
        self.set_p(zipf_dict["p-value"])
        self.set_unique_ranks(zipf_dict["uniq_ranks"])
        self.set_unique_counts(zipf_dict["uniq_counts"])
|
|
    def calc_fit(self, vocab_counts_df):
        """
        Uses the powerlaw package to fit the observed frequencies to a
        zipfian distribution.
        We use the KS distance to fit, as that seems more appropriate than MLE.
        :param vocab_counts_df: DataFrame with one row per vocabulary word
            and a count (CNT) column.
        :return: None; stores the fit statistics on the instance.
        """
        self.vocab_counts_df = vocab_counts_df
        # Normalize counts to proportions and dense-rank words by frequency
        # (highest count = rank 1).
        vocab_counts_df[PROP] = vocab_counts_df[CNT] / float(sum(vocab_counts_df[CNT]))
        rank_column = vocab_counts_df[CNT].rank(
            method="dense", numeric_only=True, ascending=False
        )
        vocab_counts_df["rank"] = rank_column.astype("int64")
        observed_counts = vocab_counts_df[CNT].values
        # Fit a discrete power law by minimizing the Kolmogorov-Smirnov
        # distance rather than by maximum likelihood.
        self.fit = powerlaw.Fit(observed_counts, fit_method="KS", discrete=True)
        # Normalized histogram of the observed data; original_data=False
        # restricts it to the data used for the fit (between xmin and xmax).
        pdf_bin_edges, observed_pdf = self.fit.pdf(original_data=False)
        theoretical_distro = self.fit.power_law
        # Normalized histogram of the fitted theoretical distribution.
        predicted_pdf = theoretical_distro.pdf()
        # The fitted exponent: the critical value for Zipf's law.
        self.alpha = theoretical_distro.alpha
        self.xmin = theoretical_distro.xmin
        self.xmax = theoretical_distro.xmax
        self.distance = theoretical_distro.KS()
        # Two-sample KS test between the observed and predicted densities.
        self.ks_test = ks_2samp(observed_pdf, predicted_pdf)
        self.pvalue = self.ks_test[1]
        logs.info("KS test:")
        logs.info(self.ks_test)
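        # Background note (standard power-law references, e.g. Newman 2005,
        # not specific to this tool): classic Zipf behavior, a rank-frequency
        # exponent s near 1, corresponds to a frequency-distribution exponent
        # alpha near 2, since the two relate roughly as alpha ~ 1 + 1/s.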
|
|
    def set_xmax(self, xmax):
        """
        xmax is usually None, so we add some handling to set it as the
        maximum rank in the dataset.
        :param xmax: the maximum x value for the fit; may be None.
        :return: None
        """
        if xmax:
            self.xmax = int(xmax)
        elif self.uniq_counts:
            self.xmax = int(len(self.uniq_counts))
        elif self.uniq_ranks:
            self.xmax = int(len(self.uniq_ranks))
|
|
    def get_xmax(self):
        """
        :return: the stored xmax, deriving it from the unique counts or
            unique ranks first if it has not been set yet.
        """
        if not self.xmax:
            self.set_xmax(self.xmax)
        return self.xmax
|
|
    def set_p(self, p):
        # p-values are floats in [0, 1], so casting to int would truncate
        # them to 0; store as a float, on the attribute calc_fit also sets.
        self.pvalue = float(p)

    def get_p(self):
        return float(self.pvalue)
|
|
    def set_xmin(self, xmin):
        self.xmin = xmin

    def get_xmin(self):
        if self.xmin:
            return int(self.xmin)
        return self.xmin
|
|
    def set_alpha(self, alpha):
        self.alpha = float(alpha)

    def get_alpha(self):
        return float(self.alpha)

    def set_ks_distance(self, distance):
        self.distance = float(distance)

    def get_ks_distance(self):
        return self.distance
|
|
    def calc_zipf_counts(self, vocab_counts_df):
        """
        The fit is based on an optimal xmin (minimum rank).
        Let's use this to make count estimates for the zipf fit,
        by multiplying the fitted pmf value by the sum of counts above xmin.
        :param vocab_counts_df: DataFrame with a count (CNT) column.
        :return: array of count values following the fitted pmf.
        """
        counts = vocab_counts_df[CNT]
        self.uniq_counts = list(pd.unique(counts))
        self.uniq_ranks = list(np.arange(1, len(self.uniq_counts) + 1))
        logs.info(self.uniq_counts)
        logs.info(self.xmin)
        logs.info(self.xmax)
        # Make sure xmin and xmax are ints when they are not None.
        xmin = self.get_xmin()
        xmax = self.get_xmax()
        # Sum the unique counts within the fitted range; this is the mass
        # that the fitted pmf is spread across.
        self.uniq_fit_counts = self.uniq_counts[xmin + 1 : xmax]
        pmf_mass = float(sum(self.uniq_fit_counts))
        zipf_counts = np.array(
            [self.estimate_count(rank, pmf_mass) for rank in self.uniq_ranks]
        )
        return zipf_counts
|
|
    def estimate_count(self, rank, pmf_mass):
        return int(round(zipf_lib.pmf(rank, self.alpha) * pmf_mass))
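    # A worked example of estimate_count, with purely illustrative numbers:
    # if the fitted alpha were 2.0, scipy's zipf_lib.pmf(1, 2.0) is
    # 1 / zeta(2.0) ~= 0.608, so rank 1 would be assigned ~61% of pmf_mass,
    # and rank 2 would get 2**-2.0 / zeta(2.0) ~= 0.152 of it.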
|
|
    def set_unique_ranks(self, ranks):
        self.uniq_ranks = ranks

    def get_unique_ranks(self):
        return self.uniq_ranks

    def get_unique_fit_counts(self):
        return self.uniq_fit_counts

    def set_unique_counts(self, counts):
        self.uniq_counts = counts

    def get_unique_counts(self):
        return self.uniq_counts

    def set_axes(self, unique_counts, unique_ranks):
        self.uniq_counts = unique_counts
        self.uniq_ranks = unique_ranks
|
|
    def fit_others(self, fit):
        st.markdown(
            "_Checking the log likelihood ratio to see if the data is better "
            "explained by other well-behaved distributions..._"
        )
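        # powerlaw's distribution_compare returns (R, p): R is the
        # log-likelihood ratio between the two candidate distributions,
        # positive when the first-named one fits better and negative when
        # the second does; p is the significance of that ratio.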
| |
        better_distro = False
        trunc = fit.distribution_compare("power_law", "truncated_power_law")
        if trunc[0] < 0:
            st.markdown("Seems a truncated power law is a better fit.")
            better_distro = True

        lognormal = fit.distribution_compare("power_law", "lognormal")
        if lognormal[0] < 0:
            st.markdown("Seems a lognormal distribution is a better fit.")
            st.markdown("But don't panic -- that happens sometimes with language.")
            better_distro = True

        exponential = fit.distribution_compare("power_law", "exponential")
        if exponential[0] < 0:
            st.markdown("Seems an exponential distribution is a better fit. Panic.")
            better_distro = True

        if not better_distro:
            st.markdown("\nSeems your data is best fit by a power law. Celebrate!!")
|
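# A minimal usage sketch. The relative import of dataset_utils means this
# module is not runnable as a standalone script, and the package path and
# toy counts below are assumptions for illustration only:
#
#     import pandas as pd
#     from data_measurements.zipf import Zipf
#     from data_measurements.dataset_utils import CNT
#
#     vocab = pd.DataFrame({CNT: [1000, 480, 320, 250, 190, 60, 22, 9, 3, 1]})
#     z = Zipf(vocab)
#     print(z.get_alpha(), z.get_ks_distance(), z.get_p())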