| import pandas as pd |
| import en_core_web_sm |
| import it_core_news_sm |
| from modules.combined import calcola_clustering_e_score, elabora_e_correla |
| from modules.lexical import LexicalModule |
| from modules.syntax import SyntaxModule |
| from modules.cohesion import CohesionModule |
| from modules.figurative import FigurativeModule |
| from modules.consecutio import ConsecutioAnalyzer |
| from modules.frequencies import FrequenciesModule |
|
|
class AnalyzerEngine:
    """Run the English/Italian text-analysis pipeline and collect its metrics.

    The constructor loads both spaCy pipelines, the word lists and rating
    tables for each language, and one instance of every analysis module.
    ``run`` analyses a text; ``prepare_filtered_data`` and
    ``run_combined_analysis`` post-process the dictionary that ``run`` returns.
    """

    # Metric names that are kept by ``prepare_filtered_data``; every other
    # metric is dropped from the output frame.
    _SELECTED_METRICS = frozenset({
        "tense_stability", "avg_depth", "consecution_index", "verb_density",
        "semantic_cohesion_sentences", "pron_noun_ratio", "definite_articles_ratio",
        "demonstratives_ratio", "hapax_ratio", "mds_s", "mds_w", "indice_d_testo",
        "r1", "r2", "r3", "concreteness", "deictic_Frequency", "attr_adjs_freq",
        "emphatic_particles", "rel_clauses_per_sent", "present_ratio", "participle_ratio", "past_ratio",
        "first_person_ratio", "mod_per_noun", "avg_sent_len", "Conn_AdNeg", "Conn_AdPos",
        "Conn_CausNeg", "Conn_CausPos", "Conn_LogNeg", "Conn_LogPos", "Conn_TempNeg", "Conn_TempPos",
    })

    # Keys looked up in the figurative-analysis result.
    _BERT_METRICS = ("mds_s", "mds_w", "total")

    # Column layout of the frame returned by ``prepare_filtered_data``.
    _FRAME_COLUMNS = ("Categoria", "Metrica", "Valore")

    def __init__(self):
        """Load the language models, lexical resources and analysis modules."""
        self.nlp = en_core_web_sm.load()
        self.nlp_it = it_core_news_sm.load()

        # English lexical resources.
        en_r1 = self._load_set("src/basewrd1.txt")
        en_r2 = self._load_set("src/basewrd2.txt")
        en_r3 = self._load_set("src/basewrd3.txt")
        en_mrc = self._load_mrc("src/MRCPLDatabase.txt")
        self.lex_mod_en = LexicalModule(en_r1, en_r2, en_r3, en_mrc)

        # Italian lexical resources.
        it_r1 = self._load_set("src/it_basewrd1.txt")
        it_r2 = self._load_set("src/it_basewrd2.txt")
        it_r3 = self._load_set("src/it_basewrd3.txt")
        it_mrc = self._load_mrc("src/ConcreteRating.txt")
        self.lex_mod_it = LexicalModule(it_r1, it_r2, it_r3, it_mrc)

        # Modules shared by both languages.
        self.syn_mod = SyntaxModule()
        self.coh_mod = CohesionModule()
        self.fig_mod = FigurativeModule()
        self.consecutio = ConsecutioAnalyzer()
        self.freq_mod = FrequenciesModule()

        # Most recent spaCy document produced by ``run``.
        self.last_doc = None

    def prepare_filtered_data(self, res, fig_res=None):
        """Flatten the selected metrics of an analysis result into a DataFrame.

        Args:
            res: Mapping with optional ``"basic"``, ``"lexical"``, ``"syntax"``,
                ``"cohesion"``, ``"consecutio"`` and ``"combined"`` sub-dicts.
            fig_res: Optional mapping of figurative metrics; skipped when falsy.

        Returns:
            A ``pandas.DataFrame`` with the columns ``Categoria``, ``Metrica``
            and ``Valore``, one row per selected metric. The columns are
            present even when no metric matched, so callers can always index
            on them.
        """
        selected = self._SELECTED_METRICS
        rows = []

        categories = {
            "Statistiche Base": res.get("basic", {}),
            "Lessico": res.get("lexical", {}),
            "Sintassi e Verbi": res.get("syntax", {}),
            "Coesione": res.get("cohesion", {}),
            "Consecutio": res.get("consecutio", {}),
            "Combined": res.get("combined", {}),
        }

        # Only scalar values are tabulated; nested structures are skipped.
        for cat_name, metrics in categories.items():
            for key, value in metrics.items():
                if key in selected and isinstance(value, (int, float, str)):
                    rows.append({"Categoria": cat_name, "Metrica": key, "Valore": value})

        # Connector counts live one level deeper and get a "Conn_" prefix.
        connectors = res.get("cohesion", {}).get("connectors", {})
        for conn_type, val in connectors.items():
            metrica_conn = f"Conn_{conn_type}"
            if metrica_conn in selected:
                rows.append({"Categoria": "Connettori", "Metrica": metrica_conn, "Valore": val})

        # A selected figurative metric missing from fig_res is reported as 0.
        if fig_res:
            for metric in self._BERT_METRICS:
                if metric in selected:
                    rows.append({
                        "Categoria": "Figuratività (BERT)",
                        "Metrica": metric,
                        "Valore": fig_res.get(metric, 0),
                    })

        # Explicit columns keep the frame well-formed when ``rows`` is empty.
        return pd.DataFrame(rows, columns=list(self._FRAME_COLUMNS))

    def run_combined_analysis(self, res, fig_res, source_name):
        """Feed the filtered metrics to the combined correlation/clustering step.

        Args:
            res: Analysis result as accepted by ``prepare_filtered_data``.
            fig_res: Figurative metrics as accepted by ``prepare_filtered_data``.
            source_name: Label used as the name of the single value column and
                forwarded to both combined-analysis functions.

        Returns:
            The tuple ``(q_score, figura, classe_assegnata)`` returned by
            ``calcola_clustering_e_score``.
        """
        df_filtrato = self.prepare_filtered_data(res, fig_res)
        df_input = df_filtrato.set_index(["Categoria", "Metrica"])
        df_input.columns = [source_name]

        q_score, matrice_corr, medie_classi, gs_perc, df_perc_nuovo = elabora_e_correla(
            df_input, source_name
        )
        q_score, figura, classe_assegnata = calcola_clustering_e_score(
            q_score, medie_classi, matrice_corr, source_name, gs_perc, df_perc_nuovo
        )

        return q_score, figura, classe_assegnata

    def _load_set(self, p):
        """Read the first whitespace-delimited token of every line into a set.

        Tokens are lower-cased; blank lines are ignored. Splitting on any
        whitespace means a tab-separated line such as ``word<TAB>0`` yields
        ``word`` rather than a token fused with the following column.

        Args:
            p: Path of the word-list file.

        Returns:
            The set of words, or an empty set when the file cannot be read
            (the error is printed, not raised).
        """
        words = set()
        try:
            with open(p, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    fields = line.split()
                    if fields:
                        words.add(fields[0].lower())
        except OSError as e:
            print(f"Errore caricamento {p}: {e}")
            return set()
        return words

    def _load_mrc(self, p):
        """Read a tab-separated ``word<TAB>score`` file into a dictionary.

        Lines with fewer than two tab-separated fields are ignored. The score
        is parsed as ``int`` when possible and as ``float`` otherwise; rows
        whose second field is not numeric (for example a header row) are
        skipped instead of aborting the load.

        Args:
            p: Path of the rating file.

        Returns:
            Mapping from lower-cased first field to its numeric score.

        Raises:
            OSError: If the file cannot be opened.
        """
        ratings = {}
        with open(p, 'r', encoding='utf-8') as f:
            for line in f:
                fields = line.split('\t')
                if len(fields) < 2:
                    continue
                raw_score = fields[1].strip()
                try:
                    score = int(raw_score)
                except ValueError:
                    try:
                        score = float(raw_score)
                    except ValueError:
                        continue
                ratings[fields[0].lower()] = score
        return ratings

    @staticmethod
    def _sentence_to_conll(sent):
        """Render one sentence as tab-separated rows with 1-based token ids."""
        lines = []
        for position, t in enumerate(sent, start=1):
            if t.dep_ == "ROOT" or t.head == t:
                head_idx = 0
            else:
                # Convert the document-level head index to a 1-based
                # position inside this sentence.
                head_idx = t.head.i - sent.start + 1
            lines.append(
                f"{position}\t{t.text}\t{t.lemma_}\t{t.pos_}\t{t.tag_}\t_\t{head_idx}\t{t.dep_}"
            )
        return "\n".join(lines)

    def run(self, text, lang="en"):
        """Analyse ``text`` and return the per-module results.

        Args:
            text: Raw text to analyse.
            lang: ``"it"`` selects the Italian pipeline and lexical module;
                any other value selects the English ones.

        Returns:
            A dict with the keys ``"doc"``, ``"basic"``, ``"lexical"``,
            ``"syntax"``, ``"cohesion"`` and ``"consecutio"``.
        """
        if lang == "it":
            doc = self.nlp_it(text)
            active_lex_mod = self.lex_mod_it
        else:
            doc = self.nlp(text)
            active_lex_mod = self.lex_mod_en

        self.last_doc = doc
        sentences = list(doc.sents)
        lemmas = [t.lemma_.lower() for t in doc if not t.is_punct]
        tokens_text = [t.text.lower() for t in doc if not t.is_punct]
        token_text_SP = [t.text.lower() for t in doc]
        # Every newline-delimited chunk counts as a paragraph, including
        # empty ones; a text without newlines is a single paragraph.
        paragraphs = text.split("\n")

        conll = [self._sentence_to_conll(sent) for sent in sentences]

        return {
            "doc": doc,
            "basic": {
                "tokens": len(tokens_text),
                "tokens_including_punct": len(token_text_SP),
                "sentences": len(sentences),
                "paragraphs": len(paragraphs),
                "chars": len(text),
                "texts": [s.text for s in sentences],
                "conll": conll,
            },
            "lexical": active_lex_mod.analyze(
                doc, lemmas, tokens_text, token_text_SP, text, sentences,
                len(paragraphs), lang=lang,
            ),
            "syntax": self.syn_mod.analyze(doc, sentences, lang=lang),
            "cohesion": self.coh_mod.analyze(
                doc, sentences, tokens_text, token_text_SP, lang=lang
            ),
            "consecutio": self.consecutio.analyze(doc),
        }