video-analysis-system / agent_workflow.py
AmirAziz1221's picture
Update agent_workflow.py
fe74762 verified
"""
agent_workflow.py
Agentic AI workflow using Groq (LLaMA-3).
Falls back to rule-based mock if the key is missing or the call fails.
"""
import json
import os
def run_groq_agent(
detection_summary: dict,
video_meta: dict,
ai_summary: str,
) -> dict:
"""
Run a Groq-powered analysis and return a JSON report.
Falls back to mock if Groq is unavailable.
"""
try:
from groq import Groq
client = Groq(api_key=os.getenv("GROQ_API_KEY"))
payload = {
"detection_summary": detection_summary,
"video_meta": video_meta,
"ai_summary": ai_summary,
}
prompt = f"""
You are a professional video-analysis assistant.
Analyze the following structured data and return ONLY valid JSON.
Required JSON schema:
{{
"insights": ["...", "...", "..."],
"risk_flags": ["..."],
"recommended_actions": ["...", "...", "..."]
}}
Rules:
- Return ONLY JSON.
- "insights" should contain exactly 3 concise observations.
- "risk_flags" should be an empty list [] if there are no meaningful risks.
- "recommended_actions" should contain 2 or 3 useful next steps.
- Be factual and do not invent details beyond the provided data.
DATA:
{json.dumps(payload, indent=2)}
""".strip()
response = client.chat.completions.create(
model="llama3-8b-8192",
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
max_tokens=400,
)
content = response.choices[0].message.content.strip()
clean = content.removeprefix("```json").removeprefix("```").removesuffix("```").strip()
result = json.loads(clean)
if not isinstance(result, dict):
raise ValueError("LLM output is not a dictionary.")
result.setdefault("insights", [])
result.setdefault("risk_flags", [])
result.setdefault("recommended_actions", [])
if not isinstance(result["insights"], list):
result["insights"] = []
if not isinstance(result["risk_flags"], list):
result["risk_flags"] = []
if not isinstance(result["recommended_actions"], list):
result["recommended_actions"] = []
if len(result["insights"]) < 3:
result["insights"] += [
"Detected objects were analyzed across extracted frames.",
"The summary is based on object frequency and video metadata.",
"Manual review of annotated frames can provide additional context.",
]
result["insights"] = result["insights"][:3]
if len(result["recommended_actions"]) < 2:
result["recommended_actions"] += [
"Review the annotated frames to validate detections.",
"Export the report for further analysis or archiving.",
]
result["recommended_actions"] = result["recommended_actions"][:3]
return result
except Exception as e:
return run_mock_agent(detection_summary, video_meta, ai_summary, error=str(e))
def run_mock_agent(
detection_summary: dict,
video_meta: dict,
ai_summary: str,
error: str = "",
) -> dict:
"""
Rule-based fallback report.
"""
total = sum(detection_summary.values())
categories = len(detection_summary)
top_class = max(detection_summary, key=detection_summary.get) if detection_summary else "none"
top_count = detection_summary.get(top_class, 0)
fps = max(video_meta.get("fps", 25), 1)
duration = round(video_meta.get("total_frames", 0) / fps, 1)
w = video_meta.get("width", "N/A")
h = video_meta.get("height", "N/A")
insights = [
f"Dominant object: '{top_class}' detected {top_count} times across analyzed frames.",
f"Total: {total} detections across {categories} categories in a {duration}s video.",
f"Video resolution {w}x{h} @ {fps:.0f} fps — suitable for detailed object analysis.",
]
risk_flags = []
if detection_summary.get("person", 0) > 20:
risk_flags.append("High person density — possible crowd or public-safety scenario.")
if detection_summary.get("knife", 0) > 0 or detection_summary.get("gun", 0) > 0:
risk_flags.append("Potentially dangerous object detected — manual review recommended.")
if total == 0:
risk_flags.append("Zero detections — video may be blank, very dark, or corrupted.")
if categories == 1 and total > 15:
risk_flags.append(
f"Unusually high concentration of a single class ('{top_class}') — verify context."
)
if error:
risk_flags.append(f"Groq agent unavailable, fallback mock analysis used: {error[:120]}")
recommended_actions = [
f"Review annotated frames focusing on '{top_class}' detections for context validation.",
"Lower the confidence threshold if important objects appear to be missed.",
"Export the JSON report for downstream analytics or alerting integration.",
]
return {
"insights": insights,
"risk_flags": risk_flags,
"recommended_actions": recommended_actions,
}
def run_agent(
detection_summary: dict,
video_meta: dict,
ai_summary: str,
mode: str = "mock",
) -> dict:
if mode == "groq":
return run_groq_agent(detection_summary, video_meta, ai_summary)
return run_mock_agent(detection_summary, video_meta, ai_summary)