# Generated by Copilot
"""Rental listing analysis: loading, cleaning, statistics and reporting."""

import json
import re
from itertools import combinations
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

# The Hugging Face stack is only needed by ``setup_huggingface_models``; the
# statistics pipeline must stay importable on machines without it.
try:
    from transformers import pipeline, AutoTokenizer, AutoModel
except ImportError:
    pipeline = AutoTokenizer = AutoModel = None

try:
    from datasets import Dataset
except ImportError:
    Dataset = None


def _to_native(value):
    """Return the built-in Python equivalent of a NumPy scalar.

    ``json`` cannot serialize ``numpy.int64`` and friends, so values stored in
    the analysis results are normalized through this helper.
    """
    return value.item() if isinstance(value, np.generic) else value


def _rounded(value, digits: int = 2) -> float:
    """Round ``value`` to ``digits`` decimals and return a plain ``float``."""
    return round(float(value), digits)


def _json_default(obj):
    """Fallback serializer for NumPy objects passed to ``json.dumps``."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


class RentalDataAnalyzer:
    """Analyzer for rental listing data held in a pandas DataFrame.

    Typical flow: ``load_data`` -> ``clean_data`` -> ``run_full_analysis`` ->
    ``save_analysis_results`` / ``print_summary``. Results of each analysis
    step are cached in ``self.analysis_results`` under a step-specific key.
    """

    # Columns that together identify a duplicated listing.
    DEDUP_COLUMNS = ('title', 'address', 'price')

    # Left-closed rent buckets: [0, 15000), [15000, 20000), ...
    PRICE_BINS = [0, 15000, 20000, 25000, 30000, 40000, float('inf')]
    PRICE_LABELS = ['<15K', '15-20K', '20-25K', '25-30K', '30-40K', '>40K']

    # Left-closed floor-area buckets: [0, 20), [20, 30), ...
    AREA_BINS = [0, 20, 30, 40, 50, float('inf')]
    AREA_LABELS = ['<20坪', '20-30坪', '30-40坪', '40-50坪', '>50坪']

    # Keywords counted in the free-text listing descriptions.
    KEYWORDS = [
        '近捷運', '近車站', '電梯', '陽台', '停車位', '管理費',
        '採光', '通風', '安靜', '便利', '生活機能', '學區',
        '全新', '裝潢', '家具', '家電', '冷氣', '洗衣機',
    ]

    def __init__(self, data_path: Optional[str] = None):
        """Initialize the analyzer.

        Args:
            data_path: Path of the data file to analyze; may also be supplied
                later through ``load_data``.
        """
        self.data_path = data_path
        self.df = None
        self.analysis_results = {}

        # Hugging Face models used for text analysis; populated on demand by
        # ``setup_huggingface_models``.
        self.sentiment_analyzer = None
        self.text_classifier = None

    def _has_values(self, column: str) -> bool:
        """Return True when ``column`` exists and holds a non-NaN value."""
        return column in self.df.columns and not self.df[column].isna().all()

    def load_data(self, data_path: Optional[str] = None) -> Optional[pd.DataFrame]:
        """Load a ``.json`` or ``.csv`` file into ``self.df``.

        Args:
            data_path: Optional path overriding the one given at construction.

        Returns:
            The loaded DataFrame, or ``None`` when loading failed (the error
            is printed rather than raised).
        """
        if data_path:
            self.data_path = data_path

        try:
            if not self.data_path:
                raise ValueError("未指定資料檔案路徑")

            # Compare the extension case-insensitively so "DATA.CSV" works.
            suffix = Path(self.data_path).suffix.lower()
            if suffix == '.json':
                with open(self.data_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self.df = pd.DataFrame(data)
            elif suffix == '.csv':
                self.df = pd.read_csv(self.data_path, encoding='utf-8-sig')
            else:
                raise ValueError("不支援的檔案格式")
        except (OSError, ValueError, TypeError) as e:
            print(f"載入資料時發生錯誤: {e}")
            return None

        print(f"成功載入 {len(self.df)} 筆資料")
        return self.df

    def clean_data(self) -> Optional[pd.DataFrame]:
        """Deduplicate, coerce numeric columns and drop invalid/outlier rents.

        Adds a ``price_per_ping`` column (rent divided by area, NaN when the
        area is missing or not positive). A missing ``area`` column is created
        as all-NaN so later analysis steps can run.

        Returns:
            The cleaned DataFrame (also stored in ``self.df``), or ``None``
            when no data has been loaded.

        Raises:
            KeyError: If the data has no ``price`` column.
        """
        if self.df is None:
            print("請先載入資料")
            return None

        print("開始清洗資料...")

        # Remove duplicated listings, keyed on whichever identifying columns
        # are actually present.
        original_count = len(self.df)
        dedup_columns = [col for col in self.DEDUP_COLUMNS if col in self.df.columns]
        cleaned = self.df.drop_duplicates(subset=dedup_columns or None)
        print(f"移除 {original_count - len(cleaned)} 筆重複資料")

        # Coerce rent to numeric and keep only positive values; the explicit
        # copy avoids writing into a view of the source frame below.
        cleaned = cleaned.assign(price=pd.to_numeric(cleaned['price'], errors='coerce'))
        cleaned = cleaned[cleaned['price'] > 0].copy()

        # Coerce floor area to numeric.
        if 'area' in cleaned.columns:
            cleaned['area'] = pd.to_numeric(cleaned['area'], errors='coerce')
        else:
            cleaned['area'] = np.nan

        # Rent per unit area; non-positive or missing areas yield NaN rather
        # than a division by zero.
        usable_area = cleaned['area'].where(cleaned['area'] > 0)
        cleaned['price_per_ping'] = cleaned['price'] / usable_area

        # Remove rent outliers using the IQR rule.
        self.df = self.remove_outliers(cleaned, 'price')

        print(f"清洗後剩餘 {len(self.df)} 筆有效資料")
        return self.df

    def remove_outliers(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Drop rows whose ``column`` value lies outside 1.5 * IQR of Q1/Q3.

        Args:
            df: Frame to filter.
            column: Numeric column the IQR rule is applied to.

        Returns:
            A new DataFrame containing only rows within the bounds. Rows with
            NaN in ``column`` are dropped but not counted as outliers.
        """
        values = df[column]
        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        outliers_count = int(((values < lower_bound) | (values > upper_bound)).sum())
        print(f"移除 {outliers_count} 筆 {column} 異常值")

        within_bounds = (values >= lower_bound) & (values <= upper_bound)
        return df[within_bounds].copy()

    def basic_statistics(self) -> Dict:
        """Compute summary statistics for rent, area and rent per unit area.

        Returns:
            A dict with ``total_properties``, ``price_stats``, ``area_stats``
            and ``price_per_ping_stats``; the latter two are empty dicts when
            the column is missing or entirely NaN. Returns ``{}`` when no
            data is available. All values are built-in Python numbers so the
            result can be serialized to JSON.
        """
        if self.df is None or len(self.df) == 0:
            return {}

        price = self.df['price']
        stats = {
            'total_properties': len(self.df),
            'price_stats': {
                'mean': _rounded(price.mean()),
                'median': _rounded(price.median()),
                'std': _rounded(price.std()),
                'min': _to_native(price.min()),
                'max': _to_native(price.max()),
                'q25': _rounded(price.quantile(0.25)),
                'q75': _rounded(price.quantile(0.75)),
            },
            'area_stats': {},
            'price_per_ping_stats': {},
        }

        if self._has_values('area'):
            area = self.df['area']
            stats['area_stats'] = {
                'mean': _rounded(area.mean()),
                'median': _rounded(area.median()),
                'min': _to_native(area.min()),
                'max': _to_native(area.max()),
            }

        if self._has_values('price_per_ping'):
            per_ping = self.df['price_per_ping']
            stats['price_per_ping_stats'] = {
                'mean': _rounded(per_ping.mean()),
                'median': _rounded(per_ping.median()),
                'min': _rounded(per_ping.min()),
                'max': _rounded(per_ping.max()),
            }

        self.analysis_results['basic_stats'] = stats
        return stats

    def price_distribution_analysis(self) -> Dict:
        """Bucket rents into fixed ranges and report counts and percentages.

        Adds a categorical ``price_range`` column to ``self.df``.

        Returns:
            A dict with parallel lists ``ranges``, ``counts`` and
            ``percentages`` (percent of all rows), or ``{}`` without data.
        """
        if self.df is None or len(self.df) == 0:
            return {}

        self.df['price_range'] = pd.cut(
            self.df['price'],
            bins=self.PRICE_BINS,
            labels=self.PRICE_LABELS,
            right=False,
        )

        distribution = self.df['price_range'].value_counts().sort_index()
        distribution_dict = {
            'ranges': distribution.index.tolist(),
            'counts': distribution.values.tolist(),
            'percentages': (distribution / len(self.df) * 100).round(2).tolist(),
        }

        self.analysis_results['price_distribution'] = distribution_dict
        return distribution_dict

    def area_analysis(self) -> Dict:
        """Bucket floor areas into fixed ranges and report their shares.

        Adds a categorical ``area_range`` column to ``self.df``.

        Returns:
            A dict with parallel lists ``ranges``, ``counts`` and
            ``percentages`` (percent of all rows, including rows without an
            area), or ``{}`` when there is no usable area data.
        """
        if self.df is None or len(self.df) == 0 or not self._has_values('area'):
            return {}

        self.df['area_range'] = pd.cut(
            self.df['area'],
            bins=self.AREA_BINS,
            labels=self.AREA_LABELS,
            right=False,
        )

        area_distribution = self.df['area_range'].value_counts().sort_index()
        area_dict = {
            'ranges': area_distribution.index.tolist(),
            'counts': area_distribution.values.tolist(),
            'percentages': (area_distribution / len(self.df) * 100).round(2).tolist(),
        }

        self.analysis_results['area_analysis'] = area_dict
        return area_dict

    def setup_huggingface_models(self, model_name: str = "ckiplab/bert-base-chinese-ws"):
        """Load the Hugging Face sentiment pipeline into ``sentiment_analyzer``.

        Failures (missing package, download or model errors) are printed and
        leave ``sentiment_analyzer`` unchanged.

        Args:
            model_name: Checkpoint passed to ``transformers.pipeline``.
                NOTE(review): the default checkpoint looks like a Chinese
                word-segmentation model rather than a sentiment classifier;
                confirm and pass a sentiment checkpoint if so.
        """
        if pipeline is None:
            print("載入Hugging Face模型時發生錯誤: transformers 套件未安裝")
            return

        try:
            print("載入Hugging Face模型...")

            # Chinese sentiment-analysis pipeline.
            self.sentiment_analyzer = pipeline(
                "sentiment-analysis",
                model=model_name,
                return_all_scores=True,
            )

            print("Hugging Face模型載入完成")
        except Exception as e:
            # Model loading can fail in many library-specific ways; this is a
            # best-effort optional feature, so report and carry on.
            print(f"載入Hugging Face模型時發生錯誤: {e}")

    def analyze_descriptions(self) -> Dict:
        """Analyze the free-text ``raw_info`` column of the listings.

        Returns:
            A dict with ``keywords_frequency`` and ``total_descriptions``, or
            ``{}`` when there is no data, no ``raw_info`` column, or no
            non-null description.
        """
        if self.df is None or 'raw_info' not in self.df.columns:
            return {}

        # Cast to str so a stray numeric cell cannot break substring matching.
        descriptions = self.df['raw_info'].dropna().astype(str).tolist()
        if not descriptions:
            return {}

        analysis_result = {
            'keywords_frequency': self.analyze_keywords(descriptions),
            'total_descriptions': len(descriptions),
        }

        self.analysis_results['description_analysis'] = analysis_result
        return analysis_result

    def analyze_keywords(self, descriptions: List[str]) -> Dict:
        """Count how many descriptions mention each housing keyword.

        Args:
            descriptions: Listing description texts.

        Returns:
            The ten most frequent keywords mapped to the number of
            descriptions containing them, in descending order of frequency
            (ties keep the keyword definition order).
        """
        keyword_counts = {
            keyword: sum(keyword in desc for desc in descriptions)
            for keyword in self.KEYWORDS
        }

        # The sort is stable, so equal counts keep their definition order.
        ranked = sorted(keyword_counts.items(), key=lambda item: item[1], reverse=True)
        return dict(ranked[:10])

    def correlation_analysis(self) -> Dict:
        """Compute pairwise correlations between the numeric columns.

        Returns:
            A dict keyed ``"<col1>_vs_<col2>"`` with correlations rounded to
            three decimals, or ``{}`` when fewer than two usable numeric
            columns exist.
        """
        if self.df is None or len(self.df) == 0:
            return {}

        numeric_columns = ['price', 'area', 'price_per_ping']
        available_columns = [col for col in numeric_columns if self._has_values(col)]

        if len(available_columns) < 2:
            return {}

        correlation_matrix = self.df[available_columns].corr()

        # Each unordered pair once, in column order.
        correlation_dict = {
            f"{col1}_vs_{col2}": _rounded(correlation_matrix.loc[col1, col2], 3)
            for col1, col2 in combinations(available_columns, 2)
        }

        self.analysis_results['correlation'] = correlation_dict
        return correlation_dict

    @staticmethod
    def _most_common_bucket(distribution: Dict) -> Tuple[str, float]:
        """Return the (label, percentage) of the largest bucket."""
        percentages = distribution['percentages']
        top_index = percentages.index(max(percentages))
        return distribution['ranges'][top_index], percentages[top_index]

    def generate_insights(self) -> List[str]:
        """Turn the cached analysis results into human-readable statements.

        Returns:
            A list of insight sentences; empty when no analysis has been run.
        """
        insights = []

        if 'basic_stats' in self.analysis_results:
            stats = self.analysis_results['basic_stats']
            price_stats = stats['price_stats']
            insights.append(f"共找到 {stats['total_properties']} 筆符合條件的租屋物件")
            insights.append(f"平均租金為 {price_stats['mean']:,} 元")
            insights.append(f"租金中位數為 {price_stats['median']:,} 元")

            # A mean above the median indicates a right-skewed distribution.
            if price_stats['mean'] > price_stats['median']:
                insights.append("租金分布向右偏斜,存在高租金物件拉高平均值")

        if 'price_distribution' in self.analysis_results:
            most_common_range, percentage = self._most_common_bucket(
                self.analysis_results['price_distribution']
            )
            insights.append(f"最常見的租金區間是 {most_common_range},佔 {percentage}%")

        area = self.analysis_results.get('area_analysis')
        if area:
            most_common_area, _ = self._most_common_bucket(area)
            insights.append(f"最常見的坪數區間是 {most_common_area}")

        return insights

    def run_full_analysis(self) -> Dict:
        """Run every analysis step in order and collect the results.

        Returns:
            ``self.analysis_results`` including the generated ``insights``.
        """
        print("開始執行完整分析...")

        # Basic statistics
        self.basic_statistics()
        print("? 基本統計分析完成")

        # Rent distribution
        self.price_distribution_analysis()
        print("? 租金分布分析完成")

        # Floor-area distribution
        self.area_analysis()
        print("? 坪數分析完成")

        # Description text analysis
        self.analyze_descriptions()
        print("? 描述文字分析完成")

        # Correlation analysis
        self.correlation_analysis()
        print("? 相關性分析完成")

        # Insights
        insights = self.generate_insights()
        print("? 洞察生成完成")

        self.analysis_results['insights'] = insights
        return self.analysis_results

    def save_analysis_results(self, filename: str = "analysis_results.json",
                              output_dir: str = "output"):
        """Write the analysis results to ``<output_dir>/<filename>`` as JSON.

        The output directory is created when missing. Errors are printed
        rather than raised.

        Args:
            filename: Name of the JSON file to write.
            output_dir: Directory the file is written into.
        """
        try:
            # Serialize before opening the file so a serialization failure
            # cannot leave a truncated file behind.
            payload = json.dumps(
                self.analysis_results,
                ensure_ascii=False,
                indent=2,
                default=_json_default,
            )
            target_dir = Path(output_dir)
            target_dir.mkdir(parents=True, exist_ok=True)
            (target_dir / filename).write_text(payload, encoding='utf-8')
            print(f"分析結果已儲存到 {output_dir}/{filename}")
        except (OSError, TypeError, ValueError) as e:
            print(f"儲存分析結果時發生錯誤: {e}")

    def print_summary(self, title: str = "高雄市鼓山區租屋市場分析報告"):
        """Print a human-readable report of the cached analysis results.

        Args:
            title: Heading printed at the top of the report.
        """
        if not self.analysis_results:
            print("沒有分析結果可顯示")
            return

        separator = "=" * 50
        print("\n" + separator)
        print(title)
        print(separator)

        if 'insights' in self.analysis_results:
            print("\n? 重要洞察:")
            for i, insight in enumerate(self.analysis_results['insights'], 1):
                print(f"{i}. {insight}")

        if 'basic_stats' in self.analysis_results:
            price_stats = self.analysis_results['basic_stats']['price_stats']
            print("\n? 租金統計:")
            print(f" 平均租金: {price_stats['mean']:,} 元")
            print(f" 中位數: {price_stats['median']:,} 元")
            print(f" 最低租金: {price_stats['min']:,} 元")
            print(f" 最高租金: {price_stats['max']:,} 元")
            print(f" 標準差: {price_stats['std']:,} 元")

        if 'price_distribution' in self.analysis_results:
            print("\n? 租金分布:")
            dist = self.analysis_results['price_distribution']
            for range_name, count, percentage in zip(
                dist['ranges'], dist['counts'], dist['percentages']
            ):
                print(f" {range_name}: {count} 筆 ({percentage}%)")

        print("\n" + separator)


def main() -> None:
    """Smoke-test the analyzer against ``output/rental_data.csv``."""
    analyzer = RentalDataAnalyzer()

    # Load the data
    df = analyzer.load_data("output/rental_data.csv")

    if df is not None:
        # Clean the data
        analyzer.clean_data()

        # Run the full analysis
        analyzer.run_full_analysis()

        # Save the results
        analyzer.save_analysis_results()

        # Show the summary
        analyzer.print_summary()


if __name__ == "__main__":
    main()