|
|
""" |
Forensic Agent

A simplified LLM agent that receives images directly and uses forensic tools
to analyze them. No model classification - pure agent reasoning with tools.
""" |
|
|
|
|
|
import base64
import logging
import os
import sys
import time
import uuid
from pathlib import Path
from typing import Dict, Optional, Iterator, Callable

from dotenv import load_dotenv
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver

try:
    from langchain.agents import create_react_agent
except ImportError:
    from langgraph.prebuilt import create_react_agent

from ..tools.forensic_tools import create_forensic_tools
|
|
|
|
|
|
|
|
# Load variables from a local .env file (if present) into the environment at
# import time, so ForensicAgent can pick up OPENAI_API_KEY via os.getenv().
load_dotenv()
|
|
|
|
|
|
|
|
# Module-level logger. A stdout handler is attached only when this logger has
# no handler yet, so importing the module more than once does not duplicate
# every log line.
logger = logging.getLogger(__name__)
if not logger.handlers:
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(
        logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
        )
    )
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
|
|
|
|
|
|
|
|
class ForensicAgent:
    """Vision-LLM agent that judges whether an image is AI-generated or manipulated.

    Workflow shared by :meth:`analyze` and :meth:`analyze_stream`:

    1. The image is base64-encoded and sent to the chat model with a
       vision-only prompt to obtain a visual description.
    2. When tools are enabled, the agent graph built by ``create_react_agent``
       receives that description plus the image and may call the tools returned
       by ``create_forensic_tools()``.
    3. If the agent's answer does not contain a "Visual Description" section,
       the model is asked once to rewrite it in the required structure.
    """

    # Extension -> MIME type used in the data URL. Anything unknown falls back
    # to JPEG, which is what the previous implementation did for all non-PNG files.
    _MIME_TYPES = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    _DEFAULT_MIME_TYPE = "image/jpeg"

    # Substrings that mark an exception message as a probable tool failure.
    _TOOL_ERROR_MARKERS = ('gpu', 'tool', 'aborted', 'cuda', 'memory')

    # System prompt of the tool-calling agent (also exposed as self.system_prompt).
    _SYSTEM_PROMPT = """You are a forensic image analysis agent specializing in detecting AI-generated or manipulated images.

CRITICAL: You MUST always start your analysis by describing what is actually in the image - the subjects, scene, objects, people, animals, environment, etc. Do NOT skip directly to forensic metrics.

Your role (in this exact order):
1. FIRST: Provide a detailed visual description of the image content:
- What is in the image? Describe the scene, subjects, objects, people, animals, environment, composition, colors, and overall content
- Analyze lighting: identify light sources, their direction and intensity, shadows, highlights, reflections, and overall lighting consistency
- Analyze physics: check for consistency in shadows, reflections, perspective, gravity, and physical interactions
- Note any visual anomalies or inconsistencies you observe
- This visual description section should come BEFORE any forensic tool results

2. THEN: Use forensic tools to gather technical evidence (JPEG compression, frequency analysis, residuals, etc.)

IMPORTANT TOOL USAGE GUIDELINES:
- You are ENCOURAGED to use multiple tools and can call them multiple times if needed
- If a tool's output is unclear, incomplete, or unsatisfactory, you SHOULD try another tool or retry with different parameters
- You can run tools in sequence to gather comprehensive evidence - do not hesitate to use multiple tools
- If initial tool results are inconclusive, try alternative tools to cross-validate findings
- You can call the same tool multiple times if you need to verify results or if the first attempt was unsuccessful
- Continue gathering evidence until you have sufficient information to reach a confident conclusion
- Do not stop prematurely - use as many tool calls as needed to reduce uncertainty

Available tools:
- analyze_jpeg_compression: Analyze JPEG compression artifacts and quantization tables
- extract_noiseprint: Extract camera model fingerprint features (noiseprint)
- analyze_frequency_domain: Analyze DCT/FFT frequency domain features
- extract_residuals: Extract denoiser residual statistics using DRUNet (deep learning denoiser). Returns comprehensive statistics including mean, std, skew, kurtosis, and energy metrics. Useful for detecting manipulation, AI generation, or compression artifacts.
- perform_ela: Error Level Analysis (recompress + error map for localized inconsistencies)
- perform_trufor: AI-driven forgery detection and localization (combines RGB + Noiseprint++ features)
- execute_python_code: Execute Python code dynamically for custom analysis (zoom, crop, statistics, etc.)

3. FINALLY: Combine visual observations with forensic evidence to reach a conclusion

Output format:
Your response MUST follow this structure:
1. "### Visual Description" section - describe what's in the image, lighting, physics
2. "### Forensic Analysis" section - results from tools (if used)
3. "### Conclusion" section - combine both visual and forensic evidence

Always provide clear reasoning and cite specific evidence."""

    # System prompt of the tool-free visual description pass.
    _VISION_SYSTEM_PROMPT = (
        "You are a forensic image analyst. Do not call any tools. "
        "Rely only on the visible content to judge if an image is AI-generated, synthetic, or a deepfake. "
        "Always start with a detailed visual description before any conclusion."
    )

    # User prompt of the visual description pass.
    _VISION_PROMPT = """Analyze this image and assess whether it appears AI-generated, synthetic, or a deepfake.

Respond in this format:
### Visual Description
- Describe what is visibly in the image (subjects, scene, objects, people/animals, environment, colors, composition)
- Analyze lighting: sources, direction, intensity, shadows, reflections, consistency
- Check physics: perspective, shadows, reflections, physical interactions, textures

### Deepfake/Synthetic Indicators
- List visual cues for or against synthesis (skin/eyes/teeth artifacts, texture oddities, edge halos, warped geometry, repetitive patterns, inconsistent lighting)

### Conclusion
- State if the image looks synthetic/AI vs natural, and why (refer to observations above)

### Confidence
- High / Medium / Low with a brief justification
"""

    # User prompt of the agent pass; filled with str.format (literal braces are doubled).
    _AGENT_PROMPT_TEMPLATE = """You already produced this visual description (reuse it; do not drop it):
{visual_output}

Image path: {image_path}

IMPORTANT: When using execute_python_code tool, include the image_path in your tool call:
{{"code": "your_python_code", "image_path": "{image_path}"}}

Now decide if the image is synthetic/AI-generated/manipulated. You are encouraged to use forensic tools to gather comprehensive evidence. If a tool's output is unclear or unsatisfactory, try another tool or retry. You can use multiple tools and call them multiple times as needed to reach a confident conclusion.

Respond with:
### Visual Description
- Reuse/paraphrase the provided description (do not omit it)

### Forensic Analysis
- Summarize only the tools you actually used (or say "No tools used" briefly)

### Conclusion
- Combine visual cues and any tool evidence to judge synthetic/AI vs natural; state reasoning

### Confidence
- High / Medium / Low with a brief justification
"""

    # Prompt used when the agent answer lacks the visual description section.
    # Item 4 names the same "### Confidence" heading the other prompts require.
    _RETRY_PROMPT_TEMPLATE = """The previous response omitted the required "### Visual Description" section.

Rewrite the analysis with this exact structure:
1) ### Visual Description — reuse or paraphrase the provided visual description
2) ### Forensic Analysis — summarize only the tools you actually used (or state none)
3) ### Conclusion — combine visual observations and any forensic evidence
4) ### Confidence — High / Medium / Low with a brief justification

Provided visual description:
{visual_output}

Previous response:
{output}

Regenerate now."""

    # Appended to the vision/agent prompts when the caller supplies user_query.
    _USER_QUERY_TEMPLATE = (
        "\n\nThe user also asked the following question; "
        "address it explicitly in your analysis:\n{user_query}\n"
    )

    def __init__(self,
                 llm_model: str = "gpt-5.1",
                 temperature: Optional[float] = None,
                 reasoning_effort: Optional[str] = None,
                 api_key: Optional[str] = None,
                 max_iterations: Optional[int] = 50):
        """
        Args:
            llm_model: OpenAI model name (should support vision, e.g., gpt-5.1)
            temperature: LLM temperature; when None the argument is not passed
                and the client library's default applies
            reasoning_effort: Reasoning effort level for the model (only
                forwarded when truthy)
            api_key: OpenAI API key (or set OPENAI_API_KEY env var)
            max_iterations: Maximum number of agent iterations (tool calls +
                reasoning cycles). It is translated into a graph recursion
                limit of ``2 * max_iterations + 1``. Default is 50. With None
                no recursion limit is passed, so the graph library's own
                default limit applies (this does NOT disable the limit).
        """
        llm_kwargs = {
            "model": llm_model,
            "api_key": api_key or os.getenv("OPENAI_API_KEY"),
        }
        # Only forward optional settings that were actually provided.
        if temperature is not None:
            llm_kwargs["temperature"] = temperature
        if reasoning_effort:
            llm_kwargs["reasoning_effort"] = reasoning_effort

        self.llm = ChatOpenAI(**llm_kwargs)
        self.tools = create_forensic_tools()
        self.max_iterations = max_iterations
        self.agent_executor = self._create_agent()

    def _create_agent(self):
        """Create LangGraph agent with forensic tools.

        Also stores the agent system prompt on ``self.system_prompt`` (it is
        reused for the retry request).
        """
        self.system_prompt = self._SYSTEM_PROMPT
        # NOTE(review): the in-memory checkpointer keeps the state of every
        # thread id for the lifetime of this object; nothing here reads it
        # back. Consider dropping it or pruning finished threads.
        return create_react_agent(
            model=self.llm,
            tools=self.tools,
            prompt=self.system_prompt,
            checkpointer=MemorySaver(),
        )

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @classmethod
    def _guess_mime_type(cls, image_path: str) -> str:
        """Return the MIME type for *image_path* based on its file extension."""
        return cls._MIME_TYPES.get(Path(image_path).suffix.lower(), cls._DEFAULT_MIME_TYPE)

    @staticmethod
    def _content_to_text(content) -> str:
        """Flatten a message ``content`` value into plain text.

        Strings are returned unchanged. Lists are treated as content blocks:
        plain strings and dict blocks of type "text" are concatenated, other
        dict blocks (e.g. non-text blocks) are skipped. None becomes "".
        """
        if content is None:
            return ""
        if isinstance(content, str):
            return content
        if isinstance(content, (list, tuple)):
            pieces = []
            for block in content:
                if isinstance(block, str):
                    pieces.append(block)
                elif isinstance(block, dict):
                    if block.get("type", "text") == "text":
                        pieces.append(str(block.get("text", "")))
                else:
                    pieces.append(str(block))
            return "".join(pieces)
        return str(content)

    @staticmethod
    def _tool_call_field(tool_call, field: str, default=None):
        """Read *field* from a tool call given either as a dict or as an object."""
        if isinstance(tool_call, dict):
            return tool_call.get(field, default)
        return getattr(tool_call, field, default)

    @staticmethod
    def _emit(stream_callback, event_type: str, content: str, **extra) -> Dict:
        """Notify *stream_callback* (if any) and return the matching event dict."""
        if stream_callback:
            stream_callback(event_type, content)
        event = {'type': event_type, 'content': content}
        event.update(extra)
        return event

    @staticmethod
    def _build_result(output: str, tool_usage: list, image_path: str) -> Dict:
        """Assemble the result dictionary returned to callers."""
        return {
            'conclusion': output,
            'reasoning': output,
            'tool_usage': tool_usage,
            'image_path': image_path,
        }

    @classmethod
    def _user_query_suffix(cls, user_query: Optional[str]) -> str:
        """Return the prompt suffix for *user_query*, or "" when none was given."""
        if user_query and user_query.strip():
            return cls._USER_QUERY_TEMPLATE.format(user_query=user_query.strip())
        return ""

    @classmethod
    def _vision_prompt_text(cls, user_query: Optional[str] = None) -> str:
        """Text of the visual description request."""
        return cls._VISION_PROMPT + cls._user_query_suffix(user_query)

    @classmethod
    def _agent_prompt_text(cls, image_path: str, visual_output: str,
                           user_query: Optional[str] = None) -> str:
        """Text of the agent request (visual description, image path, format)."""
        prompt = cls._AGENT_PROMPT_TEMPLATE.format(
            visual_output=visual_output,
            image_path=image_path,
        )
        return prompt + cls._user_query_suffix(user_query)

    @classmethod
    def _retry_prompt_text(cls, visual_output: str, output: str) -> str:
        """Text of the request to restore the missing visual description."""
        return cls._RETRY_PROMPT_TEMPLATE.format(visual_output=visual_output, output=output)

    def _run_config(self) -> Dict:
        """Build the per-run graph config.

        Every run gets its own thread id: the agent has a checkpointer, so
        reusing one id would append each new analysis to the previous
        conversation (earlier images and tool calls would leak into it).
        """
        config = {"configurable": {"thread_id": uuid.uuid4().hex}}
        if self.max_iterations is not None:
            config["recursion_limit"] = 2 * self.max_iterations + 1
        return config

    # ------------------------------------------------------------------
    # Message construction
    # ------------------------------------------------------------------

    def _encode_image(self, image_path: str) -> str:
        """Encode image to base64 for vision API."""
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')

    def _image_data_url(self, image_path: str) -> str:
        """Return the image as a ``data:<mime>;base64,...`` URL."""
        return f"data:{self._guess_mime_type(image_path)};base64,{self._encode_image(image_path)}"

    @staticmethod
    def _text_and_image(text: str, image_url: str) -> list:
        """Content list for a human message carrying *text* plus the image."""
        return [
            {"type": "text", "text": text},
            {"type": "image_url", "image_url": {"url": image_url}},
        ]

    def _build_vision_messages(self, image_url: str, user_query: Optional[str]) -> list:
        """Messages for the tool-free visual description pass."""
        return [
            SystemMessage(content=self._VISION_SYSTEM_PROMPT),
            HumanMessage(content=self._text_and_image(self._vision_prompt_text(user_query), image_url)),
        ]

    def _build_agent_messages(self, image_url: str, image_path: str,
                              visual_output: str, user_query: Optional[str]) -> list:
        """Input messages for the agent graph.

        No SystemMessage is added here: the graph was created with
        ``prompt=self.system_prompt`` and supplies the system prompt itself,
        so adding another one would send it twice.
        """
        prompt = self._agent_prompt_text(image_path, visual_output, user_query)
        return [HumanMessage(content=self._text_and_image(prompt, image_url))]

    def _build_retry_messages(self, image_url: str, visual_output: str, output: str) -> list:
        """Messages asking the bare LLM to rewrite *output* in the required format."""
        return [
            SystemMessage(content=self.system_prompt),
            HumanMessage(content=self._text_and_image(
                self._retry_prompt_text(visual_output, output), image_url)),
        ]

    def _stream_llm_text(self, messages: list) -> Iterator[str]:
        """Yield the LLM answer to *messages* as text chunks.

        Uses ``self.llm.stream`` when available and skips empty chunks;
        otherwise falls back to a single ``invoke`` yielding the whole answer.
        """
        if hasattr(self.llm, 'stream'):
            for chunk in self.llm.stream(messages):
                text = self._content_to_text(getattr(chunk, 'content', None))
                if text:
                    yield text
        else:
            reply = self.llm.invoke(messages)
            yield self._content_to_text(getattr(reply, 'content', reply))

    # ------------------------------------------------------------------
    # Non-streaming analysis
    # ------------------------------------------------------------------

    def analyze(self, image_path: str, user_query: Optional[str] = None, use_tools: bool = True) -> Dict:
        """
        Analyze an image using the forensic agent.

        Args:
            image_path: Path to the image file
            user_query: Optional specific question about the image; when given
                it is appended to the prompts
            use_tools: If False, run a simple vision-only prompt with no tools

        Returns:
            Dictionary with analysis results:
            {
                'conclusion': str,   # final answer text
                'reasoning': str,    # same text as 'conclusion'
                'tool_usage': list,  # tool name of every tool call, in order
                'image_path': str
            }

        Raises:
            FileNotFoundError: If image_path does not exist.
        """
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")

        logger.info("Starting analysis (non-streaming) for image: %s, use_tools: %s",
                    image_path, use_tools)

        image_url = self._image_data_url(image_path)

        # The visual description pass always runs; its text seeds the agent.
        vision_reply = self.llm.invoke(self._build_vision_messages(image_url, user_query))
        visual_output = self._content_to_text(getattr(vision_reply, 'content', vision_reply))

        output = visual_output
        tool_usage = []
        if use_tools:
            output, tool_usage = self._run_agent(image_url, image_path, visual_output, user_query)

        logger.info("Analysis complete. Tools used: %s", tool_usage)
        return self._build_result(output, tool_usage, image_path)

    def _run_agent(self, image_url: str, image_path: str,
                   visual_output: str, user_query: Optional[str]):
        """Run the agent graph to completion; return ``(output, tool_usage)``."""
        messages = self._build_agent_messages(image_url, image_path, visual_output, user_query)

        logger.info("Invoking agent executor (non-streaming mode)")
        started = time.perf_counter()
        result = self.agent_executor.invoke({"messages": messages}, config=self._run_config())
        logger.info("Agent executor completed in %.2fs", time.perf_counter() - started)

        tool_usage = []
        if isinstance(result, dict) and 'messages' in result:
            result_messages = result['messages']
            if result_messages:
                last_message = result_messages[-1]
                output = self._content_to_text(getattr(last_message, 'content', last_message))
            else:
                output = "No response generated"

            for msg in result_messages:
                for tool_call in getattr(msg, 'tool_calls', None) or []:
                    tool_name = self._tool_call_field(tool_call, 'name')
                    if tool_name:
                        tool_usage.append(tool_name)
                        logger.debug("[ANALYZE] Found tool call: %s", tool_name)
        else:
            output = str(result)

        # One corrective pass when the required section is missing.
        if "visual description" not in output.lower():
            retry_reply = self.llm.invoke(
                self._build_retry_messages(image_url, visual_output, output))
            output = self._content_to_text(getattr(retry_reply, 'content', retry_reply))

        return output, tool_usage

    # ------------------------------------------------------------------
    # Streaming analysis
    # ------------------------------------------------------------------

    def analyze_stream(self,
                       image_path: str,
                       user_query: Optional[str] = None,
                       use_tools: bool = True,
                       stream_callback: Optional[Callable[[str, str], None]] = None) -> Iterator[Dict]:
        """
        Analyze an image using the forensic agent with streaming output.

        This is a generator: nothing runs (including the file check) until the
        first item is requested.

        Args:
            image_path: Path to the image file
            user_query: Optional specific question about the image; when given
                it is appended to the prompts
            use_tools: If False, run a simple vision-only prompt with no tools
            stream_callback: Optional callback function(stream_type, content) for streaming events
                stream_type can be: 'tool_call', 'tool_result', 'llm_chunk', 'status'

        Yields:
            Dictionary with streaming updates:
            {
                'type': 'tool_call' | 'tool_result' | 'llm_chunk' | 'status' | 'final',
                'content': str (all types except 'final'),
                'tool_name': str (if type is 'tool_call' or 'tool_result'),
                'tool_args': tool arguments (if type is 'tool_call'),
                'tool_result': tool output (if type is 'tool_result'),
                'final_result': Dict (if type is 'final'; same keys as analyze(),
                    except that 'tool_usage' lists each tool name only once)
            }

        Raises:
            FileNotFoundError: If image_path does not exist.
        """
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")

        image_url = self._image_data_url(image_path)

        # ---- Phase 1: visual description ---------------------------------
        logger.info("Starting visual description phase for image: %s", image_path)
        yield self._emit(stream_callback, 'status', '🔍 Getting initial visual description...')

        vision_parts = []
        for text in self._stream_llm_text(self._build_vision_messages(image_url, user_query)):
            vision_parts.append(text)
            yield self._emit(stream_callback, 'llm_chunk', text)
        visual_output = ''.join(vision_parts)

        if not use_tools:
            yield {'type': 'final',
                   'final_result': self._build_result(visual_output, [], image_path)}
            return

        # ---- Phase 2: agent with tools -----------------------------------
        logger.info("Starting agent analysis phase with tools enabled")
        yield self._emit(stream_callback, 'status', '\n🤖 Starting agent analysis with tools...\n')

        messages = self._build_agent_messages(image_url, image_path, visual_output, user_query)
        config = self._run_config()

        tool_usage = []
        accumulated_output = []
        seen_tool_calls = set()
        seen_tool_result_ids = set()
        tool_start_times = {}
        stream_failed = False

        try:
            logger.info("Starting agent analysis stream for image: %s", image_path)
            for event in self.agent_executor.stream(
                {"messages": messages},
                config=config,
                stream_mode="updates",
            ):
                for node_name, node_output in event.items():
                    logger.debug("Processing node: %s", node_name)
                    if not node_output or 'messages' not in node_output:
                        continue

                    for msg in node_output['messages']:
                        if isinstance(msg, ToolMessage):
                            tool_event = self._report_tool_result(
                                msg, seen_tool_result_ids, tool_start_times, stream_callback)
                            if tool_event is not None:
                                yield tool_event
                        elif isinstance(msg, AIMessage):
                            if getattr(msg, 'tool_calls', None):
                                yield from self._announce_tool_calls(
                                    msg, seen_tool_calls, tool_start_times,
                                    tool_usage, stream_callback)
                                continue
                            # Plain AI text (no tool calls) is answer content.
                            text = self._content_to_text(msg.content)
                            if text and (not accumulated_output or text != accumulated_output[-1]):
                                accumulated_output.append(text)
                                yield self._emit(stream_callback, 'llm_chunk', text)
        except Exception as e:  # boundary: report the failure as stream events
            stream_failed = True
            yield from self._report_stream_error(
                str(e), accumulated_output, visual_output, tool_start_times, stream_callback)

        output = ''.join(accumulated_output)

        # ---- Phase 3: corrective retry -----------------------------------
        # Skipped after a failure: regenerating would discard the error note
        # and could raise again outside the handler above.
        if not stream_failed and "visual description" not in output.lower():
            logger.warning("Visual description missing from output, regenerating...")
            yield self._emit(stream_callback, 'status',
                             '\n⚠️ Visual description missing, regenerating...\n')

            retry_parts = []
            retry_messages = self._build_retry_messages(image_url, visual_output, output)
            for text in self._stream_llm_text(retry_messages):
                retry_parts.append(text)
                yield self._emit(stream_callback, 'llm_chunk', text)
            output = ''.join(retry_parts)

        final_result = self._build_result(output, tool_usage, image_path)
        logger.info("Analysis complete. Tools used: %s", tool_usage)
        yield {'type': 'final', 'final_result': final_result}

    def _announce_tool_calls(self, msg, seen_tool_calls: set, tool_start_times: dict,
                             tool_usage: list, stream_callback) -> Iterator[Dict]:
        """Yield one 'tool_call' event per not-yet-seen tool call of *msg*.

        Records the start time per call id and adds each tool name to
        *tool_usage* once. Calls without an id are ignored.
        """
        for tool_call in msg.tool_calls:
            tool_call_id = self._tool_call_field(tool_call, 'id')
            if not tool_call_id or tool_call_id in seen_tool_calls:
                continue
            seen_tool_calls.add(tool_call_id)

            tool_name = self._tool_call_field(tool_call, 'name', 'unknown')
            tool_args = self._tool_call_field(tool_call, 'args', {})

            logger.info("[TOOL CALL] Initiating tool: %s (ID: %s)", tool_name, tool_call_id)
            logger.debug("[TOOL CALL] Tool arguments: %s", tool_args)

            tool_start_times[tool_call_id] = time.perf_counter()
            if tool_name not in tool_usage:
                tool_usage.append(tool_name)

            yield self._emit(stream_callback, 'tool_call', f"🔧 Calling tool: {tool_name}",
                             tool_name=tool_name, tool_args=tool_args)

    def _report_tool_result(self, msg, seen_tool_result_ids: set,
                            tool_start_times: dict, stream_callback) -> Optional[Dict]:
        """Return the 'tool_result' event for *msg*, or None for a duplicate id."""
        tool_call_id = getattr(msg, 'tool_call_id', None)
        if tool_call_id:
            if tool_call_id in seen_tool_result_ids:
                return None
            seen_tool_result_ids.add(tool_call_id)

        # The message name may be unset; fall back to a readable placeholder.
        tool_name = getattr(msg, 'name', None) or 'unknown'
        tool_result = getattr(msg, 'content', None)

        started = tool_start_times.pop(tool_call_id, None)
        if started is not None:
            logger.info("[TOOL RESULT] Tool '%s' completed in %.2fs (ID: %s)",
                        tool_name, time.perf_counter() - started, tool_call_id)
        else:
            logger.info("[TOOL RESULT] Tool '%s' completed (ID: %s)", tool_name, tool_call_id)

        result_preview = str(tool_result)[:200] if tool_result else "No result"
        logger.debug("[TOOL RESULT] Result preview: %s...", result_preview)

        return self._emit(stream_callback, 'tool_result', f"✅ Tool '{tool_name}' completed",
                          tool_name=tool_name, tool_result=tool_result)

    def _report_stream_error(self, error_msg: str, accumulated_output: list,
                             visual_output: str, tool_start_times: dict,
                             stream_callback) -> Iterator[Dict]:
        """Log a failed agent stream and yield the events describing it.

        Appends either an interruption note (partial output exists) or a full
        error response (no output yet) to *accumulated_output*.
        """
        logger.error("[ERROR] Exception during agent analysis stream: %s", error_msg, exc_info=True)

        lowered = error_msg.lower()
        if any(marker in lowered for marker in self._TOOL_ERROR_MARKERS):
            logger.warning("[ERROR] Tool execution error detected: %s", error_msg)

        if tool_start_times:
            logger.warning("[ERROR] %d tool(s) were still running when error occurred: %s",
                           len(tool_start_times), list(tool_start_times.keys()))

        yield self._emit(stream_callback, 'status', f'\n⚠️ Error during analysis: {error_msg}\n')

        if accumulated_output:
            note = (f"\n\n---\n\n⚠️ **Analysis interrupted**: {error_msg}\n\n"
                    "The above shows partial results before the error occurred.")
        else:
            note = f"### Analysis Error\n\n⚠️ The analysis encountered an error: {error_msg}\n\n"
            if visual_output:
                note += f"### Visual Description (from initial analysis)\n\n{visual_output}\n\n"
            note += ("### Note\n\nForensic tool analysis could not be completed due to the error above. "
                     "The visual description above is based on the initial LLM analysis only.")

        accumulated_output.append(note)
        yield self._emit(stream_callback, 'llm_chunk', note)
|
|
|
|
|
|
|
|
|