""" Image handling and fetching from Unsplash. """ import requests import json from io import BytesIO from PIL import Image from logger import logger from config import UNSPLASH_API_KEY, IMAGE_FEATURED_WIDTH, IMAGE_FEATURED_HEIGHT class ImageHandler: """Handles image fetching from Unsplash API.""" def __init__(self, api_key=UNSPLASH_API_KEY): """Initialize image handler.""" if not api_key: raise ValueError("UNSPLASH_API_KEY not set in environment") self.api_key = api_key self.base_url = "https://api.unsplash.com/photos/random" logger.debug("ImageHandler initialized") def fetch_image(self, tags): """ Fetch a random image from Unsplash based on tags. If no image is found, retry by removing one word at a time. Args: tags (list): List of search tags Returns: dict: Contains 'url', 'credit', 'filename' keys Raises: Exception: If API call fails or response is invalid """ if not tags: raise ValueError("At least one tag is required") tried_queries = set() def try_fetch(tags_subset): search_query = " ".join(tags_subset) if not search_query or search_query in tried_queries: return None tried_queries.add(search_query) logger.info(f"Searching for image with query: '{search_query}'") params = { "query": search_query, "orientation": "landscape", "w": IMAGE_FEATURED_WIDTH, "h": IMAGE_FEATURED_HEIGHT, } headers = {"Authorization": f"Client-ID {self.api_key}"} logger.debug(f"Unsplash API request: {self.base_url}") try: response = requests.get(self.base_url, params=params, headers=headers) logger.debug(f"Unsplash response status: {response.status_code}") if response.status_code != 200: logger.warning(f"No image found for query '{search_query}' (status {response.status_code})") return None photo = response.json() if "urls" not in photo or "user" not in photo: logger.warning(f"Invalid response structure for query '{search_query}': {json.dumps(photo, indent=2)}") return None image_url = photo["urls"].get("regular") image_credit = photo["user"].get("name", "") image_credit_url = photo["user"].get("links", {}).get("html", "https://unsplash.com") filename = f"featured_{search_query.replace(' ', '_')}.avif" logger.info(f"Image found by {image_credit}: {image_url}") return { "url": image_url, "credit": image_credit, "credit_url": image_credit_url, "filename": filename, } except requests.RequestException as e: logger.error(f"Request failed: {e}") return None except Exception as e: logger.error(f"Failed to fetch image: {e}") return None # Try with all tags first result = try_fetch(tags) if result: return result # Try by removing one word at a time for i in range(len(tags)): reduced_tags = tags[:i] + tags[i+1:] if not reduced_tags: continue result = try_fetch(reduced_tags) if result: return result # Try each individual tag for tag in tags: result = try_fetch([tag]) if result: return result raise Exception("No image found for any tag combination.") def download_image(self, image_url): """ Download image from URL and convert to AVIF format. Args: image_url (str): URL of the imageF Returns: bytes: Image data in AVIF format Raises: Exception: If download or conversion fails """ try: logger.info("Downloading image...") response = requests.get(image_url) response.raise_for_status() image_data = response.content logger.info(f"Downloaded image: {len(image_data) // 1024} KB") # Convert to AVIF format logger.info("Converting image to AVIF format...") try: img = Image.open(BytesIO(image_data)) # Convert RGBA to RGB if needed if img.mode in ("RGBA", "LA", "P"): rgb_img = Image.new("RGB", img.size, (255, 255, 255)) rgb_img.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None) img = rgb_img # Save as AVIF avif_buffer = BytesIO() img.save(avif_buffer, format="AVIF", quality=85) avif_data = avif_buffer.getvalue() logger.info(f"Converted to AVIF: {len(avif_data) // 1024} KB") return avif_data except Exception as e: logger.error(f"Failed to convert image to AVIF: {e}") logger.info("Returning original image data") return image_data except requests.RequestException as e: logger.error(f"Failed to download image: {e}") raise