LapStore commited on
Commit
7c4b2b5
·
1 Parent(s): 06994c9

uploaded backend files

Browse files
.env ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Database configurations
2
+ TRAFFIC_DB_HOST=mysql-19285eb2-tahaelshrif1-7999.h.aivencloud.com
3
+ TRAFFIC_DB_PORT=11520
4
+ MOBILE_DB_HOST=mysql-19285eb2-tahaelshrif1-7999.h.aivencloud.com
5
+ MOBILE_DB_PORT=11520
6
+ RL_DB_HOST=mysql-19285eb2-tahaelshrif1-7999.h.aivencloud.com
7
+ RL_DB_PORT=11520
8
+
9
+ # Database credentials
10
+ DB_USER=avnadmin
11
+ DB_PASSWORD=AVNS_aT0RGFafs6_34WFegSF
Dockerfile.unknown ADDED
@@ -0,0 +1,55 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # syntax=docker/dockerfile:1
2
+ FROM python:3.10-slim AS builder
3
+
4
+ # Set environment variables
5
+ ENV PYTHONDONTWRITEBYTECODE=1 \
6
+ PYTHONUNBUFFERED=1 \
7
+ PIP_NO_CACHE_DIR=1
8
+
9
+ WORKDIR /app
10
+
11
+ # Install system dependencies
12
+ RUN apt-get update && apt-get install -y --no-install-recommends \
13
+ build-essential \
14
+ && rm -rf /var/lib/apt/lists/*
15
+
16
+ # Create and activate virtual environment
17
+ RUN python -m venv /opt/venv
18
+ ENV PATH="/opt/venv/bin:$PATH"
19
+
20
+ # Install Python dependencies
21
+ COPY requirements.txt .
22
+ RUN pip install --upgrade pip \
23
+ && pip install --no-cache-dir -r requirements.txt
24
+
25
+ # --- Runtime stage ---
26
+ FROM python:3.10-slim
27
+
28
+ # Platform-agnostic environment setup
29
+ ENV PYTHONUNBUFFERED=1 \
30
+ PATH="/opt/venv/bin:$PATH" \
31
+ PORT=8000 \
32
+ HOST=0.0.0.0
33
+
34
+ WORKDIR /app
35
+
36
+ # Copy virtual environment
37
+ COPY --from=builder /opt/venv /opt/venv
38
+
39
+ # Copy application code (with .dockerignore in place)
40
+ COPY . .
41
+
42
+ # Create non-root user and permissions
43
+ RUN useradd -m appuser \
44
+ && chown -R appuser:appuser /app \
45
+ && mkdir -p /app/tmp \
46
+ && chown appuser:appuser /app/tmp
47
+
48
+ USER appuser
49
+
50
+ # Health check (for platforms that support it)
51
+ HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
52
+ CMD curl -f http://localhost:$PORT/health || exit 1
53
+
54
+ # Universal run command
55
+ CMD ["sh", "-c", "uvicorn main:app --host $HOST --port $PORT"]
__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # This file makes the 'apis' directory a Python package.
__pycache__/database.cpython-312.pyc ADDED
Binary file (1.17 kB). View file
 
__pycache__/dependencies.cpython-312.pyc ADDED
Binary file (1.15 kB). View file
 
__pycache__/main.cpython-312.pyc ADDED
Binary file (1.17 kB). View file
 
__pycache__/models.cpython-312.pyc ADDED
Binary file (2.55 kB). View file
 
database.py ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pymysql
2
+ from pymysql.cursors import DictCursor
3
+ from functools import lru_cache
4
+ import os
5
+ from dotenv import load_dotenv
6
+
7
+ load_dotenv()
8
+
9
+ class DatabaseConfig:
10
+ TRAFFIC_MANAGER = {
11
+ "host": os.getenv("TRAFFIC_DB_HOST"),
12
+ "port": int(os.getenv("TRAFFIC_DB_PORT")),
13
+ "user": os.getenv("DB_USER"),
14
+ "password": os.getenv("DB_PASSWORD"),
15
+ "db": "trafficManagerFull",
16
+ "charset": "utf8mb4",
17
+ "cursorclass": DictCursor
18
+ }
19
+
20
+
21
+ @lru_cache()
22
+ def get_db_connection(config: dict):
23
+ return pymysql.connect(**config)
dependencies.py ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import Depends, HTTPException, status
2
+ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
3
+ import jwt
4
+ from models import User
5
+
6
+ SECRET_KEY = "your_secret_key_here"
7
+ ALGORITHM = "HS256"
8
+ http_bearer = HTTPBearer()
9
+
10
+ def verify_token(credentials: HTTPAuthorizationCredentials = Depends(http_bearer)) -> dict:
11
+ token = credentials.credentials
12
+ try:
13
+ payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
14
+ # Optionally, check for a custom claim, e.g., 'iss' or 'backend':
15
+ # if payload.get('backend') != 'my_backend':
16
+ # raise HTTPException(status_code=401, detail="Invalid token origin")
17
+ return payload
18
+ except jwt.PyJWTError:
19
+ raise HTTPException(
20
+ status_code=status.HTTP_401_UNAUTHORIZED,
21
+ detail="Could not validate credentials",
22
+ headers={"WWW-Authenticate": "Bearer"},
23
+ )
main.py ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI
2
+ from fastapi.middleware.cors import CORSMiddleware
3
+ from routers.traffic_lights import router as traffic_lights_router
4
+ from routers.mobile import router as mobile_router
5
+ from routers.rl_online import router as rl_online_router
6
+
7
+ app = FastAPI(
8
+ title="Traffic Management System API",
9
+ description="API for managing traffic lights, mobile users, and RL online data",
10
+ version="1.0.0"
11
+ )
12
+
13
+ # CORS configuration
14
+ app.add_middleware(
15
+ CORSMiddleware,
16
+ allow_origins=["*"],
17
+ allow_credentials=True,
18
+ allow_methods=["*"],
19
+ allow_headers=["*"],
20
+ )
21
+
22
+ # Include routers
23
+ app.include_router(traffic_lights_router)
24
+ app.include_router(mobile_router)
25
+ app.include_router(rl_online_router)
26
+
27
+ @app.get("/")
28
+ def read_root():
29
+ return {"message": "Welcome to Traffic Management System API"}
models.py ADDED
@@ -0,0 +1,46 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pydantic import BaseModel
2
+
3
+ # Traffic Lights Models
4
+ class TrafficSignalBase(BaseModel):
5
+ lat: float
6
+ lon: float
7
+ tl_id_sumo: str
8
+ tl_id_osm: str
9
+
10
+ class TrafficSignalCreate(TrafficSignalBase):
11
+ pass
12
+
13
+ class TrafficSignal(TrafficSignalBase):
14
+ class Config:
15
+ orm_mode = True
16
+
17
+ # Mobile (Users/Vehicles) Models
18
+ class UserBase(BaseModel):
19
+ national_id: str
20
+ name: str
21
+ phone_number: str
22
+ email: str
23
+ type: str
24
+
25
+ class UserCreate(UserBase):
26
+ password: str
27
+
28
+ class User(UserBase):
29
+ class Config:
30
+ orm_mode = True
31
+
32
+ class VehicleBase(BaseModel):
33
+ national_id: str
34
+ vehicle: str
35
+ vehicle_type: str
36
+
37
+ class VehicleCreate(VehicleBase):
38
+ password: str
39
+
40
+ class Vehicle(VehicleBase):
41
+ class Config:
42
+ orm_mode = True
43
+
44
+ class LoginRequest(BaseModel):
45
+ national_id: str
46
+ password: str
requirements.txt ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ fastapi>=0.68.0
2
+ uvicorn>=0.15.0
3
+ pymysql>=1.0.2
4
+ python-dotenv>=0.19.0
5
+ bcrypt>=3.2.0
6
+ pyjwt>=2.0.0
routers/__init__.py ADDED
File without changes
routers/__pycache__/__init__.cpython-312.pyc ADDED
Binary file (136 Bytes). View file
 
routers/__pycache__/mobile.cpython-312.pyc ADDED
Binary file (12.9 kB). View file
 
routers/__pycache__/rl_online.cpython-312.pyc ADDED
Binary file (1.46 kB). View file
 
routers/__pycache__/traffic_lights.cpython-312.pyc ADDED
Binary file (7.38 kB). View file
 
routers/mobile.py ADDED
@@ -0,0 +1,219 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, HTTPException, Depends
2
+ import pymysql
3
+ import bcrypt
4
+ from utils.database import get_db_connection, DatabaseConfig
5
+ from models import User, UserCreate, Vehicle, VehicleCreate, LoginRequest
6
+ import jwt
7
+ from datetime import datetime, timedelta
8
+ from dependencies import verify_token
9
+
10
+ router = APIRouter(
11
+ prefix="/mobile",
12
+ tags=["mobile"]
13
+ )
14
+
15
+ SECRET_KEY = "your_secret_key_here"
16
+ ALGORITHM = "HS256"
17
+ ACCESS_TOKEN_EXPIRE_MINUTES = 60
18
+
19
+ def create_access_token(data: dict, expires_delta: timedelta = None):
20
+ to_encode = data.copy()
21
+ if expires_delta:
22
+ expire = datetime.utcnow() + expires_delta
23
+ else:
24
+ expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
25
+ to_encode.update({"exp": expire})
26
+ encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
27
+ return encoded_jwt
28
+
29
+ def hash_password(password: str) -> str:
30
+ salt = bcrypt.gensalt()
31
+ hashed_password = bcrypt.hashpw(password.encode('utf-8'), salt)
32
+ return hashed_password.decode('utf-8')
33
+
34
+ @router.post("/signup/", response_model=User)
35
+ def create_user(user: UserCreate):
36
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
37
+ try:
38
+ with connection.cursor() as cursor:
39
+ cursor.execute("SELECT * FROM users WHERE national_id = %s", (user.national_id,))
40
+ if cursor.fetchone():
41
+ raise HTTPException(status_code=400, detail="User already exists")
42
+
43
+ hashed_password = hash_password(user.password)
44
+
45
+ sql = """
46
+ INSERT INTO users (national_id, name, phone_number, email, password, type)
47
+ VALUES (%s, %s, %s, %s, %s, %s)
48
+ """
49
+ cursor.execute(sql, (
50
+ user.national_id,
51
+ user.name,
52
+ user.phone_number,
53
+ user.email,
54
+ hashed_password,
55
+ user.type
56
+ ))
57
+ connection.commit()
58
+
59
+ return {
60
+ "national_id": user.national_id,
61
+ "name": user.name,
62
+ "phone_number": user.phone_number,
63
+ "email": user.email,
64
+ "type": user.type
65
+ }
66
+ except pymysql.Error as e:
67
+ raise HTTPException(status_code=400, detail=str(e))
68
+ finally:
69
+ connection.close()
70
+
71
+ @router.post("/login/")
72
+ def login(login_request: LoginRequest):
73
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
74
+ try:
75
+ with connection.cursor(pymysql.cursors.DictCursor) as cursor:
76
+ cursor.execute("""
77
+ SELECT national_id, name, password, type
78
+ FROM users
79
+ WHERE national_id = %s
80
+ """, (login_request.national_id,))
81
+ user = cursor.fetchone()
82
+
83
+ if not user:
84
+ raise HTTPException(status_code=401, detail="Invalid credentials")
85
+
86
+ if not bcrypt.checkpw(login_request.password.encode('utf-8'), user['password'].encode('utf-8')):
87
+ raise HTTPException(status_code=401, detail="Invalid credentials")
88
+
89
+ access_token = create_access_token(
90
+ data={"sub": user['national_id'], "type": user['type']}
91
+ )
92
+
93
+ return {
94
+ "access_token": access_token,
95
+ "token_type": "bearer",
96
+ "national_id": user['national_id'],
97
+ "name": user['name'],
98
+ "type": user['type'],
99
+ "message": "Login successful"
100
+ }
101
+ except pymysql.Error as e:
102
+ raise HTTPException(status_code=400, detail=str(e))
103
+ finally:
104
+ connection.close()
105
+
106
+ @router.get("/users/", response_model=list[User], dependencies=[Depends(verify_token)])
107
+ def read_users():
108
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
109
+ try:
110
+ with connection.cursor(pymysql.cursors.DictCursor) as cursor:
111
+ cursor.execute("SELECT national_id, name, phone_number, email, type FROM users")
112
+ users = cursor.fetchall()
113
+ return list(users)
114
+ except pymysql.Error as e:
115
+ raise HTTPException(status_code=400, detail=str(e))
116
+ finally:
117
+ connection.close()
118
+
119
+ @router.get("/users/{national_id}", response_model=User, dependencies=[Depends(verify_token)])
120
+ def read_user(national_id: str):
121
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
122
+ try:
123
+ with connection.cursor(pymysql.cursors.DictCursor) as cursor:
124
+ cursor.execute("""
125
+ SELECT national_id, name, phone_number, email, type
126
+ FROM users
127
+ WHERE national_id = %s
128
+ """, (national_id,))
129
+ user = cursor.fetchone()
130
+ if not user:
131
+ raise HTTPException(status_code=404, detail="User not found")
132
+ return user
133
+ except pymysql.Error as e:
134
+ raise HTTPException(status_code=400, detail=str(e))
135
+ finally:
136
+ connection.close()
137
+
138
+ @router.post("/vehicles/", response_model=Vehicle, dependencies=[Depends(verify_token)])
139
+ def create_vehicle(vehicle: VehicleCreate):
140
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
141
+ try:
142
+ with connection.cursor(pymysql.cursors.DictCursor) as cursor:
143
+ cursor.execute("""
144
+ SELECT password FROM users
145
+ WHERE national_id = %s
146
+ """, (vehicle.national_id,))
147
+ user = cursor.fetchone()
148
+ if not user or not bcrypt.checkpw(vehicle.password.encode('utf-8'), user['password'].encode('utf-8')):
149
+ raise HTTPException(status_code=401, detail="Invalid credentials")
150
+ sql = """
151
+ INSERT INTO vehicles (national_id, vehicle, password, vehicle_type)
152
+ VALUES (%s, %s, %s, %s)
153
+ """
154
+ cursor.execute(sql, (
155
+ vehicle.national_id,
156
+ vehicle.vehicle,
157
+ user['password'], # Store the hashed password
158
+ vehicle.vehicle_type
159
+ ))
160
+ connection.commit()
161
+ return {
162
+ "national_id": vehicle.national_id,
163
+ "vehicle": vehicle.vehicle,
164
+ "vehicle_type": vehicle.vehicle_type
165
+ }
166
+ except pymysql.Error as e:
167
+ raise HTTPException(status_code=400, detail=str(e))
168
+ finally:
169
+ connection.close()
170
+
171
+ @router.get("/vehicles/", response_model=list[Vehicle], dependencies=[Depends(verify_token)])
172
+ def read_vehicles():
173
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
174
+ try:
175
+ with connection.cursor(pymysql.cursors.DictCursor) as cursor:
176
+ cursor.execute("SELECT national_id, vehicle, vehicle_type FROM vehicles")
177
+ vehicles = cursor.fetchall()
178
+ return [
179
+ {
180
+ "national_id": vehicle["national_id"],
181
+ "vehicle": vehicle["vehicle"],
182
+ "vehicle_type": vehicle["vehicle_type"]
183
+ }
184
+ for vehicle in vehicles
185
+ ]
186
+ except pymysql.Error as e:
187
+ raise HTTPException(status_code=400, detail=str(e))
188
+ finally:
189
+ connection.close()
190
+
191
+ @router.get("/vehicles/{national_id}", response_model=list[Vehicle], dependencies=[Depends(verify_token)])
192
+ def read_user_vehicles(national_id: str):
193
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
194
+ try:
195
+ with connection.cursor(pymysql.cursors.DictCursor) as cursor:
196
+ cursor.execute("""
197
+ SELECT national_id, vehicle, vehicle_type
198
+ FROM vehicles
199
+ WHERE national_id = %s
200
+ """, (national_id,))
201
+ vehicles = cursor.fetchall()
202
+ if not vehicles:
203
+ raise HTTPException(status_code=404, detail="No vehicles found for this user")
204
+ return [
205
+ {
206
+ "national_id": vehicle["national_id"],
207
+ "vehicle": vehicle["vehicle"],
208
+ "vehicle_type": vehicle["vehicle_type"]
209
+ }
210
+ for vehicle in vehicles
211
+ ]
212
+ except pymysql.Error as e:
213
+ raise HTTPException(status_code=400, detail=str(e))
214
+ finally:
215
+ connection.close()
216
+
217
+ @router.get("/users/me-protected", dependencies=[Depends(verify_token)])
218
+ def read_current_user_protected():
219
+ return {"message": "You are authenticated with a valid backend token."}
routers/rl_online.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, HTTPException
2
+ import pymysql
3
+ from utils.database import get_db_connection, DatabaseConfig
4
+
5
+ router = APIRouter(
6
+ prefix="/rl-online",
7
+ tags=["rl_online"]
8
+ )
9
+
10
+ @router.get("/")
11
+ def read_rl_data():
12
+ # Replace 'RL_ONLINE' with the correct attribute from DatabaseConfig if needed
13
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER) # <-- Change to your RL DB config
14
+ try:
15
+ with connection.cursor(pymysql.cursors.DictCursor) as cursor:
16
+ cursor.execute("SHOW TABLES")
17
+ tables = cursor.fetchall()
18
+ return {"tables": tables}
19
+ except pymysql.Error as e:
20
+ raise HTTPException(status_code=400, detail=str(e))
21
+ finally:
22
+ connection.close()
routers/traffic_lights.py ADDED
@@ -0,0 +1,104 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, HTTPException, Depends
2
+ import pymysql
3
+ from utils.database import get_db_connection, DatabaseConfig
4
+ from models import TrafficSignal, TrafficSignalCreate
5
+ from dependencies import verify_token
6
+
7
+ router = APIRouter(
8
+ prefix="/traffic-lights",
9
+ tags=["traffic_lights"]
10
+ )
11
+
12
+ @router.post("/", response_model=TrafficSignal, dependencies=[Depends(verify_token)])
13
+ def create_traffic_signal(signal: TrafficSignalCreate):
14
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
15
+ try:
16
+ with connection.cursor() as cursor:
17
+ sql = """
18
+ INSERT INTO traffic_signals (lat, lon, tl_id_sumo, tl_id_osm)
19
+ VALUES (%s, %s, %s, %s)
20
+ """
21
+ cursor.execute(sql, (signal.lat, signal.lon, signal.tl_id_sumo, signal.tl_id_osm))
22
+ connection.commit()
23
+ return signal
24
+ except pymysql.Error as e:
25
+ raise HTTPException(status_code=400, detail=str(e))
26
+ finally:
27
+ connection.close()
28
+
29
+
30
+ @router.get("/", response_model=list[TrafficSignal], dependencies=[Depends(verify_token)])
31
+ def read_traffic_signals():
32
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
33
+ try:
34
+ with connection.cursor(pymysql.cursors.DictCursor) as cursor:
35
+ cursor.execute("SELECT lat, lon, tl_id_sumo, tl_id_osm FROM traffic_signals")
36
+ results = cursor.fetchall()
37
+ return [
38
+ {"lat": row["lat"], "lon": row["lon"],
39
+ "tl_id_sumo": row["tl_id_sumo"], "tl_id_osm": row["tl_id_osm"]}
40
+ for row in results
41
+ ]
42
+ except pymysql.Error as e:
43
+ raise HTTPException(status_code=400, detail=str(e))
44
+ finally:
45
+ connection.close()
46
+
47
+ @router.get("/{lat}/{lon}", response_model=TrafficSignal, dependencies=[Depends(verify_token)])
48
+ def read_traffic_signal(lat: float, lon: float):
49
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
50
+ try:
51
+ with connection.cursor(pymysql.cursors.DictCursor) as cursor:
52
+ cursor.execute("SELECT lat, lon, tl_id_sumo, tl_id_osm FROM traffic_signals WHERE lat = %s AND lon = %s", (lat, lon))
53
+ result = cursor.fetchone()
54
+ if not result:
55
+ raise HTTPException(status_code=404, detail="Traffic signal not found")
56
+ return {
57
+ "lat": result["lat"],
58
+ "lon": result["lon"],
59
+ "tl_id_sumo": result["tl_id_sumo"],
60
+ "tl_id_osm": result["tl_id_osm"]
61
+ }
62
+ except pymysql.Error as e:
63
+ raise HTTPException(status_code=400, detail=str(e))
64
+ finally:
65
+ connection.close()
66
+
67
+ @router.put("/{lat}/{lon}", response_model=TrafficSignal, dependencies=[Depends(verify_token)])
68
+ def update_traffic_signal(lat: float, lon: float, signal: TrafficSignalCreate):
69
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
70
+ try:
71
+ with connection.cursor() as cursor:
72
+ # First check if the record exists
73
+ cursor.execute("SELECT * FROM traffic_signals WHERE lat = %s AND lon = %s", (lat, lon))
74
+ if cursor.rowcount == 0:
75
+ raise HTTPException(status_code=404, detail="Traffic signal not found")
76
+
77
+ # Update all fields
78
+ sql = """
79
+ UPDATE traffic_signals
80
+ SET lat = %s, lon = %s, tl_id_sumo = %s, tl_id_osm = %s
81
+ WHERE lat = %s AND lon = %s
82
+ """
83
+ cursor.execute(sql, (signal.lat, signal.lon, signal.tl_id_sumo, signal.tl_id_osm, lat, lon))
84
+ connection.commit()
85
+ return signal.dict()
86
+ except pymysql.Error as e:
87
+ raise HTTPException(status_code=400, detail=str(e))
88
+ finally:
89
+ connection.close()
90
+
91
+ @router.delete("/{lat}/{lon}", dependencies=[Depends(verify_token)])
92
+ def delete_traffic_signal(lat: float, lon: float):
93
+ connection = get_db_connection(DatabaseConfig.TRAFFIC_MANAGER)
94
+ try:
95
+ with connection.cursor() as cursor:
96
+ cursor.execute("DELETE FROM traffic_signals WHERE lat = %s AND lon = %s", (lat, lon))
97
+ if cursor.rowcount == 0:
98
+ raise HTTPException(status_code=404, detail="Traffic signal not found")
99
+ connection.commit()
100
+ return {"message": "Traffic signal deleted successfully"}
101
+ except pymysql.Error as e:
102
+ raise HTTPException(status_code=400, detail=str(e))
103
+ finally:
104
+ connection.close()
utils/__pycache__/database.cpython-312.pyc ADDED
Binary file (1.73 kB). View file
 
utils/database.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import pymysql
3
+ import socket
4
+ from dotenv import load_dotenv
5
+
6
+ # Load environment variables
7
+ load_dotenv()
8
+
9
+ class DatabaseConfig:
10
+ TRAFFIC_MANAGER = {
11
+ "host": os.getenv("TRAFFIC_DB_HOST"),
12
+ "port": int(os.getenv("TRAFFIC_DB_PORT")),
13
+ "user": os.getenv("DB_USER"),
14
+ "password": os.getenv("DB_PASSWORD"),
15
+ "database": "trafficManagerFull"
16
+ }
17
+
18
+ def get_db_connection(config):
19
+ if not isinstance(config, dict):
20
+ raise TypeError("Expected a dictionary for database configuration")
21
+ try:
22
+ return pymysql.connect(
23
+ host=config["host"],
24
+ port=config["port"],
25
+ user=config["user"],
26
+ password=config["password"],
27
+ database=config["database"]
28
+ )
29
+ except socket.gaierror as e:
30
+ raise ConnectionError(f"Hostname resolution failed for {config['host']}: {e}")
31
+ except pymysql.err.OperationalError as e:
32
+ raise ConnectionError(f"Operational error while connecting to the database: {e}")