| import csv |
| import os |
| from datetime import datetime |
| from bson import ObjectId |
| from django.core.management.base import BaseCommand |
| from expense_tracker.utils import MongoDBClient |
|
|
|
|
class Command(BaseCommand):
    """Reload the ``sample`` collection from ``dataset.csv``.

    The CSV is looked up one directory above ``settings.BASE_DIR``. Every
    valid row becomes one document tagged with ``source='sample'``; the
    existing ``sample`` collection is dropped and replaced by those documents.

    Rows that cannot be converted (missing value, non-numeric amount) are
    skipped with a warning naming the CSV line, rather than aborting the whole
    run with a traceback. Rows whose date does not match ``DATE_FORMAT`` are
    kept, with the current time substituted and a warning emitted.
    """

    help = 'Seed dataset.csv into the shared sample collection in MongoDB Atlas'

    # Columns every row must provide; verified against the CSV header up front
    # so a wrong file is reported once instead of failing on every row.
    REQUIRED_COLUMNS = ('Title', 'Amount', 'Category', 'Date', 'Type')

    # Expected layout of the ``Date`` column.
    DATE_FORMAT = '%Y-%m-%d'

    def handle(self, *args, **options):
        """Parse the CSV, then replace the ``sample`` collection with its rows.

        Nothing is written to the database unless at least one valid row was
        parsed. Problems (missing file, bad header, no usable rows) are
        reported on stderr and the command returns without raising.
        """
        from django.conf import settings

        csv_path = os.path.abspath(
            os.path.join(settings.BASE_DIR, '..', 'dataset.csv')
        )

        if not os.path.exists(csv_path):
            self.stderr.write(self.style.ERROR(f'dataset.csv not found at: {csv_path}'))
            return

        self.stdout.write(f'Reading from: {csv_path}')

        loaded = self._load_documents(csv_path)
        if loaded is None:
            # Header problem; already reported by the loader.
            return
        docs, skipped = loaded

        if not docs:
            self.stderr.write(self.style.ERROR('No rows found in CSV.'))
            return

        # NOTE(review): the object returned by get_client() is used as a
        # database handle (``db.sample``) — confirm against MongoDBClient.
        db = MongoDBClient.get_client()

        # NOTE: drop + insert is not atomic. If insert_many fails, the
        # collection stays empty until the command is re-run successfully.
        self.stdout.write('Dropping existing sample collection...')
        db.sample.drop()

        self.stdout.write(f'Inserting {len(docs)} documents into sample collection...')
        db.sample.insert_many(docs)

        income_count = sum(1 for d in docs if d['type'].lower() == 'income')
        expense_count = len(docs) - income_count

        self.stdout.write(self.style.SUCCESS(
            f'✅ Done! Seeded {len(docs)} sample transactions '
            f'({income_count} income, {expense_count} expense) '
            f'into the sample collection.'
        ))
        if skipped:
            self.stderr.write(self.style.WARNING(
                f'{skipped} invalid row(s) were skipped; see warnings above.'
            ))

    def _load_documents(self, csv_path):
        """Read ``csv_path`` and return ``(docs, skipped_count)``.

        Returns ``None`` (after writing an error to stderr) when the header
        lacks any of ``REQUIRED_COLUMNS``. A completely empty file yields
        ``([], 0)``.
        """
        docs = []
        skipped = 0

        # 'utf-8-sig' strips a leading BOM if present and otherwise decodes
        # exactly like 'utf-8'; without it the first header name would be
        # prefixed with '\ufeff' and never match.
        with open(csv_path, newline='', encoding='utf-8-sig') as f:
            reader = csv.DictReader(f)

            header = reader.fieldnames
            if header is None:
                # Empty file: no header and no rows.
                return docs, skipped

            missing = [col for col in self.REQUIRED_COLUMNS if col not in header]
            if missing:
                self.stderr.write(self.style.ERROR(
                    f'dataset.csv is missing required column(s): {", ".join(missing)}'
                ))
                return None

            for row in reader:
                line_num = reader.line_num
                try:
                    docs.append(self._build_document(row, line_num))
                except ValueError as exc:
                    skipped += 1
                    self.stderr.write(self.style.WARNING(
                        f'Skipping row at line {line_num}: {exc}'
                    ))

        return docs, skipped

    def _build_document(self, row, line_num):
        """Convert one CSV row (a dict from ``DictReader``) into a document.

        Raises ``ValueError`` when a required value is absent or the amount is
        not numeric. An unparseable date is not fatal: the current time is
        used instead and a warning is written.
        """
        def field(name):
            value = row.get(name)
            # DictReader fills columns missing from a short row with None.
            if value is None:
                raise ValueError(f'missing value for column {name!r}')
            return value.strip()

        title = field('Title')
        category = field('Category')
        kind = field('Type')
        raw_amount = field('Amount')
        raw_date = field('Date')

        try:
            amount = float(raw_amount)
        except ValueError:
            raise ValueError(f'invalid amount {raw_amount!r}') from None

        try:
            date_obj = datetime.strptime(raw_date, self.DATE_FORMAT)
        except ValueError:
            date_obj = datetime.now()
            self.stderr.write(self.style.WARNING(
                f'Line {line_num}: invalid date {raw_date!r}; using current time instead.'
            ))

        return {
            '_id': ObjectId(),
            'title': title,
            'amount': amount,
            'category': category,
            'date': date_obj,
            'type': kind,
            'source': 'sample',
            'created_at': datetime.now(),
        }
|
|