Spaces:
Configuration error
Configuration error
| """ | |
| Unified app for HF Spaces deployment. | |
| Serves FastAPI backend + static Next.js frontend from a single process. | |
| Uses SQLite + APScheduler instead of PostgreSQL + Celery/Redis. | |
| """ | |
| import os | |
| import json | |
| from datetime import datetime, timedelta | |
| from contextlib import asynccontextmanager | |
| from typing import List, Optional | |
| from fastapi import FastAPI, Depends, HTTPException, status, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from fastapi.responses import FileResponse | |
| from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, Text, JSON, ForeignKey, Float | |
| from sqlalchemy.ext.declarative import declarative_base | |
| from sqlalchemy.orm import sessionmaker, Session, relationship | |
| import jwt | |
| from passlib.context import CryptContext | |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
| import httpx | |
| # ---- CONFIG ---- | |
| DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:////app/data/dashboard.db") | |
| SECRET_KEY = os.getenv("SECRET_KEY", "hf-spaces-secret-key-change-me") | |
| REDIRECT_BASE = os.getenv("OAUTH_REDIRECT_BASE", "https://huggingface.co/spaces/") | |
| # OAuth credentials from env | |
| FACEBOOK_APP_ID = os.getenv("FACEBOOK_APP_ID", "") | |
| FACEBOOK_APP_SECRET = os.getenv("FACEBOOK_APP_SECRET", "") | |
| TWITTER_API_KEY = os.getenv("TWITTER_API_KEY", "") | |
| TWITTER_API_SECRET = os.getenv("TWITTER_API_SECRET", "") | |
| INSTAGRAM_APP_ID = os.getenv("INSTAGRAM_APP_ID", "") | |
| INSTAGRAM_APP_SECRET = os.getenv("INSTAGRAM_APP_SECRET", "") | |
| TIKTOK_CLIENT_KEY = os.getenv("TIKTOK_CLIENT_KEY", "") | |
| TIKTOK_CLIENT_SECRET = os.getenv("TIKTOK_CLIENT_SECRET", "") | |
| # ---- DATABASE ---- | |
| engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {}) | |
| SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
| Base = declarative_base() | |
| class User(Base): | |
| __tablename__ = "users" | |
| id = Column(Integer, primary_key=True, index=True) | |
| email = Column(String, unique=True, index=True) | |
| hashed_password = Column(String) | |
| full_name = Column(String) | |
| is_active = Column(Boolean, default=True) | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| social_accounts = relationship("SocialAccount", back_populates="user", cascade="all, delete-orphan") | |
| posts = relationship("Post", back_populates="user", cascade="all, delete-orphan") | |
| listening_rules = relationship("ListeningRule", back_populates="user", cascade="all, delete-orphan") | |
| class SocialAccount(Base): | |
| __tablename__ = "social_accounts" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("users.id")) | |
| platform = Column(String) | |
| account_name = Column(String) | |
| account_id = Column(String) | |
| access_token = Column(Text) | |
| refresh_token = Column(Text) | |
| token_expires_at = Column(DateTime, nullable=True) | |
| profile_picture = Column(String, nullable=True) | |
| follower_count = Column(Integer, default=0) | |
| is_active = Column(Boolean, default=True) | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) | |
| user = relationship("User", back_populates="social_accounts") | |
| posts = relationship("Post", back_populates="account") | |
| analytics = relationship("AnalyticsSnapshot", back_populates="account") | |
| class Post(Base): | |
| __tablename__ = "posts" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("users.id")) | |
| account_id = Column(Integer, ForeignKey("social_accounts.id")) | |
| platform = Column(String) | |
| content = Column(Text) | |
| media_urls = Column(JSON, default=list) | |
| status = Column(String, default="draft") | |
| scheduled_at = Column(DateTime, nullable=True) | |
| published_at = Column(DateTime, nullable=True) | |
| external_post_id = Column(String, nullable=True) | |
| platform_post_url = Column(String, nullable=True) | |
| engagement_stats = Column(JSON, default=dict) | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| user = relationship("User", back_populates="posts") | |
| account = relationship("SocialAccount", back_populates="posts") | |
| class AnalyticsSnapshot(Base): | |
| __tablename__ = "analytics_snapshots" | |
| id = Column(Integer, primary_key=True, index=True) | |
| account_id = Column(Integer, ForeignKey("social_accounts.id")) | |
| snapshot_date = Column(DateTime, default=datetime.utcnow) | |
| followers = Column(Integer, default=0) | |
| following = Column(Integer, default=0) | |
| posts_count = Column(Integer, default=0) | |
| impressions = Column(Integer, default=0) | |
| reach = Column(Integer, default=0) | |
| engagement = Column(Integer, default=0) | |
| profile_views = Column(Integer, default=0) | |
| account = relationship("SocialAccount", back_populates="analytics") | |
| class ListeningRule(Base): | |
| __tablename__ = "listening_rules" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("users.id")) | |
| name = Column(String) | |
| platforms = Column(JSON, default=list) | |
| keywords = Column(JSON, default=list) | |
| hashtags = Column(JSON, default=list) | |
| competitor_accounts = Column(JSON, default=list) | |
| is_active = Column(Boolean, default=True) | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| user = relationship("User", back_populates="listening_rules") | |
| mentions = relationship("Mention", back_populates="rule") | |
| class Mention(Base): | |
| __tablename__ = "mentions" | |
| id = Column(Integer, primary_key=True, index=True) | |
| rule_id = Column(Integer, ForeignKey("listening_rules.id")) | |
| platform = Column(String) | |
| external_id = Column(String) | |
| author_name = Column(String) | |
| author_handle = Column(String) | |
| content = Column(Text) | |
| url = Column(String) | |
| sentiment = Column(String, nullable=True) | |
| engagement_count = Column(Integer, default=0) | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| discovered_at = Column(DateTime, default=datetime.utcnow) | |
| rule = relationship("ListeningRule", back_populates="mentions") | |
| class InboxMessage(Base): | |
| __tablename__ = "inbox_messages" | |
| id = Column(Integer, primary_key=True, index=True) | |
| account_id = Column(Integer, ForeignKey("social_accounts.id")) | |
| platform = Column(String) | |
| external_id = Column(String) | |
| sender_name = Column(String) | |
| sender_handle = Column(String) | |
| content = Column(Text) | |
| message_type = Column(String) | |
| is_read = Column(Boolean, default=False) | |
| replied_at = Column(DateTime, nullable=True) | |
| reply_content = Column(Text, nullable=True) | |
| url = Column(String, nullable=True) | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| class CompetitorBenchmark(Base): | |
| __tablename__ = "competitor_benchmarks" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("users.id")) | |
| platform = Column(String) | |
| competitor_handle = Column(String) | |
| follower_count = Column(Integer, default=0) | |
| posts_count = Column(Integer, default=0) | |
| avg_engagement_rate = Column(Float, default=0.0) | |
| snapshot_date = Column(DateTime, default=datetime.utcnow) | |
| Base.metadata.create_all(bind=engine) | |
| # ---- UTILS ---- | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| security = HTTPBearer(auto_error=False) | |
| scheduler = AsyncIOScheduler() | |
| def get_db(): | |
| db = SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): | |
| to_encode = data.copy() | |
| expire = datetime.utcnow() + (expires_delta or timedelta(minutes=60)) | |
| to_encode.update({"exp": expire}) | |
| return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256") | |
| def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db)): | |
| if not credentials: | |
| raise HTTPException(status_code=401, detail="Authentication required") | |
| try: | |
| payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"]) | |
| user_id = payload.get("sub") | |
| if user_id is None: | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| user = db.query(User).filter(User.id == int(user_id)).first() | |
| if not user: | |
| raise HTTPException(status_code=401, detail="User not found") | |
| return user | |
| except jwt.PyJWTError: | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| # ---- OAUTH SERVICE ---- | |
| class OAuthService: | |
| def __init__(self): | |
| self.providers = { | |
| 'facebook': { | |
| 'auth_url': 'https://www.facebook.com/v18.0/dialog/oauth', | |
| 'token_url': 'https://graph.facebook.com/v18.0/oauth/access_token', | |
| 'client_id': FACEBOOK_APP_ID, | |
| 'client_secret': FACEBOOK_APP_SECRET, | |
| 'scope': 'pages_manage_posts,pages_read_engagement,pages_manage_metadata,instagram_basic,instagram_content_publish', | |
| 'redirect_uri': f"{REDIRECT_BASE}/auth/facebook/callback", | |
| }, | |
| 'twitter': { | |
| 'auth_url': 'https://twitter.com/i/oauth2/authorize', | |
| 'token_url': 'https://api.twitter.com/2/oauth2/token', | |
| 'client_id': TWITTER_API_KEY, | |
| 'client_secret': TWITTER_API_SECRET, | |
| 'scope': 'tweet.read tweet.write users.read offline.access', | |
| 'redirect_uri': f"{REDIRECT_BASE}/auth/twitter/callback", | |
| }, | |
| 'instagram': { | |
| 'auth_url': 'https://www.facebook.com/v18.0/dialog/oauth', | |
| 'token_url': 'https://graph.facebook.com/v18.0/oauth/access_token', | |
| 'client_id': INSTAGRAM_APP_ID, | |
| 'client_secret': INSTAGRAM_APP_SECRET, | |
| 'scope': 'instagram_basic,instagram_content_publish', | |
| 'redirect_uri': f"{REDIRECT_BASE}/auth/instagram/callback", | |
| }, | |
| 'tiktok': { | |
| 'auth_url': 'https://www.tiktok.com/v2/auth/authorize', | |
| 'token_url': 'https://open.tiktokapis.com/v2/oauth/token/', | |
| 'client_id': TIKTOK_CLIENT_KEY, | |
| 'client_secret': TIKTOK_CLIENT_SECRET, | |
| 'scope': 'video.publish,user.info.basic', | |
| 'redirect_uri': f"{REDIRECT_BASE}/auth/tiktok/callback", | |
| } | |
| } | |
| def get_auth_url(self, platform: str, state: str): | |
| provider = self.providers.get(platform) | |
| if not provider: | |
| raise ValueError(f"Unsupported platform: {platform}") | |
| params = { | |
| 'client_id': provider['client_id'], | |
| 'redirect_uri': provider['redirect_uri'], | |
| 'scope': provider['scope'], | |
| 'response_type': 'code', | |
| 'state': state, | |
| } | |
| if platform == 'twitter': | |
| params['code_challenge'] = 'challenge' | |
| params['code_challenge_method'] = 'plain' | |
| query = '&'.join([f"{k}={v}" for k, v in params.items()]) | |
| return f"{provider['auth_url']}?{query}" | |
| oauth_service = OAuthService() | |
| # ---- SCHEDULER TASKS ---- | |
| async def publish_scheduled_posts(): | |
| db = SessionLocal() | |
| try: | |
| now = datetime.utcnow() | |
| posts = db.query(Post).filter(Post.status == 'scheduled', Post.scheduled_at <= now).all() | |
| for post in posts: | |
| post.status = 'published' | |
| post.published_at = now | |
| db.commit() | |
| finally: | |
| db.close() | |
| async def collect_analytics(): | |
| db = SessionLocal() | |
| try: | |
| accounts = db.query(SocialAccount).filter(SocialAccount.is_active == True).all() | |
| for account in accounts: | |
| snapshot = AnalyticsSnapshot(account_id=account.id, followers=account.follower_count) | |
| db.add(snapshot) | |
| db.commit() | |
| finally: | |
| db.close() | |
| # ---- LIFESPAN ---- | |
| async def lifespan(app: FastAPI): | |
| scheduler.add_job(publish_scheduled_posts, 'interval', minutes=1, id='publish_posts') | |
| scheduler.add_job(collect_analytics, 'interval', hours=1, id='collect_analytics') | |
| scheduler.start() | |
| yield | |
| scheduler.shutdown() | |
| # ---- APP ---- | |
| app = FastAPI(title="Social Dashboard", version="0.1.0", lifespan=lifespan) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # ---- AUTH ---- | |
| def register(user_data: dict, db: Session = Depends(get_db)): | |
| if db.query(User).filter(User.email == user_data.get('email')).first(): | |
| raise HTTPException(status_code=400, detail="Email already registered") | |
| db_user = User( | |
| email=user_data.get('email'), | |
| hashed_password=pwd_context.hash(user_data.get('password')), | |
| full_name=user_data.get('full_name', '') | |
| ) | |
| db.add(db_user) | |
| db.commit() | |
| db.refresh(db_user) | |
| return {"id": db_user.id, "email": db_user.email, "full_name": db_user.full_name, "is_active": db_user.is_active, "created_at": db_user.created_at.isoformat()} | |
| def login(user_data: dict, db: Session = Depends(get_db)): | |
| db_user = db.query(User).filter(User.email == user_data.get('email')).first() | |
| if not db_user or not pwd_context.verify(user_data.get('password'), db_user.hashed_password): | |
| raise HTTPException(status_code=401, detail="Invalid credentials") | |
| token = create_access_token({"sub": str(db_user.id)}) | |
| return {"access_token": token, "token_type": "bearer", "user": {"id": db_user.id, "email": db_user.email, "full_name": db_user.full_name, "is_active": db_user.is_active, "created_at": db_user.created_at.isoformat()}} | |
| def get_auth_url(platform: str): | |
| state = f"user_temp_{platform}_{datetime.utcnow().timestamp()}" | |
| return {"auth_url": oauth_service.get_auth_url(platform, state), "state": state} | |
| async def oauth_callback(platform: str, code: str, state: str, db: Session = Depends(get_db)): | |
| return {"success": True, "message": "Connect via official OAuth. Set your app credentials in Settings."} | |
| # ---- ACCOUNTS ---- | |
| def list_accounts(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| accounts = db.query(SocialAccount).filter(SocialAccount.user_id == current_user.id).all() | |
| return [{"id": a.id, "platform": a.platform, "account_name": a.account_name, "account_id": a.account_id, | |
| "profile_picture": a.profile_picture, "follower_count": a.follower_count, | |
| "is_active": a.is_active, "created_at": a.created_at.isoformat()} for a in accounts] | |
| def delete_account(account_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| account = db.query(SocialAccount).filter(SocialAccount.id == account_id, SocialAccount.user_id == current_user.id).first() | |
| if not account: | |
| raise HTTPException(status_code=404, detail="Account not found") | |
| db.delete(account) | |
| db.commit() | |
| return {"success": True} | |
| # ---- POSTS ---- | |
| def create_post(post_data: dict, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| db_post = Post( | |
| user_id=current_user.id, | |
| account_id=post_data.get('account_id'), | |
| platform=post_data.get('platform'), | |
| content=post_data.get('content'), | |
| media_urls=post_data.get('media_urls', []), | |
| scheduled_at=datetime.fromisoformat(post_data.get('scheduled_at').replace('Z', '+00:00')) if post_data.get('scheduled_at') else None, | |
| ) | |
| if db_post.scheduled_at and db_post.scheduled_at > datetime.utcnow(): | |
| db_post.status = 'scheduled' | |
| db.add(db_post) | |
| db.commit() | |
| db.refresh(db_post) | |
| return {"id": db_post.id, "platform": db_post.platform, "content": db_post.content, | |
| "media_urls": db_post.media_urls, "status": db_post.status, | |
| "scheduled_at": db_post.scheduled_at.isoformat() if db_post.scheduled_at else None, | |
| "published_at": None, "external_post_id": None, "platform_post_url": None, | |
| "engagement_stats": {}, "created_at": db_post.created_at.isoformat()} | |
| def list_posts(status: Optional[str] = None, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| query = db.query(Post).filter(Post.user_id == current_user.id) | |
| if status: | |
| query = query.filter(Post.status == status) | |
| posts = query.order_by(Post.created_at.desc()).all() | |
| return [{"id": p.id, "platform": p.platform, "content": p.content, "media_urls": p.media_urls, | |
| "status": p.status, "scheduled_at": p.scheduled_at.isoformat() if p.scheduled_at else None, | |
| "published_at": p.published_at.isoformat() if p.published_at else None, | |
| "external_post_id": p.external_post_id, "platform_post_url": p.platform_post_url, | |
| "engagement_stats": p.engagement_stats, "created_at": p.created_at.isoformat()} for p in posts] | |
| def delete_post(post_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| post = db.query(Post).filter(Post.id == post_id, Post.user_id == current_user.id).first() | |
| if not post: | |
| raise HTTPException(status_code=404, detail="Post not found") | |
| db.delete(post) | |
| db.commit() | |
| return {"success": True} | |
| # ---- ANALYTICS ---- | |
| def get_dashboard_analytics(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| accounts = db.query(SocialAccount).filter(SocialAccount.user_id == current_user.id, SocialAccount.is_active == True).all() | |
| total_followers = sum(a.follower_count for a in accounts) | |
| total_posts = db.query(Post).filter(Post.user_id == current_user.id, Post.status == "published").count() | |
| platforms = [] | |
| for account in accounts: | |
| snapshots = db.query(AnalyticsSnapshot).filter(AnalyticsSnapshot.account_id == account.id).order_by(AnalyticsSnapshot.snapshot_date.desc()).limit(2).all() | |
| current = snapshots[0] if snapshots else None | |
| previous = snapshots[1] if len(snapshots) > 1 else None | |
| followers = current.followers if current else account.follower_count | |
| prev_followers = previous.followers if previous else followers | |
| growth = ((followers - prev_followers) / prev_followers * 100) if prev_followers else 0 | |
| platforms.append({ | |
| "platform": account.platform, | |
| "followers": followers, | |
| "engagement_rate": 0.0, | |
| "impressions": current.impressions if current else 0, | |
| "reach": current.reach if current else 0, | |
| "posts_count": current.posts_count if current else 0, | |
| "growth_rate": round(growth, 2), | |
| }) | |
| avg_engagement = sum(p.get("engagement_rate", 0) for p in platforms) / len(platforms) if platforms else 0 | |
| daily_stats = [] | |
| for i in range(30): | |
| date = datetime.utcnow() - timedelta(days=i) | |
| day_posts = db.query(Post).filter( | |
| Post.user_id == current_user.id, | |
| Post.status == "published", | |
| Post.published_at >= date, | |
| Post.published_at < date + timedelta(days=1) | |
| ).count() | |
| daily_stats.append({"date": date.strftime("%Y-%m-%d"), "posts": day_posts}) | |
| return { | |
| "total_followers": total_followers, | |
| "total_posts": total_posts, | |
| "avg_engagement_rate": round(avg_engagement, 2), | |
| "platforms": platforms, | |
| "daily_stats": list(reversed(daily_stats)), | |
| } | |
| # ---- LISTENING ---- | |
| def create_listening_rule(rule_data: dict, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| db_rule = ListeningRule( | |
| user_id=current_user.id, | |
| name=rule_data.get('name'), | |
| platforms=rule_data.get('platforms', []), | |
| keywords=rule_data.get('keywords', []), | |
| hashtags=rule_data.get('hashtags', []), | |
| competitor_accounts=rule_data.get('competitor_accounts', []), | |
| ) | |
| db.add(db_rule) | |
| db.commit() | |
| db.refresh(db_rule) | |
| return {"id": db_rule.id, "name": db_rule.name, "platforms": db_rule.platforms, | |
| "keywords": db_rule.keywords, "hashtags": db_rule.hashtags, | |
| "competitor_accounts": db_rule.competitor_accounts, | |
| "is_active": db_rule.is_active, "created_at": db_rule.created_at.isoformat()} | |
| def list_listening_rules(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| rules = db.query(ListeningRule).filter(ListeningRule.user_id == current_user.id).all() | |
| return [{"id": r.id, "name": r.name, "platforms": r.platforms, "keywords": r.keywords, | |
| "hashtags": r.hashtags, "competitor_accounts": r.competitor_accounts, | |
| "is_active": r.is_active, "created_at": r.created_at.isoformat()} for r in rules] | |
| def get_mentions(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| mentions = db.query(Mention).join(ListeningRule).filter(ListeningRule.user_id == current_user.id).order_by(Mention.discovered_at.desc()).limit(100).all() | |
| return [{"id": m.id, "platform": m.platform, "external_id": m.external_id, | |
| "author_name": m.author_name, "author_handle": m.author_handle, | |
| "content": m.content, "url": m.url, "sentiment": m.sentiment, | |
| "engagement_count": m.engagement_count, | |
| "created_at": m.created_at.isoformat(), "discovered_at": m.discovered_at.isoformat()} for m in mentions] | |
| # ---- INBOX ---- | |
| def get_inbox(platform: Optional[str] = None, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| accounts = db.query(SocialAccount).filter(SocialAccount.user_id == current_user.id).all() | |
| account_ids = [a.id for a in accounts] | |
| query = db.query(InboxMessage).filter(InboxMessage.account_id.in_(account_ids)) | |
| if platform: | |
| query = query.filter(InboxMessage.platform == platform) | |
| messages = query.order_by(InboxMessage.created_at.desc()).all() | |
| return [{"id": m.id, "platform": m.platform, "sender_name": m.sender_name, | |
| "sender_handle": m.sender_handle, "content": m.content, | |
| "message_type": m.message_type, "is_read": m.is_read, | |
| "created_at": m.created_at.isoformat(), "url": m.url} for m in messages] | |
| def reply_to_message(message_id: int, reply_data: dict, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| message = db.query(InboxMessage).filter(InboxMessage.id == message_id).first() | |
| if not message: | |
| raise HTTPException(status_code=404, detail="Message not found") | |
| message.is_read = True | |
| message.replied_at = datetime.utcnow() | |
| if reply_data.get('reply_content'): | |
| message.reply_content = reply_data.get('reply_content') | |
| db.commit() | |
| return {"success": True} | |
| # ---- CALENDAR ---- | |
| def get_calendar_events(start: str, end: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| start_dt = datetime.fromisoformat(start.replace('Z', '+00:00')) | |
| end_dt = datetime.fromisoformat(end.replace('Z', '+00:00')) | |
| posts = db.query(Post).filter( | |
| Post.user_id == current_user.id, | |
| Post.scheduled_at >= start_dt, | |
| Post.scheduled_at <= end_dt, | |
| ).all() | |
| color_map = {"draft": "#9CA3AF", "scheduled": "#3B82F6", "published": "#10B981", "failed": "#EF4444"} | |
| return [{"id": p.id, "title": p.content[:50] + "..." if len(p.content) > 50 else p.content, | |
| "platform": p.platform, "status": p.status, | |
| "start": (p.scheduled_at or p.created_at).isoformat(), | |
| "color": color_map.get(p.status, "#6B7280")} for p in posts] | |
| # ---- COMPETITORS ---- | |
| def list_competitors(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): | |
| benchmarks = db.query(CompetitorBenchmark).filter(CompetitorBenchmark.user_id == current_user.id).order_by(CompetitorBenchmark.snapshot_date.desc()).all() | |
| return [{"id": b.id, "platform": b.platform, "competitor_handle": b.competitor_handle, | |
| "follower_count": b.follower_count, "posts_count": b.posts_count, | |
| "avg_engagement_rate": b.avg_engagement_rate, | |
| "snapshot_date": b.snapshot_date.isoformat()} for b in benchmarks] | |
| # ---- DEMO DATA ---- | |
| def seed_demo_data(db: Session = Depends(get_db)): | |
| demo_user = User(email="demo@example.com", hashed_password=pwd_context.hash("demo123"), full_name="Demo User") | |
| db.add(demo_user) | |
| db.commit() | |
| db.refresh(demo_user) | |
| accounts = [ | |
| SocialAccount(user_id=demo_user.id, platform="twitter", account_name="@demo_brand", account_id="123", follower_count=12500, is_active=True), | |
| SocialAccount(user_id=demo_user.id, platform="instagram", account_name="@demo_brand", account_id="456", follower_count=34200, is_active=True), | |
| SocialAccount(user_id=demo_user.id, platform="facebook", account_name="Demo Brand", account_id="789", follower_count=8900, is_active=True), | |
| ] | |
| db.add_all(accounts) | |
| db.commit() | |
| posts = [ | |
| Post(user_id=demo_user.id, account_id=accounts[0].id, platform="twitter", content="Excited to launch our new product line! 🚀 #innovation", status="published", published_at=datetime.utcnow() - timedelta(days=1)), | |
| Post(user_id=demo_user.id, account_id=accounts[1].id, platform="instagram", content="Behind the scenes of our latest campaign 📸", status="published", published_at=datetime.utcnow() - timedelta(days=2)), | |
| Post(user_id=demo_user.id, account_id=accounts[2].id, platform="facebook", content="Join us for our live event next week!", status="scheduled", scheduled_at=datetime.utcnow() + timedelta(days=3)), | |
| ] | |
| db.add_all(posts) | |
| db.commit() | |
| rule = ListeningRule(user_id=demo_user.id, name="Brand Monitor", platforms=["twitter", "instagram"], keywords=["demo brand"], hashtags=["demobrand"]) | |
| db.add(rule) | |
| db.commit() | |
| mentions = [ | |
| Mention(rule_id=rule.id, platform="twitter", external_id="m1", author_name="John Doe", author_handle="johndoe", content="Just tried @demo_brand's new product - amazing quality!", url="https://twitter.com/i/web/status/m1", sentiment="positive", engagement_count=45), | |
| Mention(rule_id=rule.id, platform="twitter", external_id="m2", author_name="Jane Smith", author_handle="janesmith", content="Anyone else having issues with #demobrand app?", url="https://twitter.com/i/web/status/m2", sentiment="negative", engagement_count=12), | |
| Mention(rule_id=rule.id, platform="instagram", external_id="m3", author_name="Mike Johnson", author_handle="mikej", content="Loving the new collection from @demo_brand 🔥", url="https://instagram.com/p/m3", sentiment="positive", engagement_count=230), | |
| ] | |
| db.add_all(mentions) | |
| db.commit() | |
| messages = [ | |
| InboxMessage(account_id=accounts[0].id, platform="twitter", external_id="msg1", sender_name="Customer Support", sender_handle="support_user", content="When will the new feature be available?", message_type="comment"), | |
| InboxMessage(account_id=accounts[1].id, platform="instagram", external_id="msg2", sender_name="Sarah Lee", sender_handle="sarahlee", content="Love your recent posts! Can we collaborate?", message_type="dm"), | |
| ] | |
| db.add_all(messages) | |
| db.commit() | |
| return {"success": True, "message": "Demo data seeded. Login with demo@example.com / demo123"} | |
| # ---- SERVE FRONTEND ---- | |
| STATIC_DIR = "/app/frontend/dist" | |
| if os.path.exists(STATIC_DIR): | |
| app.mount("/", StaticFiles(directory=STATIC_DIR, html=True), name="static") | |
| async def serve_spa(path: str): | |
| index_file = os.path.join(STATIC_DIR, "index.html") | |
| if os.path.exists(index_file): | |
| return FileResponse(index_file) | |
| raise HTTPException(status_code=404) | |
| if __name__ == "__main__": | |
| import uvicorn | |
| port = int(os.getenv("PORT", 7860)) | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |