from flask import Flask, request, jsonify, send_file from flask_cors import CORS from datetime import datetime, timedelta from pymongo import MongoClient from bson import ObjectId import jwt, bcrypt, secrets, io, csv, re,os app = Flask(__name__) CORS(app) # ----------------------------- # Config # ----------------------------- app.config["SECRET_KEY"] = "supersecretjwtkey" app.config["JWT_EXP_DELTA_SECONDS"] = 3600 # ✅ MongoDB Atlas connection client = MongoClient( "mongodb+srv://drroshini16_db_user:RPmF63fvR3eiZvjD@cluster0.pdu4tsr.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0" ) db = client["student_admin"] # ----------------------------- # Helper Functions # ----------------------------- def encode_token(user_id, role): payload = { "user_id": str(user_id), "role": role, "exp": datetime.utcnow() + timedelta(seconds=app.config["JWT_EXP_DELTA_SECONDS"]) } return jwt.encode(payload, app.config["SECRET_KEY"], algorithm="HS256") def decode_token(token): try: return jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"]) except: return None def token_required(role=None): def decorator(f): def wrapper(*args, **kwargs): token = request.headers.get("Authorization", "").replace("Bearer ", "") if not token: return jsonify({"error": "Authentication required"}), 401 data = decode_token(token) if not data: return jsonify({"error": "Invalid or expired token"}), 401 if role and data.get("role") != role: return jsonify({"error": "Forbidden"}), 403 return f(*args, **kwargs, user_id=data["user_id"], role=data["role"]) wrapper.__name__ = f.__name__ return wrapper return decorator def hash_password(password): return bcrypt.hashpw(password.encode(), bcrypt.gensalt()) def verify_password(password, hashed): return bcrypt.checkpw(password.encode(), hashed) def safe_objectid(oid): try: return ObjectId(oid) except: return None def validate_password(password: str): """Simple password policy: at least 8 chars, 1 digit, 1 letter""" if len(password) < 8: return False if not re.search(r"[A-Za-z]", password): return False if not re.search(r"[0-9]", password): return False return True # ----------------------------- # AUTH ROUTES # ----------------------------- @app.route("/auth/signup", methods=["POST"]) def signup(): data = request.json email = data["email"].lower() if db.users.find_one({"email": email}): return jsonify({"error": "Email already exists"}), 400 if not validate_password(data["password"]): return jsonify({"error": "Weak password (min 8 chars, include letters & numbers)"}), 400 hashed = hash_password(data["password"]) user = { "name": data["name"], "email": email, "password": hashed, "role": "student", "phone": "", "dob": "", "address": "", "profile_picture": "", "blocked": False } res = db.users.insert_one(user) return jsonify({"id": str(res.inserted_id), "message": "User created"}), 201 @app.route("/auth/login", methods=["POST"]) def login(): data = request.json email = data["email"].lower() user = db.users.find_one({"email": email}) if not user or not verify_password(data["password"], user["password"]): return jsonify({"error": "Invalid credentials"}), 401 if user.get("blocked", False): return jsonify({"error": "User blocked"}), 403 token = encode_token(user["_id"], user["role"]) return jsonify({"token": token}) @app.route("/auth/forgot-password", methods=["POST"]) def forgot_password(): email = request.json.get("email", "").lower() user = db.users.find_one({"email": email}) if not user: return jsonify({"error": "Email not found"}), 404 reset_token = secrets.token_urlsafe(16) db.users.update_one( {"_id": user["_id"]}, {"$set": {"reset_token": reset_token, "reset_expires": datetime.utcnow() + timedelta(hours=1)}} ) # ⚠️ In production, send email with reset link instead of returning token return jsonify({"reset_token": reset_token}) @app.route("/auth/reset-password", methods=["POST"]) def reset_password(): data = request.json email = data["email"].lower() user = db.users.find_one({"email": email, "reset_token": data["token"]}) if not user or datetime.utcnow() > user.get("reset_expires", datetime.utcnow()): return jsonify({"error": "Invalid or expired token"}), 400 if not validate_password(data["new_password"]): return jsonify({"error": "Weak password (min 8 chars, include letters & numbers)"}), 400 hashed = hash_password(data["new_password"]) db.users.update_one( {"_id": user["_id"]}, {"$set": {"password": hashed}, "$unset": {"reset_token": "", "reset_expires": ""}} ) return jsonify({"message": "Password reset successful"}) # ----------------------------- # PROFILE ROUTES # ----------------------------- @app.route("/profile", methods=["GET"]) @token_required() def get_profile(user_id, role): obj_id = safe_objectid(user_id) if not obj_id: return jsonify({"error": "Invalid user ID"}), 400 user = db.users.find_one({"_id": obj_id}, {"password": 0}) if not user: return jsonify({"error": "User not found"}), 404 user["_id"] = str(user["_id"]) return jsonify(user) @app.route("/profile", methods=["PUT"]) @token_required() def update_profile(user_id, role): obj_id = safe_objectid(user_id) if not obj_id: return jsonify({"error": "Invalid user ID"}), 400 data = request.json allowed_fields = ["name", "phone", "dob", "address", "profile_picture"] update_fields = {k: v for k, v in data.items() if k in allowed_fields} db.users.update_one({"_id": obj_id}, {"$set": update_fields}) return jsonify({"message": "Profile updated"}) @app.route("/profile/change-password", methods=["POST"]) @token_required() def change_password(user_id, role): obj_id = safe_objectid(user_id) if not obj_id: return jsonify({"error": "Invalid user ID"}), 400 data = request.json user = db.users.find_one({"_id": obj_id}) if not user or not verify_password(data["current_password"], user["password"]): return jsonify({"error": "Current password incorrect"}), 400 if not validate_password(data["new_password"]): return jsonify({"error": "Weak password"}), 400 db.users.update_one({"_id": obj_id}, {"$set": {"password": hash_password(data["new_password"])}}) return jsonify({"message": "Password changed"}) # ----------------------------- # COURSES ROUTES (Admin) # ----------------------------- @app.route("/courses", methods=["POST"]) @token_required(role="admin") def add_course(user_id, role): data = request.json if db.courses.find_one({"code": data["code"]}): return jsonify({"error": "Course already exists"}), 400 course = { "code": data["code"], "title": data["title"], "description": data.get("description", ""), "created_at": datetime.utcnow() } db.courses.insert_one(course) return jsonify({"message": "Course added"}) @app.route("/courses/", methods=["DELETE"]) @token_required(role="admin") def delete_course(user_id, role, code): db.courses.delete_one({"code": code}) return jsonify({"message": "Course deleted"}) @app.route("/courses", methods=["GET"]) @token_required() def list_courses(user_id, role): courses = list(db.courses.find({}, {"_id": 0})) return jsonify(courses) # ----------------------------- # FEEDBACK ROUTES # ----------------------------- @app.route("/feedback", methods=["POST"]) @token_required(role="student") def submit_feedback(user_id, role): data = request.json if not all(k in data for k in ["course_code", "rating", "message"]): return jsonify({"error": "Missing fields"}), 400 user_doc = db.users.find_one({"_id": safe_objectid(user_id)}) course_doc = db.courses.find_one({"code": data["course_code"]}) if not course_doc: return jsonify({"error": "Course not found"}), 404 db.feedback.insert_one({ "course_code": data["course_code"], "rating": data["rating"], "message": data["message"], "student_id": user_id, "student_name": user_doc["name"], "created_at": datetime.utcnow() }) return jsonify({"message": "Feedback submitted"}), 201 @app.route("/feedback", methods=["GET"]) @token_required(role="student") def list_my_feedback(user_id, role): page = int(request.args.get("page", 1)) per_page = int(request.args.get("per_page", 5)) feedbacks = list(db.feedback.find({"student_id": user_id}).skip((page-1)*per_page).limit(per_page)) for f in feedbacks: f["_id"] = str(f["_id"]) return jsonify(feedbacks) @app.route("/feedback/", methods=["PUT"]) @token_required(role="student") def edit_feedback(user_id, role, feedback_id): fid = safe_objectid(feedback_id) if not fid: return jsonify({"error": "Invalid feedback ID"}), 400 data = request.json db.feedback.update_one({"_id": fid, "student_id": user_id}, {"$set": data}) return jsonify({"message": "Feedback updated"}) @app.route("/feedback/", methods=["DELETE"]) @token_required(role="student") def delete_feedback(user_id, role, feedback_id): fid = safe_objectid(feedback_id) if not fid: return jsonify({"error": "Invalid feedback ID"}), 400 db.feedback.delete_one({"_id": fid, "student_id": user_id}) return jsonify({"message": "Feedback deleted"}) # ----------------------------- # ADMIN FEEDBACK MANAGEMENT # ----------------------------- @app.route("/admin/stats", methods=["GET"]) @token_required(role="admin") def admin_stats(user_id, role): total_feedback = db.feedback.count_documents({}) total_students = db.users.count_documents({"role": "student"}) pipeline = [{"$group": {"_id": "$course_code", "average_rating": {"$avg": "$rating"}, "count": {"$sum": 1}}}] trends = list(db.feedback.aggregate(pipeline)) return jsonify({"total_feedback": total_feedback, "total_students": total_students, "feedback_trends": trends}) @app.route("/admin/students", methods=["GET"]) @token_required(role="admin") def list_students(user_id, role): students = list(db.users.find({"role": "student"}, {"password": 0})) for s in students: s["_id"] = str(s["_id"]) return jsonify(students) @app.route("/admin/students//block", methods=["POST"]) @token_required(role="admin") def block_unblock_student(user_id, role, student_id): obj_id = safe_objectid(student_id) if not obj_id: return jsonify({"error": "Invalid student ID"}), 400 action = request.json.get("action") if action not in ["block", "unblock"]: return jsonify({"error": "Invalid action, must be 'block' or 'unblock'"}), 400 blocked = True if action == "block" else False db.users.update_one({"_id": obj_id}, {"$set": {"blocked": blocked}}) return jsonify({"message": f"Student {'blocked' if blocked else 'unblocked'}"}) @app.route("/admin/students/", methods=["DELETE"]) @token_required(role="admin") def delete_student(user_id, role, student_id): obj_id = safe_objectid(student_id) if not obj_id: return jsonify({"error": "Invalid student ID"}), 400 db.users.delete_one({"_id": obj_id}) return jsonify({"message": "Student deleted"}) @app.route("/admin/feedback", methods=["GET"]) @token_required(role="admin") def view_all_feedback(user_id, role): query = {} course_filter = request.args.get("course") rating_filter = request.args.get("rating") student_filter = request.args.get("student") if course_filter: query["course_code"] = course_filter if rating_filter: try: query["rating"] = int(rating_filter) except: return jsonify({"error": "Invalid rating filter"}), 400 if student_filter: query["student_name"] = student_filter feedbacks = list(db.feedback.find(query)) for f in feedbacks: f["_id"] = str(f["_id"]) return jsonify(feedbacks) @app.route("/feedback/export", methods=["GET"]) @token_required(role="admin") def export_feedback(user_id, role): feedbacks = list(db.feedback.find({})) output = io.StringIO() writer = csv.writer(output) writer.writerow(["course_code","student_name","rating","message","created_at"]) for f in feedbacks: writer.writerow([f["course_code"], f["student_name"], f["rating"], f["message"], f["created_at"]]) output.seek(0) return send_file(io.BytesIO(output.getvalue().encode()), mimetype="text/csv", as_attachment=True, download_name="feedback_export.csv") # ----------------------------- # RUN APP # ----------------------------- if __name__ == "__main__": # Create default admin if not exists if not db.users.find_one({"email": "admin123@example.com"}): db.users.insert_one({ "name": "Admin User", "email": "admin123@example.com", "password": hash_password("Admin@1234"), "role": "admin" }) port = int(os.environ.get("PORT", 7860)) # use HF-provided port or default 7860 app.run(host="0.0.0.0", port=port, debug=True)