Spaces:
Runtime error
Runtime error
File size: 13,863 Bytes
b949a69 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 | import os
import re
import json
from typing import Dict, List, Optional, Union, Tuple
import google.generativeai as genai
import tempfile
from config import config
class InputProcessor:
"""
Intelligent input processor that converts chatbot input into structured verification requests
"""
def __init__(self):
# Configure Gemini
genai.configure(api_key=config.GEMINI_API_KEY)
self.model = genai.GenerativeModel(
config.GEMINI_MODEL,
generation_config=genai.types.GenerationConfig(
temperature=config.GEMINI_TEMPERATURE,
top_p=config.GEMINI_TOP_P,
max_output_tokens=config.GEMINI_MAX_TOKENS
)
)
self.system_prompt = """You are an intelligent input processor for a visual verification service.
Your task is to analyze user input and extract:
1. Image/video/audio content (files, URLs, or descriptions)
2. Claim context (what the user is claiming)
3. Claim date (when the claim was made)
4. Type of verification needed (image, video, audio, or text)
Return a JSON response with this structure:
{
"verification_type": "image" or "video" or "audio" or "text",
"content": {
"files": ["list of file paths if files provided"],
"urls": ["list of image/video/audio URLs"],
"descriptions": ["list of text descriptions"],
"text": "the text claim to verify (if verification_type is text)"
},
"claim_context": "extracted or inferred claim context",
"claim_date": "extracted or inferred date"
}
Rules:
- If multiple images/videos/audio files are mentioned, separate them clearly
- Extract URLs from text using regex patterns
- Infer context from surrounding text if not explicitly stated
- If no date is mentioned leave it blank
- Handle mixed content types appropriately"""
async def process_input(
self,
text_input: Optional[str] = None,
files: Optional[List] = None
) -> Dict:
"""
Process chatbot input and return structured verification request
"""
try:
print(f"π DEBUG: InputProcessor.process_input called")
print(f"π DEBUG: text_input = {text_input}")
print(f"π DEBUG: files = {files}")
print(f"π DEBUG: files type = {type(files)}")
# Prepare input for LLM analysis
print(f"π DEBUG: Preparing input text for LLM analysis")
input_text = self._prepare_input_text(text_input, files)
print(f"π DEBUG: Prepared input_text = {input_text}")
# Get LLM analysis
print(f"π DEBUG: Calling LLM analysis")
llm_response = await self._analyze_with_llm(input_text)
print(f"π DEBUG: LLM response = {llm_response}")
# Parse and validate LLM response
print(f"π DEBUG: Parsing LLM response")
parsed_response = self._parse_llm_response(llm_response)
print(f"π DEBUG: Parsed response = {parsed_response}")
# Post-process and enhance the response
print(f"π DEBUG: Post-processing response")
final_response = await self._post_process_response(parsed_response, files)
# PATCH: If verification_type is 'video' but all files have audio extensions, reassign to 'audio'
audio_exts = ['.mp3', '.wav', '.ogg', '.flac', '.m4a']
content_files = final_response.get('content', {}).get('files', [])
if (
final_response.get('verification_type') == 'video' and
content_files and
all(any(f.lower().endswith(e) for e in audio_exts) for f in content_files)
):
print(f"π PATCH: Rewriting 'verification_type' from 'video' to 'audio' (all files are audio)")
final_response['verification_type'] = 'audio'
print(f"π DEBUG: Final response = {final_response}")
return final_response
except Exception as e:
print(f"β DEBUG: Exception in InputProcessor.process_input: {e}")
print(f"β DEBUG: Exception type: {type(e).__name__}")
import traceback
print(f"β DEBUG: Traceback: {traceback.format_exc()}")
return {
"error": f"Failed to process input: {str(e)}",
"verification_type": "unknown",
"content": {"files": [], "urls": [], "descriptions": []},
"claim_context": "Unknown context",
"claim_date": "Unknown date",
}
def _prepare_input_text(self, text_input: Optional[str], files: Optional[List]) -> str:
"""Prepare input text for LLM analysis"""
print(f"π DEBUG: _prepare_input_text called with text_input={text_input}, files={files}")
input_parts = []
if text_input:
input_parts.append(f"Text input: {text_input}")
print(f"π DEBUG: Added text input: {text_input}")
if files:
file_info = []
for i, file in enumerate(files):
file_info.append(f"File {i+1}: {file.filename} ({file.content_type})")
print(f"π DEBUG: Added file {i+1}: {file.filename} ({file.content_type})")
input_parts.append(f"Files provided: {'; '.join(file_info)}")
if not input_parts:
input_parts.append("No text or files provided")
print(f"π DEBUG: No input parts, using default message")
result = "\n".join(input_parts)
print(f"π DEBUG: Final prepared input text: {result}")
return result
async def _analyze_with_llm(self, input_text: str) -> str:
"""Use Gemini to analyze the input"""
try:
print(f"π DEBUG: _analyze_with_llm called with input_text: {input_text}")
prompt = f"{self.system_prompt}\n\nUser input: {input_text}"
print(f"π DEBUG: Generated prompt: {prompt}")
response = self.model.generate_content(prompt)
print(f"π DEBUG: LLM response text: {response.text}")
return response.text
except Exception as e:
print(f"β DEBUG: LLM analysis failed: {e}")
print(f"π DEBUG: Falling back to rule-based parsing")
# Fallback to rule-based parsing if LLM fails
return self._fallback_parsing(input_text)
def _fallback_parsing(self, input_text: str) -> str:
"""Fallback parsing when LLM is unavailable"""
print(f"π DEBUG: _fallback_parsing called with input_text: {input_text}")
# Extract URLs using regex
url_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+'
urls = re.findall(url_pattern, input_text)
print(f"π DEBUG: Extracted URLs: {urls}")
# Simple content type detection
verification_type = "text" # default for text-only queries
# Check for video platform URLs first
video_platforms = [
'instagram.com/reels/', 'instagram.com/p/', 'instagram.com/tv/',
'youtube.com/watch', 'youtu.be/', 'youtube.com/shorts/',
'tiktok.com/', 'vm.tiktok.com/',
'twitter.com/', 'x.com/', 't.co/',
'facebook.com/', 'fb.watch/',
'vimeo.com/', 'twitch.tv/', 'dailymotion.com/',
'imgur.com/', 'soundcloud.com/', 'mixcloud.com/',
'lbry.tv/', 'odysee.com/', 't.me/'
]
# Check for image platform URLs
image_platforms = [
'instagram.com/p/', 'imgur.com/', 'flickr.com/',
'pinterest.com/', 'unsplash.com/', 'pexels.com/'
]
# Check for direct file extensions
if any(ext in input_text.lower() for ext in ['.mp4', '.avi', '.mov', '.mkv', '.webm', 'video']):
verification_type = "video"
elif any(ext in input_text.lower() for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp', 'image', 'photo', 'picture']):
verification_type = "image"
elif any(ext in input_text.lower() for ext in ['.mp3', '.wav', '.ogg', '.flac', '.m4a', 'audio']):
verification_type = "audio"
# Check for video platform URLs
elif any(platform in input_text.lower() for platform in video_platforms):
verification_type = "video"
# Check for image platform URLs
elif any(platform in input_text.lower() for platform in image_platforms):
verification_type = "image"
print(f"π DEBUG: Detected verification_type: {verification_type}")
# Extract date patterns
date_pattern = r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2}'
dates = re.findall(date_pattern, input_text)
claim_date = dates[0] if dates else "Unknown date"
print(f"π DEBUG: Extracted dates: {dates}, using: {claim_date}")
# Clean up the input text for better processing
clean_text = input_text.replace("Text input: ", "").strip()
result = {
"verification_type": verification_type,
"content": {
"files": [],
"urls": urls,
"descriptions": [clean_text],
"text": clean_text if verification_type == "text" else None
},
"claim_context": clean_text,
"claim_date": claim_date,
}
print(f"π DEBUG: Fallback parsing result: {result}")
return json.dumps(result)
def _parse_llm_response(self, llm_response: str) -> Dict:
"""Parse and validate LLM response"""
try:
print(f"π DEBUG: _parse_llm_response called with llm_response: {llm_response}")
# Extract JSON from response
json_match = re.search(r'\{.*\}', llm_response, re.DOTALL)
if json_match:
print(f"π DEBUG: Found JSON match: {json_match.group()}")
parsed = json.loads(json_match.group())
print(f"π DEBUG: Parsed JSON: {parsed}")
else:
print(f"β DEBUG: No JSON found in response")
raise ValueError("No JSON found in response")
# Validate required fields
required_fields = ["verification_type", "content", "claim_context", "claim_date"]
for field in required_fields:
if field not in parsed:
print(f"β DEBUG: Missing required field: {field}")
raise ValueError(f"Missing required field: {field}")
print(f"π DEBUG: Successfully parsed and validated response")
return parsed
except Exception as e:
print(f"β DEBUG: Failed to parse LLM response: {e}")
print(f"π DEBUG: Returning safe defaults")
# Return safe defaults if parsing fails
return {
"verification_type": "image",
"content": {"files": [], "urls": [], "descriptions": []},
"claim_context": "Unknown context",
"claim_date": "Unknown date",
}
async def _post_process_response(self, parsed_response: Dict, files: Optional[List]) -> Dict:
"""Post-process the parsed response and add file information"""
print(f"π DEBUG: _post_process_response called with parsed_response: {parsed_response}, files: {files}")
# Add actual file information if files were provided
if files:
print(f"π DEBUG: Processing {len(files)} files")
file_paths = []
for i, file in enumerate(files):
print(f"π DEBUG: Saving file {i}: {file.filename}")
# Save file temporarily and get path
temp_path = await self._save_temp_file(file)
if temp_path:
file_paths.append(temp_path)
print(f"π DEBUG: Saved file {i} to: {temp_path}")
else:
print(f"β DEBUG: Failed to save file {i}")
parsed_response["content"]["files"] = file_paths
print(f"π DEBUG: Updated files list: {file_paths}")
else:
print(f"π DEBUG: No files to process")
print(f"π DEBUG: Final post-processed response: {parsed_response}")
return parsed_response
async def _save_temp_file(self, file) -> Optional[str]:
"""Save uploaded file temporarily and return path"""
try:
print(f"π DEBUG: _save_temp_file called for file: {file.filename}")
# Create temp file
import os
suffix = os.path.splitext(file.filename)[1] if file.filename else ""
print(f"π DEBUG: Using suffix: {suffix}")
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
content = await file.read()
print(f"π DEBUG: Read {len(content)} bytes from file")
temp_file.write(content)
temp_path = temp_file.name
print(f"π DEBUG: Saved temp file to: {temp_path}")
return temp_path
except Exception as e:
print(f"β DEBUG: Failed to save temp file: {e}")
return None
def cleanup_temp_files(self, file_paths: List[str]):
"""Clean up temporary files"""
for path in file_paths:
try:
if os.path.exists(path):
os.unlink(path)
except Exception as e:
print(f"Failed to cleanup temp file {path}: {e}")
|