code-strom / backend /main.py
suryateja008's picture
Upload 3 files
38a0f55 verified
"""
FastAPI Backend for Smart Parking System - AI & Payment Module
Serves AI model predictions and handles payment integration
Note: WhatsApp/External integrations handled by other teams
"""
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Dict, Tuple
import sys
import os
from datetime import datetime
import json
from dotenv import load_dotenv
load_dotenv()
# Add parent directory to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from ai_model.parking_slot_classifier import ParkingSlotClassifier
from backend.payment_service import PaymentService, PaymentSimulator
from backend.database_api import DatabaseAPI, transform_parking_lots_for_ai
app = FastAPI(title="Smart Parking API", version="1.0.0")
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize services
classifier = ParkingSlotClassifier()
payment_service = None
payment_simulator = PaymentSimulator()
db_api = DatabaseAPI()
# Razorpay credentials (from environment or config)
RAZORPAY_KEY_ID = os.getenv("RAZORPAY_KEY_ID")
RAZORPAY_KEY_SECRET = os.getenv("RAZORPAY_KEY_SECRET")
# In-memory feedback cache (feedback is stored in DB, this is just for quick access)
feedback_cache = []
# Pydantic models
class ParkingSlot(BaseModel):
slot_id: str
latitude: float
longitude: float
avg_feedback: float = 3.0
popularity_score: float = 0.5
is_available: bool = True
price_factor: float = 0.5
price_per_hour: float = 50.0
class UserLocation(BaseModel):
latitude: float
longitude: float
class SlotRecommendationRequest(BaseModel):
user_location: UserLocation
slots: List[ParkingSlot]
top_k: int = 3
class FeedbackRequest(BaseModel):
booking_id: str
slot_id: str
rating: float # 1-5
comment: Optional[str] = None
user_satisfaction: bool
class PaymentOrderRequest(BaseModel):
booking_id: str
slot_id: str
amount: float
duration_hours: int
vehicle_number: str
customer_name: str
customer_contact: str
customer_email: str
class PaymentVerificationRequest(BaseModel):
order_id: str
payment_id: str
signature: str
# API Endpoints
@app.on_event("startup")
async def startup_event():
"""Initialize services on startup"""
global payment_service
# Load or train the AI model
try:
classifier.load_model('parking_model.pkl')
print("Model loaded successfully")
except:
print("Training new model...")
from ai_model.parking_slot_classifier import generate_training_data
training_data, labels = generate_training_data(500)
classifier.train(training_data, labels)
classifier.save_model('parking_model.pkl')
# Initialize payment service
try:
payment_service = PaymentService(RAZORPAY_KEY_ID, RAZORPAY_KEY_SECRET)
print("Payment service initialized")
except Exception as e:
print(f"Payment service initialization failed: {e}")
@app.get("/")
async def root():
"""Health check endpoint"""
return {
"status": "ok",
"message": "Smart Parking API - AI + Payment Module",
"version": "1.0.0",
"database": "Connected to https://aadarshsenapati.in/api/api.php"
}
@app.get("/api/test-database")
async def test_database():
"""Test database API connection"""
try:
result = db_api.get_parking_lots()
return {
"success": result['status'] == 'success',
"message": "Database connection test",
"parking_lots_count": len(result.get('data', [])) if result['status'] == 'success' else 0,
"sample_data": result.get('data', [])[:2] if result['status'] == 'success' else None
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
@app.post("/api/recommend-slots")
async def recommend_slots(request: SlotRecommendationRequest):
"""
Get recommended parking slots based on AI model
Fetches real parking data from database
"""
try:
# Option 1: Use slots provided in request (for testing/custom data)
if request.slots:
slots_data = [slot.dict() for slot in request.slots]
else:
# Option 2: Fetch available slots from database
db_result = db_api.get_all_available_slots()
if db_result['status'] != 'success':
raise HTTPException(status_code=500, detail="Failed to fetch parking slots from database")
# Transform database format to AI model format
slots_data = transform_parking_lots_for_ai(db_result.get('data', []))
if not slots_data:
return {
"success": False,
"message": "No parking slots available",
"recommendations": []
}
user_coords = (request.user_location.latitude, request.user_location.longitude)
# Get recommendations from AI model
recommended_slots = classifier.predict_best_slots(
slots_data,
user_coords,
top_k=request.top_k
)
return {
"success": True,
"recommendations": recommended_slots,
"count": len(recommended_slots),
"total_available": len(slots_data)
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/feedback")
async def submit_feedback(feedback: FeedbackRequest):
"""
Submit user feedback on slot assignment
This data is used to retrain and improve the model
Stores in database via API
"""
try:
# First get the booking details to get booking ID (not UID)
booking_result = db_api.get_booking_status(feedback.booking_id)
if booking_result['status'] != 'success' or not booking_result.get('data'):
# Cache feedback locally if booking not found
feedback_entry = {
"booking_uid": feedback.booking_id,
"slot_id": feedback.slot_id,
"rating": feedback.rating,
"comment": feedback.comment,
"user_satisfaction": feedback.user_satisfaction,
"timestamp": datetime.now().isoformat()
}
feedback_cache.append(feedback_entry)
return {
"success": True,
"message": "Feedback cached (booking not found in DB)",
"cached": True
}
booking_data = booking_result['data']
# Add feedback to database
db_result = db_api.add_feedback(
user_id=booking_data.get('user_id', 1),
booking_id=booking_data.get('id'),
rating=feedback.rating,
comments=feedback.comment or ""
)
# Also cache locally for model retraining
feedback_entry = {
"booking_id": feedback.booking_id,
"slot_id": feedback.slot_id,
"rating": feedback.rating,
"comment": feedback.comment,
"user_satisfaction": feedback.user_satisfaction,
"timestamp": datetime.now().isoformat()
}
feedback_cache.append(feedback_entry)
return {
"success": db_result['status'] == 'success',
"message": db_result.get('message', 'Feedback submitted successfully'),
"database": db_result['status'] == 'success'
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/feedback/stats")
async def get_feedback_stats():
"""
Get feedback statistics from cache
"""
if not feedback_cache:
return {
"total_feedback": 0,
"average_rating": 0,
"satisfaction_rate": 0
}
total = len(feedback_cache)
avg_rating = sum(f['rating'] for f in feedback_cache) / total
satisfaction_rate = sum(1 for f in feedback_cache if f['user_satisfaction']) / total * 100
return {
"total_feedback": total,
"average_rating": round(avg_rating, 2),
"satisfaction_rate": round(satisfaction_rate, 2)
}
@app.post("/api/payment/create-order")
async def create_payment_order(request: PaymentOrderRequest):
"""
Create a payment order for parking booking
Integrates with database API for booking creation
"""
try:
# Calculate start and end times
from datetime import timedelta
start_time = datetime.now()
end_time = start_time + timedelta(hours=request.duration_hours)
start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_time_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
# First, ensure user is registered (or register them)
user_result = db_api.register_user(
name=request.customer_name,
phone=request.customer_contact,
email=request.customer_email
)
# Get slot ID from database (assuming slot_id in request is the slot name/identifier)
# For now, we'll use a placeholder user_id=1 if registration fails
user_id = 1 # This should be retrieved from user_result in production
# Parse slot_id to get database ID if needed
# If slot_id is like "A1", we need to find its DB ID
# For now, assume slot_id is already the DB ID or can be used directly
slot_db_id = request.slot_id
# Try to parse if it's a number string
try:
slot_db_id = int(request.slot_id)
except:
# If slot_id is name like "A1", we'd need to fetch from DB
# For now, use a default
slot_db_id = 1
# Create booking in database
booking_result = db_api.book_slot(
user_id=user_id,
slot_id=slot_db_id,
start_time=start_time_str,
end_time=end_time_str,
total_amount=request.amount
)
if booking_result['status'] != 'success':
raise HTTPException(status_code=500, detail=f"Booking failed: {booking_result.get('message')}")
booking_uid = booking_result.get('booking_uid')
# Use simulator if payment service is not available
use_simulator = payment_service is None
if use_simulator:
# Simulate payment
order = {
'success': True,
'order_id': f'order_sim_{booking_uid}',
'amount': request.amount,
'currency': 'INR',
'status': 'created',
'mode': 'simulation'
}
else:
# Real payment
order = payment_service.create_payment_order(
amount=request.amount,
booking_id=booking_uid,
notes={
'booking_uid': booking_uid,
'slot_id': request.slot_id,
'duration': f'{request.duration_hours} hours',
'vehicle_number': request.vehicle_number
}
)
return {
**order,
'booking_uid': booking_uid,
'start_time': start_time_str,
'end_time': end_time_str
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/payment/create-link")
async def create_payment_link(request: PaymentOrderRequest):
"""
Create a payment link for WhatsApp sharing
"""
try:
if payment_service is None:
# Return simulated link
return {
'success': True,
'short_url': f'https://rzp.io/l/sim_{request.booking_id}',
'mode': 'simulation',
'message': 'This is a simulated payment link'
}
link = payment_service.create_payment_link(
amount=request.amount,
description=f"Parking Slot {request.slot_id} - {request.duration_hours} hours",
customer_name=request.customer_name,
customer_contact=request.customer_contact,
customer_email=request.customer_email,
booking_id=request.booking_id
)
return link
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/payment/verify")
async def verify_payment(request: PaymentVerificationRequest):
"""
Verify payment signature after payment completion
Updates payment status in database
"""
try:
if payment_service is None:
# Simulate verification
# Update database payment status
update_result = db_api.update_payment(
booking_uid=request.order_id.replace('order_sim_', ''),
payment_status='Paid',
transaction_id=request.payment_id,
amount=0
)
return {
'success': True,
'verified': True,
'mode': 'simulation',
'database_updated': update_result['status'] == 'success'
}
is_valid = payment_service.verify_payment_signature(
request.order_id,
request.payment_id,
request.signature,
RAZORPAY_KEY_SECRET
)
if is_valid:
# Extract booking_uid from order_id if needed
# Update database
# Note: We'd need to store order_id -> booking_uid mapping
pass
return {
'success': True,
'verified': is_valid
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/payment/simulate")
async def simulate_payment(booking_uid: str, amount: float):
"""
Simulate a payment (for testing purposes)
Updates database with simulated payment
"""
try:
# Check if booking exists in database
booking_result = db_api.get_booking_status(booking_uid)
if booking_result['status'] != 'success':
raise HTTPException(status_code=404, detail="Booking not found")
# Simulate payment
order_id = f"order_{booking_uid}"
simulated = payment_simulator.simulate_payment(order_id, amount)
# Update booking payment status in database
update_result = db_api.update_payment(
booking_uid=booking_uid,
payment_status='Paid',
transaction_id=simulated['payment_id'],
amount=amount
)
return {
**simulated,
'database_updated': update_result['status'] == 'success'
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/booking/{booking_uid}")
async def get_booking(booking_uid: str):
"""
Get booking details from database
"""
try:
result = db_api.get_booking_status(booking_uid)
if result['status'] != 'success' or not result.get('data'):
raise HTTPException(status_code=404, detail="Booking not found")
return {
"success": True,
"booking": result['data']
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/retrain-model")
async def retrain_model():
"""
Retrain the AI model with new feedback data from cache
Should be called periodically or when sufficient new feedback is collected
"""
try:
if len(feedback_cache) < 10:
return {
"success": False,
"message": f"Insufficient feedback data for retraining (need 10+, have {len(feedback_cache)})"
}
# Convert feedback to training data
# This is simplified - in production, you'd have more sophisticated feature engineering
training_samples = []
labels = []
for feedback in feedback_cache:
# Create a simplified training sample
sample = {
'slot_id': feedback['slot_id'],
'avg_feedback': feedback['rating'],
'proximity_score': 0.5, # Would come from actual data
'popularity_score': 0.5,
'is_available': True,
'price_factor': 0.5
}
training_samples.append(sample)
labels.append(1 if feedback['user_satisfaction'] else 0)
# Retrain model
classifier.train(training_samples, labels)
classifier.save_model('parking_model.pkl')
return {
"success": True,
"message": f"Model retrained with {len(training_samples)} samples",
"samples_used": len(training_samples)
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)