| import os |
| import json |
| import csv |
| from celery import Celery |
|
|
| from infra.database import get_db, JobModel, DatasetModel |
| from infra.logger import get_logger |
| from infra.result_contract import normalize_results |
|
|
from core.insights import generate_insights, generate_story
from core.meta_learning import save_meta_record
from core.pipeline_engine import PipelineEngine, PipelineContext
from services.training.components import (
    DataValidationComponent,
    FeatureEngineeringComponent,
    ModelSelectionComponent,
    TrainingComponent,
    EvaluationComponent,
)

# Module logger; created after all imports so no code runs mid-import-block.
log = get_logger(__name__)

# Raise the csv module's per-field size cap (default 131072 characters) so
# very large cells don't trigger "field larger than field limit".
# NOTE(review): presumably for CSV datasets parsed in this worker process — confirm.
# 1e9 stays below 2**31 - 1, so it is safe on platforms with a 32-bit C long.
csv.field_size_limit(int(1e9))

# Redis connection string used as both the Celery broker and the result
# backend; override with the REDIS_URL environment variable.
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

celery_app = Celery(
    "automl_worker",
    broker=REDIS_URL,
    backend=REDIS_URL,
)
# Keep retrying the broker connection while the worker starts up (Celery 5.3+
# asks for this to be set explicitly).
celery_app.conf.broker_connection_retry_on_startup = True
|
|
def _load_dataset_profile(dataset_id):
    """Return ``(profile_data, health_metadata)`` for a dataset.

    Reads ``DatasetModel.profile_json`` and parses it as JSON; the ``"health"``
    entry of the parsed profile becomes ``health_metadata``. Best-effort: a DB
    or parse failure, or a profile/health value that is not a dict, yields
    empty dicts so training can still proceed.
    """
    try:
        with get_db() as db:
            ds = db.query(DatasetModel).filter(DatasetModel.id == dataset_id).first()
            # Read the column while the session is still open.
            raw = ds.profile_json if ds else None
    except Exception as e:
        log.warning(f"Dataset profile lookup failed for {dataset_id}: {e}", exc_info=True)
        return {}, {}

    if not raw:
        return {}, {}

    try:
        profile = json.loads(raw)
    except (TypeError, ValueError, RecursionError) as e:
        log.warning(f"Dataset profile for {dataset_id} is not valid JSON: {e}")
        return {}, {}

    if not isinstance(profile, dict):
        return {}, {}
    health = profile.get("health")
    if not isinstance(health, dict):
        health = {}
    return profile, health


def _mark_job_failed(job_id):
    """Best-effort: set ``status = "failed"`` on the job row after a pipeline error.

    Never raises; a DB error here is only logged so it cannot mask the
    original pipeline exception.
    """
    try:
        with get_db() as db:
            job = db.query(JobModel).filter(JobModel.id == job_id).first()
            if job:
                # NOTE(review): assumes "failed" is the error status the API/UI
                # expects (only "completed" is visible in this file) — confirm.
                job.status = "failed"
                db.commit()
    except Exception as e:
        log.warning(f"Could not mark job {job_id} as failed: {e}", exc_info=True)


def _dumps_or_fallback(value, fallback, label):
    """Return ``json.dumps(value)``; if that fails, log and return ``json.dumps(fallback)``."""
    try:
        return json.dumps(value)
    except (TypeError, ValueError, RecursionError) as e:
        log.warning(f"Could not serialize {label} to JSON; storing fallback: {e}")
        return json.dumps(fallback)


def _persist_job_outputs(job_id, results, insights, story, reasoning):
    """Best-effort: mark the job "completed" and store its outputs on the job row.

    Stores results/insights/reasoning as JSON strings, the story as-is, and
    ``results["model_path"]`` when ``results`` is a dict. Any DB error is
    logged and swallowed.
    """
    try:
        with get_db() as db:
            job = db.query(JobModel).filter(JobModel.id == job_id).first()
            if job:
                job.status = "completed"
                job.results_json = _dumps_or_fallback(results, {}, "results")
                job.insights_json = _dumps_or_fallback(insights, {}, "insights")
                job.story = story
                job.model_path = results.get("model_path") if isinstance(results, dict) else None
                # Fall back to stringified entries rather than dropping reasoning.
                job.reasoning_json = _dumps_or_fallback(
                    reasoning, [str(r) for r in reasoning], "reasoning"
                )
            db.commit()
    except Exception as e:
        log.warning(f"Final DB write failed: {e}", exc_info=True)


@celery_app.task(bind=True, max_retries=0)
def run_training_job(
    self, job_id, dataset_id, file_path, target_column, goal, mode,
    eval_metric="Performance",
    selected_features=None,
    handle_imbalance=False,
    auto_clean=True,
    cv_folds=0,
):
    """
    Celery task: runs training in background using the Modular Component Pipeline Engine.

    Steps:
      1. Load the dataset profile (and its "health" section), best-effort.
      2. Build a ``PipelineContext`` and run the validation, feature
         engineering, model selection, training and evaluation components.
      3. Normalize the resulting metrics, then generate insights, a story and
         a meta-learning record (each best-effort, failures are logged).
      4. Mark the job "completed" and store its outputs (best-effort).

    Args:
        self: Bound task instance (``bind=True``); not used.
        job_id: Primary key of the ``JobModel`` row to update.
        dataset_id: Primary key of the ``DatasetModel`` whose profile is loaded.
        file_path, target_column, goal, mode: Forwarded to ``PipelineContext``.
        eval_metric, selected_features, handle_imbalance, auto_clean, cv_folds:
            Packed into the context's ``config`` dict.

    Returns:
        None.

    Raises:
        Exception: Whatever pipeline setup, ``engine.run()`` or
            ``normalize_results`` raises. The job row is marked "failed"
            first, and no retries are made (``max_retries=0``).
    """
    profile_data, health_metadata = _load_dataset_profile(dataset_id)

    try:
        ctx = PipelineContext(
            job_id=job_id,
            dataset_id=dataset_id,
            file_path=file_path,
            target_column=target_column,
            goal=goal,
            mode=mode,
            config={
                "eval_metric": eval_metric,
                "selected_features": selected_features,
                "handle_imbalance": handle_imbalance,
                "auto_clean": auto_clean,
                "cv_folds": cv_folds,
            },
        )
        ctx.health_metadata = health_metadata

        engine = PipelineEngine(
            context=ctx,
            components=[
                DataValidationComponent(),
                FeatureEngineeringComponent(),
                ModelSelectionComponent(),
                TrainingComponent(),
                EvaluationComponent(),
            ],
        )
        final_ctx = engine.run()
        results = normalize_results(final_ctx.metrics or {})
    except Exception:
        # Otherwise the job row would never leave its in-progress state.
        _mark_job_failed(job_id)
        raise

    # Insights and story are independent: one failing must not discard the other.
    try:
        insights = generate_insights(profile_data, results)
    except Exception as e:
        log.warning(f"Insights generation failed: {e}", exc_info=True)
        insights = {}

    try:
        story = generate_story(profile_data, results)
    except Exception as e:
        log.warning(f"Story generation failed: {e}", exc_info=True)
        story = None

    try:
        save_meta_record(profile_data, results or {})
    except Exception as e:
        log.warning(f"Meta-learning save skipped: {e}")

    reasoning = getattr(final_ctx, "reasoning", None)
    if not isinstance(reasoning, list):
        reasoning = []

    _persist_job_outputs(job_id, results, insights, story, reasoning)
|
|