| |
| |
| |
|
|
| |
| |
| import os |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
| |
| from dotenv import load_dotenv |
| from anthropic import Anthropic |
|
|
| |
| try: |
| from openai import OpenAI |
| except ImportError: |
| OpenAI = None |
| |
| try: |
| from groq import Groq |
| except ImportError: |
| Groq = None |
| |
| try: |
| import google.generativeai as genai |
| except ImportError: |
| genai = None |
|
|
| |
| import pandas as pd |
| import numpy as np |
| from datetime import datetime, timedelta |
| import json |
| import gc |
|
|
| |
| import nltk |
| from nltk.corpus import stopwords |
| from nltk.tokenize import word_tokenize |
| from nltk.stem import WordNetLemmatizer |
| from textblob import TextBlob |
| import re |
| from collections import Counter |
|
|
| |
| from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer |
| from sklearn.decomposition import LatentDirichletAllocation |
| from sklearn.cluster import KMeans |
| from sklearn.preprocessing import StandardScaler |
| from sklearn.metrics.pairwise import cosine_similarity |
|
|
| |
| import plotly.express as px |
| import plotly.graph_objects as go |
| from plotly.subplots import make_subplots |
| import matplotlib.pyplot as plt |
| import seaborn as sns |
|
|
| |
| import gradio as gr |
|
|
| |
# Download required NLTK corpora once at startup (quiet to avoid log noise).
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)
nltk.download('omw-1.4', quiet=True)
nltk.download('brown', quiet=True)

# Fetch the TextBlob corpora needed for sentiment / noun-phrase extraction.
# Strategy: try the in-process downloader first, then fall back to a
# subprocess invocation, and finally warn the user with manual instructions.
# The exception clauses are narrowed from bare `except:` so that
# KeyboardInterrupt / SystemExit still propagate during startup.
try:
    from textblob import download_corpora
    download_corpora.main()
except Exception:
    import subprocess
    import sys
    try:
        subprocess.run([sys.executable, "-m", "textblob.download_corpora"],
                       capture_output=True, text=True, timeout=30)
    except Exception:
        print("Warning: Could not download TextBlob corpora. Sentiment analysis may not work properly.")
        print("Please run: python -m textblob.download_corpora")

# Load API keys and other settings from a local .env file, overriding any
# variables already present in the process environment.
load_dotenv(override=True)
|
|
| |
class SmartColumnDetector:
    """
    Intelligently detect and extract relevant columns from uploaded data.

    Column roles (text / id / product / date) are guessed first from the
    column name, then -- for unmatched object columns -- from simple content
    heuristics (average string length and uniqueness ratio).
    """

    def __init__(self):
        """Initialize the detector with keyword lists for each column role."""
        # Columns likely to hold free-form customer text
        self.text_keywords = ['comment', 'feedback', 'review', 'description', 'text',
                              'response', 'opinion', 'message', 'notes', 'remarks']
        # Columns likely to hold record identifiers
        self.id_keywords = ['id', 'identifier', 'key', 'number', 'code', 'ref',
                            'reference', 'index', 'uuid']
        # Columns likely to describe the product being reviewed
        self.product_keywords = ['product', 'item', 'model', 'variant', 'type',
                                 'category', 'brand', 'name', 'sku']
        # Columns likely to hold timestamps
        self.date_keywords = ['date', 'time', 'created', 'updated', 'timestamp']

    def detect_column_types(self, df):
        """
        Categorize every column of *df* by its likely purpose.

        Returns a dict with keys 'text_columns', 'id_columns',
        'product_columns', 'date_columns' and 'other_columns', each mapping
        to a list of column names.  Name-based keyword matching wins; for
        object columns with no keyword hit, content heuristics are used.
        Unlike the previous version, every column is guaranteed to land in
        exactly one bucket (empty/all-NaN columns go to 'other_columns').
        """
        detected = {
            'text_columns': [],
            'id_columns': [],
            'product_columns': [],
            'date_columns': [],
            'other_columns': []
        }

        for col in df.columns:
            col_lower = col.lower()

            # Name-based matching, in priority order: text > id > product > date.
            if any(keyword in col_lower for keyword in self.text_keywords):
                detected['text_columns'].append(col)
            elif any(keyword in col_lower for keyword in self.id_keywords):
                detected['id_columns'].append(col)
            elif any(keyword in col_lower for keyword in self.product_keywords):
                detected['product_columns'].append(col)
            elif any(keyword in col_lower for keyword in self.date_keywords):
                detected['date_columns'].append(col)
            else:
                # No keyword hit: inspect a sample of the content.
                sample = df[col].dropna().head(100)
                if len(sample) > 0 and df[col].dtype == 'object':
                    avg_length = sample.astype(str).str.len().mean()
                    if avg_length > 50:
                        # Long strings look like free-form text.
                        detected['text_columns'].append(col)
                    elif avg_length < 20 and df[col].nunique() / len(df) > 0.5:
                        # Short, mostly-unique strings look like identifiers.
                        detected['id_columns'].append(col)
                    else:
                        # Otherwise treat as a categorical/product-like column.
                        detected['product_columns'].append(col)
                else:
                    # Numeric or empty columns are left uncategorized.
                    detected['other_columns'].append(col)

        return detected

    def extract_relevant_data(self, df):
        """
        Build a slim analysis DataFrame from *df*.

        Returns ``(extracted_df, detected)`` where *extracted_df* contains a
        'unique_id' column, up to two 'product_*' columns, a 'combined_text'
        column (all detected text columns joined per row), and optionally a
        parsed 'date' column; *detected* is the detect_column_types() mapping.
        """
        detected = self.detect_column_types(df)
        extracted_data = pd.DataFrame()

        # Use the first detected ID column, or synthesize a 1-based index.
        if detected['id_columns']:
            extracted_data['unique_id'] = df[detected['id_columns'][0]]
        else:
            extracted_data['unique_id'] = range(1, len(df) + 1)

        # Keep at most two product columns, prefixed for clarity.
        for col in detected['product_columns'][:2]:
            extracted_data[f'product_{col}'] = df[col]

        # Concatenate all text columns row-wise, skipping missing values.
        # itertuples avoids the O(rows x cols) cost of per-cell .loc access.
        text_cols = detected['text_columns']
        if text_cols:
            extracted_data['combined_text'] = [
                ' '.join(str(value) for value in row if pd.notna(value))
                for row in df[text_cols].itertuples(index=False)
            ]
        else:
            # No text columns at all: keep the column so downstream code
            # can rely on its presence.
            extracted_data['combined_text'] = [''] * len(df)

        # Parse the first date column; unparseable values become NaT.
        if detected['date_columns']:
            extracted_data['date'] = pd.to_datetime(df[detected['date_columns'][0]], errors='coerce')

        return extracted_data, detected
|
|
| |
class EnhancedTextProcessor:
    """
    Enhanced text preprocessing with actionable-insight extraction.

    Cleans raw feedback text and mines it for improvement suggestions
    (via a keyword dictionary) and short topic phrases (via TextBlob
    noun phrases plus frequent keywords).
    """

    def __init__(self):
        """Set up NLP helpers and the improvement-keyword dictionary."""
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))

        # Maps each suggested improvement to the trigger phrases that imply it.
        self.actionable_dictionary = {
            'improve speed': ['slow', 'faster', 'quick', 'speed up', 'takes too long', 'waiting'],
            'better quality': ['poor quality', 'cheap', 'breaks', 'defective', 'flimsy', 'weak'],
            'enhance ui': ['confusing', 'hard to use', 'complicated', 'not intuitive', 'difficult to navigate'],
            'fix bugs': ['bug', 'error', 'crash', 'freeze', 'not working', 'glitch', 'broken'],
            'add features': ['missing', 'need', 'want', 'should have', 'would be nice', 'lacks'],
            'improve support': ['no response', 'unhelpful', 'rude', 'poor service', 'bad support'],
            'better packaging': ['damaged', 'poor packaging', 'arrived broken', 'not protected'],
            'clearer docs': ['unclear', 'no instructions', 'confusing manual', 'hard to understand'],
            'reduce price': ['expensive', 'overpriced', 'too costly', 'not worth', 'cheaper'],
            'faster delivery': ['late', 'delayed', 'slow shipping', 'took forever', 'still waiting'],
            'better communication': ['no updates', 'not informed', 'lack of communication', 'no tracking'],
            'improve reliability': ['unreliable', 'stops working', 'inconsistent', 'sometimes works'],
            'enhance performance': ['slow performance', 'laggy', 'sluggish', 'not responsive'],
            'better design': ['ugly', 'poor design', 'looks cheap', 'not attractive', 'outdated look'],
            'more options': ['limited options', 'no variety', 'need more choices', 'only one option']
        }

    def clean_text(self, text):
        """Lowercase *text*, strip punctuation, and collapse whitespace."""
        if pd.isna(text) or text == '':
            return ""
        normalized = str(text).lower()
        normalized = re.sub(r'[^a-zA-Z0-9\s]', '', normalized)
        return ' '.join(normalized.split())

    def extract_actionable_insights(self, text):
        """
        Match *text* against the improvement dictionary.

        Returns up to three suggestions as a comma-separated string,
        or "" when nothing matches.
        """
        if pd.isna(text) or text == '':
            return ""

        lowered = text.lower()
        matched = [
            action
            for action, cues in self.actionable_dictionary.items()
            if any(cue in lowered for cue in cues)
        ]
        return ', '.join(matched[:3]) if matched else ""

    def extract_specific_topics(self, text):
        """
        Extract up to three topic strings from *text*.

        Always returns a list of exactly three entries, padded with ''
        when fewer topics are found.
        """
        if pd.isna(text) or text == '' or len(text) < 10:
            return ['', '', '']

        lowered = text.lower()
        tokens = word_tokenize(lowered)
        candidates = [t for t in tokens if t not in self.stop_words and len(t) > 3]

        # Prefer short noun phrases detected by TextBlob.
        topics = [
            phrase
            for phrase in TextBlob(text).noun_phrases[:5]
            if len(phrase.split()) <= 3
        ]

        # Top up with frequent keywords.  The substring test against the
        # list's repr mirrors the original's de-duplication behavior.
        if len(topics) < 3:
            for word, _ in Counter(candidates).most_common(5):
                if word not in str(topics):
                    topics.append(word)
                if len(topics) >= 3:
                    break

        topics = topics[:3]
        topics += [''] * (3 - len(topics))
        return topics

    def determine_topic(self, text):
        """Backward-compatible wrapper: first specific topic or 'General'."""
        topics = self.extract_specific_topics(text)
        return topics[0] if topics[0] else 'General'
|
|
| |
class TextSearchEngine:
    """
    Advanced search functionality for text data with semantic capabilities.

    Uses TF-IDF vectorization and cosine similarity for intelligent text
    search; queries are additionally expanded through a hand-curated
    synonym map so that related wording still matches.
    """

    def __init__(self):
        """Initialize the search engine with TF-IDF vectorizer and synonym dictionary."""
        # Uni- to tri-gram TF-IDF over at most 1000 features; sublinear_tf
        # dampens the weight of very frequent terms.
        self.vectorizer = TfidfVectorizer(
            max_features=1000,
            ngram_range=(1, 3),
            stop_words='english',
            use_idf=True,
            smooth_idf=True,
            sublinear_tf=True
        )
        self.tfidf_matrix = None  # document-term matrix, built by build_index()
        self.data = None          # copy of the indexed DataFrame

        # Two-way synonym map: a query word matches both its key's synonym
        # list and, reversely, the key of any list it appears in.
        self.synonyms = {
            'fast': ['quick', 'rapid', 'speedy', 'swift', 'prompt'],
            'slow': ['sluggish', 'delayed', 'laggy', 'lengthy', 'prolonged'],
            'good': ['excellent', 'great', 'wonderful', 'fantastic', 'amazing', 'positive'],
            'bad': ['poor', 'terrible', 'awful', 'negative', 'horrible', 'disappointing'],
            'problem': ['issue', 'bug', 'error', 'defect', 'fault', 'glitch'],
            'help': ['support', 'assistance', 'aid', 'service'],
            'price': ['cost', 'fee', 'charge', 'rate', 'payment', 'expensive', 'cheap'],
            'quality': ['standard', 'grade', 'condition', 'caliber'],
            'delivery': ['shipping', 'dispatch', 'arrival', 'transport'],
            'easy': ['simple', 'straightforward', 'effortless', 'user-friendly'],
            'hard': ['difficult', 'complex', 'complicated', 'challenging'],
            'broken': ['damaged', 'defective', 'faulty', 'malfunctioning'],
            'love': ['like', 'enjoy', 'appreciate', 'adore'],
            'hate': ['dislike', 'despise', 'detest'],
            'feature': ['function', 'capability', 'option', 'characteristic'],
            'customer': ['client', 'buyer', 'purchaser', 'consumer', 'user']
        }

    def expand_query_with_synonyms(self, query):
        """
        Expand a search query with synonyms for better semantic matching.

        Returns a space-joined string containing the original words plus
        all forward and reverse synonym hits, de-duplicated while keeping
        first-seen order.
        """
        query_words = query.lower().split()
        expanded_terms = []

        for word in query_words:
            expanded_terms.append(word)

            # Forward lookup: the word is a dictionary key.
            if word in self.synonyms:
                expanded_terms.extend(self.synonyms[word])

            # Reverse lookup: the word appears in some key's synonym list.
            for key, syns in self.synonyms.items():
                if word in syns:
                    expanded_terms.append(key)
                    expanded_terms.extend([s for s in syns if s != word])

        # De-duplicate while preserving order of first occurrence.
        seen = set()
        unique_terms = []
        for term in expanded_terms:
            if term not in seen:
                unique_terms.append(term)
                seen.add(term)

        return ' '.join(unique_terms)

    def build_index(self, df, text_column):
        """
        Build the TF-IDF search index from *df*'s *text_column*.

        Topic and actionable-insight columns, when present, are appended to
        each document so searches can also hit derived metadata.
        """
        self.data = df.copy()
        texts = df[text_column].fillna('').tolist()

        # Enrich documents with the three extracted topics, if available.
        # NOTE(review): assumes topic_2/topic_3 exist whenever topic_1 does
        # (true for data produced by EnhancedTextAnalyzer.process_data).
        if 'topic_1' in df.columns:
            texts = [f"{text} {df.iloc[i]['topic_1']} {df.iloc[i]['topic_2']} {df.iloc[i]['topic_3']}"
                     for i, text in enumerate(texts)]
        if 'actionable_insights' in df.columns:
            texts = [f"{texts[i]} {df.iloc[i]['actionable_insights']}"
                     for i in range(len(texts))]

        # Fit and transform in one pass; the vectorizer's vocabulary is
        # rebuilt on every call, so stale queries cannot leak across files.
        self.tfidf_matrix = self.vectorizer.fit_transform(texts)

    def search(self, query, top_k=10):
        """
        Search the index with semantic expansion.

        Returns a DataFrame of up to *top_k* matches sorted by descending
        'search_score', or an empty DataFrame when nothing scores above the
        0.05 similarity floor (or no index has been built yet).
        """
        if self.tfidf_matrix is None:
            return pd.DataFrame()

        expanded_query = self.expand_query_with_synonyms(query)

        # Vectorize both the raw and the synonym-expanded query.
        query_vector = self.vectorizer.transform([query])
        expanded_vector = self.vectorizer.transform([expanded_query])

        similarities_orig = cosine_similarity(query_vector, self.tfidf_matrix).flatten()
        similarities_exp = cosine_similarity(expanded_vector, self.tfidf_matrix).flatten()

        # Blend: raw-query similarity dominates (70/30 split).
        combined_similarities = (0.7 * similarities_orig + 0.3 * similarities_exp)

        # Top-k by score, highest first.
        top_indices = combined_similarities.argsort()[-top_k:][::-1]
        top_scores = combined_similarities[top_indices]

        # Drop near-zero matches below the similarity floor.
        valid_indices = [idx for idx, score in zip(top_indices, top_scores) if score > 0.05]

        if valid_indices:
            results = self.data.iloc[valid_indices].copy()
            results['search_score'] = [combined_similarities[idx] for idx in valid_indices]

            # Boost rows containing the literal query as an exact substring.
            query_lower = query.lower()
            for idx in results.index:
                if 'combined_text' in results.columns:
                    if query_lower in str(results.at[idx, 'combined_text']).lower():
                        results.at[idx, 'search_score'] *= 1.5

            return results.sort_values('search_score', ascending=False)

        return pd.DataFrame()
|
|
| |
class AIModelManager:
    """
    Manages multiple AI model APIs and provides a unified interface.

    Supports OpenAI, Anthropic, Deepseek, Groq, and Google Gemini; a client
    is registered only when its API key is present in the environment (and
    the corresponding library imported successfully).
    """

    def __init__(self):
        """Initialize the model manager and set up all available AI APIs."""
        self.available_models = {}  # display name -> {'provider', 'model'}
        self.clients = {}           # provider name -> SDK client
        self.current_model = None   # display name of the selected model
        self.initialize_apis()

    def initialize_apis(self):
        """Register every provider whose API key is set in the environment."""
        # --- Anthropic ---
        ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
        if ANTHROPIC_API_KEY:
            try:
                self.clients['anthropic'] = Anthropic(api_key=ANTHROPIC_API_KEY)
                self.available_models['Claude 3 Haiku'] = {
                    'provider': 'anthropic',
                    'model': 'claude-3-haiku-20240307'
                }
                print(f"Anthropic API Key exists and begins {ANTHROPIC_API_KEY[:4]}")
            except Exception as e:
                print(f"Error initializing Anthropic: {e}")
        else:
            print("Anthropic API Key not set")

        # --- OpenAI ---
        OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
        if OPENAI_API_KEY and OpenAI:
            try:
                self.clients['openai'] = OpenAI(api_key=OPENAI_API_KEY)
                self.available_models['GPT-4o-mini'] = {
                    'provider': 'openai',
                    'model': 'gpt-4o-mini'
                }
                self.available_models['GPT-3.5 Turbo'] = {
                    'provider': 'openai',
                    'model': 'gpt-3.5-turbo'
                }
                print(f"OpenAI API Key exists and begins {OPENAI_API_KEY[:7]}")
            except Exception as e:
                print(f"Error initializing OpenAI: {e}")
        else:
            print("OpenAI API Key not set or library not installed")

        # --- Deepseek (OpenAI-compatible endpoint) ---
        DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
        if DEEPSEEK_API_KEY and OpenAI:
            try:
                self.clients['deepseek'] = OpenAI(
                    api_key=DEEPSEEK_API_KEY,
                    base_url="https://api.deepseek.com"
                )
                self.available_models['Deepseek Chat'] = {
                    'provider': 'deepseek',
                    'model': 'deepseek-chat'
                }
                print(f"Deepseek API Key exists and begins {DEEPSEEK_API_KEY[:7]}")
            except Exception as e:
                print(f"Error initializing Deepseek: {e}")
        else:
            print("Deepseek API Key not set or OpenAI library not installed")

        # --- Groq ---
        GROQ_API_KEY = os.getenv("GROQ_API_KEY")
        if GROQ_API_KEY and Groq:
            try:
                self.clients['groq'] = Groq(api_key=GROQ_API_KEY)
                self.available_models['Llama 3.3 70B'] = {
                    'provider': 'groq',
                    'model': 'llama-3.3-70b-versatile'
                }
                # NOTE(review): mixtral-8x7b-32768 has been retired by Groq;
                # confirm the model id is still served before relying on it.
                self.available_models['Mixtral 8x7B'] = {
                    'provider': 'groq',
                    'model': 'mixtral-8x7b-32768'
                }
                print(f"Groq API Key exists and begins {GROQ_API_KEY[:4]}")
            except Exception as e:
                print(f"Error initializing Groq: {e}")
        else:
            print("Groq API Key not set or library not installed")

        # --- Google Gemini ---
        GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
        if GOOGLE_API_KEY and genai:
            try:
                genai.configure(api_key=GOOGLE_API_KEY)
                self.clients['google'] = genai
                self.available_models['Gemini 1.5 Flash'] = {
                    'provider': 'google',
                    'model': 'gemini-1.5-flash'
                }
                self.available_models['Gemini 1.5 Pro'] = {
                    'provider': 'google',
                    'model': 'gemini-1.5-pro'
                }
                print(f"Google API Key exists and begins {GOOGLE_API_KEY[:2]}")
            except Exception as e:
                print(f"Error initializing Google Gemini: {e}")
        else:
            print("Google API Key not set or library not installed")

        # Default to the first registered model, if any.
        if self.available_models:
            self.current_model = list(self.available_models.keys())[0]

    def get_available_models(self):
        """Return the display names of all registered models."""
        return list(self.available_models.keys())

    def set_model(self, model_name):
        """Select *model_name* as the current model; return True on success."""
        if model_name in self.available_models:
            self.current_model = model_name
            return True
        return False

    def generate_text(self, prompt, max_tokens=1000):
        """
        Generate a completion for *prompt* with the currently selected model.

        Returns the generated text, or None when no model is selected, the
        provider is unsupported, or the API call fails (the error is printed).
        """
        if not self.current_model or self.current_model not in self.available_models:
            return None

        model_info = self.available_models[self.current_model]
        provider = model_info['provider']
        model = model_info['model']

        try:
            if provider == 'anthropic':
                response = self.clients['anthropic'].messages.create(
                    model=model,
                    max_tokens=max_tokens,
                    messages=[{"role": "user", "content": prompt}]
                )
                return response.content[0].text

            # OpenAI, Deepseek and Groq all expose the same chat-completions
            # interface, so a single branch serves all three (this replaces
            # three duplicated copies of the same call).
            if provider in ('openai', 'deepseek', 'groq'):
                response = self.clients[provider].chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=max_tokens
                )
                return response.choices[0].message.content

            if provider == 'google':
                response = genai.GenerativeModel(model).generate_content(prompt)
                return response.text

            # Unknown provider: make the previously implicit None explicit.
            return None

        except Exception as e:
            print(f"Error generating text with {self.current_model}: {e}")
            return None
|
|
| |
| model_manager = AIModelManager() |
|
|
| |
class EnhancedTextAnalyzer:
    """
    Main analysis engine with all enhanced features and multi-model support.

    Orchestrates column detection, text processing, search indexing,
    AI-generated insights, and Plotly visualizations for one uploaded file.
    """

    def __init__(self, model_manager=None):
        """Initialize the analyzer with all component classes."""
        self.model_manager = model_manager          # AIModelManager or None
        self.column_detector = SmartColumnDetector()
        self.text_processor = EnhancedTextProcessor()
        self.search_engine = TextSearchEngine()
        self.original_df = None     # never assigned elsewhere in this class
        self.processed_df = None    # set by process_data()
        self.results = {}
        self.visualizations = {}

    def load_file(self, file):
        """
        Load data from a CSV, Excel, or JSON file.

        *file* is a Gradio file object (only its .name path is used).
        Returns (DataFrame, status message) or (None, error message).
        """
        try:
            # Dispatch on file extension.
            # NOTE(review): .xls requires the xlrd engine to be installed.
            if file.name.endswith('.csv'):
                df = pd.read_csv(file.name)
            elif file.name.endswith(('.xlsx', '.xls')):
                df = pd.read_excel(file.name)
            elif file.name.endswith('.json'):
                df = pd.read_json(file.name)
            else:
                return None, "Unsupported file format"

            return df, f"File loaded: {len(df)} records"
        except Exception as e:
            return None, f"Error loading file: {str(e)}"

    def process_data(self, df):
        """
        Run the full analysis pipeline on *df*.

        Extracts relevant columns, scores sentiment, mines topics and
        actionable insights row by row, builds the search index, and writes
        the result to 'processed_data.xlsx'.  Returns
        (processed DataFrame, detected-column mapping, output file path).
        """
        extracted_df, detected_columns = self.column_detector.extract_relevant_data(df)

        self.processed_df = extracted_df

        # Drop this function's reference to the raw frame and nudge the GC;
        # the caller's reference (if any) is unaffected.
        del df
        gc.collect()

        if 'combined_text' in extracted_df.columns:
            sentiments = []
            polarities = []
            topics_1 = []
            topics_2 = []
            topics_3 = []
            insights = []

            # Per-row NLP pass; TextBlob polarity drives the sentiment label.
            for text in extracted_df['combined_text']:
                blob = TextBlob(text)
                polarity = blob.sentiment.polarity

                # Three-way classification with a +/-0.1 neutral band.
                if polarity > 0.1:
                    sentiment = 'Positive'
                elif polarity < -0.1:
                    sentiment = 'Negative'
                else:
                    sentiment = 'Neutral'

                sentiments.append(sentiment)
                polarities.append(polarity)

                # Always three topics (padded with '' when fewer are found).
                specific_topics = self.text_processor.extract_specific_topics(text)
                topics_1.append(specific_topics[0])
                topics_2.append(specific_topics[1])
                topics_3.append(specific_topics[2])

                insight = self.text_processor.extract_actionable_insights(text)
                insights.append(insight)

            extracted_df['sentiment'] = sentiments
            extracted_df['sentiment_score'] = polarities
            extracted_df['topic_1'] = topics_1
            extracted_df['topic_2'] = topics_2
            extracted_df['topic_3'] = topics_3
            extracted_df['actionable_insights'] = insights

            # Index the enriched frame so search can use topics/insights too.
            self.search_engine.build_index(extracted_df, 'combined_text')

        # Persist results next to the app.
        # NOTE(review): to_excel needs openpyxl installed; the fixed filename
        # is overwritten on every run.
        output_file = 'processed_data.xlsx'
        extracted_df.to_excel(output_file, index=False)

        return extracted_df, detected_columns, output_file

    def generate_ai_insights(self, df, num_samples=5):
        """
        Generate AI-powered insights from up to *num_samples* feedback rows.

        Returns a Markdown string (insights or a human-readable error).
        """
        if not self.model_manager or not self.model_manager.current_model:
            return "No AI model available for generating insights"

        if 'combined_text' not in df.columns or df.empty:
            return "No text data available for AI analysis"

        sample_texts = df['combined_text'].dropna().head(num_samples).tolist()
        if not sample_texts:
            return "No valid text samples found"

        # Each sample is truncated to 200 chars to keep the prompt small;
        # chr(10) is a newline (avoids a backslash inside the f-string).
        prompt = f"""Analyze the following customer feedback samples and provide key insights:

Samples:
{chr(10).join([f"{i+1}. {text[:200]}..." if len(text) > 200 else f"{i+1}. {text}" for i, text in enumerate(sample_texts)])}

Please provide:
1. Main themes and patterns
2. Key sentiment indicators
3. Actionable recommendations
4. Areas of concern

Keep the response concise and focused on actionable insights."""

        try:
            response = self.model_manager.generate_text(prompt, max_tokens=500)
            if response:
                return f"**AI Insights (using {self.model_manager.current_model}):**\n\n{response}"
            else:
                return "Failed to generate AI insights. Please check your API configuration."
        except Exception as e:
            return f"Error generating AI insights: {str(e)}"

    def generate_visualizations(self, df):
        """
        Build interactive Plotly figures from the analyzed data.

        Returns a dict mapping a display title to a figure; sections are
        skipped when their source columns are absent or empty.
        """
        visualizations = {}

        # 1. Sentiment distribution pie chart.
        if 'sentiment' in df.columns:
            sentiment_counts = df['sentiment'].value_counts()
            fig_sentiment = px.pie(
                values=sentiment_counts.values,
                names=sentiment_counts.index,
                title="Sentiment Distribution",
                color_discrete_map={
                    'Positive': '#27AE60',
                    'Negative': '#E74C3C',
                    'Neutral': '#95A5A6'
                }
            )
            visualizations['Sentiment Distribution'] = fig_sentiment

        # 2. Horizontal bar of the 15 most frequent topics (all three slots).
        if 'topic_1' in df.columns:
            all_topics = []
            for col in ['topic_1', 'topic_2', 'topic_3']:
                if col in df.columns:
                    topics = df[col].dropna().tolist()
                    all_topics.extend([t for t in topics if t != ''])

            if all_topics:
                topic_counts = Counter(all_topics)
                top_topics = dict(topic_counts.most_common(15))

                fig_topics = px.bar(
                    x=list(top_topics.values()),
                    y=list(top_topics.keys()),
                    orientation='h',
                    title="Top 15 Specific Topics",
                    labels={'x': 'Count', 'y': 'Topic'}
                )
                visualizations['Topic Distribution'] = fig_topics

        # 3. Heatmap of sentiment counts per primary topic (top 10 topics).
        if 'sentiment' in df.columns and 'topic_1' in df.columns:
            df_temp = df[df['topic_1'] != ''].copy()
            if not df_temp.empty:
                top_topics = df_temp['topic_1'].value_counts().head(10).index
                df_filtered = df_temp[df_temp['topic_1'].isin(top_topics)]

                pivot_table = pd.crosstab(df_filtered['topic_1'], df_filtered['sentiment'])
                fig_heatmap = px.imshow(
                    pivot_table,
                    labels=dict(x="Sentiment", y="Primary Topic", color="Count"),
                    title="Sentiment by Primary Topic Heatmap",
                    color_continuous_scale="RdYlGn"
                )
                visualizations['Sentiment by Topic'] = fig_heatmap

        # 4. Monthly sentiment trend line.
        # NOTE(review): freq='M' is a deprecated alias in recent pandas
        # (use 'ME'); confirm against the pinned pandas version.
        if 'date' in df.columns and 'sentiment' in df.columns:
            df_time = df.copy()
            df_time['date'] = pd.to_datetime(df_time['date'])

            time_data = df_time.groupby([pd.Grouper(key='date', freq='M'), 'sentiment']).size().reset_index(name='count')

            fig_timeline = px.line(
                time_data,
                x='date',
                y='count',
                color='sentiment',
                title="Sentiment Trends Over Time",
                color_discrete_map={
                    'Positive': '#27AE60',
                    'Negative': '#E74C3C',
                    'Neutral': '#95A5A6'
                }
            )
            visualizations['Sentiment Timeline'] = fig_timeline

        # 5. Horizontal bar of the 10 most frequent actionable insights
        #    (each comma-separated entry counted individually).
        if 'actionable_insights' in df.columns:
            all_insights = []
            for insight in df['actionable_insights']:
                if insight and insight != "":
                    all_insights.extend([i.strip() for i in insight.split(',')])

            if all_insights:
                insight_counts = Counter(all_insights)
                top_insights = dict(insight_counts.most_common(10))

                fig_insights = px.bar(
                    x=list(top_insights.values()),
                    y=list(top_insights.keys()),
                    orientation='h',
                    title="Top 10 Actionable Insights",
                    labels={'x': 'Frequency', 'y': 'Insight'}
                )
                visualizations['Top Insights'] = fig_insights

        return visualizations
|
|
| |
| |
# Module-level state shared across the Gradio callbacks below.
analyzer = None                 # EnhancedTextAnalyzer for the last processed file
current_data = None             # processed DataFrame from the last run
current_visualizations = None   # dict of Plotly figures keyed by display title
|
|
def update_model(model_name):
    """
    Switch the shared model manager to *model_name*.

    Returns a status string for display in the UI.
    """
    global model_manager

    # NOTE: the original success/failure markers were mojibake-corrupted
    # (a stray 'β' plus a literal line break inside a single-quoted f-string,
    # which is a SyntaxError); restored to plain emoji markers.
    if model_manager.set_model(model_name):
        return f"✅ Model switched to: {model_name}"
    return f"❌ Failed to switch to: {model_name}"
|
|
def process_file(file, model_name):
    """
    Handle a file upload end-to-end: load, analyze, visualize, summarize.

    Returns a 7-tuple matching the Gradio outputs wired in create_interface():
    (status_markdown, preview_df, output_file, ai_insights_markdown,
     first_figure, search_status, viz_selector_update).
    """
    global analyzer, current_data, current_visualizations, model_manager

    if file is None:
        return "Please upload a file", None, None, None, None, None, gr.update(choices=[])

    try:
        # Honor the model selected in the dropdown before any AI calls.
        if model_name and model_manager:
            model_manager.set_model(model_name)

        analyzer = EnhancedTextAnalyzer(model_manager)

        df, message = analyzer.load_file(file)
        if df is None:
            return message, None, None, None, None, None, gr.update(choices=[])

        processed_df, detected_cols, output_file = analyzer.process_data(df)
        current_data = processed_df

        visualizations = analyzer.generate_visualizations(processed_df)
        current_visualizations = visualizations

        ai_insights = analyzer.generate_ai_insights(processed_df)

        # Show at most three detected columns per category in the summary.
        # (Slicing an empty list is [], so the previous redundant
        # conditionals are unnecessary.)
        text_cols = list(detected_cols.get('text_columns', []))[:3]
        id_cols = list(detected_cols.get('id_columns', []))[:3]
        product_cols = list(detected_cols.get('product_columns', []))[:3]

        # The heading previously contained a mojibake 'β' where a check-mark
        # emoji belonged; it is restored here.  The body is flush-left so
        # Markdown does not treat the indented lines as a code block.
        summary = f"""
### ✅ File Processing Complete!

**Detected Columns:**
- Text Columns: {', '.join(text_cols) if text_cols else 'None'}
- ID Columns: {', '.join(id_cols) if id_cols else 'Auto-generated'}
- Product Columns: {', '.join(product_cols) if product_cols else 'None'}

**Analysis Results:**
- Total Records: {len(processed_df)}
- Processed File Saved: {output_file}
- AI Model Used: {model_manager.current_model if model_manager else 'None'}
"""

        preview = processed_df.head(10)

        # Default the plot panel to the first generated figure.
        first_viz = list(visualizations.values())[0] if visualizations else None

        return (
            summary,
            preview,
            output_file,
            ai_insights,
            first_viz,
            "Ready for search",
            gr.update(choices=list(visualizations.keys()))
        )

    except Exception as e:
        return f"Error: {str(e)}", None, None, None, None, None, gr.update(choices=[])
|
|
def search_data(query):
    """
    Run a semantic search over the processed dataset.

    Returns (status_message, results_dataframe_or_None, results_file_or_None).
    """
    global analyzer, current_data

    # Guard clauses: need a processed file and a non-empty query.
    if analyzer is None or current_data is None:
        return "Please process a file first", None, None
    if not query:
        return "Please enter a search query", None, None

    try:
        hits = analyzer.search_engine.search(query, top_k=10)
        if hits.empty:
            return "No results found", None, None

        # Restrict the display to the analysis columns that actually exist.
        wanted = ['unique_id', 'combined_text', 'sentiment', 'topic_1', 'topic_2', 'topic_3', 'actionable_insights', 'search_score']
        shown = hits[[col for col in wanted if col in hits.columns]]

        # Persist the full result set with a timestamped filename.
        export_name = f"search_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
        shown.to_excel(export_name, index=False)

        return f"Found {len(hits)} results", shown.head(10), export_name

    except Exception as e:
        return f"Search error: {str(e)}", None, None
|
|
def update_visualization(viz_type):
    """
    Look up the Plotly figure for *viz_type*.

    Returns the figure, or None when nothing has been generated yet or the
    requested chart is unknown.
    """
    global current_visualizations

    if not current_visualizations:
        return None
    return current_visualizations.get(viz_type)
|
|
def export_results(format_type):
    """
    Save the current analysis results to disk as Excel or CSV.

    Returns (status_message, file_path_or_None).
    """
    global current_data

    if current_data is None:
        return "No data to export", None

    try:
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Pick the target filename and the matching pandas writer;
        # anything other than "Excel" falls back to CSV.
        if format_type == "Excel":
            target = f"analysis_results_{stamp}.xlsx"
            writer = current_data.to_excel
        else:
            target = f"analysis_results_{stamp}.csv"
            writer = current_data.to_csv

        writer(target, index=False)
        return f"Data exported to {target}", target

    except Exception as e:
        return f"Export error: {str(e)}", None
|
|
| |
def create_interface():
    """
    Create the Gradio interface with model selection.

    Builds four tabs (upload/process, search, visualizations, export) and
    wires each control to the module-level callback functions.  Returns the
    gr.Blocks app, ready for .launch().

    NOTE(review): the emoji in the Markdown/labels below are
    mojibake-corrupted (e.g. 'π' sequences); they are kept byte-identical
    here because they are runtime strings.
    """
    with gr.Blocks(theme=gr.themes.Soft()) as app:
        # --- Header / feature list ---
        gr.Markdown(
            """
            # π Enhanced Text Analytics AI Agent
            ### Smart Column Detection & Comprehensive Text Analysis with Multiple AI Models

            **Features:**
            - π€ Multiple AI Model Support (OpenAI, Anthropic, Deepseek, Groq, Google)
            - π Automatic detection of text, ID, and product columns
            - πΎ Memory-efficient processing with automatic file cleanup
            - π Sentiment analysis with scoring
            - π― Topic/theme extraction
            - π‘ Actionable insights generation
            - π Advanced text search with similarity scoring
            - π Multiple visualization options
            - π₯ Export results in Excel or CSV format
            """
        )

        # --- Tab 1: upload a file and run the analysis pipeline ---
        with gr.Tab("π€ Upload & Process"):
            with gr.Row():
                with gr.Column(scale=1):
                    # Model choices come from the global model_manager,
                    # populated at import time from environment API keys.
                    model_dropdown = gr.Dropdown(
                        label="π€ Select AI Model",
                        choices=model_manager.get_available_models(),
                        value=model_manager.current_model if model_manager.current_model else None,
                        interactive=True
                    )

                    file_upload = gr.File(
                        label="Upload Data File",
                        file_types=[".csv", ".xlsx", ".xls", ".json"]
                    )

                    process_btn = gr.Button("π Process File", variant="primary")

                with gr.Column(scale=2):
                    status_output = gr.Markdown(label="Processing Status")
                    ai_insights = gr.Markdown(label="AI-Generated Insights")

            with gr.Row():
                data_preview = gr.Dataframe(
                    label="Data Preview (First 10 rows)",
                    interactive=False
                )

            processed_file = gr.File(
                label="π Processed Data File",
                interactive=False
            )

        # --- Tab 2: semantic search over the processed data ---
        with gr.Tab("π Search"):
            gr.Markdown("### Search through your text data")

            with gr.Row():
                search_input = gr.Textbox(
                    label="Enter search query",
                    placeholder="Type keywords to search..."
                )

                search_btn = gr.Button("π Search", variant="primary")

            search_status = gr.Markdown(label="Search Status")
            search_results = gr.Dataframe(
                label="Search Results",
                interactive=False
            )
            search_file = gr.File(
                label="π₯ Download Search Results",
                interactive=False
            )

        # --- Tab 3: chart viewer; choices are filled after processing ---
        with gr.Tab("π Visualizations"):
            with gr.Row():
                viz_selector = gr.Dropdown(
                    label="Select Visualization",
                    choices=[],
                    interactive=True
                )

            viz_plot = gr.Plot(label="Visualization")

        # --- Tab 4: export the processed DataFrame ---
        with gr.Tab("π₯ Export"):
            gr.Markdown("### Export your analyzed data")

            with gr.Row():
                export_format = gr.Radio(
                    choices=["Excel", "CSV"],
                    value="Excel",
                    label="Export Format"
                )

                export_btn = gr.Button("π₯ Export Data", variant="primary")

            export_status = gr.Markdown(label="Export Status")
            export_file = gr.File(
                label="π Download Exported File",
                interactive=False
            )

        # --- Event wiring ---

        # Switching models updates the status panel.
        model_dropdown.change(
            fn=update_model,
            inputs=[model_dropdown],
            outputs=[status_output]
        )

        # Processing fills every output panel plus the viz dropdown choices.
        process_btn.click(
            fn=process_file,
            inputs=[file_upload, model_dropdown],
            outputs=[
                status_output,
                data_preview,
                processed_file,
                ai_insights,
                viz_plot,
                search_status,
                viz_selector
            ]
        )

        search_btn.click(
            fn=search_data,
            inputs=[search_input],
            outputs=[search_status, search_results, search_file]
        )

        viz_selector.change(
            fn=update_visualization,
            inputs=[viz_selector],
            outputs=[viz_plot]
        )

        export_btn.click(
            fn=export_results,
            inputs=[export_format],
            outputs=[export_status, export_file]
        )

    return app
|
|
| |
| |
if __name__ == "__main__":
    # Build the Gradio UI and launch it with a public share link and
    # debug logging enabled.
    app = create_interface()
    app.launch(share=True, debug=True)