Spaces:
Running
Running
| import sqlite3 | |
| import json | |
| import os | |
| import uuid | |
| from datetime import datetime | |
| from typing import List, Dict, Optional, Union | |
| from pathlib import Path | |
| import shutil | |
| from enum import Enum | |
| import threading | |
| class ImageType(Enum): | |
| FEATURED = "featured" | |
| POST_CONTENT = "post_content" | |
| GALLERY = "gallery" | |
| class FlexibleBlogDatabase: | |
| def __init__(self, db_path: str = "blog.db", media_dir: str = "blog_media"): | |
| self.db_path = db_path | |
| self.media_dir = Path(media_dir) | |
| self.media_dir.mkdir(exist_ok=True) | |
| self._lock = threading.Lock() | |
| self.init_database() | |
| def _get_connection(self): | |
| """Get a database connection with proper settings""" | |
| conn = sqlite3.connect(self.db_path, timeout=20.0) | |
| conn.execute("PRAGMA journal_mode=WAL") # Better for concurrent access | |
| conn.execute("PRAGMA busy_timeout=20000") # 20 second timeout | |
| return conn | |
| def init_database(self): | |
| """Initialize the flexible blog database with enhanced image support""" | |
| with self._lock: | |
| conn = self._get_connection() | |
| try: | |
| cursor = conn.cursor() | |
| # Blog posts table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS blog_posts ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| title TEXT NOT NULL, | |
| content TEXT NOT NULL, | |
| author TEXT DEFAULT 'Admin', | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| published BOOLEAN DEFAULT 1, | |
| tags TEXT DEFAULT '[]', | |
| featured_image_id INTEGER, | |
| FOREIGN KEY (featured_image_id) REFERENCES images (id) | |
| ) | |
| ''') | |
| # Enhanced images table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS images ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| filename TEXT NOT NULL, | |
| original_filename TEXT NOT NULL, | |
| file_path TEXT NOT NULL, | |
| file_size INTEGER, | |
| mime_type TEXT, | |
| alt_text TEXT DEFAULT '', | |
| caption TEXT DEFAULT '', | |
| width INTEGER, | |
| height INTEGER, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| ''') | |
| # Enhanced junction table for post images | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS blog_post_images ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| blog_post_id INTEGER, | |
| image_id INTEGER, | |
| image_type TEXT DEFAULT 'post_content', | |
| image_order INTEGER DEFAULT 0, | |
| position_in_content INTEGER, | |
| FOREIGN KEY (blog_post_id) REFERENCES blog_posts (id), | |
| FOREIGN KEY (image_id) REFERENCES images (id) | |
| ) | |
| ''') | |
| conn.commit() | |
| finally: | |
| conn.close() | |
| def save_image(self, file_path: str, alt_text: str = "", caption: str = "", | |
| original_filename: str = "") -> int: | |
| """Save an image file and return its database ID""" | |
| if not os.path.exists(file_path): | |
| raise FileNotFoundError(f"Image file not found: {file_path}") | |
| # Generate unique filename | |
| file_extension = Path(file_path).suffix | |
| unique_filename = f"{uuid.uuid4()}{file_extension}" | |
| destination_path = self.media_dir / unique_filename | |
| # Copy file to media directory | |
| shutil.copy2(file_path, destination_path) | |
| # Get file info | |
| file_size = os.path.getsize(destination_path) | |
| mime_type = self._get_mime_type(file_extension) | |
| # Get image dimensions (optional - requires PIL) | |
| width, height = self._get_image_dimensions(destination_path) | |
| # Save to database with lock | |
| with self._lock: | |
| conn = self._get_connection() | |
| try: | |
| cursor = conn.cursor() | |
| cursor.execute(''' | |
| INSERT INTO images (filename, original_filename, file_path, file_size, | |
| mime_type, alt_text, caption, width, height) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', (unique_filename, original_filename or Path(file_path).name, | |
| str(destination_path), file_size, mime_type, alt_text, caption, width, height)) | |
| image_id = cursor.lastrowid | |
| conn.commit() | |
| return image_id | |
| finally: | |
| conn.close() | |
| def create_blog_post(self, title: str, content: str, author: str = "Admin", | |
| tags: List[str] = None) -> int: | |
| """Create a basic blog post without images""" | |
| with self._lock: | |
| conn = self._get_connection() | |
| try: | |
| cursor = conn.cursor() | |
| tags_json = json.dumps(tags or []) | |
| cursor.execute(''' | |
| INSERT INTO blog_posts (title, content, author, tags) | |
| VALUES (?, ?, ?, ?) | |
| ''', (title, content, author, tags_json)) | |
| blog_post_id = cursor.lastrowid | |
| conn.commit() | |
| return blog_post_id | |
| finally: | |
| conn.close() | |
| def add_featured_image(self, blog_post_id: int, image_path: str, | |
| alt_text: str = "", caption: str = "") -> int: | |
| """Add a featured image to an existing blog post""" | |
| # Save the image first | |
| image_id = self.save_image(image_path, alt_text, caption) | |
| # Update blog post with featured image | |
| with self._lock: | |
| conn = self._get_connection() | |
| try: | |
| cursor = conn.cursor() | |
| cursor.execute(''' | |
| UPDATE blog_posts SET featured_image_id = ? WHERE id = ? | |
| ''', (image_id, blog_post_id)) | |
| conn.commit() | |
| return image_id | |
| finally: | |
| conn.close() | |
| def add_post_images(self, blog_post_id: int, image_configs: List[Dict]) -> List[int]: | |
| """Add multiple post images to a blog post""" | |
| image_ids = [] | |
| # Save all images first | |
| for config in image_configs: | |
| image_id = self.save_image( | |
| config["file_path"], | |
| config.get("alt_text", ""), | |
| config.get("caption", "") | |
| ) | |
| image_ids.append((image_id, config)) | |
| # Link all images to blog post in one transaction | |
| with self._lock: | |
| conn = self._get_connection() | |
| try: | |
| cursor = conn.cursor() | |
| for image_id, config in image_ids: | |
| cursor.execute(''' | |
| INSERT INTO blog_post_images | |
| (blog_post_id, image_id, image_type, image_order, position_in_content) | |
| VALUES (?, ?, ?, ?, ?) | |
| ''', ( | |
| blog_post_id, | |
| image_id, | |
| ImageType.POST_CONTENT.value, | |
| config.get("order", 0), | |
| config.get("position") | |
| )) | |
| conn.commit() | |
| return [img_id for img_id, _ in image_ids] | |
| finally: | |
| conn.close() | |
| def create_complete_blog_post(self, title: str, content: str, author: str = "Admin", | |
| tags: List[str] = None, featured_image: Dict = None, | |
| post_images: List[Dict] = None) -> int: | |
| """Create a complete blog post with all images in one go""" | |
| # Create the blog post first | |
| blog_post_id = self.create_blog_post(title, content, author, tags) | |
| # Add featured image if provided | |
| if featured_image: | |
| self.add_featured_image( | |
| blog_post_id, | |
| featured_image["file_path"], | |
| featured_image.get("alt_text", ""), | |
| featured_image.get("caption", "") | |
| ) | |
| # Add post images if provided | |
| if post_images: | |
| self.add_post_images(blog_post_id, post_images) | |
| return blog_post_id | |
| def get_blog_post_complete(self, post_id: int) -> Optional[Dict]: | |
| """Get a complete blog post with all associated images""" | |
| with self._lock: | |
| conn = self._get_connection() | |
| try: | |
| cursor = conn.cursor() | |
| # Get blog post with featured image | |
| cursor.execute(''' | |
| SELECT bp.id, bp.title, bp.content, bp.author, bp.created_at, | |
| bp.published, bp.tags, bp.featured_image_id, | |
| fi.filename as featured_filename, fi.file_path as featured_path, | |
| fi.alt_text as featured_alt, fi.caption as featured_caption, | |
| fi.width as featured_width, fi.height as featured_height | |
| FROM blog_posts bp | |
| LEFT JOIN images fi ON bp.featured_image_id = fi.id | |
| WHERE bp.id = ? | |
| ''', (post_id,)) | |
| row = cursor.fetchone() | |
| if not row: | |
| return None | |
| # Get post content images | |
| cursor.execute(''' | |
| SELECT i.id, i.filename, i.file_path, i.alt_text, i.caption, | |
| i.mime_type, i.width, i.height, bpi.image_order, | |
| bpi.position_in_content, bpi.image_type | |
| FROM blog_post_images bpi | |
| JOIN images i ON bpi.image_id = i.id | |
| WHERE bpi.blog_post_id = ? AND bpi.image_type = ? | |
| ORDER BY bpi.image_order | |
| ''', (post_id, ImageType.POST_CONTENT.value)) | |
| post_images = cursor.fetchall() | |
| # Build result | |
| result = { | |
| 'id': row[0], | |
| 'title': row[1], | |
| 'content': row[2], | |
| 'author': row[3], | |
| 'created_at': row[4], | |
| 'published': row[5], | |
| 'tags': json.loads(row[6]), | |
| 'featured_image': { | |
| 'filename': row[8], | |
| 'file_path': row[9], | |
| 'alt_text': row[10], | |
| 'caption': row[11], | |
| 'width': row[12], | |
| 'height': row[13], | |
| 'url': self.get_image_url(row[8]) if row[8] else None | |
| } if row[7] else None, | |
| 'post_images': [ | |
| { | |
| 'id': img[0], | |
| 'filename': img[1], | |
| 'file_path': img[2], | |
| 'alt_text': img[3], | |
| 'caption': img[4], | |
| 'mime_type': img[5], | |
| 'width': img[6], | |
| 'height': img[7], | |
| 'order': img[8], | |
| 'position': img[9], | |
| 'type': img[10], | |
| 'url': self.get_image_url(img[1]) | |
| } | |
| for img in post_images | |
| ] | |
| } | |
| return result | |
| finally: | |
| conn.close() | |
| def _get_mime_type(self, file_extension: str) -> str: | |
| """Get MIME type based on file extension""" | |
| mime_types = { | |
| '.jpg': 'image/jpeg', | |
| '.jpeg': 'image/jpeg', | |
| '.png': 'image/png', | |
| '.gif': 'image/gif', | |
| '.webp': 'image/webp', | |
| '.svg': 'image/svg+xml' | |
| } | |
| return mime_types.get(file_extension.lower(), 'application/octet-stream') | |
| def _get_image_dimensions(self, image_path: str) -> tuple: | |
| """Get image dimensions (requires PIL/Pillow)""" | |
| try: | |
| from PIL import Image | |
| with Image.open(image_path) as img: | |
| return img.size | |
| except ImportError: | |
| return None, None | |
| except Exception: | |
| return None, None | |
| def get_image_url(self, image_filename: str) -> str: | |
| """Generate URL for serving images""" | |
| return f"/media/{image_filename}" | |
| def list_recent_posts_with_images(self, limit: int = 10) -> List[Dict]: | |
| """Get recent blog posts with image counts""" | |
| with self._lock: | |
| conn = self._get_connection() | |
| try: | |
| cursor = conn.cursor() | |
| cursor.execute(''' | |
| SELECT bp.id, bp.title, bp.author, bp.created_at, bp.published, bp.tags, | |
| bp.featured_image_id, | |
| fi.filename as featured_filename, | |
| COUNT(bpi.id) as post_image_count | |
| FROM blog_posts bp | |
| LEFT JOIN images fi ON bp.featured_image_id = fi.id | |
| LEFT JOIN blog_post_images bpi ON bp.id = bpi.blog_post_id | |
| WHERE bp.published = 1 | |
| GROUP BY bp.id | |
| ORDER BY bp.created_at DESC | |
| LIMIT ? | |
| ''', (limit,)) | |
| rows = cursor.fetchall() | |
| return [ | |
| { | |
| 'id': row[0], | |
| 'title': row[1], | |
| 'author': row[2], | |
| 'created_at': row[3], | |
| 'published': row[4], | |
| 'tags': json.loads(row[5]), | |
| 'has_featured_image': row[6] is not None, | |
| 'featured_image_url': self.get_image_url(row[7]) if row[7] else None, | |
| 'post_image_count': row[8] | |
| } | |
| for row in rows | |
| ] | |
| finally: | |
| conn.close() |