Spaces:
Sleeping
Sleeping
| """ | |
| Forensic Tools for Agentic Controller | |
| LangChain Tool definitions for forensic operations. | |
| """ | |
| import json | |
| from typing import List | |
| from langchain_core.tools import Tool | |
| def create_forensic_tools() -> List[Tool]: | |
| """ | |
| Create LangChain tools for forensic operations. | |
| Returns: | |
| List of Tool instances | |
| """ | |
| def jpeg_stress_test(input_str: str) -> str: | |
| """ | |
| Run JPEG recompression stress test on an image. | |
| Args: | |
| input_str: String containing image_path and optionally quality, separated by newline | |
| Format: "image_path\\nquality" or just "image_path" | |
| Returns: | |
| JSON string with test results | |
| """ | |
| # Parse input (LangChain tools receive string input) | |
| parts = input_str.strip().split('\n') | |
| image_path = parts[0].strip() | |
| quality = int(parts[1].strip()) if len(parts) > 1 and parts[1].strip().isdigit() else 75 | |
| try: | |
| from PIL import Image | |
| import io | |
| # Load and recompress image | |
| img = Image.open(image_path).convert('RGB') | |
| buffer = io.BytesIO() | |
| img.save(buffer, format='JPEG', quality=quality) | |
| buffer.seek(0) | |
| recompressed = Image.open(buffer) | |
| # Save temporarily for re-classification | |
| temp_path = image_path.replace('.', '_recompressed.') | |
| recompressed.save(temp_path) | |
| # Re-classify (would need full pipeline - simplified here) | |
| result = { | |
| "tool": "jpeg_stress_test", | |
| "quality": quality, | |
| "status": "completed", | |
| "note": "Recompression completed. Re-classification requires full pipeline.", | |
| "temp_path": temp_path | |
| } | |
| return json.dumps(result) | |
| except Exception as e: | |
| return json.dumps({"tool": "jpeg_stress_test", "status": "error", "error": str(e)}) | |
| def compute_additional_features(input_str: str) -> str: | |
| """ | |
| Compute additional forensic features. | |
| Args: | |
| input_str: String containing image_path and optionally feature_types, separated by newline | |
| Format: "image_path\\nfeature_types" or just "image_path" | |
| Returns: | |
| JSON string with feature extraction results | |
| """ | |
| # Parse input (LangChain tools receive string input) | |
| parts = input_str.strip().split('\n') | |
| image_path = parts[0].strip() | |
| feature_types = parts[1].strip() if len(parts) > 1 else "cfa,defocus" | |
| try: | |
| from src.features.additional_features import extract_additional_features | |
| feature_list = [f.strip() for f in feature_types.split(',')] | |
| # Extract features using actual implementation | |
| extraction_result = extract_additional_features(image_path, feature_list) | |
| # Format result for agent consumption | |
| result = { | |
| "tool": "compute_additional_features", | |
| "requested_features": feature_list, | |
| "status": extraction_result.get("status", "completed"), | |
| "features": extraction_result.get("features", {}), | |
| "image_path": extraction_result.get("image_path", image_path) | |
| } | |
| # Add summary statistics for agent reasoning | |
| if "features" in extraction_result: | |
| summary = {} | |
| if "cfa" in extraction_result["features"]: | |
| cfa_feats = extraction_result["features"]["cfa"] | |
| if isinstance(cfa_feats, dict) and "status" not in cfa_feats: | |
| summary["cfa_consistency"] = cfa_feats.get("cfa_consistency_score", 0.0) | |
| summary["cfa_anomalies"] = bool(cfa_feats.get("cfa_anomalies_detected", 0)) | |
| if "defocus" in extraction_result["features"]: | |
| defocus_feats = extraction_result["features"]["defocus"] | |
| if isinstance(defocus_feats, dict) and "status" not in defocus_feats: | |
| summary["defocus_consistency"] = defocus_feats.get("defocus_consistency_score", 0.0) | |
| summary["defocus_anomalies"] = bool(defocus_feats.get("defocus_anomalies_detected", 0)) | |
| summary["defocus_n_anomalies"] = defocus_feats.get("defocus_n_anomalies", 0) | |
| result["summary"] = summary | |
| return json.dumps(result) | |
| except Exception as e: | |
| return json.dumps({ | |
| "tool": "compute_additional_features", | |
| "status": "error", | |
| "error": str(e) | |
| }) | |
| def analyze_forensic_maps(input_str: str) -> str: | |
| """ | |
| Analyze forensic visualizations using vision-language model. | |
| Args: | |
| input_str: String containing image_path | |
| Returns: | |
| JSON string with spatial analysis results | |
| """ | |
| # Parse input (LangChain tools receive string input) | |
| image_path = input_str.strip() | |
| try: | |
| # In full implementation, would use VLM to analyze forensic maps | |
| result = { | |
| "tool": "analyze_forensic_maps", | |
| "status": "completed", | |
| "spatial_anomalies": [], | |
| "note": "VLM-based forensic map analysis would be implemented here" | |
| } | |
| return json.dumps(result) | |
| except Exception as e: | |
| return json.dumps({ | |
| "tool": "analyze_forensic_maps", | |
| "status": "error", | |
| "error": str(e) | |
| }) | |
| # Create LangChain tools | |
| tools = [ | |
| Tool( | |
| name="jpeg_stress_test", | |
| func=jpeg_stress_test, | |
| description=( | |
| "Run JPEG recompression stress test. Use this when you need to test " | |
| "robustness of the detection under compression artifacts. " | |
| "Input format: 'image_path\\nquality' where quality is optional (default 75). " | |
| "Example: 'path/to/image.jpg\\n75'" | |
| ) | |
| ), | |
| Tool( | |
| name="compute_additional_features", | |
| func=compute_additional_features, | |
| description=( | |
| "Compute additional forensic features (CFA, defocus map, etc.). " | |
| "Use this when initial features are insufficient or uncertain. " | |
| "Input format: 'image_path\\nfeature_types' where feature_types is optional (default 'cfa,defocus'). " | |
| "Example: 'path/to/image.jpg\\ncfa,defocus'" | |
| ) | |
| ), | |
| Tool( | |
| name="analyze_forensic_maps", | |
| func=analyze_forensic_maps, | |
| description=( | |
| "Analyze forensic visualizations (noiseprint maps, residual maps) " | |
| "using vision-language model for spatial localization. " | |
| "Use this when you need to identify WHERE artifacts occur. " | |
| "Input format: 'image_path'. Example: 'path/to/image.jpg'" | |
| ) | |
| ), | |
| ] | |
| return tools | |