import csv import os from datetime import datetime from bson import ObjectId from django.core.management.base import BaseCommand from expense_tracker.utils import MongoDBClient class Command(BaseCommand): help = 'Seed dataset.csv into the shared sample collection in MongoDB Atlas' def handle(self, *args, **options): from django.conf import settings # dataset.csv lives at the project root (one level above backend/) # settings.BASE_DIR points to the backend/ directory csv_path = os.path.join(settings.BASE_DIR, '..', 'dataset.csv') csv_path = os.path.abspath(csv_path) if not os.path.exists(csv_path): self.stderr.write(self.style.ERROR(f'dataset.csv not found at: {csv_path}')) return self.stdout.write(f'Reading from: {csv_path}') docs = [] with open(csv_path, newline='', encoding='utf-8') as f: reader = csv.DictReader(f) for row in reader: try: date_obj = datetime.strptime(row['Date'].strip(), '%Y-%m-%d') except ValueError: date_obj = datetime.now() doc = { '_id': ObjectId(), 'title': row['Title'].strip(), 'amount': float(row['Amount'].strip()), 'category': row['Category'].strip(), 'date': date_obj, 'type': row['Type'].strip(), # 'Income' or 'Expense' 'source': 'sample', 'created_at': datetime.now(), } docs.append(doc) if not docs: self.stderr.write(self.style.ERROR('No rows found in CSV.')) return db = MongoDBClient.get_client() # Drop existing sample data and replace with fresh seed self.stdout.write('Dropping existing sample collection...') db.sample.drop() self.stdout.write(f'Inserting {len(docs)} documents into sample collection...') db.sample.insert_many(docs) # Quick stats income_count = sum(1 for d in docs if d['type'].lower() == 'income') expense_count = len(docs) - income_count self.stdout.write(self.style.SUCCESS( f'✅ Done! Seeded {len(docs)} sample transactions ' f'({income_count} income, {expense_count} expense) ' f'into the sample collection.' ))