import gradio as gr
import sqlite3
import hashlib
import uuid
import os
from datetime import datetime
from PIL import Image
import numpy as np
import shutil
# ===============================
# DATABASE LAYER
# ===============================
conn = sqlite3.connect("vibes.db", check_same_thread=False)
cursor = conn.cursor()
def init_db():
# Users table
cursor.execute("""
CREATE TABLE IF NOT EXISTS users(
username TEXT PRIMARY KEY,
password TEXT,
display_name TEXT,
bio TEXT,
avatar TEXT,
created_at TEXT
)
""")
# Posts table (supports both images and videos)
cursor.execute("""
CREATE TABLE IF NOT EXISTS posts(
id TEXT PRIMARY KEY,
username TEXT,
caption TEXT,
media_path TEXT,
media_type TEXT,
timestamp TEXT,
views INTEGER DEFAULT 0
)
""")
# Likes table
cursor.execute("""
CREATE TABLE IF NOT EXISTS likes(
post_id TEXT,
username TEXT,
PRIMARY KEY (post_id, username)
)
""")
# Comments table
cursor.execute("""
CREATE TABLE IF NOT EXISTS comments(
id TEXT PRIMARY KEY,
post_id TEXT,
username TEXT,
comment TEXT,
timestamp TEXT
)
""")
# Followers table
cursor.execute("""
CREATE TABLE IF NOT EXISTS followers(
user TEXT,
follower TEXT,
PRIMARY KEY (user, follower)
)
""")
# Notifications table
cursor.execute("""
CREATE TABLE IF NOT EXISTS notifications(
id TEXT PRIMARY KEY,
username TEXT,
message TEXT,
timestamp TEXT,
read INTEGER DEFAULT 0
)
""")
conn.commit()
init_db()
# Create directories
os.makedirs("uploads", exist_ok=True)
os.makedirs("avatars", exist_ok=True)
# ===============================
# AUTH LAYER
# ===============================
def hash_password(p):
return hashlib.sha256(p.encode()).hexdigest()
def signup(username, password, display_name):
if not username or not password:
return "❌ Username and password required", "", "", ""
cursor.execute("SELECT * FROM users WHERE username=?", (username,))
if cursor.fetchone():
return "❌ Username already exists", "", "", ""
cursor.execute(
"INSERT INTO users VALUES (?,?,?,?,?,?)",
(username, hash_password(password), display_name or username, "✨ New to Vibes ✨", None, datetime.now().isoformat())
)
conn.commit()
return "✅ Account created! Please login.", "", "", ""
def login(username, password):
if not username or not password:
return "❌ Username and password required", None
cursor.execute("SELECT password, display_name FROM users WHERE username=?", (username,))
result = cursor.fetchone()
if not result:
return "❌ User not found", None
if result[0] != hash_password(password):
return "❌ Wrong password", None
return f"✅ Welcome back, {result[1]}!", username
# ===============================
# POST LAYER (SUPPORTS IMAGES & VIDEOS)
# ===============================
def upload_media(user, media, caption, media_type):
if user is None or user == "guest":
return "❌ Login to post", None
if media is None:
return f"❌ Select a {media_type}", None
post_id = str(uuid.uuid4())
# Determine file extension
if media_type == "image":
ext = "jpg"
# Process image
img = Image.fromarray(media)
img = img.convert("RGB")
# Resize if too large
max_size = 1200
if max(img.size) > max_size:
ratio = max_size / max(img.size)
new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio))
img = img.resize(new_size, Image.Resampling.LANCZOS)
path = f"uploads/{post_id}.{ext}"
img.save(path, quality=85)
else:
ext = "mp4"
path = f"uploads/{post_id}.{ext}"
# Handle video file
if isinstance(media, str):
shutil.copy(media, path)
else:
return "❌ Invalid video format", None
cursor.execute(
"INSERT INTO posts VALUES (?,?,?,?,?,?,?)",
(post_id, user, caption, path, media_type, datetime.now().isoformat(), 0)
)
conn.commit()
# Notify followers
cursor.execute("SELECT follower FROM followers WHERE user=?", (user,))
followers = cursor.fetchall()
for f in followers:
notif_id = str(uuid.uuid4())
cursor.execute(
"INSERT INTO notifications VALUES (?,?,?,?,?)",
(notif_id, f[0], f"@{user} posted new content!", datetime.now().isoformat(), 0)
)
conn.commit()
return f"✅ {media_type.capitalize()} posted!", None
def like_post(post_id, user):
if user is None or user == "guest":
return "❤️ Login to like", None
cursor.execute("SELECT username FROM posts WHERE id=?", (post_id,))
post_owner = cursor.fetchone()
cursor.execute("SELECT * FROM likes WHERE post_id=? AND username=?", (post_id, user))
if cursor.fetchone():
cursor.execute("DELETE FROM likes WHERE post_id=? AND username=?", (post_id, user))
conn.commit()
# Update notification (simplified - just remove like)
return "🤍", "unliked"
else:
cursor.execute("INSERT INTO likes VALUES (?,?)", (post_id, user))
conn.commit()
# Notify post owner
if post_owner and post_owner[0] != user:
notif_id = str(uuid.uuid4())
cursor.execute(
"INSERT INTO notifications VALUES (?,?,?,?,?)",
(notif_id, post_owner[0], f"@{user} liked your post!", datetime.now().isoformat(), 0)
)
conn.commit()
return "❤️", "liked"
def add_comment(user, post_id, comment):
if user is None or user == "guest":
return "❌ Login to comment"
if not comment or not comment.strip():
return "❌ Enter a comment"
cursor.execute("SELECT username FROM posts WHERE id=?", (post_id,))
post_owner = cursor.fetchone()
cursor.execute(
"INSERT INTO comments VALUES (?,?,?,?,?)",
(str(uuid.uuid4()), post_id, user, comment.strip(), datetime.now().isoformat())
)
conn.commit()
# Notify post owner
if post_owner and post_owner[0] != user:
notif_id = str(uuid.uuid4())
cursor.execute(
"INSERT INTO notifications VALUES (?,?,?,?,?)",
(notif_id, post_owner[0], f"@{user} commented: {comment[:30]}", datetime.now().isoformat(), 0)
)
conn.commit()
return "✅ Comment added!"
# ===============================
# FEED RENDERER WITH STICKY HEADER
# ===============================
def render_feed():
cursor.execute("""
SELECT posts.id, posts.media_path, posts.caption, posts.media_type,
users.display_name, users.username, users.avatar,
COUNT(DISTINCT likes.username) as like_count,
COUNT(DISTINCT comments.id) as comment_count
FROM posts
JOIN users ON posts.username = users.username
LEFT JOIN likes ON posts.id = likes.post_id
LEFT JOIN comments ON posts.id = comments.video_id
GROUP BY posts.id
ORDER BY posts.timestamp DESC
LIMIT 50
""")
rows = cursor.fetchall()
if not rows:
return """
🎵
No content yet
Be the first to share something amazing!
"""
html = '
'
for row in rows:
post_id, media_path, caption, media_type, display_name, username, avatar, like_count, comment_count = row
if media_type == "image":
media_html = f''
else:
media_html = f'''
'''
avatar_html = f'' if avatar else '
👤
'
html += f"""
{media_html}
{avatar_html}
{display_name}
@{username}
{caption if caption else "✨"}
"""
html += "
"
return html
# ===============================
# PROFILE LAYER
# ===============================
def load_profile(user):
if user is None or user == "guest":
return "### 👤 Guest Mode\n\nLogin to see your profile", []
cursor.execute("SELECT display_name, bio, avatar, created_at FROM users WHERE username=?", (user,))
u = cursor.fetchone()
if not u:
return "### User not found", []
cursor.execute("SELECT media_path, media_type FROM posts WHERE username=? ORDER BY timestamp DESC", (user,))
posts = [(p[0], p[1]) for p in cursor.fetchall()]
cursor.execute("SELECT COUNT(*) FROM followers WHERE user=?", (user,))
followers = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM followers WHERE follower=?", (user,))
following = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM posts WHERE username=?", (user,))
posts_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM likes WHERE post_id IN (SELECT id FROM posts WHERE username=?)", (user,))
total_likes = cursor.fetchone()[0]
avatar_html = f'' if u[2] else '
👤
'
profile = f"""
{avatar_html}
{u[0]}
@{user}
{u[1]}
Joined {u[3][:10]}
{posts_count}
Posts
{followers}
Followers
{following}
Following
{total_likes}
Likes
"""
# Return posts as gallery items
gallery_items = [p[0] for p in posts[:9]]
return profile, gallery_items
def update_bio(user, new_bio):
if user is None or user == "guest":
return "❌ Login to update bio"
if not new_bio or not new_bio.strip():
return "❌ Please enter a bio"
cursor.execute("UPDATE users SET bio=? WHERE username=?", (new_bio.strip(), user))
conn.commit()
return "✅ Bio updated!"
def follow_user(current_user, target_user):
if current_user is None or current_user == "guest":
return "❌ Login to follow"
if current_user == target_user:
return "❌ Cannot follow yourself"
cursor.execute("SELECT * FROM users WHERE username=?", (target_user,))
if not cursor.fetchone():
return "❌ User not found"
cursor.execute("SELECT * FROM followers WHERE user=? AND follower=?", (target_user, current_user))
if cursor.fetchone():
cursor.execute("DELETE FROM followers WHERE user=? AND follower=?", (target_user, current_user))
conn.commit()
# Create unfollow notification
notif_id = str(uuid.uuid4())
cursor.execute(
"INSERT INTO notifications VALUES (?,?,?,?,?)",
(notif_id, target_user, f"@{current_user} unfollowed you", datetime.now().isoformat(), 0)
)
conn.commit()
return f"✅ Unfollowed {target_user}"
else:
cursor.execute("INSERT INTO followers VALUES (?,?)", (target_user, current_user))
conn.commit()
# Create follow notification
notif_id = str(uuid.uuid4())
cursor.execute(
"INSERT INTO notifications VALUES (?,?,?,?,?)",
(notif_id, target_user, f"@{current_user} started following you!", datetime.now().isoformat(), 0)
)
conn.commit()
return f"✅ Following {target_user}"
# ===============================
# NOTIFICATIONS
# ===============================
def get_notifications(user):
if user is None or user == "guest":
return []
cursor.execute(
"SELECT message, timestamp, read FROM notifications WHERE username=? ORDER BY timestamp DESC LIMIT 20",
(user,)
)
notifs = cursor.fetchall()
# Mark as read
cursor.execute("UPDATE notifications SET read=1 WHERE username=?", (user,))
conn.commit()
return notifs
def render_notifications(user):
notifs = get_notifications(user)
if not notifs:
return '
🔔 No notifications yet
'
html = '
'
for n in notifs:
unread_class = "" if n[2] else "unread"
html += f'''
"""
with gr.Blocks(title="Vibes", css=custom_css, theme=gr.themes.Soft()) as app:
user_state = gr.State("guest")
# Login Screen
with gr.Column(visible=True, elem_id="login_screen") as login_screen:
gr.HTML("""