master-brain-api / server.py
Dilip8756's picture
Upload 101 files
cc442b8 verified
from flask import (
Flask,
request,
jsonify,
render_template,
send_from_directory,
session,
redirect,
url_for,
)
from flask_sqlalchemy import SQLAlchemy
from flask_login import (
LoginManager,
login_user,
logout_user,
login_required,
current_user,
)
from flask_bcrypt import Bcrypt
from flask_cors import CORS
from datetime import datetime, timedelta
import os
import json
import uuid
import urllib.request
import http.cookiejar
import ssl
import random
from database import (
db,
User,
Transaction,
WalletLedger,
AadhaarHistory,
AadhaarDownloadHistory,
SupportTicket,
TicketMessage,
)
app = Flask(__name__)
app.config["SECRET_KEY"] = "aadhaar-secret-key-12345"
# Ensure instance folder exists for professional SQLite handling
if not os.path.exists(app.instance_path):
os.makedirs(app.instance_path)
db_path = os.path.join(app.instance_path, "aadhaar_portal.db")
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{db_path}"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["UPLOAD_FOLDER"] = "uploads"
# Global Feature Flags
MAINTENANCE_MODE = False # Set to True to enable maintenance page
SKIP_LOGIN = True # Set to True to bypass authentication for debugging
# Ensure uploads folder exists
if not os.path.exists(app.config["UPLOAD_FOLDER"]):
os.makedirs(app.config["UPLOAD_FOLDER"])
CORS(app)
db.init_app(app)
bcrypt = Bcrypt(app)
login_manager = LoginManager(app)
login_manager.login_view = "login"
# UIDAI Proxy initialization
ssl_context = ssl._create_unverified_context()
cj = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(cj),
urllib.request.HTTPSHandler(context=ssl_context),
)
@login_manager.user_loader
def load_user(user_id):
return User.query.get(user_id)
@app.before_request
def check_maintenance():
# If SKIP_LOGIN is enabled and no user is logged in, use a mock admin for easy access
if SKIP_LOGIN and not current_user.is_authenticated:
mock_user = User.query.filter_by(role="admin").first()
if mock_user:
login_user(mock_user)
print(f"DEBUG: Automatically logged in as {mock_user.name}")
if MAINTENANCE_MODE:
# Allow access to static files and maintenance page
if request.endpoint in ["static", "maintenance_page", "login", "logout"]:
return
# Allow admins to bypass maintenance
if current_user.is_authenticated and current_user.role == "admin":
return
return redirect(url_for("maintenance_page"))
@app.route("/maintenance")
def maintenance_page():
return render_template("maintenance.html")
# ==========================================
# AUTH ROUTES
# ==========================================
@app.route("/register", methods=["POST"])
def register():
data = request.json
phone = data.get("phone")
if not phone:
return jsonify({"error": "Phone number is required"}), 400
if User.query.filter_by(phone=phone).first():
print(f"AUTH: Registration failed - Phone {phone} already exists")
return jsonify({"error": "Phone number already exists"}), 400
hashed_pw = bcrypt.generate_password_hash(data["password"]).decode("utf-8")
# First user is admin (optional logic)
role = "admin" if User.query.count() == 0 else "user"
new_user = User(
phone=phone,
name=data.get("name", "User"),
password=hashed_pw,
role=role,
)
db.session.add(new_user)
db.session.commit()
print(f"AUTH: New user registered: {phone} as {role}")
return jsonify({"success": "Account created"}), 201
@app.route("/login", methods=["POST"])
def login():
data = request.json
phone = data.get("phone")
password = data.get("password")
print(f"AUTH: Login attempt for phone: {phone}")
user = User.query.filter_by(phone=phone).first()
if not user:
print("AUTH: User not found in database")
return jsonify({"error": "Invalid phone or password"}), 401
is_valid = bcrypt.check_password_hash(user.password, password)
print(f"AUTH: Password verification result: {is_valid}")
if is_valid:
login_user(user, remember=True)
print(f"AUTH: Login successful for {user.name} ({user.role})")
return jsonify({"success": "Logged in", "role": user.role}), 200
print("AUTH: Password mismatch")
return jsonify({"error": "Invalid phone or password"}), 401
@app.route("/logout")
@login_required
def logout():
logout_user()
return redirect(url_for("login_page"))
# ==========================================
# WALLET & PAYMENT ROUTES
# ==========================================
@app.route("/api/wallet/topup", methods=["POST"])
@login_required
def wallet_topup():
# In a real app, we'd handle the multipart form for the screenshot
amount = float(request.form.get("amount", 0))
utr = request.form.get("utr")
if Transaction.query.filter_by(utr_number=utr).first():
return jsonify({"error": "UTR already submitted"}), 400
screenshot = request.files.get("screenshot")
screenshot_path = ""
if screenshot and screenshot.filename != "":
filename = f"{uuid.uuid4()}_{screenshot.filename}"
screenshot.save(os.path.join(app.config["UPLOAD_FOLDER"], filename))
screenshot_path = f"/uploads/{filename}"
tx = Transaction(
user_id=current_user.id,
amount=amount,
utr_number=utr,
screenshot_url=screenshot_path,
status="pending",
)
db.session.add(tx)
db.session.commit()
return jsonify({"success": "Payment submitted for verification"}), 201
@app.route("/api/wallet/history")
@login_required
def wallet_history():
history = (
Transaction.query.filter_by(user_id=current_user.id)
.order_by(Transaction.created_at.desc())
.all()
)
return jsonify(
[
{
"amount": t.amount,
"utr": t.utr_number,
"status": t.status,
"date": t.created_at.strftime("%Y-%m-%d %H:%M:%S"),
}
for t in history
]
)
@app.route("/api/user/info")
@login_required
def user_info():
return jsonify(
{
"name": current_user.name,
"phone": current_user.phone,
"balance": current_user.wallet_balance,
"role": current_user.role,
}
)
@app.route("/api/user/stats")
@login_required
def user_stats():
# Simple count logic for prints
total_prints = AadhaarHistory.query.filter_by(user_id=current_user.id).count()
# Today's prints
today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
today_prints = AadhaarHistory.query.filter(
AadhaarHistory.user_id == current_user.id,
AadhaarHistory.timestamp >= today_start.isoformat(),
).count()
return jsonify(
{
"balance": current_user.wallet_balance,
"prints_today": today_prints,
"total_prints": total_prints,
"name": current_user.name,
}
)
# ==========================================
# ADMIN ROUTES
# ==========================================
@app.route("/api/admin/transactions")
@login_required
def admin_transactions():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
txs = (
db.session.query(Transaction, User)
.join(User)
.order_by(Transaction.created_at.desc())
.all()
)
return jsonify(
[
{
"tx_id": t[0].id,
"user": t[1].name,
"phone": t[1].phone,
"amount": t[0].amount,
"utr": t[0].utr_number,
"screenshot": t[0].screenshot_url,
"status": t[0].status,
"date": t[0].created_at.strftime("%Y-%m-%d %H:%M:%S"),
}
for t in txs
]
)
@app.route("/api/admin/verify", methods=["POST"])
@login_required
def verify_payment():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
data = request.json
tx = Transaction.query.get(data["tx_id"])
if not tx:
return jsonify({"error": "Transaction not found"}), 404
if data["action"] == "approve":
tx.status = "approved"
user = User.query.get(tx.user_id)
user.wallet_balance += tx.amount
# Add ledger entry
ledger = WalletLedger(
user_id=user.id,
type="credit",
amount=tx.amount,
description=f"Deposit via UTR {tx.utr_number}",
balance_after=user.wallet_balance,
)
db.session.add(ledger)
else:
tx.status = "rejected"
tx.verified_at = datetime.utcnow()
db.session.commit()
return jsonify({"success": "Transaction updated"}), 200
# ==========================================
# AADHAAR HISTORY (Legacy logic)
# ==========================================
@app.route("/api/history", methods=["GET", "POST", "DELETE"])
@login_required
def aadhaar_history():
if request.method == "GET":
rows = (
AadhaarHistory.query.filter_by(user_id=current_user.id)
.order_by(AadhaarHistory.timestamp.desc())
.limit(50)
.all()
)
history = []
for row in rows:
try:
item_data = json.loads(row.data)
# Ensure the ID used in frontend matches the database ID
item_data["id"] = row.id
history.append(item_data)
except:
continue
return jsonify(history)
elif request.method == "POST":
# Skip balance check logic (already handled in previous version)
if not (SKIP_LOGIN or current_user.role == "admin"):
if current_user.wallet_balance < 10:
return jsonify(
{"success": False, "message": "Insufficient balance"}
), 400
item = request.json
# New Duplicate Check based on Aadhaar Number, Name, and DOB
new_aadhaar = item.get("in-aadhaar")
new_name = item.get("in-name-en")
new_dob = item.get("in_dob") or item.get("in-dob")
existing = None
# Search for a match in user's history if we have core identification fields
if new_aadhaar and new_name and new_dob:
all_history = AadhaarHistory.query.filter_by(user_id=current_user.id).all()
for record in all_history:
try:
rec_data = json.loads(record.data)
if (
rec_data.get("in-aadhaar") == new_aadhaar
and rec_data.get("in-name-en") == new_name
and (
rec_data.get("in-dob") == new_dob
or rec_data.get("in_dob") == new_dob
)
):
existing = record
break
except:
continue
# Fallback to ID-based check if no composite match found
if not existing:
existing = AadhaarHistory.query.get(item["id"])
if existing:
# Update existing record and move it to top by updating timestamp
existing.data = json.dumps(item)
existing.timestamp = item["timestamp"]
status = "updated"
else:
# Create new record
new_hist = AadhaarHistory(
id=item["id"],
user_id=current_user.id,
timestamp=item["timestamp"],
data=json.dumps(item),
)
db.session.add(new_hist)
status = "created"
db.session.commit()
# Return the actual database ID so the frontend can use it for deletion
return jsonify(
{
"status": "success",
"action": status,
"id": existing.id if existing else item["id"],
}
), 201
elif request.method == "DELETE":
item_id = request.args.get("id")
print(f"DEBUG: DELETE ID: {item_id}, User: {current_user.id}")
hist = AadhaarHistory.query.filter_by(
id=item_id, user_id=current_user.id
).first()
if hist:
print(f"DEBUG: Found record, deleting...")
db.session.delete(hist)
db.session.commit()
return jsonify({"status": "deleted"}), 200
print(f"DEBUG: Record not found in DB")
return jsonify({"error": "Not found"}), 404
# ==========================================
# UIDAI PROXY (Ported from api_server.py)
# ==========================================
@app.route("/api/proxy/<path:target_path>", methods=["POST"])
def proxy(target_path):
proxy_map = {
"captcha": "https://tathya.uidai.gov.in/audioCaptchaService/api/captcha/v3/generation",
"otp": "https://tathya.uidai.gov.in/unifiedAppAuthService/api/v2/generate/aadhaar/otp",
"download": "https://tathya.uidai.gov.in/downloadAadhaarService/api/aadhaar/download",
}
if target_path not in proxy_map:
return jsonify({"error": "Proxy not found"}), 404
url = proxy_map[target_path]
headers = {
"Content-Type": request.headers.get("Content-Type", "application/json"),
"appid": request.headers.get("appid", "MYAADHAAR"),
"x-request-id": request.headers.get("x-request-id", ""),
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36",
}
req = urllib.request.Request(
url, data=request.get_data(), headers=headers, method="POST"
)
try:
with opener.open(req) as response:
return (
response.read(),
response.status,
{
"Content-Type": response.headers.get(
"Content-Type", "application/json"
)
},
)
except urllib.error.HTTPError as e:
# Log the full error for debugging
error_body = e.read().decode("utf-8") if e.read() else str(e)
print(f"Proxy HTTP Error for {target_path}: {e.code} - {error_body}")
return jsonify(
{"error": f"UIDAI API Error: {e.code}", "details": error_body}
), 502
except urllib.error.URLError as e:
print(f"Proxy URL Error for {target_path}: {str(e.reason)}")
return jsonify({"error": f"Connection Error: {str(e.reason)}"}), 502
except Exception as e:
import traceback
print(f"Proxy Exception for {target_path}: {str(e)}")
print(traceback.format_exc())
return jsonify({"error": str(e)}), 500
@app.route("/api/wallet/ledger")
@login_required
def wallet_ledger():
ledger = (
WalletLedger.query.filter_by(user_id=current_user.id)
.order_by(WalletLedger.created_at.desc())
.all()
)
return jsonify(
[
{
"type": l.type,
"amount": l.amount,
"description": l.description,
"balance_after": l.balance_after,
"date": l.created_at.strftime("%Y-%m-%d %H:%M:%S"),
}
for l in ledger
]
)
@app.route("/api/wallet/deduct", methods=["POST"])
@login_required
def wallet_deduct():
data = request.json
amount = data.get("amount", 0)
service = data.get("service", "Service")
if current_user.wallet_balance < amount:
return jsonify({"error": "Insufficient balance"}), 400
current_user.wallet_balance -= amount
ledger = WalletLedger(
user_id=current_user.id,
type="debit",
amount=amount,
description=f"Print: {service}",
balance_after=current_user.wallet_balance,
)
db.session.add(ledger)
db.session.commit()
return jsonify(
{"success": "Balance deducted", "new_balance": current_user.wallet_balance}
), 200
@app.route("/api/profile/update", methods=["POST"])
@login_required
def update_profile():
data = request.json
current_user.name = data.get("name", current_user.name)
current_user.email = data.get("email", current_user.email)
current_user.address = data.get("address", current_user.address)
db.session.commit()
return jsonify({"success": "Profile updated"}), 200
@app.route("/favicon.ico")
def favicon():
return send_from_directory(
".", "static/img/favicon.ico", mimetype="image/vnd.microsoft.icon"
)
@app.route("/api/admin/transactions")
@login_required
def admin_list_transactions():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
# Order by status (pending first) then date
results = (
db.session.query(Transaction, User)
.join(User, Transaction.user_id == User.id)
.order_by(
db.case(
(Transaction.status == "pending", 0),
(Transaction.status == "approved", 1),
else_=2,
)
)
.order_by(Transaction.created_at.desc())
.all()
)
print(f"DEBUG: Found {len(results)} transactions for admin")
return jsonify(
[
{
"tx_id": t.id,
"user": u.name,
"phone": u.phone,
"amount": t.amount,
"utr": t.utr_number,
"screenshot": t.screenshot_url,
"status": t.status,
"date": t.created_at.strftime("%d %b %Y, %H:%M"),
}
for t, u in results
]
)
@app.route("/api/admin/stats")
@login_required
def admin_dashboard_stats():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
pending_payments = Transaction.query.filter_by(status="pending").count()
total_wallet_balance = (
db.session.query(db.func.sum(User.wallet_balance)).scalar() or 0
)
open_tickets = SupportTicket.query.filter_by(status="open").count()
active_users = User.query.count()
# Calculate trends (simplified - in production, this would compare with previous period)
payments_trend = random.randint(-10, 20)
wallet_trend = random.randint(5, 25)
tickets_trend = random.randint(-15, 15)
users_trend = random.randint(2, 18)
return jsonify(
{
"pending_payments": pending_payments,
"total_wallet_balance": round(total_wallet_balance, 2),
"open_tickets": open_tickets,
"active_users": active_users,
"payments_trend": payments_trend,
"wallet_trend": wallet_trend,
"tickets_trend": tickets_trend,
"users_trend": users_trend,
}
)
@app.route("/api/admin/revenue")
@login_required
def admin_revenue_analytics():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
period = request.args.get("period", "30")
days = int(period)
# Generate sample revenue data (in production, this would query actual transactions)
dates = []
amounts = []
for i in range(days):
date = (datetime.utcnow() - timedelta(days=days - i - 1)).strftime("%Y-%m-%d")
dates.append(date)
# Simulate daily revenue with some randomness
amount = random.randint(500, 5000) + random.random() * 1000
amounts.append(round(amount, 2))
return jsonify({"dates": dates, "amounts": amounts})
@app.route("/api/admin/transaction-metrics")
@login_required
def admin_transaction_metrics():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
approved = Transaction.query.filter_by(status="approved").count()
pending = Transaction.query.filter_by(status="pending").count()
rejected = Transaction.query.filter_by(status="rejected").count()
return jsonify({"approved": approved, "pending": pending, "rejected": rejected})
@app.route("/api/admin/user-activity")
@login_required
def admin_user_activity():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
# Generate sample user activity data (last 7 days)
dates = []
counts = []
for i in range(7):
date = (datetime.utcnow() - timedelta(days=6 - i)).strftime("%Y-%m-%d")
dates.append(date)
# Simulate daily active users
count = random.randint(50, 200)
counts.append(count)
return jsonify({"dates": dates, "counts": counts})
@app.route("/api/admin/wallets")
@login_required
def admin_list_wallets():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
users = User.query.order_by(User.wallet_balance.desc()).all()
wallet_data = []
for u in users:
# Get last transaction for this user
last_tx = (
WalletLedger.query.filter_by(user_id=u.id)
.order_by(WalletLedger.created_at.desc())
.first()
)
wallet_data.append(
{
"id": u.id,
"name": u.name,
"phone": u.phone,
"balance": round(u.wallet_balance, 2),
"last_tx_date": last_tx.created_at.strftime("%Y-%m-%d %H:%M:%S")
if last_tx
else "No transactions",
}
)
print(f"DEBUG: Found {len(wallet_data)} wallets for admin")
return jsonify(wallet_data)
@app.route("/api/admin/user/<user_id>/details")
@login_required
def admin_user_details(user_id):
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
user = User.query.get(user_id)
if not user:
return jsonify({"error": "User not found"}), 404
topups = (
Transaction.query.filter_by(user_id=user.id)
.order_by(Transaction.created_at.desc())
.limit(40)
.all()
)
topup_data = []
for tx in topups:
reason = "Pending verification"
if tx.status == "approved":
reason = "Approved and credited to wallet"
elif tx.status == "rejected":
reason = "Rejected by admin"
topup_data.append(
{
"id": tx.id,
"amount": round(tx.amount, 2),
"utr": tx.utr_number,
"status": tx.status,
"reason": reason,
"date": tx.created_at.strftime("%Y-%m-%d %H:%M:%S"),
"verified_at": tx.verified_at.strftime("%Y-%m-%d %H:%M:%S")
if tx.verified_at
else None,
}
)
print_rows = (
AadhaarHistory.query.filter_by(user_id=user.id)
.order_by(AadhaarHistory.timestamp.desc())
.limit(40)
.all()
)
print_data = []
for row in print_rows:
try:
payload = json.loads(row.data or "{}")
except Exception:
payload = {}
reason = (
payload.get("reason")
or payload.get("statusReason")
or payload.get("remark")
or payload.get("message")
or "Aadhaar generation request"
)
print_data.append(
{
"id": row.id,
"aadhaar": payload.get("in-aadhaar") or payload.get("aadhaar") or "",
"name": payload.get("in-name-en") or payload.get("name") or "",
"dob": payload.get("in-dob")
or payload.get("in_dob")
or payload.get("dob")
or "",
"status": payload.get("status") or "completed",
"reason": reason,
"timestamp": payload.get("timestamp") or row.timestamp or "",
}
)
ledger_rows = (
WalletLedger.query.filter_by(user_id=user.id)
.order_by(WalletLedger.created_at.desc())
.limit(30)
.all()
)
# Fetch download history
download_rows = (
AadhaarDownloadHistory.query.filter_by(user_id=user.id)
.order_by(AadhaarDownloadHistory.downloaded_at.desc())
.limit(40)
.all()
)
download_data = []
for d in download_rows:
download_data.append(
{
"id": d.id,
"aadhaar_number": d.aadhaar_number,
"eid": d.eid or "",
"is_masked": d.is_masked,
"ip_address": d.ip_address or "",
"downloaded_at": d.downloaded_at.strftime("%Y-%m-%d %H:%M:%S")
if d.downloaded_at
else "",
}
)
return jsonify(
{
"user": {
"id": user.id,
"name": user.name,
"phone": user.phone,
"email": user.email or "",
"address": user.address or "",
"role": user.role,
"balance": round(user.wallet_balance, 2),
"created_at": user.created_at.strftime("%Y-%m-%d %H:%M:%S")
if user.created_at
else "",
},
"topup_history": topup_data,
"print_history": print_data,
"download_history": download_data,
"wallet_ledger": [
{
"type": l.type,
"amount": round(l.amount, 2),
"description": l.description or "",
"balance_after": round(l.balance_after, 2),
"date": l.created_at.strftime("%Y-%m-%d %H:%M:%S"),
}
for l in ledger_rows
],
}
)
@app.route("/api/admin/wallet/adjust", methods=["POST"])
@login_required
def admin_adjust_wallet():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
data = request.json
target_user_id = data.get("user_id")
amount = float(data.get("amount", 0))
action = data.get("action") # 'credit' or 'debit'
reason = data.get("reason", "Admin adjustment")
user = User.query.get(target_user_id)
if not user:
return jsonify({"error": "User not found"}), 404
if action == "credit":
user.wallet_balance += amount
elif action == "debit":
if user.wallet_balance < amount:
return jsonify({"error": "Insufficient balance"}), 400
user.wallet_balance -= amount
else:
return jsonify({"error": "Invalid action"}), 400
# Log the adjustment in ledger
ledger = WalletLedger(
user_id=user.id,
amount=amount,
type=action,
balance_after=user.wallet_balance,
description=f"Admin Adjustment: {reason}",
)
db.session.add(ledger)
db.session.commit()
return jsonify({"success": True, "new_balance": round(user.wallet_balance, 2)})
# ─────────────────────────────────────────────────────
# USER MANAGEMENT APIs
# ─────────────────────────────────────────────────────
@app.route("/api/admin/users", methods=["GET"])
@login_required
def admin_list_users():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
users = User.query.order_by(User.created_at.desc()).all()
user_data = []
for u in users:
user_data.append(
{
"id": u.id,
"name": u.name,
"phone": u.phone,
"email": u.email or "",
"role": u.role or "user",
"balance": round(u.wallet_balance, 2),
"is_active": u.is_active if u.is_active is not None else True,
"joined_at": u.created_at.strftime("%Y-%m-%d %H:%M:%S")
if u.created_at
else "",
}
)
return jsonify(user_data)
@app.route("/api/admin/users/<user_id>/toggle-status", methods=["POST"])
@login_required
def admin_toggle_user_status(user_id):
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
user = User.query.get(user_id)
if not user:
return jsonify({"error": "User not found"}), 404
# Prevent disabling admin users
if user.role == "admin":
return jsonify({"error": "Admin users cannot be disabled."}), 403
user.is_active = not (user.is_active if user.is_active is not None else True)
db.session.commit()
return jsonify({"success": True, "new_status": user.is_active})
@app.route("/api/admin/users/<user_id>/change-password", methods=["POST"])
@login_required
def admin_change_user_password(user_id):
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
data = request.json
new_password = data.get("new_password")
if not new_password:
return jsonify({"error": "New password is required"}), 400
user = User.query.get(user_id)
if not user:
return jsonify({"error": "User not found"}), 404
# Hash the new password before saving
user.password = bcrypt.generate_password_hash(new_password).decode("utf-8")
db.session.commit()
return jsonify({"success": True})
@app.route("/api/admin/users/<user_id>/delete", methods=["POST"])
@login_required
def admin_delete_user(user_id):
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
user = User.query.get(user_id)
if not user:
return jsonify({"error": "User to delete not found"}), 404
# Check admin protection first (before password verification)
if user.role == "admin":
return jsonify(
{"error": "Admin users are protected and cannot be deleted."}
), 403
if user.role == "admin-user":
return jsonify(
{"error": "Admin-User records are protected and cannot be deleted."}
), 403
# Prevent deleting self
if user.id == current_user.id:
return jsonify(
{"error": "You cannot delete your own account while logged in."}
), 400
# Now verify password for non-protected users
data = request.json
admin_password = data.get(
"admin_password"
) # Current session admin's password for verification
if not admin_password:
return jsonify({"error": "Admin password is required."}), 400
# Use bcrypt to verify the admin password
if not bcrypt.check_password_hash(current_user.password, admin_password):
return jsonify({"error": "Incorrect admin password. Deletion denied."}), 401
db.session.delete(user)
db.session.commit()
return jsonify({"success": True})
# ── support ticket helper ──────────────────────────────────────────
def _generate_ticket_number():
"""Returns the next TKT-XXXX number based on existing count."""
count = SupportTicket.query.count() + 1
return f"TKT-{count:04d}"
# ==========================================
# SUPPORT TICKET ROUTES (Improved)
# ==========================================
@app.route("/api/support/create", methods=["POST"])
@login_required
def create_support_ticket():
data = request.json
subject = data.get("subject", "").strip()
message = data.get("message", "").strip()
category = data.get("category", "other")
if not subject or not message:
return jsonify({"error": "Subject and message are required"}), 400
ticket = SupportTicket(
ticket_number=_generate_ticket_number(),
user_id=current_user.id,
subject=subject,
message=message,
category=category,
priority="medium",
status="open",
)
db.session.add(ticket)
db.session.flush()
first_msg = TicketMessage(ticket_id=ticket.id, sender="user", message=message)
db.session.add(first_msg)
db.session.commit()
return jsonify({"success": True, "ticket_number": ticket.ticket_number}), 201
@app.route("/api/support/list")
@login_required
def list_support_tickets():
tickets = (
SupportTicket.query.filter_by(user_id=current_user.id)
.order_by(SupportTicket.created_at.desc())
.all()
)
return jsonify(
[
{
"id": t.id,
"ticket_number": t.ticket_number,
"subject": t.subject,
"category": t.category,
"priority": t.priority,
"status": t.status,
"date": t.created_at.strftime("%d %b %Y, %H:%M"),
}
for t in tickets
]
)
@app.route("/api/support/ticket/<ticket_id>")
@login_required
def get_ticket_detail(ticket_id):
ticket = SupportTicket.query.filter_by(
id=ticket_id, user_id=current_user.id
).first()
if not ticket:
return jsonify({"error": "Not found"}), 404
return jsonify(
{
"id": ticket.id,
"ticket_number": ticket.ticket_number,
"subject": ticket.subject,
"category": ticket.category,
"priority": ticket.priority,
"status": ticket.status,
"date": ticket.created_at.strftime("%d %b %Y, %H:%M"),
"messages": [
{
"sender": m.sender,
"message": m.message,
"time": m.created_at.strftime("%d %b, %H:%M"),
}
for m in ticket.messages_rel
],
}
)
@app.route("/api/admin/verify-payment", methods=["POST"])
@login_required
def admin_verify_payment():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
data = request.json
tx_id = data.get("tx_id")
action = data.get("action") # 'approve' or 'reject'
tx = Transaction.query.get(tx_id)
if not tx:
return jsonify({"error": "Transaction not found"}), 404
if tx.status != "pending":
return jsonify({"error": "Transaction already processed"}), 400
user = User.query.get(tx.user_id)
if not user:
return jsonify({"error": "User not found"}), 404
if action == "approve":
tx.status = "approved"
user.wallet_balance += tx.amount
# Log in ledger
ledger = WalletLedger(
user_id=user.id,
amount=tx.amount,
type="credit",
balance_after=user.wallet_balance,
description=f"Wallet Top-up (UTR: {tx.utr_number})",
)
db.session.add(ledger)
else:
tx.status = "rejected"
tx.verified_at = datetime.utcnow()
db.session.commit()
return jsonify({"success": True, "status": tx.status})
@app.route("/api/admin/tickets")
@login_required
def admin_list_tickets():
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
results = (
db.session.query(SupportTicket, User)
.join(User, SupportTicket.user_id == User.id)
.order_by(SupportTicket.created_at.desc())
.all()
)
print(f"DEBUG: Found {len(results)} tickets for admin")
return jsonify(
[
{
"id": t.id,
"ticket_number": t.ticket_number,
"subject": t.subject,
"category": t.category,
"priority": t.priority,
"status": t.status,
"user_name": u.name,
"user_phone": u.phone,
"date": t.created_at.strftime("%d %b %Y, %H:%M"),
"reply_count": len(t.messages_rel),
}
for t, u in results
]
)
@app.route("/api/admin/tickets/<ticket_id>")
@login_required
def admin_ticket_detail(ticket_id):
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
ticket = SupportTicket.query.get(ticket_id)
if not ticket:
return jsonify({"error": "Not found"}), 404
user = User.query.get(ticket.user_id)
return jsonify(
{
"id": ticket.id,
"ticket_number": ticket.ticket_number,
"subject": ticket.subject,
"category": ticket.category,
"priority": ticket.priority,
"status": ticket.status,
"user_name": user.name,
"user_phone": user.phone,
"date": ticket.created_at.strftime("%d %b %Y, %H:%M"),
"messages": [
{
"sender": m.sender,
"message": m.message,
"time": m.created_at.strftime("%d %b, %H:%M"),
}
for m in ticket.messages_rel
],
}
)
@app.route("/api/admin/tickets/<ticket_id>/reply", methods=["POST"])
@login_required
def admin_reply_ticket(ticket_id):
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
ticket = SupportTicket.query.get(ticket_id)
if not ticket:
return jsonify({"error": "Not found"}), 404
data = request.json
msg_text = data.get("message", "").strip()
if not msg_text:
return jsonify({"error": "Message cannot be empty"}), 400
msg = TicketMessage(ticket_id=ticket.id, sender="admin", message=msg_text)
db.session.add(msg)
if ticket.status == "open":
ticket.status = "pending"
ticket.updated_at = datetime.utcnow()
db.session.commit()
return jsonify({"success": True}), 200
@app.route("/api/admin/tickets/<ticket_id>/status", methods=["POST"])
@login_required
def admin_update_ticket(ticket_id):
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
ticket = SupportTicket.query.get(ticket_id)
if not ticket:
return jsonify({"error": "Not found"}), 404
data = request.json
if "status" in data:
ticket.status = data["status"]
if "priority" in data:
ticket.priority = data["priority"]
ticket.updated_at = datetime.utcnow()
db.session.commit()
return jsonify({"success": True}), 200
# ==========================================
# STATIC FILES & PAGES
# ==========================================
@app.route("/")
def dashboard():
if not current_user.is_authenticated:
return redirect(url_for("login_page"))
return render_template("index.html", active_page="dashboard")
@app.route("/generate")
@login_required
def generate_page():
return render_template("generate.html", active_page="generate")
@app.route("/download")
@login_required
def download_page():
return render_template("download.html", active_page="download")
@app.route("/wallet")
@login_required
def wallet_page():
return render_template("wallet.html", active_page="wallet")
@app.route("/reports")
@login_required
def reports_page():
return render_template("reports.html", active_page="reports")
@app.route("/profile")
@login_required
def profile_page():
return render_template("profile.html", active_page="profile")
@app.route("/support")
@login_required
def support_page():
return render_template("support.html", active_page="support")
@app.route("/login")
def login_page():
return render_template("login.html")
@app.route("/signup")
def signup_page():
return render_template("signup.html")
@app.route("/admin")
@login_required
def admin_page():
if current_user.role != "admin":
return redirect(url_for("dashboard"))
return render_template("admin.html")
@app.route("/uploads/<filename>")
def uploaded_file(filename):
return send_from_directory(app.config["UPLOAD_FOLDER"], filename)
@app.route("/<path:path>")
def serve_static(path):
return send_from_directory(".", path)
# ==========================================
# AADHAAR DOWNLOAD HISTORY API
# ==========================================
@app.route("/api/aadhaar/download-history", methods=["POST"])
@login_required
def save_download_history():
"""Save Aadhaar download history for current user with PDF content"""
data = request.json
print(f"DEBUG: Saving download history - User: {current_user.id}, Data: {data}")
try:
history = AadhaarDownloadHistory(
user_id=current_user.id,
aadhaar_number=data.get("aadhaar_number", ""), # Should be masked format
eid=data.get("eid", ""),
is_masked=data.get("is_masked", False),
pdf_content=data.get("pdf_content", ""), # Store the actual PDF content
ip_address=request.remote_addr,
user_agent=request.headers.get("User-Agent", "")[:255],
)
db.session.add(history)
db.session.commit()
print(f"DEBUG: History saved successfully - ID: {history.id}")
return jsonify({"success": True, "id": history.id})
except Exception as e:
db.session.rollback()
print(f"DEBUG: History save failed - Error: {str(e)}")
import traceback
print(traceback.format_exc())
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/aadhaar/download-history", methods=["GET"])
@login_required
def get_download_history():
"""Get Aadhaar download history for current user - optimized (no pdf_content)"""
try:
history = (
AadhaarDownloadHistory.query.filter_by(user_id=current_user.id)
.order_by(AadhaarDownloadHistory.downloaded_at.desc())
.limit(50)
.all()
)
return jsonify(
{
"success": True,
"history": [
{
"id": h.id,
"aadhaar_number": h.aadhaar_number,
"eid": h.eid,
"is_masked": h.is_masked,
"has_pdf": bool(h.pdf_content), # Just indicate if PDF exists
"downloaded_at": h.downloaded_at.isoformat()
if h.downloaded_at
else None,
}
for h in history
],
}
)
except Exception as e:
import traceback
print(f"ERROR in get_download_history: {str(e)}")
print(traceback.format_exc())
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/aadhaar/download-history/<history_id>/pdf", methods=["GET"])
@login_required
def get_history_pdf(history_id):
"""Get PDF content for a specific history item - fast endpoint"""
try:
history_item = AadhaarDownloadHistory.query.filter_by(
id=history_id, user_id=current_user.id
).first()
if not history_item:
return jsonify({"success": False, "error": "History item not found"}), 404
if not history_item.pdf_content:
return jsonify({"success": False, "error": "PDF not available"}), 404
return jsonify(
{
"success": True,
"pdf_content": history_item.pdf_content,
"aadhaar_number": history_item.aadhaar_number,
"is_masked": history_item.is_masked,
}
)
except Exception as e:
import traceback
print(f"ERROR in get_history_pdf: {str(e)}")
print(traceback.format_exc())
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/aadhaar/download-history/<history_id>", methods=["DELETE"])
@login_required
def delete_download_history(history_id):
"""Delete a specific download history item"""
try:
history_item = AadhaarDownloadHistory.query.filter_by(
id=history_id, user_id=current_user.id
).first()
if not history_item:
return jsonify({"success": False, "error": "History item not found"}), 404
db.session.delete(history_item)
db.session.commit()
return jsonify({"success": True})
except Exception as e:
db.session.rollback()
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/admin/aadhaar/download-history", methods=["GET"])
@login_required
def admin_get_all_download_history():
"""Get all Aadhaar download history (admin only)"""
if current_user.role != "admin":
return jsonify({"error": "Unauthorized"}), 403
try:
# Optional: filter by user_id
user_id = request.args.get("user_id")
query = AadhaarDownloadHistory.query
if user_id:
query = query.filter_by(user_id=user_id)
history = (
query.order_by(AadhaarDownloadHistory.downloaded_at.desc()).limit(100).all()
)
# Get user names for display
result = []
for h in history:
user = User.query.get(h.user_id)
result.append(
{
"id": h.id,
"user_id": h.user_id,
"user_name": user.name if user else "Unknown",
"user_phone": user.phone if user else "Unknown",
"aadhaar_number": h.aadhaar_number,
"eid": h.eid,
"is_masked": h.is_masked,
"downloaded_at": h.downloaded_at.isoformat()
if h.downloaded_at
else None,
"ip_address": h.ip_address,
}
)
return jsonify({"success": True, "history": result})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
if __name__ == "__main__":
with app.app_context():
db.create_all()
port = int(os.environ.get("PORT", 8080))
app.run(host="0.0.0.0", port=port, debug=False)