# amplify/backend/blog_api_local.py
# Synced from GitHub by github-actions on Fri Dec 26 12:29:52 UTC 2025
# (commit aff341e)
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Optional, Dict
import sqlite3
import json
from pathlib import Path
import os
# Response schema for a single, fully expanded blog post (used as the
# response_model of GET /api/blog/posts/{post_id}).
class BlogPost(BaseModel):
    # Core post columns.
    id: int
    title: str
    content: str
    author: str
    created_at: str
    published: bool

    # Tags decoded from the JSON text stored in the database.
    tags: List[str]

    # Image metadata: the optional featured image, then the images attached
    # to the post body.
    featured_image: Optional[Dict] = None
    post_images: List[Dict] = []
# Card-sized view of a blog post: identifying fields plus a short excerpt
# and image indicators instead of the full content.
class BlogSummary(BaseModel):
    # Identifying fields.
    id: int
    title: str
    author: str
    created_at: str
    tags: List[str]

    # Shortened content for list display.
    excerpt: str

    # Image indicators.
    has_featured_image: bool
    featured_image_url: Optional[str] = None
    post_image_count: int
class BlogDatabase:
    """SQLite-backed storage for blog posts and their images.

    Every public method opens its own short-lived connection to ``db_path``
    and closes it before returning, including when a query raises.
    """

    # Posts inserted once, when the ``blog_posts`` table is first created.
    # ``tags`` is stored as JSON text, matching the column format.
    _SAMPLE_POSTS = [
        {
            "title": "Welcome to Our Blog",
            "content": "This is our first blog post! We're excited to share insights about technology, development, and innovation. Stay tuned for more amazing content coming your way.",
            "author": "Admin",
            "tags": '["welcome", "introduction", "blog"]',
        },
        {
            "title": "The Future of AI Development",
            "content": "Artificial Intelligence is revolutionizing how we build applications. From machine learning models to natural language processing, AI is becoming an integral part of modern software development. In this post, we explore the latest trends and technologies shaping the future of AI development.",
            "author": "Tech Team",
            "tags": '["AI", "development", "technology", "future"]',
        },
        {
            "title": "Best Practices for Web Development",
            "content": "Building modern web applications requires following best practices for performance, security, and user experience. We'll cover essential techniques including responsive design, API optimization, and modern JavaScript frameworks that every developer should know.",
            "author": "Development Team",
            "tags": '["web-development", "best-practices", "javascript", "performance"]',
        },
        {
            "title": "Building Scalable Applications",
            "content": "Scalability is crucial for applications that need to handle growing user bases and increasing data loads. We'll discuss architectural patterns, database optimization, and cloud deployment strategies that help applications scale efficiently.",
            "author": "Architecture Team",
            "tags": '["scalability", "architecture", "cloud", "performance"]',
        },
    ]

    def __init__(self, db_path: str = "blog.db"):
        """Store the database path and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_database()

    @staticmethod
    def _parse_tags(raw) -> List[str]:
        """Decode the JSON text of a ``tags`` column.

        The column is nullable, so NULL (or an empty string) is treated as
        "no tags" instead of crashing ``json.loads``. Malformed JSON still
        raises so corrupt data is not silently hidden.
        """
        if not raw:
            return []
        return json.loads(raw)

    def init_database(self):
        """Initialize the blog database if it doesn't exist.

        Each table is created only when missing, so a partially initialised
        database gets its missing tables added. The sample posts are inserted
        only when the ``blog_posts`` table itself had to be created.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Remember whether this is a fresh blog so seeding happens once.
            cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='blog_posts'"
            )
            is_fresh = cursor.fetchone() is None

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS blog_posts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    content TEXT NOT NULL,
                    author TEXT DEFAULT 'Admin',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    published BOOLEAN DEFAULT 1,
                    tags TEXT DEFAULT '[]',
                    featured_image_id INTEGER
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS images (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL,
                    original_filename TEXT NOT NULL,
                    file_path TEXT NOT NULL,
                    file_size INTEGER,
                    mime_type TEXT,
                    alt_text TEXT DEFAULT '',
                    caption TEXT DEFAULT '',
                    width INTEGER,
                    height INTEGER,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS blog_post_images (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    blog_post_id INTEGER,
                    image_id INTEGER,
                    image_type TEXT DEFAULT 'post_content',
                    image_order INTEGER DEFAULT 0,
                    position_in_content INTEGER,
                    FOREIGN KEY (blog_post_id) REFERENCES blog_posts (id),
                    FOREIGN KEY (image_id) REFERENCES images (id)
                )
            ''')

            if is_fresh:
                cursor.executemany(
                    '''
                    INSERT INTO blog_posts (title, content, author, tags)
                    VALUES (?, ?, ?, ?)
                    ''',
                    [
                        (post["title"], post["content"], post["author"], post["tags"])
                        for post in self._SAMPLE_POSTS
                    ],
                )
            conn.commit()
        finally:
            # Always release the file handle, even if a statement failed.
            conn.close()

    def get_blog_posts_summary(self, limit: int = 4, offset: int = 0) -> Dict:
        """Get blog posts summary for card display with pagination.

        Args:
            limit: Maximum number of posts to return.
            offset: Number of published posts to skip (newest first).

        Returns:
            A dict with ``posts`` (list of summary dicts), ``total`` (count of
            published posts), the echoed ``limit`` and ``offset``, and
            ``has_more`` (whether posts remain after this page).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            cursor.execute('SELECT COUNT(*) FROM blog_posts WHERE published = 1')
            total_count = cursor.fetchone()[0]

            # created_at has one-second resolution, so posts written together
            # tie; bp.id breaks the tie and keeps page boundaries stable.
            cursor.execute('''
                SELECT bp.id, bp.title, bp.author, bp.created_at, bp.tags, bp.content,
                       bp.featured_image_id,
                       fi.filename as featured_filename,
                       COUNT(bpi.id) as post_image_count
                FROM blog_posts bp
                LEFT JOIN images fi ON bp.featured_image_id = fi.id
                LEFT JOIN blog_post_images bpi ON bp.id = bpi.blog_post_id
                WHERE bp.published = 1
                GROUP BY bp.id
                ORDER BY bp.created_at DESC, bp.id DESC
                LIMIT ? OFFSET ?
            ''', (limit, offset))
            rows = cursor.fetchall()
        finally:
            conn.close()

        results = []
        for row in rows:
            # Excerpt is the first 150 characters, with an ellipsis if cut.
            content = row[5]
            excerpt = content[:150] + "..." if len(content) > 150 else content
            results.append({
                'id': row[0],
                'title': row[1],
                'author': row[2],
                'created_at': row[3],
                'tags': self._parse_tags(row[4]),
                'excerpt': excerpt,
                'has_featured_image': row[6] is not None,
                'featured_image_url': f"/media/{row[7]}" if row[7] else None,
                'post_image_count': row[8],
            })

        return {
            'posts': results,
            'total': total_count,
            'limit': limit,
            'offset': offset,
            'has_more': offset + limit < total_count,
        }

    def get_blog_post_complete(self, post_id: int) -> Optional[Dict]:
        """Get complete blog post with all images.

        Args:
            post_id: Primary key of the post to load.

        Returns:
            A dict with the post fields, ``featured_image`` (dict or ``None``)
            and ``post_images`` (list ordered by ``image_order``), or ``None``
            when no published post has that id.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # The post itself, joined with its featured image if it has one.
            cursor.execute('''
                SELECT bp.id, bp.title, bp.content, bp.author, bp.created_at,
                       bp.published, bp.tags, bp.featured_image_id,
                       fi.filename as featured_filename, fi.file_path as featured_path,
                       fi.alt_text as featured_alt, fi.caption as featured_caption,
                       fi.width as featured_width, fi.height as featured_height
                FROM blog_posts bp
                LEFT JOIN images fi ON bp.featured_image_id = fi.id
                WHERE bp.id = ? AND bp.published = 1
            ''', (post_id,))
            row = cursor.fetchone()
            if not row:
                return None

            # Images attached to the post, in display order.
            cursor.execute('''
                SELECT i.id, i.filename, i.file_path, i.alt_text, i.caption,
                       i.mime_type, i.width, i.height, bpi.image_order,
                       bpi.position_in_content, bpi.image_type
                FROM blog_post_images bpi
                JOIN images i ON bpi.image_id = i.id
                WHERE bpi.blog_post_id = ?
                ORDER BY bpi.image_order
            ''', (post_id,))
            post_images = cursor.fetchall()
        finally:
            conn.close()

        featured_image = None
        if row[7]:
            featured_image = {
                'filename': row[8],
                'file_path': row[9],
                'alt_text': row[10],
                'caption': row[11],
                'width': row[12],
                'height': row[13],
                'url': f"/media/{row[8]}" if row[8] else None,
            }

        return {
            'id': row[0],
            'title': row[1],
            'content': row[2],
            'author': row[3],
            'created_at': row[4],
            'published': row[5],
            'tags': self._parse_tags(row[6]),
            'featured_image': featured_image,
            'post_images': [
                {
                    'id': img[0],
                    'filename': img[1],
                    'file_path': img[2],
                    'alt_text': img[3],
                    'caption': img[4],
                    'mime_type': img[5],
                    'width': img[6],
                    'height': img[7],
                    'order': img[8],
                    'position': img[9],
                    'type': img[10],
                    'url': f"/media/{img[1]}",
                }
                for img in post_images
            ],
        }
# Initialize database
# Module-level handle used by the route handlers registered in
# setup_blog_routes. Constructing it runs BlogDatabase.init_database() at
# import time against the default "blog.db" path.
blog_db = BlogDatabase()
def setup_blog_routes(app: FastAPI):
    """Setup blog API routes.

    Registers the paginated listing and single-post endpoints on ``app`` and,
    when a ``blog_media`` directory exists in the working directory, serves
    its files under ``/media``.

    Args:
        app: The FastAPI application to attach the routes to.
    """

    @app.get("/api/blog/posts")
    async def get_blog_posts(page: int = 1, limit: int = 4):
        """Get blog posts for card display with pagination"""
        # Reject non-positive values up front: page < 1 yields a negative
        # OFFSET, limit == 0 reports has_more forever, and SQLite treats a
        # negative LIMIT as "no limit", returning every row.
        if page < 1 or limit < 1:
            raise HTTPException(
                status_code=422,
                detail="page and limit must be positive integers",
            )
        offset = (page - 1) * limit
        return blog_db.get_blog_posts_summary(limit=limit, offset=offset)

    @app.get("/api/blog/posts/{post_id}", response_model=BlogPost)
    async def get_blog_post(post_id: int):
        """Get complete blog post"""
        post = blog_db.get_blog_post_complete(post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Blog post not found")
        return post

    # Mount media files if blog_media directory exists. is_dir() rather than
    # exists(): StaticFiles needs a directory, so a regular file with this
    # name must not trigger the mount.
    media_dir = Path("blog_media")
    if media_dir.is_dir():
        app.mount("/media", StaticFiles(directory=str(media_dir)), name="media")