| import os |
| import psycopg2 |
| from psycopg2 import pool |
| from psycopg2.extras import DictCursor |
| from contextlib import contextmanager |
| from .config import DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME, DB_ENDPOINT_ID |
|
|
| conn_pool = None |
| try: |
| if DB_ENDPOINT_ID: |
| DATABASE_URL = ( |
| f"postgresql://{DB_USER}:{DB_PASSWORD}" |
| f"@{DB_HOST}:{DB_PORT}/{DB_NAME}" |
| f"?sslmode=require" |
| ) |
| else: |
| DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" |
|
|
| conn_pool = pool.SimpleConnectionPool( |
| minconn=1, |
| maxconn=10, |
| dsn=DATABASE_URL |
| ) |
|
|
| except (psycopg2.OperationalError, Exception) as e: |
| conn_pool = None |
|
|
| @contextmanager |
| def get_pooled_connection(): |
| if conn_pool is None: |
| raise ConnectionError("Database connection pool is not initialized.") |
|
|
| conn = None |
| try: |
| conn = conn_pool.getconn() |
| yield conn |
| conn.commit() |
| except (Exception, psycopg2.Error) as e: |
| if conn: |
| try: |
| conn.rollback() |
| except psycopg2.Error as rb_err: |
| pass |
| raise |
| finally: |
| if conn: |
| try: |
| conn_pool.putconn(conn) |
| except Exception as pc_err: |
| pass |
|
|
| def execute_query(query: str, params: tuple = None, fetch_one: bool = False): |
| if conn_pool is None: |
| return None |
|
|
| results = None |
| try: |
| with get_pooled_connection() as conn: |
| with conn.cursor(cursor_factory=DictCursor) as cur: |
| cur.execute(query, params) |
|
|
| if fetch_one: |
| result_dict = cur.fetchone() |
| results = dict(result_dict) if result_dict else None |
| else: |
| results_list = cur.fetchall() |
| results = [dict(row) for row in results_list] |
| return results |
|
|
| except ConnectionError as e: |
| return None |
| except psycopg2.Error as e: |
| return None |
| except Exception as e: |
| return None |
|
|
| def get_available_locations(): |
| query = """ |
| SELECT DISTINCT unnest(destination) AS destination |
| FROM Tour |
| WHERE availability = true |
| ORDER BY destination; |
| """ |
| results = execute_query(query) |
| if results: |
| return [row['destination'] for row in results] |
| elif results == []: |
| return [] |
| else: |
| return None |
|
|
| def get_tour_by_id(tour_id): |
| query = """ |
| SELECT |
| t.tour_id, |
| t.title, |
| t.duration, |
| t.departure_location, |
| t.destination, |
| t.region, |
| t.itinerary, |
| t.max_participants, |
| d.departure_id, |
| d.start_date, |
| d.price_adult, |
| d.price_child_120_140, |
| d.price_child_100_120, |
| p.promotion_id, |
| p.name AS promotion_name, |
| p.type AS promotion_type, |
| p.discount AS promotion_discount, |
| p.start_date AS promotion_start_date, |
| p.end_date AS promotion_end_date |
| FROM Tour t |
| LEFT JOIN Departure d ON t.tour_id = d.tour_id AND d.availability = true |
| LEFT JOIN Tour_Promotion tp ON t.tour_id = tp.tour_id |
| LEFT JOIN Promotion p ON tp.promotion_id = p.promotion_id |
| AND CURRENT_DATE BETWEEN p.start_date AND p.end_date |
| AND p.status = 'active' |
| WHERE t.tour_id = %s AND t.availability = true |
| ORDER BY d.start_date |
| LIMIT 1; |
| """ |
| result = execute_query(query, (tour_id,), fetch_one=True) |
| return result |
|
|
| def search_tours_db(entities: dict): |
| base_query = """ |
| SELECT |
| t.tour_id, |
| t.title, |
| t.duration, |
| t.departure_location, |
| t.destination, |
| t.region, |
| t.itinerary, |
| t.max_participants, |
| d.departure_id, |
| d.start_date, |
| d.price_adult, |
| d.price_child_120_140, |
| d.price_child_100_120, |
| p.promotion_id, |
| p.name AS promotion_name, |
| p.type AS promotion_type, |
| p.discount AS promotion_discount, |
| p.start_date AS promotion_start_date, |
| p.end_date AS promotion_end_date |
| FROM Departure d |
| JOIN Tour t ON d.tour_id = t.tour_id |
| LEFT JOIN Tour_Promotion tp ON t.tour_id = tp.tour_id |
| LEFT JOIN Promotion p ON tp.promotion_id = p.promotion_id |
| AND d.start_date BETWEEN p.start_date AND p.end_date |
| AND p.status = 'active' |
| WHERE t.availability = true AND d.availability = true |
| """ |
| filters = [] |
| params = [] |
|
|
| if entities.get('region'): |
| filters.append("t.region = %s") |
| params.append(entities['region']) |
|
|
| if entities.get('destination'): |
| dest_list = entities['destination'] if isinstance(entities['destination'], list) else [entities['destination']] |
| filters.append("t.destination && %s::text[]") |
| params.append(dest_list) |
|
|
| if entities.get('duration'): |
| filters.append("t.duration ILIKE %s") |
| params.append(f"%{entities['duration']}%") |
|
|
| if entities.get('time'): |
| time_filter_parts = [] |
| time_info = entities['time'] |
| if not isinstance(time_info, list): time_info = [time_info] |
| for time_obj in time_info: |
| if 'departure_date' in time_obj: |
| time_filter_parts.append("d.start_date = %s") |
| params.append(time_obj['departure_date']) |
| elif 'start_date' in time_obj and 'end_date' in time_obj: |
| time_filter_parts.append("d.start_date BETWEEN %s AND %s") |
| params.extend([time_obj['start_date'], time_obj['end_date']]) |
| if time_filter_parts: filters.append(f"({' OR '.join(time_filter_parts)})") |
|
|
| if entities.get('budget'): |
| budget = str(entities['budget']) |
| try: |
| if '-' in budget: |
| min_price, max_price = map(float, budget.split('-')) |
| filters.append("d.price_adult BETWEEN %s AND %s") |
| params.extend([min_price, max_price]) |
| else: |
| max_price = float(budget) |
| filters.append("d.price_adult <= %s") |
| params.append(max_price) |
| except ValueError: |
| pass |
|
|
| if entities.get('number_of_people'): |
| num_people = str(entities['number_of_people']) |
| min_required = 1 |
| try: |
| if num_people.startswith('>'): min_required = int(num_people[1:]) + 1 |
| elif '-' in num_people: min_req, _ = map(int, num_people.split('-')); min_required = max(min_required, min_req) |
| else: min_required = max(min_required, int(num_people)) |
| except ValueError: |
| pass |
| if min_required > 1: |
| filters.append("t.max_participants >= %s") |
| params.append(min_required) |
|
|
| if filters: |
| base_query += " AND " + " AND ".join(filters) |
|
|
| base_query += " ORDER BY d.start_date, t.title;" |
|
|
| results = execute_query(base_query, tuple(params)) |
|
|
| if results is None: |
| return [] |
| return results |