Spaces:
Sleeping
Sleeping
| """ | |
| FastAPI Backend for Smart Parking System - AI & Payment Module | |
| Serves AI model predictions and handles payment integration | |
| Note: WhatsApp/External integrations handled by other teams | |
| """ | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from typing import List, Optional, Dict, Tuple | |
| import sys | |
| import os | |
| from datetime import datetime | |
| import json | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # Add parent directory to path | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from ai_model.parking_slot_classifier import ParkingSlotClassifier | |
| from backend.payment_service import PaymentService, PaymentSimulator | |
| from backend.database_api import DatabaseAPI, transform_parking_lots_for_ai | |
| app = FastAPI(title="Smart Parking API", version="1.0.0") | |
| # CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Initialize services | |
| classifier = ParkingSlotClassifier() | |
| payment_service = None | |
| payment_simulator = PaymentSimulator() | |
| db_api = DatabaseAPI() | |
| # Razorpay credentials (from environment or config) | |
| RAZORPAY_KEY_ID = os.getenv("RAZORPAY_KEY_ID") | |
| RAZORPAY_KEY_SECRET = os.getenv("RAZORPAY_KEY_SECRET") | |
| # In-memory feedback cache (feedback is stored in DB, this is just for quick access) | |
| feedback_cache = [] | |
| # Pydantic models | |
| class ParkingSlot(BaseModel): | |
| slot_id: str | |
| latitude: float | |
| longitude: float | |
| avg_feedback: float = 3.0 | |
| popularity_score: float = 0.5 | |
| is_available: bool = True | |
| price_factor: float = 0.5 | |
| price_per_hour: float = 50.0 | |
| class UserLocation(BaseModel): | |
| latitude: float | |
| longitude: float | |
| class SlotRecommendationRequest(BaseModel): | |
| user_location: UserLocation | |
| slots: List[ParkingSlot] | |
| top_k: int = 3 | |
| class FeedbackRequest(BaseModel): | |
| booking_id: str | |
| slot_id: str | |
| rating: float # 1-5 | |
| comment: Optional[str] = None | |
| user_satisfaction: bool | |
| class PaymentOrderRequest(BaseModel): | |
| booking_id: str | |
| slot_id: str | |
| amount: float | |
| duration_hours: int | |
| vehicle_number: str | |
| customer_name: str | |
| customer_contact: str | |
| customer_email: str | |
| class PaymentVerificationRequest(BaseModel): | |
| order_id: str | |
| payment_id: str | |
| signature: str | |
| # API Endpoints | |
| async def startup_event(): | |
| """Initialize services on startup""" | |
| global payment_service | |
| # Load or train the AI model | |
| try: | |
| classifier.load_model('parking_model.pkl') | |
| print("Model loaded successfully") | |
| except: | |
| print("Training new model...") | |
| from ai_model.parking_slot_classifier import generate_training_data | |
| training_data, labels = generate_training_data(500) | |
| classifier.train(training_data, labels) | |
| classifier.save_model('parking_model.pkl') | |
| # Initialize payment service | |
| try: | |
| payment_service = PaymentService(RAZORPAY_KEY_ID, RAZORPAY_KEY_SECRET) | |
| print("Payment service initialized") | |
| except Exception as e: | |
| print(f"Payment service initialization failed: {e}") | |
| async def root(): | |
| """Health check endpoint""" | |
| return { | |
| "status": "ok", | |
| "message": "Smart Parking API - AI + Payment Module", | |
| "version": "1.0.0", | |
| "database": "Connected to https://aadarshsenapati.in/api/api.php" | |
| } | |
| async def test_database(): | |
| """Test database API connection""" | |
| try: | |
| result = db_api.get_parking_lots() | |
| return { | |
| "success": result['status'] == 'success', | |
| "message": "Database connection test", | |
| "parking_lots_count": len(result.get('data', [])) if result['status'] == 'success' else 0, | |
| "sample_data": result.get('data', [])[:2] if result['status'] == 'success' else None | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": str(e) | |
| } | |
| async def recommend_slots(request: SlotRecommendationRequest): | |
| """ | |
| Get recommended parking slots based on AI model | |
| Fetches real parking data from database | |
| """ | |
| try: | |
| # Option 1: Use slots provided in request (for testing/custom data) | |
| if request.slots: | |
| slots_data = [slot.dict() for slot in request.slots] | |
| else: | |
| # Option 2: Fetch available slots from database | |
| db_result = db_api.get_all_available_slots() | |
| if db_result['status'] != 'success': | |
| raise HTTPException(status_code=500, detail="Failed to fetch parking slots from database") | |
| # Transform database format to AI model format | |
| slots_data = transform_parking_lots_for_ai(db_result.get('data', [])) | |
| if not slots_data: | |
| return { | |
| "success": False, | |
| "message": "No parking slots available", | |
| "recommendations": [] | |
| } | |
| user_coords = (request.user_location.latitude, request.user_location.longitude) | |
| # Get recommendations from AI model | |
| recommended_slots = classifier.predict_best_slots( | |
| slots_data, | |
| user_coords, | |
| top_k=request.top_k | |
| ) | |
| return { | |
| "success": True, | |
| "recommendations": recommended_slots, | |
| "count": len(recommended_slots), | |
| "total_available": len(slots_data) | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def submit_feedback(feedback: FeedbackRequest): | |
| """ | |
| Submit user feedback on slot assignment | |
| This data is used to retrain and improve the model | |
| Stores in database via API | |
| """ | |
| try: | |
| # First get the booking details to get booking ID (not UID) | |
| booking_result = db_api.get_booking_status(feedback.booking_id) | |
| if booking_result['status'] != 'success' or not booking_result.get('data'): | |
| # Cache feedback locally if booking not found | |
| feedback_entry = { | |
| "booking_uid": feedback.booking_id, | |
| "slot_id": feedback.slot_id, | |
| "rating": feedback.rating, | |
| "comment": feedback.comment, | |
| "user_satisfaction": feedback.user_satisfaction, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| feedback_cache.append(feedback_entry) | |
| return { | |
| "success": True, | |
| "message": "Feedback cached (booking not found in DB)", | |
| "cached": True | |
| } | |
| booking_data = booking_result['data'] | |
| # Add feedback to database | |
| db_result = db_api.add_feedback( | |
| user_id=booking_data.get('user_id', 1), | |
| booking_id=booking_data.get('id'), | |
| rating=feedback.rating, | |
| comments=feedback.comment or "" | |
| ) | |
| # Also cache locally for model retraining | |
| feedback_entry = { | |
| "booking_id": feedback.booking_id, | |
| "slot_id": feedback.slot_id, | |
| "rating": feedback.rating, | |
| "comment": feedback.comment, | |
| "user_satisfaction": feedback.user_satisfaction, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| feedback_cache.append(feedback_entry) | |
| return { | |
| "success": db_result['status'] == 'success', | |
| "message": db_result.get('message', 'Feedback submitted successfully'), | |
| "database": db_result['status'] == 'success' | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_feedback_stats(): | |
| """ | |
| Get feedback statistics from cache | |
| """ | |
| if not feedback_cache: | |
| return { | |
| "total_feedback": 0, | |
| "average_rating": 0, | |
| "satisfaction_rate": 0 | |
| } | |
| total = len(feedback_cache) | |
| avg_rating = sum(f['rating'] for f in feedback_cache) / total | |
| satisfaction_rate = sum(1 for f in feedback_cache if f['user_satisfaction']) / total * 100 | |
| return { | |
| "total_feedback": total, | |
| "average_rating": round(avg_rating, 2), | |
| "satisfaction_rate": round(satisfaction_rate, 2) | |
| } | |
| async def create_payment_order(request: PaymentOrderRequest): | |
| """ | |
| Create a payment order for parking booking | |
| Integrates with database API for booking creation | |
| """ | |
| try: | |
| # Calculate start and end times | |
| from datetime import timedelta | |
| start_time = datetime.now() | |
| end_time = start_time + timedelta(hours=request.duration_hours) | |
| start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S") | |
| end_time_str = end_time.strftime("%Y-%m-%d %H:%M:%S") | |
| # First, ensure user is registered (or register them) | |
| user_result = db_api.register_user( | |
| name=request.customer_name, | |
| phone=request.customer_contact, | |
| email=request.customer_email | |
| ) | |
| # Get slot ID from database (assuming slot_id in request is the slot name/identifier) | |
| # For now, we'll use a placeholder user_id=1 if registration fails | |
| user_id = 1 # This should be retrieved from user_result in production | |
| # Parse slot_id to get database ID if needed | |
| # If slot_id is like "A1", we need to find its DB ID | |
| # For now, assume slot_id is already the DB ID or can be used directly | |
| slot_db_id = request.slot_id | |
| # Try to parse if it's a number string | |
| try: | |
| slot_db_id = int(request.slot_id) | |
| except: | |
| # If slot_id is name like "A1", we'd need to fetch from DB | |
| # For now, use a default | |
| slot_db_id = 1 | |
| # Create booking in database | |
| booking_result = db_api.book_slot( | |
| user_id=user_id, | |
| slot_id=slot_db_id, | |
| start_time=start_time_str, | |
| end_time=end_time_str, | |
| total_amount=request.amount | |
| ) | |
| if booking_result['status'] != 'success': | |
| raise HTTPException(status_code=500, detail=f"Booking failed: {booking_result.get('message')}") | |
| booking_uid = booking_result.get('booking_uid') | |
| # Use simulator if payment service is not available | |
| use_simulator = payment_service is None | |
| if use_simulator: | |
| # Simulate payment | |
| order = { | |
| 'success': True, | |
| 'order_id': f'order_sim_{booking_uid}', | |
| 'amount': request.amount, | |
| 'currency': 'INR', | |
| 'status': 'created', | |
| 'mode': 'simulation' | |
| } | |
| else: | |
| # Real payment | |
| order = payment_service.create_payment_order( | |
| amount=request.amount, | |
| booking_id=booking_uid, | |
| notes={ | |
| 'booking_uid': booking_uid, | |
| 'slot_id': request.slot_id, | |
| 'duration': f'{request.duration_hours} hours', | |
| 'vehicle_number': request.vehicle_number | |
| } | |
| ) | |
| return { | |
| **order, | |
| 'booking_uid': booking_uid, | |
| 'start_time': start_time_str, | |
| 'end_time': end_time_str | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def create_payment_link(request: PaymentOrderRequest): | |
| """ | |
| Create a payment link for WhatsApp sharing | |
| """ | |
| try: | |
| if payment_service is None: | |
| # Return simulated link | |
| return { | |
| 'success': True, | |
| 'short_url': f'https://rzp.io/l/sim_{request.booking_id}', | |
| 'mode': 'simulation', | |
| 'message': 'This is a simulated payment link' | |
| } | |
| link = payment_service.create_payment_link( | |
| amount=request.amount, | |
| description=f"Parking Slot {request.slot_id} - {request.duration_hours} hours", | |
| customer_name=request.customer_name, | |
| customer_contact=request.customer_contact, | |
| customer_email=request.customer_email, | |
| booking_id=request.booking_id | |
| ) | |
| return link | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def verify_payment(request: PaymentVerificationRequest): | |
| """ | |
| Verify payment signature after payment completion | |
| Updates payment status in database | |
| """ | |
| try: | |
| if payment_service is None: | |
| # Simulate verification | |
| # Update database payment status | |
| update_result = db_api.update_payment( | |
| booking_uid=request.order_id.replace('order_sim_', ''), | |
| payment_status='Paid', | |
| transaction_id=request.payment_id, | |
| amount=0 | |
| ) | |
| return { | |
| 'success': True, | |
| 'verified': True, | |
| 'mode': 'simulation', | |
| 'database_updated': update_result['status'] == 'success' | |
| } | |
| is_valid = payment_service.verify_payment_signature( | |
| request.order_id, | |
| request.payment_id, | |
| request.signature, | |
| RAZORPAY_KEY_SECRET | |
| ) | |
| if is_valid: | |
| # Extract booking_uid from order_id if needed | |
| # Update database | |
| # Note: We'd need to store order_id -> booking_uid mapping | |
| pass | |
| return { | |
| 'success': True, | |
| 'verified': is_valid | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def simulate_payment(booking_uid: str, amount: float): | |
| """ | |
| Simulate a payment (for testing purposes) | |
| Updates database with simulated payment | |
| """ | |
| try: | |
| # Check if booking exists in database | |
| booking_result = db_api.get_booking_status(booking_uid) | |
| if booking_result['status'] != 'success': | |
| raise HTTPException(status_code=404, detail="Booking not found") | |
| # Simulate payment | |
| order_id = f"order_{booking_uid}" | |
| simulated = payment_simulator.simulate_payment(order_id, amount) | |
| # Update booking payment status in database | |
| update_result = db_api.update_payment( | |
| booking_uid=booking_uid, | |
| payment_status='Paid', | |
| transaction_id=simulated['payment_id'], | |
| amount=amount | |
| ) | |
| return { | |
| **simulated, | |
| 'database_updated': update_result['status'] == 'success' | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_booking(booking_uid: str): | |
| """ | |
| Get booking details from database | |
| """ | |
| try: | |
| result = db_api.get_booking_status(booking_uid) | |
| if result['status'] != 'success' or not result.get('data'): | |
| raise HTTPException(status_code=404, detail="Booking not found") | |
| return { | |
| "success": True, | |
| "booking": result['data'] | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def retrain_model(): | |
| """ | |
| Retrain the AI model with new feedback data from cache | |
| Should be called periodically or when sufficient new feedback is collected | |
| """ | |
| try: | |
| if len(feedback_cache) < 10: | |
| return { | |
| "success": False, | |
| "message": f"Insufficient feedback data for retraining (need 10+, have {len(feedback_cache)})" | |
| } | |
| # Convert feedback to training data | |
| # This is simplified - in production, you'd have more sophisticated feature engineering | |
| training_samples = [] | |
| labels = [] | |
| for feedback in feedback_cache: | |
| # Create a simplified training sample | |
| sample = { | |
| 'slot_id': feedback['slot_id'], | |
| 'avg_feedback': feedback['rating'], | |
| 'proximity_score': 0.5, # Would come from actual data | |
| 'popularity_score': 0.5, | |
| 'is_available': True, | |
| 'price_factor': 0.5 | |
| } | |
| training_samples.append(sample) | |
| labels.append(1 if feedback['user_satisfaction'] else 0) | |
| # Retrain model | |
| classifier.train(training_samples, labels) | |
| classifier.save_model('parking_model.pkl') | |
| return { | |
| "success": True, | |
| "message": f"Model retrained with {len(training_samples)} samples", | |
| "samples_used": len(training_samples) | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=8000) | |