|
|
from fastapi import FastAPI, File, Form, UploadFile, HTTPException |
|
|
from fastapi.responses import FileResponse, JSONResponse |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
import boto3 |
|
|
import os |
|
|
from botocore.exceptions import ClientError |
|
|
import logging |
|
|
import wikipedia |
|
|
from gtts import gTTS |
|
|
from PIL import Image, ImageEnhance |
|
|
import uuid |
|
|
import shutil |
|
|
|
|
|
|
|
|
# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# The CORS spec does not allow a wildcard origin together with credentials,
# so credentialed CORS is only enabled when an explicit, comma-separated
# origin list is supplied through CORS_ALLOWED_ORIGINS. With the variable
# unset every origin stays allowed, as before, but without credentials.
# NOTE(review): no endpoint visible in this file reads cookies or auth
# headers - confirm no client relies on credentialed requests.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOWED_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
# Credentials come from the environment only. Secrets must never be embedded
# in source as fallback values: the key pair that used to be hard-coded here
# has been exposed and must be revoked/rotated in AWS IAM.
# When the variables are unset both values are None, and boto3 then resolves
# credentials through its default provider chain.
aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
region_name = "us-east-1"

try:
    rekognition = boto3.client(
        "rekognition",
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key,
        region_name=region_name,
    )
    logger.info("Rekognition client initialized successfully.")
except Exception as e:
    # Keep the module importable; code using the client checks for None.
    logger.error(f"Error initializing Rekognition client: {e}")
    rekognition = None
|
|
|
|
|
# Labels whose display name differs from the label text itself.
_LANDMARK_ALIASES = {
    "Pyramid": "Giza Pyramids",
    "Great Wall": "Great Wall of China",
}

# Maps a detected label name to the landmark name reported for it.
# Every label maps to itself unless it has an entry in _LANDMARK_ALIASES.
KNOWN_LANDMARKS = {
    label: _LANDMARK_ALIASES.get(label, label)
    for label in (
        "Pyramid",
        "Eiffel Tower",
        "Statue of Liberty",
        "Great Wall",
        "Colosseum",
        "Taj Mahal",
        "Machu Picchu",
        "Christ the Redeemer",
        "Big Ben",
        "Leaning Tower of Pisa",
        "Sydney Opera House",
        "Mount Rushmore",
        "Burj Khalifa",
    )
}
|
|
|
|
|
def enhance_image(image_path):
    """Boost contrast and sharpness of an image and shrink it, in place.

    The file at ``image_path`` is overwritten with the processed result,
    scaled down by ``thumbnail`` to fit within 640x360.

    Args:
        image_path: Path of the image file to read and overwrite.
    """
    with Image.open(image_path) as source:
        logger.info("Enhancing image")
        img = source
        # The output format follows the file extension. JPEG cannot store
        # alpha or palette modes, so an RGBA/P/LA picture saved under a
        # .jpg name (e.g. an uploaded PNG) would make save() raise.
        # Normalise those to RGB; other targets and modes are left as-is.
        saves_as_jpeg = str(image_path).lower().endswith((".jpg", ".jpeg"))
        if saves_as_jpeg and img.mode not in ("L", "RGB", "CMYK"):
            img = img.convert("RGB")
        img = ImageEnhance.Contrast(img).enhance(1.5)
        img = ImageEnhance.Sharpness(img).enhance(2.0)
        img.thumbnail((640, 360), Image.Resampling.LANCZOS)
        img.save(image_path)
|
|
|
|
|
def detect_landmark(image_path):
    """Name the subject of an image from Rekognition labels.

    The file's bytes are sent to ``rekognition.detect_labels`` with
    ``MaxLabels=10``. Labels are scanned in response order: the first one
    whose name is a key of ``KNOWN_LANDMARKS`` ends the scan and its mapped
    value is returned. If none matches, the name of the highest-confidence
    label is returned, or ``None`` when the response holds no labels.
    """
    with open(image_path, "rb") as fh:
        payload = fh.read()
        result = rekognition.detect_labels(Image={"Bytes": payload}, MaxLabels=10)

    leader = None  # (name, confidence) of the strongest label seen so far
    for entry in result.get("Labels", []):
        name = entry["Name"]
        if name in KNOWN_LANDMARKS:
            return KNOWN_LANDMARKS[name]
        # Only a strictly higher confidence replaces the current leader,
        # so the earliest label wins ties.
        if leader is not None and not entry["Confidence"] > leader[1]:
            continue
        leader = (name, entry["Confidence"])

    if leader:
        return leader[0]
    return None
|
|
|
|
|
def get_wikipedia_info(object_name, lang="en"):
    """Return a short Wikipedia summary for ``object_name``.

    Sets the ``wikipedia`` module's language to ``lang``, searches for
    ``object_name`` and returns the first 600 characters of the summary of
    the first search result.

    Returns:
        The truncated summary; ``"No results found on Wikipedia."`` when the
        search is empty; or a string starting with
        ``"Error fetching Wikipedia info:"`` when any exception is raised
        (the exception is logged, not propagated).
    """
    try:
        wikipedia.set_lang(lang)
        hits = wikipedia.search(object_name)
        if hits:
            top_hit = wikipedia.page(hits[0])
            return top_hit.summary[:600]
        return "No results found on Wikipedia."
    except Exception as exc:
        logger.error(f"Wikipedia error: {exc}")
        return f"Error fetching Wikipedia info: {exc}"
|
|
|
|
|
def text_to_speech(text, lang="en"):
    """Synthesize ``text`` to an MP3 file with gTTS.

    The audio is written to a uniquely named ``temp_audio_<uuid4>.mp3`` file
    (relative path).

    Returns:
        The path of the written file, or ``None`` if synthesis or saving
        raised; the exception is logged rather than propagated.
    """
    try:
        speech = gTTS(text=text, lang=lang)
        output_name = f"temp_audio_{uuid.uuid4()}.mp3"
        speech.save(output_name)
    except Exception as exc:
        logger.error(f"TTS error: {exc}")
        return None
    return output_name
|
|
|
|
|
@app.post("/recognize")
async def recognize_landmark(image: UploadFile = File(...), language: str = Form(...)):
    """Recognize the subject of an uploaded image and describe it.

    The upload is written to a uniquely named ``temp_image_<uuid4>.jpg``
    file, enhanced, and labelled via Rekognition. A Wikipedia summary for
    the detected name is fetched in ``language`` and converted to speech.

    Args:
        image: Uploaded image file (multipart form field).
        language: Language code passed to the Wikipedia lookup and to the
            text-to-speech step.

    Returns:
        A dict with ``landmark``, ``information``, ``image_url`` and
        ``audio_url`` keys; the URLs point at the ``/image`` and ``/audio``
        routes for the generated files.

    Raises:
        HTTPException: 400 when no label could be determined; 500 when the
            Rekognition client is unavailable, the Wikipedia lookup or audio
            generation fails, or an AWS/unexpected error occurs.
    """
    if rekognition is None:
        raise HTTPException(status_code=500, detail="Rekognition client not initialized")

    image_path = f"temp_image_{uuid.uuid4()}.jpg"
    succeeded = False
    try:
        with open(image_path, "wb") as buffer:
            shutil.copyfileobj(image.file, buffer)

        # NOTE(review): the helpers below are synchronous and are called
        # directly from this coroutine, so they run on the event loop.
        enhance_image(image_path)
        landmark_name = detect_landmark(image_path)
        if not landmark_name:
            raise HTTPException(status_code=400, detail="No landmark recognized")

        wiki_info = get_wikipedia_info(landmark_name, language)
        # get_wikipedia_info reports failure with this exact prefix. Matching
        # the prefix, rather than any occurrence of "error", keeps genuine
        # summaries that merely contain the word from being rejected.
        if wiki_info.startswith("Error fetching Wikipedia info:"):
            raise HTTPException(status_code=500, detail=wiki_info)

        audio_path = text_to_speech(wiki_info, language)
        if not audio_path:
            raise HTTPException(status_code=500, detail="Failed to generate audio")

        succeeded = True
        return {
            "landmark": landmark_name,
            "information": wiki_info,
            "image_url": f"/image/{os.path.basename(image_path)}",
            "audio_url": f"/audio/{os.path.basename(audio_path)}",
        }
    except HTTPException:
        # Deliberate HTTP errors (such as the 400 above) must keep their
        # status instead of being rewrapped as a 500 by the handler below.
        raise
    except ClientError as e:
        raise HTTPException(status_code=500, detail=f"AWS error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Server error: {str(e)}") from e
    finally:
        # The image is kept only when the response references it through
        # image_url; every failure path removes it here so nothing leaks.
        if not succeeded and os.path.exists(image_path):
            try:
                os.remove(image_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not delete image: {cleanup_error}")
|
|
|
|
|
@app.get("/image/{filename}")
async def serve_image(filename: str):
    """Serve a generated image once, then delete it.

    Args:
        filename: Name of a ``temp_image_<uuid4>.jpg`` file in the current
            working directory.

    Returns:
        A ``FileResponse`` with media type ``image/jpeg``. The file is
        removed after the response has been sent.

    Raises:
        HTTPException: 404 when the name is not a generated image name or
            the file does not exist.
    """
    import re

    from starlette.background import BackgroundTask

    # Only names of the generated form are accepted. Without this check any
    # file in the working directory could be downloaded and then deleted.
    if not re.fullmatch(
        r"temp_image_[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\.jpg", filename
    ):
        raise HTTPException(status_code=404, detail="Image not found")

    image_path = os.path.join(os.getcwd(), filename)
    if not os.path.isfile(image_path):
        raise HTTPException(status_code=404, detail="Image not found")

    def _discard() -> None:
        # Best-effort cleanup: a failed delete is logged, never raised.
        try:
            os.remove(image_path)
        except OSError as e:
            logger.warning(f"Could not delete image: {e}")

    # FileResponse reads the file while the response is being sent, so the
    # delete must run afterwards as a background task. Removing the file
    # before returning would leave nothing to send.
    return FileResponse(
        image_path,
        media_type="image/jpeg",
        background=BackgroundTask(_discard),
    )
|
|
|
|
|
@app.get("/audio/{filename}")
async def serve_audio(filename: str):
    """Serve a generated audio file once, then delete it.

    Args:
        filename: Name of a ``temp_audio_<uuid4>.mp3`` file in the current
            working directory.

    Returns:
        A ``FileResponse`` with media type ``audio/mpeg``. The file is
        removed after the response has been sent.

    Raises:
        HTTPException: 404 when the name is not a generated audio name or
            the file does not exist.
    """
    import re

    from starlette.background import BackgroundTask

    # Only names of the generated form are accepted. Without this check any
    # file in the working directory could be downloaded and then deleted.
    if not re.fullmatch(
        r"temp_audio_[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\.mp3", filename
    ):
        raise HTTPException(status_code=404, detail="Audio not found")

    audio_path = os.path.join(os.getcwd(), filename)
    if not os.path.isfile(audio_path):
        raise HTTPException(status_code=404, detail="Audio not found")

    def _discard() -> None:
        # Best-effort cleanup: a failed delete is logged, never raised.
        try:
            os.remove(audio_path)
        except OSError as e:
            logger.warning(f"Could not delete audio: {e}")

    # FileResponse reads the file while the response is being sent, so the
    # delete must run afterwards as a background task. Removing the file
    # before returning would leave nothing to send.
    return FileResponse(
        audio_path,
        media_type="audio/mpeg",
        background=BackgroundTask(_discard),
    )
|
|
|