# NOTE: residue from the hosting site's file viewer, not part of the program:
#   Spaces: Running | File size: 8,705 Bytes | revisions bdb271a, 46519b8
#   (followed by the viewer's line-number gutter, 1-214)
import os
import csv
import io
from flask import Flask, render_template, jsonify, request, Response, redirect, url_for
from flask_cors import CORS
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy import func
from models import db, DisasterPost
# 1. CONFIG: FLAT STRUCTURE (Look in current directory '.')
# Initializes the Flask application; templates and static files are both
# looked up in the directory that contains this file.
# NOTE(review): with static_folder='.' and static_url_path='' every file in
# this directory (this source file, the SQLite database) looks reachable over
# HTTP -- confirm, and move public assets into a dedicated folder if so.
app = Flask(__name__, static_url_path='', static_folder='.', template_folder='.')

# Key used to sign the session cookie. Prefer a value injected through the
# SECRET_KEY environment variable so the real key is not committed with the
# code; the literal fallback keeps existing deployments working unchanged.
app.secret_key = os.environ.get('SECRET_KEY', 'alisto_secret_key_secure')

# The SQLite file lives next to this module, independent of the working dir.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'alisto.db')
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
# Disables modification tracking to save resources.
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Enables Cross-Origin Resource Sharing for API requests.
CORS(app)
# Binds the shared SQLAlchemy object (imported from models) to this app.
db.init_app(app)

# 2. AUTH CONFIG
# Flask-Login manager handling user sessions.
login_manager = LoginManager()
login_manager.init_app(app)
# Endpoint that unauthenticated users are redirected to.
login_manager.login_view = 'login_page'
# defines the User model for database and login functionality
class User(UserMixin, db.Model):
    """Account record used by Flask-Login to authenticate dashboard users."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100), unique=True)
    # Holds the Werkzeug hash string, never the plain-text password.
    # A "pbkdf2:sha256:<iterations>$<salt>$<digest>" string can exceed 100
    # characters, so the column is sized with headroom. SQLite does not
    # enforce the length, but stricter backends would reject or truncate it.
    password = db.Column(db.String(255))
# Flask-Login callback to reload the user object from the user ID stored in the session
@login_manager.user_loader
def load_user(user_id):
    """Return the User for the session's stored id, or None if it is invalid.

    Args:
        user_id: The identifier Flask-Login stored in the session (a string).

    Returns:
        The matching User row, or None when the id is not an integer or no
        such row exists.
    """
    # A tampered or stale session may carry a non-numeric id. The loader must
    # answer None in that case rather than raise and turn the request into a 500.
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(numeric_id)
# creates the default 'admin' user if it does not already exist in the database
def create_admin(username='admin', password=None):
    """Create all tables and seed an administrator account if it is missing.

    Args:
        username: Login name of the account to seed. Defaults to 'admin'.
        password: Plain-text password for the new account. When omitted, the
            ADMIN_PASSWORD environment variable is used, falling back to the
            historical default 'admin123'.

    Returns:
        None. Nothing is inserted when the account already exists.
    """
    using_builtin_default = False
    if password is None:
        password = os.environ.get('ADMIN_PASSWORD')
        if password is None:
            password = 'admin123'
            using_builtin_default = True

    with app.app_context():
        db.create_all()
        if User.query.filter_by(username=username).first():
            return
        hashed = generate_password_hash(password, method='pbkdf2:sha256')
        db.session.add(User(username=username, password=hashed))
        db.session.commit()
        if using_builtin_default:
            # Only the well-known default is echoed; it should be changed.
            print(f"✅ Admin Created: {username} / admin123")
        else:
            # Never write a deployment-specific password to the logs.
            print(f"✅ Admin Created: {username}")
# 3. ROUTES
# route to display the login page template
@app.route('/login')
def login_page():
    """Render the login form, or send a signed-in user to the dashboard."""
    if not current_user.is_authenticated:
        return render_template('login.html')
    # Already logged in: there is nothing to do on the login page.
    dashboard_url = url_for('index')
    return redirect(dashboard_url)
# route to display the main dashboard (requires login)
@app.route('/')
@login_required
def index():
    """Render the main dashboard template for an authenticated user."""
    dashboard_html = render_template('index.html')
    return dashboard_html
# route to display the live map view (requires login)
@app.route('/map')
@login_required
def map_view():
    """Render the map template for an authenticated user."""
    map_html = render_template('map.html')
    return map_html
# --- API ROUTES ---
# api endpoint for user authentication via POST request
@app.route('/api/login', methods=['POST'])
def login_api():
    """Authenticate a user from a JSON body and start a login session.

    Expects a JSON object with string fields 'username' and 'password'.

    Returns:
        {"message": "Success"} with status 200 when the credentials match;
        {"message": "Invalid credentials"} with status 401 otherwise. This
        includes a missing, malformed, or non-object body and missing or
        non-string fields.
    """
    # silent=True yields None instead of raising on an absent or invalid
    # JSON body, so bad input is reported as a plain authentication failure.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    username = payload.get('username')
    password = payload.get('password')

    # check_password_hash needs a string; guard against a missing password
    # or a non-string JSON value before calling it.
    if isinstance(username, str) and isinstance(password, str):
        user = User.query.filter_by(username=username).first()
        if user and check_password_hash(user.password, password):
            login_user(user)
            return jsonify({"message": "Success"})
    return jsonify({"message": "Invalid credentials"}), 401
# api endpoint for logging out the current user
@app.route('/api/logout')
@login_required
def logout():
    """End the current login session and redirect to the login page."""
    logout_user()
    login_url = url_for('login_page')
    return redirect(login_url)
# api endpoint to fetch filtered and sorted disaster posts (capped at 100)
@app.route('/api/posts')
@login_required
def get_posts():
    """Return up to 100 disaster posts matching the query-string filters.

    Query parameters:
        query:   optional substring matched case-insensitively against the
                 title, content, and location.
        sort:    'oldest' for ascending timestamp; anything else (default
                 'newest') sorts descending.
        view:    'active' (default) keeps status New/Verified, 'archived'
                 keeps status Resolved, any other value applies no status
                 filter.
        urgency, type, assist: exact-match filters on urgency_level,
                 disaster_type, and assistance_type; 'all' (default) disables
                 the filter.

    Returns:
        A JSON array of the posts' to_dict() representations.
    """
    search_query = request.args.get('query')
    sort_order = request.args.get('sort', 'newest')
    view_mode = request.args.get('view', 'active')
    urgency_filter = request.args.get('urgency', 'all')
    type_filter = request.args.get('type', 'all')
    assist_filter = request.args.get('assist', 'all')

    posts_query = DisasterPost.query

    # Substring search across title, content, and location.
    if search_query:
        pattern = f"%{search_query}%"
        posts_query = posts_query.filter(
            DisasterPost.title.ilike(pattern)
            | DisasterPost.content.ilike(pattern)
            | DisasterPost.location.ilike(pattern)
        )

    # View mode: resolved posts are the archive, New/Verified are active.
    if view_mode == 'archived':
        posts_query = posts_query.filter(DisasterPost.status == 'Resolved')
    elif view_mode == 'active':
        posts_query = posts_query.filter(DisasterPost.status.in_(['New', 'Verified']))

    # Exact-match filters, each applied once and skipped when set to 'all'.
    equality_filters = (
        (DisasterPost.urgency_level, urgency_filter),
        (DisasterPost.disaster_type, type_filter),
        (DisasterPost.assistance_type, assist_filter),
    )
    for column, wanted in equality_filters:
        if wanted != 'all':
            posts_query = posts_query.filter(column == wanted)

    # Ordering goes last, after every filter has been attached.
    if sort_order == 'oldest':
        posts_query = posts_query.order_by(DisasterPost.timestamp.asc())
    else:
        posts_query = posts_query.order_by(DisasterPost.timestamp.desc())

    return jsonify([p.to_dict() for p in posts_query.limit(100).all()])
# api endpoint to update the status (New, Verified, Resolved) of a specific post ID
@app.route('/api/posts/<int:post_id>/status', methods=['POST'])
@login_required
def update_status(post_id):
    """Set the status of one disaster post from a JSON body.

    Args:
        post_id: Primary key of the post, taken from the URL.

    Expects a JSON object of the form {"status": <value>}, where the value is
    one of 'New', 'Verified', or 'Resolved'.

    Returns:
        {"success": True} on update; {"error": "Not found"} with 404 when the
        post does not exist; {"error": "Invalid status"} with 400 when the
        body is missing, not an object, or carries an unrecognised status.
    """
    post = DisasterPost.query.get(post_id)
    if post is None:
        return jsonify({"error": "Not found"}), 404

    # silent=True avoids an exception on a missing or malformed body.
    payload = request.get_json(silent=True)
    new_status = payload.get('status') if isinstance(payload, dict) else None

    # The listing and stats endpoints only recognise these three values; any
    # other status would make the post disappear from both views.
    if new_status not in ('New', 'Verified', 'Resolved'):
        return jsonify({"error": "Invalid status"}), 400

    post.status = new_status
    db.session.commit()
    return jsonify({"success": True})
# api endpoint to fetch statistics (counts by disaster type and urgency level) for charts
@app.route('/api/stats')
@login_required
def get_stats():
    """Return counts of active (New/Verified) posts for the dashboard charts.

    Returns:
        A JSON object with "disaster_types" and "urgency_levels", each
        mapping a column value to the number of active posts having it.
    """

    def active_counts_by(column):
        # Count active posts grouped by the given column.
        grouped = db.session.query(column, func.count(DisasterPost.id))
        grouped = grouped.filter(DisasterPost.status.in_(['New', 'Verified']))
        return dict(grouped.group_by(column).all())

    by_disaster_type = active_counts_by(DisasterPost.disaster_type)
    by_urgency_level = active_counts_by(DisasterPost.urgency_level)
    return jsonify({
        "disaster_types": by_disaster_type,
        "urgency_levels": by_urgency_level,
    })
# api endpoint to export the full post database as a CSV file
@app.route('/api/export')
@login_required
def export_csv():
    """Return every post, newest first, as a downloadable CSV attachment.

    Posts of all statuses are included. The response is served as
    alisto_full_report.csv with mimetype text/csv.
    """
    column_titles = [
        'ID', 'Time', 'Location', 'Contact Number',
        'Disaster Type', 'Assistance Type', 'Urgency', 'Status', 'Content'
    ]
    all_posts = DisasterPost.query.order_by(DisasterPost.timestamp.desc()).all()

    buffer = io.StringIO()
    sheet = csv.writer(buffer)
    sheet.writerow(column_titles)
    # One row per post, in the same column order as the header above.
    sheet.writerows(
        [
            post.id,
            post.timestamp,
            post.location,
            post.contact_number,
            post.disaster_type,
            post.assistance_type,
            post.urgency_level,
            post.status,
            post.content,
        ]
        for post in all_posts
    )

    # Rewind so the response reads the buffer from the beginning.
    buffer.seek(0)
    download_headers = {"Content-Disposition": "attachment;filename=alisto_full_report.csv"}
    return Response(buffer, mimetype="text/csv", headers=download_headers)
# api endpoint to check the current user's authentication status and return their username
@app.route('/api/user_status')
def user_status():
    """Report whether the requester is logged in and, if so, their username."""
    if not current_user.is_authenticated:
        return jsonify({"is_logged_in": False})
    signed_in_name = current_user.username
    return jsonify({"is_logged_in": True, "username": signed_in_name})
# Create the database tables and the default admin user as soon as the module
# is loaded, so this also happens when the app is imported by a server process
# rather than executed directly.
with app.app_context():
    create_admin()

if __name__ == '__main__':
    # Direct execution starts the built-in development server on port 5000.
    # Debug mode turns on the interactive debugger, which can execute
    # arbitrary code, so it is opt-in through FLASK_DEBUG, not always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, port=5000)