# Data-Science-Agent/src/tools/computer_vision.py
"""
Computer Vision & Image Analytics Tools
Advanced computer vision tools for image feature extraction, clustering,
and hybrid tabular-image analysis.
"""
import polars as pl
import numpy as np
from typing import Dict, Any, List, Optional
# Core CV libraries (optional); PIL and OpenCV are tracked separately so the
# PIL-only fallback still works when OpenCV is missing
try:
    from PIL import Image
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False
try:
    import torch
    from torchvision import models, transforms
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
# ML libraries (required for clustering and hybrid analysis)
try:
    from sklearn.cluster import KMeans, DBSCAN
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler
    from sklearn.manifold import TSNE
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
def extract_image_features(
image_paths: List[str],
method: str = "cnn",
model_name: str = "resnet50",
color_spaces: Optional[List[str]] = None,
include_histograms: bool = True,
histogram_bins: int = 256
) -> Dict[str, Any]:
"""
Extract features from images using CNN embeddings, color histograms, and other methods.
Args:
image_paths: List of paths to image files
method: Feature extraction method ('cnn', 'color', 'texture', 'hybrid')
model_name: Pre-trained model for CNN features ('resnet50', 'efficientnet_b0', 'vgg16')
color_spaces: Color spaces for histograms (['rgb', 'hsv', 'lab'])
include_histograms: Whether to include color histograms
histogram_bins: Number of bins for histograms
Returns:
Dictionary containing feature vectors, dimensionality, and metadata
"""
print(f"🔍 Extracting image features using {method} method...")
if not image_paths:
raise ValueError("No image paths provided")
result = {
"method": method,
"n_images": len(image_paths),
"features": [],
"feature_dim": 0,
"failed_images": []
}
try:
if method == "cnn" and TORCH_AVAILABLE:
print(f" Using CNN model: {model_name}")
            # Load pre-trained model (weights= replaces the deprecated pretrained= flag)
            if model_name == "resnet50":
                model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
                # Remove final classification layer to expose the 2048-d pooled embedding
                model = torch.nn.Sequential(*list(model.children())[:-1])
            elif model_name == "efficientnet_b0":
                model = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
                model = torch.nn.Sequential(*list(model.children())[:-1])
            elif model_name == "vgg16":
                model = models.vgg16(weights=models.VGG16_Weights.DEFAULT)
                # Drop only the last Linear layer so the classifier outputs 4096-d features
                model.classifier = torch.nn.Sequential(*list(model.classifier.children())[:-1])
            else:
                raise ValueError(f"Unknown model '{model_name}'")
model.eval()
# Image preprocessing
preprocess = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Extract features
for img_path in image_paths:
try:
img = Image.open(img_path).convert('RGB')
img_tensor = preprocess(img).unsqueeze(0)
with torch.no_grad():
features = model(img_tensor)
features = features.squeeze().numpy()
result["features"].append({
"image_path": img_path,
"feature_vector": features.tolist(),
"feature_dim": len(features)
})
except Exception as e:
result["failed_images"].append({"path": img_path, "error": str(e)})
if result["features"]:
result["feature_dim"] = result["features"][0]["feature_dim"]
        elif method in ("color", "hybrid") or (method == "cnn" and not TORCH_AVAILABLE):
            if method == "cnn":
                print("⚠️ PyTorch not available. Falling back to color histogram features...")
            print(" Using color histogram features...")
            if not CV2_AVAILABLE:
                print("⚠️ OpenCV not available. Using PIL for basic features...")
                return _extract_features_basic(image_paths)
color_spaces = color_spaces or ['rgb', 'hsv']
for img_path in image_paths:
try:
# Read image
img = cv2.imread(img_path)
if img is None:
raise ValueError(f"Could not read image: {img_path}")
feature_vector = []
# Color histograms
if 'rgb' in color_spaces:
for i in range(3):
hist = cv2.calcHist([img], [i], None, [histogram_bins], [0, 256])
feature_vector.extend(hist.flatten().tolist())
if 'hsv' in color_spaces:
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
for i in range(3):
hist = cv2.calcHist([hsv], [i], None, [histogram_bins], [0, 256])
feature_vector.extend(hist.flatten().tolist())
if 'lab' in color_spaces:
lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
for i in range(3):
hist = cv2.calcHist([lab], [i], None, [histogram_bins], [0, 256])
feature_vector.extend(hist.flatten().tolist())
                    # Basic image stats (cast to plain floats so results stay JSON-serializable)
                    feature_vector.extend([
                        float(img.shape[0]),  # height
                        float(img.shape[1]),  # width
                        float(img.mean()),    # mean pixel value
                        float(img.std())      # std of pixel values
                    ])
result["features"].append({
"image_path": img_path,
"feature_vector": feature_vector,
"feature_dim": len(feature_vector)
})
except Exception as e:
result["failed_images"].append({"path": img_path, "error": str(e)})
if result["features"]:
result["feature_dim"] = result["features"][0]["feature_dim"]
elif method == "texture":
print(" Extracting texture features...")
if not CV2_AVAILABLE:
raise ImportError("OpenCV required for texture features")
for img_path in image_paths:
try:
img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
if img is None:
raise ValueError(f"Could not read image: {img_path}")
# Edge detection
edges = cv2.Canny(img, 100, 200)
                    # Texture features (cast to plain floats for JSON serialization)
                    feature_vector = [
                        float(edges.mean()),
                        float(edges.std()),
                        float(np.count_nonzero(edges) / edges.size),  # edge density
                        float(img.mean()),
                        float(img.std())
                    ]
result["features"].append({
"image_path": img_path,
"feature_vector": feature_vector,
"feature_dim": len(feature_vector)
})
except Exception as e:
result["failed_images"].append({"path": img_path, "error": str(e)})
if result["features"]:
result["feature_dim"] = result["features"][0]["feature_dim"]
else:
raise ValueError(f"Unknown method '{method}' or required libraries not available")
print(f"✅ Feature extraction complete!")
print(f" Processed: {len(result['features'])} images")
print(f" Failed: {len(result['failed_images'])} images")
print(f" Feature dimension: {result['feature_dim']}")
return result
except Exception as e:
print(f"❌ Error during feature extraction: {str(e)}")
raise
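
# A minimal usage sketch for extract_image_features (hypothetical paths; assumes
# Pillow is installed, with torch/torchvision optional for CNN embeddings):
#
#     features = extract_image_features(
#         ["data/images/cat_001.jpg", "data/images/dog_001.jpg"],
#         method="cnn",            # falls back to color histograms if PyTorch is unavailable
#         model_name="resnet50",
#     )
#     print(features["feature_dim"], len(features["features"]), features["failed_images"])
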
def _extract_features_basic(image_paths: List[str]) -> Dict[str, Any]:
    """Fallback feature extraction using PIL when OpenCV/PyTorch are not available."""
    if not PIL_AVAILABLE:
        raise ImportError("Pillow is required for the basic feature fallback")
result = {
"method": "basic_pil",
"n_images": len(image_paths),
"features": [],
"feature_dim": 0,
"failed_images": []
}
for img_path in image_paths:
try:
img = Image.open(img_path).convert('RGB')
img_array = np.array(img)
# Basic statistics per channel
feature_vector = []
            for channel in range(3):
                channel_data = img_array[:, :, channel]
                feature_vector.extend([
                    float(channel_data.mean()),
                    float(channel_data.std()),
                    float(channel_data.min()),
                    float(channel_data.max())
                ])
# Image dimensions
feature_vector.extend([img_array.shape[0], img_array.shape[1]])
result["features"].append({
"image_path": img_path,
"feature_vector": feature_vector,
"feature_dim": len(feature_vector)
})
except Exception as e:
result["failed_images"].append({"path": img_path, "error": str(e)})
if result["features"]:
result["feature_dim"] = result["features"][0]["feature_dim"]
result["note"] = "Install torch, torchvision, and opencv for advanced features"
return result
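
# Sketch of the PIL-only fallback (hypothetical path). When OpenCV is missing,
# extract_image_features routes here automatically; it can also be called directly:
#
#     basic = _extract_features_basic(["data/images/sample.jpg"])
#     # 14 features per image: mean/std/min/max per RGB channel (12) + height/width (2)
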
def perform_image_clustering(
features: Dict[str, Any],
n_clusters: int = 5,
method: str = "kmeans",
reduce_dimensions: bool = True,
target_dim: int = 50,
return_similar_pairs: bool = True,
top_k: int = 10
) -> Dict[str, Any]:
"""
Cluster images based on extracted features and find similar images.
Args:
features: Output from extract_image_features
n_clusters: Number of clusters
method: Clustering method ('kmeans', 'dbscan')
reduce_dimensions: Whether to reduce dimensions before clustering
target_dim: Target dimensionality for reduction
return_similar_pairs: Whether to return most similar image pairs
top_k: Number of top similar pairs to return
Returns:
Dictionary containing cluster assignments, centroids, and similar pairs
"""
print(f"🔍 Clustering images using {method}...")
    if not SKLEARN_AVAILABLE:
        raise ImportError("scikit-learn is required for image clustering")
    if not features.get("features"):
        raise ValueError("No features provided for clustering")
# Extract feature vectors
feature_vectors = np.array([f["feature_vector"] for f in features["features"]])
image_paths = [f["image_path"] for f in features["features"]]
print(f" Feature matrix shape: {feature_vectors.shape}")
result = {
"method": method,
"n_images": len(image_paths),
"n_clusters": n_clusters,
"clusters": []
}
try:
# Normalize features
scaler = StandardScaler()
feature_vectors_scaled = scaler.fit_transform(feature_vectors)
# Dimensionality reduction
if reduce_dimensions and feature_vectors_scaled.shape[1] > target_dim:
print(f" Reducing dimensions from {feature_vectors_scaled.shape[1]} to {target_dim}...")
pca = PCA(n_components=target_dim)
feature_vectors_reduced = pca.fit_transform(feature_vectors_scaled)
result["explained_variance"] = float(pca.explained_variance_ratio_.sum())
print(f" Explained variance: {result['explained_variance']:.3f}")
else:
feature_vectors_reduced = feature_vectors_scaled
# Clustering
if method == "kmeans":
clusterer = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
labels = clusterer.fit_predict(feature_vectors_reduced)
result["cluster_centers"] = clusterer.cluster_centers_.tolist()
result["inertia"] = float(clusterer.inertia_)
        elif method == "dbscan":
            # eps and min_samples are fixed defaults; tune them for your feature scale
            clusterer = DBSCAN(eps=0.5, min_samples=5)
labels = clusterer.fit_predict(feature_vectors_reduced)
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
result["n_clusters"] = n_clusters
result["n_noise_points"] = int((labels == -1).sum())
else:
raise ValueError(f"Unknown method '{method}'. Use 'kmeans' or 'dbscan'")
# Organize results by cluster
for cluster_id in sorted(set(labels)):
cluster_indices = np.where(labels == cluster_id)[0]
cluster_images = [image_paths[i] for i in cluster_indices]
cluster_info = {
"cluster_id": int(cluster_id),
"size": len(cluster_images),
"images": cluster_images[:100] # Limit to first 100
}
if method == "kmeans":
# Calculate distances to centroid
centroid = clusterer.cluster_centers_[cluster_id]
distances = np.linalg.norm(feature_vectors_reduced[cluster_indices] - centroid, axis=1)
# Representative images (closest to centroid)
representative_indices = distances.argsort()[:5]
cluster_info["representative_images"] = [
cluster_images[i] for i in representative_indices
]
result["clusters"].append(cluster_info)
# Find similar image pairs
if return_similar_pairs:
print(f" Finding top {top_k} similar image pairs...")
from sklearn.metrics.pairwise import cosine_similarity
similarity_matrix = cosine_similarity(feature_vectors_reduced)
# Get upper triangle indices (avoid duplicates and self-similarity)
triu_indices = np.triu_indices(len(image_paths), k=1)
similarities = similarity_matrix[triu_indices]
# Get top K most similar pairs
top_indices = similarities.argsort()[-top_k:][::-1]
similar_pairs = []
for idx in top_indices:
i, j = triu_indices[0][idx], triu_indices[1][idx]
similar_pairs.append({
"image1": image_paths[i],
"image2": image_paths[j],
"similarity": float(similarities[idx])
})
result["similar_pairs"] = similar_pairs
# Visualize with t-SNE (if enough samples)
if len(image_paths) >= 30:
print(" Computing t-SNE for visualization...")
tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(image_paths)-1))
embeddings_2d = tsne.fit_transform(feature_vectors_reduced)
result["tsne_embeddings"] = embeddings_2d.tolist()
print(f"✅ Clustering complete!")
print(f" Clusters: {len(result['clusters'])}")
for cluster in result["clusters"]:
print(f" Cluster {cluster['cluster_id']}: {cluster['size']} images")
return result
except Exception as e:
print(f"❌ Error during clustering: {str(e)}")
raise
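
# A usage sketch chaining extraction and clustering (hypothetical paths; assumes
# scikit-learn is installed):
#
#     paths = ["data/images/a.jpg", "data/images/b.jpg", "data/images/c.jpg"]
#     feats = extract_image_features(paths, method="color", histogram_bins=32)
#     clusters = perform_image_clustering(feats, n_clusters=3, method="kmeans")
#     for c in clusters["clusters"]:
#         print(c["cluster_id"], c["size"], c.get("representative_images"))
#     print(clusters.get("similar_pairs", [])[:3])  # most similar image pairs
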
def analyze_tabular_image_hybrid(
tabular_data: pl.DataFrame,
image_column: str,
target_column: Optional[str] = None,
tabular_features: Optional[List[str]] = None,
fusion_method: str = "concatenate",
model_type: str = "classification",
test_size: float = 0.2
) -> Dict[str, Any]:
"""
Analyze datasets with both tabular and image data using multi-modal learning.
Args:
tabular_data: DataFrame with tabular features and image paths
image_column: Column containing image file paths
target_column: Target variable column (if supervised learning)
tabular_features: List of tabular feature columns (if None, uses all except image/target)
        fusion_method: How to combine features ('concatenate'/'early', or 'late')
model_type: Type of task ('classification', 'regression')
test_size: Proportion of data for testing
Returns:
Dictionary containing model performance, feature importance, and predictions
"""
print(f"🔍 Analyzing hybrid tabular-image data...")
# Validate input
if image_column not in tabular_data.columns:
raise ValueError(f"Image column '{image_column}' not found in DataFrame")
if target_column and target_column not in tabular_data.columns:
raise ValueError(f"Target column '{target_column}' not found in DataFrame")
# Determine tabular features
if tabular_features is None:
exclude_cols = [image_column]
if target_column:
exclude_cols.append(target_column)
tabular_features = [col for col in tabular_data.columns if col not in exclude_cols]
print(f" Tabular features: {len(tabular_features)}")
print(f" Image column: {image_column}")
print(f" Target column: {target_column}")
result = {
"n_samples": tabular_data.shape[0],
"n_tabular_features": len(tabular_features),
"fusion_method": fusion_method,
"model_type": model_type
}
try:
# Step 1: Extract image features
print("\n Step 1: Extracting image features...")
        image_paths = tabular_data[image_column].to_list()
        # Use CNN features if available, otherwise color histograms
        method = "cnn" if TORCH_AVAILABLE else "color"
        image_features_result = extract_image_features(image_paths, method=method)
        # Drop rows whose image failed to load so the feature matrix stays aligned
        # with the tabular rows (assumes image paths are unique)
        if image_features_result["failed_images"]:
            ok_paths = {f["image_path"] for f in image_features_result["features"]}
            keep_mask = [p in ok_paths for p in image_paths]
            tabular_data = tabular_data.filter(pl.Series(keep_mask))
        # Build image feature matrix
        image_feature_matrix = np.array([
            f["feature_vector"] for f in image_features_result["features"]
        ])
        print(f" Image features shape: {image_feature_matrix.shape}")
# Step 2: Prepare tabular features
print("\n Step 2: Preparing tabular features...")
tabular_feature_matrix = tabular_data.select(tabular_features).to_numpy()
# Handle missing values
from sklearn.impute import SimpleImputer
imputer = SimpleImputer(strategy='mean')
tabular_feature_matrix = imputer.fit_transform(tabular_feature_matrix)
print(f" Tabular features shape: {tabular_feature_matrix.shape}")
# Step 3: Fusion
print(f"\n Step 3: Fusing features using '{fusion_method}' method...")
if fusion_method == "concatenate" or fusion_method == "early":
# Simple concatenation
combined_features = np.hstack([tabular_feature_matrix, image_feature_matrix])
result["combined_feature_dim"] = combined_features.shape[1]
        elif fusion_method == "late":
            # Late fusion is only a stub here: the model below is trained on tabular
            # features alone; a separate image model is not yet trained or combined.
            combined_features = tabular_feature_matrix
            result["combined_feature_dim"] = tabular_feature_matrix.shape[1]
            result["image_feature_dim"] = image_feature_matrix.shape[1]
result["image_feature_dim"] = image_feature_matrix.shape[1]
else:
raise ValueError(f"Unknown fusion method '{fusion_method}'")
print(f" Combined features shape: {combined_features.shape}")
# Step 4: Train model (if target provided)
if target_column:
print(f"\n Step 4: Training {model_type} model...")
target = tabular_data[target_column].to_numpy()
# Split data
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
combined_features, target, test_size=test_size, random_state=42
)
# Train model
if model_type == "classification":
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Evaluate
from sklearn.metrics import accuracy_score, classification_report
train_pred = model.predict(X_train)
test_pred = model.predict(X_test)
result["train_accuracy"] = float(accuracy_score(y_train, train_pred))
result["test_accuracy"] = float(accuracy_score(y_test, test_pred))
# Classification report
report = classification_report(y_test, test_pred, output_dict=True)
result["classification_report"] = report
elif model_type == "regression":
from sklearn.ensemble import RandomForestRegressor
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Evaluate
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
train_pred = model.predict(X_train)
test_pred = model.predict(X_test)
result["train_rmse"] = float(np.sqrt(mean_squared_error(y_train, train_pred)))
result["test_rmse"] = float(np.sqrt(mean_squared_error(y_test, test_pred)))
result["train_r2"] = float(r2_score(y_train, train_pred))
result["test_r2"] = float(r2_score(y_test, test_pred))
result["test_mae"] = float(mean_absolute_error(y_test, test_pred))
# Feature importance
            if fusion_method in ("concatenate", "early"):
feature_names = tabular_features + [f"image_feat_{i}" for i in range(image_feature_matrix.shape[1])]
# Top 20 most important features
importances = model.feature_importances_
top_indices = importances.argsort()[-20:][::-1]
result["top_features"] = [
{
"feature": feature_names[i],
"importance": float(importances[i])
}
for i in top_indices
]
# Compare tabular vs image feature importance
tabular_importance = importances[:len(tabular_features)].sum()
image_importance = importances[len(tabular_features):].sum()
result["feature_importance_split"] = {
"tabular": float(tabular_importance),
"image": float(image_importance),
"tabular_percentage": float(tabular_importance / importances.sum() * 100),
"image_percentage": float(image_importance / importances.sum() * 100)
}
print(f"\n✅ Hybrid analysis complete!")
if target_column:
if model_type == "classification":
print(f" Test accuracy: {result['test_accuracy']:.4f}")
else:
print(f" Test R²: {result['test_r2']:.4f}")
print(f" Test RMSE: {result['test_rmse']:.4f}")
return result
except Exception as e:
print(f"❌ Error during hybrid analysis: {str(e)}")
raise
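
# A minimal end-to-end sketch, runnable as a script. The CSV path and column names
# below are hypothetical: the file is assumed to hold numeric tabular features, an
# 'image_path' column of file paths, and a 'label' target column.
if __name__ == "__main__":
    df = pl.read_csv("data/products.csv")
    report = analyze_tabular_image_hybrid(
        df,
        image_column="image_path",
        target_column="label",
        fusion_method="concatenate",
        model_type="classification",
    )
    print(report.get("test_accuracy"))
    print(report.get("feature_importance_split"))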