# Author: Csaba Bolyos
# History: initial commit (a31294b)
"""
Agent-friendly API for Laban Movement Analysis
Provides simplified interfaces for AI agents and automation
"""
import asyncio
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, asdict
from enum import Enum
# Configure logging
# NOTE(review): calling basicConfig() at import time is discouraged for
# library modules -- it mutates the root logger for the whole process.
# Confirm this module is only used as an application entry point.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Import the main component
from .labanmovementanalysis import LabanMovementAnalysis
class PoseModel(str, Enum):
    """Available pose estimation models.

    Inherits from ``str`` so members compare equal to their plain string
    values and serialize cleanly to JSON.
    """
    MEDIAPIPE = "mediapipe"
    MOVENET = "movenet"
    YOLO = "yolo"
class MovementIntensity(str, Enum):
    """Movement intensity levels.

    ``str`` subclass: members compare equal to their lowercase string
    values, matching the ``dominant`` fields in the analyzer's JSON output.
    """
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
class MovementDirection(str, Enum):
    """Movement direction categories.

    ``str`` subclass: members compare equal to their lowercase string
    values. ``STATIONARY`` doubles as the fallback used on analysis failure.
    """
    UP = "up"
    DOWN = "down"
    LEFT = "left"
    RIGHT = "right"
    STATIONARY = "stationary"
@dataclass
class AnalysisResult:
    """Structured analysis result for agents.

    Returned by every ``LabanAgentAPI`` analysis method; on failure
    ``success`` is False, ``error`` holds the message, and the remaining
    fields are zero/fallback values rather than omitted.
    """
    success: bool                             # False when analysis or parsing failed
    video_path: str                           # path of the analyzed video (as given)
    duration_seconds: float                   # 0.0 on failure
    fps: float                                # 0.0 on failure
    dominant_direction: MovementDirection     # STATIONARY on failure
    dominant_intensity: MovementIntensity     # LOW on failure
    dominant_speed: str                       # "unknown" on failure
    movement_segments: List[Dict[str, Any]]   # per-segment summaries from the analyzer
    fluidity_score: float                     # mean per-frame fluidity, 0..1
    expansion_score: float                    # mean per-frame expansion, 0..1
    error: Optional[str] = None               # set only when success is False
    raw_data: Optional[Dict[str, Any]] = None        # full analyzer JSON, if available
    visualization_path: Optional[str] = None         # annotated video, if requested
class LabanAgentAPI:
    """
    Simplified API for AI agents to analyze movement in videos.
    Provides high-level methods with structured outputs.

    Analysis methods never raise on failure: errors are reported through
    the ``success``/``error`` fields of the returned AnalysisResult.
    """
    # Gradio component compatibility: some Gradio versions introspect an
    # `events` mapping on exposed objects.
    events = {}

    def __init__(self, default_model: PoseModel = PoseModel.MEDIAPIPE):
        """
        Initialize the agent API.

        Args:
            default_model: Default pose estimation model to use
        """
        self.analyzer = LabanMovementAnalysis(default_model=default_model.value)
        self.default_model = default_model
        # Keyed by "<video stem>_<model>". NOTE(review): stems from
        # different directories collide; acceptable for a convenience cache.
        self._analysis_cache = {}

    def analyze(
        self,
        video_path: Union[str, Path],
        model: Optional[PoseModel] = None,
        generate_visualization: bool = False,
        cache_results: bool = True
    ) -> AnalysisResult:
        """
        Analyze a video and return structured results.

        Args:
            video_path: Path to video file
            model: Pose estimation model to use (defaults to instance default)
            generate_visualization: Whether to create annotated video
            cache_results: Whether to cache results for later retrieval

        Returns:
            AnalysisResult with structured movement data. On any failure a
            result with ``success=False`` and ``error`` set is returned
            instead of raising.
        """
        try:
            # Convert path to string
            video_path = str(video_path)
            # Use default model if not specified
            if model is None:
                model = self.default_model
            # Process video
            json_output, viz_video = self.analyzer.process_video(
                video_path,
                model=model.value,
                enable_visualization=generate_visualization,
                include_keypoints=False
            )
            # Parse results
            result = self._parse_analysis_output(
                json_output,
                video_path,
                viz_video
            )
            # Cache if requested
            if cache_results:
                cache_key = f"{Path(video_path).stem}_{model.value}"
                self._analysis_cache[cache_key] = result
            return result
        except Exception as e:
            # Deliberately broad: agent callers want a structured failure
            # result, not an exception. Lazy %-args keep logging cheap.
            logger.error("Analysis failed: %s", e)
            return AnalysisResult(
                success=False,
                video_path=str(video_path),
                duration_seconds=0.0,
                fps=0.0,
                dominant_direction=MovementDirection.STATIONARY,
                dominant_intensity=MovementIntensity.LOW,
                dominant_speed="unknown",
                movement_segments=[],
                fluidity_score=0.0,
                expansion_score=0.0,
                error=str(e)
            )

    async def analyze_async(
        self,
        video_path: Union[str, Path],
        model: Optional[PoseModel] = None,
        generate_visualization: bool = False
    ) -> AnalysisResult:
        """
        Asynchronously analyze a video by off-loading the blocking
        ``analyze`` call to the default thread-pool executor.

        Args:
            video_path: Path to video file
            model: Pose estimation model to use
            generate_visualization: Whether to create annotated video

        Returns:
            AnalysisResult with structured movement data
        """
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine; get_event_loop() is deprecated in this context.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            self.analyze,
            video_path,
            model,
            generate_visualization
        )

    def batch_analyze(
        self,
        video_paths: List[Union[str, Path]],
        model: Optional[PoseModel] = None,
        parallel: bool = True,
        max_workers: int = 4
    ) -> List[AnalysisResult]:
        """
        Analyze multiple videos in batch.

        Args:
            video_paths: List of video file paths
            model: Pose estimation model to use
            parallel: Whether to process in parallel threads
            max_workers: Maximum parallel workers

        Returns:
            List of AnalysisResult objects, in the same order as the input
        """
        if parallel:
            from concurrent.futures import ThreadPoolExecutor
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # submit + ordered result() preserves input order regardless
                # of completion order
                futures = [
                    executor.submit(self.analyze, path, model, False)
                    for path in video_paths
                ]
                results = [future.result() for future in futures]
        else:
            results = [
                self.analyze(path, model, False)
                for path in video_paths
            ]
        return results

    def get_movement_summary(self, analysis_result: AnalysisResult) -> str:
        """
        Generate a natural language summary of movement analysis.

        Args:
            analysis_result: Analysis result to summarize

        Returns:
            Human-readable, newline-separated summary string
        """
        if not analysis_result.success:
            return f"Analysis failed: {analysis_result.error}"
        summary_parts = [
            f"Movement Analysis Summary for {Path(analysis_result.video_path).name}:",
            f"- Duration: {analysis_result.duration_seconds:.1f} seconds",
            f"- Primary movement direction: {analysis_result.dominant_direction.value}",
            f"- Movement intensity: {analysis_result.dominant_intensity.value}",
            f"- Movement speed: {analysis_result.dominant_speed}",
            f"- Fluidity score: {analysis_result.fluidity_score:.2f}/1.00",
            f"- Expansion score: {analysis_result.expansion_score:.2f}/1.00"
        ]
        if analysis_result.movement_segments:
            summary_parts.append(f"- Detected {len(analysis_result.movement_segments)} movement segments")
        return "\n".join(summary_parts)

    def compare_videos(
        self,
        video_path1: Union[str, Path],
        video_path2: Union[str, Path],
        model: Optional[PoseModel] = None
    ) -> Dict[str, Any]:
        """
        Compare movement patterns between two videos.

        Args:
            video_path1: First video path
            video_path2: Second video path
            model: Pose estimation model to use

        Returns:
            Comparison results dictionary; ``{"success": False, ...}`` when
            either analysis fails.
        """
        # Analyze both videos
        result1 = self.analyze(video_path1, model, False)
        result2 = self.analyze(video_path2, model, False)
        if not result1.success or not result2.success:
            return {
                "success": False,
                "error": "One or both analyses failed"
            }
        # Compare metrics
        comparison = {
            "success": True,
            "video1": Path(video_path1).name,
            "video2": Path(video_path2).name,
            "metrics": {
                "direction_match": result1.dominant_direction == result2.dominant_direction,
                "intensity_match": result1.dominant_intensity == result2.dominant_intensity,
                "speed_match": result1.dominant_speed == result2.dominant_speed,
                "fluidity_difference": abs(result1.fluidity_score - result2.fluidity_score),
                "expansion_difference": abs(result1.expansion_score - result2.expansion_score)
            },
            "details": {
                "video1": {
                    "direction": result1.dominant_direction.value,
                    "intensity": result1.dominant_intensity.value,
                    "speed": result1.dominant_speed,
                    "fluidity": result1.fluidity_score,
                    "expansion": result1.expansion_score
                },
                "video2": {
                    "direction": result2.dominant_direction.value,
                    "intensity": result2.dominant_intensity.value,
                    "speed": result2.dominant_speed,
                    "fluidity": result2.fluidity_score,
                    "expansion": result2.expansion_score
                }
            }
        }
        return comparison

    def filter_by_movement(
        self,
        video_paths: List[Union[str, Path]],
        direction: Optional[MovementDirection] = None,
        intensity: Optional[MovementIntensity] = None,
        min_fluidity: Optional[float] = None,
        min_expansion: Optional[float] = None
    ) -> List[AnalysisResult]:
        """
        Filter videos based on movement characteristics.

        Args:
            video_paths: List of video paths to analyze
            direction: Filter by movement direction
            intensity: Filter by movement intensity
            min_fluidity: Minimum fluidity score (0.0 is a valid threshold)
            min_expansion: Minimum expansion score (0.0 is a valid threshold)

        Returns:
            List of AnalysisResults that match all given criteria
        """
        # Analyze all videos
        results = self.batch_analyze(video_paths)
        # Apply filters; failed analyses never match.
        filtered = []
        for result in results:
            if not result.success:
                continue
            if direction is not None and result.dominant_direction != direction:
                continue
            if intensity is not None and result.dominant_intensity != intensity:
                continue
            # `is not None` rather than truthiness: a threshold of 0.0 is
            # falsy and was previously ignored.
            if min_fluidity is not None and result.fluidity_score < min_fluidity:
                continue
            if min_expansion is not None and result.expansion_score < min_expansion:
                continue
            filtered.append(result)
        return filtered

    def _parse_analysis_output(
        self,
        json_output: Dict[str, Any],
        video_path: str,
        viz_path: Optional[str]
    ) -> AnalysisResult:
        """Parse the analyzer's JSON output into a structured result.

        Any parse problem (missing keys are tolerated via .get defaults;
        unexpected enum strings raise ValueError) yields a failed
        AnalysisResult carrying the raw JSON for debugging.
        """
        try:
            # Extract video info
            video_info = json_output.get("video_info", {})
            duration = video_info.get("duration_seconds", 0.0)
            fps = video_info.get("fps", 0.0)
            # Extract movement summary
            movement_analysis = json_output.get("movement_analysis", {})
            summary = movement_analysis.get("summary", {})
            # Parse dominant metrics; lower-casing matches the enum values.
            direction_data = summary.get("direction", {})
            dominant_direction = direction_data.get("dominant", "stationary")
            dominant_direction = MovementDirection(dominant_direction.lower())
            intensity_data = summary.get("intensity", {})
            dominant_intensity = intensity_data.get("dominant", "low")
            dominant_intensity = MovementIntensity(dominant_intensity.lower())
            speed_data = summary.get("speed", {})
            dominant_speed = speed_data.get("dominant", "unknown")
            # Get segments
            segments = summary.get("movement_segments", [])
            # Aggregate per-frame fluidity/expansion into simple means.
            frames = movement_analysis.get("frames", [])
            fluidity_scores = [f.get("metrics", {}).get("fluidity", 0) for f in frames]
            expansion_scores = [f.get("metrics", {}).get("expansion", 0) for f in frames]
            avg_fluidity = sum(fluidity_scores) / len(fluidity_scores) if fluidity_scores else 0.0
            avg_expansion = sum(expansion_scores) / len(expansion_scores) if expansion_scores else 0.0
            return AnalysisResult(
                success=True,
                video_path=video_path,
                duration_seconds=duration,
                fps=fps,
                dominant_direction=dominant_direction,
                dominant_intensity=dominant_intensity,
                dominant_speed=dominant_speed,
                movement_segments=segments,
                fluidity_score=avg_fluidity,
                expansion_score=avg_expansion,
                raw_data=json_output,
                visualization_path=viz_path
            )
        except Exception as e:
            logger.error("Failed to parse analysis output: %s", e)
            return AnalysisResult(
                success=False,
                video_path=video_path,
                duration_seconds=0.0,
                fps=0.0,
                dominant_direction=MovementDirection.STATIONARY,
                dominant_intensity=MovementIntensity.LOW,
                dominant_speed="unknown",
                movement_segments=[],
                fluidity_score=0.0,
                expansion_score=0.0,
                error=f"Parse error: {str(e)}",
                raw_data=json_output
            )
# Convenience functions for quick analysis
def quick_analyze(video_path: Union[str, Path]) -> Dict[str, Any]:
    """Run a one-shot analysis with default settings and return a plain dict."""
    return asdict(LabanAgentAPI().analyze(video_path))

# Gradio component compatibility
quick_analyze.events = {}
def analyze_and_summarize(video_path: Union[str, Path]) -> str:
    """Analyze a video and produce a human-readable movement summary."""
    agent = LabanAgentAPI()
    return agent.get_movement_summary(agent.analyze(video_path))

# Gradio component compatibility
analyze_and_summarize.events = {}