# Quivara's picture
# Update alisto_project/backend/app.py
# 46519b8 verified
import os
import csv
import io
from flask import Flask, render_template, jsonify, request, Response, redirect, url_for
from flask_cors import CORS
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy import func
from models import db, DisasterPost
# 1. CONFIG: FLAT STRUCTURE (Look in current directory '.')
# Initializes the Flask application. Templates and static files are both
# looked up in '.', and static files are served from the URL root.
# NOTE(review): with static_folder='.' and static_url_path='' the files that
# sit next to this module (presumably including this source file and the
# SQLite database created below) may be downloadable over HTTP -- confirm and
# consider moving static assets into a dedicated directory.
app = Flask(__name__, static_url_path='', static_folder='.', template_folder='.')

# Secret key used for session management and security. It is read from the
# ALISTO_SECRET_KEY environment variable when that is set and non-empty; the
# literal fallback keeps existing deployments working but is visible in the
# source, so production deployments should always set the variable.
app.secret_key = os.environ.get('ALISTO_SECRET_KEY') or "alisto_secret_key_secure"

# The SQLite database file lives next to this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'alisto.db')

# Configures the application to use the SQLite database.
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
# Disables modification tracking to save resources.
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Enables Cross-Origin Resource Sharing for API requests.
CORS(app)

# Binds the shared SQLAlchemy object (imported from models) to this app.
db.init_app(app)

# 2. AUTH CONFIG
# Initializes the Flask-Login manager for user session handling.
login_manager = LoginManager()
login_manager.init_app(app)
# Unauthenticated users hitting a protected view are sent to this endpoint.
login_manager.login_view = 'login_page'
# defines the User model for database and login functionality
class User(UserMixin, db.Model):
    """Account row used by Flask-Login to authenticate dashboard users."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100), unique=True)
    # Holds the werkzeug password hash produced by generate_password_hash.
    # A pbkdf2:sha256 hash string (method, iteration count, salt and a
    # 64-character hex digest) can exceed 100 characters with current
    # werkzeug defaults, so the column is sized generously. SQLite does not
    # enforce the length, but stricter databases would reject or truncate
    # the value.
    # NOTE(review): db.create_all() presumably does not alter an existing
    # table; a migration is needed if a length-enforcing database is in use.
    password = db.Column(db.String(255))
# flask-Login callback to reload the user object from the user ID stored in the session
@login_manager.user_loader
def load_user(user_id):
    """Return the User for the session's stored ID, or None if there is none.

    Args:
        user_id: The identifier Flask-Login stored in the session (a string).

    Returns:
        The matching User, or None when the ID is not a valid integer or no
        such user exists. Flask-Login expects None rather than an exception
        for an invalid ID, so a malformed value is treated as "no user".
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
# creates the default 'admin' user if it does not already exist in the database
def create_admin():
    """Create all tables and make sure an 'admin' account exists.

    The initial password is taken from the ALISTO_ADMIN_PASSWORD environment
    variable when that is set and non-empty; otherwise the historical default
    'admin123' is used. An existing 'admin' row is left untouched.
    """
    with app.app_context():
        db.create_all()
        if User.query.filter_by(username='admin').first():
            return

        default_password = 'admin123'
        password = os.environ.get('ALISTO_ADMIN_PASSWORD') or default_password
        hashed = generate_password_hash(password, method='pbkdf2:sha256')
        db.session.add(User(username='admin', password=hashed))
        db.session.commit()

        # Only echo the password when it is the well-known default; a
        # password supplied through the environment must not end up in logs.
        if password == default_password:
            print("✅ Admin Created: admin / admin123")
        else:
            print("✅ Admin Created: admin")
# 3. ROUTES
# login page; users who are already signed in are sent to the dashboard
@app.route('/login')
def login_page():
    """Render the login form, or redirect authenticated users to 'index'."""
    if not current_user.is_authenticated:
        return render_template('login.html')
    dashboard_url = url_for('index')
    return redirect(dashboard_url)
# main dashboard, only reachable by logged-in users
@app.route('/')
@login_required
def index():
    """Render the dashboard template."""
    page = render_template('index.html')
    return page
# live map view, only reachable by logged-in users
@app.route('/map')
@login_required
def map_view():
    """Render the map template."""
    page = render_template('map.html')
    return page
# --- API ROUTES ---
# api endpoint for user authentication via POST request
@app.route('/api/login', methods=['POST'])
def login_api():
    """Authenticate a user from a JSON body and start a login session.

    Expects a JSON object with string 'username' and 'password' fields.

    Returns:
        {"message": "Success"} when the credentials match a stored user,
        otherwise {"message": "Invalid credentials"} with HTTP 401. A body
        that is not a JSON object, or that lacks string credentials, is
        answered with the same 401 instead of an unhandled exception.
    """
    data = request.get_json()
    # get_json() can yield None (e.g. a JSON null body) or a non-object
    # value such as a list; neither has .get(), so normalise to an empty dict.
    if not isinstance(data, dict):
        data = {}

    username = data.get('username')
    password = data.get('password')

    # check_password_hash needs a string; a missing or non-string password
    # can never match, so skip the lookup entirely.
    if isinstance(username, str) and isinstance(password, str):
        user = User.query.filter_by(username=username).first()
        # checks hashed password and logs the user in on success
        if user and check_password_hash(user.password, password):
            login_user(user)
            return jsonify({"message": "Success"})

    return jsonify({"message": "Invalid credentials"}), 401
# api endpoint that ends the current user's session
@app.route('/api/logout')
@login_required
def logout():
    """Log the current user out and redirect to the login page."""
    logout_user()
    login_url = url_for('login_page')
    return redirect(login_url)
# api endpoint to fetch filtered and sorted disaster posts (capped at 100)
@app.route('/api/posts')
@login_required
def get_posts():
    """Return up to 100 disaster posts matching the query-string filters.

    Query parameters:
        query:   optional text; case-insensitive substring match against
                 title, content or location.
        sort:    'oldest' for ascending timestamp; anything else (default
                 'newest') sorts descending.
        view:    'active' (default) keeps status New/Verified, 'archived'
                 keeps status Resolved, any other value applies no status
                 filter.
        urgency, type, assist: exact match on urgency_level, disaster_type
                 and assistance_type respectively; 'all' (default) disables
                 the filter.

    Returns:
        A JSON array of post.to_dict() results.
    """
    args = request.args
    search_query = args.get('query')
    sort_order = args.get('sort', 'newest')
    view_mode = args.get('view', 'active')

    posts_query = DisasterPost.query

    # Substring search across title, content and location.
    # NOTE(review): '%' and '_' typed by the user act as LIKE wildcards
    # because the pattern is not escaped -- confirm whether that is intended.
    if search_query:
        pattern = f"%{search_query}%"
        posts_query = posts_query.filter(
            DisasterPost.title.ilike(pattern)
            | DisasterPost.content.ilike(pattern)
            | DisasterPost.location.ilike(pattern)
        )

    # View mode selects which statuses are visible.
    if view_mode == 'archived':
        posts_query = posts_query.filter(DisasterPost.status == 'Resolved')
    elif view_mode == 'active':
        posts_query = posts_query.filter(DisasterPost.status.in_(['New', 'Verified']))

    # Exact-match filters; each is applied once and skipped when set to 'all'.
    exact_filters = (
        (DisasterPost.urgency_level, args.get('urgency', 'all')),
        (DisasterPost.disaster_type, args.get('type', 'all')),
        (DisasterPost.assistance_type, args.get('assist', 'all')),
    )
    for column, wanted in exact_filters:
        if wanted != 'all':
            posts_query = posts_query.filter(column == wanted)

    # Ordering goes last, after every filter has been attached.
    if sort_order == 'oldest':
        posts_query = posts_query.order_by(DisasterPost.timestamp.asc())
    else:
        posts_query = posts_query.order_by(DisasterPost.timestamp.desc())

    # returns the first 100 posts matching the filters as a JSON array
    return jsonify([p.to_dict() for p in posts_query.limit(100).all()])
# Statuses a post may be moved to; these are the values the listing and
# statistics endpoints filter on.
_ALLOWED_STATUSES = ('New', 'Verified', 'Resolved')


# api endpoint to update the status (New, Verified, Resolved) of a specific post ID
@app.route('/api/posts/<int:post_id>/status', methods=['POST'])
@login_required
def update_status(post_id):
    """Set the status of one post from a JSON body of the form {"status": ...}.

    Args:
        post_id: Primary key of the post, taken from the URL.

    Returns:
        {"success": True} after the change is committed;
        {"error": "Not found"} with HTTP 404 when the post does not exist;
        {"error": "Invalid status"} with HTTP 400 when the body is not a JSON
        object or its 'status' is not one of New, Verified, Resolved.
    """
    data = request.get_json()
    post = DisasterPost.query.get(post_id)
    if post is None:
        return jsonify({"error": "Not found"}), 404

    # Reject anything outside the known workflow states so a missing or
    # mistyped value cannot be persisted and hide the post from every view.
    new_status = data.get('status') if isinstance(data, dict) else None
    if new_status not in _ALLOWED_STATUSES:
        return jsonify({"error": "Invalid status"}), 400

    post.status = new_status
    db.session.commit()
    return jsonify({"success": True})
# api endpoint that feeds the charts: active-post counts per disaster type and per urgency level
@app.route('/api/stats')
@login_required
def get_stats():
    """Return counts of active (New or Verified) posts grouped two ways.

    Returns:
        A JSON object with "disaster_types" and "urgency_levels", each
        mapping a group value to the number of active posts in that group.
    """

    def count_active_by(column):
        # One GROUP BY query over active posts for the given column.
        grouped = (
            db.session.query(column, func.count(DisasterPost.id))
            .filter(DisasterPost.status.in_(['New', 'Verified']))
            .group_by(column)
        )
        return dict(grouped.all())

    by_type = count_active_by(DisasterPost.disaster_type)
    by_urgency = count_active_by(DisasterPost.urgency_level)
    return jsonify({"disaster_types": by_type, "urgency_levels": by_urgency})
# api endpoint that downloads every post as a CSV report
@app.route('/api/export')
@login_required
def export_csv():
    """Export all posts, newest first and regardless of status, as CSV.

    Returns:
        A text/csv Response sent as the attachment 'alisto_full_report.csv',
        with a header row followed by one row per post.
    """
    newest_first = DisasterPost.query.order_by(DisasterPost.timestamp.desc()).all()

    buffer = io.StringIO()
    sheet = csv.writer(buffer)

    # Header row with the triage columns, in the same order as the data rows.
    column_titles = [
        'ID', 'Time', 'Location', 'Contact Number',
        'Disaster Type', 'Assistance Type', 'Urgency', 'Status', 'Content',
    ]
    sheet.writerow(column_titles)

    # One data row per post.
    sheet.writerows(
        [
            post.id,
            post.timestamp,
            post.location,
            post.contact_number,
            post.disaster_type,
            post.assistance_type,
            post.urgency_level,
            post.status,
            post.content,
        ]
        for post in newest_first
    )

    # Rewind so the response body starts at the first byte written.
    buffer.seek(0)
    download_headers = {"Content-Disposition": "attachment;filename=alisto_full_report.csv"}
    return Response(buffer, mimetype="text/csv", headers=download_headers)
# api endpoint reporting whether the caller is logged in (and as whom)
@app.route('/api/user_status')
def user_status():
    """Return the caller's login state as JSON.

    Returns:
        {"is_logged_in": True, "username": ...} for an authenticated user,
        otherwise {"is_logged_in": False}.
    """
    if not current_user.is_authenticated:
        return jsonify({"is_logged_in": False})
    payload = {"is_logged_in": True, "username": current_user.username}
    return jsonify(payload)
# Create the database tables and the default admin user as soon as this
# module is loaded, so setup also happens when the app is imported by a
# server process rather than started directly (the "cloud" case).
with app.app_context():
    create_admin()

# When executed directly, start Flask's built-in server on port 5000 with
# debug mode enabled. No host is passed, so Flask's default host applies.
if __name__ == '__main__':
    app.run(port=5000, debug=True)