# evaluation.py
import os
import re
from collections import Counter

import pandas as pd
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    f1_score,
    precision_score,
    recall_score,
)


def evaluate_model(scraper_folder="drug_analysis_data_3months"):
    """
    Full evaluation function for the Twitter drug/crime scraping dataset:
    - Computes general stats, missing data, duplicates
    - Drug/crime-related stats
    - Time coverage
    - Text analysis
    - User/source analysis
    - Scraper evaluation metrics
    - Classification metrics if applicable
    - Saves all results to the 'evaluation_results' folder
    """
    # -----------------------------
    # Output folder
    # -----------------------------
    output_folder = "evaluation_results"
    os.makedirs(output_folder, exist_ok=True)
    output_lines = []

    # -----------------------------
    # Load CSVs
    # -----------------------------
    csv_files = [f for f in os.listdir(scraper_folder) if f.endswith(".csv")]
    if not csv_files:
        print("❌ No CSV files found in scraper folder!")
        return

    dfs = [pd.read_csv(os.path.join(scraper_folder, f)) for f in csv_files]
    df = pd.concat(dfs, ignore_index=True)

    msg = f"✅ Loaded {len(df)} rows from {len(csv_files)} CSV files."
    print(msg)
    output_lines.append(msg)

    msg = f"Columns detected in CSVs: {df.columns.tolist()}"
    print(msg)
    output_lines.append(msg)

    # -----------------------------
    # General Stats
    # -----------------------------
    output_lines.append("\n=== General Stats ===")
    print("\n=== General Stats ===")
    stats = [
        f"Columns: {df.columns.tolist()}",
        f"Total rows: {len(df)}",
        f"Missing values per column:\n{df.isna().sum()}",
        f"Duplicate rows: {df.duplicated().sum()}",
    ]
    for s in stats:
        print(s)
        output_lines.append(s)

    # Sample rows with missing data
    missing_rows = df[df.isna().any(axis=1)]
    if not missing_rows.empty:
        msg = f"\nSample rows with missing values:\n{missing_rows.head()}"
        print(msg)
        output_lines.append(msg)

    # Sample duplicate rows
    duplicates = df[df.duplicated(keep=False)]
    if not duplicates.empty:
        msg = f"\nSample duplicate rows:\n{duplicates.head()}"
        print(msg)
        output_lines.append(msg)

    # -----------------------------
    # Drug/Crime-related stats
    # -----------------------------
    for col in ["is_drug_related", "is_crime_related", "risk_level"]:
        if col in df.columns:
            msg = f"\n=== {col} Distribution ==="
            print(msg)
            output_lines.append(msg)

            msg = str(df[col].value_counts())
            print(msg)
            output_lines.append(msg)

            msg = f"Proportion:\n{round(df[col].value_counts(normalize=True), 4)}"
            print(msg)
            output_lines.append(msg)

    # Risk level numeric analysis
    if "risk_level" in df.columns and pd.api.types.is_numeric_dtype(df["risk_level"]):
        stats = [
            "\n=== Risk Level Stats ===",
            f"Average risk: {round(df['risk_level'].mean(), 2)}",
            f"Max risk: {df['risk_level'].max()}",
            f"Number of high-risk items (risk >= 0.7): {(df['risk_level'] >= 0.7).sum()}",
        ]
        for s in stats:
            print(s)
            output_lines.append(s)

    # -----------------------------
    # Time coverage
    # -----------------------------
    if "datetime" in df.columns:
        df["datetime"] = pd.to_datetime(df["datetime"], errors="coerce")
        stats = [
            "\n=== Date Range ===",
            f"Earliest: {df['datetime'].min()}",
            f"Latest: {df['datetime'].max()}",
        ]
        for s in stats:
            print(s)
            output_lines.append(s)

        # Daily counts
        df["date"] = df["datetime"].dt.date
        daily_counts = df.groupby("date").size()
        msg = f"\n=== Daily Counts of Posts ===\n{daily_counts}"
        print(msg)
        output_lines.append(msg)

    # -----------------------------
    # Text Analysis
    # -----------------------------
    if "text" in df.columns:
        df["text"] = df["text"].astype(str)
        df["text_length"] = df["text"].apply(len)

        stats = [
            "\n=== Text Length Stats ===",
            f"Average length: {round(df['text_length'].mean(), 2)}",
            f"Min length: {df['text_length'].min()}",
            f"Max length: {df['text_length'].max()}",
        ]
        for s in stats:
            print(s)
            output_lines.append(s)

        # Top 10 most common words
        words = Counter()
        for t in df["text"]:
            words.update(re.findall(r"\w+", t.lower()))
        msg = f"\nTop 10 common words: {words.most_common(10)}"
        print(msg)
        output_lines.append(msg)

    # -----------------------------
    # User / Source Analysis
    # -----------------------------
    if "username" in df.columns:
        stats = [
            "\n=== User Analysis ===",
            f"Total unique users: {df['username'].nunique()}",
            f"Top 10 users by post count:\n{df['username'].value_counts().head(10)}",
        ]
        for s in stats:
            print(s)
            output_lines.append(s)

    # -----------------------------
    # Scraper Evaluation Metrics
    # -----------------------------
    output_lines.append("\n=== Scraper Evaluation Metrics ===")
    print("\n=== Scraper Evaluation Metrics ===")

    # Completeness = share of non-missing cells across all columns
    completeness = 1 - df.isna().mean().mean()
    duplicate_rate = df.duplicated().mean()
    output_lines += [
        f"Completeness (all columns filled): {round(completeness * 100, 2)}%",
        f"Duplicate rows rate: {round(duplicate_rate * 100, 2)}%",
    ]
    print(output_lines[-2])
    print(output_lines[-1])

    # Relevance rate = fraction of rows flagged True for each label column
    for col in ["is_drug_related", "is_crime_related"]:
        if col in df.columns:
            relevance = df[col].sum() / len(df)
            msg = f"{col} relevance rate: {round(relevance * 100, 2)}%"
            print(msg)
            output_lines.append(msg)

    if "datetime" in df.columns:
        total_days = (df["datetime"].max() - df["datetime"].min()).days + 1
        active_days = df["date"].nunique()
        coverage_ratio = active_days / total_days
        msg = f"Time coverage ratio (active days / total days): {round(coverage_ratio * 100, 2)}%"
        print(msg)
        output_lines.append(msg)

    if "text" in df.columns:
        msg = f"Average text length: {round(df['text_length'].mean(), 2)} characters"
        print(msg)
        output_lines.append(msg)

    # -----------------------------
    # Classification Metrics
    # -----------------------------
    # Treats is_crime_related as the reference labels and is_drug_related as the
    # predictions, i.e. an agreement check between the two flags.
    if "is_drug_related" in df.columns and "is_crime_related" in df.columns:
        y_true = df["is_crime_related"]
        y_pred = df["is_drug_related"]

        report = classification_report(y_true, y_pred, output_dict=True)
        class_report_df = pd.DataFrame(report).transpose()
        class_report_df.to_csv(
            os.path.join(output_folder, "classification_report.csv"), index=True
        )

        stats = [
            "\n=== Classification Metrics (is_drug_related vs is_crime_related) ===",
            f"Accuracy: {round(accuracy_score(y_true, y_pred), 4)}",
            f"Precision: {round(precision_score(y_true, y_pred), 4)}",
            f"Recall: {round(recall_score(y_true, y_pred), 4)}",
            f"F1-score: {round(f1_score(y_true, y_pred), 4)}",
            "\nClassification report saved as 'classification_report.csv'",
        ]
        for s in stats:
            print(s)
            output_lines.append(s)
    else:
        msg = "\n⚠️ Skipping classification metrics: not enough columns for evaluation."
        print(msg)
        output_lines.append(msg)

    # -----------------------------
    # Save all results to txt
    # -----------------------------
    with open(os.path.join(output_folder, "evaluation_results.txt"), "w", encoding="utf-8") as f:
        for line in output_lines:
            f.write(str(line) + "\n")

    print("\n✅ Data evaluation + metrics complete! Results saved in 'evaluation_results/' folder.")


# -----------------------------
# Run the evaluation
# -----------------------------
if __name__ == "__main__":
    evaluate_model(scraper_folder="drug_analysis_data_3months")