|
|
""" |
|
|
Gradio UI for Forensic Image Analysis Agent |
|
|
Clean, full-page chat interface with streaming support |
|
|
""" |
|
|
|
|
|
import os |
|
|
import sys |
|
|
import json |
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import Optional, Tuple |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent)) |
|
|
|
|
|
from src.agents import ForensicAgent |
|
|
|
|
|
|
|
|
# Console logging with timestamps; INFO keeps tool-call progress visible
# in the host's logs (e.g. a Hugging Face Space) without debug noise.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


# Module-level singletons shared across chat turns:
# the lazily-(re)created agent, and the settings it was built with so
# respond_chat can detect when the user changed something in the sidebar.
agent: Optional[ForensicAgent] = None
agent_config: dict = {}
|
|
|
|
|
|
|
|
def _ensure_weights(): |
|
|
"""Ensure TruFor and DRUNet weights are downloaded on startup.""" |
|
|
try: |
|
|
from src.utils.weight_downloader import ensure_trufor_weights |
|
|
workspace_root = Path(__file__).parent |
|
|
success, message = ensure_trufor_weights(workspace_root=workspace_root, auto_download=True) |
|
|
print(f"[Weights] {message}") |
|
|
except Exception as e: |
|
|
print(f"[Weights] TruFor: {e}") |
|
|
|
|
|
try: |
|
|
from src.utils.weight_downloader import ensure_drunet_weights |
|
|
workspace_root = Path(__file__).parent |
|
|
drunet_weights_path = workspace_root / "src" / "tools" / "forensic" / "drunet" / "weights" / "drunet_gray.pth" |
|
|
success, message = ensure_drunet_weights(weights_path=drunet_weights_path, auto_download=True) |
|
|
print(f"[Weights] {message}") |
|
|
except Exception as e: |
|
|
print(f"[Weights] DRUNet: {e}") |
|
|
|
|
|
|
|
|
def initialize_agent(
    model_name: str = "gpt-5.1",
    temperature: float = 0.2,
    reasoning_effort: Optional[str] = None,
    use_tools: bool = True
) -> Tuple[str, Optional[ForensicAgent]]:
    """Create the global ForensicAgent and remember its configuration.

    Never raises: all failures are reported through the status string.

    Returns:
        (status message, agent) on success, or (error message, None).
    """
    global agent, agent_config

    # Guard clause: without a key there is nothing to initialize.
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        return "OPENAI_API_KEY not found. Please set it in Hugging Face Space Secrets.", None

    try:
        agent = ForensicAgent(
            llm_model=model_name,
            temperature=temperature,
            reasoning_effort=reasoning_effort,
            api_key=api_key,
            max_iterations=50
        )
    except Exception as exc:
        return f"Error: {exc}", None

    # Record the settings so respond_chat can detect when a re-init is needed.
    agent_config = {
        'model_name': model_name,
        'temperature': temperature,
        'reasoning_effort': reasoning_effort,
        'use_tools': use_tools
    }

    return f"Agent ready: {model_name}", agent
|
|
|
|
|
|
|
|
def format_tool_result(tool_name: str, tool_result) -> str:
    """Format a forensic tool result as markdown for the chat panel.

    Args:
        tool_name: Name of the tool that produced the result (currently
            unused; kept so all tool-formatting callers share one signature).
        tool_result: Raw result. Dicts with well-known keys get a bolded
            key-by-key summary; other dicts are dumped as JSON; anything
            else is stringified. All paths truncate to ~1500 characters.

    Returns:
        A markdown-formatted summary string.
    """
    if not isinstance(tool_result, dict):
        # Non-dict results: plain stringification, truncated.
        result_str = str(tool_result)
        if len(result_str) > 1500:
            result_str = result_str[:1500] + "..."
        return result_str

    formatted_lines = []

    # Well-known result keys get friendly one-line summaries.
    if 'manipulation_detected' in tool_result:
        status = "Yes" if tool_result['manipulation_detected'] else "No"
        formatted_lines.append(f"**Manipulation Detected:** {status}")

    if 'confidence' in tool_result:
        formatted_lines.append(f"**Confidence:** {tool_result['confidence']:.1%}")

    if 'ai_generated_probability' in tool_result:
        formatted_lines.append(f"**AI Generated Probability:** {tool_result['ai_generated_probability']:.1%}")

    if 'anomaly_score' in tool_result:
        formatted_lines.append(f"**Anomaly Score:** {tool_result['anomaly_score']:.3f}")

    if 'findings' in tool_result and isinstance(tool_result['findings'], list):
        if tool_result['findings']:
            formatted_lines.append("**Findings:**")
            # Show at most 5 findings to keep the chat readable.
            for finding in tool_result['findings'][:5]:
                formatted_lines.append(f"  - {finding}")

    if 'error' in tool_result:
        formatted_lines.append(f"**Error:** {tool_result['error']}")

    if formatted_lines:
        return "\n".join(formatted_lines)

    # No known keys: fall back to a (truncated) JSON dump.
    try:
        result_str = json.dumps(tool_result, indent=2)
        if len(result_str) > 1500:
            result_str = result_str[:1500] + "\n..."
        return f"```json\n{result_str}\n```"
    # json.dumps raises TypeError for non-serializable values and
    # ValueError for circular references; was a bare `except:` which
    # also swallowed SystemExit/KeyboardInterrupt.
    except (TypeError, ValueError):
        return str(tool_result)[:1500]
|
|
|
|
|
|
|
|
def respond_chat(
    message,
    history: list,
    use_tools: bool = True,
    model_name: str = "gpt-5.1",
    temperature: float = 0.2,
    reasoning_effort: Optional[str] = None,
):
    """Stream the forensic analysis answer for one chat turn.

    Generator: yields the *entire* accumulated markdown response after every
    update (tool started, tool finished, LLM chunk) so the chat panel can
    simply rerender its last message.

    Args:
        message: Dict with "text" and "files" keys (MultimodalTextbox
            payload) or a plain string (no image attached).
        history: Previous chat messages (unused here; kept for the
            conventional Gradio chat-function signature).
        use_tools: Whether the agent may run forensic tools.
        model_name: LLM model name.
        temperature: Sampling temperature.
        reasoning_effort: Reasoning effort level; the literal string "None"
            (the dropdown's default choice) is treated as unset.
    """
    global agent

    # --- Extract the image path and query text from the message ---
    image_path = None
    query_text = ""

    if isinstance(message, dict):
        query_text = message.get("text", "")
        files = message.get("files", [])
        if files:
            image_path = files[0]
    elif isinstance(message, str):
        query_text = message

    # An image is mandatory for forensic analysis.
    if image_path is None:
        yield "Please upload an image to analyze. You can drag & drop or paste an image."
        return

    if not os.path.exists(image_path):
        yield f"Image file not found: {image_path}"
        return

    try:
        # The dropdown yields the string "None", not None itself.
        reasoning_effort_clean = None if reasoning_effort == "None" else reasoning_effort

        # Rebuild the agent only when a sidebar setting actually changed.
        needs_reinit = (
            agent is None or
            agent_config.get('model_name') != model_name or
            agent_config.get('temperature') != temperature or
            agent_config.get('reasoning_effort') != reasoning_effort_clean or
            agent_config.get('use_tools') != use_tools
        )

        if needs_reinit:
            status_msg, agent_obj = initialize_agent(
                model_name=model_name,
                temperature=temperature,
                reasoning_effort=reasoning_effort_clean,
                use_tools=use_tools
            )
            if agent_obj is None:
                yield f"Failed to initialize agent: {status_msg}"
                return
            agent = agent_obj

        output_parts = []  # accumulated markdown fragments, joined per yield
        tools_used = []    # tool names seen so far, deduplicated in order
        llm_content = []   # LLM chunks; non-empty once the Analysis header is emitted

        logger.info(f"Starting analysis stream for image: {image_path}, query: {query_text[:50] if query_text else 'None'}...")
        for event in agent.analyze_stream(
            image_path=image_path,
            user_query=query_text if query_text else None,
            use_tools=use_tools
        ):
            event_type = event.get('type')
            logger.debug(f"[UI] Event received: {event_type}")

            if event_type == 'status':
                # Progress pings carry no displayable content.
                pass

            elif event_type == 'tool_call':
                tool_name = event.get('tool_name', 'unknown')
                tool_args = event.get('tool_args', {})
                logger.info(f"[UI] Tool call received: {tool_name}")
                logger.debug(f"[UI] Tool arguments: {tool_args}")
                if tool_name not in tools_used:
                    tools_used.append(tool_name)

                # Show a placeholder that the matching tool_result event
                # will replace in place.
                output_parts.append(f"\n\n---\n### Tool: {tool_name}\n*Running...*")
                yield ''.join(output_parts)

            elif event_type == 'tool_result':
                tool_name = event.get('tool_name', 'unknown')
                tool_result = event.get('tool_result', '')
                logger.info(f"[UI] Tool result received: {tool_name}")
                logger.debug(f"[UI] Tool result preview: {str(tool_result)[:200]}...")

                formatted_result = format_tool_result(tool_name, tool_result)

                # Swap the first "Running..." placeholder for this tool
                # with its formatted result.
                current_output = ''.join(output_parts)
                search_str = f"### Tool: {tool_name}\n*Running...*"
                if search_str in current_output:
                    replacement = f"### Tool: {tool_name}\n{formatted_result}"
                    updated_output = current_output.replace(search_str, replacement, 1)
                    output_parts = [updated_output]
                else:
                    # Result arrived without a preceding tool_call event;
                    # append it as its own section.
                    output_parts.append(f"\n\n---\n### Tool: {tool_name}\n{formatted_result}")

                yield ''.join(output_parts)

            elif event_type == 'llm_chunk':
                chunk = event.get('content', '')
                if chunk:
                    # Emit the section header before the very first chunk only.
                    if not llm_content:
                        output_parts.append("\n\n---\n### Analysis\n\n")
                    llm_content.append(chunk)
                    output_parts.append(chunk)
                    yield ''.join(output_parts)

            elif event_type == 'final':
                yield ''.join(output_parts)

        # Ensure something is shown even if the stream produced no output.
        final_output = ''.join(output_parts)
        if not final_output.strip():
            final_output = "Analysis complete. No detailed output was generated."

        yield final_output

    except Exception as e:
        import traceback
        logger.error(f"Error during analysis: {str(e)}", exc_info=True)
        yield f"Error during analysis:\n\n```\n{str(e)}\n\n{traceback.format_exc()}\n```"
|
|
|
|
|
|
|
|
def create_interface():
    """Build the Gradio Blocks UI.

    Layout: a settings sidebar (tool toggle, model, temperature, reasoning
    effort) next to a full-height chat column driven by a multimodal
    textbox for image upload + question entry.

    Returns:
        The assembled (un-launched) gr.Blocks app.
    """

    # Page-level CSS: full-width layout, fixed-height chat, styled header.
    custom_css = """
    /* Full height layout */
    .gradio-container {
        max-width: 100% !important;
        padding: 0 !important;
    }

    /* Chat container fills available space */
    .chat-container {
        height: calc(100vh - 120px) !important;
    }

    /* Chatbot area */
    .chatbot {
        height: 100% !important;
        overflow-y: auto !important;
    }

    /* Message styling */
    .message {
        max-width: 85% !important;
    }

    /* Sidebar styling */
    .sidebar {
        background: #f8f9fa !important;
        border-right: 1px solid #e0e0e0 !important;
    }

    /* Tool output code blocks */
    .message pre {
        max-height: 300px;
        overflow-y: auto;
    }

    /* Header */
    .header {
        padding: 12px 20px;
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        text-align: center;
    }

    .header h1 {
        margin: 0;
        font-size: 1.4em;
        font-weight: 600;
    }

    .header p {
        margin: 4px 0 0 0;
        font-size: 0.9em;
        opacity: 0.9;
    }

    /* Hide default Gradio footer */
    footer {
        display: none !important;
    }

    /* Multimodal textbox styling */
    .multimodal-textbox {
        border-radius: 12px !important;
    }
    """

    # Theme + CSS belong on gr.Blocks itself: gr.Blocks.launch() has no
    # `theme` parameter, and injecting a raw <style> tag via gr.HTML is a
    # fragile workaround for the supported `css=` argument.
    theme = gr.themes.Soft(
        primary_hue="indigo",
        secondary_hue="purple",
        neutral_hue="slate",
        font=gr.themes.GoogleFont("Inter"),
    )

    with gr.Blocks(css=custom_css, theme=theme) as demo:

        # Page header banner.
        gr.HTML("""
        <div class="header">
            <h1>Forensic Image Analysis Agent</h1>
            <p>AI-powered detection of manipulated, AI-generated, and deepfake images</p>
        </div>
        """)

        with gr.Row(equal_height=True):

            # --- Settings sidebar ---
            with gr.Column(scale=1, min_width=280):
                gr.Markdown("### Settings")

                use_tools = gr.Checkbox(
                    label="Use Forensic Tools",
                    value=True,
                    info="Run JPEG, ELA, frequency analysis, etc."
                )

                model = gr.Dropdown(
                    label="Model",
                    choices=["gpt-5", "gpt-5.1", "gpt-5-mini", "gpt-5-nano"],
                    value="gpt-5.1"
                )

                temperature = gr.Slider(
                    label="Temperature",
                    minimum=0.0,
                    maximum=1.0,
                    value=0.2,
                    step=0.1
                )

                reasoning = gr.Dropdown(
                    label="Reasoning Effort",
                    choices=["None", "low", "medium", "high"],
                    value="None"
                )

                gr.Markdown("---")
                gr.Markdown("""
                **How to use:**
                - Upload or paste an image
                - Optionally add a question
                - Results stream in real-time

                **Tips:**
                - Scroll up during analysis to read - it won't force scroll down
                - Tool results show key findings
                """)

            # --- Chat column ---
            with gr.Column(scale=4):
                chatbot = gr.Chatbot(
                    height="70vh",
                    placeholder="Upload an image to begin forensic analysis...",
                    layout="panel",
                    autoscroll=True,
                )

                with gr.Row():
                    msg = gr.MultimodalTextbox(
                        placeholder="Drop an image here or type a question about the image...",
                        show_label=False,
                        scale=4,
                        file_count="single",
                        file_types=["image"],
                    )

        # Holds the raw multimodal payload between the user_message and
        # bot_response steps of the submit chain.
        last_message = gr.State(value=None)

        def user_message(message, history):
            """Append the user's turn to history, clear the input box,
            and stash the raw payload in the last_message State."""
            if not message:
                return history, gr.MultimodalTextbox(value=None), None

            # Keep an untouched copy of the payload for respond_chat.
            stored_message = message.copy() if isinstance(message, dict) else message

            # Build the chat-history representation (files first, then text).
            user_content = []
            if isinstance(message, dict):
                if message.get("files"):
                    for f in message["files"]:
                        user_content.append({"type": "file", "path": f})
                if message.get("text"):
                    user_content.append({"type": "text", "text": message["text"]})

            if not user_content:
                return history, gr.MultimodalTextbox(value=None), None

            # Plain-text-only turns are stored as a simple string.
            if len(user_content) == 1 and user_content[0].get("type") == "text":
                history.append({"role": "user", "content": user_content[0]["text"]})
            else:
                history.append({"role": "user", "content": user_content})

            return history, gr.MultimodalTextbox(value=None), stored_message

        def bot_response(stored_message, history, use_tools, model, temperature, reasoning):
            """Stream the assistant's reply into the last history entry."""
            if not history or stored_message is None:
                return history

            # Placeholder assistant turn that streaming chunks overwrite.
            history.append({"role": "assistant", "content": ""})

            for chunk in respond_chat(stored_message, history[:-1], use_tools, model, temperature, reasoning):
                history[-1]["content"] = chunk
                yield history

        # Chain: record the user turn immediately (queue=False so it shows
        # without waiting), then stream the assistant reply.
        msg.submit(
            user_message,
            [msg, chatbot],
            [chatbot, msg, last_message],
            queue=False
        ).then(
            bot_response,
            [last_message, chatbot, use_tools, model, temperature, reasoning],
            chatbot
        )

    return demo
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Fetch model weights before the UI comes up so the first analysis
    # request does not stall on a large download.
    _ensure_weights()

    # Warm up the agent once at startup; respond_chat lazily re-initializes
    # it whenever the user changes settings in the sidebar.
    status_msg, _ = initialize_agent()
    print(status_msg)

    demo = create_interface()

    # NOTE: gr.Blocks.launch() does not accept a `theme` keyword — passing
    # one raised a TypeError at startup. Theming/CSS are gr.Blocks
    # constructor arguments and belong inside create_interface().
    demo.launch(
        server_name="0.0.0.0",  # bind all interfaces (required on HF Spaces / Docker)
        server_port=7860,
        share=False,
    )
|
|
|