MandyDeep's picture
Upload folder using huggingface_hub
da0a1f7 verified
"""
Unified app for HF Spaces deployment.
Serves FastAPI backend + static Next.js frontend from a single process.
Uses SQLite + APScheduler instead of PostgreSQL + Celery/Redis.
"""
import os
import json
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
from typing import List, Optional
from fastapi import FastAPI, Depends, HTTPException, status, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import FileResponse
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, Text, JSON, ForeignKey, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
import jwt
from passlib.context import CryptContext
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import httpx
# ---- CONFIG ----
def _env(name: str, default: str = "") -> str:
    """Return environment variable *name*, or *default* when it is unset."""
    return os.environ.get(name, default)


# SQLAlchemy connection URL; the fallback is a SQLite file under /app/data.
DATABASE_URL = _env("DATABASE_URL", "sqlite:////app/data/dashboard.db")
# Key used to sign and verify the JWT access tokens issued by this module.
# NOTE(review): the fallback is a publicly visible placeholder -- confirm that
# SECRET_KEY is always set in a real deployment.
SECRET_KEY = _env("SECRET_KEY", "hf-spaces-secret-key-change-me")
# Base URL from which the per-platform OAuth redirect URIs are built.
REDIRECT_BASE = _env("OAUTH_REDIRECT_BASE", "https://huggingface.co/spaces/")

# OAuth credentials from env (each falls back to an empty string)
FACEBOOK_APP_ID = _env("FACEBOOK_APP_ID")
FACEBOOK_APP_SECRET = _env("FACEBOOK_APP_SECRET")
TWITTER_API_KEY = _env("TWITTER_API_KEY")
TWITTER_API_SECRET = _env("TWITTER_API_SECRET")
INSTAGRAM_APP_ID = _env("INSTAGRAM_APP_ID")
INSTAGRAM_APP_SECRET = _env("INSTAGRAM_APP_SECRET")
TIKTOK_CLIENT_KEY = _env("TIKTOK_CLIENT_KEY")
TIKTOK_CLIENT_SECRET = _env("TIKTOK_CLIENT_SECRET")
# ---- DATABASE ----
# SQLite URLs get check_same_thread=False; any other backend gets no extra
# connection arguments.
_connect_args = {"check_same_thread": False} if "sqlite" in DATABASE_URL else {}
engine = create_engine(DATABASE_URL, connect_args=_connect_args)

# Session factory bound to the engine; commits and flushes are explicit.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

# Declarative base class that the ORM models in this module derive from.
Base = declarative_base()
class User(Base):
    """ORM model for the ``users`` table: a dashboard login and the data it owns."""

    __tablename__ = "users"

    # --- columns ---
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    full_name = Column(String)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # --- relationships ---
    # All three collections cascade, so deleting a user through the ORM also
    # deletes that user's accounts, posts and listening rules.
    social_accounts = relationship(
        "SocialAccount",
        back_populates="user",
        cascade="all, delete-orphan",
    )
    posts = relationship(
        "Post",
        back_populates="user",
        cascade="all, delete-orphan",
    )
    listening_rules = relationship(
        "ListeningRule",
        back_populates="user",
        cascade="all, delete-orphan",
    )
class SocialAccount(Base):
    """ORM model for ``social_accounts``: one connected platform account of a user."""

    __tablename__ = "social_accounts"

    # --- identity ---
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    platform = Column(String)
    account_name = Column(String)
    account_id = Column(String)

    # --- OAuth tokens (stored as plain Text columns) ---
    access_token = Column(Text)
    refresh_token = Column(Text)
    token_expires_at = Column(DateTime, nullable=True)

    # --- profile / state ---
    profile_picture = Column(String, nullable=True)
    follower_count = Column(Integer, default=0)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Refreshed to the current UTC time on every ORM update of the row.
    updated_at = Column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # --- relationships ---
    user = relationship("User", back_populates="social_accounts")
    posts = relationship("Post", back_populates="account")
    analytics = relationship("AnalyticsSnapshot", back_populates="account")
class Post(Base):
    """ORM model for ``posts``: a piece of content authored by a user for one account."""

    __tablename__ = "posts"

    # --- identity / ownership ---
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    account_id = Column(Integer, ForeignKey("social_accounts.id"))
    platform = Column(String)

    # --- content ---
    content = Column(Text)
    media_urls = Column(JSON, default=list)

    # --- lifecycle ---
    # New rows start as "draft"; other values used in this module are
    # "scheduled", "published" and "failed".
    status = Column(String, default="draft")
    scheduled_at = Column(DateTime, nullable=True)
    published_at = Column(DateTime, nullable=True)

    # --- platform-side references and stats ---
    external_post_id = Column(String, nullable=True)
    platform_post_url = Column(String, nullable=True)
    engagement_stats = Column(JSON, default=dict)
    created_at = Column(DateTime, default=datetime.utcnow)

    # --- relationships ---
    user = relationship("User", back_populates="posts")
    account = relationship("SocialAccount", back_populates="posts")
class AnalyticsSnapshot(Base):
    """ORM model for ``analytics_snapshots``: point-in-time counters for one account."""

    __tablename__ = "analytics_snapshots"

    id = Column(Integer, primary_key=True, index=True)
    account_id = Column(Integer, ForeignKey("social_accounts.id"))
    snapshot_date = Column(DateTime, default=datetime.utcnow)

    # Integer counters; each one defaults to 0.
    followers = Column(Integer, default=0)
    following = Column(Integer, default=0)
    posts_count = Column(Integer, default=0)
    impressions = Column(Integer, default=0)
    reach = Column(Integer, default=0)
    engagement = Column(Integer, default=0)
    profile_views = Column(Integer, default=0)

    account = relationship("SocialAccount", back_populates="analytics")
class ListeningRule(Base):
    """ORM model for ``listening_rules``: a user's saved monitoring rule."""

    __tablename__ = "listening_rules"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    name = Column(String)

    # JSON columns; each defaults to a fresh empty list.
    platforms = Column(JSON, default=list)
    keywords = Column(JSON, default=list)
    hashtags = Column(JSON, default=list)
    competitor_accounts = Column(JSON, default=list)

    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    user = relationship("User", back_populates="listening_rules")
    mentions = relationship("Mention", back_populates="rule")
class Mention(Base):
    """ORM model for ``mentions``: one item recorded against a listening rule."""

    __tablename__ = "mentions"

    id = Column(Integer, primary_key=True, index=True)
    rule_id = Column(Integer, ForeignKey("listening_rules.id"))

    # --- where the mention came from ---
    platform = Column(String)
    external_id = Column(String)
    author_name = Column(String)
    author_handle = Column(String)

    # --- what it says ---
    content = Column(Text)
    url = Column(String)
    sentiment = Column(String, nullable=True)
    engagement_count = Column(Integer, default=0)

    # Both timestamps default to the row's insert time (UTC).
    created_at = Column(DateTime, default=datetime.utcnow)
    discovered_at = Column(DateTime, default=datetime.utcnow)

    rule = relationship("ListeningRule", back_populates="mentions")
class InboxMessage(Base):
    """ORM model for ``inbox_messages``: an incoming message tied to a social account."""

    __tablename__ = "inbox_messages"

    id = Column(Integer, primary_key=True, index=True)
    account_id = Column(Integer, ForeignKey("social_accounts.id"))

    # --- origin ---
    platform = Column(String)
    external_id = Column(String)
    sender_name = Column(String)
    sender_handle = Column(String)

    # --- payload ---
    content = Column(Text)
    # Free-form string; the demo seed data uses "comment" and "dm".
    message_type = Column(String)
    url = Column(String, nullable=True)

    # --- handling state ---
    is_read = Column(Boolean, default=False)
    replied_at = Column(DateTime, nullable=True)
    reply_content = Column(Text, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
class CompetitorBenchmark(Base):
    """ORM model for ``competitor_benchmarks``: a dated stats row for one competitor handle."""

    __tablename__ = "competitor_benchmarks"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))

    platform = Column(String)
    competitor_handle = Column(String)

    # Recorded figures; counts default to 0 and the rate to 0.0.
    follower_count = Column(Integer, default=0)
    posts_count = Column(Integer, default=0)
    avg_engagement_rate = Column(Float, default=0.0)

    snapshot_date = Column(DateTime, default=datetime.utcnow)
# Create the tables for every model registered on Base. This runs at import
# time and must stay after the model class definitions above.
Base.metadata.create_all(bind=engine)
# ---- UTILS ----
# Password hashing context configured with the bcrypt scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Bearer-token dependency. With auto_error=False a request without credentials
# reaches get_current_user, which raises the 401 itself.
security = HTTPBearer(auto_error=False)
# Scheduler instance; its jobs are added and started inside lifespan().
scheduler = AsyncIOScheduler()
def get_db():
    """Yield a new database session and close it once the consumer is done.

    Written as a generator so it can be used as a FastAPI dependency; the
    ``finally`` clause closes the session whether or not the request failed.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return an HS256-signed JWT containing *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict itself is not modified.
        expires_delta: Token lifetime; 60 minutes when omitted (or falsy).

    Returns:
        The encoded token as produced by ``jwt.encode``.
    """
    lifetime = expires_delta or timedelta(minutes=60)
    claims = {**data, "exp": datetime.utcnow() + lifetime}
    return jwt.encode(claims, SECRET_KEY, algorithm="HS256")
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db)):
    """Resolve the bearer token on the request to a ``User`` row.

    Args:
        credentials: Parsed ``Authorization: Bearer`` header, or ``None`` when
            the header is absent.
        db: Database session.

    Returns:
        The ``User`` whose primary key matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when credentials are missing, the token cannot be
            decoded or verified, the ``sub`` claim is missing or not an
            integer, or no such user exists.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail="Authentication required")

    # Only the decode step can raise PyJWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

    user_id = payload.get("sub")
    if user_id is None:
        raise HTTPException(status_code=401, detail="Invalid token")

    # A validly signed token with a non-numeric subject must be rejected as a
    # bad token rather than surfacing as an unhandled ValueError (HTTP 500).
    try:
        user_pk = int(user_id)
    except (TypeError, ValueError):
        raise HTTPException(status_code=401, detail="Invalid token")

    user = db.query(User).filter(User.id == user_pk).first()
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    return user
# ---- OAUTH SERVICE ----
class OAuthService:
    """Holds per-platform OAuth settings and builds authorization URLs."""

    def __init__(self):
        """Populate ``self.providers`` from the module-level OAuth settings."""
        # A trailing slash on REDIRECT_BASE would otherwise produce
        # ".../​/auth/<platform>/callback" (double slash) in every redirect URI.
        base = REDIRECT_BASE.rstrip('/')
        self.providers = {
            'facebook': {
                'auth_url': 'https://www.facebook.com/v18.0/dialog/oauth',
                'token_url': 'https://graph.facebook.com/v18.0/oauth/access_token',
                'client_id': FACEBOOK_APP_ID,
                'client_secret': FACEBOOK_APP_SECRET,
                'scope': 'pages_manage_posts,pages_read_engagement,pages_manage_metadata,instagram_basic,instagram_content_publish',
                'redirect_uri': f"{base}/auth/facebook/callback",
            },
            'twitter': {
                'auth_url': 'https://twitter.com/i/oauth2/authorize',
                'token_url': 'https://api.twitter.com/2/oauth2/token',
                'client_id': TWITTER_API_KEY,
                'client_secret': TWITTER_API_SECRET,
                'scope': 'tweet.read tweet.write users.read offline.access',
                'redirect_uri': f"{base}/auth/twitter/callback",
            },
            'instagram': {
                'auth_url': 'https://www.facebook.com/v18.0/dialog/oauth',
                'token_url': 'https://graph.facebook.com/v18.0/oauth/access_token',
                'client_id': INSTAGRAM_APP_ID,
                'client_secret': INSTAGRAM_APP_SECRET,
                'scope': 'instagram_basic,instagram_content_publish',
                'redirect_uri': f"{base}/auth/instagram/callback",
            },
            'tiktok': {
                'auth_url': 'https://www.tiktok.com/v2/auth/authorize',
                'token_url': 'https://open.tiktokapis.com/v2/oauth/token/',
                'client_id': TIKTOK_CLIENT_KEY,
                'client_secret': TIKTOK_CLIENT_SECRET,
                'scope': 'video.publish,user.info.basic',
                'redirect_uri': f"{base}/auth/tiktok/callback",
            },
        }

    def get_auth_url(self, platform: str, state: str):
        """Return the authorization URL for *platform* carrying *state*.

        Args:
            platform: Key into ``self.providers``.
            state: Opaque value passed through in the ``state`` parameter.

        Returns:
            The provider's ``auth_url`` followed by a URL-encoded query string.

        Raises:
            ValueError: If *platform* has no entry in ``self.providers``.
        """
        from urllib.parse import urlencode

        provider = self.providers.get(platform)
        if not provider:
            raise ValueError(f"Unsupported platform: {platform}")

        params = {
            'client_id': provider['client_id'],
            'redirect_uri': provider['redirect_uri'],
            'scope': provider['scope'],
            'response_type': 'code',
            'state': state,
        }
        if platform == 'twitter':
            # NOTE(review): a constant challenge with method "plain" provides
            # no real PKCE protection; a per-request verifier would have to be
            # generated and stored for the token exchange.
            params['code_challenge'] = 'challenge'
            params['code_challenge_method'] = 'plain'

        # Values contain spaces, "://" and caller-supplied text, so they must
        # be percent-encoded rather than joined verbatim.
        return f"{provider['auth_url']}?{urlencode(params)}"
# Module-level OAuthService instance used by the /api/auth/{platform}/url route.
oauth_service = OAuthService()
# ---- SCHEDULER TASKS ----
async def publish_scheduled_posts():
    """Flip every due scheduled post to ``published``.

    Selects posts whose status is ``scheduled`` and whose ``scheduled_at`` is
    not in the future, stamps them with the current UTC time and commits. The
    function only updates database rows; it makes no call to any platform.
    """
    session = SessionLocal()
    try:
        cutoff = datetime.utcnow()
        due_posts = (
            session.query(Post)
            .filter(Post.status == 'scheduled', Post.scheduled_at <= cutoff)
            .all()
        )
        for due in due_posts:
            due.status = 'published'
            due.published_at = cutoff
        session.commit()
    finally:
        session.close()
async def collect_analytics():
    """Record one ``AnalyticsSnapshot`` per active social account.

    Each snapshot copies the account's current ``follower_count`` into
    ``followers``; all other snapshot fields keep their column defaults.
    """
    session = SessionLocal()
    try:
        active_accounts = (
            session.query(SocialAccount)
            .filter(SocialAccount.is_active == True)
            .all()
        )
        session.add_all([
            AnalyticsSnapshot(account_id=acct.id, followers=acct.follower_count)
            for acct in active_accounts
        ])
        session.commit()
    finally:
        session.close()
# ---- LIFESPAN ----
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the background scheduler for the lifetime of the application.

    Registers the two interval jobs, starts the scheduler, hands control to
    the application, and shuts the scheduler down afterwards.
    """
    # (callable, interval keyword arguments, job id)
    interval_jobs = (
        (publish_scheduled_posts, {'minutes': 1}, 'publish_posts'),
        (collect_analytics, {'hours': 1}, 'collect_analytics'),
    )
    for job_func, every, job_id in interval_jobs:
        scheduler.add_job(job_func, 'interval', id=job_id, **every)
    scheduler.start()
    yield
    scheduler.shutdown()
# ---- APP ----
app = FastAPI(
    title="Social Dashboard",
    version="0.1.0",
    lifespan=lifespan,
)

# NOTE(review): every origin, method and header is allowed, with credentials
# enabled. The routes in this module authenticate with a bearer token --
# confirm whether credentialed cross-origin requests are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
# ---- AUTH ----
@app.post("/api/auth/register")
def register(user_data: dict, db: Session = Depends(get_db)):
    """Create a user account from a JSON body.

    Args:
        user_data: Request body; reads ``email``, ``password`` and the
            optional ``full_name`` (defaults to an empty string).
        db: Database session.

    Returns:
        A dict with the new user's ``id``, ``email``, ``full_name``,
        ``is_active`` and ISO-formatted ``created_at``.

    Raises:
        HTTPException: 422 when ``email`` or ``password`` is missing, empty or
            not a string; 400 when the email is already registered.
    """
    email = user_data.get('email')
    password = user_data.get('password')

    # Without this check a missing password reaches pwd_context.hash(None) and
    # fails with an unhandled error, and a missing email is stored as NULL.
    email_ok = isinstance(email, str) and bool(email.strip())
    password_ok = isinstance(password, str) and bool(password)
    if not (email_ok and password_ok):
        raise HTTPException(status_code=422, detail="Email and password are required")

    if db.query(User).filter(User.email == email).first():
        raise HTTPException(status_code=400, detail="Email already registered")

    db_user = User(
        email=email,
        hashed_password=pwd_context.hash(password),
        full_name=user_data.get('full_name', ''),
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return {
        "id": db_user.id,
        "email": db_user.email,
        "full_name": db_user.full_name,
        "is_active": db_user.is_active,
        "created_at": db_user.created_at.isoformat(),
    }
@app.post("/api/auth/login")
def login(user_data: dict, db: Session = Depends(get_db)):
    """Verify email/password and issue a bearer token.

    Args:
        user_data: Request body; reads ``email`` and ``password``.
        db: Database session.

    Returns:
        A dict with ``access_token``, ``token_type`` ("bearer") and a ``user``
        summary dict.

    Raises:
        HTTPException: 401 when the credentials are missing, malformed or do
            not match a stored user.
    """
    email = user_data.get('email')
    password = user_data.get('password')

    db_user = None
    if isinstance(email, str):
        db_user = db.query(User).filter(User.email == email).first()

    # A non-string password or a missing stored hash would make verify()
    # raise; treat both as a failed login instead of an internal error.
    credentials_ok = (
        db_user is not None
        and isinstance(password, str)
        and bool(db_user.hashed_password)
        and pwd_context.verify(password, db_user.hashed_password)
    )
    if not credentials_ok:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = create_access_token({"sub": str(db_user.id)})
    return {
        "access_token": token,
        "token_type": "bearer",
        "user": {
            "id": db_user.id,
            "email": db_user.email,
            "full_name": db_user.full_name,
            "is_active": db_user.is_active,
            "created_at": db_user.created_at.isoformat(),
        },
    }
@app.get("/api/auth/{platform}/url")
def get_auth_url(platform: str):
    """Return the provider authorization URL for *platform* and the state used.

    Args:
        platform: Platform name taken from the URL path.

    Returns:
        A dict with ``auth_url`` and the ``state`` value embedded in it.

    Raises:
        HTTPException: 400 when the platform is not supported.
    """
    import secrets

    # The state value is the CSRF guard of the OAuth flow, so it must not be
    # guessable; a timestamp is. The "user_temp_<platform>_" prefix is kept.
    # NOTE(review): the callback route does not verify the state yet.
    state = f"user_temp_{platform}_{secrets.token_urlsafe(16)}"
    try:
        auth_url = oauth_service.get_auth_url(platform, state)
    except ValueError as exc:
        # An unknown platform is a client error, not an internal failure.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"auth_url": auth_url, "state": state}
@app.get("/api/auth/{platform}/callback")
async def oauth_callback(platform: str, code: str, state: str, db: Session = Depends(get_db)):
    """Placeholder OAuth callback.

    None of the arguments are used: no token exchange happens and nothing is
    stored. The route only returns a fixed informational payload.
    """
    notice = "Connect via official OAuth. Set your app credentials in Settings."
    return {"success": True, "message": notice}
# ---- ACCOUNTS ----
@app.get("/api/accounts")
def list_accounts(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return the authenticated user's social accounts as a list of dicts."""
    owned = db.query(SocialAccount).filter(SocialAccount.user_id == current_user.id).all()
    payload = []
    for acct in owned:
        payload.append({
            "id": acct.id,
            "platform": acct.platform,
            "account_name": acct.account_name,
            "account_id": acct.account_id,
            "profile_picture": acct.profile_picture,
            "follower_count": acct.follower_count,
            "is_active": acct.is_active,
            "created_at": acct.created_at.isoformat(),
        })
    return payload
@app.delete("/api/accounts/{account_id}")
def delete_account(account_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Delete one of the authenticated user's social accounts.

    Raises:
        HTTPException: 404 when no account with that id belongs to the user.
    """
    owned_account_query = db.query(SocialAccount).filter(
        SocialAccount.id == account_id,
        SocialAccount.user_id == current_user.id,
    )
    target = owned_account_query.first()
    if target:
        db.delete(target)
        db.commit()
        return {"success": True}
    raise HTTPException(status_code=404, detail="Account not found")
# ---- POSTS ----
def _parse_schedule_time(raw):
    """Parse an ISO 8601 string into a naive UTC ``datetime``.

    A trailing ``Z`` is accepted. Values carrying a UTC offset are converted
    to UTC and stripped of tzinfo so that they compare cleanly with the naive
    ``datetime.utcnow()`` values used throughout this module.
    """
    from datetime import timezone

    parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


@app.post("/api/posts")
def create_post(post_data: dict, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Create a post for the authenticated user.

    Args:
        post_data: Request body; reads ``account_id``, ``platform``,
            ``content``, ``media_urls`` (default ``[]``) and the optional ISO
            8601 ``scheduled_at``.
        current_user: Authenticated user who will own the post.
        db: Database session.

    Returns:
        A dict describing the stored post. Its status is ``scheduled`` when
        ``scheduled_at`` lies in the future, otherwise the column default.

    Raises:
        HTTPException: 422 when ``scheduled_at`` is not a valid ISO 8601
            datetime string.
    """
    scheduled_at = None
    raw_schedule = post_data.get('scheduled_at')
    if raw_schedule:
        try:
            scheduled_at = _parse_schedule_time(raw_schedule)
        except (AttributeError, TypeError, ValueError):
            raise HTTPException(
                status_code=422,
                detail="scheduled_at must be an ISO 8601 datetime string",
            )

    # NOTE(review): account_id is stored as sent; it is not checked against
    # the accounts owned by current_user.
    db_post = Post(
        user_id=current_user.id,
        account_id=post_data.get('account_id'),
        platform=post_data.get('platform'),
        content=post_data.get('content'),
        media_urls=post_data.get('media_urls', []),
        scheduled_at=scheduled_at,
    )
    # Both sides are naive UTC here, so the comparison cannot raise the
    # "offset-naive and offset-aware" TypeError that a "Z" timestamp used to.
    if db_post.scheduled_at and db_post.scheduled_at > datetime.utcnow():
        db_post.status = 'scheduled'
    db.add(db_post)
    db.commit()
    db.refresh(db_post)
    return {
        "id": db_post.id,
        "platform": db_post.platform,
        "content": db_post.content,
        "media_urls": db_post.media_urls,
        "status": db_post.status,
        "scheduled_at": db_post.scheduled_at.isoformat() if db_post.scheduled_at else None,
        "published_at": None,
        "external_post_id": None,
        "platform_post_url": None,
        "engagement_stats": {},
        "created_at": db_post.created_at.isoformat(),
    }
@app.get("/api/posts")
def list_posts(status: Optional[str] = None, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return the authenticated user's posts, newest first.

    Args:
        status: Optional exact-match filter on the post status.
    """
    post_query = db.query(Post).filter(Post.user_id == current_user.id)
    if status:
        post_query = post_query.filter(Post.status == status)

    serialized = []
    for item in post_query.order_by(Post.created_at.desc()).all():
        scheduled = item.scheduled_at.isoformat() if item.scheduled_at else None
        published = item.published_at.isoformat() if item.published_at else None
        serialized.append({
            "id": item.id,
            "platform": item.platform,
            "content": item.content,
            "media_urls": item.media_urls,
            "status": item.status,
            "scheduled_at": scheduled,
            "published_at": published,
            "external_post_id": item.external_post_id,
            "platform_post_url": item.platform_post_url,
            "engagement_stats": item.engagement_stats,
            "created_at": item.created_at.isoformat(),
        })
    return serialized
@app.delete("/api/posts/{post_id}")
def delete_post(post_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Delete one of the authenticated user's posts.

    Raises:
        HTTPException: 404 when no post with that id belongs to the user.
    """
    owned_post_query = db.query(Post).filter(
        Post.id == post_id,
        Post.user_id == current_user.id,
    )
    target = owned_post_query.first()
    if target:
        db.delete(target)
        db.commit()
        return {"success": True}
    raise HTTPException(status_code=404, detail="Post not found")
# ---- ANALYTICS ----
@app.get("/api/analytics/dashboard")
def get_dashboard_analytics(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Build the dashboard summary for the authenticated user.

    Returns:
        A dict with ``total_followers``, ``total_posts`` (published only),
        ``avg_engagement_rate``, a per-account ``platforms`` list and
        ``daily_stats``: 30 entries, oldest first, one per UTC calendar day
        ending today, each holding the number of posts published that day.
    """
    accounts = db.query(SocialAccount).filter(
        SocialAccount.user_id == current_user.id,
        SocialAccount.is_active == True,
    ).all()
    # follower_count is nullable, so guard the arithmetic against None.
    total_followers = sum(a.follower_count or 0 for a in accounts)
    total_posts = db.query(Post).filter(
        Post.user_id == current_user.id,
        Post.status == "published",
    ).count()

    platforms = []
    for account in accounts:
        # The two most recent snapshots give the current and previous values.
        snapshots = (
            db.query(AnalyticsSnapshot)
            .filter(AnalyticsSnapshot.account_id == account.id)
            .order_by(AnalyticsSnapshot.snapshot_date.desc())
            .limit(2)
            .all()
        )
        current = snapshots[0] if snapshots else None
        previous = snapshots[1] if len(snapshots) > 1 else None
        followers = (current.followers if current else account.follower_count) or 0
        prev_followers = (previous.followers or 0) if previous else followers
        growth = ((followers - prev_followers) / prev_followers * 100) if prev_followers else 0
        platforms.append({
            "platform": account.platform,
            "followers": followers,
            # No engagement rate is computed yet; the field is always 0.0.
            "engagement_rate": 0.0,
            "impressions": current.impressions if current else 0,
            "reach": current.reach if current else 0,
            "posts_count": current.posts_count if current else 0,
            "growth_rate": round(growth, 2),
        })
    avg_engagement = sum(p.get("engagement_rate", 0) for p in platforms) / len(platforms) if platforms else 0

    # Buckets are aligned to UTC midnight so each label matches the day its
    # posts were published. Rolling 24-hour windows anchored at "now" would
    # count posts from earlier today under yesterday's date.
    today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    window_start = today_start - timedelta(days=29)
    window_end = today_start + timedelta(days=1)

    # One query for the whole window instead of one count query per day.
    published_rows = db.query(Post.published_at).filter(
        Post.user_id == current_user.id,
        Post.status == "published",
        Post.published_at >= window_start,
        Post.published_at < window_end,
    ).all()
    posts_per_day = {}
    for (published_at,) in published_rows:
        label = published_at.strftime("%Y-%m-%d")
        posts_per_day[label] = posts_per_day.get(label, 0) + 1

    daily_stats = []
    for offset in range(29, -1, -1):
        label = (today_start - timedelta(days=offset)).strftime("%Y-%m-%d")
        daily_stats.append({"date": label, "posts": posts_per_day.get(label, 0)})

    return {
        "total_followers": total_followers,
        "total_posts": total_posts,
        "avg_engagement_rate": round(avg_engagement, 2),
        "platforms": platforms,
        "daily_stats": daily_stats,
    }
# ---- LISTENING ----
@app.post("/api/listening/rules")
def create_listening_rule(rule_data: dict, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Store a listening rule for the authenticated user and return it as a dict.

    Reads ``name`` plus four list-valued fields from the body; each list
    field falls back to ``[]`` when absent.
    """
    list_fields = ('platforms', 'keywords', 'hashtags', 'competitor_accounts')
    list_values = {field: rule_data.get(field, []) for field in list_fields}

    db_rule = ListeningRule(
        user_id=current_user.id,
        name=rule_data.get('name'),
        **list_values,
    )
    db.add(db_rule)
    db.commit()
    db.refresh(db_rule)

    response = {"id": db_rule.id, "name": db_rule.name}
    response["platforms"] = db_rule.platforms
    response["keywords"] = db_rule.keywords
    response["hashtags"] = db_rule.hashtags
    response["competitor_accounts"] = db_rule.competitor_accounts
    response["is_active"] = db_rule.is_active
    response["created_at"] = db_rule.created_at.isoformat()
    return response
@app.get("/api/listening/rules")
def list_listening_rules(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return every listening rule owned by the authenticated user."""
    owned_rules = db.query(ListeningRule).filter(ListeningRule.user_id == current_user.id).all()
    serialized = []
    for rule in owned_rules:
        serialized.append({
            "id": rule.id,
            "name": rule.name,
            "platforms": rule.platforms,
            "keywords": rule.keywords,
            "hashtags": rule.hashtags,
            "competitor_accounts": rule.competitor_accounts,
            "is_active": rule.is_active,
            "created_at": rule.created_at.isoformat(),
        })
    return serialized
@app.get("/api/listening/mentions")
def get_mentions(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return up to 100 mentions for the user's rules, most recently discovered first."""
    mention_query = (
        db.query(Mention)
        .join(ListeningRule)
        .filter(ListeningRule.user_id == current_user.id)
        .order_by(Mention.discovered_at.desc())
        .limit(100)
    )
    serialized = []
    for hit in mention_query.all():
        serialized.append({
            "id": hit.id,
            "platform": hit.platform,
            "external_id": hit.external_id,
            "author_name": hit.author_name,
            "author_handle": hit.author_handle,
            "content": hit.content,
            "url": hit.url,
            "sentiment": hit.sentiment,
            "engagement_count": hit.engagement_count,
            "created_at": hit.created_at.isoformat(),
            "discovered_at": hit.discovered_at.isoformat(),
        })
    return serialized
# ---- INBOX ----
@app.get("/api/inbox")
def get_inbox(platform: Optional[str] = None, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return inbox messages for the user's social accounts, newest first.

    Args:
        platform: Optional exact-match filter on the message platform.
    """
    owned_accounts = db.query(SocialAccount).filter(SocialAccount.user_id == current_user.id).all()
    owned_ids = [acct.id for acct in owned_accounts]

    inbox_query = db.query(InboxMessage).filter(InboxMessage.account_id.in_(owned_ids))
    if platform:
        inbox_query = inbox_query.filter(InboxMessage.platform == platform)

    serialized = []
    for msg in inbox_query.order_by(InboxMessage.created_at.desc()).all():
        serialized.append({
            "id": msg.id,
            "platform": msg.platform,
            "sender_name": msg.sender_name,
            "sender_handle": msg.sender_handle,
            "content": msg.content,
            "message_type": msg.message_type,
            "is_read": msg.is_read,
            "created_at": msg.created_at.isoformat(),
            "url": msg.url,
        })
    return serialized
@app.post("/api/inbox/{message_id}/reply")
def reply_to_message(message_id: int, reply_data: dict, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Mark an inbox message as read and replied, optionally storing the reply text.

    Only the database row is updated; nothing is sent to the platform.

    Args:
        message_id: Primary key of the inbox message.
        reply_data: Request body; a truthy ``reply_content`` is stored.
        current_user: Authenticated user; must own the message's account.
        db: Database session.

    Returns:
        ``{"success": True}``.

    Raises:
        HTTPException: 404 when the message does not exist or belongs to an
            account of another user.
    """
    # Scope the lookup to accounts owned by the caller. Looking the message up
    # by id alone would let any authenticated user modify anyone's messages.
    message = (
        db.query(InboxMessage)
        .join(SocialAccount, InboxMessage.account_id == SocialAccount.id)
        .filter(
            InboxMessage.id == message_id,
            SocialAccount.user_id == current_user.id,
        )
        .first()
    )
    if not message:
        raise HTTPException(status_code=404, detail="Message not found")

    message.is_read = True
    message.replied_at = datetime.utcnow()
    reply_content = reply_data.get('reply_content')
    if reply_content:
        message.reply_content = reply_content
    db.commit()
    return {"success": True}
# ---- CALENDAR ----
def _parse_calendar_bound(raw):
    """Parse an ISO 8601 string into a naive UTC ``datetime``.

    A trailing ``Z`` is accepted. Values carrying a UTC offset are converted
    to UTC and stripped of tzinfo, matching the naive UTC timestamps that the
    rest of this module writes to ``scheduled_at``.
    """
    from datetime import timezone

    parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


@app.get("/api/calendar/events")
def get_calendar_events(start: str, end: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return the user's posts scheduled within ``[start, end]`` as calendar events.

    Args:
        start: Inclusive lower bound, ISO 8601.
        end: Inclusive upper bound, ISO 8601.

    Returns:
        A list of dicts with ``id``, ``title`` (content truncated to 50
        characters plus "..."), ``platform``, ``status``, ``start`` and a
        status-dependent ``color``.

    Raises:
        HTTPException: 422 when either bound is not a valid ISO 8601 datetime.
    """
    try:
        start_dt = _parse_calendar_bound(start)
        end_dt = _parse_calendar_bound(end)
    except ValueError:
        raise HTTPException(
            status_code=422,
            detail="start and end must be ISO 8601 datetime strings",
        )

    posts = db.query(Post).filter(
        Post.user_id == current_user.id,
        Post.scheduled_at >= start_dt,
        Post.scheduled_at <= end_dt,
    ).all()

    color_map = {"draft": "#9CA3AF", "scheduled": "#3B82F6", "published": "#10B981", "failed": "#EF4444"}
    events = []
    for p in posts:
        # content is nullable; treat a missing body as an empty title.
        text = p.content or ""
        title = text[:50] + "..." if len(text) > 50 else text
        events.append({
            "id": p.id,
            "title": title,
            "platform": p.platform,
            "status": p.status,
            "start": (p.scheduled_at or p.created_at).isoformat(),
            "color": color_map.get(p.status, "#6B7280"),
        })
    return events
# ---- COMPETITORS ----
@app.get("/api/competitors")
def list_competitors(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return the user's competitor benchmark rows, latest snapshot first."""
    benchmark_query = (
        db.query(CompetitorBenchmark)
        .filter(CompetitorBenchmark.user_id == current_user.id)
        .order_by(CompetitorBenchmark.snapshot_date.desc())
    )
    serialized = []
    for row in benchmark_query.all():
        serialized.append({
            "id": row.id,
            "platform": row.platform,
            "competitor_handle": row.competitor_handle,
            "follower_count": row.follower_count,
            "posts_count": row.posts_count,
            "avg_engagement_rate": row.avg_engagement_rate,
            "snapshot_date": row.snapshot_date.isoformat(),
        })
    return serialized
# ---- DEMO DATA ----
@app.post("/api/demo/seed")
def seed_demo_data(db: Session = Depends(get_db)):
    """Insert a demo user with sample accounts, posts, a rule, mentions and messages.

    The call is idempotent: when the demo user already exists nothing is
    inserted. Without that check a second call would violate the unique
    constraint on ``users.email`` and fail.

    NOTE(review): this route requires no authentication -- confirm that is
    intended for the deployed app.

    Returns:
        A dict with ``success`` and a human-readable ``message``.
    """
    if db.query(User).filter(User.email == "demo@example.com").first():
        return {"success": True, "message": "Demo data already seeded. Login with demo@example.com / demo123"}

    demo_user = User(email="demo@example.com", hashed_password=pwd_context.hash("demo123"), full_name="Demo User")
    db.add(demo_user)
    db.commit()
    db.refresh(demo_user)

    accounts = [
        SocialAccount(user_id=demo_user.id, platform="twitter", account_name="@demo_brand", account_id="123", follower_count=12500, is_active=True),
        SocialAccount(user_id=demo_user.id, platform="instagram", account_name="@demo_brand", account_id="456", follower_count=34200, is_active=True),
        SocialAccount(user_id=demo_user.id, platform="facebook", account_name="Demo Brand", account_id="789", follower_count=8900, is_active=True),
    ]
    db.add_all(accounts)
    # Commit before building posts so the account primary keys are assigned.
    db.commit()

    posts = [
        Post(user_id=demo_user.id, account_id=accounts[0].id, platform="twitter", content="Excited to launch our new product line! 🚀 #innovation", status="published", published_at=datetime.utcnow() - timedelta(days=1)),
        Post(user_id=demo_user.id, account_id=accounts[1].id, platform="instagram", content="Behind the scenes of our latest campaign 📸", status="published", published_at=datetime.utcnow() - timedelta(days=2)),
        Post(user_id=demo_user.id, account_id=accounts[2].id, platform="facebook", content="Join us for our live event next week!", status="scheduled", scheduled_at=datetime.utcnow() + timedelta(days=3)),
    ]
    db.add_all(posts)
    db.commit()

    rule = ListeningRule(user_id=demo_user.id, name="Brand Monitor", platforms=["twitter", "instagram"], keywords=["demo brand"], hashtags=["demobrand"])
    db.add(rule)
    # Commit before building mentions so the rule primary key is assigned.
    db.commit()

    mentions = [
        Mention(rule_id=rule.id, platform="twitter", external_id="m1", author_name="John Doe", author_handle="johndoe", content="Just tried @demo_brand's new product - amazing quality!", url="https://twitter.com/i/web/status/m1", sentiment="positive", engagement_count=45),
        Mention(rule_id=rule.id, platform="twitter", external_id="m2", author_name="Jane Smith", author_handle="janesmith", content="Anyone else having issues with #demobrand app?", url="https://twitter.com/i/web/status/m2", sentiment="negative", engagement_count=12),
        Mention(rule_id=rule.id, platform="instagram", external_id="m3", author_name="Mike Johnson", author_handle="mikej", content="Loving the new collection from @demo_brand 🔥", url="https://instagram.com/p/m3", sentiment="positive", engagement_count=230),
    ]
    db.add_all(mentions)
    db.commit()

    messages = [
        InboxMessage(account_id=accounts[0].id, platform="twitter", external_id="msg1", sender_name="Customer Support", sender_handle="support_user", content="When will the new feature be available?", message_type="comment"),
        InboxMessage(account_id=accounts[1].id, platform="instagram", external_id="msg2", sender_name="Sarah Lee", sender_handle="sarahlee", content="Love your recent posts! Can we collaborate?", message_type="dm"),
    ]
    db.add_all(messages)
    db.commit()

    return {"success": True, "message": "Demo data seeded. Login with demo@example.com / demo123"}
# ---- SERVE FRONTEND ----
STATIC_DIR = "/app/frontend/dist"


# A StaticFiles mount at "/" registered ahead of this route matches every
# remaining path, which left the index.html fallback unreachable. A single
# catch-all handler therefore serves both the built assets and the fallback.
# It is registered after the API routes, so those still take precedence.
@app.get("/{path:path}")
async def serve_spa(path: str):
    """Serve a built frontend file, falling back to ``index.html``.

    Resolution order for a request path below ``STATIC_DIR``: the file
    itself, ``<path>/index.html``, ``<path>.html``, then the top-level
    ``index.html`` so client-side routes load the application shell.

    Args:
        path: Request path without the leading slash.

    Raises:
        HTTPException: 404 for unknown ``api/...`` paths and when no frontend
            build (``index.html``) is present.
    """
    # Unknown API paths must stay 404s instead of returning the HTML shell.
    if path == "api" or path.startswith("api/"):
        raise HTTPException(status_code=404)

    static_root = os.path.realpath(STATIC_DIR)
    index_file = os.path.join(static_root, "index.html")
    requested = os.path.realpath(os.path.join(static_root, path))

    candidates = []
    # Only look at paths that resolve inside the build directory; this rejects
    # "../" traversal and symlinks that point outside it.
    if requested.startswith(static_root + os.sep):
        candidates.extend([
            requested,
            os.path.join(requested, "index.html"),
            requested + ".html",
        ])
    candidates.append(index_file)

    for candidate in candidates:
        if os.path.isfile(candidate):
            return FileResponse(candidate)
    raise HTTPException(status_code=404)
if __name__ == "__main__":
    import uvicorn

    # Listen on every interface; the port comes from PORT, defaulting to 7860.
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))