# evaluation.py
import os
import pandas as pd
import re
from collections import Counter
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report


def evaluate_model(scraper_folder="drug_analysis_data_3months"):
    """
    Full evaluation of the Twitter drug/crime scraping dataset:
    - General stats, missing values, duplicates
    - Drug/crime-related label distributions
    - Time coverage
    - Text analysis
    - User/source analysis
    - Scraper evaluation metrics
    - Classification metrics where the label columns exist
    - Saves all results to the 'evaluation_results' folder
    """
    # -----------------------------
    # Output folder
    # -----------------------------
    output_folder = "evaluation_results"
    os.makedirs(output_folder, exist_ok=True)
    output_lines = []

    # -----------------------------
    # Load CSVs
    # -----------------------------
    csv_files = [f for f in os.listdir(scraper_folder) if f.endswith(".csv")]
    if not csv_files:
        print("❌ No CSV files found in scraper folder!")
        return

    dfs = [pd.read_csv(os.path.join(scraper_folder, f)) for f in csv_files]
    df = pd.concat(dfs, ignore_index=True)

    msg = f"✅ Loaded {len(df)} rows from {len(csv_files)} CSV files."
    print(msg)
    output_lines.append(msg)

    msg = f"Columns detected in CSVs: {df.columns.tolist()}"
    print(msg)
    output_lines.append(msg)
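
    # NOTE: pd.concat outer-joins CSVs whose columns differ; any column missing
    # from a file is filled with NaN and will surface in the missing-value
    # stats below.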

    # -----------------------------
    # General Stats
    # -----------------------------
    output_lines.append("\n=== General Stats ===")
    print("\n=== General Stats ===")
    stats = [
        f"Columns: {df.columns.tolist()}",
        f"Total rows: {len(df)}",
        f"Missing values per column:\n{df.isna().sum()}",
        f"Duplicate rows: {df.duplicated().sum()}",
    ]
    for s in stats:
        print(s)
        output_lines.append(s)

    # Sample rows with missing data
    missing_rows = df[df.isna().any(axis=1)]
    if not missing_rows.empty:
        msg = f"\nSample rows with missing values:\n{missing_rows.head()}"
        print(msg)
        output_lines.append(msg)

    # Sample duplicate rows
    duplicates = df[df.duplicated(keep=False)]
    if not duplicates.empty:
        msg = f"\nSample duplicate rows:\n{duplicates.head()}"
        print(msg)
        output_lines.append(msg)
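
    # NOTE: keep=False above marks every copy of a duplicated row, so the
    # sample shows whole duplicate groups rather than only the repeats.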

    # -----------------------------
    # Drug/Crime-related stats
    # -----------------------------
    for col in ["is_drug_related", "is_crime_related", "risk_level"]:
        if col in df.columns:
            msg = f"\n=== {col} Distribution ==="
            print(msg)
            output_lines.append(msg)
            msg = str(df[col].value_counts())
            print(msg)
            output_lines.append(msg)
            msg = f"Proportion:\n{round(df[col].value_counts(normalize=True), 4)}"
            print(msg)
            output_lines.append(msg)

    # Risk level numeric analysis
    if "risk_level" in df.columns and pd.api.types.is_numeric_dtype(df["risk_level"]):
        stats = [
            "\n=== Risk Level Stats ===",
            f"Average risk: {round(df['risk_level'].mean(), 2)}",
            f"Max risk: {df['risk_level'].max()}",
            f"Number of high-risk items (risk >= 0.7): {(df['risk_level'] >= 0.7).sum()}",
        ]
        for s in stats:
            print(s)
            output_lines.append(s)
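
    # NOTE: the 0.7 high-risk cutoff assumes risk_level is normalized to
    # [0, 1]; adjust the threshold if the scraper emits a different scale.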

    # -----------------------------
    # Time coverage
    # -----------------------------
    if "datetime" in df.columns:
        df["datetime"] = pd.to_datetime(df["datetime"], errors="coerce")
        stats = [
            "\n=== Date Range ===",
            f"Earliest: {df['datetime'].min()}",
            f"Latest: {df['datetime'].max()}",
        ]
        for s in stats:
            print(s)
            output_lines.append(s)

        # Daily counts
        df["date"] = df["datetime"].dt.date
        daily_counts = df.groupby("date").size()
        msg = f"\n=== Daily Counts of Posts ===\n{daily_counts}"
        print(msg)
        output_lines.append(msg)
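
    # NOTE: errors="coerce" turns unparseable timestamps into NaT; those rows
    # are skipped by min()/max() and dropped from the daily grouping.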

    # -----------------------------
    # Text Analysis
    # -----------------------------
    if "text" in df.columns:
        df["text"] = df["text"].astype(str)
        df["text_length"] = df["text"].apply(len)
        stats = [
            "\n=== Text Length Stats ===",
            f"Average length: {round(df['text_length'].mean(), 2)}",
            f"Min length: {df['text_length'].min()}",
            f"Max length: {df['text_length'].max()}",
        ]
        for s in stats:
            print(s)
            output_lines.append(s)

        # Top 10 most common words
        words = Counter()
        for t in df["text"]:
            words.update(re.findall(r"\w+", t.lower()))
        msg = f"\nTop 10 common words: {words.most_common(10)}"
        print(msg)
        output_lines.append(msg)
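
    # NOTE: the \w+ tokenizer is deliberately crude: it lowercases, keeps
    # digits and underscores, and removes no stopwords, so generic words like
    # "the" and "to" will dominate the top-10 list.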

    # -----------------------------
    # User / Source Analysis
    # -----------------------------
    if "username" in df.columns:
        stats = [
            "\n=== User Analysis ===",
            f"Total unique users: {df['username'].nunique()}",
            f"Top 10 users by post count:\n{df['username'].value_counts().head(10)}",
        ]
        for s in stats:
            print(s)
            output_lines.append(s)

    # -----------------------------
    # Scraper Evaluation Metrics
    # -----------------------------
    output_lines.append("\n=== Scraper Evaluation Metrics ===")
    print("\n=== Scraper Evaluation Metrics ===")
    completeness = 1 - df.isna().mean().mean()
    duplicate_rate = df.duplicated().mean()
    for s in [
        f"Completeness (all columns filled): {round(completeness * 100, 2)}%",
        f"Duplicate rows rate: {round(duplicate_rate * 100, 2)}%",
    ]:
        print(s)
        output_lines.append(s)
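
    # Completeness here is the mean fraction of non-null cells across all
    # columns; the duplicate rate is the share of fully identical rows.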

    for col in ["is_drug_related", "is_crime_related"]:
        if col in df.columns:
            relevance = df[col].sum() / len(df)
            msg = f"{col} relevance rate: {round(relevance * 100, 2)}%"
            print(msg)
            output_lines.append(msg)
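
    # NOTE: the relevance rate assumes is_*_related holds booleans or 0/1
    # integers, so sum() / len() is the share of flagged rows.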
if "datetime" in df.columns:
total_days = (df["datetime"].max() - df["datetime"].min()).days + 1
active_days = df["date"].nunique()
coverage_ratio = active_days / total_days
msg = f"Time coverage ratio (active days / total days): {round(coverage_ratio*100,2)}%"
print(msg)
output_lines.append(msg)
if "text" in df.columns:
msg = f"Average text length: {round(df['text_length'].mean(),2)} characters"
print(msg)
output_lines.append(msg)

    # -----------------------------
    # Classification Metrics
    # -----------------------------
    if "is_drug_related" in df.columns and "is_crime_related" in df.columns:
        y_true = df["is_crime_related"]
        y_pred = df["is_drug_related"]
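
        # NOTE: this treats is_crime_related as the reference labels and
        # is_drug_related as the predictions, so the scores below measure
        # agreement between the two scraper flags, not accuracy against
        # human-annotated ground truth.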
        report = classification_report(y_true, y_pred, output_dict=True, zero_division=0)
        class_report_df = pd.DataFrame(report).transpose()
        class_report_df.to_csv(os.path.join(output_folder, "classification_report.csv"), index=True)
        stats = [
            "\n=== Classification Metrics (is_drug_related vs is_crime_related) ===",
            f"Accuracy: {round(accuracy_score(y_true, y_pred), 4)}",
            f"Precision: {round(precision_score(y_true, y_pred, zero_division=0), 4)}",
            f"Recall: {round(recall_score(y_true, y_pred, zero_division=0), 4)}",
            f"F1-score: {round(f1_score(y_true, y_pred, zero_division=0), 4)}",
            "\nClassification report saved as 'classification_report.csv'",
        ]
        for s in stats:
            print(s)
            output_lines.append(s)
    else:
        msg = "\n⚠️ Skipping classification metrics: required label columns are missing."
        print(msg)
        output_lines.append(msg)

    # -----------------------------
    # Save all results to txt
    # -----------------------------
    with open(os.path.join(output_folder, "evaluation_results.txt"), "w", encoding="utf-8") as f:
        for line in output_lines:
            f.write(str(line) + "\n")

    print("\n✅ Data evaluation + metrics complete! Results saved in 'evaluation_results/' folder.")


# -----------------------------
# Run the evaluation
# -----------------------------
if __name__ == "__main__":
    evaluate_model(scraper_folder="drug_analysis_data_3months")
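
    # To evaluate a different scrape, point at its output directory instead
    # (hypothetical folder name shown):
    # evaluate_model(scraper_folder="drug_analysis_data_6months")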