Spaces:
Sleeping
Sleeping
| # evaluation.py | |
| import os | |
| import pandas as pd | |
| import re | |
| from collections import Counter | |
| from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report | |
def evaluate_model(scraper_folder="drug_analysis_data_3months"):
    """
    Evaluate a folder of scraped Twitter drug/crime CSV files.

    Produces, in order:
    - General stats (missing data, duplicates)
    - Drug/Crime-related label distributions
    - Time coverage (date range, daily counts)
    - Text analysis (lengths, top words)
    - User/source analysis
    - Scraper quality metrics (completeness, duplicate rate, relevance,
      time-coverage ratio)
    - Classification metrics when both label columns are present

    Parameters
    ----------
    scraper_folder : str
        Folder containing the scraped ``*.csv`` files to evaluate.

    Returns
    -------
    None
        All results are printed and written to
        ``evaluation_results/evaluation_results.txt`` (plus
        ``classification_report.csv`` when classification metrics apply).
    """
    # -----------------------------
    # Output folder
    # -----------------------------
    output_folder = "evaluation_results"
    os.makedirs(output_folder, exist_ok=True)
    output_lines = []

    def log(msg):
        # Every message is both printed and collected for the results file;
        # this helper replaces the repeated print/append pairs of the
        # original implementation.
        print(msg)
        output_lines.append(msg)

    # -----------------------------
    # Load CSVs
    # -----------------------------
    csv_files = [f for f in os.listdir(scraper_folder) if f.endswith(".csv")]
    if not csv_files:
        print("❌ No CSV files found in scraper folder!")
        return
    dfs = [pd.read_csv(os.path.join(scraper_folder, f)) for f in csv_files]
    df = pd.concat(dfs, ignore_index=True)
    log(f"✅ Loaded {len(df)} rows from {len(csv_files)} CSV files.")
    log(f"Columns detected in CSVs: {df.columns.tolist()}")

    if df.empty:
        # Guard: every metric below divides by len(df) or averages columns,
        # which would raise ZeroDivisionError / yield NaN on an empty frame.
        log("⚠️ CSV files contain no rows; skipping evaluation.")
        return

    # -----------------------------
    # General Stats
    # -----------------------------
    log("\n=== General Stats ===")
    log(f"Columns: {df.columns.tolist()}")
    log(f"Total rows: {len(df)}")
    log(f"Missing values per column:\n{df.isna().sum()}")
    log(f"Duplicate rows: {df.duplicated().sum()}")

    # Sample rows with missing data
    missing_rows = df[df.isna().any(axis=1)]
    if not missing_rows.empty:
        log(f"\nSample rows with missing values:\n{missing_rows.head()}")

    # Sample duplicate rows (keep=False shows every member of each group)
    duplicates = df[df.duplicated(keep=False)]
    if not duplicates.empty:
        log(f"\nSample duplicate rows:\n{duplicates.head()}")

    # -----------------------------
    # Drug/Crime-related stats
    # -----------------------------
    for col in ["is_drug_related", "is_crime_related", "risk_level"]:
        if col in df.columns:
            log(f"\n=== {col} Distribution ===")
            log(str(df[col].value_counts()))
            log(f"Proportion:\n{round(df[col].value_counts(normalize=True), 4)}")

    # Risk level numeric analysis (only meaningful for numeric dtype)
    if "risk_level" in df.columns and pd.api.types.is_numeric_dtype(df["risk_level"]):
        log("\n=== Risk Level Stats ===")
        log(f"Average risk: {round(df['risk_level'].mean(), 2)}")
        log(f"Max risk: {df['risk_level'].max()}")
        log(f"Number of high-risk items (risk >= 0.7): {(df['risk_level'] >= 0.7).sum()}")

    # -----------------------------
    # Time coverage
    # -----------------------------
    if "datetime" in df.columns:
        # errors="coerce" turns unparseable timestamps into NaT rather
        # than raising.
        df["datetime"] = pd.to_datetime(df["datetime"], errors="coerce")
        log("\n=== Date Range ===")
        log(f"Earliest: {df['datetime'].min()}")
        log(f"Latest: {df['datetime'].max()}")
        # Daily counts
        df["date"] = df["datetime"].dt.date
        daily_counts = df.groupby("date").size()
        log(f"\n=== Daily Counts of Posts ===\n{daily_counts}")

    # -----------------------------
    # Text Analysis
    # -----------------------------
    if "text" in df.columns:
        df["text"] = df["text"].astype(str)
        df["text_length"] = df["text"].str.len()
        log("\n=== Text Length Stats ===")
        log(f"Average length: {round(df['text_length'].mean(), 2)}")
        log(f"Min length: {df['text_length'].min()}")
        log(f"Max length: {df['text_length'].max()}")
        # Top 10 most common words; pattern compiled once, used per row
        word_re = re.compile(r"\w+")
        words = Counter()
        for t in df["text"]:
            words.update(word_re.findall(t.lower()))
        log(f"\nTop 10 common words: {words.most_common(10)}")

    # -----------------------------
    # User / Source Analysis
    # -----------------------------
    if "username" in df.columns:
        log("\n=== User Analysis ===")
        log(f"Total unique users: {df['username'].nunique()}")
        log(f"Top 10 users by post count:\n{df['username'].value_counts().head(10)}")

    # -----------------------------
    # Scraper Evaluation Metrics
    # -----------------------------
    log("\n=== Scraper Evaluation Metrics ===")
    completeness = 1 - df.isna().mean().mean()
    duplicate_rate = df.duplicated().mean()
    log(f"Completeness (all columns filled): {round(completeness*100, 2)}%")
    log(f"Duplicate rows rate: {round(duplicate_rate*100, 2)}%")
    for col in ["is_drug_related", "is_crime_related"]:
        if col in df.columns:
            # Assumes the label column is numeric/boolean (0/1) so that
            # sum() counts positives — TODO confirm against the scraper.
            relevance = df[col].sum() / len(df)
            log(f"{col} relevance rate: {round(relevance*100,2)}%")
    if "datetime" in df.columns and df["datetime"].notna().any():
        # notna() guard: if every timestamp coerced to NaT, max()-min()
        # would be NaT and ".days" would raise.
        total_days = (df["datetime"].max() - df["datetime"].min()).days + 1
        active_days = df["date"].nunique()
        coverage_ratio = active_days / total_days
        log(f"Time coverage ratio (active days / total days): {round(coverage_ratio*100,2)}%")
    if "text" in df.columns:
        log(f"Average text length: {round(df['text_length'].mean(),2)} characters")

    # -----------------------------
    # Classification Metrics
    # -----------------------------
    if "is_drug_related" in df.columns and "is_crime_related" in df.columns:
        # NOTE(review): treats is_crime_related as ground truth and
        # is_drug_related as the prediction — confirm this comparison is
        # intended; the two columns look like independent labels.
        y_true = df["is_crime_related"]
        y_pred = df["is_drug_related"]
        # zero_division=0 keeps degenerate label sets (e.g. no positive
        # predictions) from emitting warnings / undefined metrics.
        report = classification_report(y_true, y_pred, output_dict=True, zero_division=0)
        class_report_df = pd.DataFrame(report).transpose()
        class_report_df.to_csv(os.path.join(output_folder, "classification_report.csv"), index=True)
        log("\n=== Classification Metrics (is_drug_related vs is_crime_related) ===")
        log(f"Accuracy: {round(accuracy_score(y_true, y_pred), 4)}")
        log(f"Precision: {round(precision_score(y_true, y_pred, zero_division=0), 4)}")
        log(f"Recall: {round(recall_score(y_true, y_pred, zero_division=0), 4)}")
        log(f"F1-score: {round(f1_score(y_true, y_pred, zero_division=0), 4)}")
        log("\nClassification report saved as 'classification_report.csv'")
    else:
        log("\n⚠️ Skipping classification metrics: Not enough columns for evaluation.")

    # -----------------------------
    # Save all results to txt
    # -----------------------------
    with open(os.path.join(output_folder, "evaluation_results.txt"), "w", encoding="utf-8") as f:
        for line in output_lines:
            f.write(str(line) + "\n")
    print("\n✅ Data evaluation + metrics complete! Results saved in 'evaluation_results/' folder.")
# -----------------------------
# Script entry point
# -----------------------------
if __name__ == "__main__":
    # Evaluate the default 3-month scrape folder when run as a script.
    evaluate_model("drug_analysis_data_3months")