# telegram-analytics / dashboard.py
# (Hugging Face Space listing header: rottg's picture Β· "Update code" Β· commit 999a034 verified)
#!/usr/bin/env python3
"""
Telegram Analytics Dashboard - Web Server
A Flask-based web dashboard for visualizing Telegram chat analytics.
Inspired by Combot and other Telegram statistics bots.
Usage:
python dashboard.py --db telegram.db --port 5000
Then open http://localhost:5000 in your browser
Requirements:
pip install flask
"""
import sqlite3
import json
import csv
import io
import os
from datetime import datetime, timedelta
from flask import Flask, render_template, jsonify, request, Response
from typing import Optional
from collections import defaultdict
# ==========================================
# DATABASE DOWNLOAD FROM HF DATASET
# ==========================================
HF_DATASET_REPO = "rottg/telegram-db"
APP_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH_FULL = os.path.join(APP_DIR, "telegram.db")
EMBEDDINGS_PATH_FULL = os.path.join(APP_DIR, "embeddings.db")
CHUNK_EMBEDDINGS_PATH = os.path.join(APP_DIR, "chunk_embeddings.db")
BM25_INDEX_PATH = os.path.join(APP_DIR, "bm25_index.pkl")
def download_from_hf(filename, local_path):
    """Fetch one file from the HF Dataset repo and copy it to *local_path*.

    Authentication uses the HF_TOKEN environment variable when present,
    otherwise a `.hf_token` file sitting next to this script.  Always
    returns True; any download failure propagates as an exception.
    """
    from huggingface_hub import hf_hub_download
    import shutil

    auth_token = os.environ.get("HF_TOKEN")
    if not auth_token:
        token_path = os.path.join(APP_DIR, ".hf_token")
        if os.path.exists(token_path):
            with open(token_path) as fh:
                auth_token = fh.read().strip()

    # hf_hub_download stores the file in the shared HF cache; copy it
    # (with metadata) to the location the rest of the app expects.
    cached_file = hf_hub_download(
        repo_id=HF_DATASET_REPO,
        filename=filename,
        repo_type="dataset",
        token=auth_token,
    )
    shutil.copy2(cached_file, local_path)
    return True
def _ensure_file(filename, local_path, required):
    """Ensure *local_path* exists, downloading *filename* from HF if missing.

    Returns False only when a *required* file could not be obtained;
    optional files (search indexes) fail soft with a warning.
    """
    if os.path.exists(local_path):
        size_mb = os.path.getsize(local_path) / (1024 * 1024)
        print(f"βœ“ {filename} found ({size_mb:.0f} MB)")
        return True
    print(f"[DB] Downloading {filename} from HF...")
    try:
        download_from_hf(filename, local_path)
        size_mb = os.path.getsize(local_path) / (1024 * 1024)
        print(f"βœ“ {filename} downloaded ({size_mb:.0f} MB)")
        return True
    except Exception as e:
        if required:
            print(f"βœ— Failed to download {filename}: {e}")
            return False
        print(f"⚠ {filename} not available: {e}")
        return True

def ensure_db_exists():
    """Download DBs from HF Dataset repo if they don't exist locally.

    telegram.db is mandatory (returns False if it cannot be obtained);
    the embedding/index files only power optional search features.
    """
    print(f"[DB] Current working directory: {os.getcwd()}")
    targets = [
        ("telegram.db", DB_PATH_FULL, True),
        ("embeddings.db", EMBEDDINGS_PATH_FULL, False),          # semantic search
        ("chunk_embeddings.db", CHUNK_EMBEDDINGS_PATH, False),   # hybrid search
        ("bm25_index.pkl", BM25_INDEX_PATH, False),              # hybrid search
    ]
    for filename, path, required in targets:
        if not _ensure_file(filename, path, required):
            # Required file missing: bail out early, as the original did.
            return False
    return True
# Download DBs on module import (for gunicorn)
ensure_db_exists()
# ==========================================
# AI CONFIGURATION
# Set via environment variables (e.g. in .env or hosting platform settings)
# ==========================================
# Default to Gemini unless the host environment explicitly chose a provider;
# setdefault is the idiomatic one-call form of the check-then-set pattern.
os.environ.setdefault('AI_PROVIDER', 'gemini')
# GEMINI_API_KEY should be set as an environment variable, not hardcoded
# Import our algorithms
from algorithms import (
TopK, find_median, find_percentile, top_k_frequent,
RankTree, lcs_similarity, find_similar_messages,
bucket_sort_by_time, time_histogram, RankedTimeIndex
)
# Import semantic search (uses pre-computed embeddings)
try:
from semantic_search import get_semantic_search
HAS_SEMANTIC_SEARCH = True
except ImportError:
HAS_SEMANTIC_SEARCH = False
get_semantic_search = None
app = Flask(__name__)
# Use the absolute path the downloader wrote to.  The previous relative
# 'telegram.db' silently opened (and created) an empty database whenever the
# process CWD differed from APP_DIR, e.g. under gunicorn.
DB_PATH = DB_PATH_FULL
def get_db():
    """Open a fresh SQLite connection with name-addressable rows.

    Row objects support both index and column-name access, which the
    API handlers below rely on.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
# Lookback in days for the simple named timeframes (all end at "now").
_LOOKBACK_DAYS = {
    'week': 7,
    'month': 30,
    'year': 365,
    '2years': 730,
}

def parse_timeframe(timeframe: str) -> tuple[int, int]:
    """Parse a timeframe string to (start, end) Unix timestamps.

    Accepts 'today', 'yesterday', 'week', 'month', 'year', '2years',
    'all', or a custom "start,end" pair of Unix timestamps.  Anything
    unparseable falls back to the full range (0, now).
    """
    now = datetime.now()
    today_start = datetime(now.year, now.month, now.day)
    if timeframe == 'today':
        start = today_start
        end = now
    elif timeframe == 'yesterday':
        # Yesterday is a closed interval: midnight to midnight.
        start = today_start - timedelta(days=1)
        end = today_start
    elif timeframe in _LOOKBACK_DAYS:
        start = today_start - timedelta(days=_LOOKBACK_DAYS[timeframe])
        end = now
    elif timeframe == 'all':
        return 0, int(now.timestamp())
    else:
        # Custom range: "start,end" as Unix timestamps
        try:
            parts = timeframe.split(',')
            return int(parts[0]), int(parts[1])
        except (ValueError, IndexError):
            # Narrowed from a bare `except:` which also swallowed
            # SystemExit/KeyboardInterrupt.
            return 0, int(now.timestamp())
    return int(start.timestamp()), int(end.timestamp())
# ==========================================
# CACHE INVALIDATION SYSTEM
# ==========================================
_cache_version = 0  # Incremented on DB updates to invalidate all caches

def invalidate_caches():
    """Invalidate all cached data. Call after DB updates (sync, import, etc.).

    Bumps the version counter (checked by version-aware caches such as
    get_user_rank_tree) and eagerly drops the rank tree so stale
    rankings can never be served.
    """
    global _cache_version, _user_rank_tree, _user_rank_tree_timeframe
    _cache_version += 1
    _user_rank_tree = None
    _user_rank_tree_timeframe = None

# ==========================================
# GLOBAL ALGORITHM CACHES
# ==========================================
# RankTree for O(log n) user ranking - rebuilt on demand
_user_rank_tree = None            # cached RankTree, or None when a rebuild is needed
_user_rank_tree_timeframe = None  # timeframe string the cached tree was built for
_user_rank_tree_version = -1      # _cache_version at build time (-1 = never built)
def get_user_rank_tree(timeframe: str):
    """
    Get or rebuild the user rank tree for efficient O(log n) rank queries.
    Tree is cached and rebuilt only when timeframe or DB version changes.
    """
    global _user_rank_tree, _user_rank_tree_timeframe, _user_rank_tree_version
    # Cache hit: same timeframe requested and no invalidate_caches() since build.
    if (_user_rank_tree is not None
        and _user_rank_tree_timeframe == timeframe
        and _user_rank_tree_version == _cache_version):
        return _user_rank_tree
    start_ts, end_ts = parse_timeframe(timeframe)
    conn = get_db()
    # Per-user message counts within the window; rows without a sender are skipped.
    cursor = conn.execute('''
        SELECT from_id, from_name, COUNT(*) as message_count
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        AND from_id IS NOT NULL AND from_id != ''
        GROUP BY from_id
        ORDER BY message_count DESC
    ''', (start_ts, end_ts))
    _user_rank_tree = RankTree()
    for row in cursor.fetchall():
        # Key is the negated count — presumably so the tree's ascending key
        # order puts the most active user first (confirm against RankTree).
        _user_rank_tree.insert(
            -row['message_count'],
            {'user_id': row['from_id'], 'name': row['from_name'], 'messages': row['message_count']}
        )
    conn.close()
    # Record what this cache was built against so future calls can reuse it.
    _user_rank_tree_timeframe = timeframe
    _user_rank_tree_version = _cache_version
    return _user_rank_tree
# ==========================================
# PAGE ROUTES
# ==========================================
# Each route below only serves a template; the pages fetch their data
# client-side from the /api/* endpoints.
@app.route('/')
def index():
    """Main dashboard page."""
    return render_template('index.html')

@app.route('/users')
def users_page():
    """User leaderboard page."""
    return render_template('users.html')

@app.route('/moderation')
def moderation_page():
    """Moderation analytics page."""
    return render_template('moderation.html')

@app.route('/search')
def search_page():
    """Search page."""
    return render_template('search.html')

@app.route('/chat')
def chat_page():
    """Chat view page - Telegram-like interface."""
    return render_template('chat.html')

@app.route('/user/<user_id>')
def user_profile_page(user_id):
    """User profile page with comprehensive statistics."""
    return render_template('user_profile.html', user_id=user_id)

@app.route('/settings')
def settings_page():
    """Settings and data update page."""
    return render_template('settings.html')

@app.route('/ai-search')
def ai_search_page():
    """AI-powered search page with Gemini."""
    return render_template('ai_search.html')

@app.route('/maintenance')
def maintenance_page():
    """Maintenance page - password protected."""
    return render_template('maintenance.html')
# ==========================================
# API ENDPOINTS - OVERVIEW STATS
# ==========================================
@app.route('/api/overview')
def api_overview():
    """Get overview statistics for the requested timeframe."""
    timeframe = request.args.get('timeframe', 'all')
    start_ts, end_ts = parse_timeframe(timeframe)
    window = (start_ts, end_ts)
    conn = get_db()
    # Total messages
    total_messages = conn.execute('''
        SELECT COUNT(*) FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
    ''', window).fetchone()[0]
    # Active users
    active_users = conn.execute('''
        SELECT COUNT(DISTINCT from_id) FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
    ''', window).fetchone()[0]
    # Total users (all time)
    total_users = conn.execute('SELECT COUNT(*) FROM users').fetchone()[0]
    # Actual message date range inside the window (fall back to the window
    # bounds when the window contains no messages).
    first_msg, last_msg = conn.execute('''
        SELECT MIN(date_unixtime), MAX(date_unixtime) FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
    ''', window).fetchone()
    first_msg = first_msg or start_ts
    last_msg = last_msg or end_ts
    # Span in days, clamped to at least 1 to avoid division by zero.
    days = max(1, (last_msg - first_msg) // 86400)
    messages_per_day = total_messages / days
    # Average distinct posters per active day
    per_day = conn.execute('''
        SELECT COUNT(DISTINCT from_id) as users,
               date(datetime(date_unixtime, 'unixepoch')) as day
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        GROUP BY day
    ''', window).fetchall()
    daily_counts = [r[0] for r in per_day]
    users_per_day = sum(daily_counts) / len(daily_counts) if daily_counts else 0
    # Media / links / mentions totals (SUM yields NULL on an empty window)
    media_count, links_count, mentions_count = (
        v or 0 for v in conn.execute('''
        SELECT
            SUM(has_media) as media,
            SUM(has_links) as links,
            SUM(has_mentions) as mentions
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
    ''', window).fetchone())
    # Replies
    replies_count = conn.execute('''
        SELECT COUNT(*) FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        AND reply_to_message_id IS NOT NULL
    ''', window).fetchone()[0]
    # Forwards
    forwards_count = conn.execute('''
        SELECT COUNT(*) FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        AND forwarded_from IS NOT NULL
    ''', window).fetchone()[0]
    conn.close()
    return jsonify({
        'total_messages': total_messages,
        'active_users': active_users,
        'total_users': total_users,
        'messages_per_day': round(messages_per_day, 1),
        'users_per_day': round(users_per_day, 1),
        'messages_per_user': round(total_messages / active_users, 1) if active_users else 0,
        'media_count': media_count,
        'links_count': links_count,
        'mentions_count': mentions_count,
        'replies_count': replies_count,
        'forwards_count': forwards_count,
        'days_span': days,
        'first_message': first_msg,
        'last_message': last_msg
    })
# ==========================================
# API ENDPOINTS - CHARTS
# ==========================================
@app.route('/api/chart/messages')
def api_chart_messages():
    """Get message volume over time, bucketed by the chosen granularity."""
    timeframe = request.args.get('timeframe', 'month')
    granularity = request.args.get('granularity', 'day')  # hour, day, week
    start_ts, end_ts = parse_timeframe(timeframe)
    # strftime pattern per bucket; any unknown granularity buckets by day.
    fmt = {'hour': '%Y-%m-%d %H:00', 'week': '%Y-W%W'}.get(granularity, '%Y-%m-%d')
    conn = get_db()
    rows = conn.execute(f'''
        SELECT
            strftime('{fmt}', datetime(date_unixtime, 'unixepoch')) as period,
            COUNT(*) as count
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        GROUP BY period
        ORDER BY period
    ''', (start_ts, end_ts)).fetchall()
    conn.close()
    return jsonify([{'label': period, 'value': count} for period, count in rows])
@app.route('/api/chart/users')
def api_chart_users():
    """Get active (distinct) users over time, bucketed by granularity."""
    timeframe = request.args.get('timeframe', 'month')
    granularity = request.args.get('granularity', 'day')
    start_ts, end_ts = parse_timeframe(timeframe)
    # Same bucket formats as the message-volume chart; default is daily.
    fmt = {'hour': '%Y-%m-%d %H:00', 'week': '%Y-W%W'}.get(granularity, '%Y-%m-%d')
    conn = get_db()
    rows = conn.execute(f'''
        SELECT
            strftime('{fmt}', datetime(date_unixtime, 'unixepoch')) as period,
            COUNT(DISTINCT from_id) as count
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        GROUP BY period
        ORDER BY period
    ''', (start_ts, end_ts)).fetchall()
    conn.close()
    return jsonify([{'label': period, 'value': count} for period, count in rows])
@app.route('/api/chart/heatmap')
def api_chart_heatmap():
    """Get activity heatmap (hour of day vs day of week)."""
    start_ts, end_ts = parse_timeframe(request.args.get('timeframe', 'all'))
    conn = get_db()
    rows = conn.execute('''
        SELECT
            CAST(strftime('%w', datetime(date_unixtime, 'unixepoch')) AS INTEGER) as dow,
            CAST(strftime('%H', datetime(date_unixtime, 'unixepoch')) AS INTEGER) as hour,
            COUNT(*) as count
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        GROUP BY dow, hour
    ''', (start_ts, end_ts)).fetchall()
    conn.close()
    # 7x24 grid, rows indexed by SQLite's %w (0 = Sunday), columns by hour.
    grid = [[0] * 24 for _ in range(7)]
    for dow, hour, count in rows:
        grid[dow][hour] = count
    return jsonify({
        'data': grid,
        'days': ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'],
        'hours': list(range(24))
    })
@app.route('/api/chart/daily')
def api_chart_daily():
    """Get activity by day of week."""
    start_ts, end_ts = parse_timeframe(request.args.get('timeframe', 'all'))
    conn = get_db()
    rows = conn.execute('''
        SELECT
            CAST(strftime('%w', datetime(date_unixtime, 'unixepoch')) AS INTEGER) as dow,
            COUNT(*) as count
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        GROUP BY dow
        ORDER BY dow
    ''', (start_ts, end_ts)).fetchall()
    conn.close()
    # SQLite's %w numbers weekdays 0=Sunday..6=Saturday; emit all seven
    # days, filling absent ones with zero.
    names = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
    by_dow = dict(rows)
    return jsonify([{'label': names[d], 'value': by_dow.get(d, 0)} for d in range(7)])
@app.route('/api/chart/hourly')
def api_chart_hourly():
    """Get activity by hour of day."""
    start_ts, end_ts = parse_timeframe(request.args.get('timeframe', 'all'))
    conn = get_db()
    rows = conn.execute('''
        SELECT
            CAST(strftime('%H', datetime(date_unixtime, 'unixepoch')) AS INTEGER) as hour,
            COUNT(*) as count
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        GROUP BY hour
        ORDER BY hour
    ''', (start_ts, end_ts)).fetchall()
    conn.close()
    # Emit all 24 hours so quiet hours chart as zeros rather than gaps.
    by_hour = dict(rows)
    return jsonify([{'label': f'{h:02d}:00', 'value': by_hour.get(h, 0)} for h in range(24)])
# ==========================================
# API ENDPOINTS - USERS
# ==========================================
@app.route('/api/users')
def api_users():
    """Get user leaderboard including participants who never sent messages.

    Query params: timeframe, limit, offset, include_inactive ('1'/'0').
    Returns a paginated list ordered by message count; participants with
    zero messages (when included) are appended after all active users.
    """
    timeframe = request.args.get('timeframe', 'all')
    start_ts, end_ts = parse_timeframe(timeframe)
    limit = int(request.args.get('limit', 50))
    offset = int(request.args.get('offset', 0))
    include_inactive = request.args.get('include_inactive', '1') == '1'
    conn = get_db()
    # Get total messages for percentage calculation
    cursor = conn.execute('''
        SELECT COUNT(*) FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
    ''', (start_ts, end_ts))
    total_messages = cursor.fetchone()[0]
    # Get user stats from messages (ordered most-active first)
    cursor = conn.execute('''
        SELECT
            from_id,
            from_name,
            COUNT(*) as message_count,
            SUM(LENGTH(text_plain)) as char_count,
            SUM(has_links) as links,
            SUM(has_media) as media,
            MIN(date_unixtime) as first_seen,
            MAX(date_unixtime) as last_seen,
            COUNT(DISTINCT date(datetime(date_unixtime, 'unixepoch'))) as active_days
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        AND from_id IS NOT NULL AND from_id != ''
        GROUP BY from_id
        ORDER BY message_count DESC
    ''', (start_ts, end_ts))
    active_users = []
    active_user_ids = set()
    for row in cursor.fetchall():
        active_user_ids.add(row['from_id'])
        active_users.append({
            'user_id': row['from_id'],
            'name': row['from_name'] or 'Unknown',
            'messages': row['message_count'],
            'characters': row['char_count'] or 0,
            'percentage': round(100 * row['message_count'] / total_messages, 2) if total_messages else 0,
            'links': row['links'] or 0,
            'media': row['media'] or 0,
            'first_seen': row['first_seen'],
            'last_seen': row['last_seen'],
            'active_days': row['active_days'],
            'daily_average': round(row['message_count'] / max(1, row['active_days']), 1),
            'is_participant': False,  # corrected below if found in participants
            'role': None,
        })
    # Try to enrich with participant data and add inactive participants.
    # The participants table is populated by a separate Telethon sync and
    # may not exist yet — probe defensively.
    has_participants = False
    try:
        cursor = conn.execute('SELECT COUNT(*) FROM participants')
        has_participants = cursor.fetchone()[0] > 0
    except Exception:
        pass
    if has_participants:
        # Enrich active users with participant data
        participant_map = {}
        cursor = conn.execute('SELECT * FROM participants')
        for row in cursor.fetchall():
            participant_map[row['user_id']] = dict(row)
        for user in active_users:
            p = participant_map.get(user['user_id'])
            if p:
                user['is_participant'] = True
                user['username'] = p.get('username', '')
                # Role precedence: creator > admin > bot
                if p.get('is_creator'):
                    user['role'] = 'creator'
                elif p.get('is_admin'):
                    user['role'] = 'admin'
                elif p.get('is_bot'):
                    user['role'] = 'bot'
        # Add inactive participants (those who never sent messages)
        if include_inactive:
            for uid, p in participant_map.items():
                if uid not in active_user_ids:
                    name = f"{p.get('first_name', '')} {p.get('last_name', '')}".strip()
                    role = None
                    if p.get('is_creator'):
                        role = 'creator'
                    elif p.get('is_admin'):
                        role = 'admin'
                    elif p.get('is_bot'):
                        role = 'bot'
                    active_users.append({
                        'user_id': uid,
                        'name': name or 'Unknown',
                        'messages': 0,
                        'characters': 0,
                        'percentage': 0,
                        'links': 0,
                        'media': 0,
                        'first_seen': None,
                        'last_seen': None,
                        'active_days': 0,
                        'daily_average': 0,
                        'is_participant': True,
                        'username': p.get('username', ''),
                        'role': role,
                    })
    # Assign ranks (active users first, then inactive); zero-message
    # participants get rank None so the UI can show them unranked.
    users_with_rank = []
    for i, user in enumerate(active_users):
        user['rank'] = i + 1 if user['messages'] > 0 else None
        users_with_rank.append(user)
    total_users = len(users_with_rank)
    total_active = len(active_user_ids)
    # Apply pagination
    page_users = users_with_rank[offset:offset + limit]
    conn.close()
    return jsonify({
        'users': page_users,
        'total': total_users,
        'total_active': total_active,
        'total_participants': total_users - total_active if has_participants else 0,
        'limit': limit,
        'offset': offset
    })
@app.route('/api/user/<user_id>')
def api_user_detail(user_id):
    """Get detailed stats for a specific user within a timeframe.

    Returns 404 when the user has no messages in the window.
    """
    timeframe = request.args.get('timeframe', 'all')
    start_ts, end_ts = parse_timeframe(timeframe)
    conn = get_db()
    # Basic aggregate stats for this sender in the window
    cursor = conn.execute('''
        SELECT
            from_name,
            COUNT(*) as messages,
            SUM(LENGTH(text_plain)) as characters,
            SUM(has_links) as links,
            SUM(has_media) as media,
            SUM(has_mentions) as mentions,
            MIN(date_unixtime) as first_seen,
            MAX(date_unixtime) as last_seen,
            COUNT(DISTINCT date(datetime(date_unixtime, 'unixepoch'))) as active_days
        FROM messages
        WHERE from_id = ?
        AND date_unixtime BETWEEN ? AND ?
    ''', (user_id, start_ts, end_ts))
    row = cursor.fetchone()
    if not row or not row['messages']:
        conn.close()
        return jsonify({'error': 'User not found'}), 404
    # Replies sent
    cursor = conn.execute('''
        SELECT COUNT(*) FROM messages
        WHERE from_id = ? AND reply_to_message_id IS NOT NULL
        AND date_unixtime BETWEEN ? AND ?
    ''', (user_id, start_ts, end_ts))
    replies_sent = cursor.fetchone()[0]
    # Replies received (messages by others replying to this user's messages)
    cursor = conn.execute('''
        SELECT COUNT(*) FROM messages m1
        JOIN messages m2 ON m1.reply_to_message_id = m2.id
        WHERE m2.from_id = ?
        AND m1.date_unixtime BETWEEN ? AND ?
    ''', (user_id, start_ts, end_ts))
    replies_received = cursor.fetchone()[0]
    # Activity by hour
    cursor = conn.execute('''
        SELECT
            CAST(strftime('%H', datetime(date_unixtime, 'unixepoch')) AS INTEGER) as hour,
            COUNT(*) as count
        FROM messages
        WHERE from_id = ?
        AND date_unixtime BETWEEN ? AND ?
        GROUP BY hour
    ''', (user_id, start_ts, end_ts))
    hourly = {row[0]: row[1] for row in cursor.fetchall()}
    # Activity over time (most recent 30 active days)
    cursor = conn.execute('''
        SELECT
            date(datetime(date_unixtime, 'unixepoch')) as day,
            COUNT(*) as count
        FROM messages
        WHERE from_id = ?
        AND date_unixtime BETWEEN ? AND ?
        GROUP BY day
        ORDER BY day DESC
        LIMIT 30
    ''', (user_id, start_ts, end_ts))
    daily = [{'date': r[0], 'count': r[1]} for r in cursor.fetchall()]
    # Rank: 1 + number of users with strictly more messages in the window
    cursor = conn.execute('''
        SELECT COUNT(*) + 1 FROM (
            SELECT from_id, COUNT(*) as cnt FROM messages
            WHERE date_unixtime BETWEEN ? AND ?
            GROUP BY from_id
        ) WHERE cnt > ?
    ''', (start_ts, end_ts, row['messages']))
    rank = cursor.fetchone()[0]
    conn.close()
    return jsonify({
        'user_id': user_id,
        'name': row['from_name'] or 'Unknown',
        'messages': row['messages'],
        'characters': row['characters'] or 0,
        'links': row['links'] or 0,
        'media': row['media'] or 0,
        'mentions': row['mentions'] or 0,
        'first_seen': row['first_seen'],
        'last_seen': row['last_seen'],
        'active_days': row['active_days'],
        'daily_average': round(row['messages'] / max(1, row['active_days']), 1),
        'replies_sent': replies_sent,
        'replies_received': replies_received,
        'rank': rank,
        # Dense 24-slot array; hours without activity are zero-filled.
        'hourly_activity': [hourly.get(h, 0) for h in range(24)],
        'daily_activity': daily
    })
@app.route('/api/user/<user_id>/profile')
def api_user_profile(user_id):
    """Get comprehensive user profile with all available statistics.

    Unlike /api/user/<id>, this endpoint is all-time (no timeframe) and
    also returns participant metadata when the Telethon-synced
    participants table is present.  A participant who never posted gets
    a minimal response with has_messages=False.
    """
    conn = get_db()
    # ---- Participant info (from Telethon sync) ----
    participant = None
    try:
        cursor = conn.execute('SELECT * FROM participants WHERE user_id = ?', (user_id,))
        row = cursor.fetchone()
        if row:
            participant = dict(row)
    except Exception:
        pass  # Table might not exist yet
    # ---- Basic message stats ----
    cursor = conn.execute('''
        SELECT
            from_name,
            COUNT(*) as total_messages,
            SUM(text_length) as total_chars,
            AVG(text_length) as avg_length,
            MAX(text_length) as max_length,
            SUM(has_links) as links_shared,
            SUM(has_media) as media_sent,
            SUM(has_photo) as photos_sent,
            SUM(has_mentions) as mentions_made,
            SUM(is_edited) as edits,
            MIN(date_unixtime) as first_message,
            MAX(date_unixtime) as last_message,
            COUNT(DISTINCT date(datetime(date_unixtime, 'unixepoch'))) as active_days
        FROM messages WHERE from_id = ?
    ''', (user_id,))
    stats = cursor.fetchone()
    if not stats or not stats['total_messages']:
        # User might be a participant who never sent a message
        if participant:
            conn.close()
            return jsonify({
                'user_id': user_id,
                'participant': participant,
                'has_messages': False,
                'name': f"{participant.get('first_name', '')} {participant.get('last_name', '')}".strip()
            })
        conn.close()
        return jsonify({'error': 'User not found'}), 404
    stats = dict(stats)
    # ---- Replies sent (who does this user reply to most) ----
    # Self-replies are excluded on both of these top-10 lists.
    cursor = conn.execute('''
        SELECT r.from_name, r.from_id, COUNT(*) as cnt
        FROM messages m
        JOIN messages r ON m.reply_to_message_id = r.id
        WHERE m.from_id = ? AND r.from_id != ?
        GROUP BY r.from_id
        ORDER BY cnt DESC
        LIMIT 10
    ''', (user_id, user_id))
    replies_to = [{'name': r[0], 'user_id': r[1], 'count': r[2]} for r in cursor.fetchall()]
    # ---- Replies received (who replies to this user most) ----
    cursor = conn.execute('''
        SELECT m.from_name, m.from_id, COUNT(*) as cnt
        FROM messages m
        JOIN messages r ON m.reply_to_message_id = r.id
        WHERE r.from_id = ? AND m.from_id != ?
        GROUP BY m.from_id
        ORDER BY cnt DESC
        LIMIT 10
    ''', (user_id, user_id))
    replies_from = [{'name': r[0], 'user_id': r[1], 'count': r[2]} for r in cursor.fetchall()]
    # ---- Total replies sent/received ----
    cursor = conn.execute('''
        SELECT COUNT(*) FROM messages
        WHERE from_id = ? AND reply_to_message_id IS NOT NULL
    ''', (user_id,))
    total_replies_sent = cursor.fetchone()[0]
    cursor = conn.execute('''
        SELECT COUNT(*) FROM messages m
        JOIN messages r ON m.reply_to_message_id = r.id
        WHERE r.from_id = ? AND m.from_id != ?
    ''', (user_id, user_id))
    total_replies_received = cursor.fetchone()[0]
    # ---- Forwarded messages ----
    cursor = conn.execute('''
        SELECT COUNT(*) FROM messages
        WHERE from_id = ? AND forwarded_from IS NOT NULL
    ''', (user_id,))
    forwards_sent = cursor.fetchone()[0]
    # ---- Top forwarded sources ----
    cursor = conn.execute('''
        SELECT forwarded_from, COUNT(*) as cnt
        FROM messages
        WHERE from_id = ? AND forwarded_from IS NOT NULL
        GROUP BY forwarded_from
        ORDER BY cnt DESC
        LIMIT 5
    ''', (user_id,))
    top_forward_sources = [{'name': r[0], 'count': r[1]} for r in cursor.fetchall()]
    # ---- Activity by hour ----
    cursor = conn.execute('''
        SELECT
            CAST(strftime('%H', datetime(date_unixtime, 'unixepoch')) AS INTEGER) as hour,
            COUNT(*) as count
        FROM messages WHERE from_id = ?
        GROUP BY hour
    ''', (user_id,))
    hourly = {r[0]: r[1] for r in cursor.fetchall()}
    # ---- Activity by weekday (SQLite %w: 0 = Sunday) ----
    cursor = conn.execute('''
        SELECT
            CAST(strftime('%w', datetime(date_unixtime, 'unixepoch')) AS INTEGER) as weekday,
            COUNT(*) as count
        FROM messages WHERE from_id = ?
        GROUP BY weekday
    ''', (user_id,))
    weekday_names = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
    weekday_data = {r[0]: r[1] for r in cursor.fetchall()}
    weekday_activity = [{'day': weekday_names[d], 'count': weekday_data.get(d, 0)} for d in range(7)]
    # ---- Activity trend (last 90 days) ----
    # NOTE: LIMIT 90 on day buckets means the 90 most recent *active* days.
    cursor = conn.execute('''
        SELECT
            date(datetime(date_unixtime, 'unixepoch')) as day,
            COUNT(*) as count
        FROM messages WHERE from_id = ?
        GROUP BY day
        ORDER BY day DESC
        LIMIT 90
    ''', (user_id,))
    daily_activity = [{'date': r[0], 'count': r[1]} for r in cursor.fetchall()]
    # ---- Monthly trend ----
    cursor = conn.execute('''
        SELECT
            strftime('%Y-%m', datetime(date_unixtime, 'unixepoch')) as month,
            COUNT(*) as count
        FROM messages WHERE from_id = ?
        GROUP BY month
        ORDER BY month
    ''', (user_id,))
    monthly_activity = [{'month': r[0], 'count': r[1]} for r in cursor.fetchall()]
    # ---- Top links shared ----
    cursor = conn.execute('''
        SELECT e.value, COUNT(*) as cnt
        FROM entities e
        JOIN messages m ON e.message_id = m.id
        WHERE m.from_id = ? AND e.type = 'link'
        GROUP BY e.value
        ORDER BY cnt DESC
        LIMIT 10
    ''', (user_id,))
    top_links = [{'url': r[0], 'count': r[1]} for r in cursor.fetchall()]
    # ---- Rank among all users (1 + users with strictly more messages) ----
    cursor = conn.execute('''
        SELECT COUNT(*) + 1 FROM (
            SELECT from_id, COUNT(*) as cnt FROM messages GROUP BY from_id
        ) WHERE cnt > ?
    ''', (stats['total_messages'],))
    rank = cursor.fetchone()[0]
    cursor = conn.execute('SELECT COUNT(DISTINCT from_id) FROM messages')
    total_users = cursor.fetchone()[0]
    # ---- Average reply time (when replying to someone) ----
    # Only positive gaps under 24h count, to exclude clock skew and
    # replies to very old messages.
    cursor = conn.execute('''
        SELECT AVG(m.date_unixtime - r.date_unixtime)
        FROM messages m
        JOIN messages r ON m.reply_to_message_id = r.id
        WHERE m.from_id = ?
        AND m.date_unixtime - r.date_unixtime > 0
        AND m.date_unixtime - r.date_unixtime < 86400
    ''', (user_id,))
    avg_reply_time = cursor.fetchone()[0]
    conn.close()
    # ---- Build response ----
    total_msgs = stats['total_messages']
    active_days = stats['active_days'] or 1
    first_msg = stats['first_message']
    last_msg = stats['last_message']
    # Calendar span between first and last message, min 1 day
    span_days = max(1, (last_msg - first_msg) / 86400) if first_msg and last_msg else 1
    return jsonify({
        'user_id': user_id,
        'name': stats['from_name'] or 'Unknown',
        'has_messages': True,
        'participant': participant,
        # Core stats
        'total_messages': total_msgs,
        'total_characters': stats['total_chars'] or 0,
        'avg_message_length': round(stats['avg_length'] or 0, 1),
        'max_message_length': stats['max_length'] or 0,
        'links_shared': stats['links_shared'] or 0,
        'media_sent': stats['media_sent'] or 0,
        'photos_sent': stats['photos_sent'] or 0,
        'mentions_made': stats['mentions_made'] or 0,
        'edits': stats['edits'] or 0,
        'forwards_sent': forwards_sent,
        # Time stats
        'first_message': first_msg,
        'last_message': last_msg,
        'active_days': active_days,
        'daily_average': round(total_msgs / active_days, 1),
        'messages_per_calendar_day': round(total_msgs / span_days, 1),
        # Reply stats
        'total_replies_sent': total_replies_sent,
        'total_replies_received': total_replies_received,
        'reply_ratio': round(total_replies_sent / max(1, total_msgs) * 100, 1),
        'avg_reply_time_seconds': round(avg_reply_time) if avg_reply_time else None,
        'replies_to': replies_to,
        'replies_from': replies_from,
        # Forward stats
        'top_forward_sources': top_forward_sources,
        # Ranking
        'rank': rank,
        'total_active_users': total_users,
        # Activity patterns
        'hourly_activity': [hourly.get(h, 0) for h in range(24)],
        'weekday_activity': weekday_activity,
        'daily_activity': daily_activity,
        'monthly_activity': monthly_activity,
        # Content
        'top_links': top_links,
    })
# ==========================================
# API ENDPOINTS - CONTENT ANALYTICS
# ==========================================
@app.route('/api/top/words')
def api_top_words():
    """Get top words."""
    import re
    start_ts, end_ts = parse_timeframe(request.args.get('timeframe', 'all'))
    limit = int(request.args.get('limit', 30))
    conn = get_db()
    rows = conn.execute('''
        SELECT text_plain FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        AND text_plain IS NOT NULL
    ''', (start_ts, end_ts)).fetchall()
    conn.close()
    # Tokens are runs of 3+ Hebrew or Latin letters, lowercased.
    token_re = re.compile(r'[\u0590-\u05FFa-zA-Z]{3,}')
    tokens = []
    for (text,) in rows:
        tokens.extend(token_re.findall(text.lower()))
    # Course algorithm: heap-based top-k frequency selection.
    return jsonify([{'word': w, 'count': c} for w, c in top_k_frequent(tokens, limit)])
@app.route('/api/top/domains')
def api_top_domains():
    """Get top shared domains.

    Hostnames are lowercased before counting: DNS names are
    case-insensitive, so "Example.com" and "example.com" must share one
    bucket.  The URL scheme match is case-insensitive too, so
    "HTTPS://..." links are no longer dropped.
    """
    timeframe = request.args.get('timeframe', 'all')
    start_ts, end_ts = parse_timeframe(timeframe)
    limit = int(request.args.get('limit', 20))
    conn = get_db()
    cursor = conn.execute('''
        SELECT e.value FROM entities e
        JOIN messages m ON e.message_id = m.id
        WHERE e.type = 'link'
        AND m.date_unixtime BETWEEN ? AND ?
    ''', (start_ts, end_ts))
    import re
    # Capture the host part of the URL, dropping an optional "www." prefix.
    domain_pattern = re.compile(r'https?://(?:www\.)?([^/]+)', re.IGNORECASE)
    domains = []
    for row in cursor.fetchall():
        match = domain_pattern.match(row[0])
        if match:
            domains.append(match.group(1).lower())
    conn.close()
    top_domains = top_k_frequent(domains, limit)
    return jsonify([{'domain': d, 'count': c} for d, c in top_domains])
@app.route('/api/top/mentions')
def api_top_mentions():
    """Get top mentioned users."""
    start_ts, end_ts = parse_timeframe(request.args.get('timeframe', 'all'))
    limit = int(request.args.get('limit', 20))
    conn = get_db()
    # Aggregation is pushed into SQL; no in-Python counting needed here.
    rows = conn.execute('''
        SELECT e.value, COUNT(*) as count FROM entities e
        JOIN messages m ON e.message_id = m.id
        WHERE e.type = 'mention'
        AND m.date_unixtime BETWEEN ? AND ?
        GROUP BY e.value
        ORDER BY count DESC
        LIMIT ?
    ''', (start_ts, end_ts, limit)).fetchall()
    conn.close()
    return jsonify([{'mention': value, 'count': count} for value, count in rows])
# ==========================================
# API ENDPOINTS - ADVANCED ANALYTICS (Course Algorithms)
# ==========================================
@app.route('/api/similar/<int:message_id>')
def api_similar_messages(message_id):
    """
    Find messages similar to a given message using LCS algorithm.
    Algorithm: LCS (Longest Common Subsequence)
    Time: O(n * m) where n = sample size, m = avg message length
    Use case: Detect reposts, spam, similar content
    """
    threshold = float(request.args.get('threshold', 0.7))
    limit = int(request.args.get('limit', 10))
    sample_size = int(request.args.get('sample', 1000))
    conn = get_db()
    # Look up the message we are comparing against.
    target = conn.execute('''
        SELECT text_plain, from_name, date FROM messages WHERE id = ?
    ''', (message_id,)).fetchone()
    if not target or not target['text_plain']:
        conn.close()
        return jsonify({'error': 'Message not found or empty'}), 404
    target_text = target['text_plain']
    # Random sample of other non-trivial messages to compare against.
    candidates = conn.execute('''
        SELECT id, text_plain, from_name, date FROM messages
        WHERE id != ? AND text_plain IS NOT NULL AND LENGTH(text_plain) > 20
        ORDER BY RANDOM()
        LIMIT ?
    ''', (message_id, sample_size)).fetchall()
    conn.close()

    def _preview(text):
        # Truncate long texts for the JSON payload.
        return text[:200] + '...' if len(text) > 200 else text

    # Score every candidate with LCS similarity; keep those over threshold.
    matches = []
    for candidate in candidates:
        score = lcs_similarity(target_text, candidate['text_plain'])
        if score >= threshold:
            matches.append({
                'id': candidate['id'],
                'similarity': round(score * 100, 1),
                'text': _preview(candidate['text_plain'])
            })
    matches.sort(key=lambda m: m['similarity'], reverse=True)
    return jsonify({
        'target': {
            'id': message_id,
            'text': _preview(target_text),
            'from': target['from_name'],
            'date': target['date']
        },
        'similar': matches[:limit],
        'algorithm': 'LCS (Longest Common Subsequence)',
        'threshold': threshold
    })
@app.route('/api/analytics/similar')
def api_find_all_similar():
    """
    Find all similar message pairs in the database.
    Algorithm: LCS with early termination
    Time: O(n^2 * m) where n = sample size, m = avg message length
    Use case: Detect spam campaigns, repeated content

    Query params:
        timeframe: named window parsed by parse_timeframe() (default 'all')
        threshold: minimum similarity ratio in [0, 1] (default 0.8)
        sample: number of random messages to compare (default 500)
    """
    timeframe = request.args.get('timeframe', 'all')
    threshold = float(request.args.get('threshold', 0.8))
    sample_size = int(request.args.get('sample', 500))
    start_ts, end_ts = parse_timeframe(timeframe)
    conn = get_db()
    # Random sample keeps the O(n^2) pairwise comparison tractable; messages
    # of <= 30 chars are skipped as too short for meaningful LCS similarity.
    cursor = conn.execute('''
        SELECT id, text_plain, from_name, from_id FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
          AND text_plain IS NOT NULL AND LENGTH(text_plain) > 30
        ORDER BY RANDOM()
        LIMIT ?
    ''', (start_ts, end_ts, sample_size))
    messages = [(row['id'], row['text_plain'], row['from_name'], row['from_id'])
                for row in cursor.fetchall()]
    conn.close()
    # Use our LCS algorithm to find similar pairs
    message_pairs = [(id_, text) for id_, text, _, _ in messages]
    similar_pairs = find_similar_messages(message_pairs, threshold=threshold, min_length=30)
    # Build result with user info (lookup tables keyed by message id)
    id_to_info = {id_: (name, uid) for id_, _, name, uid in messages}
    id_to_text = {id_: text for id_, text, _, _ in messages}
    results = []
    for id1, id2, sim in similar_pairs[:50]:  # Limit to top 50
        results.append({
            'message1': {
                'id': id1,
                'text': id_to_text[id1][:150],
                'from': id_to_info[id1][0]
            },
            'message2': {
                'id': id2,
                'text': id_to_text[id2][:150],
                'from': id_to_info[id2][0]
            },
            'similarity': round(sim * 100, 1)  # percentage, 1 decimal
        })
    return jsonify({
        'pairs': results,
        'total_found': len(similar_pairs),
        'algorithm': 'LCS (Longest Common Subsequence)',
        'threshold': threshold,
        'sample_size': sample_size
    })
@app.route('/api/user/rank/<user_id>')
def api_user_rank_efficient(user_id):
    """
    Get user rank using RankTree for O(log n) lookup.
    Algorithm: Order Statistics Tree (AVL-based Rank Tree)
    Time: rank query is O(log n); the user's own message count still
    comes from a separate SQL aggregate (see comment below).
    Use case: Real-time user ranking queries
    """
    timeframe = request.args.get('timeframe', 'all')
    tree = get_user_rank_tree(timeframe)
    # Find user in tree by iterating (still O(n) for lookup, but rank is O(log n))
    # For true O(log n), we'd need to store user_id as key
    start_ts, end_ts = parse_timeframe(timeframe)
    conn = get_db()
    cursor = conn.execute('''
        SELECT COUNT(*) as count FROM messages
        WHERE from_id = ? AND date_unixtime BETWEEN ? AND ?
    ''', (user_id, start_ts, end_ts))
    user_count = cursor.fetchone()['count']
    if user_count == 0:
        conn.close()
        return jsonify({'error': 'User not found'}), 404
    # Use rank tree to find rank (O(log n)).
    # NOTE(review): keys appear to be negated message counts so that the most
    # active user sorts first (rank 1 = top poster); confirm tie behavior in
    # the RankTree implementation.
    rank = tree.rank(-user_count)  # Negative because tree uses negative counts
    # Get total users
    total = len(tree)
    conn.close()
    return jsonify({
        'user_id': user_id,
        'messages': user_count,
        'rank': rank,
        'total_users': total,
        # Percentile = share of users at or below this rank (higher = more active)
        'percentile': round(100 * (total - rank + 1) / total, 1) if total > 0 else 0,
        'algorithm': 'RankTree (Order Statistics Tree)',
        'complexity': 'O(log n)'
    })
@app.route('/api/user/by-rank/<int:rank>')
def api_user_by_rank(rank):
    """Look up the user occupying a given activity rank.

    Backed by the order-statistics tree, so select(k) runs in O(log n).
    Answers questions like "who is the 10th most active user?".
    """
    timeframe = request.args.get('timeframe', 'all')
    tree = get_user_rank_tree(timeframe)
    total = len(tree)
    if not 1 <= rank <= total:
        return jsonify({'error': f'Rank must be between 1 and {total}'}), 400
    user = tree.select(rank)
    if not user:
        return jsonify({'error': 'User not found'}), 404
    return jsonify({
        'rank': rank,
        'user': user,
        'total_users': total,
        'algorithm': 'RankTree select(k)',
        'complexity': 'O(log n)'
    })
@app.route('/api/analytics/histogram')
def api_activity_histogram():
    """
    Get activity histogram using Bucket Sort.
    Algorithm: Bucket Sort
    Time: O(n + k) where k = number of buckets
    Use case: Efficient time-based grouping without SQL GROUP BY

    Query params:
        timeframe: named window parsed by parse_timeframe() (default 'month')
        bucket: bucket width in seconds (default 86400 = 1 day)
    """
    timeframe = request.args.get('timeframe', 'month')
    bucket_seconds = int(request.args.get('bucket', 86400))  # Default: 1 day
    start_ts, end_ts = parse_timeframe(timeframe)
    conn = get_db()
    cursor = conn.execute('''
        SELECT date_unixtime FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
    ''', (start_ts, end_ts))
    records = [{'date_unixtime': row[0]} for row in cursor.fetchall()]
    conn.close()
    # Use bucket sort algorithm (time_histogram) to group the timestamps.
    histogram = time_histogram(records, 'date_unixtime', bucket_size=bucket_seconds)
    # Format for frontend. datetime is imported at module level; the previous
    # redundant function-local re-import was removed.
    result = []
    for bucket_time, count in histogram:
        result.append({
            'timestamp': bucket_time,
            # fromtimestamp() uses the server's local timezone, so labels
            # shift with server TZ -- confirm this is intended for the UI.
            'date': datetime.fromtimestamp(bucket_time).strftime('%Y-%m-%d %H:%M'),
            'count': count
        })
    return jsonify({
        'histogram': result,
        'bucket_size_seconds': bucket_seconds,
        'total_records': len(records),
        'algorithm': 'Bucket Sort',
        'complexity': 'O(n + k)'
    })
@app.route('/api/analytics/percentiles')
def api_message_percentiles():
    """Message-length distribution computed with O(n) selection.

    Uses quickselect (median-of-medians pivot) instead of sorting, so the
    percentiles come out in guaranteed linear time.
    """
    timeframe = request.args.get('timeframe', 'all')
    start_ts, end_ts = parse_timeframe(timeframe)

    conn = get_db()
    rows = conn.execute('''
        SELECT LENGTH(text_plain) as length FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
          AND text_plain IS NOT NULL
    ''', (start_ts, end_ts)).fetchall()
    conn.close()

    # Drop NULL/zero lengths -- only real text contributes to the stats.
    lengths = [r[0] for r in rows if r[0]]
    if not lengths:
        return jsonify({'error': 'No messages found'}), 404

    payload = {
        'count': len(lengths),
        'min': min(lengths),
        'max': max(lengths),
        'median': find_median(lengths),
    }
    for pct in (25, 75, 90, 95, 99):
        payload[f'p{pct}'] = find_percentile(lengths, pct)
    payload['algorithm'] = 'Quickselect with Median of Medians'
    payload['complexity'] = 'O(n) guaranteed'
    return jsonify(payload)
# ==========================================
# API ENDPOINTS - SEARCH
# ==========================================
@app.route('/api/search')
def api_search():
    """Full-text search over messages via the messages_fts FTS table.

    Query params: q (FTS MATCH expression), timeframe, limit, offset.
    NOTE(review): the empty-query response includes a 'total' key but the
    normal response does not -- clients should not rely on 'total'.
    The raw query string is passed straight to FTS MATCH; malformed FTS
    syntax will raise an sqlite3 error that is not handled here.
    """
    query = request.args.get('q', '')
    timeframe = request.args.get('timeframe', 'all')
    start_ts, end_ts = parse_timeframe(timeframe)
    limit = int(request.args.get('limit', 50))
    offset = int(request.args.get('offset', 0))
    if not query:
        return jsonify({'results': [], 'total': 0})
    conn = get_db()
    # FTS rowids map 1:1 onto messages.id, so the JOIN recovers full rows.
    cursor = conn.execute('''
        SELECT
            m.id,
            m.date,
            m.from_name,
            m.from_id,
            m.text_plain,
            m.has_links,
            m.has_media
        FROM messages_fts
        JOIN messages m ON messages_fts.rowid = m.id
        WHERE messages_fts MATCH ?
          AND m.date_unixtime BETWEEN ? AND ?
        ORDER BY m.date_unixtime DESC
        LIMIT ? OFFSET ?
    ''', (query, start_ts, end_ts, limit, offset))
    results = [{
        'id': row['id'],
        'date': row['date'],
        'from_name': row['from_name'],
        'from_id': row['from_id'],
        # Text is truncated to 300 chars for the result list view.
        'text': row['text_plain'][:300] if row['text_plain'] else '',
        'has_links': bool(row['has_links']),
        'has_media': bool(row['has_media'])
    } for row in cursor.fetchall()]
    conn.close()
    return jsonify({
        'results': results,
        'query': query,
        'limit': limit,
        'offset': offset
    })
# ==========================================
# API ENDPOINTS - CHAT VIEW
# ==========================================
@app.route('/api/chat/messages')
def api_chat_messages():
    """Get messages for chat view with filters.

    Supports paging (offset/limit) plus optional filters: user_id, FTS
    search, date range (compared against the textual date column),
    has_media (1/0) and has_link (1 only). Each returned message carries
    its reply target (name + first 100 chars) and its extracted entities.
    """
    offset = int(request.args.get('offset', 0))
    limit = int(request.args.get('limit', 50))
    user_id = request.args.get('user_id')
    search = request.args.get('search')
    date_from = request.args.get('date_from')
    date_to = request.args.get('date_to')
    has_media = request.args.get('has_media')
    has_link = request.args.get('has_link')
    conn = get_db()
    # Build query: start with an always-true condition so every filter can
    # be appended uniformly with AND. All values go through placeholders.
    conditions = ["1=1"]
    params = []
    if user_id:
        conditions.append("m.from_id = ?")
        params.append(user_id)
    if date_from:
        # NOTE(review): string comparison on the textual date column;
        # assumes the stored format sorts chronologically (e.g. ISO 8601).
        conditions.append("m.date >= ?")
        params.append(date_from)
    if date_to:
        conditions.append("m.date <= ?")
        params.append(date_to)
    if has_media == '1':
        conditions.append("m.has_media = 1")
    elif has_media == '0':
        conditions.append("m.has_media = 0")
    if has_link == '1':
        conditions.append("m.has_links = 1")
    # Handle FTS search via a subquery on the FTS shadow table.
    if search:
        conditions.append("""m.id IN (
            SELECT rowid FROM messages_fts WHERE messages_fts MATCH ?
        )""")
        params.append(search)
    where_clause = " AND ".join(conditions)
    # Get total count (same filters, no paging) for the has_more flag.
    cursor = conn.execute(f"SELECT COUNT(*) FROM messages m WHERE {where_clause}", params)
    total = cursor.fetchone()[0]
    # Get messages with reply info (self-join to pull the replied-to row).
    query = f"""
        SELECT
            m.id,
            m.id as message_id,
            m.date,
            m.from_id,
            m.from_name,
            m.text_plain as text,
            m.reply_to_message_id,
            m.forwarded_from,
            m.forwarded_from_id,
            m.has_media,
            m.has_photo,
            m.has_links as has_link,
            m.has_mentions,
            m.is_edited,
            r.from_name as reply_to_name,
            substr(r.text_plain, 1, 100) as reply_to_text
        FROM messages m
        LEFT JOIN messages r ON m.reply_to_message_id = r.id
        WHERE {where_clause}
        ORDER BY m.date ASC
        LIMIT ? OFFSET ?
    """
    params.extend([limit, offset])
    cursor = conn.execute(query, params)
    messages = [dict(row) for row in cursor.fetchall()]
    # Fetch entities (links, mentions) for these messages in one batched
    # IN (...) query instead of one query per message.
    if messages:
        msg_ids = [m['id'] for m in messages]
        placeholders = ','.join('?' * len(msg_ids))
        ent_cursor = conn.execute(f"""
            SELECT message_id, type, value
            FROM entities
            WHERE message_id IN ({placeholders})
        """, msg_ids)
        # Group entities by message_id
        entities_map = {}
        for row in ent_cursor.fetchall():
            mid = row[0]
            if mid not in entities_map:
                entities_map[mid] = []
            entities_map[mid].append({'type': row[1], 'value': row[2]})
        # Attach entities to messages
        for msg in messages:
            msg['entities'] = entities_map.get(msg['id'], [])
    conn.close()
    return jsonify({
        'messages': messages,
        'total': total,
        'offset': offset,
        'limit': limit,
        'has_more': offset + limit < total
    })
@app.route('/api/chat/thread/<int:message_id>')
def api_chat_thread(message_id):
    """Get conversation thread for a message.

    Walks the reply graph in both directions: ancestors via
    reply_to_message_id, then all descendant replies depth-first.
    Both walkers use Python recursion, so an extremely deep reply
    chain could hit the interpreter's recursion limit. The visited
    set guards against cycles and duplicates.
    """
    conn = get_db()
    thread = []
    visited = set()
    def get_parent(msg_id):
        """Recursively get parent messages (oldest ancestor appended first)."""
        if msg_id in visited:
            return
        visited.add(msg_id)
        cursor = conn.execute("""
            SELECT id as message_id, date, from_name, text_plain as text, reply_to_message_id
            FROM messages WHERE id = ?
        """, (msg_id,))
        row = cursor.fetchone()
        if row:
            if row['reply_to_message_id']:
                # Climb to the parent before appending, so ancestors come first.
                get_parent(row['reply_to_message_id'])
            thread.append(dict(row))
    def get_children(msg_id):
        """Get all replies to a message (depth-first)."""
        cursor = conn.execute("""
            SELECT id as message_id, date, from_name, text_plain as text, reply_to_message_id
            FROM messages WHERE reply_to_message_id = ?
            ORDER BY date
        """, (msg_id,))
        for row in cursor.fetchall():
            if row['message_id'] not in visited:
                visited.add(row['message_id'])
                thread.append(dict(row))
                get_children(row['message_id'])
    # Get the original message and its parents
    get_parent(message_id)
    # Get all replies
    get_children(message_id)
    conn.close()
    # Sort by date so the thread reads chronologically regardless of walk order.
    thread.sort(key=lambda x: x['date'])
    return jsonify(thread)
@app.route('/api/chat/context/<int:message_id>')
def api_chat_context(message_id):
    """Get messages around a specific message.

    Returns `before` messages strictly earlier and `after` messages strictly
    later than the target, with the target in between, ordered by date.
    NOTE(review): comparisons are on the textual date column, so messages
    sharing the exact same date string as the target are excluded; assumes
    the stored format sorts chronologically as text.
    """
    before = int(request.args.get('before', 20))
    after = int(request.args.get('after', 20))
    conn = get_db()
    # Get target message date
    cursor = conn.execute("SELECT date FROM messages WHERE id = ?", (message_id,))
    row = cursor.fetchone()
    if not row:
        conn.close()
        return jsonify({'messages': [], 'target_id': message_id})
    target_date = row['date']
    # Get messages before (fetched newest-first, then reversed to read forward)
    cursor = conn.execute("""
        SELECT id as message_id, date, from_id, from_name, text_plain as text,
               reply_to_message_id, has_media, has_links as has_link
        FROM messages
        WHERE date < ?
        ORDER BY date DESC
        LIMIT ?
    """, (target_date, before))
    before_msgs = list(reversed([dict(row) for row in cursor.fetchall()]))
    # Get target message
    cursor = conn.execute("""
        SELECT id as message_id, date, from_id, from_name, text_plain as text,
               reply_to_message_id, has_media, has_links as has_link
        FROM messages
        WHERE id = ?
    """, (message_id,))
    target_msg = dict(cursor.fetchone())
    # Get messages after
    cursor = conn.execute("""
        SELECT id as message_id, date, from_id, from_name, text_plain as text,
               reply_to_message_id, has_media, has_links as has_link
        FROM messages
        WHERE date > ?
        ORDER BY date ASC
        LIMIT ?
    """, (target_date, after))
    after_msgs = [dict(row) for row in cursor.fetchall()]
    conn.close()
    return jsonify({
        'messages': before_msgs + [target_msg] + after_msgs,
        'target_id': message_id
    })
# ==========================================
# API ENDPOINTS - AI SEARCH
# ==========================================
# Global AI engine (lazy loaded); _ai_engine_init_attempted latches a
# failed init so we don't retry on every request.
_ai_engine = None
_ai_engine_init_attempted = False
def get_ai_engine():
    """Lazily create and cache the AI search engine.

    Returns:
        The cached AISearchEngine, or None when initialization failed.
        After one failure, subsequent calls return None immediately
        (use /api/ai/reset to clear the failure latch).
    """
    global _ai_engine, _ai_engine_init_attempted
    if _ai_engine is not None:
        return _ai_engine
    if _ai_engine_init_attempted:
        return None  # Already tried and failed
    _ai_engine_init_attempted = True
    try:
        from ai_search import AISearchEngine
        # os is imported at module level; the redundant local import was removed.
        provider = os.getenv('AI_PROVIDER', 'ollama')
        # Get API key - check both generic and provider-specific env vars
        api_key = os.getenv('AI_API_KEY') or os.getenv(f'{provider.upper()}_API_KEY')
        print(f"Initializing AI engine with provider: {provider}")
        _ai_engine = AISearchEngine(DB_PATH, provider, api_key)
        print(f"AI engine initialized successfully")
        return _ai_engine
    except Exception as e:
        print(f"AI Search not available: {e}")
        import traceback
        traceback.print_exc()
        return None
@app.route('/api/ai/status')
def api_ai_status():
    """Get AI engine status for debugging.

    Reports the configured provider, whether an API key is set (with a
    safe 8-char preview only), engine/semantic-search availability, and
    embedding stats when semantic search is enabled.
    """
    provider = os.getenv('AI_PROVIDER', 'ollama')
    # Same key resolution order as get_ai_engine(): generic env var first,
    # then a provider-specific one (e.g. OLLAMA_API_KEY).
    api_key = os.getenv('AI_API_KEY') or os.getenv(f'{provider.upper()}_API_KEY')
    status = {
        'provider': provider,
        'api_key_set': bool(api_key),
        'api_key_preview': f"{api_key[:8]}..." if api_key and len(api_key) > 8 else None,
        'ai_engine_initialized': _ai_engine is not None,
        'init_attempted': _ai_engine_init_attempted,
        'semantic_search_available': HAS_SEMANTIC_SEARCH,
    }
    # Check if we can initialize now (only when no attempt has been made yet)
    if _ai_engine is None and not _ai_engine_init_attempted:
        engine = get_ai_engine()
        status['ai_engine_initialized'] = engine is not None
    # Check for embeddings
    if HAS_SEMANTIC_SEARCH:
        try:
            ss = get_semantic_search()
            status['embeddings_available'] = ss.is_available()
            status['embeddings_stats'] = ss.stats()
        except Exception as e:
            status['embeddings_error'] = str(e)
    return jsonify(status)
@app.route('/api/ai/reset')
def api_ai_reset():
    """Clear the cached AI engine so the next request re-initializes it."""
    global _ai_engine, _ai_engine_init_attempted
    _ai_engine_init_attempted = False
    _ai_engine = None
    return jsonify({'status': 'reset', 'message': 'AI engine will be reinitialized on next request'})
@app.route('/api/cache/invalidate')
def api_cache_invalidate():
    """Drop all cached query results; call after the database changes
    (daily sync, manual import, etc.)."""
    invalidate_caches()
    payload = {'status': 'invalidated', 'new_version': _cache_version}
    return jsonify(payload)
@app.route('/api/embeddings/reload')
def api_embeddings_reload():
    """Reload embedding vectors from disk after an external sync added new ones."""
    if not HAS_SEMANTIC_SEARCH:
        return jsonify({'error': 'Semantic search not available'})
    try:
        ss = get_semantic_search()
        before = len(ss.message_ids) if ss.embeddings_loaded else 0
        ss.reload_embeddings()
        after = len(ss.message_ids)
        return jsonify({
            'status': 'reloaded',
            'previous_count': before,
            'new_count': after,
            'added': after - before
        })
    except Exception as exc:
        return jsonify({'error': str(exc)})
@app.route('/api/ai/search', methods=['POST'])
def api_ai_search():
    """AI-powered natural language search.

    JSON body: {query, mode} where mode is 'auto' (default), 'sql',
    'context', or 'semantic'.
    - semantic: pre-computed embeddings + optional AI reasoning
    - sql: AI generates SQL and executes it
    - context: AI reads retrieved messages and reasons over them
    - auto: try SQL first, fall back to context on empty/error result
    Falls back to plain FTS search when no AI engine is available.
    """
    data = request.get_json()
    query = data.get('query', '')
    mode = data.get('mode', 'auto')  # 'auto', 'sql', 'context', or 'semantic'
    if not query:
        return jsonify({'error': 'Query required'})
    # Semantic mode: Use pre-computed embeddings + AI reasoning
    if mode == 'semantic':
        if not HAS_SEMANTIC_SEARCH:
            return jsonify({'error': 'Semantic search not available. Install sentence-transformers.'})
        try:
            ss = get_semantic_search()
            if not ss.is_available():
                return jsonify({'error': 'embeddings.db not found. Run the Colab notebook first.'})
            # Get AI engine for reasoning
            ai_engine = get_ai_engine()
            if ai_engine:
                # Semantic search + AI reasoning
                result = ss.search_with_ai_answer(query, ai_engine, limit=30)
                return jsonify(result)
            else:
                # Just semantic search without AI reasoning; the answer field
                # carries a Hebrew notice that the AI backend is unavailable.
                results = ss.search_with_full_text(query, limit=30)
                provider = os.getenv('AI_PROVIDER', 'ollama')
                api_key_set = bool(os.getenv('AI_API_KEY') or os.getenv(f'{provider.upper()}_API_KEY'))
                return jsonify({
                    'query': query,
                    'mode': 'semantic',
                    'results': results,
                    'count': len(results),
                    'answer': f"Χ ΧžΧ¦ΧΧ• {len(results)} Χ”Χ•Χ“Χ’Χ•Χͺ Χ“Χ•ΧžΧ•Χͺ Χ‘ΧžΧ Χ˜Χ™Χͺ ΧœΧ©ΧΧ™ΧœΧͺΧ”.\n\n⚠️ AI לא Χ–ΧžΧ™ΧŸ - Χ‘Χ“Χ•Χ§ Χ©Χ”-API key ΧžΧ•Χ’Χ“Χ¨ (provider: {provider}, key set: {api_key_set})"
                })
        except Exception as e:
            return jsonify({'error': f'Semantic search error: {str(e)}'})
    engine = get_ai_engine()
    if engine is None:
        # Fallback: Use basic SQL search
        return fallback_ai_search(query)
    try:
        # Context mode: AI reads messages and reasons over them
        if mode == 'context':
            result = engine.context_search(query)
        # SQL mode: Generate SQL and execute
        elif mode == 'sql':
            result = engine.search(query, generate_answer=True)
        # Auto mode: Try SQL first, fall back to context if no results
        else:
            result = engine.search(query, generate_answer=True)
            # If no results or error, try context search
            if result.get('count', 0) == 0 or 'error' in result:
                result = engine.context_search(query)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e), 'query': query})
@app.route('/api/hybrid/search', methods=['POST'])
def api_hybrid_search():
    """
    Hybrid search combining:
    - Chunk-based vector search (conversation context)
    - BM25 keyword search (exact matches)
    - Query expansion (synonyms, variations)

    JSON body: {query, limit=20, include_context=True}. Returns the results
    plus the expanded query variants and index stats for display/debugging.
    """
    data = request.get_json()
    query = data.get('query', '')
    limit = data.get('limit', 20)
    include_context = data.get('include_context', True)
    if not query:
        return jsonify({'error': 'Query required'})
    try:
        # Imported lazily so the dashboard still runs without this module.
        from hybrid_search import get_hybrid_search
        hs = get_hybrid_search()
        # Get stats
        stats = hs.stats()
        # Bail out when neither index type has been built yet.
        if not stats.get('chunks_available') and not stats.get('single_embeddings_available'):
            return jsonify({
                'error': 'No search indexes available. Run the Colab notebook first.',
                'stats': stats
            })
        # Search with or without surrounding conversation context
        if include_context:
            results = hs.search_with_context(query, limit=limit)
        else:
            results = hs.hybrid_search(query, limit=limit)
        # Get expanded queries for display
        expanded = hs.expand_query(query)
        return jsonify({
            'query': query,
            'expanded_queries': expanded,
            'results': results,
            'count': len(results),
            'stats': stats,
            'mode': 'hybrid'
        })
    except ImportError as e:
        return jsonify({'error': f'Hybrid search not available: {str(e)}'})
    except Exception as e:
        import traceback
        return jsonify({
            'error': str(e),
            'traceback': traceback.format_exc()
        })
@app.route('/api/gemini/search', methods=['POST'])
def api_gemini_search():
    """
    AI-powered search using Gemini 1.5 Flash.
    Combines hybrid search with Gemini for natural language answers.

    JSON body: {query, limit=5}. When the Gemini client is unavailable the
    endpoint degrades to plain hybrid search and marks the response with
    success=False and mode='hybrid_only'.
    """
    data = request.get_json()
    query = data.get('query', '')
    limit = data.get('limit', 5)
    if not query:
        return jsonify({'error': 'Query required'})
    try:
        # Imported lazily so the dashboard still runs without Gemini support.
        from gemini_client import ai_search, get_gemini_client
        # Check if Gemini is available
        client = get_gemini_client()
        if not client.is_available():
            # Fall back to hybrid search without AI
            from hybrid_search import get_hybrid_search
            hs = get_hybrid_search()
            results = hs.search_with_context(query, limit=limit)
            return jsonify({
                'query': query,
                'success': False,
                'error': 'Gemini API not available. Set GEMINI_API_KEY environment variable.',
                'search_results': results,
                'count': len(results),
                'mode': 'hybrid_only'
            })
        # Perform AI search
        result = ai_search(query, limit=limit)
        return jsonify(result)
    except ImportError as e:
        return jsonify({'error': f'AI search not available: {str(e)}'})
    except Exception as e:
        import traceback
        return jsonify({
            'error': str(e),
            'traceback': traceback.format_exc()
        })
@app.route('/api/gemini/status')
def api_gemini_status():
    """Report whether the Gemini API client is usable."""
    try:
        from gemini_client import get_gemini_client
        client = get_gemini_client()
        key = os.environ.get('GEMINI_API_KEY', '')
        # Expose only a short prefix of the key for debugging.
        preview = f"{key[:8]}..." if len(key) > 8 else None
        return jsonify({
            'available': client.is_available(),
            'api_key_set': bool(key),
            'api_key_preview': preview
        })
    except Exception as exc:
        return jsonify({'available': False, 'error': str(exc)})
@app.route('/api/hybrid/status')
def api_hybrid_status():
    """Report which hybrid-search indexes are currently loaded in memory."""
    try:
        from hybrid_search import get_hybrid_search
        hs = get_hybrid_search()
        has_chunks = hs.chunk_embeddings is not None
        has_bm25 = hs.bm25 is not None
        has_model = hs.model is not None
        return jsonify({
            'chunk_embeddings_loaded': has_chunks,
            'bm25_loaded': has_bm25,
            'model_loaded': has_model,
            'chunk_count': len(hs.chunk_embeddings) if has_chunks else 0,
            # Either index alone is enough to serve queries.
            'ready': has_chunks or has_bm25
        })
    except Exception as exc:
        return jsonify({'ready': False, 'error': str(exc)})
# ==========================================
# API ENDPOINTS - STYLOMETRY (Duplicate Detection)
# ==========================================
# Global stylometry state, shared between the background analysis thread
# and the status endpoint.
_stylometry_status = {'status': 'idle', 'progress': 0, 'message': '', 'results': None}
@app.route('/api/stylometry/analyze', methods=['POST'])
def api_stylometry_analyze():
    """Start stylometry analysis to detect duplicate accounts.

    JSON body (all optional): min_messages=300, days=365, threshold=0.85.
    Runs the analysis in a daemonless background thread and returns
    immediately; progress is polled via /api/stylometry/status.
    NOTE(review): _stylometry_status is shared without a lock -- updates
    are simple dict assignments, but concurrent analyze calls would race.
    """
    import threading
    data = request.get_json() or {}
    min_messages = data.get('min_messages', 300)
    days = data.get('days', 365)
    threshold = data.get('threshold', 0.85)
    global _stylometry_status
    # Reset status to "running"; the Hebrew message is shown in the UI.
    _stylometry_status = {'status': 'running', 'progress': 0, 'message': 'מΧͺΧ—Χ™Χœ Χ Χ™ΧͺΧ•Χ—...', 'results': None}
    def run_analysis():
        """Background worker: runs the full analysis and publishes status."""
        global _stylometry_status
        try:
            from stylometry import get_stylometry_analyzer
            analyzer = get_stylometry_analyzer()
            analyzer.similarity_threshold = threshold
            def progress_callback(event, *args):
                # Map analyzer events onto a 0-100 progress scale:
                # 0-5 discovery, 5-75 per-user processing, 75-99 comparison.
                global _stylometry_status
                if event == 'users_found':
                    _stylometry_status['message'] = f'Χ ΧžΧ¦ΧΧ• {args[0]} משΧͺΧžΧ©Χ™Χ ΧœΧ Χ™ΧͺΧ•Χ—'
                    _stylometry_status['progress'] = 5
                elif event == 'user_processed':
                    current, total, name = args
                    pct = 5 + int(70 * current / total)
                    _stylometry_status['progress'] = pct
                    _stylometry_status['message'] = f'ΧžΧ’Χ‘Χ“ {current}/{total}: {name}'
                elif event == 'comparing':
                    current = args[0]
                    total = args[1] if len(args) > 1 else 1
                    pct = 75 + int(25 * current / max(1, total))
                    _stylometry_status['progress'] = min(99, pct)
                    _stylometry_status['message'] = 'ΧžΧ©Χ•Χ•Χ” Χ“Χ€Χ•Χ‘Χ™ Χ›ΧͺΧ™Χ‘Χ”...'
            results = analyzer.analyze_all_users(
                min_messages=min_messages,
                days=days,
                progress_callback=progress_callback
            )
            _stylometry_status = {
                'status': 'completed',
                'progress': 100,
                'message': 'Χ”Χ Χ™ΧͺΧ•Χ— Χ”Χ•Χ©ΧœΧ',
                'results': results
            }
        except Exception as e:
            import traceback
            _stylometry_status = {
                'status': 'error',
                'progress': 0,
                'message': str(e),
                'error': traceback.format_exc(),
                'results': None
            }
    # Run in background thread (not joined; the request returns immediately)
    thread = threading.Thread(target=run_analysis)
    thread.start()
    return jsonify({'status': 'started'})
@app.route('/api/stylometry/status')
def api_stylometry_status():
    """Get stylometry analysis status.

    Returns the shared status dict published by the background analysis
    thread: status ('idle'/'running'/'completed'/'error'), progress (0-100),
    message, and results when completed.
    """
    return jsonify(_stylometry_status)
def fallback_ai_search(query: str):
    """Fallback search when AI is not available.

    Extracts keywords (words longer than 2 chars) from the query, ORs them
    into an FTS MATCH expression, and returns up to 20 newest matches with
    a simple Hebrew summary. Marks the response with 'fallback': True.
    """
    conn = get_db()
    # Simple keyword extraction and search
    keywords = [w for w in query.split() if len(w) > 2]
    if not keywords:
        # NOTE(review): connection is not closed on this early return.
        return jsonify({'error': 'No valid keywords', 'query': query})
    # Build FTS query: any keyword may match
    fts_query = ' OR '.join(keywords)
    try:
        cursor = conn.execute('''
            SELECT
                m.id as message_id, m.date, m.from_name, m.text_plain as text
            FROM messages_fts
            JOIN messages m ON messages_fts.rowid = m.id
            WHERE messages_fts MATCH ?
            ORDER BY m.date DESC
            LIMIT 20
        ''', (fts_query,))
        results = [dict(row) for row in cursor.fetchall()]
        conn.close()
        # Generate simple answer (Hebrew: "found/not found N messages with words ...")
        if results:
            answer = f"Χ ΧžΧ¦ΧΧ• {len(results)} Χ”Χ•Χ“Χ’Χ•Χͺ גם Χ”ΧžΧ™ΧœΧ™Χ: {', '.join(keywords)}"
        else:
            answer = f"לא Χ ΧžΧ¦ΧΧ• Χ”Χ•Χ“Χ’Χ•Χͺ גם Χ”ΧžΧ™ΧœΧ™Χ: {', '.join(keywords)}"
        return jsonify({
            'query': query,
            'sql': f"FTS MATCH: {fts_query}",
            'results': results,
            'count': len(results),
            'answer': answer,
            'fallback': True
        })
    except Exception as e:
        # FTS MATCH can raise on special characters in user input.
        conn.close()
        return jsonify({'error': str(e), 'query': query})
@app.route('/api/ai/thread/<int:message_id>')
def api_ai_thread(message_id):
    """Return the conversation thread, preferring the AI engine when present."""
    engine = get_ai_engine()
    if engine is None:
        # No AI backend: fall back to the plain SQL-based thread walker.
        return api_chat_thread(message_id)
    try:
        return jsonify(engine.get_thread(message_id))
    except Exception as exc:
        return jsonify({'error': str(exc)})
@app.route('/api/ai/similar/<int:message_id>')
def api_ai_similar(message_id):
    """Return messages the AI engine considers similar to the given one."""
    limit = int(request.args.get('limit', 10))
    engine = get_ai_engine()
    if engine is None:
        return jsonify({'error': 'AI not available'})
    try:
        return jsonify(engine.find_similar_messages(message_id, limit))
    except Exception as exc:
        return jsonify({'error': str(exc)})
# ==========================================
# API ENDPOINTS - DATABASE UPDATE
# ==========================================
@app.route('/api/update', methods=['POST'])
def api_update_database():
    """
    Update database with new JSON data.
    Disabled in production - updates are done locally via daily_sync.py.
    """
    # Hard-disabled: the unconditional return below makes everything after
    # it unreachable. The dead code is kept as a reference implementation
    # for re-enabling local updates; delete this return to restore it.
    return jsonify({'error': 'Database updates are disabled on this server. Run daily_sync.py locally.'}), 403
    try:
        # Check if file was uploaded
        if 'file' in request.files:
            file = request.files['file']
            if file.filename == '':
                return jsonify({'error': 'No file selected'}), 400
            # Read and parse JSON
            try:
                json_data = json.loads(file.read().decode('utf-8'))
            except json.JSONDecodeError as e:
                return jsonify({'error': f'Invalid JSON: {str(e)}'}), 400
        else:
            # Try to get JSON from request body
            json_data = request.get_json()
            if not json_data:
                return jsonify({'error': 'No JSON data provided'}), 400
        # Import and use IncrementalIndexer
        from indexer import IncrementalIndexer
        indexer = IncrementalIndexer(DB_PATH)
        try:
            stats = indexer.update_from_json_data(json_data, show_progress=False)
        finally:
            indexer.close()
        return jsonify({
            'success': True,
            'stats': {
                'total_in_file': stats['total_in_file'],
                'new_messages': stats['new_messages'],
                'duplicates': stats['duplicates'],
                'entities': stats['entities'],
                'elapsed_seconds': round(stats['elapsed_seconds'], 2)
            }
        })
    except FileNotFoundError as e:
        return jsonify({'error': str(e)}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/db/stats')
def api_db_stats():
    """Get database statistics: row counts, date range, and file size."""
    conn = get_db()
    stats = {}
    # Total messages
    cursor = conn.execute('SELECT COUNT(*) FROM messages')
    stats['total_messages'] = cursor.fetchone()[0]
    # Total distinct senders (rows with NULL from_id are excluded)
    cursor = conn.execute('SELECT COUNT(DISTINCT from_id) FROM messages WHERE from_id IS NOT NULL')
    stats['total_users'] = cursor.fetchone()[0]
    # Date range
    cursor = conn.execute('SELECT MIN(date), MAX(date) FROM messages')
    row = cursor.fetchone()
    stats['first_message'] = row[0]
    stats['last_message'] = row[1]
    # Database file size; os is imported at module level, so the redundant
    # function-local import was removed.
    if os.path.exists(DB_PATH):
        stats['db_size_mb'] = round(os.path.getsize(DB_PATH) / (1024 * 1024), 2)
    conn.close()
    return jsonify(stats)
# ==========================================
# API ENDPOINTS - EXPORT
# ==========================================
@app.route('/api/export/users')
def api_export_users():
    """Export per-user aggregate statistics as a CSV download."""
    timeframe = request.args.get('timeframe', 'all')
    start_ts, end_ts = parse_timeframe(timeframe)

    conn = get_db()
    rows = conn.execute('''
        SELECT
            from_id,
            from_name,
            COUNT(*) as message_count,
            SUM(LENGTH(text_plain)) as char_count,
            SUM(has_links) as links,
            SUM(has_media) as media,
            MIN(date_unixtime) as first_seen,
            MAX(date_unixtime) as last_seen
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
          AND from_id IS NOT NULL
        GROUP BY from_id
        ORDER BY message_count DESC
    ''', (start_ts, end_ts)).fetchall()
    conn.close()

    def fmt_ts(ts):
        # Render unix timestamps as ISO-8601; empty string when absent.
        return datetime.fromtimestamp(ts).isoformat() if ts else ''

    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(['User ID', 'Name', 'Messages', 'Characters', 'Links', 'Media', 'First Seen', 'Last Seen'])
    for row in rows:
        writer.writerow([
            row['from_id'],
            row['from_name'],
            row['message_count'],
            row['char_count'] or 0,
            row['links'] or 0,
            row['media'] or 0,
            fmt_ts(row['first_seen']),
            fmt_ts(row['last_seen']),
        ])

    buffer.seek(0)
    return Response(
        buffer.getvalue(),
        mimetype='text/csv',
        headers={'Content-Disposition': 'attachment; filename=users_export.csv'}
    )
@app.route('/api/export/messages')
def api_export_messages():
    """Export messages as CSV.

    Query params: timeframe (default 'all'), limit (default 10000).
    Message text is truncated to 500 chars per row; newest messages first.
    """
    timeframe = request.args.get('timeframe', 'all')
    start_ts, end_ts = parse_timeframe(timeframe)
    limit = int(request.args.get('limit', 10000))
    conn = get_db()
    cursor = conn.execute('''
        SELECT
            id, date, from_id, from_name, text_plain,
            has_links, has_media, has_mentions,
            reply_to_message_id
        FROM messages
        WHERE date_unixtime BETWEEN ? AND ?
        ORDER BY date_unixtime DESC
        LIMIT ?
    ''', (start_ts, end_ts, limit))
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(['ID', 'Date', 'User ID', 'User Name', 'Text', 'Has Links', 'Has Media', 'Has Mentions', 'Reply To'])
    for row in cursor.fetchall():
        writer.writerow([
            row['id'],
            row['date'],
            row['from_id'],
            row['from_name'],
            # Cap text length so a single huge message can't bloat the export
            row['text_plain'][:500] if row['text_plain'] else '',
            row['has_links'],
            row['has_media'],
            row['has_mentions'],
            row['reply_to_message_id']
        ])
    conn.close()
    output.seek(0)
    return Response(
        output.getvalue(),
        mimetype='text/csv',
        headers={'Content-Disposition': 'attachment; filename=messages_export.csv'}
    )
# ==========================================
# MAIN
# ==========================================
def main():
    """Parse CLI/env configuration, ensure the DB is present, start Flask.

    CLI flags override environment variables (DB_PATH, PORT, HOST).
    """
    import argparse
    parser = argparse.ArgumentParser(description='Telegram Analytics Dashboard')
    parser.add_argument('--db', default=os.environ.get('DB_PATH', 'telegram.db'), help='Database path')
    parser.add_argument('--port', type=int, default=int(os.environ.get('PORT', 5000)), help='Server port')
    parser.add_argument('--host', default=os.environ.get('HOST', '127.0.0.1'), help='Server host')
    parser.add_argument('--debug', action='store_true', help='Debug mode')
    args = parser.parse_args()
    # Download DB from HF Dataset if not present.
    # NOTE(review): ensure_db_exists() targets DB_PATH_FULL (next to this
    # file), while --db may point elsewhere -- confirm the two agree when
    # running with a custom --db.
    ensure_db_exists()
    global DB_PATH
    DB_PATH = args.db
    print(f"""
    ╔══════════════════════════════════════════════════════════════╗
    β•‘ TELEGRAM ANALYTICS DASHBOARD β•‘
    ╠══════════════════════════════════════════════════════════════╣
    β•‘ Database: {args.db:47} β•‘
    β•‘ Server: http://{args.host}:{args.port:<37} β•‘
    β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
    """)
    app.run(host=args.host, port=args.port, debug=args.debug)
# Script entry point: only start the server when run directly, not on import.
if __name__ == '__main__':
    main()