File size: 15,687 Bytes
a66a74f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 |
# --- Imports ---
import asyncio
import io
import os

from fastapi import FastAPI, HTTPException, File, UploadFile, Form, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# --- Dotenv Import ---
from dotenv import load_dotenv
# --- Other Imports ---
from openai import AsyncOpenAI, OpenAIError
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError as GoogleHttpError
from langdetect import detect, LangDetectException
from deep_translator import GoogleTranslator, exceptions as TranslatorExceptions
# --- Environment configuration ---
# Read a local .env file (when present) into the process environment first,
# so the lookups below can see values defined there.
load_dotenv()
print("Attempted to load environment variables from .env file.")
# API keys are optional: a missing variable simply yields None here.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
# Display name of the service, with a built-in default when unset.
APP_NAME = os.environ.get("APP_NAME", "Multilingual Scam Detector")
# --- API client setup and configuration warnings ---
# `client` stays None whenever OpenAI cannot be used (no key, or the
# constructor failed); code elsewhere checks it before calling the API.
client = None
if not OPENAI_API_KEY:
    print("\n*** WARNING: OPENAI_API_KEY not found in environment/.env. OpenAI features disabled. ***\n")
else:
    try:
        client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    except OpenAIError as init_error:
        print(f"Failed to initialize OpenAI client: {init_error}")
        client = None  # keep the "unavailable" marker explicit after a failed init
    else:
        print("OpenAI client initialized.")

# The Google key is only reported here; it is consumed by the URL check.
if not GOOGLE_API_KEY:
    print("\n*** WARNING: GOOGLE_API_KEY not found in environment/.env. Google Safe Browsing checks disabled. ***\n")
# --- FastAPI App Initialization ---
app = FastAPI(
    title=APP_NAME,
    description="Analyzes text, URLs, and audio for potential scams using AI and external APIs.",
    version="1.1.0",
)

# --- CORS Middleware ---
# Allowed origins can be narrowed with the optional CORS_ALLOW_ORIGINS
# variable (comma-separated list); the default remains "any origin".
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # A wildcard origin must not be combined with credentialed requests
    # (see the FastAPI CORS documentation), so cookies/authorization
    # headers are only allowed once explicit origins are configured.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Pydantic Models ---
class AnalysisResponse(BaseModel):
    """Response body of the /analyze endpoint."""
    # Verdict label; the code in this file produces "Safe", "Suspicious",
    # "Scam" or "Error".
    status: str
    # Human-readable explanation accompanying the verdict.
    reason: str
    # Language code reported by language detection; None when detection
    # failed or was not performed (e.g. URL checks).
    detected_language: str | None = None
    # True when the analysed text was translated to English before analysis.
    is_translated: bool = False
class TTSRequest(BaseModel):
    """Request body of the /synthesize endpoint."""
    # Text to convert to speech.
    text: str
    # Language code supplied by the caller; defaults to "en".
    # NOTE(review): the /synthesize handler in this file does not read this field.
    language: str | None = "en"
# --- Helper Functions (language detection and translation) ---
async def detect_and_translate(text: str) -> tuple[str, str | None, bool]:
    """Detect the language of *text* and translate it to English if needed.

    Returns a ``(text_for_analysis, detected_language, is_translated)`` tuple:

    * English input is returned unchanged as ``(text, 'en', False)``.
    * Non-English input is translated; on success the result is
      ``(translated_text, detected_language, True)``.
    * When detection fails the result is ``(text, None, False)``; when
      translation fails or comes back empty it is
      ``(text, detected_language, False)``.

    Detection and translation errors are logged and never propagated.
    """
    # langdetect reports Chinese as lower-case 'zh-cn' / 'zh-tw', while
    # deep_translator's Google backend matches codes case-sensitively and
    # lists them as 'zh-CN' / 'zh-TW'. Without this mapping Chinese input is
    # rejected as unsupported and silently left untranslated.
    # NOTE(review): confirm the code table against the installed deep_translator.
    translator_codes = {"zh-cn": "zh-CN", "zh-tw": "zh-TW"}
    detected_lang = None
    try:
        detected_lang = detect(text)
        if detected_lang == 'en':
            print("Detected language: English. No translation needed.")
            return text, 'en', False

        print(f"Detected language: {detected_lang}. Translating to English...")
        translator = GoogleTranslator(
            source=translator_codes.get(detected_lang, detected_lang),
            target='en',
        )
        # translate() performs a synchronous HTTP request; running it in a
        # worker thread keeps the event loop free for other requests.
        translated_text = await asyncio.to_thread(translator.translate, text=text)
        if translated_text:
            print("Translation successful.")
            return translated_text, detected_lang, True
        print("Translation returned empty result. Using original text.")
        return text, detected_lang, False
    except LangDetectException:
        print("Language detection failed. Assuming English.")
        return text, None, False
    except TranslatorExceptions.TranslationNotFound:
        print(f"Translation engine could not find translation for language '{detected_lang}'. Using original text.")
        return text, detected_lang, False
    except Exception as e:
        # Best effort by design: analysis proceeds on the untranslated text.
        print(f"Translation error ({type(e).__name__}): {e}. Using original text.")
        return text, detected_lang, False
async def translate_reason_back(reason: str, target_lang: str | None) -> str:
if target_lang and target_lang != 'en':
try:
print(f"Translating reason back to: {target_lang}")
translator = GoogleTranslator(source='en', target=target_lang)
translated_reason = translator.translate(text=reason)
return translated_reason or reason
except Exception as e:
print(f"Failed to translate reason back to {target_lang}: {e}")
return reason
return reason
# --- Core API Call Functions (read the API keys/client configured above) ---
def _safe_browsing_matches(url: str):
    """Run one blocking Safe Browsing v4 threatMatches.find lookup for *url*."""
    service = build("safebrowsing", "v4", developerKey=GOOGLE_API_KEY, static_discovery=False)
    threat_info = {
        'threatTypes': ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION", "THREAT_TYPE_UNSPECIFIED"],
        'platformTypes': ["ANY_PLATFORM"],
        'threatEntryTypes': ["URL"],
        'threatEntries': [{'url': url}],
    }
    body = {
        'client': {'clientId': APP_NAME.replace(" ", "-").lower(), 'clientVersion': "1.1.0"},
        'threatInfo': threat_info,
    }
    return service.threatMatches().find(body=body).execute().get('matches')


async def check_url_safety(url: str):
    """Check *url* against the Google Safe Browsing API v4.

    Returns a dict with ``status`` ("Safe", "Scam" or "Error") and a
    human-readable ``reason``. When GOOGLE_API_KEY is not configured a
    keyword-based mock verdict is returned instead. API failures are
    reported as an "Error" result rather than raised.
    """
    print(f"Checking URL safety for: {url}")
    if not GOOGLE_API_KEY:
        print("Warning: GOOGLE_API_KEY not configured. Mock response for URL check.")
        if "bad-link" in url or "malware.testing.google" in url:
            return {"status": "Scam", "reason": "URL flagged (Mock - API Key Missing)"}
        return {"status": "Safe", "reason": "URL appears safe (Mock - API Key Missing)"}

    try:
        # Discovery and the lookup are synchronous HTTP calls; run them in a
        # worker thread so the event loop is not blocked.
        matches = await asyncio.to_thread(_safe_browsing_matches, url)
        if matches:
            threat_types = ", ".join(sorted({match['threatType'] for match in matches}))
            print(f"URL Found in Safe Browsing: {threat_types}")
            return {"status": "Scam", "reason": f"URL flagged by Google Safe Browsing for: {threat_types}"}
        print("URL not found in Safe Browsing database.")
        return {"status": "Safe", "reason": "URL not flagged by Google Safe Browsing."}
    except GoogleHttpError as e:
        status = e.resp.status
        # str(e) embeds the request URI, whose query string carries the API
        # key, so the raw exception text is neither logged nor sent to clients.
        print(f"Google Safe Browsing API HTTP Error: status={status} reason={getattr(e, 'reason', '')}")
        if status == 400:
            return {"status": "Error", "reason": "Safe Browsing API request failed (Bad Request)."}
        if status == 403:
            return {"status": "Error", "reason": "Safe Browsing API request failed (Permission Denied/Invalid Key?)."}
        return {"status": "Error", "reason": f"Could not verify URL via Safe Browsing (HTTP Error {status})."}
    except Exception as e:
        # Only the exception type is exposed to the client; the message may
        # contain internal details.
        print(f"Generic Google Safe Browsing API Error: {type(e).__name__}: {e}")
        return {"status": "Error", "reason": f"Could not verify URL via Safe Browsing (General Error): {type(e).__name__}"}
async def transcribe_audio(audio_file: UploadFile):
    """Transcribe an uploaded audio file with the OpenAI ``whisper-1`` model.

    Returns the transcribed text. The upload is always closed before
    returning.

    Raises:
        HTTPException 503: the OpenAI client is not initialised.
        HTTPException 400: the uploaded file is empty.
        HTTPException 502: the OpenAI API reported an error.
        HTTPException 500: any other failure.
    """
    if not client:
        raise HTTPException(status_code=503, detail="Audio transcription unavailable: OpenAI client not initialized (check API key).")
    print(f"Transcribing audio file: {audio_file.filename} (size: {audio_file.size})")
    try:
        audio_bytes = await audio_file.read()
        if not audio_bytes:
            # An empty upload is a client mistake, not a server failure.
            raise HTTPException(status_code=400, detail="Received empty audio file.")
        audio_file_like = io.BytesIO(audio_bytes)
        # The buffer is given a file name (falling back to a generic one)
        # before it is handed to the API client.
        audio_file_like.name = audio_file.filename or "audio.mp3"
        transcription = await client.audio.transcriptions.create(
            model="whisper-1", file=audio_file_like
        )
        print(f"Transcription successful: {transcription.text[:100]}...")
        return transcription.text
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of re-wrapping them as 500.
        raise
    except OpenAIError as e:
        print(f"OpenAI API Error during transcription: {e}")
        # Not every OpenAIError carries status_code/body (e.g. connection
        # errors); reading them unconditionally would raise AttributeError here.
        status_code = getattr(e, "status_code", None)
        api_detail = f"{status_code} {getattr(e, 'body', None)}" if status_code is not None else str(e)
        raise HTTPException(status_code=502, detail=f"Audio transcription failed (API Error): {api_detail}")
    except Exception as e:
        print(f"Error during transcription: {e}")
        raise HTTPException(status_code=500, detail=f"Audio transcription failed (Server Error): {e}")
    finally:
        await audio_file.close()
def _parse_llm_verdict(analysis_result: str) -> tuple[str, str]:
    """Split an LLM reply into a ``(status, reason)`` pair."""
    label, separator, remainder = analysis_result.partition(":")
    # Tolerate markdown emphasis around the label, e.g. "**Scam**: ...".
    label = label.strip(" \t\r\n*_`#").capitalize()
    if separator and label in ("Safe", "Suspicious", "Scam"):
        return label, remainder.strip() or "Analysis complete."

    # Fallback inference: used both when there is no colon and when the text
    # before the colon is not one of the expected labels.
    analysis_lower = analysis_result.lower()
    if any(w in analysis_lower for w in ["scam", "phishing", "fraud", "malicious"]):
        status = "Scam"
    elif any(w in analysis_lower for w in ["suspicious", "warning", "risk", "caution", "unsolicited"]):
        status = "Suspicious"
    elif any(w in analysis_lower for w in ["safe", "legitimate", "benign"]):
        status = "Safe"
    else:
        # Nothing recognisable: stay cautious.
        status = "Suspicious"
    return status, analysis_result or "Analysis complete."


async def check_text_scam(text: str):
    """Classify *text* as "Safe", "Suspicious" or "Scam" using an OpenAI chat model.

    Returns a dict with ``status`` and ``reason``. When the OpenAI client is
    not initialised a keyword-based mock verdict is returned. API or
    unexpected failures are reported as an "Error" result rather than raised.
    """
    if not client:
        print("Warning: OpenAI client not initialized. Using mock analysis for text check.")
        lowered = text.lower()
        if "won" in lowered and "click" in lowered:
            return {"status": "Scam", "reason": "Potential prize scam (Mock - API Key Missing)"}
        if "urgent" in lowered and "verify" in lowered:
            return {"status": "Suspicious", "reason": "Urgency/Phishing tactic (Mock - API Key Missing)"}
        return {"status": "Safe", "reason": "Text appears safe (Mock - API Key Missing)"}

    print(f"Analyzing text with LLM: {text[:60]}...")
    system_prompt = """
You are an AI assistant specialized in detecting scams, phishing attempts, and fraudulent content within text messages, including those transcribed from audio. Your goal is to classify the input text into one of three categories: "Safe", "Suspicious", or "Scam". Provide a concise reason for your classification, maximum 1-2 sentences.
Consider these factors: Urgency, Generic Greetings, Requests for Personal Information, Suspicious Links, Unsolicited Offers/Prizes, Grammatical Errors/Typos, Unusual Payment Methods, Threats or Blackmail, Impersonation, Investment Scams, Job Scams, Romance Scams.
Output Format: Provide the classification FIRST, followed by a colon, then a BRIEF reason.
Example 1: Scam: Contains a suspicious link and requests urgent login verification, typical of phishing.
Example 2: Suspicious: Unsolicited job offer with vague details asking for personal info upfront.
Example 3: Safe: Appears to be a standard appointment reminder or casual conversation.
"""
    try:
        response = await client.chat.completions.create(
            model="gpt-4-turbo",  # Or gpt-3.5-turbo
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": text},
            ],
            max_tokens=100,
            temperature=0.2,
        )
        # message.content may be None (e.g. a refusal); treat it as empty text.
        analysis_result = (response.choices[0].message.content or "").strip()
        print(f"LLM Analysis Result: {analysis_result}")
        status, reason = _parse_llm_verdict(analysis_result)
        return {"status": status, "reason": reason}
    except OpenAIError as e:
        print(f"OpenAI API Error during text analysis: {e}")
        # Not every OpenAIError carries status_code/body (e.g. connection
        # errors); reading them unconditionally would raise AttributeError here.
        status_code = getattr(e, "status_code", None)
        api_detail = f"{status_code} {getattr(e, 'body', None)}" if status_code is not None else str(e)
        return {"status": "Error", "reason": f"LLM analysis failed (API Error): {api_detail}"}
    except Exception as e:
        print(f"Generic error during text analysis: {e}")
        return {"status": "Error", "reason": f"LLM analysis failed (Server Error): {e}"}
# --- API Endpoints (/analyze, /synthesize, /) ---
@app.post("/analyze", response_model=AnalysisResponse, tags=["Analysis"])
async def analyze_input(
    input_type: str = Form(..., description="Type of input: 'text', 'url', or 'audio'"),
    text: str | None = Form(None, description="Text message or URL (required if input_type is 'text' or 'url')"),
    file: UploadFile | None = File(None, description="Audio file (required if input_type is 'audio')")
):
    """Analyse a URL, a text message or an audio file for scam indicators.

    URLs go to the Safe Browsing check. Text and audio (after transcription)
    are translated to English when necessary, classified, and the reason is
    translated back into the detected language. HTTPException is raised for
    invalid requests; any other failure is returned as an "Error" response.
    """
    verdict = {}
    detected_language = None
    is_translated = False
    reason_target_language = 'en'
    try:
        if input_type not in ('url', 'text', 'audio'):
            raise HTTPException(status_code=400, detail=f"Invalid input_type '{input_type}'.")

        if input_type == 'url':
            if not text:
                raise HTTPException(status_code=400, detail="URL required for type 'url'")
            verdict = await check_url_safety(text)
        else:
            # Text and audio share the same pipeline once raw text is available.
            if input_type == 'text':
                if not text:
                    raise HTTPException(status_code=400, detail="Text required for type 'text'")
                raw_text = text
            else:
                if not file:
                    raise HTTPException(status_code=400, detail="File required for type 'audio'")
                raw_text = await transcribe_audio(file)
                if not raw_text:
                    raise HTTPException(status_code=500, detail="Transcription empty.")
            text_to_analyze, detected_language, is_translated = await detect_and_translate(raw_text)
            reason_target_language = detected_language or 'en'
            verdict = await check_text_scam(text_to_analyze)

        # Error verdicts are passed through without translating the reason.
        if verdict.get("status") == "Error":
            return AnalysisResponse(
                status="Error",
                reason=verdict.get('reason', 'Analysis failed.'),
                detected_language=detected_language,
                is_translated=is_translated,
            )

        final_reason = await translate_reason_back(
            verdict.get('reason', 'Analysis reason missing.'),
            reason_target_language,
        )
        return AnalysisResponse(
            status=verdict.get('status', 'Error'),
            reason=final_reason,
            detected_language=detected_language,
            is_translated=is_translated,
        )
    except HTTPException:
        # Deliberate HTTP errors go straight back to the client.
        raise
    except Exception as unexpected:
        print(f"Unexpected error in /analyze endpoint: {unexpected}")
        return AnalysisResponse(
            status="Error",
            reason=f"Unexpected server error: {type(unexpected).__name__}",
            detected_language=None,
            is_translated=False,
        )
@app.post("/synthesize", tags=["TTS"])
async def synthesize_speech(request: TTSRequest):
    """Generate MP3 speech for ``request.text`` using the OpenAI ``tts-1`` model.

    Returns a Response with media type ``audio/mpeg``.

    Raises:
        HTTPException 503: the OpenAI client is not initialised.
        HTTPException 400: the request text is empty or whitespace only.
        HTTPException 502: the OpenAI API reported an error.
        HTTPException 500: any other failure.
    """
    if not client:
        raise HTTPException(status_code=503, detail="TTS unavailable: OpenAI client not initialized.")
    if not request.text.strip():
        # Reject blank input up front instead of spending an API call on it.
        raise HTTPException(status_code=400, detail="Text required for speech synthesis.")
    print(f"Synthesizing speech for text: {request.text[:50]}...")
    try:
        voice_model = "alloy"
        response = await client.audio.speech.create(model="tts-1", voice=voice_model, input=request.text, response_format="mp3")
        return Response(content=response.content, media_type="audio/mpeg")
    except OpenAIError as e:
        print(f"OpenAI API Error during TTS: {e}")
        # Not every OpenAIError carries status_code/body (e.g. connection
        # errors); reading them unconditionally would raise AttributeError here.
        status_code = getattr(e, "status_code", None)
        api_detail = f"{status_code} {getattr(e, 'body', None)}" if status_code is not None else str(e)
        raise HTTPException(status_code=502, detail=f"TTS generation failed (API Error): {api_detail}")
    except Exception as e:
        print(f"TTS Error: {e}")
        raise HTTPException(status_code=500, detail=f"Text-to-Speech generation failed (Server Error): {e}")
@app.get("/", tags=["Health"])
async def read_root():
    """Health check: report that the API is up and whether each key is configured."""
    labels = {True: "OK", False: "Not Configured"}
    # OpenAI counts as configured only when the key exists AND the client was built.
    openai_ready = bool(OPENAI_API_KEY and client)
    google_ready = bool(GOOGLE_API_KEY)
    return {
        "message": f"{APP_NAME} API is running!",
        "openai_key_status": labels[openai_ready],
        "google_key_status": labels[google_ready],
    }
|