"""
Export Service for Creative Breakthrough.

Handles bulk export of ad creatives with images and Excel data.
"""

import logging
import os
import re
import shutil
import tempfile
import zipfile
from datetime import datetime
from typing import List, Dict, Any, Optional

import httpx
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment

from config import settings

logger = logging.getLogger(__name__)


class ExportService:
    """Service for exporting ad creatives in bulk.

    An export consists of the creative images (renamed according to a fixed
    nomenclature) plus an Excel sheet with the ad copy, bundled into one ZIP.
    """

    # Timeout (seconds) applied to every remote image download.
    _HTTP_TIMEOUT = 30.0
    # URL prefix under which locally stored images are referenced.
    _LOCAL_IMAGE_PREFIX = "images/"
    # Upper bound for auto-sized Excel column widths.
    _MAX_COLUMN_WIDTH = 50

    def __init__(self):
        # Path of the most recently created working directory. Kept only for
        # backward compatibility; create_export_package() works on a local
        # variable so overlapping exports cannot interfere with each other.
        self.temp_dir = None

    def sanitize_filename(self, text: str, max_length: int = 50) -> str:
        """
        Sanitize text for use in a filename.

        Lowercases the text, collapses every run of characters outside
        ``[a-z0-9]`` into a single underscore, trims underscores at both ends
        and truncates to ``max_length`` characters.

        Returns "unknown" when the input is empty or nothing usable remains.
        """
        if not text:
            return "unknown"

        # str() keeps the method usable for non-string values (e.g. numbers).
        cleaned = re.sub(r"[^a-z0-9]+", "_", str(text).lower()).strip("_")

        # Truncation may leave a dangling underscore at the cut; drop it.
        cleaned = cleaned[:max_length].rstrip("_")

        return cleaned or "unknown"

    def generate_image_filename(
        self,
        ad: Dict[str, Any],
        version: int,
        date_str: str
    ) -> str:
        """
        Generate a filename using the nomenclature
        {niche}_{concept}_{angle}_{date}_{version}.png

        Example: home_insurance_before_after_fear_20260130_001.png

        Each of niche, concept and angle falls back to "standard" when the ad
        carries no (or an empty) value for it.
        """
        # `or` rather than a .get() default so that a key that is present but
        # None/empty falls back the same way concept and angle do.
        niche = self.sanitize_filename(
            ad.get("niche") or "standard",
            max_length=20
        )

        # Concept: concept_name, then concept_key.
        concept = self.sanitize_filename(
            ad.get("concept_name") or ad.get("concept_key") or "standard",
            max_length=20
        )

        # Angle: angle_name, then angle_key, then psychological_angle.
        angle = self.sanitize_filename(
            ad.get("angle_name")
            or ad.get("angle_key")
            or ad.get("psychological_angle")
            or "standard",
            max_length=20
        )

        # Zero-padded to three digits.
        version_str = f"{version:03d}"

        return f"{niche}_{concept}_{angle}_{date_str}_{version_str}.png"

    def _read_local_image(self, image_url: str) -> Optional[bytes]:
        """Read an image referenced by a local path below settings.output_dir.

        Returns None when the path escapes the output directory or the file
        does not exist.
        """
        # Remove the "/images/" *prefix*. str.lstrip() must not be used here:
        # it strips a set of characters, which also eats the beginning of
        # file names such as "sample.png".
        relative = image_url.lstrip("/")
        if relative.startswith(self._LOCAL_IMAGE_PREFIX):
            relative = relative[len(self._LOCAL_IMAGE_PREFIX):]
        # A leading slash would make os.path.join discard the base directory.
        relative = relative.lstrip("/")

        base_dir = os.path.abspath(settings.output_dir)
        local_path = os.path.abspath(os.path.join(base_dir, relative))

        # NOTE(review): the path originates from ad data; refuse anything that
        # resolves outside the output directory (e.g. via "../").
        try:
            common = os.path.commonpath(
                [os.path.normcase(base_dir), os.path.normcase(local_path)]
            )
            inside_base = common == os.path.normcase(base_dir)
        except ValueError:
            # Raised e.g. for paths on different drives on Windows.
            inside_base = False

        if not inside_base:
            logger.warning(
                f"Refusing to read local file outside output directory: {image_url}"
            )
            return None

        if not os.path.isfile(local_path):
            logger.warning(f"Local file not found: {local_path}")
            return None

        with open(local_path, "rb") as f:
            return f.read()

    @staticmethod
    async def _fetch_remote(client: httpx.AsyncClient, image_url: str) -> bytes:
        """GET ``image_url`` with ``client``; raises on a non-2xx response."""
        response = await client.get(image_url)
        response.raise_for_status()
        return response.content

    async def download_image(
        self,
        image_url: str,
        client: Optional[httpx.AsyncClient] = None
    ) -> Optional[bytes]:
        """Download an image and return its bytes.

        Args:
            image_url: Either an http(s) URL or a local path (optionally
                prefixed with "/images/") relative to settings.output_dir.
            client: Optional shared HTTP client. When omitted, a short-lived
                client is created for this single request.

        Returns:
            The image bytes, or None when the image could not be obtained.
            Failures are logged and never raised.
        """
        if not image_url:
            return None

        try:
            # Anything that is not an http(s) URL is treated as a local file.
            if not image_url.startswith(("http://", "https://")):
                return self._read_local_image(image_url)

            if client is not None:
                return await self._fetch_remote(client, image_url)

            async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT) as own_client:
                return await self._fetch_remote(own_client, image_url)

        except Exception as e:
            # Best effort by design: one broken image must not abort an export.
            logger.error(f"Failed to download image from {image_url}: {e}")
            return None

    async def download_and_rename_images(
        self,
        ads: List[Dict[str, Any]],
        output_dir: str
    ) -> Dict[str, str]:
        """
        Download all images and save them under their nomenclature filename.

        Ads without an image URL, and ads whose image cannot be downloaded,
        are skipped with a warning.

        Returns a mapping of ad id -> new filename for every saved image.
        """
        filename_map = {}
        date_str = datetime.now().strftime("%Y%m%d")
        total = len(ads)

        # One client for the whole batch so connections are pooled and reused.
        async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT) as client:
            for idx, ad in enumerate(ads, start=1):
                ad_id = ad.get("id")

                # Prefer r2_url, fall back to image_url.
                image_url = ad.get("r2_url") or ad.get("image_url")
                if not image_url:
                    logger.warning(f"No image URL for ad {ad_id}, skipping")
                    continue

                # The position in the list doubles as the version number,
                # which keeps filenames unique within one export.
                new_filename = self.generate_image_filename(ad, idx, date_str)

                logger.info(f"Downloading image {idx}/{total}: {image_url}")
                image_bytes = await self.download_image(image_url, client=client)
                if not image_bytes:
                    logger.warning(
                        f"Failed to download image for ad {ad_id}, skipping"
                    )
                    continue

                output_path = os.path.join(output_dir, new_filename)
                with open(output_path, "wb") as f:
                    f.write(image_bytes)

                filename_map[ad_id] = new_filename
                logger.info(f"Saved: {new_filename}")

        return filename_map

    def create_excel_sheet(
        self,
        ads: List[Dict[str, Any]],
        filename_map: Dict[str, str],
        output_path: str
    ):
        """
        Create the Excel sheet with the ad copy data and save it.

        Columns: Image Filename, Image URL, Headline, Title, Description, CTA,
        Psychological Angle, Niche, Created Date.

        Ads missing from ``filename_map`` (image not exported) are still
        listed, with "N/A" as their image filename.
        """
        wb = Workbook()
        ws = wb.active
        ws.title = "Ad Copy Data"

        headers = [
            "Image Filename",
            "Image URL",
            "Headline",
            "Title",
            "Description",
            "CTA",
            "Psychological Angle",
            "Niche",
            "Created Date",
        ]

        # Header row: bold and centered.
        for col_idx, header in enumerate(headers, start=1):
            cell = ws.cell(row=1, column=col_idx, value=header)
            cell.font = Font(bold=True)
            cell.alignment = Alignment(horizontal="center", vertical="center")

        # Data rows start below the header.
        for row_idx, ad in enumerate(ads, start=2):
            filename = filename_map.get(ad.get("id"), "N/A")

            # Prefer r2_url, fall back to image_url.
            image_url = ad.get("r2_url") or ad.get("image_url") or ""

            # Date part only. str() also covers datetime objects, whose string
            # form starts with YYYY-MM-DD just like an ISO timestamp.
            created_at = ad.get("created_at")
            created_date = str(created_at)[:10] if created_at else ""

            row_data = [
                filename,
                image_url,
                ad.get("headline", ""),
                ad.get("title", ""),
                ad.get("description", ""),
                ad.get("cta", ""),
                ad.get("psychological_angle", ""),
                ad.get("niche", ""),
                created_date,
            ]

            for col_idx, value in enumerate(row_data, start=1):
                ws.cell(row=row_idx, column=col_idx, value=value)

        # Size each column to its longest value, capped for readability.
        for column in ws.columns:
            column_letter = column[0].column_letter
            max_length = max(
                (len(str(cell.value)) for cell in column if cell.value),
                default=0,
            )
            ws.column_dimensions[column_letter].width = min(
                max_length + 2, self._MAX_COLUMN_WIDTH
            )

        # Keep the header row visible while scrolling.
        ws.freeze_panes = "A2"

        wb.save(output_path)
        logger.info(f"Excel sheet created: {output_path}")

    async def create_export_package(
        self,
        ads: List[Dict[str, Any]]
    ) -> str:
        """
        Create a complete export package with images and Excel sheet.

        Returns:
            Path to the ZIP file, located in the system temp directory. The
            caller owns the file and should remove it via cleanup_zip().

        Raises:
            Exception: when no image could be downloaded, or when any step of
                building the package fails. The working directory and any
                partially written ZIP are removed before the error propagates.
        """
        # Local variable on purpose: this service is shared as a module-level
        # singleton, so state on `self` would be clobbered by overlapping
        # exports and one export's cleanup could delete another's files.
        temp_dir = tempfile.mkdtemp(prefix="export_")
        self.temp_dir = temp_dir
        zip_path = None

        try:
            creatives_dir = os.path.join(temp_dir, "creatives")
            os.makedirs(creatives_dir, exist_ok=True)

            logger.info(f"Downloading {len(ads)} images...")
            filename_map = await self.download_and_rename_images(ads, creatives_dir)

            if not filename_map:
                raise RuntimeError("No images were successfully downloaded")

            excel_path = os.path.join(temp_dir, "ad_copy_data.xlsx")
            logger.info("Creating Excel sheet...")
            self.create_excel_sheet(ads, filename_map, excel_path)

            # The ZIP lives outside temp_dir so it survives the cleanup below.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            zip_filename = f"creatives_export_{timestamp}.zip"
            zip_path = os.path.join(tempfile.gettempdir(), zip_filename)

            logger.info(f"Creating ZIP file: {zip_filename}")
            with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
                for filename in os.listdir(creatives_dir):
                    file_path = os.path.join(creatives_dir, filename)
                    zipf.write(file_path, os.path.join("creatives", filename))

                zipf.write(excel_path, "ad_copy_data.xlsx")

            logger.info(f"Export package created successfully: {zip_path}")
            return zip_path

        except Exception as e:
            logger.error(f"Failed to create export package: {e}")
            # Do not leave a half-written archive behind.
            if zip_path and os.path.exists(zip_path):
                try:
                    os.remove(zip_path)
                except OSError as cleanup_error:
                    logger.warning(f"Failed to cleanup ZIP file: {cleanup_error}")
            raise

        finally:
            # Remove the working directory (the ZIP file is kept).
            if os.path.exists(temp_dir):
                try:
                    shutil.rmtree(temp_dir)
                except OSError as e:
                    logger.warning(f"Failed to cleanup temp directory: {e}")

    def cleanup_zip(self, zip_path: str):
        """Remove the ZIP file after it has been sent. Failures are only logged."""
        try:
            if os.path.exists(zip_path):
                os.remove(zip_path)
                logger.info(f"Cleaned up ZIP file: {zip_path}")
        except Exception as e:
            logger.warning(f"Failed to cleanup ZIP file: {e}")


# Global export service instance
export_service = ExportService()