"""Flask web application exposing a login-protected dashboard, map view and
JSON API over ``DisasterPost`` records stored in a local SQLite database."""

import csv
import io
import os

from flask import Flask, render_template, jsonify, request, Response, redirect, url_for
from flask_cors import CORS
from flask_login import (
    LoginManager,
    UserMixin,
    login_user,
    login_required,
    logout_user,
    current_user,
)
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy import func

from models import db, DisasterPost

# 1. CONFIG: FLAT STRUCTURE (Look in current directory '.')
# Templates and static files are both looked up in the application directory.
# NOTE(review): static_folder='.' combined with static_url_path='' means files
# sitting next to this module (source files, the SQLite database) may be
# reachable over HTTP as static files -- confirm this is intended.
app = Flask(__name__, static_url_path='', static_folder='.', template_folder='.')

# Secret key used to sign the session cookie. Prefer supplying it through the
# ALISTO_SECRET_KEY environment variable; the literal is only a fallback so
# existing deployments keep working.
app.secret_key = os.environ.get("ALISTO_SECRET_KEY", "alisto_secret_key_secure")

# The SQLite file lives next to this module, independent of the process's
# current working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'alisto.db')

app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
# Disable modification tracking to save resources.
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Enable Cross-Origin Resource Sharing for API requests.
CORS(app)

# Bind the shared SQLAlchemy object (declared in models) to this app.
db.init_app(app)
# 2. AUTH CONFIG
# Flask-Login manager handling user sessions for this app.
login_manager = LoginManager()
login_manager.init_app(app)
# Endpoint name that unauthenticated users are redirected to.
login_manager.login_view = 'login_page'


class User(UserMixin, db.Model):
    """Database-backed account used for Flask-Login authentication."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100), unique=True)
    # Stores the werkzeug password hash, never the plain password. A
    # 'pbkdf2:sha256' hash string is longer than 100 characters, so the
    # column is declared wide enough for databases that enforce the length.
    password = db.Column(db.String(255))


@login_manager.user_loader
def load_user(user_id):
    """Reload a user from the ID stored in the session.

    Args:
        user_id: The user ID as stored in the session (a string).

    Returns:
        The matching ``User``, or ``None`` when the ID is not a valid
        integer or no such user exists.
    """
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        # A malformed session value must be treated as "no user", not crash
        # the request.
        return None


def create_admin():
    """Create all tables and the default 'admin' user if it is missing."""
    with app.app_context():
        db.create_all()
        if not User.query.filter_by(username='admin').first():
            hashed = generate_password_hash('admin123', method='pbkdf2:sha256')
            db.session.add(User(username='admin', password=hashed))
            db.session.commit()
            print("✅ Admin Created: admin / admin123")
# 3. ROUTES

# Statuses that count as "active" in the post list and in the statistics.
ACTIVE_STATUSES = ['New', 'Verified']
# Every status a post may be set to through the API.
VALID_STATUSES = ('New', 'Verified', 'Resolved')


@app.route('/login')
def login_page():
    """Render the login page, or redirect to the dashboard if logged in."""
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    return render_template('login.html')


@app.route('/')
@login_required
def index():
    """Render the main dashboard (requires login)."""
    return render_template('index.html')


@app.route('/map')
@login_required
def map_view():
    """Render the live map view (requires login)."""
    return render_template('map.html')


# --- API ROUTES ---

@app.route('/api/login', methods=['POST'])
def login_api():
    """Authenticate a user from a JSON body with 'username' and 'password'.

    Returns:
        ``{"message": "Success"}`` on success; otherwise
        ``{"message": "Invalid credentials"}`` with HTTP 401. A missing,
        malformed or incomplete JSON body is treated as invalid credentials.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    # Only hit the database / hash check with well-formed string credentials;
    # check_password_hash cannot handle a missing password.
    if isinstance(username, str) and isinstance(password, str):
        user = User.query.filter_by(username=username).first()
        if user and check_password_hash(user.password, password):
            login_user(user)
            return jsonify({"message": "Success"})
    return jsonify({"message": "Invalid credentials"}), 401


@app.route('/api/logout')
@login_required
def logout():
    """Log out the current user and redirect to the login page."""
    logout_user()
    return redirect(url_for('login_page'))


@app.route('/api/posts')
@login_required
def get_posts():
    """Return up to 100 disaster posts as a JSON array.

    Query parameters:
        query:   substring matched (case-insensitively) against title,
                 content and location.
        sort:    'oldest' for ascending timestamp; anything else is newest
                 first (default 'newest').
        view:    'active' (status New/Verified, the default), 'archived'
                 (status Resolved); any other value applies no status filter.
        urgency, type, assist: exact-match filters on urgency_level,
                 disaster_type and assistance_type; 'all' (default) disables
                 the filter.
    """
    search_query = request.args.get('query')
    sort_order = request.args.get('sort', 'newest')
    view_mode = request.args.get('view', 'active')
    urgency_filter = request.args.get('urgency', 'all')
    type_filter = request.args.get('type', 'all')
    assist_filter = request.args.get('assist', 'all')

    posts_query = DisasterPost.query

    if search_query:
        pattern = f"%{search_query}%"
        posts_query = posts_query.filter(
            DisasterPost.title.ilike(pattern)
            | DisasterPost.content.ilike(pattern)
            | DisasterPost.location.ilike(pattern)
        )

    if view_mode == 'archived':
        posts_query = posts_query.filter(DisasterPost.status == 'Resolved')
    elif view_mode == 'active':
        posts_query = posts_query.filter(DisasterPost.status.in_(ACTIVE_STATUSES))

    if urgency_filter != 'all':
        posts_query = posts_query.filter(DisasterPost.urgency_level == urgency_filter)
    if type_filter != 'all':
        posts_query = posts_query.filter(DisasterPost.disaster_type == type_filter)
    if assist_filter != 'all':
        posts_query = posts_query.filter(DisasterPost.assistance_type == assist_filter)

    if sort_order == 'oldest':
        posts_query = posts_query.order_by(DisasterPost.timestamp.asc())
    else:
        posts_query = posts_query.order_by(DisasterPost.timestamp.desc())

    return jsonify([p.to_dict() for p in posts_query.limit(100).all()])


@app.route('/api/posts/<int:post_id>/status', methods=['POST'])
@login_required
def update_status(post_id):
    """Set the status of one post from a JSON body ``{"status": ...}``.

    Args:
        post_id: Primary key of the post, taken from the URL.

    Returns:
        ``{"success": True}`` on success, HTTP 400 when the status is missing
        or not one of New/Verified/Resolved, HTTP 404 when the post does not
        exist.
    """
    data = request.get_json(silent=True)
    new_status = data.get('status') if isinstance(data, dict) else None
    # Reject unknown values: a post with an unexpected status would match
    # neither the 'active' nor the 'archived' view.
    if new_status not in VALID_STATUSES:
        return jsonify({"error": "Invalid status"}), 400

    post = DisasterPost.query.get(post_id)
    if post is None:
        return jsonify({"error": "Not found"}), 404

    post.status = new_status
    db.session.commit()
    return jsonify({"success": True})


def _active_counts_by(column):
    """Return {column value: number of active posts} for the given column."""
    rows = (
        db.session.query(column, func.count(DisasterPost.id))
        .filter(DisasterPost.status.in_(ACTIVE_STATUSES))
        .group_by(column)
        .all()
    )
    return dict(rows)


@app.route('/api/stats')
@login_required
def get_stats():
    """Return counts of active posts by disaster type and by urgency level."""
    return jsonify({
        "disaster_types": _active_counts_by(DisasterPost.disaster_type),
        "urgency_levels": _active_counts_by(DisasterPost.urgency_level),
    })


@app.route('/api/export')
@login_required
def export_csv():
    """Export every post (any status), newest first, as a CSV download."""
    posts = DisasterPost.query.order_by(DisasterPost.timestamp.desc()).all()

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow([
        'ID', 'Time', 'Location', 'Contact Number',
        'Disaster Type', 'Assistance Type', 'Urgency', 'Status', 'Content',
    ])
    writer.writerows(
        [
            p.id, p.timestamp, p.location, p.contact_number,
            p.disaster_type, p.assistance_type, p.urgency_level,
            p.status, p.content,
        ]
        for p in posts
    )

    # Send the finished text rather than the StringIO object itself.
    return Response(
        output.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": "attachment;filename=alisto_full_report.csv"},
    )


@app.route('/api/user_status')
def user_status():
    """Report whether the caller is logged in, and their username if so."""
    if current_user.is_authenticated:
        return jsonify({"is_logged_in": True, "username": current_user.username})
    return jsonify({"is_logged_in": False})


# Create the database tables and the default admin user as soon as the module
# is loaded, so this also happens when the app is imported by a server rather
# than run directly.
with app.app_context():
    create_admin()

if __name__ == '__main__':
    # Development server on port 5000 with the debugger enabled; no host is
    # passed, so Flask's default bind address is used.
    app.run(debug=True, port=5000)