rockerritesh's picture
fix: handle None values in category/project fields
408e51f unverified
import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import json
import os
import secrets
import tempfile
import platform
from datetime import datetime, timezone
from qdrant_client import QdrantClient
from qdrant_client.models import (
Filter, FieldCondition, MatchValue, PointStruct,
)
from collections import Counter
from itertools import combinations
import uuid
# --- Config & Client ---
# Qdrant connection settings, read from the environment at import time.
# Both are None when the variable is unset.
QDRANT_URL = os.environ.get("QDRANT_URL")
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY")
# Admin password checked by resolve_user(); an empty value disables the
# admin login path there.
PASS_KEY = os.environ.get("PASS_KEY", "")
# Names of the Qdrant collections queried throughout this module.
MEMORIES_COLLECTION = "memories"
TRAILS_COLLECTION = "trails"
USERS_COLLECTION = "users"
# Shared QdrantClient instance, created on first call to get_client().
_client = None
# Filled by auth_fn(): maps the first 16 characters of the submitted
# password to the (role, user_id) tuple returned by resolve_user().
_auth_sessions = {}
def get_client():
    """Return the module-wide Qdrant client, building it on first use."""
    global _client
    if _client is not None:
        return _client
    _client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
    return _client
# --- Terminal CSS ---
# Stylesheet text for the dark "terminal" look. It restyles Gradio's own
# selectors (tabs, inputs, buttons, tables, markdown, dropdowns, radios,
# blocks, accordions, file previews) and defines two custom classes,
# `.status-bar` and `.log-panel`, that the HTML built by the status-bar and
# activity-feed helpers in this module relies on.
TERMINAL_CSS = """
/* === GLOBAL === */
.gradio-container {
max-width: 1600px !important;
background-color: #0a0a0a !important;
font-family: 'JetBrains Mono', 'Fira Code', 'Cascadia Code', 'Consolas', monospace !important;
color: #cccccc !important;
}
body, .dark {
background-color: #0a0a0a !important;
}
/* === STATUS BAR === */
.status-bar {
background: #111111;
border: 1px solid #333333;
padding: 6px 16px;
font-size: 12px;
font-family: 'JetBrains Mono', monospace;
color: #cccccc;
margin-bottom: 8px;
letter-spacing: 0.5px;
}
.status-bar .online { color: #00ff41; }
.status-bar .offline { color: #ff3333; }
.status-bar .warn { color: #ffff00; }
.status-bar .amber { color: #ff8c00; }
.status-bar .cyan { color: #00d4ff; }
/* === TABS === */
.tab-nav button {
background: #111111 !important;
color: #888888 !important;
border: 1px solid #333333 !important;
border-radius: 0 !important;
font-family: 'JetBrains Mono', monospace !important;
font-size: 12px !important;
text-transform: uppercase !important;
letter-spacing: 1px !important;
padding: 8px 16px !important;
}
.tab-nav button.selected {
background: #1a1a1a !important;
color: #ff8c00 !important;
border-bottom: 2px solid #ff8c00 !important;
}
.tab-nav {
border-bottom: 1px solid #333333 !important;
background: #0a0a0a !important;
}
/* === INPUTS === */
.gradio-container input,
.gradio-container textarea,
.gradio-container select {
background: #1a1a1a !important;
color: #cccccc !important;
border: 1px solid #333333 !important;
border-radius: 0 !important;
font-family: 'JetBrains Mono', monospace !important;
font-size: 13px !important;
}
.gradio-container input:focus,
.gradio-container textarea:focus {
border-color: #ff8c00 !important;
box-shadow: none !important;
}
.gradio-container label, .gradio-container .label-wrap {
color: #ff8c00 !important;
font-family: 'JetBrains Mono', monospace !important;
font-size: 12px !important;
text-transform: uppercase !important;
}
/* === BUTTONS === */
.gradio-container button.primary {
background: #111111 !important;
color: #00ff41 !important;
border: 1px solid #00ff41 !important;
border-radius: 0 !important;
font-family: 'JetBrains Mono', monospace !important;
text-transform: uppercase !important;
letter-spacing: 1px !important;
}
.gradio-container button.primary:hover {
background: #1a2a1a !important;
}
.gradio-container button.secondary {
background: #111111 !important;
color: #ff8c00 !important;
border: 1px solid #ff8c00 !important;
border-radius: 0 !important;
font-family: 'JetBrains Mono', monospace !important;
text-transform: uppercase !important;
}
.gradio-container button.stop {
background: #111111 !important;
color: #ff3333 !important;
border: 1px solid #ff3333 !important;
border-radius: 0 !important;
font-family: 'JetBrains Mono', monospace !important;
text-transform: uppercase !important;
}
/* === DATAFRAMES / TABLES === */
.gradio-container table {
font-family: 'JetBrains Mono', monospace !important;
font-size: 12px !important;
border-collapse: collapse !important;
}
.gradio-container table th {
background: #111111 !important;
color: #ff8c00 !important;
border: 1px solid #333333 !important;
text-transform: uppercase !important;
font-size: 11px !important;
letter-spacing: 0.5px !important;
padding: 6px 8px !important;
}
.gradio-container table td {
background: #0a0a0a !important;
color: #cccccc !important;
border: 1px solid #222222 !important;
padding: 4px 8px !important;
}
.gradio-container table tr:hover td {
background: #1a1a1a !important;
}
/* === MARKDOWN === */
.gradio-container .markdown-text,
.gradio-container .prose {
color: #cccccc !important;
font-family: 'JetBrains Mono', monospace !important;
}
.gradio-container .prose h1,
.gradio-container .prose h2,
.gradio-container .prose h3 {
color: #ff8c00 !important;
font-family: 'JetBrains Mono', monospace !important;
border-bottom: 1px solid #333333 !important;
}
/* === PLOTS === */
.gradio-container .plot-container {
background: #0a0a0a !important;
}
/* === DROPDOWNS === */
.gradio-container .wrap ul {
background: #1a1a1a !important;
border: 1px solid #333333 !important;
}
.gradio-container .wrap ul li {
color: #cccccc !important;
}
.gradio-container .wrap ul li:hover {
background: #222222 !important;
}
/* === RADIO === */
.gradio-container .radio-group label {
background: #111111 !important;
color: #888888 !important;
border: 1px solid #333333 !important;
border-radius: 0 !important;
}
.gradio-container .radio-group label.selected {
color: #ff8c00 !important;
border-color: #ff8c00 !important;
}
/* === BLOCKS / PANELS === */
.gradio-container .block {
background: #0a0a0a !important;
border: 1px solid #333333 !important;
border-radius: 0 !important;
box-shadow: none !important;
}
.gradio-container .panel {
background: #0a0a0a !important;
border: none !important;
}
.gradio-container .form {
background: #0a0a0a !important;
border: none !important;
}
/* === ACCORDION === */
.gradio-container .accordion {
background: #111111 !important;
border: 1px solid #333333 !important;
border-radius: 0 !important;
}
/* === FILE DOWNLOAD === */
.gradio-container .file-preview {
background: #111111 !important;
border: 1px solid #333333 !important;
}
/* === LOG / FEED PANEL === */
.log-panel {
background: #0a0a0a;
border: 1px solid #333333;
padding: 12px;
font-family: 'JetBrains Mono', monospace;
font-size: 12px;
color: #cccccc;
max-height: 500px;
overflow-y: auto;
line-height: 1.6;
}
.log-panel .log-created { color: #00ff41; }
.log-panel .log-accessed { color: #ff8c00; }
.log-panel .log-updated { color: #cccccc; }
.log-panel .log-archived { color: #ff3333; }
.log-panel .log-ts { color: #888888; }
"""
# --- Plotly Terminal Theme ---
# Eight-colour palette passed as `color_discrete_sequence` to the
# multi-series Plotly charts in this module (category pie, category trend,
# project timeline).
TERMINAL_COLORS = [
"#00ff41", "#ff8c00", "#00d4ff", "#ff3333",
"#ffff00", "#cc44ff", "#00cccc", "#ff6699",
]
def apply_terminal_theme(fig):
    """Style a Plotly figure with the dark terminal look and return it.

    The figure is modified in place through ``fig.update_layout`` and the
    same object is handed back so calls can be chained.
    """

    def _axis():
        # A fresh dict per axis so the two axes never share state.
        return {
            "gridcolor": "#222222",
            "zerolinecolor": "#333333",
            "tickfont": {"color": "#888888"},
        }

    layout = {
        "paper_bgcolor": "#0a0a0a",
        "plot_bgcolor": "#0a0a0a",
        "font": {
            "family": "JetBrains Mono, Fira Code, Consolas, monospace",
            "color": "#cccccc",
            "size": 12,
        },
        "title_font": {"color": "#ff8c00", "size": 14},
        "xaxis": _axis(),
        "yaxis": _axis(),
        "legend": {"font": {"color": "#cccccc"}, "bgcolor": "rgba(0,0,0,0)"},
        "margin": {"l": 40, "r": 20, "t": 40, "b": 40},
    }
    fig.update_layout(**layout)
    return fig
# --- Auth ---
def resolve_user(password):
    """Resolve a submitted credential to ``(role, user_id)`` or ``None``.

    Args:
        password: The secret typed into the login form. It is either the
            admin ``PASS_KEY`` or a per-user token stored in the users
            collection.

    Returns:
        ``("admin", "admin")`` for the admin key, ``("user", <user_id>)`` for
        a matching user token, otherwise ``None``.
    """
    # An empty or missing credential must never authenticate. Without this
    # guard a ``None`` password would equal the ``None`` returned by
    # ``payload.get("token")`` for any user record that lacks a token.
    if not password or not isinstance(password, str):
        return None
    supplied = password.encode("utf-8")

    # compare_digest avoids leaking how many leading characters matched.
    # Bytes are compared because compare_digest rejects non-ASCII str.
    if PASS_KEY and secrets.compare_digest(supplied, PASS_KEY.encode("utf-8")):
        return ("admin", "admin")

    try:
        client = get_client()
        points, _ = client.scroll(
            collection_name=USERS_COLLECTION,
            limit=1000,
            with_payload=True,
            with_vectors=False,
        )
    except Exception:
        # Deliberate best-effort: if the user store is unreachable the login
        # is simply rejected instead of crashing the auth callback.
        return None

    for point in points:
        payload = point.payload or {}
        token = payload.get("token")
        user_id = payload.get("user_id")
        # Skip malformed records instead of letting one bad row abort the
        # whole lookup.
        if not isinstance(token, str) or not token or user_id is None:
            continue
        if secrets.compare_digest(supplied, token.encode("utf-8")):
            return ("user", user_id)
    return None
def auth_fn(username, password):
    """Login callback: accept the admin PASS_KEY or a user token.

    ``username`` is not consulted. On success the resolved
    ``(role, user_id)`` pair is stored in ``_auth_sessions`` under the first
    16 characters of the password.
    """
    resolved = resolve_user(password)
    if not resolved:
        return False
    _auth_sessions[password[:16]] = resolved
    return True
# --- Data Loading ---
def load_data():
    """Load the payload of every current-version memory from Qdrant.

    Returns:
        A list of payload dicts, one per point in the memories collection
        whose ``is_current`` field is ``True``.
    """
    client = get_client()
    current_only = Filter(must=[
        FieldCondition(key="is_current", match=MatchValue(value=True)),
    ])
    payloads = []
    offset = None
    # scroll() returns one page plus the offset of the next page. Following
    # that offset until it is None keeps collections larger than one page
    # from being silently truncated.
    while True:
        points, offset = client.scroll(
            collection_name=MEMORIES_COLLECTION,
            scroll_filter=current_only,
            limit=10000,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        payloads.extend(p.payload for p in points)
        if offset is None:
            break
    return payloads
def load_users():
    """Fetch user payloads from Qdrant, wrapped as ``{"users": [...]}``."""
    hits, _next_offset = get_client().scroll(
        collection_name=USERS_COLLECTION,
        limit=1000,
        with_payload=True,
        with_vectors=False,
    )
    user_payloads = [hit.payload for hit in hits]
    return {"users": user_payloads}
def load_trail(memory_id):
    """Return the trail events of one memory, ordered by timestamp.

    Any failure while querying or sorting yields an empty list.
    """
    try:
        client = get_client()
        for_this_memory = Filter(must=[
            FieldCondition(key="memory_id", match=MatchValue(value=memory_id)),
        ])
        hits, _next_offset = client.scroll(
            collection_name=TRAILS_COLLECTION,
            scroll_filter=for_this_memory,
            limit=10000,
            with_payload=True,
            with_vectors=False,
        )
        return sorted(
            (hit.payload for hit in hits),
            key=lambda event: event.get("timestamp", ""),
        )
    except Exception:
        return []
def load_all_trail_stats(records):
    """Summarise access events for each memory in ``records``.

    Returns a dict keyed by memory id. Each value holds ``access_count`` (the
    number of "accessed" events) and ``last_accessed`` (the timestamp of the
    latest such event in trail order, or ``None``). Records without a memory
    id are skipped.
    """
    summary = {}
    for record in records:
        memory_id = record.get("memory_id")
        if not memory_id:
            continue
        access_times = [
            event.get("timestamp")
            for event in load_trail(memory_id)
            if event.get("event") == "accessed"
        ]
        summary[memory_id] = {
            "access_count": len(access_times),
            "last_accessed": access_times[-1] if access_times else None,
        }
    return summary
def save_users(users_data):
    """Upsert every user in ``users_data["users"]`` into Qdrant.

    Point ids are UUIDv5 values derived from a fixed namespace and the
    user's id, so saving the same user again overwrites its point.
    """
    client = get_client()
    namespace = uuid.UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
    points = [
        PointStruct(
            id=str(uuid.uuid5(namespace, f"user_{user['user_id']}")),
            vector=[0.0],
            payload=user,
        )
        for user in users_data.get("users", [])
    ]
    if not points:
        return
    client.upsert(collection_name=USERS_COLLECTION, points=points)
# --- Status Bar ---
def build_admin_status_bar():
    """Render the admin status bar as an HTML string.

    Shows point counts for the three collections and whether Qdrant
    answered. If any lookup fails the counts become "?" and the state
    reads OFFLINE.
    """
    try:
        client = get_client()
        mem_count, trail_count, user_count = (
            client.get_collection(name).points_count
            for name in (MEMORIES_COLLECTION, TRAILS_COLLECTION, USERS_COLLECTION)
        )
        qdrant_status = '<span class="online">ONLINE</span>'
    except Exception:
        mem_count, trail_count, user_count = "?", "?", "?"
        qdrant_status = '<span class="offline">OFFLINE</span>'
    now = datetime.now().strftime("%H:%M:%S")
    return f"""<div class="status-bar">
<span class="amber">TATVA ADMIN</span> &nbsp;|&nbsp;
Qdrant: {qdrant_status} &nbsp;|&nbsp;
Memories: <span class="cyan">{mem_count}</span> &nbsp;|&nbsp;
Trails: <span class="cyan">{trail_count}</span> &nbsp;|&nbsp;
Users: <span class="cyan">{user_count}</span> &nbsp;|&nbsp;
Last sync: <span class="online">{now}</span>
</div>"""
def build_user_status_bar(user_id):
    """Render the per-user status bar as an HTML string.

    Args:
        user_id: Id of the logged-in user; only that user's current memories
            are counted.

    Returns:
        HTML showing the user's memory, category and project counts and the
        date part of the most recent ``time_date``. If the query fails the
        counts become "?" and the last activity reads "N/A".
    """
    # Local import: this block needs html.escape and the module does not
    # import it at file level.
    from html import escape

    try:
        client = get_client()
        points, _ = client.scroll(
            collection_name=MEMORIES_COLLECTION,
            scroll_filter=Filter(must=[
                FieldCondition(key="is_current", match=MatchValue(value=True)),
                FieldCondition(key="user_id", match=MatchValue(value=user_id)),
            ]),
            limit=10000,
            with_payload=True,
            with_vectors=False,
        )
        records = [p.payload for p in points]
        mem_count = len(records)
        # `or ""` folds an explicit None into the same bucket as a missing
        # field, so "no category" is not counted twice.
        categories = len({r.get("category") or "" for r in records})
        projects = len({r.get("project") or "" for r in records})
        dates = [str(r["time_date"]) for r in records if r.get("time_date")]
        last_activity = max(dates)[:10] if dates else "N/A"
    except Exception:
        mem_count = categories = projects = "?"
        last_activity = "N/A"

    # Both values originate outside this function and are placed into markup,
    # so they are escaped to keep stray "<" or "&" from being read as HTML.
    safe_user = escape(str(user_id))
    safe_activity = escape(str(last_activity))
    return f"""<div class="status-bar">
<span class="amber">TATVA</span> [<span class="cyan">{safe_user}</span>] &nbsp;|&nbsp;
Memories: <span class="cyan">{mem_count}</span> &nbsp;|&nbsp;
Categories: <span class="cyan">{categories}</span> &nbsp;|&nbsp;
Projects: <span class="cyan">{projects}</span> &nbsp;|&nbsp;
Last activity: <span class="online">{safe_activity}</span>
</div>"""
def refresh_status_bar(role, user_id):
    """Pick the status bar for the caller: admin bar for admins, user bar otherwise."""
    is_admin = role == "admin"
    return build_admin_status_bar() if is_admin else build_user_status_bar(user_id)
# --- Chart Helpers (terminal themed) ---
def build_dataframe(records, user_filter="All Users"):
    """Turn memory payloads into the DataFrame used by the chart helpers.

    Args:
        records: Iterable of payload dicts.
        user_filter: ``"All Users"`` to keep everything, otherwise the
            ``user_id`` whose rows should be kept.

    Returns:
        A DataFrame with the original fields plus ``date`` (parsed
        ``time_date``, NaT when unparsable), ``month``, ``week`` and
        ``day_of_week``. Missing ``user_id`` becomes "sumit", missing
        ``category`` becomes "uncategorized" and missing ``project`` becomes
        "unspecified". An empty input yields an empty DataFrame unchanged.
    """
    df = pd.DataFrame(records)
    if df.empty:
        return df

    if "user_id" in df.columns:
        df["user_id"] = df["user_id"].fillna("sumit")
    else:
        df["user_id"] = "sumit"

    if user_filter != "All Users":
        # Copy so the derived columns below are written to an independent
        # frame rather than to a slice of the unfiltered one.
        df = df[df["user_id"] == user_filter].copy()

    # A field that no record carries produces no column at all, so create it
    # instead of failing with KeyError.
    if "time_date" not in df.columns:
        df["time_date"] = None
    df["date"] = pd.to_datetime(df["time_date"], errors="coerce")
    df["month"] = df["date"].dt.to_period("M").astype(str)
    df["week"] = df["date"].dt.to_period("W").astype(str)
    df["day_of_week"] = df["date"].dt.day_name()

    for column, placeholder in (("category", "uncategorized"),
                                ("project", "unspecified")):
        if column in df.columns:
            df[column] = df[column].fillna(placeholder)
        else:
            df[column] = placeholder
    return df
def get_all_tags(df):
    """Flatten the ``tags`` column into a single list of tags.

    Only cells that hold a list contribute; missing values and cells of any
    other type are ignored. A frame without a ``tags`` column yields an
    empty list instead of raising ``KeyError``.
    """
    if "tags" not in df.columns:
        return []
    return [tag
            for cell in df["tags"]
            if isinstance(cell, list)
            for tag in cell]
def make_timeline(df, granularity="Month"):
    """Bar chart of record counts per period.

    ``granularity`` selects the bucket: "Month" and "Week" use the
    precomputed columns, any other value groups by calendar day.
    """
    if df.empty:
        return apply_terminal_theme(go.Figure())
    if granularity == "Month":
        bucket = "month"
    elif granularity == "Week":
        bucket = "week"
    else:
        bucket = df["date"].dt.date
    per_period = df.groupby(bucket).size().reset_index(name="count")
    per_period.columns = ["period", "count"]
    per_period = per_period.sort_values("period")
    chart = px.bar(
        per_period,
        x="period",
        y="count",
        color_discrete_sequence=["#00ff41"],
        title="ACTIVITY",
    )
    chart.update_layout(xaxis_title="", yaxis_title="TASKS", height=350)
    return apply_terminal_theme(chart)
def make_category_pie(df):
    """Donut chart showing how records split across categories."""
    if df.empty:
        return apply_terminal_theme(go.Figure())
    per_category = df["category"].value_counts().reset_index()
    per_category.columns = ["category", "count"]
    donut = px.pie(
        per_category,
        names="category",
        values="count",
        hole=0.5,
        color_discrete_sequence=TERMINAL_COLORS,
        title="CATEGORIES",
    )
    donut.update_layout(height=350)
    return apply_terminal_theme(donut)
def make_project_bar(df):
    """Horizontal bar chart of the 15 most frequent named projects."""
    if df.empty:
        return apply_terminal_theme(go.Figure())
    has_project = df["project"] != "unspecified"
    named_projects = df[has_project]["project"]
    top_projects = named_projects.value_counts().head(15).reset_index()
    top_projects.columns = ["project", "count"]
    chart = px.bar(
        top_projects,
        x="count",
        y="project",
        orientation="h",
        color_discrete_sequence=["#ff8c00"],
        title="TOP PROJECTS",
    )
    chart.update_layout(
        yaxis=dict(autorange="reversed"),
        height=400,
        xaxis_title="TASKS",
        yaxis_title="",
    )
    return apply_terminal_theme(chart)
def make_day_of_week(df):
    """Bar chart of record counts per weekday, Monday through Sunday."""
    if df.empty:
        return apply_terminal_theme(go.Figure())
    weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday",
                "Friday", "Saturday", "Sunday"]
    tally = df["day_of_week"].value_counts()
    # Reindex so every weekday appears, in calendar order, even with 0 hits.
    per_day = tally.reindex(weekdays, fill_value=0).reset_index()
    per_day.columns = ["day", "count"]
    chart = px.bar(
        per_day,
        x="day",
        y="count",
        color_discrete_sequence=["#00d4ff"],
        title="DAY OF WEEK",
    )
    chart.update_layout(height=400, xaxis_title="", yaxis_title="TASKS")
    return apply_terminal_theme(chart)
def make_tags_bar(all_tags):
    """Horizontal bar chart of the 20 most common tags in ``all_tags``."""
    if not all_tags:
        return apply_terminal_theme(go.Figure())
    most_used = Counter(all_tags).most_common(20)
    frame = pd.DataFrame(most_used, columns=["tag", "count"])
    chart = px.bar(
        frame,
        x="count",
        y="tag",
        orientation="h",
        color_discrete_sequence=["#cc44ff"],
        title="TOP 20 TAGS",
    )
    chart.update_layout(
        yaxis=dict(autorange="reversed"),
        height=450,
        xaxis_title="COUNT",
        yaxis_title="",
        coloraxis_showscale=False,
    )
    return apply_terminal_theme(chart)
def make_heatmap(df):
    """Year-by-month heatmap of record counts; rows without a date are dropped."""
    if df.empty:
        return apply_terminal_theme(go.Figure())
    dated = df.dropna(subset=["date"])
    if dated.empty:
        return apply_terminal_theme(go.Figure())
    dated = dated.assign(
        year=dated["date"].dt.year,
        month_num=dated["date"].dt.month,
    )
    per_cell = (dated.groupby(["year", "month_num"])
                .size()
                .reset_index(name="count"))
    grid = per_cell.pivot(index="year", columns="month_num", values="count")
    grid = grid.fillna(0)
    month_names = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                   "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
    heat = go.Heatmap(
        z=grid.values,
        x=[month_names[number - 1] for number in grid.columns],
        y=grid.index.astype(str),
        colorscale=[[0, "#0a0a0a"], [0.5, "#ff8c00"], [1, "#ff3333"]],
        hoverongaps=False,
    )
    fig = go.Figure(data=heat)
    fig.update_layout(
        height=450,
        xaxis_title="MONTH",
        yaxis_title="YEAR",
        title="ACTIVITY HEATMAP",
    )
    return apply_terminal_theme(fig)
def make_recent_table(df):
    """Return the 25 newest records as a display table.

    Rows are ordered by ``date`` descending and the five source columns are
    relabelled Date / Category / Project / User / Task.
    """
    headers = ["Date", "Category", "Project", "User", "Task"]
    if df.empty:
        return pd.DataFrame(columns=headers)
    source_columns = ["time_date", "category", "project", "user_id", "task"]
    newest_first = df.sort_values("date", ascending=False)
    table = newest_first.head(25)[source_columns].copy()
    table.columns = headers
    return table
# --- Dashboard ---
def get_user_choices():
    """List the user-filter options: "All Users", "shared", then every stored user id."""
    registered = [user["user_id"] for user in load_users().get("users", [])]
    return ["All Users", "shared", *registered]
def refresh_dashboard(granularity, user_filter, role, user_id):
    """Recompute every dashboard output.

    Admins may view any user through ``user_filter``; for everyone else the
    filter is forced to their own ``user_id``. Returns a 10-tuple: filter
    dropdown update, stats line, six figures, the recent-records table and
    the status bar HTML.
    """
    is_admin = role == "admin"
    records = load_data()
    if not is_admin:
        user_filter = user_id
    choices = get_user_choices() if is_admin else [user_id]

    df = build_dataframe(records, user_filter)
    if df.empty:
        all_tags = []
        n_categories = 0
        n_projects = 0
    else:
        all_tags = get_all_tags(df)
        n_categories = df["category"].nunique()
        n_projects = df["project"].nunique()
    total = len(df)
    n_tags = len(set(all_tags))

    stats = (f"`{total}` RECORDS | `{n_categories}` CATEGORIES | "
             f"`{n_projects}` PROJECTS | `{n_tags}` TAGS | "
             f"VIEW: `{user_filter}` | {datetime.now().strftime('%H:%M:%S')}")
    bar = refresh_status_bar(role, user_id)

    dropdown = gr.update(choices=choices, value=user_filter)
    timeline = make_timeline(df, granularity)
    category_pie = make_category_pie(df)
    project_bar = make_project_bar(df)
    weekday_bar = make_day_of_week(df)
    tags_bar = make_tags_bar(all_tags)
    heatmap = make_heatmap(df)
    recent = make_recent_table(df)
    return (dropdown, stats, timeline, category_pie, project_bar,
            weekday_bar, tags_bar, heatmap, recent, bar)
# --- Memory Browser ---
def load_memories_page(category_filter, project_filter, tag_filter,
                       status_filter, page, role, user_id):
    """Load one 50-row page of memories after applying the browser filters.

    Args:
        category_filter: Exact category to keep; "All" or empty keeps all.
        project_filter: Exact project to keep; "All" or empty keeps all.
        tag_filter: Case-insensitive tag a record must carry; blank keeps all.
        status_filter: "archived" queries archived memories (a failed query
            yields no records); any other value uses the current memories.
        page: Requested 1-based page. Missing or non-numeric values fall
            back to 1 and the result is clamped to the valid range.
        role: "admin" sees every user's records; others see only their own.
        user_id: Id of the caller, used for the non-admin restriction.

    Returns:
        ``(table, info, page)``: the page as a DataFrame, a markdown summary
        line, and the page number actually shown.
    """
    columns = ["ID", "Date", "Category", "Project",
               "User", "Task", "Tags", "Status"]

    def _text(value):
        # Payload fields may be present but None; treat that like "missing".
        return "" if value is None else str(value)

    def _tags(record):
        tags = record.get("tags")
        return [str(t) for t in tags] if isinstance(tags, list) else []

    if status_filter == "archived":
        try:
            client = get_client()
            points, _ = client.scroll(
                collection_name=MEMORIES_COLLECTION,
                scroll_filter=Filter(must=[
                    FieldCondition(key="status",
                                   match=MatchValue(value="archived")),
                ]),
                limit=10000,
                with_payload=True,
                with_vectors=False,
            )
            records = [p.payload for p in points]
        except Exception:
            records = []
    else:
        records = load_data()

    if role != "admin":
        records = [r for r in records
                   if r.get("user_id", "sumit") == user_id]
    if category_filter and category_filter != "All":
        records = [r for r in records
                   if r.get("category") == category_filter]
    if project_filter and project_filter != "All":
        records = [r for r in records
                   if r.get("project") == project_filter]
    if tag_filter and tag_filter.strip():
        wanted = tag_filter.strip().lower()
        records = [r for r in records
                   if wanted in {t.lower() for t in _tags(r)}]

    # Sort on a string key so a None date cannot be compared against a str.
    records.sort(key=lambda r: _text(r.get("time_date")), reverse=True)

    page_size = 50
    total = len(records)
    total_pages = max(1, (total + page_size - 1) // page_size)
    try:
        requested = int(page)
    except (TypeError, ValueError, OverflowError):
        # An empty number box (None) or NaN/inf falls back to page 1.
        requested = 1
    page = max(1, min(requested, total_pages))
    start = (page - 1) * page_size

    rows = [
        {
            "ID": _text(r.get("memory_id"))[:16] + "...",
            "Date": _text(r.get("time_date"))[:10],
            "Category": _text(r.get("category")),
            "Project": _text(r.get("project")),
            "User": r.get("user_id", "sumit"),
            "Task": _text(r.get("task"))[:80],
            "Tags": ", ".join(_tags(r))[:40],
            "Status": r.get("status", "active"),
        }
        for r in records[start:start + page_size]
    ]
    df = pd.DataFrame(rows, columns=columns)
    info = f"`{total}` MEMORIES | PAGE `{page}` / `{total_pages}`"
    return df, info, page
def get_memory_detail(memory_id_partial, role, user_id):
    """Return a formatted detail view for the memory whose id starts with a prefix.

    Args:
        memory_id_partial: Leading characters of the memory id. A trailing
            "..." as shown in the browser table is accepted and ignored.
        role: "admin" searches every record; others search only their own.
        user_id: Id of the caller, used for the non-admin restriction.

    Returns:
        A fenced text block describing the first matching memory, or a short
        message when the prefix is too short or nothing matches.
    """
    # Validate the cleaned prefix, not the raw input. Otherwise input such as
    # "    " or "......" passes the length check, collapses to an empty
    # prefix, and startswith("") would match the first record.
    clean_id = (memory_id_partial or "").replace("...", "").strip()
    if len(clean_id) < 4:
        return "Enter at least 4 characters of the memory ID."

    records = load_data()
    if role != "admin":
        records = [r for r in records
                   if r.get("user_id", "sumit") == user_id]

    # `or ""` guards records whose memory_id is present but None.
    match = next(
        (r for r in records
         if str(r.get("memory_id") or "").startswith(clean_id)),
        None,
    )
    if not match:
        return "Memory not found."

    tags = match.get("tags")
    tags_str = ", ".join(str(t) for t in tags) if isinstance(tags, list) else ""
    return f"""```
MEMORY ID : {match.get('memory_id', 'N/A')}
DATE : {match.get('time_date', 'N/A')}
CATEGORY : {match.get('category', 'N/A')}
PROJECT : {match.get('project', 'N/A')}
USER : {match.get('user_id', 'sumit')}
STATUS : {match.get('status', 'active')}
VERSION : {match.get('version', 1)}
TAGS : {tags_str}
TASK:
{match.get('task', 'N/A')}
```"""
# --- Search ---
def search_memories_keyword(query, role, user_id):
    """Rank memories by how often a keyword occurs in their text fields.

    The score counts case-insensitive occurrences of the query, weighted
    3x in ``task``, 2x in the joined ``tags`` and 1x each in ``category``
    and ``project``. Only records scoring above zero are returned, best
    first, capped at 50.

    Args:
        query: Keyword to look for; must be at least 2 characters once
            stripped.
        role: "admin" searches every record; others search only their own.
        user_id: Id of the caller, used for the non-admin restriction.

    Returns:
        ``(table, info)``: the result DataFrame and a markdown summary line.
    """
    columns = ["ID", "Date", "Category", "Project", "User", "Task", "Score"]
    if not query or len(query.strip()) < 2:
        return pd.DataFrame(columns=columns), "Enter at least 2 characters."
    query = query.strip().lower()

    def _text(value):
        # Fields can be present but None; treat that the same as missing so
        # both scoring and row formatting stay crash-free.
        return "" if value is None else str(value)

    def _tags(record):
        tags = record.get("tags")
        return [str(t) for t in tags] if isinstance(tags, list) else []

    records = load_data()
    if role != "admin":
        records = [r for r in records
                   if r.get("user_id", "sumit") == user_id]

    scored = []
    for r in records:
        score = (_text(r.get("task")).lower().count(query) * 3
                 + " ".join(_tags(r)).lower().count(query) * 2
                 + _text(r.get("category")).lower().count(query)
                 + _text(r.get("project")).lower().count(query))
        if score > 0:
            scored.append((r, score))
    scored.sort(key=lambda pair: pair[1], reverse=True)

    rows = [
        {
            "ID": _text(r.get("memory_id"))[:16] + "...",
            "Date": _text(r.get("time_date"))[:10],
            "Category": _text(r.get("category")),
            "Project": _text(r.get("project")),
            "User": r.get("user_id", "sumit"),
            "Task": _text(r.get("task"))[:100],
            "Score": score,
        }
        for r, score in scored[:50]
    ]
    df = pd.DataFrame(rows, columns=columns)
    info = f"`{len(rows)}` RESULTS for `{query}`"
    return df, info
# --- Memory Lifecycle ---
def load_trail_view(memory_id, role, user_id):
    """Build the trail table (TIME / EVENT / ACTOR / DETAILS) for one memory.

    Non-admin callers only get trails of memories they own; otherwise a
    single "ACCESS DENIED" row is returned. A blank id or a memory with no
    events yields an empty table.
    """
    headers = ["TIME", "EVENT", "ACTOR", "DETAILS"]
    if not memory_id:
        return pd.DataFrame(columns=headers)
    memory_id = memory_id.strip()

    if role != "admin":
        owned_ids = {
            record.get("memory_id")
            for record in load_data()
            if record.get("user_id", "sumit") == user_id
        }
        if memory_id not in owned_ids:
            denied = {
                "TIME": "", "EVENT": "ACCESS DENIED",
                "ACTOR": "", "DETAILS": "Not your memory",
            }
            return pd.DataFrame([denied])

    events = load_trail(memory_id)
    if not events:
        return pd.DataFrame(columns=headers)

    table_rows = [
        {
            "TIME": event.get("timestamp", ""),
            "EVENT": (event.get("event", "")).upper(),
            "ACTOR": event.get("actor", ""),
            "DETAILS": json.dumps(event.get("context", {}),
                                  ensure_ascii=False)[:120],
        }
        for event in events
    ]
    return pd.DataFrame(table_rows)
def refresh_access_stats(role, user_id):
    """Build the "most accessed" and "stale" memory tables.

    Args:
        role: "admin" covers every record; others cover only their own.
        user_id: Id of the caller, used for the non-admin restriction.

    Returns:
        ``(top, stale)``: the 20 most-accessed memories, and up to 20 active
        memories that have never been accessed. Both are empty tables with
        the expected columns when there are no records.
    """
    columns = ["Memory ID", "Task", "Status",
               "Accesses", "Last Accessed", "User"]
    current = load_data()
    if role != "admin":
        current = [r for r in current
                   if r.get("user_id", "sumit") == user_id]
    stats = load_all_trail_stats(current)

    rows = []
    for r in current:
        # `or` guards fields that are present but None.
        mid = r.get("memory_id") or ""
        s = stats.get(mid, {})
        rows.append({
            "Memory ID": str(mid)[:16] + "...",
            "Task": str(r.get("task") or "")[:60],
            "Status": r.get("status") or "active",
            "Accesses": s.get("access_count", 0),
            # The stats entry always carries the key (None when never
            # accessed), so a .get() default would never apply; use `or`.
            "Last Accessed": s.get("last_accessed") or "never",
            "User": r.get("user_id", "sumit"),
        })

    df = pd.DataFrame(rows)
    if df.empty:
        empty = pd.DataFrame(columns=columns)
        return empty, empty
    top = df.sort_values("Accesses", ascending=False).head(20)
    stale = df[(df["Accesses"] == 0) & (df["Status"] == "active")].head(20)
    return top, stale
# --- Trends ---
def make_growth_chart(df):
    """Bars of records per week with a cumulative line on a second y-axis."""
    if df.empty:
        return apply_terminal_theme(go.Figure())
    per_week = df.groupby("week").size().reset_index(name="count")
    per_week = per_week.sort_values("week")
    per_week["cumulative"] = per_week["count"].cumsum()

    weekly_bars = go.Bar(
        x=per_week["week"],
        y=per_week["count"],
        name="WEEKLY",
        marker_color="#00ff41",
    )
    running_total = go.Scatter(
        x=per_week["week"],
        y=per_week["cumulative"],
        name="CUMULATIVE",
        line=dict(color="#ff8c00", width=2),
        yaxis="y2",
    )
    fig = go.Figure()
    fig.add_trace(weekly_bars)
    fig.add_trace(running_total)
    fig.update_layout(
        title="GROWTH",
        yaxis=dict(title="WEEKLY COUNT"),
        yaxis2=dict(title="CUMULATIVE", overlaying="y", side="right",
                    gridcolor="#222222"),
        height=400,
        showlegend=True,
    )
    return apply_terminal_theme(fig)
def make_category_trend(df):
    """Stacked area chart of monthly record counts, one band per category."""
    blank = lambda: apply_terminal_theme(go.Figure())
    if df.empty:
        return blank()
    monthly = df.groupby(["month", "category"]).size()
    monthly = monthly.reset_index(name="count")
    if monthly.empty:
        return blank()
    chart = px.area(
        monthly,
        x="month",
        y="count",
        color="category",
        color_discrete_sequence=TERMINAL_COLORS,
        title="CATEGORY TREND",
    )
    chart.update_layout(height=400, xaxis_title="", yaxis_title="COUNT")
    return apply_terminal_theme(chart)
def make_project_timeline(df):
    """Stacked monthly bar chart for the ten most active named projects.

    Records whose project is the "unspecified" placeholder are excluded
    both from the chart and from the top-ten ranking.
    """
    if df.empty:
        return apply_terminal_theme(go.Figure())
    named = df[df["project"] != "unspecified"]
    # Rank only named projects. Ranking the unfiltered frame let the
    # "unspecified" placeholder take one of the ten slots even though it is
    # never drawn, leaving room for only nine real projects.
    top_projs = named["project"].value_counts().head(10).index
    proj_month = (named[named["project"].isin(top_projs)]
                  .groupby(["month", "project"])
                  .size().reset_index(name="count"))
    if proj_month.empty:
        return apply_terminal_theme(go.Figure())
    fig = px.bar(proj_month, x="month", y="count", color="project",
                 color_discrete_sequence=TERMINAL_COLORS,
                 title="PROJECT TIMELINE")
    fig.update_layout(height=400, xaxis_title="", yaxis_title="COUNT",
                      barmode="stack")
    return apply_terminal_theme(fig)
def make_busiest_periods(df):
    """Table of the ten calendar days with the most records (DATE, COUNT)."""
    headers = ["DATE", "COUNT"]
    if df.empty:
        return pd.DataFrame(columns=headers)
    calendar_day = df["date"].dt.date
    per_day = df.groupby(calendar_day).size().reset_index(name="count")
    per_day.columns = headers
    ranked = per_day.sort_values("COUNT", ascending=False)
    return ranked.head(10)
def make_retention_stats(df):
    """Text report of how many records fall in the last 30, 60 and 90 days.

    Counts compare the ``date`` column against the current local time; the
    remainder is reported as "older than 90 days".
    """
    if df.empty:
        return "No data."
    total = len(df)
    now = pd.Timestamp.now()

    def _count_since(days):
        cutoff = now - pd.Timedelta(days=days)
        return len(df[df["date"] >= cutoff])

    def _share(count):
        return (count / total * 100) if total else 0

    recent_30 = _count_since(30)
    recent_60 = _count_since(60)
    recent_90 = _count_since(90)
    pct_30 = _share(recent_30)
    pct_60 = _share(recent_60)
    pct_90 = _share(recent_90)
    return f"""```
RETENTION REPORT
================
Total memories : {total}
Last 30 days : {recent_30} ({pct_30:.1f}%)
Last 60 days : {recent_60} ({pct_60:.1f}%)
Last 90 days : {recent_90} ({pct_90:.1f}%)
Older than 90 days: {total - recent_90} ({100 - pct_90:.1f}%)
```"""
def refresh_trends(role, user_id):
    """Recompute the Trends outputs: three figures, the busiest-days table and the retention report.

    Admins see all users; everyone else sees only their own records.
    """
    scope = user_id if role != "admin" else "All Users"
    df = build_dataframe(load_data(), scope)
    growth = make_growth_chart(df)
    category_trend = make_category_trend(df)
    project_timeline = make_project_timeline(df)
    busiest = make_busiest_periods(df)
    retention = make_retention_stats(df)
    return growth, category_trend, project_timeline, busiest, retention
# --- Knowledge Graph ---
def make_tag_cooccurrence(df):
    """Heatmap of how often the 15 most frequent tags appear together.

    Only records carrying more than one tag are considered. Diagonal cells
    hold each tag's own count within those records; off-diagonal cells hold
    the number of records containing both tags.
    """
    if df.empty:
        return apply_terminal_theme(go.Figure())
    multi_tag_lists = [
        cell for cell in df["tags"].dropna()
        if isinstance(cell, list) and len(cell) > 1
    ]
    if not multi_tag_lists:
        return apply_terminal_theme(go.Figure())

    pair_counts = Counter()
    tag_counts = Counter()
    for tag_list in multi_tag_lists:
        tag_counts.update(tag_list)
        # Pairs are stored in sorted order so (a, b) and (b, a) share a key.
        pair_counts.update(combinations(sorted(set(tag_list)), 2))

    top_tags = [tag for tag, _count in tag_counts.most_common(15)]

    def _cell(row_tag, col_tag):
        if row_tag == col_tag:
            return tag_counts[row_tag]
        return pair_counts.get(tuple(sorted([row_tag, col_tag])), 0)

    matrix = [[_cell(row_tag, col_tag) for col_tag in top_tags]
              for row_tag in top_tags]
    heat = go.Heatmap(
        z=matrix,
        x=top_tags,
        y=top_tags,
        colorscale=[[0, "#0a0a0a"], [0.5, "#00d4ff"], [1, "#00ff41"]],
        hoverongaps=False,
    )
    fig = go.Figure(data=heat)
    fig.update_layout(title="TAG CO-OCCURRENCE", height=500)
    return apply_terminal_theme(fig)
def make_project_category_sankey(df):
    """Sankey diagram linking categories to projects.

    Uses the 30 largest (category, project) pairs among records with a
    named project. Category nodes come first in the label list, followed by
    project nodes.
    """
    if df.empty:
        return apply_terminal_theme(go.Figure())
    named = df[df["project"] != "unspecified"]
    flows = named.groupby(["category", "project"]).size().reset_index(name="count")
    flows = flows.sort_values("count", ascending=False).head(30)
    if flows.empty:
        return apply_terminal_theme(go.Figure())

    cats = flows["category"].unique().tolist()
    projs = flows["project"].unique().tolist()
    n_cats = len(cats)
    # Project node indices are offset by the number of category nodes.
    link_sources = [cats.index(category) for category in flows["category"]]
    link_targets = [n_cats + projs.index(project) for project in flows["project"]]

    nodes = dict(
        pad=15,
        thickness=20,
        line=dict(color="#333333", width=0.5),
        label=cats + projs,
        color=["#ff8c00"] * n_cats + ["#00d4ff"] * len(projs),
    )
    links = dict(
        source=link_sources,
        target=link_targets,
        value=flows["count"].tolist(),
        color="rgba(0, 255, 65, 0.2)",
    )
    fig = go.Figure(data=go.Sankey(node=nodes, link=links))
    fig.update_layout(title="CATEGORY > PROJECT FLOW", height=500)
    return apply_terminal_theme(fig)
def make_tag_cloud_table(df):
    """Table of the 30 most frequent tags with the categories they occur in.

    Columns are TAG, COUNT and CATEGORIES (sorted, comma-separated). Rows
    whose ``tags`` value is not a list are ignored.
    """
    headers = ["TAG", "COUNT", "CATEGORIES"]
    if df.empty:
        return pd.DataFrame(columns=headers)

    usage = Counter()
    seen_in = {}
    for _index, row in df.iterrows():
        tags = row.get("tags")
        if not isinstance(tags, list):
            continue
        category = row.get("category", "")
        for tag in tags:
            usage[tag] += 1
            seen_in.setdefault(tag, set()).add(category)

    # sorted() is stable, so tags with equal counts keep first-seen order.
    ranked = sorted(usage.items(), key=lambda item: item[1], reverse=True)[:30]
    table_rows = [
        {"TAG": tag, "COUNT": count,
         "CATEGORIES": ", ".join(sorted(seen_in[tag]))}
        for tag, count in ranked
    ]
    if not table_rows:
        return pd.DataFrame(columns=headers)
    return pd.DataFrame(table_rows)
def refresh_knowledge(role, user_id):
    """Recompute the Knowledge Graph outputs: co-occurrence heatmap, sankey and tag table.

    Admins see all users; everyone else sees only their own records.
    """
    scope = user_id if role != "admin" else "All Users"
    df = build_dataframe(load_data(), scope)
    cooccurrence = make_tag_cooccurrence(df)
    sankey = make_project_category_sankey(df)
    tag_table = make_tag_cloud_table(df)
    return cooccurrence, sankey, tag_table
# --- Activity Feed ---
def load_activity_feed(role, user_id):
    """Render the 100 most recent trail events as an HTML log panel.

    Args:
        role: "admin" sees every event; others only see events for
            memories they own.
        user_id: Id of the caller, used for the non-admin restriction.

    Returns:
        An HTML ``<div class='log-panel'>`` string: one line per event, an
        error notice if the trail query fails, or a "no events" notice.
    """
    # Local import: this block needs html.escape and the module does not
    # import it at file level.
    from html import escape

    try:
        client = get_client()
        points, _ = client.scroll(
            collection_name=TRAILS_COLLECTION,
            limit=10000,
            with_payload=True,
            with_vectors=False,
        )
        events = [p.payload for p in points]
    except Exception:
        return ("<div class='log-panel'>"
                "ERROR: Failed to load trail events.</div>")

    if role != "admin":
        records = load_data()
        own_ids = {r.get("memory_id") for r in records
                   if r.get("user_id", "sumit") == user_id}
        events = [e for e in events if e.get("memory_id") in own_ids]

    # Sort on a string key so an event with a None timestamp cannot break
    # the comparison.
    events.sort(key=lambda e: str(e.get("timestamp") or ""), reverse=True)
    events = events[:100]
    if not events:
        return "<div class='log-panel'>No activity events found.</div>"

    css_by_event = {
        "CREATED": "log-created",
        "ACCESSED": "log-accessed",
        "UPDATED": "log-updated",
        "ARCHIVED": "log-archived",
    }
    lines = []
    for e in events:
        ts = str(e.get("timestamp") or "")[:19]
        event_type = str(e.get("event") or "unknown").upper()
        mid = str(e.get("memory_id") or "")[:12]
        actor = str(e.get("actor") or "system")
        context = json.dumps(
            e.get("context", {}), ensure_ascii=False)[:80]
        css_class = css_by_event.get(event_type, "log-updated")
        # Everything taken from stored events is escaped (after truncation,
        # so an entity is never cut in half) because it is emitted as raw
        # HTML. A space now separates actor and context, which previously
        # ran together.
        lines.append(
            f'<span class="log-ts">[{escape(ts)}]</span> '
            f'<span class="{css_class}">{escape(f"{event_type:10s}")}</span> '
            f'{escape(mid)} {escape(actor)} {escape(context)}'
        )
    content = "<br>".join(lines)
    return f"<div class='log-panel'>{content}</div>"
# --- User Management ---
def refresh_users_table():
    """Build the admin user table; tokens longer than 10 characters are shortened to a 10-character prefix plus "..."."""
    headers = ["User ID", "Name", "Role", "Created", "Token"]

    def _preview(token):
        return token if len(token) <= 10 else token[:10] + "..."

    table_rows = [
        {
            "User ID": user["user_id"],
            "Name": user["name"],
            "Role": user["role"],
            "Created": user.get("created_at", ""),
            "Token": _preview(user.get("token", "")),
        }
        for user in load_users().get("users", [])
    ]
    if not table_rows:
        return pd.DataFrame(columns=headers)
    return pd.DataFrame(table_rows)
def add_user(user_id, name):
    """Register a new user with role "user" and a freshly generated token.

    Args:
        user_id: Requested id; surrounding whitespace is removed and the
            value is lower-cased before use.
        name: Display name; surrounding whitespace is removed.

    Returns:
        ``(message, users_table)`` where ``message`` is an error text or a
        confirmation containing the new token, and ``users_table`` is the
        refreshed users DataFrame.
    """
    # Normalize BEFORE validating so whitespace-only input is rejected
    # instead of producing a user with an empty id or name.
    user_id = (user_id or "").strip().lower()
    name = (name or "").strip()
    if not user_id or not name:
        return "Error: user_id and name are required", refresh_users_table()

    users_data = load_users()
    if any(u["user_id"] == user_id for u in users_data["users"]):
        return (f"Error: user_id '{user_id}' already exists",
                refresh_users_table())

    # Token shape: tatva_<user_id>_<16 hex characters>.
    token = f"tatva_{user_id}_{secrets.token_hex(8)}"
    users_data["users"].append({
        "user_id": user_id,
        "name": name,
        "token": token,
        "role": "user",
        "created_at": datetime.now(timezone.utc).isoformat(),
    })
    save_users(users_data)
    return (f"User added! Token: `{token}` — share this with {name}",
            refresh_users_table())
def revoke_user(user_id):
    """Remove a user's point from the users collection.

    The id is stripped and lower-cased, looked up in ``load_users()``, and
    when present the matching point is deleted from ``USERS_COLLECTION``.
    The point id is a UUIDv5 derived from a fixed namespace and the string
    ``user_<user_id>``.

    Returns ``(message, users_table)``.
    """
    if not user_id:
        return "Error: user_id is required", refresh_users_table()

    user_id = user_id.strip().lower()
    users_data = load_users()
    count_before = len(users_data["users"])
    remaining = [entry for entry in users_data["users"]
                 if entry["user_id"] != user_id]
    # NOTE(review): the filtered list is stored back on users_data but is
    # not passed to save_users here; removal happens via the delete below.
    users_data["users"] = remaining
    if len(remaining) == count_before:
        return (f"Error: user_id '{user_id}' not found",
                refresh_users_table())

    namespace = uuid.UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
    point_id = str(uuid.uuid5(namespace, f"user_{user_id}"))
    qdrant = get_client()
    qdrant.delete(collection_name=USERS_COLLECTION,
                  points_selector=[point_id])
    return (f"User '{user_id}' revoked.",
            refresh_users_table())
def regenerate_token(user_id):
    """Issue a new token for the given user (admin action).

    The id is stripped and lower-cased; the first user entry with that id
    gets a new ``tatva_<user_id>_<hex>`` token and the user list is saved.

    Returns ``(message, users_table)``.
    """
    if not user_id:
        return "Error: user_id is required", refresh_users_table()

    user_id = user_id.strip().lower()
    users_data = load_users()
    target = next(
        (entry for entry in users_data["users"]
         if entry["user_id"] == user_id),
        None,
    )
    if target is None:
        return (f"Error: user_id '{user_id}' not found",
                refresh_users_table())

    fresh_token = f"tatva_{user_id}_{secrets.token_hex(8)}"
    target["token"] = fresh_token
    save_users(users_data)
    return (f"New token for {user_id}: `{fresh_token}`",
            refresh_users_table())
def get_user_profile(user_id):
    """Return a fenced text profile for ``user_id``.

    Scrolls up to 1000 points of ``USERS_COLLECTION`` and formats the first
    payload whose ``user_id`` matches. Any exception while loading or
    formatting, and the no-match case, yield "Profile not found.".
    """
    try:
        points, _ = get_client().scroll(
            collection_name=USERS_COLLECTION,
            limit=1000,
            with_payload=True,
            with_vectors=False,
        )
        u = next(
            (point.payload for point in points
             if point.payload.get("user_id") == user_id),
            None,
        )
        if u is not None:
            # Only the first 10 characters of the token are displayed.
            return f"""```
USER PROFILE
============
USER ID : {u.get('user_id', '')}
NAME : {u.get('name', '')}
ROLE : {u.get('role', 'user')}
CREATED : {u.get('created_at', 'N/A')}
TOKEN : {u.get('token', '')[:10]}...
```"""
    except Exception:
        # Best effort: every failure falls through to the generic message.
        pass
    return "Profile not found."
def regenerate_own_token(user_id):
    """Issue a new token for the currently logged-in user.

    Returns ``(message, profile_markdown)``; the second element is an empty
    string on any error.
    """
    if not user_id:
        return "Error: no user_id", ""

    users_data = load_users()
    own_entry = next(
        (entry for entry in users_data["users"]
         if entry["user_id"] == user_id),
        None,
    )
    if own_entry is None:
        return "Error: user not found.", ""

    fresh_token = f"tatva_{user_id}_{secrets.token_hex(8)}"
    own_entry["token"] = fresh_token
    save_users(users_data)
    message = (f"New token: `{fresh_token}` — save this, "
               "it won't be shown again.")
    # The profile is re-read after saving.
    return message, get_user_profile(user_id)
# --- System Status ---
def load_system_status():
    """Build a fenced plain-text report of runtime and Qdrant status.

    Sections: Python/platform/library versions, per-collection point counts
    and status, and a server summary. Every probe is wrapped so a failure
    becomes a line in the report instead of an exception.
    """
    rule = "=" * 40
    report = [
        "SYSTEM STATUS",
        rule,
        f"PYTHON : {platform.python_version()}",
        f"PLATFORM : {platform.system()} {platform.release()}",
    ]

    try:
        import gradio as _gr
        report.append(f"GRADIO : {_gr.__version__}")
    except Exception:
        report.append("GRADIO : unknown")

    try:
        import qdrant_client as _qc
        report.append(f"QDRANT CLI: {_qc.__version__}")
    except Exception:
        report.append("QDRANT CLI: unknown")

    report.extend(["", "QDRANT COLLECTIONS", rule])
    try:
        client = get_client()
        for coll_name in (MEMORIES_COLLECTION, TRAILS_COLLECTION,
                          USERS_COLLECTION):
            # One failing collection must not hide the others.
            try:
                info = client.get_collection(coll_name)
                report.append(
                    f"{coll_name.upper():12s}: {info.points_count:>8} points"
                    f" | status: {info.status}")
            except Exception as err:
                report.append(f"{coll_name.upper():12s}: ERROR - {err}")

        report.extend(["", "QDRANT SERVER", rule])
        try:
            collections = client.get_collections()
            report.append(f"COLLECTIONS : {len(collections.collections)}")
            report.append("STATUS : ONLINE")
        except Exception:
            report.append("STATUS : ERROR")
    except Exception as err:
        report.append(f"CONNECTION : FAILED ({err})")

    body = "\n".join(report)
    return f"```\n{body}\n```"
# --- Export ---
def export_memories(fmt, category_filter, project_filter, role, user_id):
    """Export memories to a JSON or CSV file in the temp directory.

    Args:
        fmt: "JSON" writes a JSON array; any other value writes CSV.
        category_filter: Category label to keep, or "All"/empty for no
            filter. The label "uncategorized" also matches records whose
            category is missing or None, mirroring how the dropdown
            choices are built.
        project_filter: Project label to keep, or "All"/empty for no
            filter. "unspecified" also matches a missing/None project.
        role: Session role; non-admins only export their own records.
        user_id: Owner id used for the non-admin filter.

    Returns:
        ``(filepath, info_markdown)``; ``filepath`` is None when nothing
        matched.
    """
    records = load_data()
    if role != "admin":
        records = [r for r in records
                   if r.get("user_id", "sumit") == user_id]
    if category_filter and category_filter != "All":
        records = [r for r in records
                   if (r.get("category") or "uncategorized")
                   == category_filter]
    if project_filter and project_filter != "All":
        records = [r for r in records
                   if (r.get("project") or "unspecified")
                   == project_filter]
    if not records:
        return None, "`0` records matched. Nothing to export."

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    suffix = ".json" if fmt == "JSON" else ".csv"
    # mkstemp guarantees a unique path, so two exports in the same second
    # cannot overwrite (or be served) each other's file.
    fd, filepath = tempfile.mkstemp(
        prefix=f"tatva_export_{timestamp}_", suffix=suffix)
    os.close(fd)

    if fmt == "JSON":
        clean = [
            {
                "memory_id": r.get("memory_id", ""),
                "task": r.get("task", ""),
                "time_date": r.get("time_date", ""),
                "category": r.get("category", ""),
                "project": r.get("project", ""),
                "tags": r.get("tags", []),
                "user_id": r.get("user_id", ""),
                "status": r.get("status", "active"),
                "version": r.get("version", 1),
            }
            for r in records
        ]
        # ensure_ascii=False emits raw non-ASCII text, so the file encoding
        # must be pinned rather than left to the platform default.
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(clean, f, indent=2, ensure_ascii=False)
    else:
        df = pd.DataFrame(records)
        cols = ["memory_id", "task", "time_date", "category", "project",
                "tags", "user_id", "status", "version"]
        cols = [c for c in cols if c in df.columns]
        # Copy so the tags rewrite below does not write into a slice.
        df = df[cols].copy()
        if "tags" in df.columns:
            df["tags"] = df["tags"].apply(
                lambda t: ", ".join(map(str, t))
                if isinstance(t, list) else "")
        df.to_csv(filepath, index=False)

    info = f"`{len(records)}` records exported as `{fmt}`."
    return filepath, info
# --- Gradio UI Assembly ---
# Build the whole UI inside a single Blocks context; `demo` is the app object
# that the load/launch calls further down operate on.
with gr.Blocks(title="Tatva Terminal", css=TERMINAL_CSS) as demo:
    # Per-session role and user id. Both start as "admin" and are
    # overwritten by the page-load handler (init_full).
    role_state = gr.State(value="admin")
    user_id_state = gr.State(value="admin")
    # Placeholder status bar markup shown until the load handler replaces it.
    status_bar = gr.HTML(
        value=("<div class='status-bar'>"
               "<span class='amber'>TATVA</span> | Loading...</div>"))
    # Container for the tabs declared below.
    with gr.Tabs():
# === Tab 1: Dashboard ===
with gr.Tab("Dashboard"):
    # Controls: time granularity, user filter, manual refresh.
    with gr.Row():
        granularity = gr.Radio(
            ["Month", "Week", "Day"], value="Month",
            label="GRANULARITY")
        user_filter = gr.Dropdown(
            choices=["All Users"], value="All Users",
            label="FILTER BY USER")
        refresh_btn = gr.Button(
            "REFRESH", variant="primary", scale=0)

    stats_md = gr.Markdown()

    # Three rows of paired plots.
    with gr.Row():
        timeline_plot = gr.Plot(label="ACTIVITY")
        category_plot = gr.Plot(label="CATEGORIES")
    with gr.Row():
        project_plot = gr.Plot(label="PROJECTS")
        dow_plot = gr.Plot(label="DAY OF WEEK")
    with gr.Row():
        tags_plot = gr.Plot(label="TAGS")
        heatmap_plot = gr.Plot(label="HEATMAP")

    gr.Markdown("### RECENT ACTIVITY")
    recent_table = gr.Dataframe(
        label="RECENT", interactive=False)

    dash_inputs = [
        granularity, user_filter,
        role_state, user_id_state,
    ]
    dash_outputs = [
        user_filter, stats_md,
        timeline_plot, category_plot, project_plot,
        dow_plot, tags_plot, heatmap_plot,
        recent_table, status_bar,
    ]

    # The button click and both control changes all trigger the same
    # refresh with the same inputs and outputs.
    for _register in (refresh_btn.click,
                      granularity.change,
                      user_filter.change):
        _register(
            fn=refresh_dashboard,
            inputs=dash_inputs, outputs=dash_outputs)
# === Tab 2: Memory Browser ===
with gr.Tab("Browser"):
    # Filter controls. Category/project dropdowns start with only "All";
    # the page-load handler supplies the real choices.
    with gr.Row():
        browser_cat = gr.Dropdown(
            choices=["All"], value="All", label="CATEGORY")
        browser_proj = gr.Dropdown(
            choices=["All"], value="All", label="PROJECT")
        browser_tag = gr.Textbox(
            label="TAG FILTER", placeholder="tag name")
        browser_status = gr.Radio(
            ["active", "archived"], value="active",
            label="STATUS")
    # Page selector (integer precision) and explicit load button.
    with gr.Row():
        browser_page = gr.Number(
            value=1, label="PAGE", precision=0)
        browser_refresh = gr.Button(
            "LOAD", variant="primary", scale=0)
    browser_info = gr.Markdown()
    browser_table = gr.Dataframe(
        label="MEMORIES", interactive=False)
    # Pagination buttons.
    with gr.Row():
        browser_prev = gr.Button(
            "< PREV", variant="secondary", scale=0)
        browser_next = gr.Button(
            "NEXT >", variant="secondary", scale=0)
    # Single-memory detail lookup by id.
    gr.Markdown("### MEMORY DETAIL")
    browser_detail_id = gr.Textbox(
        label="MEMORY ID",
        placeholder="paste full or partial ID")
    browser_detail_btn = gr.Button(
        "VIEW DETAIL", variant="primary", scale=0)
    browser_detail_out = gr.Markdown()
def do_browser_load(cat, proj, tag, status, page,
                    role, uid):
    """Load the requested browser page for the current filters.

    ``page`` comes from a numeric input and is None when the field has been
    cleared; that case is treated as page 1 instead of raising.
    """
    page_no = int(page) if page is not None else 1
    return load_memories_page(
        cat, proj, tag, status, page_no, role, uid)
def do_browser_prev(cat, proj, tag, status, page,
                    role, uid):
    """Load the page before ``page``, never going below page 1.

    A cleared page field arrives as None and is treated as page 1.
    """
    page_no = int(page) if page is not None else 1
    return load_memories_page(
        cat, proj, tag, status,
        max(1, page_no - 1), role, uid)
def do_browser_next(cat, proj, tag, status, page,
                    role, uid):
    """Load the page after ``page``.

    A cleared page field arrives as None and is treated as page 1, so the
    next page is then page 2.
    """
    page_no = int(page) if page is not None else 1
    return load_memories_page(
        cat, proj, tag, status,
        page_no + 1, role, uid)
# Every browser action writes the table, the info line and the page number.
browser_outputs = [
    browser_table, browser_info, browser_page]
# Filters, current page, then the session role and user id.
browser_inputs = [
    browser_cat, browser_proj, browser_tag,
    browser_status, browser_page,
    role_state, user_id_state,
]
# LOAD / PREV / NEXT share the same inputs and outputs and differ only in
# the page number their handler requests.
browser_refresh.click(
    fn=do_browser_load,
    inputs=browser_inputs, outputs=browser_outputs)
browser_prev.click(
    fn=do_browser_prev,
    inputs=browser_inputs, outputs=browser_outputs)
browser_next.click(
    fn=do_browser_next,
    inputs=browser_inputs, outputs=browser_outputs)
# Detail lookup is scoped by the session role and user id.
browser_detail_btn.click(
    fn=get_memory_detail,
    inputs=[browser_detail_id, role_state, user_id_state],
    outputs=[browser_detail_out])
# === Tab 3: Search ===
with gr.Tab("Search"):
    gr.Markdown("### KEYWORD SEARCH")
    with gr.Row():
        search_input = gr.Textbox(
            label="QUERY", placeholder="search term...")
        search_btn = gr.Button(
            "SEARCH", variant="primary", scale=0)
    search_info = gr.Markdown()
    search_results = gr.Dataframe(
        label="RESULTS", interactive=False)

    gr.Markdown("### RESULT DETAIL")
    search_detail_id = gr.Textbox(
        label="MEMORY ID",
        placeholder="paste ID from results")
    search_detail_btn = gr.Button(
        "VIEW", variant="primary", scale=0)
    search_detail_out = gr.Markdown()

    # Clicking SEARCH and pressing Enter in the query box run the same
    # keyword search with the same inputs and outputs.
    _search_wiring = dict(
        fn=search_memories_keyword,
        inputs=[search_input, role_state, user_id_state],
        outputs=[search_results, search_info])
    search_btn.click(**_search_wiring)
    search_input.submit(**_search_wiring)

    search_detail_btn.click(
        fn=get_memory_detail,
        inputs=[search_detail_id, role_state, user_id_state],
        outputs=[search_detail_out])
# === Tab 4: Memory Lifecycle ===
with gr.Tab("Lifecycle"):
    # Section 1: trail events for a single memory id.
    gr.Markdown("### MEMORY TRAIL")
    with gr.Row():
        trail_memory_id = gr.Textbox(
            label="MEMORY ID",
            placeholder="mem_a3f8b2c1-...")
        trail_btn = gr.Button(
            "LOAD TRAIL", variant="primary", scale=0)
    trail_table = gr.Dataframe(
        label="TRAIL EVENTS", interactive=False)
    # The handler receives the session role and user id with the memory id.
    trail_btn.click(
        fn=load_trail_view,
        inputs=[trail_memory_id, role_state, user_id_state],
        outputs=[trail_table])
    gr.Markdown("---")
    # Section 2: access statistics, loaded on demand into two tables.
    gr.Markdown("### ACCESS STATS")
    access_table = gr.Dataframe(
        label="MOST ACCESSED", interactive=False)
    stale_table = gr.Dataframe(
        label="STALE (0 ACCESSES)", interactive=False)
    refresh_access_btn = gr.Button(
        "LOAD STATS", variant="secondary")
    refresh_access_btn.click(
        fn=refresh_access_stats,
        inputs=[role_state, user_id_state],
        outputs=[access_table, stale_table])
# === Tab 5: Trends ===
with gr.Tab("Trends"):
    # Nothing is loaded until the button is pressed.
    trends_refresh = gr.Button(
        "LOAD TRENDS", variant="primary")
    with gr.Row():
        growth_plot = gr.Plot(label="GROWTH")
        cat_trend_plot = gr.Plot(label="CATEGORY TREND")
    with gr.Row():
        proj_timeline_plot = gr.Plot(
            label="PROJECT TIMELINE")
    # Bottom row: busiest-days table next to a markdown panel.
    with gr.Row():
        with gr.Column():
            gr.Markdown("### BUSIEST DAYS")
            busiest_table = gr.Dataframe(
                label="TOP 10 DAYS", interactive=False)
        with gr.Column():
            retention_md = gr.Markdown()
    # refresh_trends fills all five components in this order.
    trends_refresh.click(
        fn=refresh_trends,
        inputs=[role_state, user_id_state],
        outputs=[growth_plot, cat_trend_plot,
                 proj_timeline_plot, busiest_table,
                 retention_md])
# === Tab 6: Knowledge Graph ===
with gr.Tab("Graph"):
    # Nothing is loaded until the button is pressed.
    graph_refresh = gr.Button(
        "LOAD GRAPH", variant="primary")
    with gr.Row():
        cooc_plot = gr.Plot(label="TAG CO-OCCURRENCE")
        sankey_plot = gr.Plot(
            label="CATEGORY > PROJECT")
    gr.Markdown("### TAG FREQUENCY")
    tag_cloud_table = gr.Dataframe(
        label="TAGS", interactive=False)
    # refresh_knowledge fills the two plots and the table in this order.
    graph_refresh.click(
        fn=refresh_knowledge,
        inputs=[role_state, user_id_state],
        outputs=[cooc_plot, sankey_plot, tag_cloud_table])
# === Tab 7: Activity Feed ===
with gr.Tab("Feed"):
    feed_refresh = gr.Button(
        "REFRESH FEED", variant="primary")
    # Placeholder panel shown until the first refresh.
    feed_html = gr.HTML(
        value=("<div class='log-panel'>"
               "Click REFRESH to load activity feed.</div>"))
    # The feed handler returns a complete HTML string for the panel.
    feed_refresh.click(
        fn=load_activity_feed,
        inputs=[role_state, user_id_state],
        outputs=[feed_html])
# === Tab 8: User Management ===
with gr.Tab("Users"):
    # Admin section
    # Both sections are created visible; the page-load handler (init_full)
    # sets their visibility according to the session role.
    with gr.Column(visible=True) as admin_users_section:
        gr.Markdown("### REGISTERED USERS")
        users_table = gr.Dataframe(
            label="USERS", interactive=False)
        refresh_users_btn = gr.Button(
            "REFRESH", variant="secondary")
        refresh_users_btn.click(
            fn=refresh_users_table,
            outputs=[users_table])
        gr.Markdown("---")
        gr.Markdown("### ADD USER")
        with gr.Row():
            new_user_id = gr.Textbox(
                label="USER ID",
                placeholder="lowercase, no spaces")
            new_user_name = gr.Textbox(
                label="NAME",
                placeholder="Display Name")
            add_btn = gr.Button(
                "ADD", variant="primary")
        add_result = gr.Markdown()
        # add_user returns a status message and the refreshed table.
        add_btn.click(
            fn=add_user,
            inputs=[new_user_id, new_user_name],
            outputs=[add_result, users_table])
        gr.Markdown("---")
        gr.Markdown("### REVOKE / REGENERATE")
        with gr.Row():
            target_user_id = gr.Textbox(
                label="USER ID",
                placeholder="target user")
            revoke_btn = gr.Button(
                "REVOKE", variant="stop")
            regen_btn = gr.Button(
                "REGEN TOKEN", variant="secondary")
        # Revoke and regenerate share one result area and both refresh
        # the users table.
        action_result = gr.Markdown()
        revoke_btn.click(
            fn=revoke_user,
            inputs=[target_user_id],
            outputs=[action_result, users_table])
        regen_btn.click(
            fn=regenerate_token,
            inputs=[target_user_id],
            outputs=[action_result, users_table])
    # User section (own profile only)
    with gr.Column(visible=True) as user_profile_section:
        gr.Markdown("### MY PROFILE")
        profile_md = gr.Markdown()
        regen_own_btn = gr.Button(
            "REGENERATE MY TOKEN", variant="secondary")
        regen_own_result = gr.Markdown()
        # Acts on the session's own user id, not on a typed-in id.
        regen_own_btn.click(
            fn=regenerate_own_token,
            inputs=[user_id_state],
            outputs=[regen_own_result, profile_md])
# === Tab 9: System Status (admin only) ===
# Created visible; the page-load handler (init_full) hides this tab for
# non-admin sessions.
with gr.Tab("System", visible=True) as system_tab:
    system_refresh = gr.Button(
        "REFRESH STATUS", variant="primary")
    system_md = gr.Markdown()
    # The status handler takes no inputs and returns one markdown report.
    system_refresh.click(
        fn=load_system_status,
        outputs=[system_md])
# === Tab 10: Export ===
with gr.Tab("Export"):
    gr.Markdown("### EXPORT MEMORIES")
    # Format plus optional category/project filters. The dropdowns start
    # with only "All"; the page-load handler supplies the real choices.
    with gr.Row():
        export_fmt = gr.Radio(
            ["JSON", "CSV"], value="JSON", label="FORMAT")
        export_cat = gr.Dropdown(
            choices=["All"], value="All", label="CATEGORY")
        export_proj = gr.Dropdown(
            choices=["All"], value="All", label="PROJECT")
    export_btn = gr.Button("EXPORT", variant="primary")
    export_info = gr.Markdown()
    export_file = gr.File(label="DOWNLOAD")
    # export_memories returns (file path or None, info markdown).
    export_btn.click(
        fn=export_memories,
        inputs=[export_fmt, export_cat, export_proj,
                role_state, user_id_state],
        outputs=[export_file, export_info])
# --- Session Init ---
def init_full(request: gr.Request):
    """Initialize a session on page load.

    Resolves the session's role and user id, then computes the initial
    values for the status bar, the dashboard, the Users and System tab
    visibility, and the Browser/Export filter choices.

    Args:
        request: The Gradio request for this page load; its ``username``
            is used to look up the session when available.

    Returns:
        A list whose order must match the ``outputs`` of the ``demo.load``
        call that registers this function.
    """
    # Prefer the session recorded for the requesting login, so one user
    # does not receive the role of whoever authenticated most recently.
    # NOTE(review): assumes _auth_sessions may be keyed by login username;
    # confirm against auth_fn. Otherwise the original lookup below applies.
    username = getattr(request, "username", None)
    if username is not None and username in _auth_sessions:
        role, uid = _auth_sessions[username]
    elif not _auth_sessions:
        role, uid = "admin", "admin"
    else:
        # Original behavior: use the most recently inserted session.
        # NOTE(review): this is not tied to the requesting user.
        last_key = list(_auth_sessions.keys())[-1]
        role, uid = _auth_sessions[last_key]

    bar = refresh_status_bar(role, uid)

    # Dashboard initial load
    records = load_data()
    user_filter_val = "All Users" if role == "admin" else uid
    choices = get_user_choices() if role == "admin" else [uid]
    df = build_dataframe(records, user_filter_val)
    all_tags = get_all_tags(df) if not df.empty else []
    total = len(df)
    n_categories = df["category"].nunique() if not df.empty else 0
    n_projects = df["project"].nunique() if not df.empty else 0
    n_tags = len(set(all_tags))
    stats = (f"`{total}` RECORDS | `{n_categories}` CATEGORIES | "
             f"`{n_projects}` PROJECTS | `{n_tags}` TAGS")

    # Users tab setup: admins get the management section and the System
    # tab; everyone else gets only their own profile.
    if role == "admin":
        admin_vis = gr.update(visible=True)
        user_vis = gr.update(visible=False)
        users_df = refresh_users_table()
        profile = ""
        sys_vis = gr.update(visible=True)
    else:
        admin_vis = gr.update(visible=False)
        user_vis = gr.update(visible=True)
        users_df = pd.DataFrame()
        profile = get_user_profile(uid)
        sys_vis = gr.update(visible=False)

    # Browser + Export filter choices, limited to the records this
    # session may see. Missing/None values get a fallback label.
    if role != "admin":
        scoped = [r for r in records
                  if r.get("user_id", "sumit") == uid]
    else:
        scoped = records
    cats = sorted({(r.get("category") or "uncategorized")
                   for r in scoped})
    projs = sorted({(r.get("project") or "unspecified")
                    for r in scoped})

    return [
        role, uid, bar,
        # Dashboard
        gr.update(choices=choices, value=user_filter_val), stats,
        make_timeline(df), make_category_pie(df),
        make_project_bar(df), make_day_of_week(df),
        make_tags_bar(all_tags), make_heatmap(df),
        make_recent_table(df),
        # Users tab
        admin_vis, user_vis, users_df, profile,
        # System tab
        sys_vis,
        # Browser filters
        gr.update(choices=["All"] + cats),
        gr.update(choices=["All"] + projs),
        # Export filters
        gr.update(choices=["All"] + cats),
        gr.update(choices=["All"] + projs),
    ]
# Run init_full on every page load. The outputs are positional: their order
# must stay in step with the list init_full returns.
demo.load(
    fn=init_full,
    outputs=[
        role_state, user_id_state, status_bar,
        # Dashboard
        user_filter, stats_md,
        timeline_plot, category_plot,
        project_plot, dow_plot,
        tags_plot, heatmap_plot,
        recent_table,
        # Users tab
        admin_users_section, user_profile_section,
        users_table, profile_md,
        # System tab
        system_tab,
        # Browser filters
        browser_cat, browser_proj,
        # Export filters
        export_cat, export_proj,
    ],
)
# Start the server on all interfaces, port 7860, behind a login prompt.
# auth_fn is defined elsewhere in this file and validates the credentials.
demo.launch(
    server_name="0.0.0.0",
    server_port=7860,
    ssr_mode=False,
    auth=auth_fn,
    auth_message="TATVA TERMINAL — Enter admin passkey or user token",
)