# Source note: "version bump — agentic analysis added" (commit 7e08013, BladeSzaSza)
"""
Agentic Analysis Module for Laban Movement Analysis
This module provides intelligent analysis capabilities that go beyond raw pose detection
to offer meaningful insights about movement patterns, quality, and characteristics.
"""
import numpy as np
from collections import Counter
from typing import Dict, List, Any, Optional
def generate_agentic_analysis(
    json_data: Dict[str, Any],
    analysis_type: str,
    filter_direction: str = "any",
    filter_intensity: str = "any",
    filter_min_fluidity: float = 0.0,
    filter_min_expansion: float = 0.0
) -> Dict[str, Any]:
    """
    Dispatch intelligent analysis based on JSON data and selected type.

    Args:
        json_data: Movement analysis data from pose estimation
        analysis_type: Type of analysis ("summary", "structured", "movement_filters")
        filter_direction: Direction filter for movement_filters analysis
        filter_intensity: Intensity filter for movement_filters analysis
        filter_min_fluidity: Minimum fluidity threshold for movement_filters
        filter_min_expansion: Minimum expansion threshold for movement_filters

    Returns:
        Dictionary containing analysis results or error information
    """
    if not json_data or "error" in json_data:
        return {"error": "No valid analysis data available. Please run Standard Analysis first."}
    try:
        movement = json_data.get("movement_analysis", {})
        if "frames" not in movement:
            return {"error": "Invalid analysis format - missing movement data"}
        frames = movement["frames"]
        if not frames:
            return {"error": "No movement data found in analysis"}
        video_info = json_data.get("video_info", {})
        model_info = json_data.get("analysis_metadata", {}).get("model_info", {})
        # Table-driven dispatch; lambdas defer the call until a type is selected,
        # so unknown types never touch the analysis functions.
        handlers = {
            "summary": lambda: generate_summary_analysis(frames, video_info, model_info),
            "structured": lambda: generate_structured_analysis(frames, video_info, model_info),
            "movement_filters": lambda: generate_movement_filter_analysis(
                frames, video_info, model_info,
                filter_direction, filter_intensity,
                filter_min_fluidity, filter_min_expansion,
            ),
        }
        handler = handlers.get(analysis_type)
        if handler is None:
            return {"error": "Unknown analysis type"}
        return handler()
    except Exception as e:
        return {"error": f"Analysis failed: {str(e)}"}
def generate_summary_analysis(frames: List[Dict], video_info: Dict, model_info: Dict) -> Dict[str, Any]:
    """
    Generate a comprehensive movement summary with temporal patterns.

    Args:
        frames: List of per-frame analysis dicts; each may carry a "metrics" dict
        video_info: Video metadata (may include "duration_seconds" and "fps")
        model_info: Model information (may include "name")

    Returns:
        Dictionary containing summary analysis with narrative interpretation,
        or an {"error": ...} dict when `frames` is empty.
    """
    # Defensive guard: callers normally filter empty frame lists, but an empty
    # list would crash Counter(...).most_common(1)[0] and divide by zero below.
    if not frames:
        return {"error": "No movement data found in analysis"}
    # Extract per-frame metric series (missing metrics fall back to neutral defaults)
    metrics = [f.get("metrics", {}) for f in frames]
    directions = [m.get("direction", "stationary") for m in metrics]
    intensities = [m.get("intensity", "low") for m in metrics]
    speeds = [m.get("speed", "slow") for m in metrics]
    velocities = [m.get("velocity", 0) for m in metrics]
    fluidities = [m.get("fluidity", 0) for m in metrics]
    expansions = [m.get("expansion", 0) for m in metrics]
    # Dominant (most frequent) categorical characteristics
    dominant_direction = Counter(directions).most_common(1)[0][0]
    dominant_intensity = Counter(intensities).most_common(1)[0][0]
    dominant_speed = Counter(speeds).most_common(1)[0][0]
    # Temporal patterns: count frame-to-frame transitions
    direction_changes = sum(1 for a, b in zip(directions, directions[1:]) if a != b)
    intensity_changes = sum(1 for a, b in zip(intensities, intensities[1:]) if a != b)
    # Statistical analysis (series are non-empty because frames is non-empty)
    avg_velocity = np.mean(velocities)
    max_velocity = np.max(velocities)
    avg_fluidity = np.mean(fluidities)
    avg_expansion = np.mean(expansions)
    # Movement complexity score: weighted mix of transition rates and fluidity
    complexity_score = (direction_changes / len(frames)) * 0.4 + (intensity_changes / len(frames)) * 0.3 + (avg_fluidity * 0.3)
    # Duration: prefer the explicit metadata value.  The fallback is computed
    # lazily — the previous eager dict.get default raised ZeroDivisionError
    # whenever fps == 0, even when "duration_seconds" was present — and
    # "or 25" guards against a present-but-zero fps.
    duration = video_info.get("duration_seconds")
    if duration is None:
        fps = video_info.get("fps") or 25
        duration = len(frames) / fps
    return {
        "analysis_type": "Movement Summary",
        "model_used": model_info.get("name", "unknown"),
        "video_duration": f"{duration:.1f} seconds",
        "total_frames": len(frames),
        "dominant_characteristics": {
            "primary_direction": dominant_direction,
            "primary_intensity": dominant_intensity,
            "primary_speed": dominant_speed
        },
        "temporal_patterns": {
            "direction_transitions": direction_changes,
            "intensity_variations": intensity_changes,
            "movement_consistency": f"{(1 - direction_changes/len(frames))*100:.1f}%",
            "complexity_score": f"{complexity_score:.3f}"
        },
        "movement_quality": {
            "average_fluidity": f"{avg_fluidity:.3f}",
            "average_expansion": f"{avg_expansion:.3f}",
            "peak_velocity": f"{max_velocity:.3f}",
            "average_velocity": f"{avg_velocity:.3f}"
        },
        "narrative_summary": f"This {duration:.1f}-second movement sequence shows predominantly {dominant_direction} movement with {dominant_intensity} intensity. "
                             f"The performer demonstrates {direction_changes} directional changes and {intensity_changes} intensity variations, "
                             f"indicating a {'complex' if complexity_score > 0.3 else 'simple'} movement pattern. "
                             f"Movement quality shows {avg_fluidity:.2f} fluidity and {avg_expansion:.2f} spatial expansion, "
                             f"suggesting {'expressive' if avg_expansion > 0.5 else 'contained'} movement vocabulary.",
        "laban_interpretation": {
            "effort_qualities": f"Primary effort: {dominant_intensity} intensity with {dominant_speed} timing",
            "space_usage": f"{'Expansive' if avg_expansion > 0.5 else 'Contracted'} spatial patterns",
            "flow_quality": f"{'Bound' if avg_fluidity < 0.3 else 'Free' if avg_fluidity > 0.7 else 'Balanced'} flow"
        }
    }
def generate_structured_analysis(frames: List[Dict], video_info: Dict, model_info: Dict) -> Dict[str, Any]:
    """
    Generate detailed structured analysis with metrics breakdown.

    Args:
        frames: List of per-frame analysis dicts; each may carry a "metrics" dict
        video_info: Video metadata (may include "fps", "duration_seconds", "width", "height")
        model_info: Model information (may include "name")

    Returns:
        Dictionary containing detailed quantitative analysis, or an
        {"error": ...} dict when `frames` is empty.
    """
    # Defensive guard: empty input would divide by zero in the percentage maths.
    if not frames:
        return {"error": "No movement data found in analysis"}

    def _stats(values: List[float]) -> Dict[str, str]:
        # Four-number summary (mean/std/min/max) formatted to 4 decimal places.
        arr = np.asarray(values, dtype=float)
        return {
            "mean": f"{arr.mean():.4f}",
            "std": f"{arr.std():.4f}",
            "min": f"{arr.min():.4f}",
            "max": f"{arr.max():.4f}",
        }

    # Extract per-frame metric series
    metrics = [f.get("metrics", {}) for f in frames]
    directions = [m.get("direction", "stationary") for m in metrics]
    intensities = [m.get("intensity", "low") for m in metrics]
    speeds = [m.get("speed", "slow") for m in metrics]
    velocities = [m.get("velocity", 0) for m in metrics]
    accelerations = [m.get("acceleration", 0) for m in metrics]
    fluidities = [m.get("fluidity", 0) for m in metrics]
    expansions = [m.get("expansion", 0) for m in metrics]
    # Categorical breakdowns
    direction_stats = Counter(directions)
    direction_percentages = {k: (v / len(frames)) * 100 for k, v in direction_stats.items()}
    intensity_stats = Counter(intensities)
    intensity_percentages = {k: (v / len(frames)) * 100 for k, v in intensity_stats.items()}
    speed_stats = Counter(speeds)
    speed_percentages = {k: (v / len(frames)) * 100 for k, v in speed_stats.items()}
    # Frame rate for segment time ranges, hoisted out of the loop.
    # "or 25" also guards fps == 0, which previously raised ZeroDivisionError
    # (dict.get only substitutes the default when the key is *missing*).
    fps = video_info.get("fps") or 25
    # Temporal segmentation into ~8 equal chunks
    segment_size = max(1, len(frames) // 8)
    segments = []
    for start in range(0, len(frames), segment_size):
        chunk = frames[start:start + segment_size]
        if not chunk:
            continue
        seg_metrics = [f.get("metrics", {}) for f in chunk]
        seg_directions = [m.get("direction", "stationary") for m in seg_metrics]
        seg_intensities = [m.get("intensity", "low") for m in seg_metrics]
        seg_fluidities = [m.get("fluidity", 0) for m in seg_metrics]
        segments.append({
            "segment_index": len(segments) + 1,
            "time_range": f"{start / fps:.1f}-{(start + len(chunk)) / fps:.1f}s",
            "dominant_direction": Counter(seg_directions).most_common(1)[0][0],
            "dominant_intensity": Counter(seg_intensities).most_common(1)[0][0],
            # Cast to plain float so the result stays JSON-serializable
            # (np.mean returns np.float64, which json.dumps rejects).
            "average_fluidity": float(np.mean(seg_fluidities)) if seg_fluidities else 0
        })
    return {
        "analysis_type": "Structured Analysis",
        "model_used": model_info.get("name", "unknown"),
        "video_info": {
            "duration": video_info.get("duration_seconds", 0),
            "fps": video_info.get("fps", 0),
            "resolution": f"{video_info.get('width', 0)}x{video_info.get('height', 0)}",
            "total_frames": len(frames)
        },
        "direction_breakdown": {
            "statistics": dict(direction_stats),
            "percentages": {k: f"{v:.1f}%" for k, v in direction_percentages.items()},
            "most_common": direction_stats.most_common(3)
        },
        "intensity_breakdown": {
            "statistics": dict(intensity_stats),
            "percentages": {k: f"{v:.1f}%" for k, v in intensity_percentages.items()},
            "distribution": intensity_stats.most_common()
        },
        "speed_breakdown": {
            "statistics": dict(speed_stats),
            "percentages": {k: f"{v:.1f}%" for k, v in speed_percentages.items()},
            "distribution": speed_stats.most_common()
        },
        "quantitative_metrics": {
            "velocity": _stats(velocities),
            "acceleration": _stats(accelerations),
            "fluidity": _stats(fluidities),
            "expansion": _stats(expansions)
        },
        "temporal_segments": segments,
        "movement_patterns": {
            "consistency_index": f"{1 - (len(set(directions))/len(frames)):.3f}",
            "dynamic_range": f"{np.max(velocities) - np.min(velocities):.4f}",
            "complexity_score": f"{len(set(directions)) * len(set(intensities)) / len(frames):.3f}"
        }
    }
def generate_movement_filter_analysis(
    frames: List[Dict],
    video_info: Dict,
    model_info: Dict,
    filter_direction: str,
    filter_intensity: str,
    filter_min_fluidity: float,
    filter_min_expansion: float
) -> Dict[str, Any]:
    """
    Generate movement filter analysis with pattern detection.

    Args:
        frames: List of per-frame analysis dicts; each may carry "metrics" and "frame_index"
        video_info: Video metadata (may include "fps")
        model_info: Model information (may include "name")
        filter_direction: Direction filter criteria ("any" disables the filter)
        filter_intensity: Intensity filter criteria ("any" disables the filter)
        filter_min_fluidity: Minimum fluidity threshold
        filter_min_expansion: Minimum expansion threshold

    Returns:
        Dictionary containing filter analysis results and recommendations
    """
    total_frames = len(frames)
    # Frame rate for duration strings.  "or 25" also guards fps == 0, which
    # previously raised ZeroDivisionError (dict.get only substitutes the
    # default when the key is *missing*, not when the value is 0).
    fps = video_info.get("fps") or 25
    # Apply filters: a frame must satisfy every active criterion.
    matching_frames = []
    for frame in frames:
        m = frame.get("metrics", {})
        if filter_direction != "any" and m.get("direction", "stationary") != filter_direction:
            continue
        if filter_intensity != "any" and m.get("intensity", "low") != filter_intensity:
            continue
        if m.get("fluidity", 0) < filter_min_fluidity:
            continue
        if m.get("expansion", 0) < filter_min_expansion:
            continue
        matching_frames.append(frame)
    match_percentage = (len(matching_frames) / total_frames) * 100 if total_frames > 0 else 0
    if matching_frames:
        match_velocities = [f.get("metrics", {}).get("velocity", 0) for f in matching_frames]
        # Detect continuous runs of consecutive frame indices; runs of length 1
        # are deliberately ignored.  NOTE(review): frames lacking "frame_index"
        # all default to 0, which defeats run detection — confirm upstream
        # analysis always attaches frame_index.
        frame_indices = [f.get("frame_index", 0) for f in matching_frames]
        sequences = []
        current_seq = [frame_indices[0]]
        for prev, cur in zip(frame_indices, frame_indices[1:]):
            if cur == prev + 1:
                current_seq.append(cur)
            else:
                if len(current_seq) > 1:
                    sequences.append(current_seq)
                current_seq = [cur]
        if len(current_seq) > 1:
            sequences.append(current_seq)
        longest_sequence = max(sequences, key=len) if sequences else []
        pattern_analysis = {
            "total_matching_frames": len(matching_frames),
            "match_percentage": f"{match_percentage:.1f}%",
            "continuous_sequences": len(sequences),
            "longest_sequence": {
                "length": len(longest_sequence),
                "start_frame": longest_sequence[0] if longest_sequence else 0,
                "end_frame": longest_sequence[-1] if longest_sequence else 0,
                "duration": f"{len(longest_sequence) / fps:.2f}s" if longest_sequence else "0s"
            },
            "velocity_in_matches": {
                # match_velocities is non-empty here (matching_frames truthy)
                "mean": f"{np.mean(match_velocities):.4f}",
                "peak": f"{np.max(match_velocities):.4f}"
            }
        }
    else:
        pattern_analysis = {
            "total_matching_frames": 0,
            # "0.0%" (was "0%") for consistency with the matching branch's format
            "match_percentage": "0.0%",
            "message": "No frames match the specified criteria"
        }
    return {
        "analysis_type": "Movement Filter Analysis",
        "model_used": model_info.get("name", "unknown"),
        "filter_criteria": {
            "direction": filter_direction,
            "intensity": filter_intensity,
            "min_fluidity": filter_min_fluidity,
            "min_expansion": filter_min_expansion
        },
        "results": pattern_analysis,
        "recommendations": generate_filter_recommendations(matching_frames, total_frames, filter_direction, filter_intensity)
    }
def generate_filter_recommendations(matching_frames: List[Dict], total_frames: int, filter_direction: str, filter_intensity: str) -> str:
    """
    Build a human-readable recommendation from the filter hit ratio.

    Args:
        matching_frames: Frames that satisfied every filter criterion
        total_frames: Total number of frames analyzed
        filter_direction: Direction filter that was applied
        filter_intensity: Intensity filter that was applied

    Returns:
        A recommendation string keyed off the match ratio (strong > 70%,
        moderate > 30%, weak > 10%, otherwise no significant pattern).
    """
    ratio = len(matching_frames) / total_frames if total_frames > 0 else 0
    pct = f"{ratio * 100:.1f}%"
    if ratio > 0.7:
        return (
            f"Strong pattern detected: {pct} of movement matches your criteria. "
            f"This suggests consistent {filter_direction} movement with {filter_intensity} intensity."
        )
    if ratio > 0.3:
        return (
            f"Moderate pattern: {pct} match suggests intermittent {filter_direction} movement patterns. "
            f"Consider analyzing temporal distribution."
        )
    if ratio > 0.1:
        return f"Weak pattern: Only {pct} match. The movement may be more varied than your filter criteria suggest."
    return (
        f"No significant pattern found ({pct} match). "
        f"Consider broadening filter criteria or analyzing different movement qualities."
    )
def process_standard_for_agent(json_data: Dict[str, Any], output_format: str = "summary") -> Dict[str, Any]:
    """
    Convert standard analysis JSON to agent format.

    Args:
        json_data: Standard movement analysis data
        output_format: Desired output format ("summary", "structured", "json")

    Returns:
        Dictionary containing converted analysis in agent format; error dicts
        and empty input are passed through unchanged.
    """
    # Pass through empty input or upstream error payloads untouched.
    if not json_data or "error" in json_data:
        return json_data
    try:
        if "movement_analysis" not in json_data or "frames" not in json_data["movement_analysis"]:
            return {"error": "Invalid analysis format"}
        frames = json_data["movement_analysis"]["frames"]
        if not frames:
            return {"error": "No movement data found"}
        per_frame = [f.get("metrics", {}) for f in frames]
        # Most frequent value for each categorical metric, with its fallback.
        dominant = {
            key: Counter(m.get(key, default) for m in per_frame).most_common(1)[0][0]
            for key, default in (("direction", "stationary"), ("intensity", "low"), ("speed", "slow"))
        }
        fluidity_values = [m.get("fluidity", 0.0) for m in per_frame]
        expansion_values = [m.get("expansion", 0.5) for m in per_frame]
        # frames is non-empty here, so both lists are non-empty.
        avg_fluidity = sum(fluidity_values) / len(fluidity_values)
        avg_expansion = sum(expansion_values) / len(expansion_values)
        if output_format == "summary":
            return {
                "summary": f"Movement Analysis: Predominantly {dominant['direction']} direction with {dominant['intensity']} intensity. "
                           f"Speed: {dominant['speed']}. Fluidity: {avg_fluidity:.2f}, Expansion: {avg_expansion:.2f}"
            }
        if output_format == "structured":
            return {
                "success": True,
                "direction": dominant["direction"],
                "intensity": dominant["intensity"],
                "speed": dominant["speed"],
                "fluidity": avg_fluidity,
                "expansion": avg_expansion,
                "segments": len(frames)
            }
        # Any other format (e.g. "json"): return the raw analysis unchanged.
        return json_data
    except Exception as e:
        return {"error": f"Conversion failed: {str(e)}"}