File size: 7,045 Bytes
acf89de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
532aab1
 
 
acf89de
 
 
 
 
 
532aab1
 
acf89de
 
 
 
2b7bcf8
 
 
acf89de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e1986f
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import pickle
from datetime import timedelta
import re
import plotly.express as px
from plotly.subplots import make_subplots
import numpy as np
from pandas import DataFrame
import shap
import matplotlib.pyplot as plt
from huggingface_hub import HfFileSystem
import warnings
# Silence DeprecationWarning messages; the filter is installed globally and is
# not limited to warnings raised from this module.
warnings.simplefilter("ignore", category=DeprecationWarning)

class DataSHAP:

    def __init__(self, file_addr, company, trim):
        """Load a pickled SHAP result and precompute every table and figure.

        Args:
            file_addr: Path handed to ``load_file`` to fetch the pickled
                SHAP values.
            company: Company name interpolated into the statistics figure
                title.
            trim: Quarter identifier interpolated into the statistics figure
                title.
        """
        # The title fields are read by three_plot, so they have to exist
        # before the figure is built further down.
        self.company = company
        self.trim = trim

        self.file_addr = file_addr
        explanation = self.load_file(file_addr)
        self.shap_value = explanation

        frame = self.mount_df(explanation)
        self.df = frame

        performance = self.calc_performance(explanation.compute_time,
                                            frame['qty_tokens'])
        self.total_tokens, self.h, self.m, self.s = performance

        # Statistics are computed before three_plot, which rewrites the
        # 'tag' column of the frame in place.
        self.statistic = self.get_statistic(frame)
        self.plot = self.three_plot(frame)
        self.plot_bar, self.axis, self.rank = self.plot_rank()



    def load_file(self, file_addr):
        """Fetch a pickled object through the Hugging Face file system.

        Args:
            file_addr: Path understood by ``HfFileSystem.open``.

        Returns:
            The object stored in the pickle file.
        """
        fs = HfFileSystem()
        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file. Only point file_addr at repositories you control or trust.
        # The context manager closes the handle, so no explicit close() is
        # needed.
        with fs.open(file_addr, 'rb') as inp:
            return pickle.load(inp)
    
    def join_text(self, list):
        return np.array([''.join(x) for x in list])

    def count_token(self, list):
        return np.array([len(x) for x in list])

    def calc_allscore(self, shap_value):
        """Compute one score per sample and per sentiment label.

        For every label, the SHAP values of a sample are summed and the
        label's base value is added.

        Args:
            shap_value: Object exposing ``base_values`` and supporting
                ``shap_value[:, :, label].values``.

        Returns:
            DataFrame with the columns 'NEUTRAL', 'POSITIVE' and 'NEGATIVE'
            (in that order), one row per sample.
        """
        # assumes base_values columns are ordered POSITIVE, NEGATIVE, NEUTRAL
        # -- TODO confirm against the model's output order.
        baseline = DataFrame(shap_value.base_values, columns=['POSITIVE', 'NEGATIVE', 'NEUTRAL'])
        scores = DataFrame()
        for label in ('NEUTRAL', 'POSITIVE', 'NEGATIVE'):
            contributions = DataFrame(shap_value[:, :, label].values, columns=[label])[label]
            summed = contributions.apply(lambda per_token: per_token.sum())
            scores[label] = summed + baseline[label]
        return scores


    def mount_df(self, shap_value):
        """Assemble the per-speech table from a SHAP result.

        Args:
            shap_value: Object exposing ``data`` (token sequences) and
                accepted by ``calc_allscore``.

        Returns:
            DataFrame with the columns 'speech', 'qty_tokens',
            'neutral_score', 'positive_score', 'negative_score', plus the
            'tag' and 'score' columns added by ``df_idxmax_score``.
        """
        speeches = self.join_text(shap_value.data)
        token_counts = self.count_token(shap_value.data)
        class_scores = self.calc_allscore(shap_value)

        # Text, counts and scores travel through one numpy array, so they
        # share a single dtype until df_idxmax_score casts the score columns
        # back to float; 'qty_tokens' is left as stored in that array.
        text_and_counts = np.stack((speeches, token_counts), axis=-1)
        table = np.hstack((text_and_counts, class_scores))

        # Column names follow calc_allscore's order: neutral, positive,
        # negative.
        frame = DataFrame(table, columns=['speech', 'qty_tokens',
                                          'neutral_score', 'positive_score', 'negative_score'])
        return self.df_idxmax_score(frame, ['positive_score', 'negative_score', 'neutral_score'])
    

    def df_idxmax_score(self, df, title_score):
        df[title_score] = df[title_score].astype('float')
        df['tag'] = df[title_score].idxmax(axis="columns")
        df['score'] = df[title_score].max(axis='columns')
        df['tag'] = df['tag'].replace({'positive_score': 'POSITIVE', 'negative_score': 'NEGATIVE', 'neutral_score': 'NEUTRAL'})
        return df
    

    def calc_performance(self, time_s, df:DataFrame):
        total_tokens = df.astype('int64').sum()
        proc_time=timedelta(seconds=time_s)
        h,m,s = re.split(':', str(proc_time))
        return total_tokens, h, m, s
    

    def get_statistic(self, df)->DataFrame:
        statistic = DataFrame()
        statistic['Score Positivo'] = df[df['tag']=='POSITIVE']['score'].describe()
        statistic['Score Negativo'] = df[df['tag']=='NEGATIVE']['score'].describe()
        statistic['Score Neutro']  = df[df['tag']=='NEUTRAL']['score'].describe()
        return statistic
    
    def get_performance(self):
        return self.total_tokens, self.h, self.m, self.s
    
    def three_plot(self, df):
        """Build the combined statistics figure (histogram, scatter, box).

        The figure is a 2x2 subplot grid with shared axes: a histogram over
        the row index coloured by label in the top-left cell, the per-row
        score scatter in the bottom-left cell and the per-label score box
        plot in the bottom-right cell.

        Args:
            df: DataFrame with a 'tag' column (POSITIVE/NEGATIVE/NEUTRAL)
                and a 'score' column. NOTE: the 'tag' column of the frame
                passed in is overwritten in place with the Portuguese
                labels; only the later rename works on a copy.

        Returns:
            The assembled plotly figure. The title reads ``self.company``
            and ``self.trim``, so both must be set before calling.
        """
        df['tag'] = df['tag'].replace({'POSITIVE': 'Positivo', 'NEGATIVE': 'Negativo', 'NEUTRAL': 'Neutro'})
        df = df.rename(columns={'tag': 'rotulo'})
        fig = make_subplots(rows=2, cols=2, horizontal_spacing = 0.0, vertical_spacing = 0.05,
                            shared_xaxes=True, shared_yaxes=True,
                            row_heights=[0.4, 0.6], column_widths=[0.8, 0.2])

        fig_scatter = px.scatter(df, x=df.index, y=['score'], color="rotulo",)
        fig_histogram = px.histogram(df, x=df.index, color='rotulo', nbins=20,)
        fig_box = px.box(df, x='rotulo', y="score", color='rotulo',)

        # Recolour the second trace of each chart. The charts are coloured
        # by label, so data with a single label may yield fewer than two
        # traces; the guards avoid an IndexError in that case.
        if len(fig_scatter.data) > 1:
            fig_scatter.data[1]['marker']={'color': '#000007'}
        if len(fig_histogram.data) > 1:
            fig_histogram.data[1]['marker']={'color': '#000007', 'pattern': {'shape': ''}}
        if len(fig_box.data) > 1:
            fig_box.data[1]['marker']={'color': '#000007'}

        # Copy whichever traces exist instead of assuming exactly three
        # labels are present. Only the scatter traces keep their legend
        # entry so each label is listed once.
        for scatter_trace, histogram_trace, box_trace in zip(
                fig_scatter.data, fig_histogram.data, fig_box.data):
            histogram_trace['showlegend']=False
            box_trace['showlegend']=False
            fig.add_trace(scatter_trace, row=2, col=1)
            fig.add_trace(histogram_trace, row=1, col=1)
            fig.add_trace(box_trace, row=2, col=2,)

        # NOTE(review): the year in the title is hard-coded to 2024.
        fig.update_layout(barmode='overlay', title=f'''Estatísticas: {self.company}<br>Trimestre {self.trim} de 2024''',
                        xaxis3_rangeslider=dict(visible=True, bgcolor="#636EFA", thickness=0.03),
                        legend=dict(orientation="h", yanchor="top",
                                    y=1.3, xanchor="center", x=0.5),
                        scene = dict(yaxis = dict(title=''),))
        fig.update_xaxes(showticklabels=False, showgrid=True, row=1, col=1)
        fig.update_xaxes(title_text='#Fala', showgrid=True, row=2, col=1)
        fig.update_yaxes(title_text='Score', row=2, col=1)
        fig.update_yaxes(title_text='Frequência', row=1, col=1)
        fig.update_traces(marker={"opacity": 0.7})
        return fig
    
    def plot_rank(self, tag={'NEUTRAL':'Neutro', 
                             'POSITIVE':'Positivo', 
                             'NEGATIVE':'Negativo',}, 
                    max_display=11):
        """Draw one SHAP bar plot per label and collect the y-axis labels.

        Args:
            tag: Mapping from the output name used to index
                ``self.shap_value`` to the column name used in the returned
                ranking table. The default dict is shared between calls; it
                is only read here, never modified.
            max_display: Forwarded to ``shap.plots.bar`` and also used to
                trim the list of tick labels read back from the axes.

        Returns:
            Tuple ``(plot_bar, axis, rank)``: two dicts keyed by the keys of
            ``tag`` holding the matplotlib figure and axes of each plot, and
            a DataFrame with one column per value of ``tag`` holding the
            cleaned tick-label text.
        """
        plot_bar = dict()
        axis = dict()
        rank = DataFrame()
        for key, val in tag.items():
            # A fresh figure/axes pair per label; presumably shap draws onto
            # the current axes, i.e. this one -- TODO confirm.
            plot, ax = plt.subplots()
            shap.plots.bar(self.shap_value[:,:,key], show=False, max_display=max_display,)
            plot_bar[key] = plot
            axis[key] = ax
            # Drops the last max_display + 1 tick labels and stores the rest
            # as strings. NOTE(review): which labels remain depends on how
            # shap registers its y ticks -- verify against the shap version
            # in use.
            rank[val] = DataFrame(ax.get_yticklabels()[:-max_display-1]).astype(str)
            # Closes the current figure; the figure object stays referenced
            # in plot_bar.
            plt.close()
        
        # Removes from every cell: any 'T' followed by word characters,
        # digit runs followed by a comma, and quote, parenthesis and
        # whitespace characters -- presumably to strip the "Text(x, y, '...')"
        # wrapper left by astype(str).
        # NOTE(review): `[T]\w+` also deletes capital-T runs inside the label
        # text itself, and the `.` in `\d+.\d+,` is unescaped, so it matches
        # any character.
        rank[list(tag.values())] = rank[list(tag.values())].replace(r"(([T])\w+|(\d+,)|(\d+.\d+,)|(['\(\)\s]))", '', regex=True)
        
        return plot_bar, axis, rank


    def get_plot_rank(self, max_display=11):
        self.plot_bar, self.axis, self.rank = self.plot_rank(max_display=max_display)
        return self.plot_bar, self.axis, self.rank

    def shap_plot_text(self, num, tag):
        """Render the SHAP text plot of one sample for one label.

        Args:
            num: Index of the sample inside ``self.shap_value``.
            tag: Output name selecting the label to explain.

        Returns:
            Whatever ``shap.plots.text`` returns when called with
            ``display=False``.
        """
        explanation = self.shap_value[num, :, tag]
        rendered = shap.plots.text(explanation, display = False)
        return rendered
    
    def shap_waterfall(self, num, tag, max_display):
        """Draw the SHAP waterfall plot of one sample for one label.

        Args:
            num: Index of the sample inside ``self.shap_value``.
            tag: Output name selecting the label to explain.
            max_display: Forwarded to ``shap.plots.waterfall``.

        Returns:
            The matplotlib figure created for the plot; the current figure
            is closed before returning.
        """
        figure, _ = plt.subplots()
        explanation = self.shap_value[num, :, tag]
        shap.plots.waterfall(explanation, show=False, max_display=max_display)
        plt.close()
        return figure