Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify, render_template_string | |
| import psycopg2 | |
| import os | |
# The single Flask application object for this module.
app = Flask(import_name=__name__)
# Keyword arguments handed to psycopg2.connect() by get_conn().
# Every setting can be overridden through the environment; the fallbacks are
# the values this module previously hard-coded, so behaviour is unchanged when
# the variables are unset.
DB_PARAMS = {
    'dbname': os.getenv('POSTGRES_DB', 'mydatabase'),
    'user': os.getenv('POSTGRES_USER', 'myuser'),
    'password': os.getenv('POSTGRES_PASSWORD', 'mypassword'),
    'host': os.getenv('POSTGRES_HOST', 'localhost'),
    # Environment values are strings; keep the port an int as before.
    'port': int(os.getenv('POSTGRES_PORT', '5432')),
}
def get_conn():
    """Open and return a new psycopg2 connection configured by DB_PARAMS.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = psycopg2.connect(**DB_PARAMS)
    return connection
# Registered at the site root (assumed path -- the page's own script only names
# /api/health and /api/query). Without a route this view was never reachable.
@app.route("/")
def index():
    """Serve the single-page database console.

    The page shows the result of GET /api/health on load and lets the user
    POST a SQL string to /api/query, printing the JSON response.

    Returns:
        The rendered HTML document as produced by render_template_string().
    """
    html = """
<html>
<head>
<title>Database Status</title>
<style>
body { background-color: #121212; color: #eeeeee; font-family: sans-serif; padding: 2em; }
textarea, input, button { background-color: #222; color: #eee; border: 1px solid #555; padding: 5px; }
button { cursor: pointer; }
</style>
</head>
<body>
<h1>Database Status</h1>
<div id="status">Loading...</div>
<h2>Run Query</h2>
<textarea id="query" rows="4" cols="60">SELECT * FROM pg_tables LIMIT 5;</textarea><br>
<button onclick="runQuery()">Execute</button>
<h3>Results</h3>
<pre id="results"></pre>
<script>
async function loadStatus() {
const res = await fetch('/api/health');
const data = await res.json();
document.getElementById('status').textContent =
'Health: ' + data.status + ' | ' +
'Connected: ' + data.connected;
}
async function runQuery() {
const sql = document.getElementById('query').value;
const res = await fetch('/api/query', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query: sql })
});
const data = await res.json();
document.getElementById('results').textContent =
JSON.stringify(data, null, 2);
}
loadStatus();
</script>
</body>
</html>
"""
    return render_template_string(html)
# The front-end script fetches this exact path; the view must be registered
# for that request to reach it.
@app.route("/api/health")
def health():
    """Report whether a database connection can currently be opened.

    Returns:
        JSON ``{'status': 'ok', 'connected': True}`` when get_conn() succeeds.
        Otherwise a 500 response whose JSON carries ``'status': 'error'``,
        ``'connected': False`` and the exception text under ``'detail'``.
    """
    try:
        # Opening and immediately closing a connection is the entire probe.
        get_conn().close()
    except Exception as e:
        # Boundary handler: any failure is reported to the client as JSON
        # instead of propagating as an unhandled error.
        return jsonify({'status': 'error', 'connected': False, 'detail': str(e)}), 500
    return jsonify({'status': 'ok', 'connected': True})
# The front-end script POSTs JSON to this exact path; the view must be
# registered (and accept POST) for that request to reach it.
@app.route("/api/query", methods=["POST"])
def query():
    """Execute the SQL string sent in the request body and return the outcome.

    Expects a JSON object with a ``"query"`` key holding the SQL text.

    NOTE(review): this runs arbitrary client-supplied SQL with the configured
    database credentials. That appears to be the purpose of the console, but
    the endpoint should not be exposed to untrusted clients.

    Returns:
        * A JSON list of ``{column: value}`` objects when the statement
          produces a result set.
        * JSON ``{"rows_affected": n}`` when it does not.
        * A 400 response with ``{"error": message}`` when the body is missing
          or invalid, or when connecting, executing or serialising fails.
    """
    # silent=True yields None for a missing/malformed JSON body instead of
    # aborting, so every bad request gets the same JSON error shape.
    payload = request.get_json(silent=True)
    sql = payload.get("query", "") if isinstance(payload, dict) else ""
    if not isinstance(sql, str) or not sql.strip():
        return jsonify({"error": "Request body must be JSON with a non-empty 'query' string"}), 400

    try:
        conn = get_conn()
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
                if cur.description:
                    columns = [desc[0] for desc in cur.description]
                    result = [dict(zip(columns, row)) for row in cur.fetchall()]
                else:
                    result = {"rows_affected": cur.rowcount}
            # Commit on every successful statement: a row-returning write such
            # as INSERT ... RETURNING would otherwise be rolled back on close.
            conn.commit()
        finally:
            # Always release the connection, including when execute() raises.
            conn.close()
        return jsonify(result)
    except Exception as e:
        # Boundary handler: surface the failure text to the console as JSON.
        return jsonify({"error": str(e)}), 400
if __name__ == "__main__":
    # Script entry point: start Flask's built-in server on all interfaces,
    # port 7860.
    app.run(port=7860, host="0.0.0.0")