"""
Unified app for HF Spaces deployment.

Serves FastAPI backend + static Next.js frontend from a single process.
Uses SQLite + APScheduler instead of PostgreSQL + Celery/Redis.
"""
import asyncio
import json
import logging
import os
import secrets
from collections import Counter
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from urllib.parse import quote, urlencode

import httpx
import jwt
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, Depends, HTTPException, status, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.staticfiles import StaticFiles
from passlib.context import CryptContext
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, Text, JSON, ForeignKey, Float
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from starlette.exceptions import HTTPException as StarletteHTTPException

logger = logging.getLogger(__name__)

# ---- CONFIG ----
_DEFAULT_SECRET_KEY = "hf-spaces-secret-key-change-me"

DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:////app/data/dashboard.db")
SECRET_KEY = os.getenv("SECRET_KEY", _DEFAULT_SECRET_KEY)
# Trailing slashes are stripped so "<base>/auth/..." never contains "//".
REDIRECT_BASE = os.getenv("OAUTH_REDIRECT_BASE", "https://huggingface.co/spaces/").rstrip("/")

if SECRET_KEY == _DEFAULT_SECRET_KEY:
    logger.warning("SECRET_KEY is not set; using the built-in default. Tokens are forgeable until it is changed.")

# OAuth credentials from env
FACEBOOK_APP_ID = os.getenv("FACEBOOK_APP_ID", "")
FACEBOOK_APP_SECRET = os.getenv("FACEBOOK_APP_SECRET", "")
TWITTER_API_KEY = os.getenv("TWITTER_API_KEY", "")
TWITTER_API_SECRET = os.getenv("TWITTER_API_SECRET", "")
INSTAGRAM_APP_ID = os.getenv("INSTAGRAM_APP_ID", "")
INSTAGRAM_APP_SECRET = os.getenv("INSTAGRAM_APP_SECRET", "")
TIKTOK_CLIENT_KEY = os.getenv("TIKTOK_CLIENT_KEY", "")
TIKTOK_CLIENT_SECRET = os.getenv("TIKTOK_CLIENT_SECRET", "")


def utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Every DateTime column in this module stores naive UTC values, so this is
    the single source of "now" for defaults and comparisons.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _parse_iso_datetime(value) -> datetime:
    """Parse an ISO-8601 string into a naive UTC datetime.

    A trailing "Z" is accepted. Offset-aware values are converted to UTC and
    stripped of tzinfo so they can be compared with the naive UTC values used
    everywhere else in this module.

    Raises:
        ValueError: if *value* is not a string or is not valid ISO-8601.
    """
    if not isinstance(value, str):
        raise ValueError("expected an ISO-8601 string")
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def _iso(value: Optional[datetime]) -> Optional[str]:
    """Return ``value.isoformat()``, or None when *value* is None."""
    return value.isoformat() if value is not None else None


# ---- DATABASE ----
engine = create_engine(
    DATABASE_URL,
    connect_args={"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {},
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class User(Base):
    """Dashboard login; owns social accounts, posts and listening rules."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    full_name = Column(String)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=utcnow)

    social_accounts = relationship("SocialAccount", back_populates="user", cascade="all, delete-orphan")
    posts = relationship("Post", back_populates="user", cascade="all, delete-orphan")
    listening_rules = relationship("ListeningRule", back_populates="user", cascade="all, delete-orphan")


class SocialAccount(Base):
    """A connected platform account together with its OAuth tokens."""

    __tablename__ = "social_accounts"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    platform = Column(String)
    account_name = Column(String)
    account_id = Column(String)
    access_token = Column(Text)
    refresh_token = Column(Text)
    token_expires_at = Column(DateTime, nullable=True)
    profile_picture = Column(String, nullable=True)
    follower_count = Column(Integer, default=0)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=utcnow)
    updated_at = Column(DateTime, default=utcnow, onupdate=utcnow)

    user = relationship("User", back_populates="social_accounts")
    posts = relationship("Post", back_populates="account")
    analytics = relationship("AnalyticsSnapshot", back_populates="account")


class Post(Base):
    """A draft, scheduled or published post for one social account."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    account_id = Column(Integer, ForeignKey("social_accounts.id"))
    platform = Column(String)
    content = Column(Text)
    media_urls = Column(JSON, default=list)
    status = Column(String, default="draft")
    scheduled_at = Column(DateTime, nullable=True)
    published_at = Column(DateTime, nullable=True)
    external_post_id = Column(String, nullable=True)
    platform_post_url = Column(String, nullable=True)
    engagement_stats = Column(JSON, default=dict)
    created_at = Column(DateTime, default=utcnow)

    user = relationship("User", back_populates="posts")
    account = relationship("SocialAccount", back_populates="posts")


class AnalyticsSnapshot(Base):
    """Point-in-time metrics recorded for a social account."""

    __tablename__ = "analytics_snapshots"

    id = Column(Integer, primary_key=True, index=True)
    account_id = Column(Integer, ForeignKey("social_accounts.id"))
    snapshot_date = Column(DateTime, default=utcnow)
    followers = Column(Integer, default=0)
    following = Column(Integer, default=0)
    posts_count = Column(Integer, default=0)
    impressions = Column(Integer, default=0)
    reach = Column(Integer, default=0)
    engagement = Column(Integer, default=0)
    profile_views = Column(Integer, default=0)

    account = relationship("SocialAccount", back_populates="analytics")


class ListeningRule(Base):
    """Keywords, hashtags and competitor handles a user wants monitored."""

    __tablename__ = "listening_rules"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    name = Column(String)
    platforms = Column(JSON, default=list)
    keywords = Column(JSON, default=list)
    hashtags = Column(JSON, default=list)
    competitor_accounts = Column(JSON, default=list)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=utcnow)

    user = relationship("User", back_populates="listening_rules")
    mentions = relationship("Mention", back_populates="rule")


class Mention(Base):
    """An external post attached to a listening rule."""

    __tablename__ = "mentions"

    id = Column(Integer, primary_key=True, index=True)
    rule_id = Column(Integer, ForeignKey("listening_rules.id"))
    platform = Column(String)
    external_id = Column(String)
    author_name = Column(String)
    author_handle = Column(String)
    content = Column(Text)
    url = Column(String)
    sentiment = Column(String, nullable=True)
    engagement_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=utcnow)
    discovered_at = Column(DateTime, default=utcnow)

    rule = relationship("ListeningRule", back_populates="mentions")


class InboxMessage(Base):
    """An inbound comment or direct message for a social account."""

    __tablename__ = "inbox_messages"

    id = Column(Integer, primary_key=True, index=True)
    account_id = Column(Integer, ForeignKey("social_accounts.id"))
    platform = Column(String)
    external_id = Column(String)
    sender_name = Column(String)
    sender_handle = Column(String)
    content = Column(Text)
    message_type = Column(String)
    is_read = Column(Boolean, default=False)
    replied_at = Column(DateTime, nullable=True)
    reply_content = Column(Text, nullable=True)
    url = Column(String, nullable=True)
    created_at = Column(DateTime, default=utcnow)


class CompetitorBenchmark(Base):
    """Snapshot of a competitor handle's public metrics."""

    __tablename__ = "competitor_benchmarks"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    platform = Column(String)
    competitor_handle = Column(String)
    follower_count = Column(Integer, default=0)
    posts_count = Column(Integer, default=0)
    avg_engagement_rate = Column(Float, default=0.0)
    snapshot_date = Column(DateTime, default=utcnow)


def _ensure_sqlite_directory() -> None:
    """Create the parent directory of a file-backed SQLite database.

    SQLite cannot create missing directories itself, so without this the
    first connection fails when the data directory does not exist yet.
    In-memory and ``file:`` URI databases are left alone.
    """
    if engine.dialect.name != "sqlite":
        return
    db_path = engine.url.database
    if not db_path or db_path == ":memory:" or db_path.startswith("file:"):
        return
    os.makedirs(os.path.dirname(os.path.abspath(db_path)), exist_ok=True)


_ensure_sqlite_directory()
Base.metadata.create_all(bind=engine)

# ---- UTILS ----
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer(auto_error=False)
scheduler = AsyncIOScheduler()


def get_db():
    """FastAPI dependency: yield a session and always close it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed HS256 JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: claims to embed; the dict is copied, not mutated.
        expires_delta: token lifetime; defaults to 60 minutes.
    """
    to_encode = data.copy()
    expire = utcnow() + (expires_delta or timedelta(minutes=60))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")


def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    """FastAPI dependency: resolve the bearer token to a ``User``.

    Raises:
        HTTPException: 401 when the header is missing, the token is invalid
            or expired, the ``sub`` claim is not an integer id, or the user
            no longer exists.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail="Authentication required")
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid token") from None
    try:
        # A missing (None) or non-numeric subject must be a 401, not a 500.
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        raise HTTPException(status_code=401, detail="Invalid token") from None
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    return user


def _serialize_user(user: User) -> dict:
    """Public JSON representation of a user (never includes the hash)."""
    return {
        "id": user.id,
        "email": user.email,
        "full_name": user.full_name,
        "is_active": user.is_active,
        "created_at": _iso(user.created_at),
    }


def _serialize_post(post: Post) -> dict:
    """JSON representation of a post as returned by the posts endpoints."""
    return {
        "id": post.id,
        "platform": post.platform,
        "content": post.content,
        "media_urls": post.media_urls,
        "status": post.status,
        "scheduled_at": _iso(post.scheduled_at),
        "published_at": _iso(post.published_at),
        "external_post_id": post.external_post_id,
        "platform_post_url": post.platform_post_url,
        "engagement_stats": post.engagement_stats,
        "created_at": _iso(post.created_at),
    }


def _serialize_rule(rule: ListeningRule) -> dict:
    """JSON representation of a listening rule."""
    return {
        "id": rule.id,
        "name": rule.name,
        "platforms": rule.platforms,
        "keywords": rule.keywords,
        "hashtags": rule.hashtags,
        "competitor_accounts": rule.competitor_accounts,
        "is_active": rule.is_active,
        "created_at": _iso(rule.created_at),
    }


# ---- OAUTH SERVICE ----
class OAuthService:
    """Builds provider authorization URLs from the configured credentials."""

    def __init__(self):
        self.providers = {
            'facebook': {
                'auth_url': 'https://www.facebook.com/v18.0/dialog/oauth',
                'token_url': 'https://graph.facebook.com/v18.0/oauth/access_token',
                'client_id': FACEBOOK_APP_ID,
                'client_secret': FACEBOOK_APP_SECRET,
                'scope': 'pages_manage_posts,pages_read_engagement,pages_manage_metadata,instagram_basic,instagram_content_publish',
                'redirect_uri': f"{REDIRECT_BASE}/auth/facebook/callback",
            },
            'twitter': {
                'auth_url': 'https://twitter.com/i/oauth2/authorize',
                'token_url': 'https://api.twitter.com/2/oauth2/token',
                'client_id': TWITTER_API_KEY,
                'client_secret': TWITTER_API_SECRET,
                'scope': 'tweet.read tweet.write users.read offline.access',
                'redirect_uri': f"{REDIRECT_BASE}/auth/twitter/callback",
            },
            'instagram': {
                'auth_url': 'https://www.facebook.com/v18.0/dialog/oauth',
                'token_url': 'https://graph.facebook.com/v18.0/oauth/access_token',
                'client_id': INSTAGRAM_APP_ID,
                'client_secret': INSTAGRAM_APP_SECRET,
                'scope': 'instagram_basic,instagram_content_publish',
                'redirect_uri': f"{REDIRECT_BASE}/auth/instagram/callback",
            },
            'tiktok': {
                'auth_url': 'https://www.tiktok.com/v2/auth/authorize',
                'token_url': 'https://open.tiktokapis.com/v2/oauth/token/',
                'client_id': TIKTOK_CLIENT_KEY,
                'client_secret': TIKTOK_CLIENT_SECRET,
                'scope': 'video.publish,user.info.basic',
                'redirect_uri': f"{REDIRECT_BASE}/auth/tiktok/callback",
            },
        }

    def get_auth_url(self, platform: str, state: str):
        """Return the authorization URL for *platform*.

        Args:
            platform: one of the keys of ``self.providers``.
            state: opaque value echoed back by the provider.

        Raises:
            ValueError: if *platform* is not configured.
        """
        provider = self.providers.get(platform)
        if not provider:
            raise ValueError(f"Unsupported platform: {platform}")
        params = {
            'client_id': provider['client_id'],
            'redirect_uri': provider['redirect_uri'],
            'scope': provider['scope'],
            'response_type': 'code',
            'state': state,
        }
        if platform == 'twitter':
            # NOTE(review): a constant "plain" challenge gives no PKCE
            # protection. Replace with a per-request verifier/S256 challenge
            # when the token exchange is implemented.
            params['code_challenge'] = 'challenge'
            params['code_challenge_method'] = 'plain'
        # Values contain spaces, commas and "://", so they must be
        # percent-encoded; quote() emits %20 rather than "+" for spaces.
        query = urlencode(params, quote_via=quote)
        return f"{provider['auth_url']}?{query}"


oauth_service = OAuthService()


# ---- SCHEDULER TASKS ----
def _publish_due_posts() -> None:
    """Mark every scheduled post whose time has come as published.

    NOTE: this only updates the database row; no platform API is called.
    """
    db = SessionLocal()
    try:
        now = utcnow()
        due_posts = db.query(Post).filter(Post.status == 'scheduled', Post.scheduled_at <= now).all()
        for post in due_posts:
            post.status = 'published'
            post.published_at = now
        if due_posts:
            db.commit()
    finally:
        db.close()


def _snapshot_active_accounts() -> None:
    """Store one AnalyticsSnapshot per active account with its follower count."""
    db = SessionLocal()
    try:
        accounts = db.query(SocialAccount).filter(SocialAccount.is_active == True).all()  # noqa: E712
        for account in accounts:
            db.add(AnalyticsSnapshot(account_id=account.id, followers=account.follower_count))
        if accounts:
            db.commit()
    finally:
        db.close()


async def publish_scheduled_posts():
    """Scheduler job: publish due posts without blocking the event loop."""
    # The SQLAlchemy session is synchronous, so run it in the default executor.
    await asyncio.get_running_loop().run_in_executor(None, _publish_due_posts)


async def collect_analytics():
    """Scheduler job: snapshot follower counts without blocking the event loop."""
    await asyncio.get_running_loop().run_in_executor(None, _snapshot_active_accounts)


# ---- LIFESPAN ----
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the background scheduler with the app and stop it on shutdown."""
    # replace_existing keeps a second startup in the same process (tests,
    # reloads) from failing on a duplicate job id.
    scheduler.add_job(publish_scheduled_posts, 'interval', minutes=1, id='publish_posts', replace_existing=True)
    scheduler.add_job(collect_analytics, 'interval', hours=1, id='collect_analytics', replace_existing=True)
    scheduler.start()
    try:
        yield
    finally:
        scheduler.shutdown()


# ---- APP ----
app = FastAPI(title="Social Dashboard", version="0.1.0", lifespan=lifespan)

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests. Authentication here is a bearer header, so
# consider restricting origins or disabling credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ---- AUTH ----
@app.post("/api/auth/register")
def register(user_data: dict, db: Session = Depends(get_db)):
    """Create a user from ``email``, ``password`` and optional ``full_name``.

    Raises:
        HTTPException: 400 when email/password are missing or not strings, or
            when the email is already registered.
    """
    email = user_data.get('email')
    password = user_data.get('password')
    if not isinstance(email, str) or not email.strip() or not isinstance(password, str) or not password:
        raise HTTPException(status_code=400, detail="Email and password are required")
    if db.query(User).filter(User.email == email).first():
        raise HTTPException(status_code=400, detail="Email already registered")
    db_user = User(
        email=email,
        hashed_password=pwd_context.hash(password),
        full_name=user_data.get('full_name', ''),
    )
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError:
        # A concurrent registration won the race for the unique email.
        db.rollback()
        raise HTTPException(status_code=400, detail="Email already registered") from None
    db.refresh(db_user)
    return _serialize_user(db_user)


@app.post("/api/auth/login")
def login(user_data: dict, db: Session = Depends(get_db)):
    """Verify credentials and return a bearer token plus the user record.

    Raises:
        HTTPException: 401 for unknown email, wrong password, or missing or
            non-string credentials.
    """
    email = user_data.get('email')
    password = user_data.get('password')
    db_user = db.query(User).filter(User.email == email).first() if isinstance(email, str) else None
    if not db_user or not isinstance(password, str) or not pwd_context.verify(password, db_user.hashed_password):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    token = create_access_token({"sub": str(db_user.id)})
    return {"access_token": token, "token_type": "bearer", "user": _serialize_user(db_user)}


@app.get("/api/auth/{platform}/url")
def get_auth_url(platform: str):
    """Return the provider authorization URL and the ``state`` embedded in it.

    Raises:
        HTTPException: 404 when *platform* is not a configured provider.
    """
    # The suffix is random so the state cannot be guessed. It is not persisted
    # here; the callback below does not verify it yet.
    state = f"user_temp_{platform}_{secrets.token_urlsafe(16)}"
    try:
        auth_url = oauth_service.get_auth_url(platform, state)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from None
    return {"auth_url": auth_url, "state": state}


@app.get("/api/auth/{platform}/callback")
async def oauth_callback(platform: str, code: str, state: str, db: Session = Depends(get_db)):
    """Placeholder OAuth callback: no token exchange is performed."""
    return {"success": True, "message": "Connect via official OAuth. Set your app credentials in Settings."}


# ---- ACCOUNTS ----
@app.get("/api/accounts")
def list_accounts(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """List the caller's connected accounts (tokens are never returned)."""
    accounts = db.query(SocialAccount).filter(SocialAccount.user_id == current_user.id).all()
    return [
        {
            "id": a.id,
            "platform": a.platform,
            "account_name": a.account_name,
            "account_id": a.account_id,
            "profile_picture": a.profile_picture,
            "follower_count": a.follower_count,
            "is_active": a.is_active,
            "created_at": _iso(a.created_at),
        }
        for a in accounts
    ]


@app.delete("/api/accounts/{account_id}")
def delete_account(account_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Delete one of the caller's accounts; 404 if it is not theirs."""
    account = db.query(SocialAccount).filter(
        SocialAccount.id == account_id,
        SocialAccount.user_id == current_user.id,
    ).first()
    if not account:
        raise HTTPException(status_code=404, detail="Account not found")
    db.delete(account)
    db.commit()
    return {"success": True}


# ---- POSTS ----
@app.post("/api/posts")
def create_post(post_data: dict, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Create a post; it becomes ``scheduled`` when ``scheduled_at`` is in the future.

    Raises:
        HTTPException: 404 when ``account_id`` is given but is not one of the
            caller's accounts; 400 when ``scheduled_at`` is not ISO-8601.
    """
    account_id = post_data.get('account_id')
    if account_id is not None:
        owned = db.query(SocialAccount.id).filter(
            SocialAccount.id == account_id,
            SocialAccount.user_id == current_user.id,
        ).first()
        if not owned:
            raise HTTPException(status_code=404, detail="Account not found")

    scheduled_at = None
    raw_scheduled_at = post_data.get('scheduled_at')
    if raw_scheduled_at:
        try:
            # Normalised to naive UTC so the comparison below cannot raise
            # TypeError for "Z"/offset timestamps.
            scheduled_at = _parse_iso_datetime(raw_scheduled_at)
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid scheduled_at; expected an ISO-8601 datetime") from None

    db_post = Post(
        user_id=current_user.id,
        account_id=account_id,
        platform=post_data.get('platform'),
        content=post_data.get('content'),
        media_urls=post_data.get('media_urls', []),
        scheduled_at=scheduled_at,
    )
    if scheduled_at and scheduled_at > utcnow():
        db_post.status = 'scheduled'
    db.add(db_post)
    db.commit()
    db.refresh(db_post)
    return _serialize_post(db_post)


@app.get("/api/posts")
def list_posts(status: Optional[str] = None, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """List the caller's posts, newest first, optionally filtered by status."""
    query = db.query(Post).filter(Post.user_id == current_user.id)
    if status:
        query = query.filter(Post.status == status)
    posts = query.order_by(Post.created_at.desc()).all()
    return [_serialize_post(p) for p in posts]


@app.delete("/api/posts/{post_id}")
def delete_post(post_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Delete one of the caller's posts; 404 if it is not theirs."""
    post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    db.delete(post)
    db.commit()
    return {"success": True}


# ---- ANALYTICS ----
@app.get("/api/analytics/dashboard")
def get_dashboard_analytics(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Aggregate follower totals, per-platform stats and 30 days of post counts."""
    accounts = db.query(SocialAccount).filter(
        SocialAccount.user_id == current_user.id,
        SocialAccount.is_active == True,  # noqa: E712
    ).all()
    total_followers = sum(a.follower_count or 0 for a in accounts)
    total_posts = db.query(Post).filter(Post.user_id == current_user.id, Post.status == "published").count()

    platforms = []
    for account in accounts:
        # The two newest snapshots give the current value and the baseline.
        snapshots = (
            db.query(AnalyticsSnapshot)
            .filter(AnalyticsSnapshot.account_id == account.id)
            .order_by(AnalyticsSnapshot.snapshot_date.desc())
            .limit(2)
            .all()
        )
        current = snapshots[0] if snapshots else None
        previous = snapshots[1] if len(snapshots) > 1 else None
        followers = current.followers if current else account.follower_count
        prev_followers = previous.followers if previous else followers
        growth = ((followers - prev_followers) / prev_followers * 100) if prev_followers else 0
        platforms.append({
            "platform": account.platform,
            "followers": followers,
            "engagement_rate": 0.0,
            "impressions": current.impressions if current else 0,
            "reach": current.reach if current else 0,
            "posts_count": current.posts_count if current else 0,
            "growth_rate": round(growth, 2),
        })

    avg_engagement = sum(p.get("engagement_rate", 0) for p in platforms) / len(platforms) if platforms else 0

    # Bucket by UTC calendar day: one query for the whole window, counted in
    # Python, so each post lands under the date it was published on.
    today = utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    window_start = today - timedelta(days=29)
    published_rows = db.query(Post.published_at).filter(
        Post.user_id == current_user.id,
        Post.status == "published",
        Post.published_at >= window_start,
        Post.published_at < today + timedelta(days=1),
    ).all()
    posts_per_day = Counter(published_at.date() for (published_at,) in published_rows)
    daily_stats = []
    for offset in range(30):
        day = window_start + timedelta(days=offset)
        daily_stats.append({"date": day.strftime("%Y-%m-%d"), "posts": posts_per_day[day.date()]})

    return {
        "total_followers": total_followers,
        "total_posts": total_posts,
        "avg_engagement_rate": round(avg_engagement, 2),
        "platforms": platforms,
        "daily_stats": daily_stats,
    }


# ---- LISTENING ----
@app.post("/api/listening/rules")
def create_listening_rule(rule_data: dict, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Create a listening rule for the caller and return it."""
    db_rule = ListeningRule(
        user_id=current_user.id,
        name=rule_data.get('name'),
        platforms=rule_data.get('platforms', []),
        keywords=rule_data.get('keywords', []),
        hashtags=rule_data.get('hashtags', []),
        competitor_accounts=rule_data.get('competitor_accounts', []),
    )
    db.add(db_rule)
    db.commit()
    db.refresh(db_rule)
    return _serialize_rule(db_rule)


@app.get("/api/listening/rules")
def list_listening_rules(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """List the caller's listening rules."""
    rules = db.query(ListeningRule).filter(ListeningRule.user_id == current_user.id).all()
    return [_serialize_rule(r) for r in rules]


@app.get("/api/listening/mentions")
def get_mentions(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return the 100 most recently discovered mentions for the caller's rules."""
    mentions = (
        db.query(Mention)
        .join(ListeningRule)
        .filter(ListeningRule.user_id == current_user.id)
        .order_by(Mention.discovered_at.desc())
        .limit(100)
        .all()
    )
    return [
        {
            "id": m.id,
            "platform": m.platform,
            "external_id": m.external_id,
            "author_name": m.author_name,
            "author_handle": m.author_handle,
            "content": m.content,
            "url": m.url,
            "sentiment": m.sentiment,
            "engagement_count": m.engagement_count,
            "created_at": _iso(m.created_at),
            "discovered_at": _iso(m.discovered_at),
        }
        for m in mentions
    ]


# ---- INBOX ----
@app.get("/api/inbox")
def get_inbox(platform: Optional[str] = None, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """List inbox messages for the caller's accounts, newest first."""
    query = (
        db.query(InboxMessage)
        .join(SocialAccount, InboxMessage.account_id == SocialAccount.id)
        .filter(SocialAccount.user_id == current_user.id)
    )
    if platform:
        query = query.filter(InboxMessage.platform == platform)
    messages = query.order_by(InboxMessage.created_at.desc()).all()
    return [
        {
            "id": m.id,
            "platform": m.platform,
            "sender_name": m.sender_name,
            "sender_handle": m.sender_handle,
            "content": m.content,
            "message_type": m.message_type,
            "is_read": m.is_read,
            "created_at": _iso(m.created_at),
            "url": m.url,
        }
        for m in messages
    ]


@app.post("/api/inbox/{message_id}/reply")
def reply_to_message(message_id: int, reply_data: dict, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Mark a message as read/replied and store the optional ``reply_content``.

    Raises:
        HTTPException: 404 when the message does not exist or belongs to an
            account the caller does not own.
    """
    # The join restricts the lookup to the caller's own accounts so one user
    # cannot modify another user's messages by guessing ids.
    message = (
        db.query(InboxMessage)
        .join(SocialAccount, InboxMessage.account_id == SocialAccount.id)
        .filter(InboxMessage.id == message_id, SocialAccount.user_id == current_user.id)
        .first()
    )
    if not message:
        raise HTTPException(status_code=404, detail="Message not found")
    message.is_read = True
    message.replied_at = utcnow()
    if reply_data.get('reply_content'):
        message.reply_content = reply_data.get('reply_content')
    db.commit()
    return {"success": True}


# ---- CALENDAR ----
@app.get("/api/calendar/events")
def get_calendar_events(start: str, end: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return the caller's posts scheduled between *start* and *end* (ISO-8601).

    Raises:
        HTTPException: 400 when either bound is not a valid ISO-8601 datetime.
    """
    try:
        start_dt = _parse_iso_datetime(start)
        end_dt = _parse_iso_datetime(end)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid start/end; expected ISO-8601 datetimes") from None
    posts = db.query(Post).filter(
        Post.user_id == current_user.id,
        Post.scheduled_at >= start_dt,
        Post.scheduled_at <= end_dt,
    ).all()
    color_map = {"draft": "#9CA3AF", "scheduled": "#3B82F6", "published": "#10B981", "failed": "#EF4444"}
    events = []
    for p in posts:
        content = p.content or ""  # content is nullable
        title = content[:50] + "..." if len(content) > 50 else content
        events.append({
            "id": p.id,
            "title": title,
            "platform": p.platform,
            "status": p.status,
            "start": (p.scheduled_at or p.created_at).isoformat(),
            "color": color_map.get(p.status, "#6B7280"),
        })
    return events


# ---- COMPETITORS ----
@app.get("/api/competitors")
def list_competitors(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """List the caller's competitor benchmarks, newest snapshot first."""
    benchmarks = (
        db.query(CompetitorBenchmark)
        .filter(CompetitorBenchmark.user_id == current_user.id)
        .order_by(CompetitorBenchmark.snapshot_date.desc())
        .all()
    )
    return [
        {
            "id": b.id,
            "platform": b.platform,
            "competitor_handle": b.competitor_handle,
            "follower_count": b.follower_count,
            "posts_count": b.posts_count,
            "avg_engagement_rate": b.avg_engagement_rate,
            "snapshot_date": _iso(b.snapshot_date),
        }
        for b in benchmarks
    ]


# ---- DEMO DATA ----
@app.post("/api/demo/seed")
def seed_demo_data(db: Session = Depends(get_db)):
    """Insert the demo user with sample accounts, posts, mentions and messages.

    Idempotent: when the demo user already exists nothing is inserted. All
    rows are written in one transaction so a failure leaves no partial data.

    NOTE(review): this endpoint is unauthenticated and creates an account
    with a publicly known password; consider gating it in production.
    """
    seeded = {"success": True, "message": "Demo data seeded. Login with demo@example.com / demo123"}
    if db.query(User).filter(User.email == "demo@example.com").first():
        return seeded

    try:
        demo_user = User(email="demo@example.com", hashed_password=pwd_context.hash("demo123"), full_name="Demo User")
        db.add(demo_user)
        db.flush()  # assigns demo_user.id without committing

        accounts = [
            SocialAccount(user_id=demo_user.id, platform="twitter", account_name="@demo_brand", account_id="123", follower_count=12500, is_active=True),
            SocialAccount(user_id=demo_user.id, platform="instagram", account_name="@demo_brand", account_id="456", follower_count=34200, is_active=True),
            SocialAccount(user_id=demo_user.id, platform="facebook", account_name="Demo Brand", account_id="789", follower_count=8900, is_active=True),
        ]
        db.add_all(accounts)
        db.flush()

        now = utcnow()
        posts = [
            Post(user_id=demo_user.id, account_id=accounts[0].id, platform="twitter", content="Excited to launch our new product line! 🚀 #innovation", status="published", published_at=now - timedelta(days=1)),
            Post(user_id=demo_user.id, account_id=accounts[1].id, platform="instagram", content="Behind the scenes of our latest campaign 📸", status="published", published_at=now - timedelta(days=2)),
            Post(user_id=demo_user.id, account_id=accounts[2].id, platform="facebook", content="Join us for our live event next week!", status="scheduled", scheduled_at=now + timedelta(days=3)),
        ]
        db.add_all(posts)

        rule = ListeningRule(user_id=demo_user.id, name="Brand Monitor", platforms=["twitter", "instagram"], keywords=["demo brand"], hashtags=["demobrand"])
        db.add(rule)
        db.flush()

        mentions = [
            Mention(rule_id=rule.id, platform="twitter", external_id="m1", author_name="John Doe", author_handle="johndoe", content="Just tried @demo_brand's new product - amazing quality!", url="https://twitter.com/i/web/status/m1", sentiment="positive", engagement_count=45),
            Mention(rule_id=rule.id, platform="twitter", external_id="m2", author_name="Jane Smith", author_handle="janesmith", content="Anyone else having issues with #demobrand app?", url="https://twitter.com/i/web/status/m2", sentiment="negative", engagement_count=12),
            Mention(rule_id=rule.id, platform="instagram", external_id="m3", author_name="Mike Johnson", author_handle="mikej", content="Loving the new collection from @demo_brand 🔥", url="https://instagram.com/p/m3", sentiment="positive", engagement_count=230),
        ]
        db.add_all(mentions)

        messages = [
            InboxMessage(account_id=accounts[0].id, platform="twitter", external_id="msg1", sender_name="Customer Support", sender_handle="support_user", content="When will the new feature be available?", message_type="comment"),
            InboxMessage(account_id=accounts[1].id, platform="instagram", external_id="msg2", sender_name="Sarah Lee", sender_handle="sarahlee", content="Love your recent posts! Can we collaborate?", message_type="dm"),
        ]
        db.add_all(messages)
        db.commit()
    except IntegrityError:
        # A concurrent request seeded the data first; treat as success.
        db.rollback()
    except Exception:
        db.rollback()
        raise
    return seeded


# ---- SERVE FRONTEND ----
STATIC_DIR = "/app/frontend/dist"


def _is_spa_route(path: str) -> bool:
    """Return True when *path* looks like a client-side route.

    API paths and paths whose last segment has a file extension (missing
    assets) are excluded so they still produce a real 404.
    """
    normalized = path.replace(os.sep, "/").strip("/")
    if normalized == "api" or normalized.startswith("api/"):
        return False
    return not os.path.splitext(os.path.basename(normalized))[1]


class SPAStaticFiles(StaticFiles):
    """StaticFiles that serves ``index.html`` for unknown client-side routes.

    A route registered after ``app.mount("/")`` is never reached because the
    mount matches every path, so the fallback has to live inside the mounted
    application itself.
    """

    async def get_response(self, path: str, scope):
        try:
            response = await super().get_response(path, scope)
        except StarletteHTTPException as exc:
            if exc.status_code != 404 or not _is_spa_route(path):
                raise
        else:
            # In html mode a missing file may come back as a 404 *response*
            # (404.html) instead of an exception.
            if response.status_code != 404 or not _is_spa_route(path):
                return response
        return await super().get_response("index.html", scope)


if os.path.isdir(STATIC_DIR):
    app.mount("/", SPAStaticFiles(directory=STATIC_DIR, html=True), name="static")


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)