dftest1 / src /agents /forensic_tools.py
akcanca's picture
Upload 110 files (#1)
07fe054 verified
"""
Forensic Tools for Agentic Controller
LangChain Tool definitions for forensic operations.
"""
import json
from typing import List
from langchain_core.tools import Tool
def create_forensic_tools() -> List[Tool]:
    """
    Create LangChain tools for forensic operations.

    Each tool takes a single string argument and returns a JSON string.
    Errors raised while a tool does its work are reported as
    ``{"tool": <name>, "status": "error", "error": <message>}`` rather than
    propagated to the caller.

    Returns:
        List of Tool instances
    """

    def _split_input(input_str: str) -> tuple:
        """
        Split raw tool input into ``(image_path, second_line)``.

        The first line is the image path; the second line, when present, is
        the tool-specific option. Both are stripped. ``second_line`` is an
        empty string when it is missing. Any further lines are ignored.
        """
        lines = input_str.strip().split('\n')
        first = lines[0].strip()
        second = lines[1].strip() if len(lines) > 1 else ""
        return first, second

    def jpeg_stress_test(input_str: str) -> str:
        """
        Run JPEG recompression stress test on an image.

        The image is re-encoded as JPEG at the requested quality and the
        encoded bytes are written next to the source file as
        ``<stem>_recompressed.jpg`` (or ``.jpeg`` when the source uses it).

        Args:
            input_str: String containing image_path and optionally quality,
                separated by newline.
                Format: "image_path\\nquality" or just "image_path"

        Returns:
            JSON string with test results
        """
        # Parse input (LangChain tools receive string input)
        image_path, quality_text = _split_input(input_str)

        # str.isdigit() alone accepts characters such as superscript digits
        # that int() rejects; require ASCII so parsing can never raise here.
        if quality_text.isascii() and quality_text.isdigit():
            # Clamp so the reported quality is always a valid JPEG setting.
            quality = min(max(int(quality_text), 1), 100)
        else:
            quality = 75

        try:
            from pathlib import Path
            from PIL import Image
            import io

            # Load and recompress image entirely in memory.
            buffer = io.BytesIO()
            with Image.open(image_path) as img:
                img.convert('RGB').save(buffer, format='JPEG', quality=quality)

            # Build the output name from the file stem only. A plain
            # str.replace('.', ...) would also rewrite dots in directory
            # components ("./img.jpg") and in multi-dot file names.
            source = Path(image_path)
            if source.suffix.lower() in ('.jpg', '.jpeg'):
                suffix = source.suffix
            else:
                suffix = '.jpg'
            temp_path = source.with_name(f"{source.stem}_recompressed{suffix}")

            # Write the encoded bytes as-is: re-saving through PIL would
            # compress a second time with default settings, so the file would
            # no longer reflect the requested quality.
            temp_path.write_bytes(buffer.getvalue())

            # Re-classify (would need full pipeline - simplified here)
            result = {
                "tool": "jpeg_stress_test",
                "quality": quality,
                "status": "completed",
                "note": "Recompression completed. Re-classification requires full pipeline.",
                "temp_path": str(temp_path)
            }
            return json.dumps(result)
        except Exception as e:
            return json.dumps({"tool": "jpeg_stress_test", "status": "error", "error": str(e)})

    def compute_additional_features(input_str: str) -> str:
        """
        Compute additional forensic features.

        Args:
            input_str: String containing image_path and optionally
                feature_types, separated by newline.
                Format: "image_path\\nfeature_types" or just "image_path"

        Returns:
            JSON string with feature extraction results
        """
        # Parse input (LangChain tools receive string input)
        image_path, feature_types = _split_input(input_str)

        try:
            from src.features.additional_features import extract_additional_features

            # Drop empty entries ("cfa,,defocus", trailing commas) and fall
            # back to the default set when nothing usable was supplied, e.g.
            # when the second line is blank.
            feature_list = [name.strip() for name in feature_types.split(',') if name.strip()]
            if not feature_list:
                feature_list = ["cfa", "defocus"]

            # Extract features using actual implementation
            extraction_result = extract_additional_features(image_path, feature_list)

            # Format result for agent consumption
            result = {
                "tool": "compute_additional_features",
                "requested_features": feature_list,
                "status": extraction_result.get("status", "completed"),
                "features": extraction_result.get("features", {}),
                "image_path": extraction_result.get("image_path", image_path)
            }

            # Add summary statistics for agent reasoning
            if "features" in extraction_result:
                features = extraction_result["features"]
                summary = {}
                if "cfa" in features:
                    cfa_feats = features["cfa"]
                    # A "status" key means the entry is not a plain feature
                    # dict, so it is left out of the summary.
                    if isinstance(cfa_feats, dict) and "status" not in cfa_feats:
                        summary["cfa_consistency"] = cfa_feats.get("cfa_consistency_score", 0.0)
                        summary["cfa_anomalies"] = bool(cfa_feats.get("cfa_anomalies_detected", 0))
                if "defocus" in features:
                    defocus_feats = features["defocus"]
                    if isinstance(defocus_feats, dict) and "status" not in defocus_feats:
                        summary["defocus_consistency"] = defocus_feats.get("defocus_consistency_score", 0.0)
                        summary["defocus_anomalies"] = bool(defocus_feats.get("defocus_anomalies_detected", 0))
                        summary["defocus_n_anomalies"] = defocus_feats.get("defocus_n_anomalies", 0)
                result["summary"] = summary

            return json.dumps(result)
        except Exception as e:
            return json.dumps({
                "tool": "compute_additional_features",
                "status": "error",
                "error": str(e)
            })

    def analyze_forensic_maps(input_str: str) -> str:
        """
        Analyze forensic visualizations using vision-language model.

        Placeholder: the input is currently not used and a fixed result with
        an empty ``spatial_anomalies`` list is returned.

        Args:
            input_str: String containing image_path

        Returns:
            JSON string with spatial analysis results
        """
        # In full implementation, would use VLM to analyze forensic maps
        result = {
            "tool": "analyze_forensic_maps",
            "status": "completed",
            "spatial_anomalies": [],
            "note": "VLM-based forensic map analysis would be implemented here"
        }
        return json.dumps(result)

    # Create LangChain tools
    tools = [
        Tool(
            name="jpeg_stress_test",
            func=jpeg_stress_test,
            description=(
                "Run JPEG recompression stress test. Use this when you need to test "
                "robustness of the detection under compression artifacts. "
                "Input format: 'image_path\\nquality' where quality is optional (default 75). "
                "Example: 'path/to/image.jpg\\n75'"
            )
        ),
        Tool(
            name="compute_additional_features",
            func=compute_additional_features,
            description=(
                "Compute additional forensic features (CFA, defocus map, etc.). "
                "Use this when initial features are insufficient or uncertain. "
                "Input format: 'image_path\\nfeature_types' where feature_types is optional (default 'cfa,defocus'). "
                "Example: 'path/to/image.jpg\\ncfa,defocus'"
            )
        ),
        Tool(
            name="analyze_forensic_maps",
            func=analyze_forensic_maps,
            description=(
                "Analyze forensic visualizations (noiseprint maps, residual maps) "
                "using vision-language model for spatial localization. "
                "Use this when you need to identify WHERE artifacts occur. "
                "Input format: 'image_path'. Example: 'path/to/image.jpg'"
            )
        ),
    ]
    return tools