from fastapi import FastAPI, File, Form, UploadFile, HTTPException from fastapi.responses import FileResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware import boto3 import os from botocore.exceptions import ClientError import logging import wikipedia from gtts import gTTS from PIL import Image, ImageEnhance import uuid import shutil # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) aws_access_key = os.getenv("AWS_ACCESS_KEY_ID", "AKIA6G75DYEK3NWC2AXH") aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY", "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") region_name = "us-east-1" try: rekognition = boto3.client( "rekognition", aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key, region_name=region_name ) logger.info("Rekognition client initialized successfully.") except Exception as e: logger.error(f"Error initializing Rekognition client: {e}") rekognition = None KNOWN_LANDMARKS = { "Pyramid": "Giza Pyramids", "Eiffel Tower": "Eiffel Tower", "Statue of Liberty": "Statue of Liberty", "Great Wall": "Great Wall of China", "Colosseum": "Colosseum", "Taj Mahal": "Taj Mahal", "Machu Picchu": "Machu Picchu", "Christ the Redeemer": "Christ the Redeemer", "Big Ben": "Big Ben", "Leaning Tower of Pisa": "Leaning Tower of Pisa", "Sydney Opera House": "Sydney Opera House", "Mount Rushmore": "Mount Rushmore", "Burj Khalifa": "Burj Khalifa" } def enhance_image(image_path): with Image.open(image_path) as img: logger.info("Enhancing image") img = ImageEnhance.Contrast(img).enhance(1.5) img = ImageEnhance.Sharpness(img).enhance(2.0) img.thumbnail((640, 360), Image.Resampling.LANCZOS) img.save(image_path) def detect_landmark(image_path): with open(image_path, "rb") as image_file: response = rekognition.detect_labels(Image={"Bytes": image_file.read()}, MaxLabels=10) best_match = None for label in response.get("Labels", []): if label["Name"] in KNOWN_LANDMARKS: return KNOWN_LANDMARKS[label["Name"]] if best_match is None or label["Confidence"] > best_match[1]: best_match = (label["Name"], label["Confidence"]) return best_match[0] if best_match else None def get_wikipedia_info(object_name, lang="en"): try: wikipedia.set_lang(lang) results = wikipedia.search(object_name) if not results: return "No results found on Wikipedia." page = wikipedia.page(results[0]) return page.summary[:600] except Exception as e: logger.error(f"Wikipedia error: {e}") return f"Error fetching Wikipedia info: {e}" def text_to_speech(text, lang="en"): try: tts = gTTS(text=text, lang=lang) audio_path = f"temp_audio_{uuid.uuid4()}.mp3" tts.save(audio_path) return audio_path except Exception as e: logger.error(f"TTS error: {e}") return None @app.post("/recognize") async def recognize_landmark(image: UploadFile = File(...), language: str = Form(...)): if rekognition is None: raise HTTPException(status_code=500, detail="Rekognition client not initialized") image_path = f"temp_image_{uuid.uuid4()}.jpg" try: with open(image_path, "wb") as buffer: shutil.copyfileobj(image.file, buffer) enhance_image(image_path) landmark_name = detect_landmark(image_path) if not landmark_name: os.remove(image_path) raise HTTPException(status_code=400, detail="No landmark recognized") wiki_info = get_wikipedia_info(landmark_name, language) if "error" in wiki_info.lower(): os.remove(image_path) raise HTTPException(status_code=500, detail=wiki_info) audio_path = text_to_speech(wiki_info, language) if not audio_path: os.remove(image_path) raise HTTPException(status_code=500, detail="Failed to generate audio") return { "landmark": landmark_name, "information": wiki_info, "image_url": f"/image/{os.path.basename(image_path)}", "audio_url": f"/audio/{os.path.basename(audio_path)}" } except ClientError as e: raise HTTPException(status_code=500, detail=f"AWS error: {str(e)}") except Exception as e: raise HTTPException(status_code=500, detail=f"Server error: {str(e)}") @app.get("/image/{filename}") async def serve_image(filename: str): image_path = os.path.join(os.getcwd(), filename) if not os.path.exists(image_path): raise HTTPException(status_code=404, detail="Image not found") response = FileResponse(image_path, media_type="image/jpeg") try: os.remove(image_path) except Exception as e: logger.warning(f"Could not delete image: {e}") return response @app.get("/audio/{filename}") async def serve_audio(filename: str): audio_path = os.path.join(os.getcwd(), filename) if not os.path.exists(audio_path): raise HTTPException(status_code=404, detail="Audio not found") response = FileResponse(audio_path, media_type="audio/mpeg") try: os.remove(audio_path) except Exception as e: logger.warning(f"Could not delete audio: {e}") return response