Spaces:
Sleeping
Sleeping
| """ | |
| Image handling and fetching from Unsplash. | |
| """ | |
| import requests | |
| import json | |
| from io import BytesIO | |
| from PIL import Image | |
| from logger import logger | |
| from config import UNSPLASH_API_KEY, IMAGE_FEATURED_WIDTH, IMAGE_FEATURED_HEIGHT | |
class ImageHandler:
    """Fetch featured images from the Unsplash API and convert them to AVIF.

    The handler queries the Unsplash "random photo" endpoint with a
    space-joined tag query, progressively loosening the query when nothing
    is found, and can download a photo and re-encode it as AVIF.
    """

    def __init__(self, api_key=UNSPLASH_API_KEY, timeout=30):
        """Initialize the image handler.

        Args:
            api_key (str): Unsplash access key, sent as ``Client-ID``.
            timeout (float | tuple): Timeout in seconds passed to every
                ``requests.get`` call so a stalled connection cannot block
                the caller forever.

        Raises:
            ValueError: If ``api_key`` is empty.
        """
        if not api_key:
            raise ValueError("UNSPLASH_API_KEY not set in environment")
        self.api_key = api_key
        self.base_url = "https://api.unsplash.com/photos/random"
        self.timeout = timeout
        logger.debug("ImageHandler initialized")

    def fetch_image(self, tags):
        """Fetch a random image from Unsplash based on tags.

        Queries are attempted in this order, skipping any query string that
        was already tried: all tags together, every combination with exactly
        one tag removed, then each tag on its own. The first query that
        yields a usable photo wins.

        Args:
            tags (list): List of search tags.

        Returns:
            dict: Contains 'url', 'credit', 'credit_url' and 'filename' keys.
                NOTE: 'filename' always ends in ``.avif`` even though
                ``download_image`` may fall back to the original bytes.

        Raises:
            ValueError: If ``tags`` is empty.
            Exception: If no query produced a usable image.
        """
        if not tags:
            raise ValueError("At least one tag is required")

        tried_queries = set()
        for candidate in self._candidate_tag_sets(tags):
            search_query = " ".join(candidate)
            # Different tag subsets can collapse to the same query string;
            # never spend an API call on a duplicate.
            if not search_query or search_query in tried_queries:
                continue
            tried_queries.add(search_query)

            result = self._search_unsplash(search_query)
            if result:
                return result

        raise Exception("No image found for any tag combination.")

    @staticmethod
    def _candidate_tag_sets(tags):
        """Yield tag subsets from most to least specific."""
        # Most specific: every tag at once.
        yield tags
        # Then drop one tag at a time.
        for i in range(len(tags)):
            reduced_tags = tags[:i] + tags[i + 1:]
            if reduced_tags:
                yield reduced_tags
        # Least specific: each tag alone.
        for tag in tags:
            yield [tag]

    @staticmethod
    def _filename_for_query(search_query):
        """Build a filesystem-safe ``featured_*.avif`` name from a query."""
        # Anything that is not alphanumeric, '-' or '_' (spaces, path
        # separators, dots, ...) becomes '_' so the query can never inject
        # a directory component into the filename.
        safe_query = "".join(
            ch if ch.isalnum() or ch in "-_" else "_" for ch in search_query
        )
        return f"featured_{safe_query}.avif"

    def _search_unsplash(self, search_query):
        """Run one Unsplash query; return the image dict or None on a miss.

        Network errors, non-200 responses and malformed payloads are logged
        and reported as ``None`` so the caller can try the next query.
        """
        logger.info(f"Searching for image with query: '{search_query}'")
        params = {
            "query": search_query,
            "orientation": "landscape",
            "w": IMAGE_FEATURED_WIDTH,
            "h": IMAGE_FEATURED_HEIGHT,
        }
        headers = {"Authorization": f"Client-ID {self.api_key}"}
        logger.debug(f"Unsplash API request: {self.base_url}")

        try:
            response = requests.get(
                self.base_url,
                params=params,
                headers=headers,
                timeout=self.timeout,
            )
            logger.debug(f"Unsplash response status: {response.status_code}")
            if response.status_code != 200:
                logger.warning(
                    f"No image found for query '{search_query}' (status {response.status_code})"
                )
                return None

            photo = response.json()
            if not isinstance(photo, dict) or "urls" not in photo or "user" not in photo:
                logger.warning(
                    f"Invalid response structure for query '{search_query}': {json.dumps(photo, indent=2)}"
                )
                return None

            image_url = photo["urls"].get("regular")
            if not image_url:
                # A result without a usable URL is a miss, not a success;
                # returning it would stop the fallback search early.
                logger.warning(
                    f"Response for query '{search_query}' has no 'regular' image URL"
                )
                return None

            user = photo["user"]
            image_credit = user.get("name", "")
            image_credit_url = (user.get("links") or {}).get(
                "html", "https://unsplash.com"
            )
            logger.info(f"Image found by {image_credit}: {image_url}")
            return {
                "url": image_url,
                "credit": image_credit,
                "credit_url": image_credit_url,
                "filename": self._filename_for_query(search_query),
            }
        except requests.RequestException as e:
            logger.error(f"Request failed: {e}")
            return None
        except Exception as e:
            # Deliberate best-effort: an unexpected payload shape must not
            # abort the fallback search over the remaining queries.
            logger.error(f"Failed to fetch image: {e}")
            return None

    def download_image(self, image_url):
        """Download image from URL and convert to AVIF format.

        Args:
            image_url (str): URL of the image.

        Returns:
            bytes: Image data in AVIF format, or the original downloaded
                bytes unchanged if the AVIF conversion fails for any reason.

        Raises:
            requests.RequestException: If the download fails (including
                timeouts and non-success HTTP status codes).
        """
        try:
            logger.info("Downloading image...")
            response = requests.get(image_url, timeout=self.timeout)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error(f"Failed to download image: {e}")
            raise

        image_data = response.content
        logger.info(f"Downloaded image: {len(image_data) // 1024} KB")

        logger.info("Converting image to AVIF format...")
        try:
            avif_data = self._to_avif(image_data)
        except Exception as e:
            # Best-effort conversion: hand back the untouched download
            # rather than failing the whole operation.
            logger.error(f"Failed to convert image to AVIF: {e}")
            logger.info("Returning original image data")
            return image_data

        logger.info(f"Converted to AVIF: {len(avif_data) // 1024} KB")
        return avif_data

    @staticmethod
    def _to_avif(image_data, quality=85):
        """Decode ``image_data`` and re-encode it as AVIF bytes."""
        # The context manager releases the decoder's resources even when
        # the save below raises.
        with Image.open(BytesIO(image_data)) as img:
            has_alpha = img.mode in ("RGBA", "LA", "PA") or (
                img.mode == "P" and "transparency" in img.info
            )
            if has_alpha:
                # Flatten onto white using the real alpha channel for every
                # alpha-bearing mode, not only RGBA.
                rgba = img.convert("RGBA")
                flattened = Image.new("RGB", rgba.size, (255, 255, 255))
                flattened.paste(rgba, mask=rgba.getchannel("A"))
                output_img = flattened
            elif img.mode == "P":
                # Opaque palette image: expand the palette to RGB.
                output_img = img.convert("RGB")
            else:
                output_img = img

            avif_buffer = BytesIO()
            output_img.save(avif_buffer, format="AVIF", quality=quality)
        return avif_buffer.getvalue()