sujoy0011's picture
Upload 3 files
a66a74f verified
# --- Imports ---
import os
import io
from fastapi import FastAPI, HTTPException, File, UploadFile, Form, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# --- Dotenv Import ---
from dotenv import load_dotenv
# --- Other Imports ---
from openai import AsyncOpenAI, OpenAIError
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError as GoogleHttpError
from langdetect import detect, LangDetectException
from deep_translator import GoogleTranslator, exceptions as TranslatorExceptions
# --- Load Environment Variables ---
load_dotenv()
print("Attempted to load environment variables from .env file.")
# --- Retrieve Environment Variables ---
# Get keys from environment (which load_dotenv populated if .env exists)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
APP_NAME = os.getenv("APP_NAME", "Multilingual Scam Detector") # Example with default
# --- API Client Initialization & Checks ---
client = None
if OPENAI_API_KEY:
try:
client = AsyncOpenAI(api_key=OPENAI_API_KEY)
print("OpenAI client initialized.")
except OpenAIError as e:
print(f"Failed to initialize OpenAI client: {e}")
client = None # Ensure client is None if init fails
else:
print("\n*** WARNING: OPENAI_API_KEY not found in environment/.env. OpenAI features disabled. ***\n")
if not GOOGLE_API_KEY:
print("\n*** WARNING: GOOGLE_API_KEY not found in environment/.env. Google Safe Browsing checks disabled. ***\n")
# --- FastAPI App Initialization ---
app = FastAPI(
title=APP_NAME,
description="Analyzes text, URLs, and audio for potential scams using AI and external APIs.",
version="1.1.0" # Incremented version for change
)
# --- CORS Middleware ---
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- Pydantic Models (Remain the same) ---
class AnalysisResponse(BaseModel):
status: str
reason: str
detected_language: str | None = None
is_translated: bool = False
class TTSRequest(BaseModel):
text: str
language: str | None = "en"
# --- Helper Functions (detect_and_translate, translate_reason_back - Remain the same) ---
async def detect_and_translate(text: str) -> tuple[str, str | None, bool]:
detected_lang = None
is_translated = False
try:
detected_lang = detect(text)
if detected_lang != 'en':
print(f"Detected language: {detected_lang}. Translating to English...")
translator = GoogleTranslator(source=detected_lang, target='en')
translated_text = translator.translate(text=text)
if translated_text:
print("Translation successful.")
return translated_text, detected_lang, True
else:
print("Translation returned empty result. Using original text.")
return text, detected_lang, False
else:
print("Detected language: English. No translation needed.")
return text, 'en', False
except LangDetectException:
print("Language detection failed. Assuming English.")
return text, None, False
except TranslatorExceptions.TranslationNotFound:
print(f"Translation engine could not find translation for language '{detected_lang}'. Using original text.")
return text, detected_lang, False
except Exception as e:
print(f"Translation error ({type(e).__name__}): {e}. Using original text.")
return text, detected_lang, False
async def translate_reason_back(reason: str, target_lang: str | None) -> str:
if target_lang and target_lang != 'en':
try:
print(f"Translating reason back to: {target_lang}")
translator = GoogleTranslator(source='en', target=target_lang)
translated_reason = translator.translate(text=reason)
return translated_reason or reason
except Exception as e:
print(f"Failed to translate reason back to {target_lang}: {e}")
return reason
return reason
# --- Core API Call Functions (Updated to use os.getenv results) ---
async def check_url_safety(url: str):
"""Checks URL safety using Google Safe Browsing API V4."""
print(f"Checking URL safety for: {url}")
# Use GOOGLE_API_KEY retrieved earlier
if not GOOGLE_API_KEY:
print("Warning: GOOGLE_API_KEY not configured. Mock response for URL check.")
if "bad-link" in url or "malware.testing.google" in url: return {"status": "Scam", "reason": "URL flagged (Mock - API Key Missing)"}
return {"status": "Safe", "reason": "URL appears safe (Mock - API Key Missing)"}
try:
# Pass the key directly
service = build("safebrowsing", "v4", developerKey=GOOGLE_API_KEY, static_discovery=False)
threat_info = {
'threatTypes': ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION", "THREAT_TYPE_UNSPECIFIED"],
'platformTypes': ["ANY_PLATFORM"],
'threatEntryTypes': ["URL"],
'threatEntries': [{'url': url}]
}
body = {'client': {'clientId': APP_NAME.replace(" ", "-").lower(), 'clientVersion': "1.1.0"}, 'threatInfo': threat_info}
request = service.threatMatches().find(body=body)
response = request.execute()
matches = response.get('matches')
if matches:
threat_types = ", ".join(sorted(list(set([match['threatType'] for match in matches]))))
print(f"URL Found in Safe Browsing: {threat_types}")
return {"status": "Scam", "reason": f"URL flagged by Google Safe Browsing for: {threat_types}"}
else:
print("URL not found in Safe Browsing database.")
return {"status": "Safe", "reason": "URL not flagged by Google Safe Browsing."}
except GoogleHttpError as e:
print(f"Google Safe Browsing API HTTP Error: {e}")
status = e.resp.status
if status == 400: return {"status": "Error", "reason": f"Safe Browsing API request failed (Bad Request): {e}"}
if status == 403: return {"status": "Error", "reason": f"Safe Browsing API request failed (Permission Denied/Invalid Key?): {e}"}
return {"status": "Error", "reason": f"Could not verify URL via Safe Browsing (HTTP Error {status}): {e}"}
except Exception as e:
print(f"Generic Google Safe Browsing API Error: {e}")
return {"status": "Error", "reason": f"Could not verify URL via Safe Browsing (General Error): {e}"}
async def transcribe_audio(audio_file: UploadFile):
"""Transcribes audio using OpenAI Whisper API."""
# Use the globally initialized client
if not client:
raise HTTPException(status_code=503, detail="Audio transcription unavailable: OpenAI client not initialized (check API key).")
print(f"Transcribing audio file: {audio_file.filename} (size: {audio_file.size})")
try:
audio_bytes = await audio_file.read()
if not audio_bytes: raise ValueError("Received empty audio file.")
audio_file_like = io.BytesIO(audio_bytes)
audio_file_like.name = audio_file.filename or "audio.mp3"
transcription = await client.audio.transcriptions.create(
model="whisper-1", file=audio_file_like
)
print(f"Transcription successful: {transcription.text[:100]}...")
return transcription.text
except OpenAIError as e:
print(f"OpenAI API Error during transcription: {e}")
raise HTTPException(status_code=502, detail=f"Audio transcription failed (API Error): {e.status_code} {e.body}")
except Exception as e:
print(f"Error during transcription: {e}")
raise HTTPException(status_code=500, detail=f"Audio transcription failed (Server Error): {e}")
finally:
await audio_file.close()
async def check_text_scam(text: str):
"""Analyzes text for scams using OpenAI GPT and a specific prompt."""
# Use the globally initialized client
if not client:
print("Warning: OpenAI client not initialized. Using mock analysis for text check.")
if "won" in text.lower() and "click" in text.lower(): return {"status": "Scam", "reason": "Potential prize scam (Mock - API Key Missing)"}
if "urgent" in text.lower() and "verify" in text.lower(): return {"status": "Suspicious", "reason": "Urgency/Phishing tactic (Mock - API Key Missing)"}
return {"status": "Safe", "reason": "Text appears safe (Mock - API Key Missing)"}
print(f"Analyzing text with LLM: {text[:60]}...")
# System prompt remains the same
system_prompt = """
You are an AI assistant specialized in detecting scams, phishing attempts, and fraudulent content within text messages, including those transcribed from audio. Your goal is to classify the input text into one of three categories: "Safe", "Suspicious", or "Scam". Provide a concise reason for your classification, maximum 1-2 sentences.
Consider these factors: Urgency, Generic Greetings, Requests for Personal Information, Suspicious Links, Unsolicited Offers/Prizes, Grammatical Errors/Typos, Unusual Payment Methods, Threats or Blackmail, Impersonation, Investment Scams, Job Scams, Romance Scams.
Output Format: Provide the classification FIRST, followed by a colon, then a BRIEF reason.
Example 1: Scam: Contains a suspicious link and requests urgent login verification, typical of phishing.
Example 2: Suspicious: Unsolicited job offer with vague details asking for personal info upfront.
Example 3: Safe: Appears to be a standard appointment reminder or casual conversation.
"""
try:
response = await client.chat.completions.create(
model="gpt-4-turbo", # Or gpt-3.5-turbo
messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": text} ],
max_tokens=100, temperature=0.2
)
analysis_result = response.choices[0].message.content.strip()
print(f"LLM Analysis Result: {analysis_result}")
# Parsing logic remains the same
status = "Suspicious"; reason = analysis_result
if ":" in analysis_result:
parts = analysis_result.split(":", 1)
potential_status = parts[0].strip().capitalize()
if potential_status in ["Safe", "Suspicious", "Scam"]: status = potential_status; reason = parts[1].strip()
else: # Fallback inference
analysis_lower = analysis_result.lower()
if any(w in analysis_lower for w in ["scam", "phishing", "fraud", "malicious"]): status = "Scam"
elif any(w in analysis_lower for w in ["suspicious", "warning", "risk", "caution", "unsolicited"]): status = "Suspicious"
elif any(w in analysis_lower for w in ["safe", "legitimate", "benign"]): status = "Safe"
if not reason: reason = "Analysis complete."
return {"status": status, "reason": reason}
except OpenAIError as e:
print(f"OpenAI API Error during text analysis: {e}")
return {"status": "Error", "reason": f"LLM analysis failed (API Error): {e.status_code} {e.body}"}
except Exception as e:
print(f"Generic error during text analysis: {e}")
return {"status": "Error", "reason": f"LLM analysis failed (Server Error): {e}"}
# --- API Endpoints (/analyze, /synthesize - Remain the same structurally) ---
@app.post("/analyze", response_model=AnalysisResponse, tags=["Analysis"])
async def analyze_input(
input_type: str = Form(..., description="Type of input: 'text', 'url', or 'audio'"),
text: str | None = Form(None, description="Text message or URL (required if input_type is 'text' or 'url')"),
file: UploadFile | None = File(None, description="Audio file (required if input_type is 'audio')")
):
analysis_result = {}
detected_language = None
is_translated = False
reason_target_language = 'en'
try:
if input_type == 'url':
if not text: raise HTTPException(status_code=400, detail="URL required for type 'url'")
analysis_result = await check_url_safety(text)
elif input_type == 'text':
if not text: raise HTTPException(status_code=400, detail="Text required for type 'text'")
text_to_analyze, detected_language, is_translated = await detect_and_translate(text)
reason_target_language = detected_language or 'en'
analysis_result = await check_text_scam(text_to_analyze)
elif input_type == 'audio':
if not file: raise HTTPException(status_code=400, detail="File required for type 'audio'")
transcribed_text = await transcribe_audio(file)
if not transcribed_text: raise HTTPException(status_code=500, detail="Transcription empty.")
text_to_analyze, detected_language, is_translated = await detect_and_translate(transcribed_text)
reason_target_language = detected_language or 'en'
analysis_result = await check_text_scam(text_to_analyze)
else:
raise HTTPException(status_code=400, detail=f"Invalid input_type '{input_type}'.")
if analysis_result.get("status") == "Error":
return AnalysisResponse(status="Error", reason=analysis_result.get('reason', 'Analysis failed.'), detected_language=detected_language, is_translated=is_translated)
final_reason = await translate_reason_back(analysis_result.get('reason', 'Analysis reason missing.'), reason_target_language)
return AnalysisResponse(status=analysis_result.get('status', 'Error'), reason=final_reason, detected_language=detected_language, is_translated=is_translated)
except HTTPException as he:
raise he
except Exception as e:
print(f"Unexpected error in /analyze endpoint: {e}")
return AnalysisResponse(status="Error", reason=f"Unexpected server error: {type(e).__name__}", detected_language=None, is_translated=False)
@app.post("/synthesize", tags=["TTS"])
async def synthesize_speech(request: TTSRequest):
"""Generates speech from text using OpenAI TTS-1 model."""
if not client:
raise HTTPException(status_code=503, detail="TTS unavailable: OpenAI client not initialized.")
print(f"Synthesizing speech for text: {request.text[:50]}...")
try:
voice_model = "alloy"
response = await client.audio.speech.create(model="tts-1", voice=voice_model, input=request.text, response_format="mp3")
return Response(content=response.content, media_type="audio/mpeg")
except OpenAIError as e:
print(f"OpenAI API Error during TTS: {e}")
raise HTTPException(status_code=502, detail=f"TTS generation failed (API Error): {e.status_code} {e.body}")
except Exception as e:
print(f"TTS Error: {e}")
raise HTTPException(status_code=500, detail=f"Text-to-Speech generation failed (Server Error): {e}")
@app.get("/", tags=["Health"])
async def read_root():
# Check status of API keys loaded
openai_status = "OK" if OPENAI_API_KEY and client else "Not Configured"
google_status = "OK" if GOOGLE_API_KEY else "Not Configured"
return {"message": f"{APP_NAME} API is running!", "openai_key_status": openai_status, "google_key_status": google_status}