|
|
|
|
|
import os
|
|
|
import io
|
|
|
from fastapi import FastAPI, HTTPException, File, UploadFile, Form, Response
|
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
|
from pydantic import BaseModel
|
|
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
from openai import AsyncOpenAI, OpenAIError
|
|
|
from googleapiclient.discovery import build
|
|
|
from googleapiclient.errors import HttpError as GoogleHttpError
|
|
|
from langdetect import detect, LangDetectException
|
|
|
from deep_translator import GoogleTranslator, exceptions as TranslatorExceptions
|
|
|
|
|
|
|
|
|
load_dotenv()
|
|
|
print("Attempted to load environment variables from .env file.")
|
|
|
|
|
|
|
|
|
|
|
|
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
|
|
|
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
|
|
|
APP_NAME = os.getenv("APP_NAME", "Multilingual Scam Detector")
|
|
|
|
|
|
|
|
|
client = None
|
|
|
if OPENAI_API_KEY:
|
|
|
try:
|
|
|
client = AsyncOpenAI(api_key=OPENAI_API_KEY)
|
|
|
print("OpenAI client initialized.")
|
|
|
except OpenAIError as e:
|
|
|
print(f"Failed to initialize OpenAI client: {e}")
|
|
|
client = None
|
|
|
else:
|
|
|
print("\n*** WARNING: OPENAI_API_KEY not found in environment/.env. OpenAI features disabled. ***\n")
|
|
|
|
|
|
if not GOOGLE_API_KEY:
|
|
|
print("\n*** WARNING: GOOGLE_API_KEY not found in environment/.env. Google Safe Browsing checks disabled. ***\n")
|
|
|
|
|
|
|
|
|
app = FastAPI(
|
|
|
title=APP_NAME,
|
|
|
description="Analyzes text, URLs, and audio for potential scams using AI and external APIs.",
|
|
|
version="1.1.0"
|
|
|
)
|
|
|
|
|
|
|
|
|
app.add_middleware(
|
|
|
CORSMiddleware,
|
|
|
allow_origins=["*"],
|
|
|
allow_credentials=True,
|
|
|
allow_methods=["*"],
|
|
|
allow_headers=["*"],
|
|
|
)
|
|
|
|
|
|
|
|
|
class AnalysisResponse(BaseModel):
|
|
|
status: str
|
|
|
reason: str
|
|
|
detected_language: str | None = None
|
|
|
is_translated: bool = False
|
|
|
|
|
|
class TTSRequest(BaseModel):
|
|
|
text: str
|
|
|
language: str | None = "en"
|
|
|
|
|
|
|
|
|
async def detect_and_translate(text: str) -> tuple[str, str | None, bool]:
|
|
|
detected_lang = None
|
|
|
is_translated = False
|
|
|
try:
|
|
|
detected_lang = detect(text)
|
|
|
if detected_lang != 'en':
|
|
|
print(f"Detected language: {detected_lang}. Translating to English...")
|
|
|
translator = GoogleTranslator(source=detected_lang, target='en')
|
|
|
translated_text = translator.translate(text=text)
|
|
|
if translated_text:
|
|
|
print("Translation successful.")
|
|
|
return translated_text, detected_lang, True
|
|
|
else:
|
|
|
print("Translation returned empty result. Using original text.")
|
|
|
return text, detected_lang, False
|
|
|
else:
|
|
|
print("Detected language: English. No translation needed.")
|
|
|
return text, 'en', False
|
|
|
except LangDetectException:
|
|
|
print("Language detection failed. Assuming English.")
|
|
|
return text, None, False
|
|
|
except TranslatorExceptions.TranslationNotFound:
|
|
|
print(f"Translation engine could not find translation for language '{detected_lang}'. Using original text.")
|
|
|
return text, detected_lang, False
|
|
|
except Exception as e:
|
|
|
print(f"Translation error ({type(e).__name__}): {e}. Using original text.")
|
|
|
return text, detected_lang, False
|
|
|
|
|
|
async def translate_reason_back(reason: str, target_lang: str | None) -> str:
|
|
|
if target_lang and target_lang != 'en':
|
|
|
try:
|
|
|
print(f"Translating reason back to: {target_lang}")
|
|
|
translator = GoogleTranslator(source='en', target=target_lang)
|
|
|
translated_reason = translator.translate(text=reason)
|
|
|
return translated_reason or reason
|
|
|
except Exception as e:
|
|
|
print(f"Failed to translate reason back to {target_lang}: {e}")
|
|
|
return reason
|
|
|
return reason
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def check_url_safety(url: str):
|
|
|
"""Checks URL safety using Google Safe Browsing API V4."""
|
|
|
print(f"Checking URL safety for: {url}")
|
|
|
|
|
|
if not GOOGLE_API_KEY:
|
|
|
print("Warning: GOOGLE_API_KEY not configured. Mock response for URL check.")
|
|
|
if "bad-link" in url or "malware.testing.google" in url: return {"status": "Scam", "reason": "URL flagged (Mock - API Key Missing)"}
|
|
|
return {"status": "Safe", "reason": "URL appears safe (Mock - API Key Missing)"}
|
|
|
|
|
|
try:
|
|
|
|
|
|
service = build("safebrowsing", "v4", developerKey=GOOGLE_API_KEY, static_discovery=False)
|
|
|
threat_info = {
|
|
|
'threatTypes': ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION", "THREAT_TYPE_UNSPECIFIED"],
|
|
|
'platformTypes': ["ANY_PLATFORM"],
|
|
|
'threatEntryTypes': ["URL"],
|
|
|
'threatEntries': [{'url': url}]
|
|
|
}
|
|
|
body = {'client': {'clientId': APP_NAME.replace(" ", "-").lower(), 'clientVersion': "1.1.0"}, 'threatInfo': threat_info}
|
|
|
request = service.threatMatches().find(body=body)
|
|
|
response = request.execute()
|
|
|
|
|
|
matches = response.get('matches')
|
|
|
if matches:
|
|
|
threat_types = ", ".join(sorted(list(set([match['threatType'] for match in matches]))))
|
|
|
print(f"URL Found in Safe Browsing: {threat_types}")
|
|
|
return {"status": "Scam", "reason": f"URL flagged by Google Safe Browsing for: {threat_types}"}
|
|
|
else:
|
|
|
print("URL not found in Safe Browsing database.")
|
|
|
return {"status": "Safe", "reason": "URL not flagged by Google Safe Browsing."}
|
|
|
except GoogleHttpError as e:
|
|
|
print(f"Google Safe Browsing API HTTP Error: {e}")
|
|
|
status = e.resp.status
|
|
|
if status == 400: return {"status": "Error", "reason": f"Safe Browsing API request failed (Bad Request): {e}"}
|
|
|
if status == 403: return {"status": "Error", "reason": f"Safe Browsing API request failed (Permission Denied/Invalid Key?): {e}"}
|
|
|
return {"status": "Error", "reason": f"Could not verify URL via Safe Browsing (HTTP Error {status}): {e}"}
|
|
|
except Exception as e:
|
|
|
print(f"Generic Google Safe Browsing API Error: {e}")
|
|
|
return {"status": "Error", "reason": f"Could not verify URL via Safe Browsing (General Error): {e}"}
|
|
|
|
|
|
|
|
|
async def transcribe_audio(audio_file: UploadFile):
|
|
|
"""Transcribes audio using OpenAI Whisper API."""
|
|
|
|
|
|
if not client:
|
|
|
raise HTTPException(status_code=503, detail="Audio transcription unavailable: OpenAI client not initialized (check API key).")
|
|
|
|
|
|
print(f"Transcribing audio file: {audio_file.filename} (size: {audio_file.size})")
|
|
|
try:
|
|
|
audio_bytes = await audio_file.read()
|
|
|
if not audio_bytes: raise ValueError("Received empty audio file.")
|
|
|
audio_file_like = io.BytesIO(audio_bytes)
|
|
|
audio_file_like.name = audio_file.filename or "audio.mp3"
|
|
|
|
|
|
transcription = await client.audio.transcriptions.create(
|
|
|
model="whisper-1", file=audio_file_like
|
|
|
)
|
|
|
print(f"Transcription successful: {transcription.text[:100]}...")
|
|
|
return transcription.text
|
|
|
except OpenAIError as e:
|
|
|
print(f"OpenAI API Error during transcription: {e}")
|
|
|
raise HTTPException(status_code=502, detail=f"Audio transcription failed (API Error): {e.status_code} {e.body}")
|
|
|
except Exception as e:
|
|
|
print(f"Error during transcription: {e}")
|
|
|
raise HTTPException(status_code=500, detail=f"Audio transcription failed (Server Error): {e}")
|
|
|
finally:
|
|
|
await audio_file.close()
|
|
|
|
|
|
|
|
|
async def check_text_scam(text: str):
|
|
|
"""Analyzes text for scams using OpenAI GPT and a specific prompt."""
|
|
|
|
|
|
if not client:
|
|
|
print("Warning: OpenAI client not initialized. Using mock analysis for text check.")
|
|
|
if "won" in text.lower() and "click" in text.lower(): return {"status": "Scam", "reason": "Potential prize scam (Mock - API Key Missing)"}
|
|
|
if "urgent" in text.lower() and "verify" in text.lower(): return {"status": "Suspicious", "reason": "Urgency/Phishing tactic (Mock - API Key Missing)"}
|
|
|
return {"status": "Safe", "reason": "Text appears safe (Mock - API Key Missing)"}
|
|
|
|
|
|
print(f"Analyzing text with LLM: {text[:60]}...")
|
|
|
|
|
|
system_prompt = """
|
|
|
You are an AI assistant specialized in detecting scams, phishing attempts, and fraudulent content within text messages, including those transcribed from audio. Your goal is to classify the input text into one of three categories: "Safe", "Suspicious", or "Scam". Provide a concise reason for your classification, maximum 1-2 sentences.
|
|
|
Consider these factors: Urgency, Generic Greetings, Requests for Personal Information, Suspicious Links, Unsolicited Offers/Prizes, Grammatical Errors/Typos, Unusual Payment Methods, Threats or Blackmail, Impersonation, Investment Scams, Job Scams, Romance Scams.
|
|
|
Output Format: Provide the classification FIRST, followed by a colon, then a BRIEF reason.
|
|
|
Example 1: Scam: Contains a suspicious link and requests urgent login verification, typical of phishing.
|
|
|
Example 2: Suspicious: Unsolicited job offer with vague details asking for personal info upfront.
|
|
|
Example 3: Safe: Appears to be a standard appointment reminder or casual conversation.
|
|
|
"""
|
|
|
try:
|
|
|
response = await client.chat.completions.create(
|
|
|
model="gpt-4-turbo",
|
|
|
messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": text} ],
|
|
|
max_tokens=100, temperature=0.2
|
|
|
)
|
|
|
analysis_result = response.choices[0].message.content.strip()
|
|
|
print(f"LLM Analysis Result: {analysis_result}")
|
|
|
|
|
|
|
|
|
status = "Suspicious"; reason = analysis_result
|
|
|
if ":" in analysis_result:
|
|
|
parts = analysis_result.split(":", 1)
|
|
|
potential_status = parts[0].strip().capitalize()
|
|
|
if potential_status in ["Safe", "Suspicious", "Scam"]: status = potential_status; reason = parts[1].strip()
|
|
|
else:
|
|
|
analysis_lower = analysis_result.lower()
|
|
|
if any(w in analysis_lower for w in ["scam", "phishing", "fraud", "malicious"]): status = "Scam"
|
|
|
elif any(w in analysis_lower for w in ["suspicious", "warning", "risk", "caution", "unsolicited"]): status = "Suspicious"
|
|
|
elif any(w in analysis_lower for w in ["safe", "legitimate", "benign"]): status = "Safe"
|
|
|
if not reason: reason = "Analysis complete."
|
|
|
return {"status": status, "reason": reason}
|
|
|
except OpenAIError as e:
|
|
|
print(f"OpenAI API Error during text analysis: {e}")
|
|
|
return {"status": "Error", "reason": f"LLM analysis failed (API Error): {e.status_code} {e.body}"}
|
|
|
except Exception as e:
|
|
|
print(f"Generic error during text analysis: {e}")
|
|
|
return {"status": "Error", "reason": f"LLM analysis failed (Server Error): {e}"}
|
|
|
|
|
|
|
|
|
@app.post("/analyze", response_model=AnalysisResponse, tags=["Analysis"])
|
|
|
async def analyze_input(
|
|
|
input_type: str = Form(..., description="Type of input: 'text', 'url', or 'audio'"),
|
|
|
text: str | None = Form(None, description="Text message or URL (required if input_type is 'text' or 'url')"),
|
|
|
file: UploadFile | None = File(None, description="Audio file (required if input_type is 'audio')")
|
|
|
):
|
|
|
analysis_result = {}
|
|
|
detected_language = None
|
|
|
is_translated = False
|
|
|
reason_target_language = 'en'
|
|
|
|
|
|
try:
|
|
|
if input_type == 'url':
|
|
|
if not text: raise HTTPException(status_code=400, detail="URL required for type 'url'")
|
|
|
analysis_result = await check_url_safety(text)
|
|
|
elif input_type == 'text':
|
|
|
if not text: raise HTTPException(status_code=400, detail="Text required for type 'text'")
|
|
|
text_to_analyze, detected_language, is_translated = await detect_and_translate(text)
|
|
|
reason_target_language = detected_language or 'en'
|
|
|
analysis_result = await check_text_scam(text_to_analyze)
|
|
|
elif input_type == 'audio':
|
|
|
if not file: raise HTTPException(status_code=400, detail="File required for type 'audio'")
|
|
|
transcribed_text = await transcribe_audio(file)
|
|
|
if not transcribed_text: raise HTTPException(status_code=500, detail="Transcription empty.")
|
|
|
text_to_analyze, detected_language, is_translated = await detect_and_translate(transcribed_text)
|
|
|
reason_target_language = detected_language or 'en'
|
|
|
analysis_result = await check_text_scam(text_to_analyze)
|
|
|
else:
|
|
|
raise HTTPException(status_code=400, detail=f"Invalid input_type '{input_type}'.")
|
|
|
|
|
|
if analysis_result.get("status") == "Error":
|
|
|
return AnalysisResponse(status="Error", reason=analysis_result.get('reason', 'Analysis failed.'), detected_language=detected_language, is_translated=is_translated)
|
|
|
|
|
|
final_reason = await translate_reason_back(analysis_result.get('reason', 'Analysis reason missing.'), reason_target_language)
|
|
|
return AnalysisResponse(status=analysis_result.get('status', 'Error'), reason=final_reason, detected_language=detected_language, is_translated=is_translated)
|
|
|
except HTTPException as he:
|
|
|
raise he
|
|
|
except Exception as e:
|
|
|
print(f"Unexpected error in /analyze endpoint: {e}")
|
|
|
return AnalysisResponse(status="Error", reason=f"Unexpected server error: {type(e).__name__}", detected_language=None, is_translated=False)
|
|
|
|
|
|
|
|
|
@app.post("/synthesize", tags=["TTS"])
|
|
|
async def synthesize_speech(request: TTSRequest):
|
|
|
"""Generates speech from text using OpenAI TTS-1 model."""
|
|
|
if not client:
|
|
|
raise HTTPException(status_code=503, detail="TTS unavailable: OpenAI client not initialized.")
|
|
|
print(f"Synthesizing speech for text: {request.text[:50]}...")
|
|
|
try:
|
|
|
voice_model = "alloy"
|
|
|
response = await client.audio.speech.create(model="tts-1", voice=voice_model, input=request.text, response_format="mp3")
|
|
|
return Response(content=response.content, media_type="audio/mpeg")
|
|
|
except OpenAIError as e:
|
|
|
print(f"OpenAI API Error during TTS: {e}")
|
|
|
raise HTTPException(status_code=502, detail=f"TTS generation failed (API Error): {e.status_code} {e.body}")
|
|
|
except Exception as e:
|
|
|
print(f"TTS Error: {e}")
|
|
|
raise HTTPException(status_code=500, detail=f"Text-to-Speech generation failed (Server Error): {e}")
|
|
|
|
|
|
|
|
|
@app.get("/", tags=["Health"])
|
|
|
async def read_root():
|
|
|
|
|
|
openai_status = "OK" if OPENAI_API_KEY and client else "Not Configured"
|
|
|
google_status = "OK" if GOOGLE_API_KEY else "Not Configured"
|
|
|
return {"message": f"{APP_NAME} API is running!", "openai_key_status": openai_status, "google_key_status": google_status}
|
|
|
|
|
|
|