"""GoAPI: a Flask proxy in front of ch.at's OpenAI-compatible chat API.

Adds Discord OAuth login, per-user API keys, a daily credit allowance with
batched (every 3 minutes) credit deduction, and a password-protected DB admin
page backed by PostgreSQL.
"""

from flask import Flask, jsonify, request, redirect, session, Response
import json
import requests
import os
import hashlib
import hmac
import time
import psycopg2
from psycopg2.extras import RealDictCursor
from datetime import datetime, timedelta
import threading
import atexit
import uuid
import re
from collections import defaultdict

app = Flask(__name__)

# SECURITY: these values were previously hard-coded literals committed to
# source. They are now read from the environment, falling back to the legacy
# literals so existing deployments keep working. The leaked credentials
# (Discord secret, DB password, admin password) should be rotated and the
# environment variables set instead.
app.secret_key = os.environ.get('APP_SECRET_KEY', 'your-secret-key-here')

DISCORD_CLIENT_ID = os.environ.get('DISCORD_CLIENT_ID', '1426015278499627058')
DISCORD_CLIENT_SECRET = os.environ.get(
    'DISCORD_CLIENT_SECRET', 'YfTZg5JZA-y3GU1wDyoO3IPilwz9jXmL')
DATABASE_URL = os.environ.get(
    'DATABASE_URL',
    'postgres://avnadmin:AVNS_o0LDhRmORJQ7lh7ULQS@goapi-hacktv.b.aivencloud.com:11426/defaultdb?sslmode=require')
ADMIN_PASSWORD = os.environ.get('ADMIN_PASSWORD', "GoodGuys!Funn1")

# Daily free allowance, in credits ("tokens" in the UI copy).
DAILY_FREE_CREDITS = 500000

# Global state for batching credit deductions.
pending_deductions = defaultdict(int)  # username -> total credits queued for deduction
credit_cache = {}                      # username -> last known credit balance
deduction_lock = threading.Lock()      # guards the two dicts above
batch_timer = None                     # threading.Timer for the 3-minute flush

# Per-model credit cost multiplier (applied to the estimated word count).
MODEL_MULTIPLIERS = {
    'gpt-5-nano': 1.0,
    'gpt-5-chat': 1.5,
    'gpt-4.1': 1.0,
    'claude-4-sonnet': 3.0,
    'claude-4-opus': 5.0,
    'claude-3.5-sonnet': 3.0,
    'claude-3.7-sonnet': 3.0,
    'claude-3.5-haiku': 2.0,
}

# Public model id -> upstream (ch.at) model id.
MODEL_MAP = {
    'gpt-5-nano': 'gpt-5-nano',
    'gpt-5-chat': 'gpt-5',
    'gpt-4.1': 'gpt-41',
    'claude-4-sonnet': 'claude-4-sonnet',
    'claude-4-opus': 'claude-4-opus',
    'claude-3.5-sonnet': 'claude-3.5-sonnet',
    'claude-3.7-sonnet': 'claude-3.7-sonnet',
    'claude-3.5-haiku': 'claude-3.5-haiku',
}


def init_db():
    """Create the users table if it does not exist."""
    conn = psycopg2.connect(DATABASE_URL)
    c = conn.cursor()
    c.execute(f'''CREATE TABLE IF NOT EXISTS users
                  (username TEXT PRIMARY KEY,
                   plan TEXT DEFAULT 'free',
                   credits INTEGER DEFAULT {DAILY_FREE_CREDITS},
                   last_reset TEXT)''')
    conn.commit()
    conn.close()


def get_user(username):
    """Return {'plan', 'credits', 'last_reset'} for username, or None."""
    conn = psycopg2.connect(DATABASE_URL)
    c = conn.cursor(cursor_factory=RealDictCursor)
    c.execute('SELECT plan, credits, last_reset FROM users WHERE username = %s',
              (username,))
    result = c.fetchone()
    conn.close()
    return dict(result) if result else None


def create_or_update_user(username, plan='free'):
    """Insert a user row if it does not already exist.

    BUGFIX: the original used ON CONFLICT DO UPDATE, which reset every
    existing user back to the free plan and default credits on each login,
    silently wiping admin-granted plans/credits. Existing rows are now left
    untouched; admins change plans via /db/admin/update-user.
    """
    conn = psycopg2.connect(DATABASE_URL)
    c = conn.cursor()
    now = datetime.now().isoformat()
    c.execute('''INSERT INTO users (username, plan, credits, last_reset)
                 VALUES (%s, %s, %s, %s)
                 ON CONFLICT (username) DO NOTHING''',
              (username, plan, DAILY_FREE_CREDITS, now))
    conn.commit()
    conn.close()


def reset_credits_if_needed(username):
    """Refill the user's credits to the daily allowance once per calendar day."""
    user = get_user(username)
    if not user:
        return
    last_reset = user['last_reset']
    if last_reset:
        last_reset_date = datetime.fromisoformat(last_reset).date()
        today = datetime.now().date()
        if last_reset_date < today:
            conn = psycopg2.connect(DATABASE_URL)
            c = conn.cursor()
            now = datetime.now().isoformat()
            c.execute('UPDATE users SET credits = %s, last_reset = %s WHERE username = %s',
                      (DAILY_FREE_CREDITS, now, username))
            conn.commit()
            conn.close()


def deduct_credits(username, amount):
    """Atomically deduct `amount` credits; return False if balance is too low."""
    conn = psycopg2.connect(DATABASE_URL)
    c = conn.cursor()
    c.execute('UPDATE users SET credits = credits - %s WHERE username = %s AND credits >= %s',
              (amount, username, amount))
    success = c.rowcount > 0
    conn.commit()
    conn.close()
    return success


def get_user_credits(username):
    """Return the user's current credit balance (after any daily refill), or 0."""
    user = get_user(username)
    if not user:
        return 0
    reset_credits_if_needed(username)
    user = get_user(username)
    return user['credits'] if user else 0


def get_cached_credits(username):
    """Estimated balance: cached DB balance minus not-yet-flushed deductions."""
    with deduction_lock:
        cached_balance = credit_cache.get(username, 0)
        pending_for_user = pending_deductions.get(username, 0)
        return cached_balance - pending_for_user


def update_credit_cache(username, new_balance):
    """Record the latest known DB balance for a user."""
    with deduction_lock:
        credit_cache[username] = new_balance
    print(f"Updated credit cache for {username}: {new_balance} credits")


def clear_cache_entry(username):
    """Drop a user's cache entry (called when they run out of credits)."""
    with deduction_lock:
        credit_cache.pop(username, None)
    print(f"Cleared credit cache for {username}")


def add_pending_deduction(username, amount):
    """Queue a credit deduction for the next batch flush."""
    with deduction_lock:
        pending_deductions[username] += amount


def process_pending_deductions():
    """Flush all queued credit deductions to the database in one batch.

    Runs in a background thread. The pending map is snapshot-and-cleared under
    the lock, but all database work happens OUTSIDE the lock: the original
    risked calling update_credit_cache() (which re-acquires the non-reentrant
    deduction_lock) while still holding it, and blocked request threads on DB
    latency. On failure the snapshot is re-queued; `deductions_to_process` is
    pre-initialized so the error path can never raise NameError.
    """
    deductions_to_process = {}
    try:
        with deduction_lock:
            if not pending_deductions:
                return
            deductions_to_process = dict(pending_deductions)
            pending_deductions.clear()

        conn = psycopg2.connect(DATABASE_URL)
        try:
            c = conn.cursor()
            for username, amount in deductions_to_process.items():
                # Conditional update: never drive a balance negative.
                c.execute('UPDATE users SET credits = credits - %s WHERE username = %s AND credits >= %s',
                          (amount, username, amount))
            # Re-read the authoritative balances for every affected user.
            usernames = list(deductions_to_process)
            placeholders = ','.join(['%s'] * len(usernames))
            c.execute(f'SELECT username, credits FROM users WHERE username IN ({placeholders})',
                      usernames)
            balances = c.fetchall()
            conn.commit()
        finally:
            conn.close()

        for username, new_balance in balances:
            update_credit_cache(username, new_balance)
        print(f"Processed {len(deductions_to_process)} credit deductions")
    except Exception as e:
        print(f"Error processing credit deductions: {e}")
        # Put the snapshot back so the next batch retries it.
        with deduction_lock:
            for username, amount in deductions_to_process.items():
                pending_deductions[username] += amount


def start_batch_timer():
    """Start (or restart) the 3-minute batch-flush timer."""
    global batch_timer
    if batch_timer:
        batch_timer.cancel()
    batch_timer = threading.Timer(
        180.0,  # 3 minutes
        lambda: threading.Thread(target=process_pending_deductions, daemon=True).start())
    batch_timer.daemon = True
    batch_timer.start()


def verify_admin_password(password):
    """Timing-safe check of the admin password (False for missing input)."""
    return bool(password) and hmac.compare_digest(password, ADMIN_PASSWORD)


def export_database():
    """Dump all user rows as a JSON-serializable dict, or {'error': ...}."""
    try:
        conn = psycopg2.connect(DATABASE_URL)
        c = conn.cursor(cursor_factory=RealDictCursor)
        c.execute('SELECT * FROM users')
        users_data = [dict(user) for user in c.fetchall()]
        conn.close()
        return {"users": users_data, "exported_at": datetime.now().isoformat()}
    except Exception as e:
        return {"error": f"Failed to export database: {str(e)}"}


def import_database(data):
    """Replace the users table with the rows in `data['users']`.

    Destructive: deletes all existing rows first. Returns a success or error
    dict; also refreshes the in-memory credit cache for every imported user.
    """
    try:
        if "users" not in data:
            return {"error": "Invalid data format: missing 'users' key"}
        conn = psycopg2.connect(DATABASE_URL)
        c = conn.cursor()
        c.execute('DELETE FROM users')
        for user_data in data["users"]:
            c.execute('''INSERT INTO users (username, plan, credits, last_reset)
                         VALUES (%s, %s, %s, %s)''',
                      (user_data['username'],
                       user_data.get('plan', 'free'),
                       user_data.get('credits', DAILY_FREE_CREDITS),
                       user_data.get('last_reset')))
        conn.commit()
        conn.close()
        for user_data in data["users"]:
            update_credit_cache(user_data['username'],
                               user_data.get('credits', DAILY_FREE_CREDITS))
        return {"success": f"Imported {len(data['users'])} users"}
    except Exception as e:
        return {"error": f"Failed to import database: {str(e)}"}


# Ensure the schema exists at import time (also under WSGI servers).
init_db()


def get_all_users():
    """Return every user row ordered by username; [] on any failure."""
    try:
        conn = psycopg2.connect(DATABASE_URL)
        c = conn.cursor(cursor_factory=RealDictCursor)
        c.execute('SELECT * FROM users ORDER BY username')
        users = c.fetchall()
        conn.close()
        return [dict(user) for user in users]
    except Exception:
        return []


@app.route('/db/admin', methods=['GET', 'POST'])
def db_admin():
    """Database admin page.

    NOTE(review): the page itself is unauthenticated; only the POST actions
    (export/import/update) check the admin password. The template text below
    was reconstructed from a mangled source — confirm against the deployed
    markup.
    """
    if request.method == 'GET':
        users = get_all_users()
        users_html = ""
        for user in users:
            users_html += f''' {user['username']}
{user['credits']:,} '''
        return f''' Database Admin

Database Administration

User Management

{users_html}
Username Plan Credits Set Tokens/Day Actions

Export Database

Download the current database as a JSON file.

Import Database

Upload a JSON file to replace the current database.

'''
    return redirect('/db/admin')


@app.route('/db/admin/export', methods=['POST'])
def db_admin_export():
    """Download the users table as a JSON attachment (admin password required)."""
    password = request.form.get('password')
    if not verify_admin_password(password):
        return jsonify({"error": "Invalid admin password"}), 401
    data = export_database()
    if "error" in data:
        return jsonify(data), 500
    return Response(
        json.dumps(data, indent=2),
        mimetype='application/json',
        headers={'Content-disposition': 'attachment; filename=database_export.json'})


@app.route('/db/admin/import', methods=['POST'])
def db_admin_import():
    """Replace the database from an uploaded JSON file (admin password required)."""
    password = request.form.get('password')
    if not verify_admin_password(password):
        return jsonify({"error": "Invalid admin password"}), 401
    if 'db_file' not in request.files:
        return jsonify({"error": "No file provided"}), 400
    file = request.files['db_file']
    if file.filename == '':
        return jsonify({"error": "No file selected"}), 400
    if not file.filename.endswith('.json'):
        return jsonify({"error": "File must be a JSON file"}), 400
    try:
        file_content = file.read().decode('utf-8')
        data = json.loads(file_content)
        result = import_database(data)
        if "error" in result:
            return jsonify(result), 500
        return jsonify(result)
    except json.JSONDecodeError:
        return jsonify({"error": "Invalid JSON file"}), 400
    except Exception as e:
        return jsonify({"error": f"Import failed: {str(e)}"}), 500


@app.route('/db/admin/update-user', methods=['POST'])
def db_admin_update_user():
    """Set a user's plan and credit balance (admin password required)."""
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "No data provided"}), 400
        password = data.get('password')
        if not verify_admin_password(password):
            return jsonify({"error": "Invalid admin password"}), 401
        username = data.get('username')
        plan = data.get('plan')
        credits = data.get('credits')
        if not username or not plan or credits is None:
            return jsonify({"error": "Missing required fields"}), 400
        conn = psycopg2.connect(DATABASE_URL)
        c = conn.cursor()
        # Update plan and credits; reset last_reset to now so the daily
        # refill doesn't immediately clobber the admin-set balance.
        now = datetime.now().isoformat()
        c.execute('''UPDATE users SET plan = %s, credits = %s, last_reset = %s
                     WHERE username = %s''',
                  (plan, credits, now, username))
        if c.rowcount == 0:
            conn.close()
            return jsonify({"error": "User not found"}), 404
        conn.commit()
        conn.close()
        update_credit_cache(username, credits)
        return jsonify({"success": f"Updated {username}: {plan} plan, {credits:,} credits"})
    except Exception as e:
        return jsonify({"error": f"Update failed: {str(e)}"}), 500


@app.route('/test_stream')
def test_stream():
    """Diagnostic endpoint: measure upstream streaming latency and chunking."""
    test_data = {
        "model": "gpt-5",
        "messages": [{"role": "user", "content": "Write a short novel about a detective solving a mystery in Victorian London. Include dialogue, suspense, and a surprising twist ending."}],
        "stream": True,
        "max_tokens": 500,
    }
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer test-key',  # This might not work, but for testing
    }
    # Hoisted out of the try so the except path can always compute elapsed time
    # (the original could hit NameError if the request setup itself raised).
    start_time = time.time()
    try:
        print(f"Starting test stream request at {start_time}")
        response = requests.post(
            'https://ch.at/v1/chat/completions',
            json=test_data,
            headers=headers,
            stream=True,
            timeout=60)  # longer timeout for novel generation
        if response.status_code != 200:
            return jsonify({
                "error": f"HTTP {response.status_code}",
                "message": response.text,
                "total_time": time.time() - start_time,
            }), response.status_code

        chunk_count = 0
        first_chunk_time = None
        last_chunk_time = None
        total_response = ""
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                chunk_count += 1
                current_time = time.time()
                if first_chunk_time is None:
                    first_chunk_time = current_time
                    print(f"First chunk received at {current_time} ({current_time - start_time:.2f}s after start)")
                last_chunk_time = current_time
                chunk_str = chunk.decode('utf-8')
                total_response += chunk_str
                print(f"Chunk {chunk_count} at {current_time} ({current_time - start_time:.2f}s): {len(chunk_str)} chars")

        end_time = time.time()
        total_time = end_time - start_time
        streaming_duration = last_chunk_time - first_chunk_time if first_chunk_time and last_chunk_time else 0
        result = {
            "total_time": round(total_time, 2),
            "streaming_duration": round(streaming_duration, 2),
            "chunk_count": chunk_count,
            "response_length": len(total_response),
            "average_chunk_interval": round(streaming_duration / max(chunk_count - 1, 1), 2) if chunk_count > 1 else 0,
            "chunks_per_second": round(chunk_count / max(streaming_duration, 0.01), 2),
            "streaming_efficiency": "Good" if streaming_duration > 1.0 else "Poor (likely buffered)",
            "status": "success",
        }
        print(f"Stream test completed: {result}")
        return jsonify(result)
    except Exception as e:
        end_time = time.time()
        return jsonify({
            "error": "Test failed",
            "message": str(e),
            "total_time": round(end_time - start_time, 2),
            "status": "error",
        }), 500


@app.route('/v1/models')
def models():
    """OpenAI-compatible model listing for every public model id."""
    catalog = [
        ('gpt-5-nano', 'openai'),
        ('gpt-5-chat', 'openai'),
        ('gpt-4.1', 'openai'),
        ('claude-4-sonnet', 'anthropic'),
        ('claude-4-opus', 'anthropic'),
        ('claude-3.5-sonnet', 'anthropic'),
        ('claude-3.7-sonnet', 'anthropic'),
        ('claude-3.5-haiku', 'anthropic'),
    ]
    return jsonify({
        "object": "list",
        "data": [
            {"id": model_id, "object": "model", "created": 1677610602, "owned_by": owner}
            for model_id, owner in catalog
        ],
    })


@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    """OpenAI-compatible chat endpoint: authenticate by derived API key,
    estimate and queue a credit deduction, then proxy the stream from ch.at.
    """
    try:
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return jsonify({"error": "Missing or invalid Bearer token"}), 401
        token = auth_header.split(' ')[1]

        # Keys are sk-<first 32 hex of sha256(username)>, so we must scan all
        # usernames to invert the mapping. O(n) per request — acceptable at
        # current scale; a keyed lookup table would be the next step.
        conn = psycopg2.connect(DATABASE_URL)
        c = conn.cursor()
        c.execute('SELECT username FROM users')
        all_users = c.fetchall()
        conn.close()
        username = None
        for (user,) in all_users:
            expected_key = f"sk-{hashlib.sha256(user.encode()).hexdigest()[:32]}"
            if token == expected_key:
                username = user
                break
        if not username:
            return jsonify({"error": "Invalid API key"}), 401

        # Credit check against the in-memory cache (no DB hit on the hot path).
        cached_credits = get_cached_credits(username)
        if username not in credit_cache:
            # Cold cache: load the real balance once.
            try:
                real_credits = get_user_credits(username)
                update_credit_cache(username, real_credits)
                cached_credits = real_credits
            except Exception:
                return jsonify({"error": "Unable to verify credits"}), 402
        if cached_credits <= 0:
            clear_cache_entry(username)  # out of credits: drop stale cache
            return jsonify({"error": "Insufficient credits"}), 402

        data = request.get_json()
        if not data:
            return jsonify({"error": "No JSON data provided"}), 400
        model = data.get('model')
        if not model:
            return jsonify({"error": "No model specified"}), 400
        original_model = model
        target_model = MODEL_MAP.get(model)
        if target_model is None:
            return jsonify({"error": f"Unsupported model: {model}"}), 400
        data['model'] = target_model

        messages = data.get('messages', [])
        # Cost estimate: input words plus a rough output guess (half the
        # input, minimum 100 words), scaled by the model multiplier.
        input_words = sum(len(str(msg.get('content', '')).split()) for msg in messages)
        estimated_output_words = max(100, input_words // 2)
        total_words = input_words + estimated_output_words
        multiplier = MODEL_MULTIPLIERS.get(model, 1.0)
        estimated_cost = int(total_words * multiplier)
        if estimated_cost > cached_credits:
            clear_cache_entry(username)
            return jsonify({"error": "Insufficient credits for this request"}), 402

        # Queue the deduction and (re)arm the 3-minute batch flush.
        add_pending_deduction(username, estimated_cost)
        start_batch_timer()

        # Upstream quirk: claude models need the system prompt echoed as a
        # user/assistant pair instead of a system message.
        if 'claude' in model.lower() and messages:
            messages = data['messages']
            if messages and messages[0]['role'] == 'system':
                system_content = messages[0]['content']
                data['messages'] = [
                    {'role': 'user', 'content': system_content},
                    {'role': 'assistant', 'content': system_content},
                ] + messages[1:]

        headers = {
            'Content-Type': 'application/json',
            'Authorization': auth_header,  # forward the caller's bearer token
        }

        max_retries = 3
        retry_delay = 1
        response = None
        for attempt in range(max_retries):
            response = requests.post(
                'https://ch.at/v1/chat/completions',
                json=data,
                headers=headers,
                stream=True)
            if response.status_code == 200:
                break
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
        else:
            # All retries returned non-200. The original did a bare `return`
            # (None) here, which made Flask raise a TypeError.
            return jsonify({"error": "Failed to get response from upstream API"}), 502

        def generate():
            # Stable completion id substituted into every forwarded chunk.
            completion_id = f"chatcmpl-{uuid.uuid4().hex[:16]}"
            try:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    chunk_str = chunk.decode('utf-8', errors='replace')
                    chunk_str = re.sub(r'"id":"chatcmpl-[^"]*"',
                                       f'"id":"{completion_id}"', chunk_str)
                    # Rewrite upstream model ids back to the requested alias.
                    if original_model == 'gpt-5-chat':
                        chunk_str = chunk_str.replace('"model":"gpt-5"', '"model":"gpt-5-chat"')
                    elif original_model == 'gpt-4.1':
                        chunk_str = chunk_str.replace('"model":"gpt-41"', '"model":"gpt-4.1"')
                    if not chunk_str.strip():
                        continue
                    yield f"{chunk_str}\n\n".encode('utf-8')
                    if 'data: [DONE]' in chunk_str:
                        # BUGFIX: the original did `return Response(...)` from
                        # inside this generator; a generator's return value is
                        # discarded, so clients never received the [DONE]
                        # terminator. Forward it, then end the stream.
                        return
            except Exception as e:
                print(f"Error in streaming: {e}")
                yield b'data: {"error": "Streaming error"}\n\n'

        return Response(
            generate(),
            content_type='text/event-stream; charset=utf-8',
            status=response.status_code)
    except Exception as e:
        return jsonify({"error": f"Internal server error: {str(e)}"}), 500


@app.route('/auth')
def auth():
    """Redirect the browser into the Discord OAuth2 authorize flow."""
    redirect_uri = f"{request.scheme}://{request.host}/auth/callback"
    discord_auth_url = (
        f"https://discord.com/api/oauth2/authorize?client_id={DISCORD_CLIENT_ID}"
        f"&redirect_uri={redirect_uri}&response_type=code&scope=identify")
    return redirect(discord_auth_url)


@app.route('/auth/callback')
def auth_callback():
    """Discord OAuth2 callback: exchange code, create session, warm the cache."""
    code = request.args.get('code')
    if not code:
        return "Error: No authorization code provided"
    redirect_uri = f"{request.scheme}://{request.host}/auth/callback"
    data = {
        'client_id': DISCORD_CLIENT_ID,
        'client_secret': DISCORD_CLIENT_SECRET,
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': redirect_uri,
    }
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    token_response = requests.post('https://discord.com/api/oauth2/token',
                                   data=data, headers=headers)
    token_json = token_response.json()
    if 'access_token' not in token_json:
        return "Error: Failed to get access token"
    access_token = token_json['access_token']
    user_response = requests.get('https://discord.com/api/users/@me',
                                 headers={'Authorization': f'Bearer {access_token}'})
    user_json = user_response.json()
    username = user_json.get('username', 'Unknown')
    session['username'] = username
    create_or_update_user(username)
    try:
        initial_credits = get_user_credits(username)
        update_credit_cache(username, initial_credits)
    except Exception:
        # Best-effort cache warm; the API path reloads it on first request.
        pass
    return redirect('/dashboard')


@app.route('/logout')
def logout():
    """Clear the session and drop the user's in-memory credit state."""
    if 'username' in session:
        username = session.get('username')
        with deduction_lock:
            credit_cache.pop(username, None)
            pending_deductions.pop(username, None)
    session.clear()
    return redirect('/')


@app.route('/api/user/credits')
def get_user_credits_api():
    """Session-authenticated JSON endpoint: current credits and plan."""
    if 'username' not in session:
        return jsonify({"error": "Unauthorized"}), 401
    username = session.get('username', 'Unknown')
    user = get_user(username)
    if not user:
        return jsonify({"error": "User not found"}), 404
    # Prefer the cached estimate; fall back to (and seed from) the DB value.
    cached_credits = get_cached_credits(username)
    if username in credit_cache:
        credits = cached_credits
    else:
        credits = user['credits']
        update_credit_cache(username, credits)
    return jsonify({"credits": credits, "plan": user['plan']})


@app.route('/dashboard')
def dashboard():
    """Logged-in dashboard showing the user's API key, plan, and model list.

    NOTE(review): the template text was reconstructed from a mangled source —
    confirm against the deployed markup.
    """
    if 'username' not in session:
        return redirect('/')
    username = session.get('username', 'Unknown')
    # The API key is deterministically derived from the username.
    key = f"sk-{hashlib.sha256(username.encode()).hexdigest()[:32]}"
    user = get_user(username)
    plan = user['plan'] if user else 'Free'
    try:
        response = app.test_client().get('/v1/models')
        models_data = json.loads(response.data.decode('utf-8'))
        models = models_data['data']
    except Exception:
        models = []
    return f''' Dashboard - GoAPI
{username}
Plan: {plan.title()}
Credits: Loading...
{key}

Available Models

{"No models found" if not models else ""}

Chat with AI

'''


@app.route('/')
def hello_world():
    """Public landing page with the model list and pricing tiers.

    NOTE(review): the template text was reconstructed from a mangled source —
    confirm against the deployed markup.
    """
    try:
        response = app.test_client().get('/v1/models')
        models_data = json.loads(response.data.decode('utf-8'))
        model_items = []
        for model in models_data['data']:
            display_name = model['id'].replace('-', ' ').title().replace('Gpt', 'GPT')
            model_items.append(f'  • {display_name}')
        models_html = '\n '.join(model_items)
    except Exception:
        models_html = '  • Error loading models'
    return f''' GoAPI

    GoAPI

    Get Started

    Model List
    {models_html}

    Free

    $0/month
    500,000 daily tokens
    Get Started

    Explorer

    $4.99/month
    5 million daily tokens

    Voyager

    $19.99/month
    50 million daily tokens
    '''


@app.route('/admin/process-pending-deductions')
def process_deductions_now():
    """Localhost-only trigger to flush pending deductions immediately."""
    if request.remote_addr != '127.0.0.1':
        return jsonify({"error": "Unauthorized"}), 403
    # Run in a background thread so the response returns immediately.
    threading.Thread(target=process_pending_deductions, daemon=True).start()
    return jsonify({"message": "Pending deductions processing started"})


@atexit.register
def cleanup_pending_deductions():
    """Best-effort flush of queued deductions on interpreter shutdown."""
    if pending_deductions:
        print("Processing remaining pending deductions on shutdown...")
        cleanup_thread = threading.Thread(target=process_pending_deductions, daemon=True)
        cleanup_thread.start()
        cleanup_thread.join(timeout=5)  # wait up to 5 seconds


if __name__ == '__main__':
    start_batch_timer()
    app.run(debug=True, port=7860, host="0.0.0.0")