"""Content-based tour recommendations backed by TF-IDF text similarity.

The module exposes pydantic request/response models, a
``ContentBasedRecommender`` that reads tours from PostgreSQL and scores them,
and small helpers for acquiring/releasing a database connection.
"""

import json
import math
import re
from typing import Any, Dict, List, Optional, Union

import numpy as np
import psycopg2
from bs4 import BeautifulSoup
from psycopg2.extras import RealDictCursor
from pydantic import BaseModel, Field
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


class TourRecommendationRequest(BaseModel):
    """Input for a recommendation query; ``limit`` is bounded to 1..10."""

    user_id: Optional[int] = Field(None)
    tour_id: Optional[int] = Field(None)
    limit: int = Field(3, ge=1, le=10)


class TourSummary(BaseModel):
    """Subset of a tour row returned to the caller, plus its score."""

    tour_id: int
    title: str
    duration: Optional[str] = None
    departure_location: Optional[str] = None
    destination: Optional[List[str]] = None
    region: Optional[int] = None
    description: Optional[str] = None
    similarity_score: Optional[float] = None


class TourRecommendationResponse(BaseModel):
    """Envelope for a list of recommended tours."""

    recommendations: List[TourSummary]
    # Declared so the value passed by get_tour_recommendations() is kept
    # instead of being silently discarded as an undeclared extra field.
    recommendation_type: Optional[str] = None


class ContentBasedRecommender:
    """Score tours against each other using text, region, duration and price.

    The recommender holds an open DB connection (it never closes it) and a
    ``TfidfVectorizer`` that is re-fitted on every similarity computation.
    """

    def __init__(self, conn):
        """Store the connection and build the vectorizer and weight tables.

        Args:
            conn: a psycopg2-style connection; only ``conn.cursor(...)`` is
                used by this class.
        """
        self.conn = conn

        vietnamese_stop_words = [
            "và", "là", "của", "trong", "được", "có", "không", "cho", "với",
            "tại", "bằng", "để", "này", "khi", "một", "những", "các", "đã",
            "rồi", "lại", "nếu", "vì", "thì", "từ", "ra", "đến", "trên",
            "dưới", "quý", "khách", "tham", "quan", "du", "lịch", "tour",
            "ngày", "đêm", "ăn", "sáng", "trưa", "tối", "nghỉ", "khách",
            "sạn", "tự", "túc",
        ]

        self.vectorizer = TfidfVectorizer(
            max_features=8000,
            stop_words=vietnamese_stop_words,
            ngram_range=(1, 3),
            min_df=1,
            max_df=0.8,
            # Letters only (ASCII + the À-ỹ range); digits never become tokens.
            token_pattern=r'[a-zA-ZÀ-ỹ]+',
            lowercase=True,
        )

        # Relative weight of each text field; applied in create_tour_features
        # by repeating the field's text int(weight * 20) times.
        self.field_weights = {
            'title': 0.20,
            'destination': 0.30,
            'description': 0.15,
            'departure_location': 0.10,
            'region': 0.15,
            'itinerary': 0.10,
            'duration': 0.05,
            'attractions': 0.15,
        }

        # Pairwise closeness between region codes 1..3.
        self.region_proximity = {
            1: {1: 1.0, 2: 0.6, 3: 0.3},
            2: {1: 0.6, 2: 1.0, 3: 0.7},
            3: {1: 0.3, 2: 0.7, 3: 1.0},
        }

    def clean_html(self, text) -> str:
        """Strip HTML markup from ``text`` and collapse whitespace.

        Returns ``""`` for falsy input. If parsing fails for any reason the
        input is returned as ``str(text)`` unchanged.
        """
        if not text:
            return ""
        try:
            plain = BeautifulSoup(text, 'html.parser').get_text()
            return re.sub(r'\s+', ' ', plain).strip()
        except Exception:
            # Best effort: fall back to the raw value rather than failing the
            # whole feature build on one malformed field.
            return str(text)

    def preprocess_text(self, text) -> str:
        """Normalise free text for vectorisation.

        Removes HTML, lower-cases, replaces punctuation with spaces and drops
        words shorter than two characters.
        """
        if not text:
            return ""
        normalised = str(self.clean_html(text)).lower()
        normalised = re.sub(r'[^\w\sÀ-ỹ]', ' ', normalised)
        normalised = re.sub(r'\s+', ' ', normalised).strip()
        return " ".join(word for word in normalised.split() if len(word) >= 2)

    def preprocess_list(self, items) -> str:
        """Preprocess every item of ``items`` and join the non-empty results."""
        if not items:
            return ""
        cleaned_items = (self.preprocess_text(item) for item in items)
        return " ".join(cleaned for cleaned in cleaned_items if cleaned)

    def extract_attractions_from_itinerary(self, itinerary) -> str:
        """Collect highlighted phrases from itinerary day descriptions.

        ``itinerary`` may be a JSON string or already-decoded data. For each
        dict in a top-level list, the text of every ``<strong>`` tag and every
        ``<span>`` whose ``style`` contains ``color`` in its ``description``
        HTML is collected. Phrases are preprocessed and kept only when longer
        than three characters. Any error is printed and yields ``""``.
        """
        if not itinerary:
            return ""
        try:
            data = json.loads(itinerary) if isinstance(itinerary, str) else itinerary

            attractions = []
            if isinstance(data, list):
                for day in data:
                    if not isinstance(day, dict):
                        continue
                    description = day.get('description', '')
                    if not description:
                        continue
                    soup = BeautifulSoup(description, 'html.parser')
                    for tag in soup.find_all('strong'):
                        attractions.append(tag.get_text())
                    colored_spans = soup.find_all(
                        'span', style=lambda x: x and 'color' in x
                    )
                    for span in colored_spans:
                        attractions.append(span.get_text())

            clean_attractions = []
            for attraction in attractions:
                cleaned = self.preprocess_text(attraction)
                if cleaned and len(cleaned) > 3:
                    clean_attractions.append(cleaned)
            return " ".join(clean_attractions)
        except Exception as e:
            print(f"Error extracting attractions: {e}")
            return ""

    def preprocess_json(self, json_data) -> str:
        """Flatten JSON-like data into one HTML-free text string.

        Values under the keys ``title``, ``description``, ``name`` and
        ``location`` (case-insensitive) are taken as text without descending
        further; other dict values and list items are walked recursively, and
        bare scalars are kept when their string form is longer than three
        characters. Any error is printed and yields ``""``.
        """
        if not json_data:
            return ""
        text_keys = ('title', 'description', 'name', 'location')
        try:
            data = json.loads(json_data) if isinstance(json_data, str) else json_data
            text_values = []

            def keep(value):
                clean_val = self.clean_html(str(value))
                if clean_val:
                    text_values.append(clean_val)

            def extract_values(obj):
                if isinstance(obj, dict):
                    for key, val in obj.items():
                        if key.lower() in text_keys:
                            if val:
                                keep(val)
                        else:
                            extract_values(val)
                elif isinstance(obj, list):
                    for item in obj:
                        extract_values(item)
                elif obj and len(str(obj)) > 3:
                    keep(obj)

            extract_values(data)
            return " ".join(text_values)
        except Exception as e:
            print(f"Error preprocessing JSON: {e}")
            return ""

    def get_all_tours(self):
        """Fetch every available tour with min/max/avg adult departure price."""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT t.tour_id, t.title, t.duration, t.departure_location,
                       t.description, t.destination, t.region, t.itinerary,
                       t.max_participants,
                       MIN(d.price_adult) as min_price,
                       MAX(d.price_adult) as max_price,
                       AVG(d.price_adult) as avg_price
                FROM Tour t
                LEFT JOIN Departure d
                    ON t.tour_id = d.tour_id AND d.availability = true
                WHERE t.availability = true
                GROUP BY t.tour_id, t.title, t.duration, t.departure_location,
                         t.description, t.destination, t.region, t.itinerary,
                         t.max_participants
            """)
            return cursor.fetchall()

    def get_user_history(self, user_id):
        """Return per-tour interaction counts for ``user_id``, most used first."""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT h.tour_id,
                       COUNT(*) as interaction_count,
                       MAX(h.timestamp) as last_interaction
                FROM History h
                WHERE h.user_id = %s
                GROUP BY h.tour_id
                ORDER BY interaction_count DESC, last_interaction DESC
            """, (user_id,))
            return cursor.fetchall()

    def get_tour_by_id(self, tour_id):
        """Fetch one tour (regardless of its availability) or ``None``."""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT t.tour_id, t.title, t.duration, t.departure_location,
                       t.description, t.destination, t.region, t.itinerary,
                       t.max_participants,
                       MIN(d.price_adult) as min_price,
                       MAX(d.price_adult) as max_price,
                       AVG(d.price_adult) as avg_price
                FROM Tour t
                LEFT JOIN Departure d
                    ON t.tour_id = d.tour_id AND d.availability = true
                WHERE t.tour_id = %s
                GROUP BY t.tour_id, t.title, t.duration, t.departure_location,
                         t.description, t.destination, t.region, t.itinerary,
                         t.max_participants
            """, (tour_id,))
            return cursor.fetchone()

    def extract_duration_days(self, duration) -> int:
        """Return the first integer found in ``duration``, or 0 if none."""
        if not duration:
            return 0
        numbers = re.findall(r'\d+', duration)
        return int(numbers[0]) if numbers else 0

    def calculate_price_similarity(self, price1, price2) -> float:
        """Return min/max ratio of two prices, or 0.5 when either is falsy.

        NOTE: a price of 0 is falsy and therefore treated like a missing
        price; the ``max_price == 0`` guard only protects the division.
        """
        if not price1 or not price2:
            return 0.5
        price1 = float(price1)
        price2 = float(price2)
        max_price = max(price1, price2)
        if max_price == 0:
            return 1.0
        return min(price1, price2) / max_price

    def create_tour_features(self, tours) -> Dict[Any, str]:
        """Build one weighted text document per tour, keyed by ``tour_id``.

        Each field's text is repeated ``int(weight * 20)`` times so that TF-IDF
        sees higher-weighted fields more often.
        """
        tour_features = {}
        for tour in tours:
            raw_itinerary = tour.get('itinerary')
            # Order matters for n-grams spanning field boundaries; keep it.
            # NOTE(review): single-character region codes are dropped by
            # preprocess_text and digits are not tokens, so the 'region' and
            # 'duration' entries likely contribute little text — confirm.
            weighted_fields = (
                ('title', self.preprocess_text(tour.get('title', ''))),
                ('destination', self.preprocess_list(tour.get('destination', []))),
                ('description', self.preprocess_text(tour.get('description', ''))),
                ('departure_location',
                 self.preprocess_text(tour.get('departure_location', ''))),
                ('region', self.preprocess_text(str(tour.get('region', '')))),
                ('itinerary', self.preprocess_json(raw_itinerary)),
                ('duration', self.preprocess_text(tour.get('duration', ''))),
                ('attractions',
                 self.extract_attractions_from_itinerary(raw_itinerary)),
            )
            combined_features = "".join(
                f"{text} " * int(self.field_weights[name] * 20)
                for name, text in weighted_fields
            )
            tour_features[tour['tour_id']] = combined_features.strip()
        return tour_features

    def calculate_enhanced_similarity(self, tours) -> Dict[Any, Dict[Any, float]]:
        """Return ``{tour_id: {other_tour_id: score}}`` for all tour pairs.

        The score blends TF-IDF cosine similarity (0.6), region proximity
        (0.2), duration closeness (0.1) and average-price ratio (0.1). Returns
        ``{}`` when there is no text to vectorise or vectorisation fails.
        """
        tour_features = self.create_tour_features(tours)
        tour_ids = list(tour_features.keys())
        feature_texts = [tour_features[tour_id] for tour_id in tour_ids]

        if not feature_texts or all(not text.strip() for text in feature_texts):
            return {}

        try:
            tfidf_matrix = self.vectorizer.fit_transform(feature_texts)
            text_similarity = cosine_similarity(tfidf_matrix, tfidf_matrix)
        except Exception as e:
            print(f"Error in TF-IDF calculation: {e}")
            return {}

        tour_lookup = {tour['tour_id']: tour for tour in tours}
        # Parse each duration once instead of once per pair.
        duration_days = {
            tour_id: self.extract_duration_days(tour_lookup[tour_id].get('duration'))
            for tour_id in tour_ids
        }

        similarity_dict = {}
        for i, tour_id in enumerate(tour_ids):
            tour_i = tour_lookup[tour_id]
            row = similarity_dict[tour_id] = {}

            for j, other_tour_id in enumerate(tour_ids):
                if i == j:
                    continue
                tour_j = tour_lookup[other_tour_id]

                text_sim = text_similarity[i][j]

                region_i = tour_i.get('region', 1)
                region_j = tour_j.get('region', 1)
                # Unknown region codes fall back to the lowest proximity.
                region_sim = self.region_proximity.get(region_i, {}).get(region_j, 0.3)

                day_gap = abs(duration_days[tour_id] - duration_days[other_tour_id])
                if day_gap == 0:
                    duration_sim = 1.0
                elif day_gap <= 1:
                    duration_sim = 0.7
                else:
                    duration_sim = 0.3

                price_sim = self.calculate_price_similarity(
                    tour_i.get('avg_price'), tour_j.get('avg_price')
                )

                row[other_tour_id] = (
                    text_sim * 0.6 +
                    region_sim * 0.2 +
                    duration_sim * 0.1 +
                    price_sim * 0.1
                )

        return similarity_dict

    @staticmethod
    def _with_scores(ranked, tours_by_id) -> List[Dict[str, Any]]:
        """Copy each ranked tour row and attach its score as a float."""
        scored_tours = []
        for ranked_id, score in ranked:
            tour = tours_by_id.get(ranked_id)
            if tour is None:
                continue
            tour_copy = dict(tour)
            tour_copy['similarity_score'] = float(score)
            scored_tours.append(tour_copy)
        return scored_tours

    def recommend_similar_tours(self, tour_id, limit=3):
        """Return up to ``limit`` available tours most similar to ``tour_id``.

        Returns ``[]`` when ``tour_id`` is not among the available tours or no
        similarity could be computed.
        """
        all_tours = self.get_all_tours()
        tours_by_id = {tour.get('tour_id'): tour for tour in all_tours}
        if not tours_by_id.get(tour_id):
            return []

        similarity_dict = self.calculate_enhanced_similarity(all_tours)
        if tour_id not in similarity_dict:
            return []

        similar_tours = sorted(
            similarity_dict[tour_id].items(),
            key=lambda x: x[1],
            reverse=True,
        )[:limit]
        return self._with_scores(similar_tours, tours_by_id)

    def recommend_for_user(self, user_id, limit=3):
        """Recommend unseen tours weighted by the user's interaction history.

        Falls back to ``recommend_popular_tours`` when the user has no
        history. Each candidate's score is the interaction-weighted mean of its
        similarity to the tours in the history; candidates from a region not
        present in the history get a 1.1x boost.
        """
        user_history = self.get_user_history(user_id)
        if not user_history:
            return self.recommend_popular_tours(limit)

        all_tours = self.get_all_tours()
        similarity_dict = self.calculate_enhanced_similarity(all_tours)
        tours_by_id = {tour['tour_id']: tour for tour in all_tours}

        total_interactions = sum(h['interaction_count'] for h in user_history)
        seen_tour_ids = {h['tour_id'] for h in user_history}

        tour_scores = {}
        for tour in all_tours:
            tour_id = tour.get('tour_id')
            if tour_id is None or tour_id in seen_tour_ids:
                continue

            total_similarity = 0
            total_weight = 0
            for history_item in user_history:
                similarities = similarity_dict.get(history_item['tour_id'])
                if similarities is None or tour_id not in similarities:
                    continue
                interaction_weight = (
                    history_item['interaction_count'] / total_interactions
                )
                total_similarity += similarities[tour_id] * interaction_weight
                total_weight += interaction_weight

            if total_weight > 0:
                tour_scores[tour_id] = total_similarity / total_weight

        # Regions of the history tours that are still available.
        user_regions = {
            tours_by_id[h['tour_id']].get('region')
            for h in user_history
            if h['tour_id'] in tours_by_id
        }

        # NOTE(review): the boost favours regions the user has NOT visited,
        # presumably for diversity — confirm this is the intended direction.
        for tour_id, score in tour_scores.items():
            if tours_by_id[tour_id].get('region') not in user_regions:
                tour_scores[tour_id] = score * 1.1

        top_tours = sorted(
            tour_scores.items(),
            key=lambda x: x[1],
            reverse=True,
        )[:limit]
        return self._with_scores(top_tours, tours_by_id)

    def recommend_popular_tours(self, limit=3):
        """Return up to ``limit`` available tours ranked by bookings/reviews.

        Rows carry ``similarity_score = None`` because no similarity is
        computed on this path.
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT t.tour_id, t.title, t.duration, t.departure_location,
                       t.description, t.destination, t.region,
                       COUNT(DISTINCT b.booking_id) as booking_count,
                       AVG(r.average_rating) as avg_rating,
                       COUNT(DISTINCT r.review_id) as review_count
                FROM Tour t
                LEFT JOIN Departure d ON t.tour_id = d.tour_id
                LEFT JOIN Booking b ON d.departure_id = b.departure_id
                LEFT JOIN Review r ON t.tour_id = r.tour_id
                WHERE t.availability = true
                GROUP BY t.tour_id, t.title, t.duration, t.departure_location,
                         t.description, t.destination, t.region
                ORDER BY (COUNT(DISTINCT b.booking_id) * 0.6 +
                          COALESCE(AVG(r.average_rating), 3.0) *
                          COUNT(DISTINCT r.review_id) * 0.4) DESC
                LIMIT %s
            """, (limit,))
            popular_tours = cursor.fetchall()

        for tour in popular_tours:
            tour['similarity_score'] = None
        return popular_tours

    def get_recommendations(self, user_id=None, tour_id=None, limit=3):
        """Dispatch to similar-tour, per-user or popular recommendations.

        ``tour_id`` takes precedence over ``user_id``. Falsy ids (``None`` or
        ``0``) are treated as not provided.
        """
        if tour_id:
            return self.recommend_similar_tours(tour_id, limit)
        if user_id:
            return self.recommend_for_user(user_id, limit)
        return self.recommend_popular_tours(limit)


def get_db_connection():
    """Return a connection from the project pool, or a direct one as fallback.

    If the pool cannot be imported or used, a direct ``psycopg2`` connection is
    opened from ``src.config`` settings, or from ``DB_*`` environment variables
    when ``src.config`` cannot be imported.

    Raises:
        Exception: whatever the direct connection attempt raised.
    """
    try:
        from src.database import conn_pool
        return conn_pool.getconn()
    except Exception as e:
        print(f"Error getting connection from pool: {e}")
        try:
            try:
                from src.config import DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME
                conn = psycopg2.connect(
                    user=DB_USER,
                    password=DB_PASSWORD,
                    host=DB_HOST,
                    port=DB_PORT,
                    dbname=DB_NAME,
                )
            except ImportError:
                import os
                conn = psycopg2.connect(
                    user=os.getenv("DB_USER"),
                    password=os.getenv("DB_PASSWORD"),
                    host=os.getenv("DB_HOST"),
                    port=os.getenv("DB_PORT"),
                    dbname=os.getenv("DB_NAME"),
                )
            return conn
        except Exception as e2:
            print(f"Error creating direct connection: {e2}")
            raise


def return_db_connection(conn):
    """Give ``conn`` back to the pool, closing it if the pool rejects it.

    Never raises: failures are printed so cleanup in ``finally`` blocks cannot
    mask the original error.
    """
    try:
        from src.database import conn_pool
        conn_pool.putconn(conn)
    except Exception as e:
        print(f"Error returning connection to pool: {e}")
        try:
            conn.close()
        except Exception as close_error:
            # Best-effort cleanup; report instead of silently swallowing.
            print(f"Error closing connection: {close_error}")


def convert_to_tour_summary(tour):
    """Map a dict-like tour row onto a ``TourSummary`` model."""
    return TourSummary(
        tour_id=tour.get('tour_id'),
        title=tour.get('title', ''),
        duration=tour.get('duration'),
        departure_location=tour.get('departure_location'),
        destination=tour.get('destination'),
        region=tour.get('region'),
        description=tour.get('description'),
        similarity_score=tour.get('similarity_score'),
    )


def get_tour_recommendations(user_id=None, tour_id=None, limit=3):
    """Run a recommendation query end to end and return the response model.

    Acquires a connection, delegates to ``ContentBasedRecommender`` and always
    releases the connection afterwards.

    Raises:
        Exception: any error from the database or the recommender, re-raised
            after being printed.
    """
    conn = None
    try:
        conn = get_db_connection()
        recommender = ContentBasedRecommender(conn)
        recommended_tours = recommender.get_recommendations(user_id, tour_id, limit)
        tour_summaries = [convert_to_tour_summary(tour) for tour in recommended_tours]
        return TourRecommendationResponse(
            recommendations=tour_summaries,
            recommendation_type="content-based",
        )
    except Exception as e:
        print(f"Error getting recommendations: {e}")
        raise
    finally:
        if conn:
            return_db_connection(conn)