from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Optional, Dict
import json
from pathlib import Path
import os
from datetime import datetime
from supabase import create_client, Client

# http_client = httpx.Client(verify=r'C:\Users\PD817AE\OneDrive - EY\Desktop\AgenticDev\amplify\backend\certs\Zscaler Root CA.crt')


class BlogPost(BaseModel):
    id: int
    title: str
    content: str
    author: str
    created_at: str
    published: bool
    tags: List[str]
    category: Optional[str] = None
    featured_image: Optional[Dict] = None
    post_images: List[Dict] = []


class BlogSummary(BaseModel):
    id: int
    title: str
    author: str
    created_at: str
    tags: List[str]
    category: Optional[str] = None
    excerpt: str
    has_featured_image: bool
    featured_image_url: Optional[str] = None
    post_image_count: int


class BlogDatabase:
    def __init__(self):
        self.url = os.getenv("SUPABASE_URL")
        self.key = os.getenv("SUPABASE_KEY")
        if not self.url or not self.key:
            raise ValueError("SUPABASE_URL and SUPABASE_KEY environment variables are required")
        self.supabase: Client = create_client(self.url, self.key)

    def get_blog_posts_summary(self, limit: int = 6, offset: int = 0, category: Optional[str] = None) -> Dict:
        """Get blog post summaries for card display, with pagination and an optional category filter."""
        try:
            # Get the total count; the Supabase client accepts count='exact' for an exact row count
            count_query = self.supabase.table('blog_posts').select('id', count='exact').eq('published', True)  # type: ignore[arg-type]
            if category and category.lower() != 'all':
                count_query = count_query.eq('category', category)
            count_result = count_query.execute()
            total_raw = getattr(count_result, 'count', 0)
            try:
                total_count = int(total_raw) if total_raw is not None else 0
            except Exception:
                total_count = 0
            # Get the posts for the requested page
            list_query = (
                self.supabase
                .table('blog_posts')
                .select('''
                    id,
                    title,
                    author,
                    created_at,
                    tags,
                    category,
                    content,
                    featured_image_id,
                    images!featured_image_id(filename),
                    blog_post_images(id)
                ''')
                .eq('published', True)
            )
            if category and category.lower() != 'all':
                list_query = list_query.eq('category', category)
            result = (
                list_query
                .order('created_at', desc=True)
                .range(offset, offset + limit - 1)
                .execute()
            )
            results = []
            for row in result.data:
                # Build an excerpt from the first 150 characters of the content
                content = row['content']
                excerpt = content[:150] + "..." if len(content) > 150 else content
                # Parse tags if they are stored as a JSON string
                tags = row['tags']
                if isinstance(tags, str):
                    try:
                        tags = json.loads(tags)
                    except json.JSONDecodeError:
                        tags = []
                featured_image = row.get('images')
                results.append({
                    'id': row['id'],
                    'title': row['title'],
                    'author': row['author'],
                    'created_at': row['created_at'],
                    'tags': tags,
                    'category': row.get('category'),
                    'excerpt': excerpt,
                    'has_featured_image': featured_image is not None,
                    'featured_image_url': f"/media/{featured_image['filename']}" if featured_image else None,
                    'post_image_count': len(row.get('blog_post_images', []))
                })
            # total_count is already a plain int here, so the comparison is safe
            has_more = (offset + limit) < total_count
            return {
                'posts': results,
                'total': total_count,
                'limit': limit,
                'offset': offset,
                'has_more': has_more
            }
        except Exception as e:
            print(f"Error fetching blog posts: {e}")
            return {
                'posts': [],
                'total': 0,
                'limit': limit,
                'offset': offset,
                'has_more': False
            }

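    # Illustrative return shape for get_blog_posts_summary(limit=6, offset=0)
    # (the values here are made up for the example):
    # {'posts': [<up to 6 summary dicts>], 'total': 42, 'limit': 6, 'offset': 0, 'has_more': True}
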
    def get_blog_post_complete(self, post_id: int) -> Optional[Dict]:
        """Get a complete blog post with all of its images."""
        try:
            # Get the blog post together with its featured image
            result = (
                self.supabase
                .table('blog_posts')
                .select('''
                    id,
                    title,
                    content,
                    author,
                    created_at,
                    published,
                    category,
                    tags,
                    featured_image_id,
                    images!featured_image_id(
                        filename,
                        file_path,
                        alt_text,
                        caption,
                        width,
                        height
                    )
                ''')
                .eq('id', post_id)
                .eq('published', True)
                .single()
                .execute()
            )
            if not result.data:
                return None
            row = result.data
| row = result.data | |
| # Get post content images | |
| images_result = ( | |
| self.supabase | |
| .table('blog_post_images') | |
| .select(''' | |
| images( | |
| id, | |
| filename, | |
| file_path, | |
| alt_text, | |
| caption, | |
| mime_type, | |
| width, | |
| height | |
| ), | |
| image_order, | |
| position_in_content, | |
| image_type | |
| ''') | |
| .eq('blog_post_id', post_id) | |
| .order('image_order') | |
| .execute() | |
| ) | |
            # Parse tags if they are stored as a JSON string
            tags = row['tags']
            if isinstance(tags, str):
                try:
                    tags = json.loads(tags)
                except json.JSONDecodeError:
                    tags = []
            # Build the response payload ('post' rather than 'result', to avoid shadowing the query result)
            featured_image_data = row.get('images')
            post = {
                'id': row['id'],
                'title': row['title'],
                'content': row['content'],
                'author': row['author'],
                'created_at': row['created_at'],
                'published': row['published'],
                'tags': tags,
                'category': row.get('category'),
                'featured_image': {
                    'filename': featured_image_data['filename'],
                    'file_path': featured_image_data['file_path'],
                    'alt_text': featured_image_data['alt_text'],
                    'caption': featured_image_data['caption'],
                    'width': featured_image_data['width'],
                    'height': featured_image_data['height'],
                    'url': f"/media/{featured_image_data['filename']}"
                } if featured_image_data else None,
                'post_images': [
                    {
                        'id': img_row['images']['id'],
                        'filename': img_row['images']['filename'],
                        'file_path': img_row['images']['file_path'],
                        'alt_text': img_row['images']['alt_text'],
                        'caption': img_row['images']['caption'],
                        'mime_type': img_row['images']['mime_type'],
                        'width': img_row['images']['width'],
                        'height': img_row['images']['height'],
                        'order': img_row['image_order'],
                        'position': img_row['position_in_content'],
                        'type': img_row['image_type'],
                        'url': f"/media/{img_row['images']['filename']}"
                    }
                    for img_row in images_result.data
                ]
            }
            return post
        except Exception as e:
            print(f"Error fetching blog post {post_id}: {e}")
            return None

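    # Example (illustrative): blog_db.get_blog_post_complete(7) returns the full post
    # dict with 'featured_image' and 'post_images', or None if it is missing/unpublished.
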

# Initialize the database client at import time (requires SUPABASE_URL and SUPABASE_KEY)
blog_db = BlogDatabase()


def setup_blog_routes(app: FastAPI):
    """Set up the blog API routes."""

    # Route paths are assumptions; adjust them to match your frontend's API prefix.
    @app.get("/api/blog/posts")
    async def get_blog_posts(page: int = 1, limit: int = 6, category: Optional[str] = None):
        """Get blog posts for card display with pagination."""
        offset = (page - 1) * limit
        result = blog_db.get_blog_posts_summary(limit=limit, offset=offset, category=category)
        return result
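
    # Illustrative request against the assumed path:
    #   GET /api/blog/posts?page=2&limit=6&category=tutorials
    # page=2 with limit=6 maps to offset=6, i.e. the second page of summaries.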

    @app.get("/api/blog/posts/{post_id}")  # route path is an assumption
    async def get_blog_post(post_id: int):
        """Get a complete blog post."""
        post = blog_db.get_blog_post_complete(post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Blog post not found")
        return post

    @app.get("/api/blog/search")  # route path is an assumption
    async def search_blog_posts(q: str, limit: int = 50, category: Optional[str] = None):
        """Search blog posts by tag relevance.

        Scoring:
        - Exact tag match: 1 point
        - Partial (substring) match: 0.5 points (only if not exact)
        Percentage = score / len(unique query tokens).
        Returns posts sorted by percentage desc, then created_at desc.
        """
| query = (q or "").strip().lower() | |
| if not query: | |
| return { 'posts': [], 'total': 0 } | |
| # Split on spaces / commas, remove empties, dedupe, limit tokens | |
| raw_tokens = [t for t in [p.strip() for p in query.replace(',', ' ').split(' ')] if t] | |
| tokens: List[str] = [] | |
| for t in raw_tokens: | |
| if t not in tokens: | |
| tokens.append(t) | |
| if len(tokens) >= 8: # hard cap to avoid large scoring loops | |
| break | |
| if not tokens: | |
| return { 'posts': [], 'total': 0 } | |
        try:
            # Fetch a larger slice of published posts (could be optimized with a materialized view later)
            base_query = (
                blog_db.supabase
                .table('blog_posts')
                .select('''
                    id,
                    title,
                    author,
                    created_at,
                    tags,
                    category,
                    content,
                    featured_image_id,
                    images!featured_image_id(filename),
                    blog_post_images(id)
                ''')
                .eq('published', True)
            )
            if category and category.lower() != 'all':
                base_query = base_query.eq('category', category)
            result = (
                base_query
                .order('created_at', desc=True)
                .limit(400)  # safety cap
                .execute()
            )
        except Exception as e:
            print(f"Search fetch error: {e}")
            raise HTTPException(status_code=500, detail="Search failed")
        scored = []
        token_set = set(tokens)
        max_score = float(len(token_set))
        for row in result.data:
            row_tags = row.get('tags', [])
            if isinstance(row_tags, str):
                try:
                    row_tags = json.loads(row_tags)
                except json.JSONDecodeError:
                    row_tags = []
            # Normalize tags for case-insensitive matching
            norm_tags = [str(t).lower() for t in row_tags]
            if not norm_tags:
                continue
            score = 0.0
            for tk in token_set:
                exact = any(tk == tag for tag in norm_tags)
                if exact:
                    score += 1.0
                    continue
                partial = any(tk in tag for tag in norm_tags)
                if partial:
                    score += 0.5
            if score <= 0:
                continue
            percent = score / max_score
            content = row['content']
            excerpt = content[:150] + "..." if len(content) > 150 else content
            featured_image = row.get('images')
            scored.append({
                'id': row['id'],
                'title': row['title'],
                'author': row['author'],
                'created_at': row['created_at'],
                'tags': row_tags,
                'category': row.get('category'),
                'excerpt': excerpt,
                'has_featured_image': featured_image is not None,
                'featured_image_url': f"/media/{featured_image['filename']}" if featured_image else None,
                'post_image_count': len(row.get('blog_post_images', [])),
                'percent_match': round(percent * 100, 2)
            })
        # Prepare a sortable timestamp (falls back to 0 if missing or unparsable)
        for item in scored:
            raw_dt = item.get('created_at')
            ts = 0.0
            if raw_dt:
                try:
                    # Map a trailing 'Z' to '+00:00' so fromisoformat can parse it on older Pythons
                    cleaned = raw_dt.replace('Z', '+00:00')
                    ts = datetime.fromisoformat(cleaned).timestamp()
                except Exception:
                    ts = 0.0
            item['_ts'] = ts
        # Sort: highest percent_match first, then newest (_ts desc)
        scored.sort(key=lambda x: (-x['percent_match'], -x['_ts']))
        # Drop the helper key and trim to the requested limit
        for item in scored:
            item.pop('_ts', None)
        scored = scored[:limit]
        return {'posts': scored, 'total': len(scored), 'query_tokens': tokens}

    # Mount media files if the blog_media directory exists
    media_dir = Path("blog_media")
    if media_dir.exists():
        app.mount("/media", StaticFiles(directory=str(media_dir)), name="media")
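
# Minimal usage sketch (not part of the original file; the module name "blog_api" is an assumption):
#
#     from fastapi import FastAPI
#     from blog_api import setup_blog_routes
#
#     app = FastAPI()
#     setup_blog_routes(app)
#
# Run with `uvicorn main:app` after setting SUPABASE_URL and SUPABASE_KEY.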