| import logging |
| import pandas as pd |
| import numpy as np |
| import os |
| import json |
| import re |
| from sklearn.cluster import KMeans |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.decomposition import PCA |
| from sentence_transformers import SentenceTransformer |
| from typing import List, Dict, Tuple, Optional |
| from langchain_openai import ChatOpenAI |
| from langchain.schema import HumanMessage |
| import plotly.express as px |
| import plotly.graph_objects as go |
|
|
|
|
# Output locations for the two Excel exports written by save_results().
RESULTS_DIR = "/tmp/results"
SAVE_PATH_CLUSTERS = os.path.join(RESULTS_DIR, "cluster_results.xlsx")
SAVE_PATH_ORIGINAL = os.path.join(RESULTS_DIR, "data_with_clusters.xlsx")

# Default model identifiers: the sentence-transformers encoder used for the
# embeddings and the chat model asked to title/describe each cluster.
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
LLM_MODEL_NAME = "gpt-4o-mini"
|
|
|
|
# Domain-specific stop words: terms that show up in nearly every PNRR project
# description and therefore carry no signal for telling clusters apart.
PNRR_STOPWORDS = {
    # Programme name and generic project vocabulary
    "pnrr", "piano", "nazionale", "ripresa", "resilienza",
    "progetto", "progetti", "intervento", "interventi", "attività",
    # Generic action nouns
    "realizzazione", "sviluppo", "implementazione", "potenziamento",
    "miglioramento", "sostegno",
    # Funding vocabulary
    "euro", "milioni", "miliardi", "finanziamento", "investimento",
    # Public-administration vocabulary
    "pubblico", "pubblica", "amministrazione", "ente", "comune", "regione",
    # Country references
    "italia", "italiano", "italiana",
}
|
|
# General Italian stop words, grouped by grammatical category.
ITALIAN_STOPWORDS = {
    # Articles
    "il", "lo", "la", "i", "gli", "le", "un", "uno", "una",
    # Simple prepositions
    "di", "a", "da", "in", "con", "su", "per", "tra", "fra",
    # Articulated prepositions
    "del", "dello", "della", "dei", "degli", "delle",
    "al", "allo", "alla", "ai", "agli", "alle",
    "dal", "dallo", "dalla", "dai", "dagli", "dalle",
    "nel", "nello", "nella", "nei", "negli", "nelle",
    "sul", "sullo", "sulla", "sui", "sugli", "sulle",
    # Conjunctions
    "e", "ed", "o", "od", "ma", "però", "anche", "ancora", "quindi",
    "dunque", "mentre", "quando", "se",
    # Relative pronouns and demonstratives
    "che", "chi", "cui", "quale", "quali",
    "questo", "questa", "questi", "queste",
    "quello", "quella", "quelli", "quelle",
    "stesso", "stessa", "stessi", "stesse",
    # Adverbs
    "dove", "come", "perché", "già", "più", "molto", "poco", "tanto",
    "quanto", "sempre", "mai",
    "oggi", "ieri", "domani", "prima", "dopo", "sopra", "sotto",
    "dentro", "fuori",
    # Indefinites and quantifiers
    "tutto", "tutti", "tutte", "ogni", "alcuni", "alcune",
    "altro", "altri", "altre",
    "nessuno", "nessuna", "niente", "nulla", "qualche", "qualcosa",
    "qualcuno",
    # Common verbs (infinitive form)
    "essere", "avere", "fare", "dire", "andare", "venire", "volere",
    "potere", "dovere", "sapere",
    "stare", "dare", "vedere", "uscire", "partire",
    # Other generic connective words
    "contesto", "attraverso", "mediante", "presso", "verso", "circa",
    "oltre", "secondo", "durante",
}
|
|
|
|
def preprocess_text(text: str, remove_domain_stopwords: bool = True, custom_blacklist: Optional[List[str]] = None) -> str:
    """
    Normalise a text and optionally strip stop words from it.

    The text is lower-cased, punctuation is replaced by spaces, standalone
    numbers are dropped and whitespace is collapsed.

    Args:
        text: Input text; any non-string value yields an empty string.
        remove_domain_stopwords: When True, filter out the Italian stop words,
            the PNRR stop words and the custom blacklist, plus tokens of two
            characters or fewer and tokens without any letter. When False no
            token filtering happens at all (the blacklist is ignored too).
        custom_blacklist: Extra words to drop; compared lower-cased and
            stripped, blank entries are skipped.

    Returns:
        str: The cleaned text, tokens separated by single spaces.
    """
    if not isinstance(text, str):
        return ""

    lowered = text.lower()
    # Anything that is neither a word character nor whitespace becomes a space.
    without_punctuation = re.sub(r'[^\w\sàèéìíîòóùú]', ' ', lowered)
    # Drop tokens made only of digits ("2024"); digits glued to letters stay.
    without_numbers = re.sub(r'\b\d+\b', ' ', without_punctuation)
    tokens = without_numbers.split()

    if not remove_domain_stopwords:
        return ' '.join(tokens)

    blocked = ITALIAN_STOPWORDS | PNRR_STOPWORDS
    if custom_blacklist:
        blocked = blocked | {
            entry.lower().strip() for entry in custom_blacklist if entry.strip()
        }

    kept = [
        token for token in tokens
        if token not in blocked
        and len(token) > 2
        and not token.isdigit()
        and re.search(r'[a-zA-Zàèéìíîòóùú]', token)
    ]
    return ' '.join(kept)
|
|
|
|
def combine_text_columns(df: pd.DataFrame, columns: List[str], preprocess: bool = True, custom_blacklist: Optional[List[str]] = None) -> pd.Series:
    """Combine multiple text columns into a single text representation.

    For every row the non-null values of the requested columns are converted
    to strings, optionally preprocessed, and joined with " | ". Parts that are
    empty or whitespace-only (for example because preprocessing removed every
    token) are skipped, so a row without usable text yields "" rather than a
    bare separator.

    Args:
        df: DataFrame containing the data
        columns: List of column names to combine; names missing from ``df``
            are ignored
        preprocess: Whether to apply text preprocessing (cleaning and stopword removal)
        custom_blacklist: Additional words to exclude from preprocessing

    Returns:
        pd.Series: One combined text per row, indexed like ``df`` so it can be
        used directly to build boolean masks over ``df``.
    """
    # Resolve the usable columns once instead of once per row.
    present_columns = [col for col in columns if col in df.columns]

    combined_texts = []
    for _, row in df.iterrows():
        parts = []
        for col in present_columns:
            value = row[col]
            if not pd.notna(value):
                continue
            part = str(value)
            if preprocess:
                part = preprocess_text(part, custom_blacklist=custom_blacklist)
            # An empty part would only contribute a dangling " | " separator.
            if part.strip():
                parts.append(part)

        combined = " | ".join(parts)
        if preprocess:
            combined = ' '.join(combined.split())
        combined_texts.append(combined)

    # dtype=object keeps the .str accessor usable even when df has no rows.
    return pd.Series(combined_texts, index=df.index, dtype=object)
|
|
|
|
def create_embeddings(texts: List[str], model_name: str = EMBEDDING_MODEL_NAME) -> np.ndarray:
    """Encode texts into vector embeddings with a sentence-transformers model.

    The model is instantiated on every call and a progress bar is shown
    while encoding.

    Args:
        texts: Texts to encode
        model_name: Name of the sentence-transformers model to load

    Returns:
        np.ndarray: The value returned by ``SentenceTransformer.encode``
    """
    logging.info(f"Creating embeddings with model: {model_name}")
    encoder = SentenceTransformer(model_name)
    return encoder.encode(texts, show_progress_bar=True)
|
|
|
|
def perform_clustering(embeddings: np.ndarray, n_clusters: Optional[int] = None, max_clusters: int = 20, min_clusters: int = 2) -> Tuple[np.ndarray, int]:
    """Perform K-means clustering on vector embeddings.

    Args:
        embeddings: Numpy array of embeddings, one row per sample
        n_clusters: Fixed number of clusters (if None, determined automatically)
        max_clusters: Maximum number of clusters for automatic selection
        min_clusters: Minimum number of clusters for automatic selection

    Returns:
        Tuple[np.ndarray, int]: Cluster labels and the number of clusters
        actually used (which may be lower than requested, see below).

    Raises:
        ValueError: If ``embeddings`` contains no samples.
    """
    n_samples = len(embeddings)
    if n_samples == 0:
        raise ValueError("Cannot cluster an empty set of embeddings")

    if n_clusters is None:
        n_clusters = find_optimal_clusters(embeddings, max_clusters, min_clusters)

    # KMeans cannot fit more clusters than samples; cap instead of crashing
    # when few rows survive filtering.
    if n_clusters > n_samples:
        logging.warning(
            f"Requested {n_clusters} clusters but only {n_samples} samples are available; "
            f"using {n_samples} clusters")
        n_clusters = n_samples

    logging.info(f"Performing clustering with {n_clusters} clusters")
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    cluster_labels = kmeans.fit_predict(embeddings)

    return cluster_labels, n_clusters
|
|
|
|
def find_optimal_clusters(embeddings: np.ndarray, max_clusters: int = 20, min_clusters: int = 2) -> int:
    """Find optimal number of clusters using the elbow method.

    K-means is fitted for every k in the tested range and the k at which the
    inertia curve bends most sharply is returned.

    Args:
        embeddings: Numpy array of embeddings, one row per sample
        max_clusters: Maximum number of clusters to test (capped at
            ``len(embeddings) - 1``)
        min_clusters: Minimum number of clusters to test (never below 2,
            never above the effective maximum)

    Returns:
        int: Optimal number of clusters determined. Returns 1 when fewer than
        two samples are available and 2 when the effective maximum is below 2.
    """
    n_samples = len(embeddings)
    if n_samples < 2:
        # A single sample can only form a single cluster.
        return 1

    upper = min(max_clusters, n_samples - 1)
    lower = min(max(2, min_clusters), upper)
    if upper < 2:
        return 2

    inertias = []
    for k in range(lower, upper + 1):
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        kmeans.fit(embeddings)
        inertias.append(kmeans.inertia_)

    return _elbow_from_inertias(inertias, lower)


def _elbow_from_inertias(inertias: List[float], first_k: int) -> int:
    """Return the k at the elbow of an inertia curve whose first entry is ``first_k``."""
    # At least three points are needed to measure how the curve bends.
    if len(inertias) < 3:
        return first_k

    # Second difference: entry i is inertia[i] - 2*inertia[i+1] + inertia[i+2],
    # i.e. the bend centred on the *middle* point, which is k = first_k + i + 1.
    curvature = np.diff(inertias, n=2)
    return int(first_k + 1 + np.argmax(curvature))
|
|
|
|
def generate_cluster_description(cluster_texts: List[str], cluster_id: int) -> Tuple[str, str]:
    """Generate title and description for a cluster using LLM.

    Up to ten texts (each truncated to 200 characters) are sent to the chat
    model, which is asked for a JSON object with the keys "titolo" and
    "descrizione". This is best-effort: any failure while calling the model or
    reading its answer is logged and a generic title/description is returned.

    Args:
        cluster_texts: List of texts belonging to the cluster
        cluster_id: Numeric ID of the cluster (zero-based; shown one-based)

    Returns:
        Tuple[str, str]: Title (max 50 characters) and description
        (max 150 characters) of the cluster
    """
    try:
        sample_texts = cluster_texts[:10]
        text_sample = "\n".join([f"- {text[:200]}" for text in sample_texts])

        llm = ChatOpenAI(model=LLM_MODEL_NAME, temperature=0.3)

        prompt = f"""
        Analizza i seguenti progetti PNRR e identifica il tema comune che li accomuna.
        Devi fornire un titolo breve (max 50 caratteri) e una descrizione concisa (max 150 caratteri) che catturi l'essenza di questi progetti.

        Progetti del cluster {cluster_id + 1}:
        {text_sample}

        Rispondi in formato JSON con le chiavi "titolo" e "descrizione".
        Il titolo deve essere specifico e descrittivo del tema comune.
        La descrizione deve spiegare brevemente cosa accomuna questi progetti.

        Esempio di risposta:
        {{
            "titolo": "Digitalizzazione Sanità",
            "descrizione": "Progetti di migrazione cloud e infrastrutture digitali per aziende sanitarie"
        }}
        """

        response = llm.invoke([HumanMessage(content=prompt)])
        response_content = response.content.strip()
        logging.info(f"LLM Response for cluster {cluster_id}: {response_content}")

        return _parse_cluster_response(response_content, cluster_id)

    except Exception as e:
        # Deliberately broad: naming a cluster must never abort the analysis.
        logging.warning(f"Error generating description for cluster {cluster_id}: {e}")
        return (
            f"Cluster {cluster_id + 1}",
            f"Cluster contenente {len(cluster_texts)} progetti correlati",
        )


def _parse_cluster_response(response_content: str, cluster_id: int) -> Tuple[str, str]:
    """Extract (title, description) from the model's reply, falling back per field."""
    default_title = f"Cluster {cluster_id + 1}"
    default_description = "Cluster di progetti correlati"

    def pick(value, fallback: str, limit: int) -> str:
        # Only non-blank strings are usable; null, numbers, etc. use the default.
        if isinstance(value, str) and value.strip():
            return value.strip()[:limit]
        return fallback

    # First try the whole reply, then a JSON object embedded in surrounding
    # text (e.g. a fenced code block).
    candidates = [response_content]
    embedded = re.search(r'\{[^}]*"titolo"[^}]*"descrizione"[^}]*\}', response_content, re.DOTALL)
    if embedded:
        candidates.append(embedded.group(0))

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return (
                pick(parsed.get("titolo"), default_title, 50),
                pick(parsed.get("descrizione"), default_description, 150),
            )

    # Last resort: pull the two fields out individually with regexes.
    title_match = re.search(r'"titolo":\s*"([^"]+)"', response_content)
    desc_match = re.search(r'"descrizione":\s*"([^"]+)"', response_content)
    if title_match is None and desc_match is None:
        logging.warning(f"Failed to parse JSON for cluster {cluster_id}: no title or description found")

    return (
        pick(title_match.group(1) if title_match else None, default_title, 50),
        pick(desc_match.group(1) if desc_match else None, default_description, 150),
    )
|
|
|
|
def extract_keywords(cluster_texts: List[str], top_k: int = 5, custom_blacklist: Optional[List[str]] = None) -> List[str]:
    """Extract top keywords from cluster texts using TF-IDF with advanced filtering.

    Unigrams to trigrams are ranked by their mean TF-IDF score across the
    cluster. Candidates are accepted in rank order; a candidate is skipped when
    it is a stop word or shares any word with an already accepted keyword.

    Args:
        cluster_texts: List of cluster texts
        top_k: Maximum number of keywords to extract
        custom_blacklist: List of words to exclude from extraction

    Returns:
        List[str]: List of the most relevant keywords; empty when there are no
        texts or when extraction fails (the failure is logged).
    """
    if not cluster_texts:
        return []

    try:
        stopwords = ITALIAN_STOPWORDS.union(PNRR_STOPWORDS)
        if custom_blacklist:
            stopwords = stopwords.union(
                word.lower().strip() for word in custom_blacklist if word.strip()
            )

        vectorizer = TfidfVectorizer(
            max_features=200,
            stop_words=list(stopwords),
            ngram_range=(1, 3),
            # Require a term in two documents, except for a single-document
            # cluster where that threshold could never be met.
            min_df=min(2, len(cluster_texts)),
            token_pattern=r'\b[a-zA-ZÀ-ÿ]{3,}\b'
        )

        tfidf_matrix = vectorizer.fit_transform(cluster_texts)
        feature_names = vectorizer.get_feature_names_out()
        mean_scores = np.mean(tfidf_matrix.toarray(), axis=0)

        # Stable sort: equally scored candidates keep the vectorizer's order.
        ranked = sorted(zip(feature_names, mean_scores), key=lambda pair: pair[1], reverse=True)

        filtered_keywords = []
        used_words = set()
        for keyword, _score in ranked:
            if len(filtered_keywords) >= top_k:
                break

            keyword_clean = keyword.lower().strip()
            if len(keyword_clean) < 3 or keyword_clean.isdigit():
                continue
            if keyword_clean in stopwords:
                continue

            # used_words holds every word of every accepted keyword, so any
            # overlap means this candidate repeats part of an earlier one.
            keyword_words = set(keyword_clean.split())
            if keyword_words & used_words:
                continue

            filtered_keywords.append(keyword)
            used_words.update(keyword_words)

        return filtered_keywords

    except Exception as e:
        # Best-effort: keywords are optional, so failures degrade to "none".
        logging.warning(f"Error extracting keywords: {e}")
        return []
|
|
|
|
def analyze_clusters(
    data_frame_path,
    selected_columns: List[str],
    n_clusters: Optional[int] = None,
    max_clusters: int = 20,
    min_clusters: int = 2,
    preprocess_text_data: bool = True,
    custom_blacklist: Optional[List[str]] = None
) -> Tuple[pd.DataFrame, pd.DataFrame, np.ndarray, np.ndarray]:
    """
    Main function to perform cluster analysis on PNRR projects.

    Loads the Excel file, combines the selected text columns, embeds the
    non-empty rows, clusters them and builds a per-cluster summary (title,
    description, size, keywords and up to three sample texts).

    Args:
        data_frame_path: Path to the Excel file
        selected_columns: List of column names to use for clustering
        n_clusters: Number of clusters (if None, will be determined automatically)
        max_clusters: Maximum number of clusters for automatic selection
        min_clusters: Minimum number of clusters for automatic selection
        preprocess_text_data: Whether to preprocess text (remove stopwords, clean)
        custom_blacklist: Additional words to exclude from analysis

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, np.ndarray, np.ndarray]: Tuple of
        (cluster_results_df, original_data_with_clusters_df, embeddings,
        cluster_labels). In the second frame, rows without usable text get
        cluster_id -1; embeddings and labels cover only the rows with text.

    Raises:
        ValueError: If none of the selected columns exist, or if no row has
            non-empty text in them.
    """
    logging.info(f"Loading DataFrame from {data_frame_path}...")
    df = pd.read_excel(data_frame_path)
    logging.info(f"Loaded DataFrame with {len(df)} rows")

    available_columns = [col for col in selected_columns if col in df.columns]
    if not available_columns:
        raise ValueError("None of the selected columns are available in the DataFrame")

    missing_columns = [col for col in selected_columns if col not in df.columns]
    if missing_columns:
        logging.warning(f"Ignoring columns not found in the DataFrame: {missing_columns}")

    logging.info(f"Using columns for clustering: {available_columns}")
    if preprocess_text_data:
        logging.info(
            "Preprocessing text data (removing stopwords and cleaning)")
        if custom_blacklist:
            logging.info(
                f"Using custom blacklist with {len(custom_blacklist)} additional words")

    combined_texts = combine_text_columns(
        df, available_columns, preprocess=preprocess_text_data, custom_blacklist=custom_blacklist)
    if combined_texts.empty:
        raise ValueError("No non-empty text found in selected columns")

    # Positional boolean mask, so row selection never depends on how the
    # combined Series happens to be indexed.
    non_empty_mask = (combined_texts.str.strip() != "").to_numpy()
    if not non_empty_mask.any():
        raise ValueError("No non-empty text found in selected columns")

    texts_filtered = combined_texts[non_empty_mask].tolist()

    embeddings = create_embeddings(texts_filtered)
    cluster_labels, final_n_clusters = perform_clustering(embeddings, n_clusters, max_clusters, min_clusters)

    cluster_results = []
    for cluster_id in range(final_n_clusters):
        cluster_texts = [
            text for text, label in zip(texts_filtered, cluster_labels)
            if label == cluster_id
        ]
        if not cluster_texts:
            continue

        title, description = generate_cluster_description(cluster_texts, cluster_id)
        keywords = extract_keywords(cluster_texts, custom_blacklist=custom_blacklist)

        cluster_results.append({
            'cluster_id': cluster_id,
            'titolo': title,
            'descrizione': description,
            'num_progetti': len(cluster_texts),
            'keywords': ', '.join(keywords),
            'progetti_campione': ' | '.join(cluster_texts[:3])
        })

    cluster_df = pd.DataFrame(cluster_results)

    # Every original row is kept; rows that were not clustered stay at -1.
    df_with_clusters = df.copy()
    df_with_clusters['cluster_id'] = -1
    df_with_clusters.loc[non_empty_mask, 'cluster_id'] = cluster_labels

    logging.info(f"Created {final_n_clusters} clusters")
    logging.info(f"Assigned {len(cluster_labels)} projects to clusters")

    return cluster_df, df_with_clusters, embeddings, cluster_labels
|
|
|
|
def save_results(cluster_df: pd.DataFrame, data_with_clusters_df: pd.DataFrame) -> None:
    """Write both result tables to their Excel files.

    The results directory is created if needed. The cluster summary goes to
    SAVE_PATH_CLUSTERS and the annotated data to SAVE_PATH_ORIGINAL, both
    without the index column.

    Args:
        cluster_df: DataFrame with cluster results
        data_with_clusters_df: Original DataFrame with assigned cluster IDs

    Returns:
        None
    """
    os.makedirs(RESULTS_DIR, exist_ok=True)

    exports = (
        (f"Saving cluster results to {SAVE_PATH_CLUSTERS}", cluster_df, SAVE_PATH_CLUSTERS),
        (f"Saving data with clusters to {SAVE_PATH_ORIGINAL}", data_with_clusters_df, SAVE_PATH_ORIGINAL),
    )
    for message, frame, destination in exports:
        logging.info(message)
        frame.to_excel(destination, index=False)

    logging.info("Results saved successfully")
|
|
|
|
def get_cluster_statistics(cluster_df: pd.DataFrame, data_with_clusters_df: pd.DataFrame) -> Dict[str, float]:
    """Summarise the clustering outcome as a handful of counts.

    A project counts as assigned when its ``cluster_id`` is >= 0. All
    per-cluster figures are 0 when ``cluster_df`` has no rows.

    Args:
        cluster_df: DataFrame with cluster results (reads ``num_progetti``)
        data_with_clusters_df: Original DataFrame with assigned cluster IDs
            (reads ``cluster_id``)

    Returns:
        Dict[str, float]: Dictionary containing clustering statistics
    """
    n_clusters = len(cluster_df)
    total = len(data_with_clusters_df)
    assigned = int((data_with_clusters_df['cluster_id'] >= 0).sum())

    if n_clusters > 0:
        sizes = cluster_df['num_progetti']
        average = assigned / n_clusters
        largest = sizes.max()
        smallest = sizes.min()
    else:
        average = largest = smallest = 0

    return {
        'total_projects': total,
        'assigned_projects': assigned,
        'unassigned_projects': total - assigned,
        'num_clusters': n_clusters,
        'avg_projects_per_cluster': average,
        'largest_cluster_size': largest,
        'smallest_cluster_size': smallest,
    }
|
|
|
|
def create_cluster_pca_plot(embeddings: np.ndarray, cluster_labels: np.ndarray, cluster_df: pd.DataFrame) -> go.Figure:
    """
    Create a 2D PCA plot of clusters using plotly express.

    Each point is one embedded text, projected onto the first two principal
    components and coloured by its cluster. Clusters are coloured by their
    title (a string), so Plotly draws discrete colours and one legend entry
    per cluster instead of a continuous colour scale.

    Args:
        embeddings: Numpy array of embeddings
        cluster_labels: Cluster labels for each point
        cluster_df: DataFrame with cluster information (for titles and descriptions)

    Returns:
        plotly.graph_objects.Figure: Interactive plot figure. If anything
        fails, the error is logged and a figure containing only the error
        message is returned instead.
    """
    try:
        logging.info("Performing PCA reduction to 2D for visualization...")
        pca = PCA(n_components=2, random_state=42)
        embeddings_2d = pca.fit_transform(embeddings)

        plot_df = pd.DataFrame({
            'PC1': embeddings_2d[:, 0],
            'PC2': embeddings_2d[:, 1],
            'cluster_id': cluster_labels
        })

        # Build per-cluster lookup tables once instead of filtering
        # cluster_df for every single point.
        titles = {}
        descriptions = {}
        sizes = {}
        for _, row in cluster_df.iterrows():
            cluster_id = row['cluster_id']
            titles[cluster_id] = f"Cluster {cluster_id + 1}: {row['titolo']}"
            descriptions.setdefault(cluster_id, row['descrizione'])
            sizes.setdefault(cluster_id, row['num_progetti'])

        def title_for(cluster_id):
            # Labels without a row in cluster_df still need a legend entry.
            return titles.get(cluster_id, f"Cluster {cluster_id + 1}")

        plot_df['cluster_title'] = plot_df['cluster_id'].map(title_for)
        plot_df['cluster_description'] = plot_df['cluster_id'].map(
            lambda cluster_id: descriptions.get(cluster_id, "Cluster sconosciuto")
        )
        plot_df['num_progetti'] = plot_df['cluster_id'].map(
            lambda cluster_id: sizes.get(cluster_id, 0)
        )

        # Keep the legend sorted by cluster id rather than by first appearance.
        legend_order = [title_for(cluster_id) for cluster_id in sorted(plot_df['cluster_id'].unique())]

        fig = px.scatter(
            plot_df,
            x='PC1',
            y='PC2',
            color='cluster_title',
            category_orders={'cluster_title': legend_order},
            hover_data={
                'cluster_title': True,
                'cluster_description': True,
                'num_progetti': True,
                'PC1': ':.3f',
                'PC2': ':.3f',
                'cluster_id': False
            },
            title='Visualizzazione 2D dei Cluster (PCA)',
            labels={
                'PC1': f'Prima Componente Principale ({pca.explained_variance_ratio_[0]:.1%} varianza)',
                'PC2': f'Seconda Componente Principale ({pca.explained_variance_ratio_[1]:.1%} varianza)',
                'cluster_id': 'Cluster ID',
                'cluster_title': 'Cluster'
            },
            color_discrete_sequence=px.colors.qualitative.Set3
        )

        fig.update_layout(
            width=800,
            height=600,
            showlegend=True,
            legend=dict(
                orientation="v",
                yanchor="top",
                y=1,
                xanchor="left",
                x=1.02
            ),
            margin=dict(r=150),
            font=dict(size=12),
            plot_bgcolor='rgba(0,0,0,0)'
        )

        fig.update_traces(
            marker=dict(
                size=8,
                opacity=0.7,
                line=dict(width=1, color='DarkSlateGrey')
            )
        )

        explained_variance_total = pca.explained_variance_ratio_[
            0] + pca.explained_variance_ratio_[1]
        fig.add_annotation(
            text=f"Varianza totale spiegata: {explained_variance_total:.1%}<br>Ogni punto rappresenta un progetto PNRR",
            xref="paper", yref="paper",
            x=0.02, y=0.98,
            xanchor="left", yanchor="top",
            showarrow=False,
            font=dict(size=10, color="gray"),
            bgcolor="rgba(255,255,255,0.8)",
            bordercolor="gray",
            borderwidth=1
        )

        logging.info(
            f"Created PCA plot with {len(plot_df)} points and {len(cluster_df)} clusters")
        logging.info(
            f"Total explained variance: {explained_variance_total:.3f}")

        return fig

    except Exception as e:
        # Best-effort: a failed plot must not break the caller, so return a
        # placeholder figure that shows the error.
        logging.error(f"Error creating PCA plot: {e}")

        fig = go.Figure()
        fig.add_annotation(
            text=f"Errore nella creazione del plot PCA: {str(e)}",
            x=0.5, y=0.5,
            xref="paper", yref="paper",
            showarrow=False
        )
        return fig
|
|