| """ | |
| Computer Vision & Image Analytics Tools | |
| Advanced computer vision tools for image feature extraction, clustering, | |
| and hybrid tabular-image analysis. | |
| """ | |
| import polars as pl | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional, Tuple | |
| from pathlib import Path | |
| import json | |
# Core CV libraries (optional) -- PIL and OpenCV are imported separately so the
# PIL-only fallback still works when OpenCV is missing
try:
    from PIL import Image
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False

try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False

try:
    import torch
    import torchvision
    from torchvision import models, transforms
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False

# ML libraries
try:
    from sklearn.cluster import KMeans, DBSCAN
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler
    from sklearn.manifold import TSNE
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False


def extract_image_features(
    image_paths: List[str],
    method: str = "cnn",
    model_name: str = "resnet50",
    color_spaces: Optional[List[str]] = None,
    include_histograms: bool = True,
    histogram_bins: int = 256
) -> Dict[str, Any]:
    """
    Extract features from images using CNN embeddings, color histograms, and other methods.

    Args:
        image_paths: List of paths to image files
        method: Feature extraction method ('cnn', 'color', 'texture', 'hybrid')
        model_name: Pre-trained model for CNN features ('resnet50', 'efficientnet_b0', 'vgg16')
        color_spaces: Color spaces for histograms (['rgb', 'hsv', 'lab'])
        include_histograms: Whether to include color histograms
        histogram_bins: Number of bins for histograms

    Returns:
        Dictionary containing feature vectors, dimensionality, and metadata
    """
| print(f"🔍 Extracting image features using {method} method...") | |
| if not image_paths: | |
| raise ValueError("No image paths provided") | |
| result = { | |
| "method": method, | |
| "n_images": len(image_paths), | |
| "features": [], | |
| "feature_dim": 0, | |
| "failed_images": [] | |
| } | |
| try: | |
| if method == "cnn" and TORCH_AVAILABLE: | |
| print(f" Using CNN model: {model_name}") | |
| # Load pre-trained model | |
| if model_name == "resnet50": | |
| model = models.resnet50(pretrained=True) | |
| # Remove final classification layer | |
| model = torch.nn.Sequential(*list(model.children())[:-1]) | |
| elif model_name == "efficientnet_b0": | |
| model = models.efficientnet_b0(pretrained=True) | |
| model = torch.nn.Sequential(*list(model.children())[:-1]) | |
| elif model_name == "vgg16": | |
| model = models.vgg16(pretrained=True) | |
| model.classifier = torch.nn.Sequential(*list(model.classifier.children())[:-1]) | |
| else: | |
| raise ValueError(f"Unknown model '{model_name}'") | |
| model.eval() | |
| # Image preprocessing | |
| preprocess = transforms.Compose([ | |
| transforms.Resize(256), | |
| transforms.CenterCrop(224), | |
| transforms.ToTensor(), | |
| transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) | |
| ]) | |
| # Extract features | |
| for img_path in image_paths: | |
| try: | |
| img = Image.open(img_path).convert('RGB') | |
| img_tensor = preprocess(img).unsqueeze(0) | |
| with torch.no_grad(): | |
| features = model(img_tensor) | |
| features = features.squeeze().numpy() | |
| result["features"].append({ | |
| "image_path": img_path, | |
| "feature_vector": features.tolist(), | |
| "feature_dim": len(features) | |
| }) | |
| except Exception as e: | |
| result["failed_images"].append({"path": img_path, "error": str(e)}) | |
| if result["features"]: | |
| result["feature_dim"] = result["features"][0]["feature_dim"] | |
        elif method in ["color", "hybrid"] or (method == "cnn" and not TORCH_AVAILABLE):
            print(" Using color histogram features...")

            if not CV2_AVAILABLE:
                print("⚠️ OpenCV not available. Using PIL for basic features...")
                return _extract_features_basic(image_paths)

            color_spaces = color_spaces or ['rgb', 'hsv']

            for img_path in image_paths:
                try:
                    # Read image
                    img = cv2.imread(img_path)
                    if img is None:
                        raise ValueError(f"Could not read image: {img_path}")

                    feature_vector = []

                    # Color histograms (only when requested)
                    if include_histograms:
                        if 'rgb' in color_spaces:
                            for i in range(3):
                                hist = cv2.calcHist([img], [i], None, [histogram_bins], [0, 256])
                                feature_vector.extend(hist.flatten().tolist())
                        if 'hsv' in color_spaces:
                            hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
                            for i in range(3):
                                hist = cv2.calcHist([hsv], [i], None, [histogram_bins], [0, 256])
                                feature_vector.extend(hist.flatten().tolist())
                        if 'lab' in color_spaces:
                            lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
                            for i in range(3):
                                hist = cv2.calcHist([lab], [i], None, [histogram_bins], [0, 256])
                                feature_vector.extend(hist.flatten().tolist())

                    # Basic image stats (cast to plain Python floats)
                    feature_vector.extend([
                        img.shape[0],        # height
                        img.shape[1],        # width
                        float(img.mean()),   # mean pixel value
                        float(img.std())     # std pixel value
                    ])

                    result["features"].append({
                        "image_path": img_path,
                        "feature_vector": feature_vector,
                        "feature_dim": len(feature_vector)
                    })
                except Exception as e:
                    result["failed_images"].append({"path": img_path, "error": str(e)})

            if result["features"]:
                result["feature_dim"] = result["features"][0]["feature_dim"]
        elif method == "texture":
            print(" Extracting texture features...")

            if not CV2_AVAILABLE:
                raise ImportError("OpenCV required for texture features")

            for img_path in image_paths:
                try:
                    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                    if img is None:
                        raise ValueError(f"Could not read image: {img_path}")

                    # Edge detection
                    edges = cv2.Canny(img, 100, 200)

                    # Texture features (cast to plain Python floats)
                    feature_vector = [
                        float(edges.mean()),
                        float(edges.std()),
                        float(np.count_nonzero(edges) / edges.size),  # edge density
                        float(img.mean()),
                        float(img.std())
                    ]

                    result["features"].append({
                        "image_path": img_path,
                        "feature_vector": feature_vector,
                        "feature_dim": len(feature_vector)
                    })
                except Exception as e:
                    result["failed_images"].append({"path": img_path, "error": str(e)})

            if result["features"]:
                result["feature_dim"] = result["features"][0]["feature_dim"]

        else:
            raise ValueError(f"Unknown method '{method}' or required libraries not available")

        print("✅ Feature extraction complete!")
        print(f" Processed: {len(result['features'])} images")
        print(f" Failed: {len(result['failed_images'])} images")
        print(f" Feature dimension: {result['feature_dim']}")

        return result

    except Exception as e:
        print(f"❌ Error during feature extraction: {str(e)}")
        raise
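
# Example usage (illustrative sketch only; the file paths below are hypothetical
# and the CNN path assumes torch/torchvision are installed):
#
#   feats = extract_image_features(
#       ["images/sample_001.jpg", "images/sample_002.jpg"],
#       method="cnn" if TORCH_AVAILABLE else "color",
#   )
#   print(feats["feature_dim"], len(feats["features"]), len(feats["failed_images"]))
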
def _extract_features_basic(image_paths: List[str]) -> Dict[str, Any]:
    """Fallback feature extraction using PIL when OpenCV/PyTorch are not available."""
    if not PIL_AVAILABLE:
        raise ImportError("Pillow (PIL) is required for the basic feature fallback")

    result = {
        "method": "basic_pil",
        "n_images": len(image_paths),
        "features": [],
        "feature_dim": 0,
        "failed_images": []
    }

    for img_path in image_paths:
        try:
            img = Image.open(img_path).convert('RGB')
            img_array = np.array(img)

            # Basic statistics per channel (cast to plain Python floats)
            feature_vector = []
            for channel in range(3):
                channel_data = img_array[:, :, channel]
                feature_vector.extend([
                    float(channel_data.mean()),
                    float(channel_data.std()),
                    float(channel_data.min()),
                    float(channel_data.max())
                ])

            # Image dimensions
            feature_vector.extend([img_array.shape[0], img_array.shape[1]])

            result["features"].append({
                "image_path": img_path,
                "feature_vector": feature_vector,
                "feature_dim": len(feature_vector)
            })
        except Exception as e:
            result["failed_images"].append({"path": img_path, "error": str(e)})

    if result["features"]:
        result["feature_dim"] = result["features"][0]["feature_dim"]

    result["note"] = "Install torch, torchvision, and opencv for advanced features"
    return result


def perform_image_clustering(
    features: Dict[str, Any],
    n_clusters: int = 5,
    method: str = "kmeans",
    reduce_dimensions: bool = True,
    target_dim: int = 50,
    return_similar_pairs: bool = True,
    top_k: int = 10
) -> Dict[str, Any]:
    """
    Cluster images based on extracted features and find similar images.

    Args:
        features: Output from extract_image_features
        n_clusters: Number of clusters
        method: Clustering method ('kmeans', 'dbscan')
        reduce_dimensions: Whether to reduce dimensions before clustering
        target_dim: Target dimensionality for reduction
        return_similar_pairs: Whether to return most similar image pairs
        top_k: Number of top similar pairs to return

    Returns:
        Dictionary containing cluster assignments, centroids, and similar pairs
    """
| print(f"🔍 Clustering images using {method}...") | |
| if not features.get("features"): | |
| raise ValueError("No features provided for clustering") | |
| # Extract feature vectors | |
| feature_vectors = np.array([f["feature_vector"] for f in features["features"]]) | |
| image_paths = [f["image_path"] for f in features["features"]] | |
| print(f" Feature matrix shape: {feature_vectors.shape}") | |
| result = { | |
| "method": method, | |
| "n_images": len(image_paths), | |
| "n_clusters": n_clusters, | |
| "clusters": [] | |
| } | |
| try: | |
| # Normalize features | |
| scaler = StandardScaler() | |
| feature_vectors_scaled = scaler.fit_transform(feature_vectors) | |
        # Dimensionality reduction
        if reduce_dimensions and feature_vectors_scaled.shape[1] > target_dim:
            print(f" Reducing dimensions from {feature_vectors_scaled.shape[1]} to {target_dim}...")
            pca = PCA(n_components=target_dim)
            feature_vectors_reduced = pca.fit_transform(feature_vectors_scaled)
            result["explained_variance"] = float(pca.explained_variance_ratio_.sum())
            print(f" Explained variance: {result['explained_variance']:.3f}")
        else:
            feature_vectors_reduced = feature_vectors_scaled

        # Clustering
        if method == "kmeans":
            clusterer = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
            labels = clusterer.fit_predict(feature_vectors_reduced)
            result["cluster_centers"] = clusterer.cluster_centers_.tolist()
            result["inertia"] = float(clusterer.inertia_)
        elif method == "dbscan":
            clusterer = DBSCAN(eps=0.5, min_samples=5)
            labels = clusterer.fit_predict(feature_vectors_reduced)
            n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
            result["n_clusters"] = n_clusters
            result["n_noise_points"] = int((labels == -1).sum())
        else:
            raise ValueError(f"Unknown method '{method}'. Use 'kmeans' or 'dbscan'")

        # Organize results by cluster
        for cluster_id in sorted(set(labels)):
            cluster_indices = np.where(labels == cluster_id)[0]
            cluster_images = [image_paths[i] for i in cluster_indices]

            cluster_info = {
                "cluster_id": int(cluster_id),
                "size": len(cluster_images),
                "images": cluster_images[:100]  # Limit to first 100
            }

            if method == "kmeans":
                # Calculate distances to centroid
                centroid = clusterer.cluster_centers_[cluster_id]
                distances = np.linalg.norm(feature_vectors_reduced[cluster_indices] - centroid, axis=1)
                # Representative images (closest to centroid)
                representative_indices = distances.argsort()[:5]
                cluster_info["representative_images"] = [
                    cluster_images[i] for i in representative_indices
                ]

            result["clusters"].append(cluster_info)

        # Find similar image pairs
        if return_similar_pairs:
            print(f" Finding top {top_k} similar image pairs...")
            from sklearn.metrics.pairwise import cosine_similarity

            similarity_matrix = cosine_similarity(feature_vectors_reduced)
            # Get upper triangle indices (avoid duplicates and self-similarity)
            triu_indices = np.triu_indices(len(image_paths), k=1)
            similarities = similarity_matrix[triu_indices]

            # Get top K most similar pairs
            top_indices = similarities.argsort()[-top_k:][::-1]
            similar_pairs = []
            for idx in top_indices:
                i, j = triu_indices[0][idx], triu_indices[1][idx]
                similar_pairs.append({
                    "image1": image_paths[i],
                    "image2": image_paths[j],
                    "similarity": float(similarities[idx])
                })
            result["similar_pairs"] = similar_pairs

        # Visualize with t-SNE (if enough samples)
        if len(image_paths) >= 30:
            print(" Computing t-SNE for visualization...")
            tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(image_paths) - 1))
            embeddings_2d = tsne.fit_transform(feature_vectors_reduced)
            result["tsne_embeddings"] = embeddings_2d.tolist()

        print("✅ Clustering complete!")
        print(f" Clusters: {len(result['clusters'])}")
        for cluster in result["clusters"]:
            print(f" Cluster {cluster['cluster_id']}: {cluster['size']} images")

        return result

    except Exception as e:
        print(f"❌ Error during clustering: {str(e)}")
        raise
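
# Example usage (illustrative sketch; assumes `feats` was produced by extract_image_features above):
#
#   clusters = perform_image_clustering(feats, n_clusters=4, method="kmeans")
#   for c in clusters["clusters"]:
#       print(c["cluster_id"], c["size"], c.get("representative_images"))
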

def analyze_tabular_image_hybrid(
    tabular_data: pl.DataFrame,
    image_column: str,
    target_column: Optional[str] = None,
    tabular_features: Optional[List[str]] = None,
    fusion_method: str = "concatenate",
    model_type: str = "classification",
    test_size: float = 0.2
) -> Dict[str, Any]:
    """
    Analyze datasets with both tabular and image data using multi-modal learning.

    Args:
        tabular_data: DataFrame with tabular features and image paths
        image_column: Column containing image file paths
        target_column: Target variable column (if supervised learning)
        tabular_features: List of tabular feature columns (if None, uses all except image/target)
        fusion_method: How to combine features ('concatenate', 'early', 'late')
        model_type: Type of task ('classification', 'regression')
        test_size: Proportion of data for testing

    Returns:
        Dictionary containing model performance, feature importance, and predictions
    """
| print(f"🔍 Analyzing hybrid tabular-image data...") | |
| # Validate input | |
| if image_column not in tabular_data.columns: | |
| raise ValueError(f"Image column '{image_column}' not found in DataFrame") | |
| if target_column and target_column not in tabular_data.columns: | |
| raise ValueError(f"Target column '{target_column}' not found in DataFrame") | |
| # Determine tabular features | |
| if tabular_features is None: | |
| exclude_cols = [image_column] | |
| if target_column: | |
| exclude_cols.append(target_column) | |
| tabular_features = [col for col in tabular_data.columns if col not in exclude_cols] | |
| print(f" Tabular features: {len(tabular_features)}") | |
| print(f" Image column: {image_column}") | |
| print(f" Target column: {target_column}") | |
| result = { | |
| "n_samples": tabular_data.shape[0], | |
| "n_tabular_features": len(tabular_features), | |
| "fusion_method": fusion_method, | |
| "model_type": model_type | |
| } | |
    try:
        # Step 1: Extract image features
        print("\n Step 1: Extracting image features...")
        image_paths = tabular_data[image_column].to_list()

        # Use CNN features if available, otherwise color histograms
        method = "cnn" if TORCH_AVAILABLE else "color"
        image_features_result = extract_image_features(image_paths, method=method)

        # Every image must be processed successfully, otherwise the image feature
        # rows would no longer line up with the tabular rows.
        if image_features_result["failed_images"]:
            raise ValueError(
                f"{len(image_features_result['failed_images'])} image(s) could not be processed; "
                "image features would not align with the tabular rows"
            )

        # Build image feature matrix
        image_feature_matrix = np.array([
            f["feature_vector"] for f in image_features_result["features"]
        ])
        print(f" Image features shape: {image_feature_matrix.shape}")
        # Step 2: Prepare tabular features
        print("\n Step 2: Preparing tabular features...")
        tabular_feature_matrix = tabular_data.select(tabular_features).to_numpy()

        # Handle missing values
        from sklearn.impute import SimpleImputer
        imputer = SimpleImputer(strategy='mean')
        tabular_feature_matrix = imputer.fit_transform(tabular_feature_matrix)
        print(f" Tabular features shape: {tabular_feature_matrix.shape}")
        # Step 3: Fusion
        print(f"\n Step 3: Fusing features using '{fusion_method}' method...")
        if fusion_method == "concatenate" or fusion_method == "early":
            # Simple concatenation of tabular and image features
            combined_features = np.hstack([tabular_feature_matrix, image_feature_matrix])
            result["combined_feature_dim"] = combined_features.shape[1]
        elif fusion_method == "late":
            # Late fusion is not fully implemented here: the model below is trained on the
            # tabular features only; the image feature dimension is reported but not combined.
            combined_features = tabular_feature_matrix
            result["combined_feature_dim"] = tabular_feature_matrix.shape[1]
            result["image_feature_dim"] = image_feature_matrix.shape[1]
        else:
            raise ValueError(f"Unknown fusion method '{fusion_method}'")
        print(f" Combined features shape: {combined_features.shape}")
        # Step 4: Train model (if target provided)
        if target_column:
            print(f"\n Step 4: Training {model_type} model...")
            target = tabular_data[target_column].to_numpy()

            # Split data
            from sklearn.model_selection import train_test_split
            X_train, X_test, y_train, y_test = train_test_split(
                combined_features, target, test_size=test_size, random_state=42
            )

            # Train model
            if model_type == "classification":
                from sklearn.ensemble import RandomForestClassifier
                model = RandomForestClassifier(n_estimators=100, random_state=42)
                model.fit(X_train, y_train)

                # Evaluate
                from sklearn.metrics import accuracy_score, classification_report
                train_pred = model.predict(X_train)
                test_pred = model.predict(X_test)
                result["train_accuracy"] = float(accuracy_score(y_train, train_pred))
                result["test_accuracy"] = float(accuracy_score(y_test, test_pred))

                # Classification report
                report = classification_report(y_test, test_pred, output_dict=True)
                result["classification_report"] = report

            elif model_type == "regression":
                from sklearn.ensemble import RandomForestRegressor
                model = RandomForestRegressor(n_estimators=100, random_state=42)
                model.fit(X_train, y_train)

                # Evaluate
                from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
                train_pred = model.predict(X_train)
                test_pred = model.predict(X_test)
                result["train_rmse"] = float(np.sqrt(mean_squared_error(y_train, train_pred)))
                result["test_rmse"] = float(np.sqrt(mean_squared_error(y_test, test_pred)))
                result["train_r2"] = float(r2_score(y_train, train_pred))
                result["test_r2"] = float(r2_score(y_test, test_pred))
                result["test_mae"] = float(mean_absolute_error(y_test, test_pred))

            else:
                raise ValueError(f"Unknown model_type '{model_type}'. Use 'classification' or 'regression'")
            # Feature importance (only meaningful when tabular and image features were concatenated)
            if fusion_method in ("concatenate", "early"):
                feature_names = tabular_features + [f"image_feat_{i}" for i in range(image_feature_matrix.shape[1])]

                # Top 20 most important features
                importances = model.feature_importances_
                top_indices = importances.argsort()[-20:][::-1]
                result["top_features"] = [
                    {
                        "feature": feature_names[i],
                        "importance": float(importances[i])
                    }
                    for i in top_indices
                ]

                # Compare tabular vs image feature importance
                tabular_importance = importances[:len(tabular_features)].sum()
                image_importance = importances[len(tabular_features):].sum()
                result["feature_importance_split"] = {
                    "tabular": float(tabular_importance),
                    "image": float(image_importance),
                    "tabular_percentage": float(tabular_importance / importances.sum() * 100),
                    "image_percentage": float(image_importance / importances.sum() * 100)
                }
| print(f"\n✅ Hybrid analysis complete!") | |
| if target_column: | |
| if model_type == "classification": | |
| print(f" Test accuracy: {result['test_accuracy']:.4f}") | |
| else: | |
| print(f" Test R²: {result['test_r2']:.4f}") | |
| print(f" Test RMSE: {result['test_rmse']:.4f}") | |
| return result | |
| except Exception as e: | |
| print(f"❌ Error during hybrid analysis: {str(e)}") | |
| raise | |
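
# Example usage (illustrative sketch; the CSV path and column names below are hypothetical):
#
#   if __name__ == "__main__":
#       df = pl.read_csv("listings.csv")  # expects an "image_path" column plus tabular features
#       report = analyze_tabular_image_hybrid(
#           df,
#           image_column="image_path",
#           target_column="price",
#           model_type="regression",
#       )
#       print(report.get("test_r2"), report.get("feature_importance_split"))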