import json
import logging
import math
import re
from typing import Any, Dict, List, Optional, Union

import numpy as np
import psycopg2
from bs4 import BeautifulSoup
from psycopg2.extras import RealDictCursor
from pydantic import BaseModel, Field
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


class TourRecommendationRequest(BaseModel):
    # Request payload for the recommendation entry point.
    # Both identifiers are optional and default to None.
    user_id: Optional[int] = Field(default=None)
    tour_id: Optional[int] = Field(default=None)
    # Number of recommendations to return; validated to the range 1..10.
    limit: int = Field(default=3, ge=1, le=10)


class TourSummary(BaseModel):
    # Compact view of a tour as returned to API clients.
    # Only `tour_id` and `title` are required; every other field defaults
    # to None.
    tour_id: int
    title: str
    duration: Optional[str] = Field(default=None)
    departure_location: Optional[str] = Field(default=None)
    destination: Optional[List[str]] = Field(default=None)
    region: Optional[int] = Field(default=None)
    description: Optional[str] = Field(default=None)
    # None when the tour was not ranked by similarity (see
    # ContentBasedRecommender.recommend_popular_tours).
    similarity_score: Optional[float] = Field(default=None)


class TourRecommendationResponse(BaseModel):
    # Response envelope for a recommendation request.
    recommendations: List[TourSummary]
    # Label for the strategy that produced the list. Optional with a None
    # default so existing constructors that omit it keep working; declaring
    # it here keeps a `recommendation_type=...` keyword from being discarded
    # as an undeclared extra field.
    recommendation_type: Optional[str] = None


class ContentBasedRecommender:
    """Content-based tour recommender.

    Tours are compared with a blend of TF-IDF text similarity (over a
    weighted concatenation of their textual fields) and three structured
    signals: region proximity, duration and average adult price.

    All data is read through the connection passed to the constructor; this
    class does not commit on or close that connection.
    """

    _log = logging.getLogger(__name__)

    # A field weight of 1.0 corresponds to this many repetitions of the
    # field's text in the combined TF-IDF document.
    _WEIGHT_SCALE = 20

    # JSON keys whose values are taken verbatim as text by preprocess_json.
    _TEXT_KEYS = frozenset(('title', 'description', 'name', 'location'))

    # Shared SELECT for tour rows plus aggregated prices of available
    # departures. `{condition}` is only ever filled with a fixed SQL fragment
    # defined in this class; user-supplied values go through `%s` parameters.
    _TOUR_QUERY = """
        SELECT
            t.tour_id,
            t.title,
            t.duration,
            t.departure_location,
            t.description,
            t.destination,
            t.region,
            t.itinerary,
            t.max_participants,
            MIN(d.price_adult) as min_price,
            MAX(d.price_adult) as max_price,
            AVG(d.price_adult) as avg_price
        FROM
            Tour t
        LEFT JOIN
            Departure d ON t.tour_id = d.tour_id AND d.availability = true
        WHERE
            {condition}
        GROUP BY
            t.tour_id, t.title, t.duration, t.departure_location,
            t.description, t.destination, t.region, t.itinerary, t.max_participants
    """

    def __init__(self, conn):
        """Store the connection and configure the vectorizer and weights.

        Args:
            conn: Open database connection; it must accept
                ``cursor(cursor_factory=RealDictCursor)``.
        """
        self.conn = conn
        vietnamese_stop_words = [
            "và", "là", "của", "trong", "được", "có", "không", "cho", "với",
            "tại", "bằng", "để", "này", "khi", "một", "những", "các", "đã",
            "rồi", "lại", "nếu", "vì", "thì", "từ", "ra", "đến", "trên", "dưới",
            "quý", "khách", "tham", "quan", "du", "lịch", "tour", "ngày", "đêm",
            "ăn", "sáng", "trưa", "tối", "nghỉ", "khách", "sạn", "tự", "túc",
        ]

        # The token pattern only matches runs of letters, so digits never
        # become TF-IDF features.
        self.vectorizer = TfidfVectorizer(
            max_features=8000,
            stop_words=vietnamese_stop_words,
            ngram_range=(1, 3),
            min_df=1,
            max_df=0.8,
            token_pattern=r'[a-zA-ZÀ-ỹ]+',
            lowercase=True,
        )

        # Relative importance of each text field; converted to a repetition
        # count (weight * _WEIGHT_SCALE) in create_tour_features.
        self.field_weights = {
            'title': 0.20,
            'destination': 0.30,
            'description': 0.15,
            'departure_location': 0.10,
            'region': 0.15,
            'itinerary': 0.10,
            'duration': 0.05,
            'attractions': 0.15,
        }

        # Pairwise affinity between region codes 1, 2 and 3. Pairs that are
        # not listed fall back to 0.3 in calculate_enhanced_similarity.
        self.region_proximity = {
            1: {1: 1.0, 2: 0.6, 3: 0.3},
            2: {1: 0.6, 2: 1.0, 3: 0.7},
            3: {1: 0.3, 2: 0.7, 3: 1.0},
        }

    def clean_html(self, text):
        """Strip HTML markup from ``text`` and collapse whitespace.

        Returns ``""`` for falsy input. If parsing fails for any reason the
        input is returned as ``str(text)`` so callers always get a string.
        """
        if not text:
            return ""
        try:
            soup = BeautifulSoup(text, 'html.parser')
            return re.sub(r'\s+', ' ', soup.get_text()).strip()
        except Exception:
            # Best effort: unparseable input is still usable as plain text.
            return str(text)

    def preprocess_text(self, text):
        """Normalise free text for TF-IDF.

        Removes HTML, lowercases, replaces punctuation with spaces and drops
        words shorter than two characters. Returns ``""`` for falsy input.
        """
        if not text:
            return ""

        text = str(self.clean_html(text)).lower()
        text = re.sub(r'[^\w\sÀ-ỹ]', ' ', text)
        text = re.sub(r'\s+', ' ', text).strip()

        return " ".join(word for word in text.split() if len(word) >= 2)

    def preprocess_list(self, items):
        """Normalise each entry of ``items`` and join the non-empty results.

        A plain string is treated as a one-element list; iterating it
        directly would yield single characters, all of which the
        two-character minimum in preprocess_text discards.
        """
        if not items:
            return ""
        if isinstance(items, str):
            items = [items]
        cleaned_items = (self.preprocess_text(item) for item in items)
        return " ".join(cleaned for cleaned in cleaned_items if cleaned)

    def extract_attractions_from_itinerary(self, itinerary):
        """Collect emphasised phrases from the itinerary's day descriptions.

        ``itinerary`` may be a JSON string or an already decoded value. Only
        a list of dicts is inspected: for each dict, the text of every
        ``<strong>`` tag and of every ``<span>`` whose ``style`` mentions
        ``color`` in its ``description`` HTML is collected. Phrases are
        normalised with preprocess_text and kept when longer than three
        characters.

        Returns:
            The kept phrases joined by spaces, or ``""`` when there is
            nothing to extract or any error occurs (the error is logged).
        """
        if not itinerary:
            return ""

        try:
            data = json.loads(itinerary) if isinstance(itinerary, str) else itinerary
            if not isinstance(data, list):
                return ""

            highlights = []
            for day in data:
                if not isinstance(day, dict):
                    continue
                description = day.get('description', '')
                if not description:
                    continue

                soup = BeautifulSoup(description, 'html.parser')
                highlights.extend(tag.get_text() for tag in soup.find_all('strong'))
                colored_spans = soup.find_all(
                    'span', style=lambda value: value and 'color' in value
                )
                highlights.extend(span.get_text() for span in colored_spans)

            cleaned = (self.preprocess_text(phrase) for phrase in highlights)
            return " ".join(phrase for phrase in cleaned if len(phrase) > 3)
        except Exception as exc:
            self._log.warning("Error extracting attractions: %s", exc)
            return ""

    def _collect_text(self, node, out):
        """Append the text found in a decoded JSON ``node`` to ``out``."""
        if isinstance(node, dict):
            for key, value in node.items():
                if key.lower() in self._TEXT_KEYS:
                    # Known text keys are taken whole (stringified), not
                    # descended into.
                    if value:
                        cleaned = self.clean_html(str(value))
                        if cleaned:
                            out.append(cleaned)
                else:
                    self._collect_text(value, out)
        elif isinstance(node, list):
            for item in node:
                self._collect_text(item, out)
        elif node and len(str(node)) > 3:
            # Bare scalars are kept only when longer than three characters.
            cleaned = self.clean_html(str(node))
            if cleaned:
                out.append(cleaned)

    def preprocess_json(self, json_data):
        """Flatten a JSON document (string or decoded) into plain text.

        Values under the keys title/description/name/location (matched
        case-insensitively) are HTML-stripped and collected; other keys are
        searched recursively.

        Returns:
            The collected fragments joined by spaces, or ``""`` for falsy
            input or on any error (the error is logged).
        """
        if not json_data:
            return ""
        try:
            data = json.loads(json_data) if isinstance(json_data, str) else json_data
            text_values = []
            self._collect_text(data, text_values)
            return " ".join(text_values)
        except Exception as exc:
            self._log.warning("Error preprocessing JSON: %s", exc)
            return ""

    def get_all_tours(self):
        """Return every tour with ``availability = true`` as dict-like rows.

        Each row also carries min/max/avg ``price_adult`` over the tour's
        available departures.
        """
        query = self._TOUR_QUERY.format(condition="t.availability = true")
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query)
            return cursor.fetchall()

    def get_user_history(self, user_id):
        """Return the user's history grouped by tour.

        Rows contain ``tour_id``, ``interaction_count`` and
        ``last_interaction``, ordered by count and then recency, both
        descending.
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    h.tour_id,
                    COUNT(*) as interaction_count,
                    MAX(h.timestamp) as last_interaction
                FROM
                    History h
                WHERE
                    h.user_id = %s
                GROUP BY
                    h.tour_id
                ORDER BY
                    interaction_count DESC, last_interaction DESC
            """, (user_id,))
            return cursor.fetchall()

    def get_tour_by_id(self, tour_id):
        """Return the row for ``tour_id`` (or ``None``).

        Unlike get_all_tours, this query does not filter on the tour's own
        availability.
        """
        query = self._TOUR_QUERY.format(condition="t.tour_id = %s")
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, (tour_id,))
            return cursor.fetchone()

    def extract_duration_days(self, duration):
        """Return the first integer found in ``duration``, or 0 if none.

        Non-string values are stringified first so a numeric duration does
        not raise.
        """
        if not duration:
            return 0

        match = re.search(r'\d+', str(duration))
        return int(match.group()) if match else 0

    def calculate_price_similarity(self, price1, price2):
        """Return the ratio of the smaller price to the larger one.

        A missing or zero price on either side is treated as unknown and
        yields the neutral score 0.5.
        """
        if not price1 or not price2:
            return 0.5

        price1 = float(price1)
        price2 = float(price2)
        low, high = min(price1, price2), max(price1, price2)

        # Guards the division for inputs that are truthy but convert to 0.0
        # (for example the string "0").
        if high == 0:
            return 1.0

        return low / high

    def create_tour_features(self, tours):
        """Build one weighted text document per tour for TF-IDF.

        Each field's normalised text is repeated
        ``round(weight * _WEIGHT_SCALE)`` times so that more important
        fields contribute more term frequency.

        Returns:
            Dict mapping ``tour_id`` to the combined document.
        """
        tour_features = {}

        for tour in tours:
            raw_itinerary = tour.get('itinerary')
            raw_region = tour.get('region')

            # NOTE: a missing region is left empty rather than stringified,
            # which would otherwise inject the token "none". Single-digit
            # region codes are dropped by the two-character minimum in
            # preprocess_text; region affinity is applied numerically in
            # calculate_enhanced_similarity.
            region_text = "" if raw_region is None else self.preprocess_text(str(raw_region))

            field_texts = (
                ('title', self.preprocess_text(tour.get('title', ''))),
                ('destination', self.preprocess_list(tour.get('destination', []))),
                ('description', self.preprocess_text(tour.get('description', ''))),
                ('departure_location', self.preprocess_text(tour.get('departure_location', ''))),
                ('region', region_text),
                ('itinerary', self.preprocess_json(raw_itinerary)),
                ('duration', self.preprocess_text(tour.get('duration', ''))),
                ('attractions', self.extract_attractions_from_itinerary(raw_itinerary)),
            )

            parts = []
            for field, text in field_texts:
                if text:
                    # round() rather than int() so float error in the
                    # product cannot truncate a repeat count.
                    repeats = round(self.field_weights[field] * self._WEIGHT_SCALE)
                    parts.extend([text] * repeats)

            tour_features[tour['tour_id']] = " ".join(parts)

        return tour_features

    @staticmethod
    def _duration_similarity(days_a, days_b):
        """Score two day counts: 1.0 equal, 0.7 within one day, else 0.3."""
        if days_a == days_b:
            return 1.0
        if abs(days_a - days_b) <= 1:
            return 0.7
        return 0.3

    def calculate_enhanced_similarity(self, tours):
        """Compute pairwise similarity between all ``tours``.

        The final score is ``0.6 * text + 0.2 * region + 0.1 * duration +
        0.1 * price``.

        Returns:
            ``{tour_id: {other_tour_id: score}}`` without self-pairs, or
            ``{}`` when no tour has any text or the TF-IDF step fails (the
            failure is logged).
        """
        tour_features = self.create_tour_features(tours)
        tour_ids = list(tour_features)
        feature_texts = [tour_features[tour_id] for tour_id in tour_ids]

        if not any(text.strip() for text in feature_texts):
            return {}

        try:
            tfidf_matrix = self.vectorizer.fit_transform(feature_texts)
            text_similarity = cosine_similarity(tfidf_matrix, tfidf_matrix)
        except Exception as exc:
            self._log.warning("Error in TF-IDF calculation: %s", exc)
            return {}

        tour_lookup = {tour['tour_id']: tour for tour in tours}

        # Per-tour attributes are extracted once here instead of once per
        # pair inside the double loop below.
        profiles = []
        for tour_id in tour_ids:
            tour = tour_lookup[tour_id]
            profiles.append((
                tour.get('region', 1),
                self.extract_duration_days(tour.get('duration')),
                tour.get('avg_price'),
            ))

        similarity_dict = {}
        for i, tour_id in enumerate(tour_ids):
            region_i, days_i, price_i = profiles[i]
            row = {}

            for j, other_tour_id in enumerate(tour_ids):
                if i == j:
                    continue

                region_j, days_j, price_j = profiles[j]
                text_sim = text_similarity[i][j]
                region_sim = self.region_proximity.get(region_i, {}).get(region_j, 0.3)
                duration_sim = self._duration_similarity(days_i, days_j)
                price_sim = self.calculate_price_similarity(price_i, price_j)

                row[other_tour_id] = float(
                    text_sim * 0.6 +
                    region_sim * 0.2 +
                    duration_sim * 0.1 +
                    price_sim * 0.1
                )

            similarity_dict[tour_id] = row

        return similarity_dict

    @staticmethod
    def _rank(scores, limit):
        """Return the ``limit`` highest ``(tour_id, score)`` pairs."""
        return sorted(scores.items(), key=lambda pair: pair[1], reverse=True)[:limit]

    @staticmethod
    def _with_scores(ranked, tours_by_id):
        """Copy each ranked tour row and attach its ``similarity_score``."""
        results = []
        for tour_id, score in ranked:
            tour = tours_by_id.get(tour_id)
            if tour is None:
                continue
            enriched = dict(tour)
            enriched['similarity_score'] = float(score)
            results.append(enriched)
        return results

    def recommend_similar_tours(self, tour_id, limit=3):
        """Return up to ``limit`` available tours most similar to ``tour_id``.

        Returns ``[]`` when the tour is not among the available tours or no
        similarity could be computed for it.
        """
        all_tours = self.get_all_tours()
        tours_by_id = {tour.get('tour_id'): tour for tour in all_tours}

        if tour_id not in tours_by_id:
            return []

        scores = self.calculate_enhanced_similarity(all_tours).get(tour_id)
        if not scores:
            return []

        return self._with_scores(self._rank(scores, limit), tours_by_id)

    def recommend_for_user(self, user_id, limit=3):
        """Recommend tours based on the user's interaction history.

        Each candidate (an available tour not yet in the history) is scored
        by the interaction-weighted mean of its similarity to the history
        tours. Candidates whose region does not appear among the user's
        history tours get a 10% boost. Users without history receive the
        popular-tours list instead.
        """
        user_history = self.get_user_history(user_id)
        if not user_history:
            return self.recommend_popular_tours(limit)

        all_tours = self.get_all_tours()
        similarity_dict = self.calculate_enhanced_similarity(all_tours)

        tours_by_id = {tour.get('tour_id'): tour for tour in all_tours}
        seen_ids = {entry['tour_id'] for entry in user_history}
        total_interactions = sum(entry['interaction_count'] for entry in user_history)

        # Regions of history tours that are still available.
        user_regions = {
            tours_by_id[seen_id].get('region')
            for seen_id in seen_ids
            if seen_id in tours_by_id
        }

        tour_scores = {}
        for tour in all_tours:
            tour_id = tour.get('tour_id')
            if tour_id is None or tour_id in seen_ids:
                continue

            weighted_sum = 0
            weight_total = 0
            for entry in user_history:
                similarity = similarity_dict.get(entry['tour_id'], {}).get(tour_id)
                if similarity is None:
                    continue
                weight = entry['interaction_count'] / total_interactions
                weighted_sum += similarity * weight
                weight_total += weight

            if weight_total > 0:
                score = weighted_sum / weight_total
                if tour.get('region') not in user_regions:
                    score = score * 1.1
                tour_scores[tour_id] = score

        return self._with_scores(self._rank(tour_scores, limit), tours_by_id)

    def recommend_popular_tours(self, limit=3):
        """Return the ``limit`` most popular available tours.

        Popularity is ordered by ``0.6 * bookings + 0.4 * avg_rating *
        reviews`` (rating defaults to 3.0 when a tour has no reviews). Each
        returned row gets ``similarity_score = None``.
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    t.tour_id,
                    t.title,
                    t.duration,
                    t.departure_location,
                    t.description,
                    t.destination,
                    t.region,
                    COUNT(DISTINCT b.booking_id) as booking_count,
                    AVG(r.average_rating) as avg_rating,
                    COUNT(DISTINCT r.review_id) as review_count
                FROM
                    Tour t
                LEFT JOIN
                    Departure d ON t.tour_id = d.tour_id
                LEFT JOIN
                    Booking b ON d.departure_id = b.departure_id
                LEFT JOIN
                    Review r ON t.tour_id = r.tour_id
                WHERE
                    t.availability = true
                GROUP BY
                    t.tour_id, t.title, t.duration, t.departure_location,
                    t.description, t.destination, t.region
                ORDER BY
                    (COUNT(DISTINCT b.booking_id) * 0.6 +
                    COALESCE(AVG(r.average_rating), 3.0) * COUNT(DISTINCT r.review_id) * 0.4) DESC
                LIMIT %s
            """, (limit,))

            popular_tours = cursor.fetchall()

        for tour in popular_tours:
            tour['similarity_score'] = None
        return popular_tours

    def get_recommendations(self, user_id=None, tour_id=None, limit=3):
        """Dispatch to the appropriate strategy.

        ``tour_id`` takes precedence over ``user_id``; with neither, popular
        tours are returned. Falsy identifiers (``None`` or ``0``) count as
        not provided.
        """
        if tour_id:
            return self.recommend_similar_tours(tour_id, limit)
        if user_id:
            return self.recommend_for_user(user_id, limit)
        return self.recommend_popular_tours(limit)


def get_db_connection():
    """Return a database connection, preferring the shared pool.

    The pool in ``src.database`` is tried first. If that fails for any
    reason, a direct ``psycopg2`` connection is opened using the settings
    from ``src.config`` or, when that module cannot be imported, from the
    ``DB_USER``/``DB_PASSWORD``/``DB_HOST``/``DB_PORT``/``DB_NAME``
    environment variables.

    Raises:
        Exception: whatever the direct connection attempt raised, after it
            has been logged.
    """
    log = logging.getLogger(__name__)

    try:
        from src.database import conn_pool
        return conn_pool.getconn()
    except Exception as pool_error:
        log.warning("Error getting connection from pool: %s", pool_error)

    try:
        try:
            from src.config import DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME
            settings = {
                "user": DB_USER,
                "password": DB_PASSWORD,
                "host": DB_HOST,
                "port": DB_PORT,
                "dbname": DB_NAME,
            }
        except ImportError:
            import os
            settings = {
                "user": os.getenv("DB_USER"),
                "password": os.getenv("DB_PASSWORD"),
                "host": os.getenv("DB_HOST"),
                "port": os.getenv("DB_PORT"),
                "dbname": os.getenv("DB_NAME"),
            }
        return psycopg2.connect(**settings)
    except Exception:
        log.exception("Error creating direct connection")
        raise


def return_db_connection(conn):
    """Hand ``conn`` back to the pool, closing it if that is not possible.

    Never raises: a failure to return the connection to the pool falls back
    to ``conn.close()``, and a failure to close is logged and ignored.
    """
    log = logging.getLogger(__name__)
    try:
        from src.database import conn_pool
        conn_pool.putconn(conn)
    except Exception as pool_error:
        log.warning("Error returning connection to pool: %s", pool_error)
        try:
            conn.close()
        except Exception as close_error:
            # Best-effort cleanup; nothing more can be done with this handle.
            log.warning("Error closing connection: %s", close_error)


def convert_to_tour_summary(tour):
    """Build a ``TourSummary`` from a dict-like tour row.

    Args:
        tour: Mapping with the tour's columns; keys that are absent map to
            ``None`` (or ``''`` for the title).

    Returns:
        The populated ``TourSummary``.
    """
    return TourSummary(
        tour_id=tour.get('tour_id'),
        # `title` is declared as a required `str`, so a key that is present
        # but None must be coerced as well, not only a missing key.
        title=tour.get('title') or '',
        duration=tour.get('duration'),
        departure_location=tour.get('departure_location'),
        destination=tour.get('destination'),
        region=tour.get('region'),
        description=tour.get('description'),
        similarity_score=tour.get('similarity_score'),
    )


def get_tour_recommendations(user_id=None, tour_id=None, limit=3):
    """Fetch recommendations and wrap them in a response model.

    Acquires a connection, delegates to
    ``ContentBasedRecommender.get_recommendations`` and converts each row
    to a ``TourSummary``. The connection is always handed back, whether or
    not an error occurred.

    Raises:
        Exception: any error raised while producing the response is logged
            with its traceback and re-raised unchanged.
    """
    conn = None
    try:
        conn = get_db_connection()
        recommender = ContentBasedRecommender(conn)
        tours = recommender.get_recommendations(user_id, tour_id, limit)
        # NOTE: `recommendation_type` only shows up in the response if
        # TourRecommendationResponse declares a field with that name.
        return TourRecommendationResponse(
            recommendations=[convert_to_tour_summary(tour) for tour in tours],
            recommendation_type="content-based",
        )
    except Exception:
        logging.getLogger(__name__).exception("Error getting recommendations")
        raise
    finally:
        if conn:
            return_db_connection(conn)