# com1 / app.py
# 1MR's picture
# Create app.py
# 59e413f verified
import logging
import os
import re
import shutil
import uuid

import boto3
import wikipedia
from botocore.exceptions import ClientError
from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from gtts import gTTS
from PIL import Image, ImageEnhance
# Configure logging once at import time; every helper below logs through
# the module-level `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# CORS policy.
# A wildcard origin must not be combined with credentialed requests: FastAPI's
# CORS documentation requires explicit origins when `allow_credentials=True`.
# Allowed origins can be supplied as a comma-separated list in the
# CORS_ALLOW_ORIGINS environment variable.  When no explicit list is given the
# API stays reachable from any origin, but without credential support.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # Credentials are only enabled for an explicit origin list.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# SECURITY: credentials must never live in source control.  The literal
# fallback key pair that used to be embedded here has been removed; treat it as
# compromised and rotate it.  Credentials are now read from the environment
# only.  When the variables are unset (or empty) `None` is passed to boto3,
# which then falls back to its default credential resolution (shared
# credentials file, instance/task role, ...).
aws_access_key = os.getenv("AWS_ACCESS_KEY_ID") or None
aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY") or None
region_name = "us-east-1"

# Build the client once at import time.  On failure the module still loads and
# `rekognition` is left as None so the /recognize endpoint can report the
# problem instead of the whole application crashing at start-up.
try:
    rekognition = boto3.client(
        "rekognition",
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key,
        region_name=region_name,
    )
    logger.info("Rekognition client initialized successfully.")
except Exception as e:
    logger.error(f"Error initializing Rekognition client: {e}")
    rekognition = None
# Maps a label name, as looked up by `detect_landmark`, to the landmark name
# that is reported to the client and used as the Wikipedia search term.
KNOWN_LANDMARKS = {
    # Labels whose reported name differs from the label itself.
    "Pyramid": "Giza Pyramids",
    "Eiffel Tower": "Eiffel Tower",
    "Statue of Liberty": "Statue of Liberty",
    "Great Wall": "Great Wall of China",
    # Remaining labels map to themselves.
    "Colosseum": "Colosseum",
    "Taj Mahal": "Taj Mahal",
    "Machu Picchu": "Machu Picchu",
    "Christ the Redeemer": "Christ the Redeemer",
    "Big Ben": "Big Ben",
    "Leaning Tower of Pisa": "Leaning Tower of Pisa",
    "Sydney Opera House": "Sydney Opera House",
    "Mount Rushmore": "Mount Rushmore",
    "Burj Khalifa": "Burj Khalifa",
}
def enhance_image(image_path):
    """Boost contrast and sharpness of an image and shrink it, in place.

    The file at ``image_path`` is overwritten with the processed result.  The
    image is scaled down (never up) to fit within 640x360 via
    ``Image.thumbnail``.

    Args:
        image_path: Path of the image file to read and overwrite.

    Raises:
        Whatever Pillow raises for unreadable or unsupported files
        (e.g. ``PIL.UnidentifiedImageError``, ``OSError``).
    """
    with Image.open(image_path) as source:
        logger.info("Enhancing image")
        # Uploads are stored under a .jpg name regardless of their real
        # format.  Modes such as RGBA, LA or P cannot be written as JPEG, and
        # palette images do not work with the enhancers, so normalise
        # everything except plain RGB / greyscale to RGB first.
        img = source if source.mode in ("RGB", "L") else source.convert("RGB")
        img = ImageEnhance.Contrast(img).enhance(1.5)
        img = ImageEnhance.Sharpness(img).enhance(2.0)

    # The enhancers return a new in-memory image, so the source file is closed
    # before it is overwritten below.
    img.thumbnail((640, 360), Image.Resampling.LANCZOS)
    img.save(image_path)
def detect_landmark(image_path):
    """Return a landmark name for the image at ``image_path``.

    The image bytes are sent to the module-level ``rekognition`` client
    (``detect_labels``, at most 10 labels).  Labels are scanned in response
    order:

    * the first label whose name is a key of ``KNOWN_LANDMARKS`` wins and its
      mapped value is returned immediately;
    * otherwise the name of the label with the highest confidence is returned
      (the earliest one on ties);
    * ``None`` is returned when the response contains no labels.
    """
    with open(image_path, "rb") as fh:
        payload = fh.read()
    result = rekognition.detect_labels(Image={"Bytes": payload}, MaxLabels=10)

    leader = None  # (confidence, name) of the strongest unknown label so far
    for entry in result.get("Labels", []):
        name = entry["Name"]
        if name in KNOWN_LANDMARKS:
            return KNOWN_LANDMARKS[name]
        score = entry["Confidence"]
        if leader is None or score > leader[0]:
            leader = (score, name)

    if leader is None:
        return None
    return leader[1]
def get_wikipedia_info(object_name, lang="en"):
    """Return up to 600 characters of the Wikipedia summary for a search term.

    Args:
        object_name: Text to search Wikipedia for.
        lang: Wikipedia language edition passed to ``wikipedia.set_lang``.

    Returns:
        One of:
        * the first 600 characters of the summary of the top search result;
        * ``"No results found on Wikipedia."`` when the search is empty;
        * a string starting with ``"Error fetching Wikipedia info: "`` when
          any exception occurs (the exception is logged, never raised).
    """
    try:
        wikipedia.set_lang(lang)
        results = wikipedia.search(object_name)
        if not results:
            return "No results found on Wikipedia."
        # `results[0]` is already an exact title returned by the search.
        # Leaving auto_suggest on would pass it through Wikipedia's
        # suggestion step again, which can swap it for a different or
        # non-existent page.
        page = wikipedia.page(results[0], auto_suggest=False)
        return page.summary[:600]
    except Exception as e:
        logger.error(f"Wikipedia error: {e}")
        return f"Error fetching Wikipedia info: {e}"
def text_to_speech(text, lang="en"):
    """Synthesise ``text`` to an MP3 file with gTTS.

    Args:
        text: Text to speak.
        lang: Language code handed to gTTS.

    Returns:
        The relative path ``temp_audio_<uuid4>.mp3`` of the generated file,
        or ``None`` when synthesis fails (the error is logged, not raised).
    """
    audio_path = f"temp_audio_{uuid.uuid4()}.mp3"
    try:
        gTTS(text=text, lang=lang).save(audio_path)
        return audio_path
    except Exception as e:
        logger.error(f"TTS error: {e}")
        # A failed save may already have created the output file; do not
        # leave a partial or empty file behind.
        try:
            os.remove(audio_path)
        except OSError:
            pass
        return None
@app.post("/recognize")
async def recognize_landmark(image: UploadFile = File(...), language: str = Form(...)):
    """Recognise a landmark in an uploaded image and describe it.

    The upload is stored as ``temp_image_<uuid4>.jpg`` in the working
    directory, enhanced, labelled, looked up on Wikipedia and converted to
    speech.

    Returns:
        A dict with ``landmark``, ``information``, ``image_url`` and
        ``audio_url``; the two URLs point at the /image and /audio endpoints.

    Raises:
        HTTPException: 400 when no label is returned for the image; 500 when
            the Rekognition client is unavailable, the Wikipedia lookup or
            audio generation fails, or any AWS / unexpected error occurs.
    """
    if rekognition is None:
        raise HTTPException(status_code=500, detail="Rekognition client not initialized")

    image_path = f"temp_image_{uuid.uuid4()}.jpg"
    keep_image = False  # only a successful response leaves the image on disk
    try:
        with open(image_path, "wb") as buffer:
            shutil.copyfileobj(image.file, buffer)

        enhance_image(image_path)

        landmark_name = detect_landmark(image_path)
        if not landmark_name:
            raise HTTPException(status_code=400, detail="No landmark recognized")

        wiki_info = get_wikipedia_info(landmark_name, language)
        # Match the helper's error prefix exactly; a plain substring test for
        # "error" would reject legitimate summaries containing that word.
        if wiki_info.startswith("Error fetching Wikipedia info:"):
            raise HTTPException(status_code=500, detail=wiki_info)

        audio_path = text_to_speech(wiki_info, language)
        if not audio_path:
            raise HTTPException(status_code=500, detail="Failed to generate audio")

        keep_image = True
        return {
            "landmark": landmark_name,
            "information": wiki_info,
            "image_url": f"/image/{os.path.basename(image_path)}",
            "audio_url": f"/audio/{os.path.basename(audio_path)}",
        }
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400 above) must reach the client
        # unchanged instead of being rewrapped as a generic 500.
        raise
    except ClientError as e:
        raise HTTPException(status_code=500, detail=f"AWS error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Server error: {str(e)}") from e
    finally:
        # Single cleanup point for every failure path.
        if not keep_image:
            try:
                os.remove(image_path)
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.warning(f"Could not delete image: {e}")
@app.get("/image/{filename}")
async def serve_image(filename: str):
    """Serve a processed image once, then delete it.

    Only names of the form ``temp_image_<uuid4>.jpg`` (the names generated by
    the /recognize endpoint) are accepted, so this route cannot be used to
    read or delete other files in the working directory.

    Raises:
        HTTPException: 404 when the name is not a temp-image name or the file
            does not exist.
    """
    if not re.fullmatch(r"temp_image_[0-9a-f-]{36}\.jpg", filename):
        raise HTTPException(status_code=404, detail="Image not found")

    image_path = os.path.join(os.getcwd(), filename)
    if not os.path.isfile(image_path):
        raise HTTPException(status_code=404, detail="Image not found")

    def _delete_sent_file():
        # Best effort: a failed delete is logged, never surfaced to the client.
        try:
            os.remove(image_path)
        except OSError as e:
            logger.warning(f"Could not delete image: {e}")

    # FileResponse reads the file while the response is being sent, so the
    # file must still exist after this function returns.  Deletion is
    # therefore deferred to a background task that runs after the response.
    cleanup = BackgroundTasks()
    cleanup.add_task(_delete_sent_file)
    return FileResponse(image_path, media_type="image/jpeg", background=cleanup)
@app.get("/audio/{filename}")
async def serve_audio(filename: str):
    """Serve a generated audio file once, then delete it.

    Only names of the form ``temp_audio_<uuid4>.mp3`` (the names generated by
    ``text_to_speech``) are accepted, so this route cannot be used to read or
    delete other files in the working directory.

    Raises:
        HTTPException: 404 when the name is not a temp-audio name or the file
            does not exist.
    """
    if not re.fullmatch(r"temp_audio_[0-9a-f-]{36}\.mp3", filename):
        raise HTTPException(status_code=404, detail="Audio not found")

    audio_path = os.path.join(os.getcwd(), filename)
    if not os.path.isfile(audio_path):
        raise HTTPException(status_code=404, detail="Audio not found")

    def _delete_sent_file():
        # Best effort: a failed delete is logged, never surfaced to the client.
        try:
            os.remove(audio_path)
        except OSError as e:
            logger.warning(f"Could not delete audio: {e}")

    # FileResponse reads the file while the response is being sent, so the
    # file must still exist after this function returns.  Deletion is
    # therefore deferred to a background task that runs after the response.
    cleanup = BackgroundTasks()
    cleanup.add_task(_delete_sent_file)
    return FileResponse(audio_path, media_type="audio/mpeg", background=cleanup)