""" Forms for the core app. """ import re from zoneinfo import available_timezones from django import forms from django.utils.text import slugify from core.models import Script, Environment, ScriptSchedule, Tag, DataStore, DataStoreEntry, DataStoreAPIToken from core.services import EnvironmentService # Regex pattern for secret key validation (uppercase, numbers, underscores, must start with letter) SECRET_KEY_PATTERN = re.compile(r"^[A-Z][A-Z0-9_]*$") # Common timezone choices (sorted, common ones first) COMMON_TIMEZONES = [ "UTC", "America/New_York", "America/Chicago", "America/Denver", "America/Los_Angeles", "Europe/London", "Europe/Paris", "Europe/Berlin", "Asia/Tokyo", "Asia/Shanghai", "Asia/Singapore", "Australia/Sydney", ] def get_timezone_choices(): """Generate timezone choices with common ones first.""" all_tz = sorted(available_timezones()) common = [(tz, tz) for tz in COMMON_TIMEZONES if tz in all_tz] others = [(tz, tz) for tz in all_tz if tz not in COMMON_TIMEZONES] return [("", "---")] + common + [("---", "─" * 20)] + others class ScriptForm(forms.ModelForm): """Form for creating and editing scripts.""" class Meta: model = Script fields = [ "name", "description", "code", "environment", "tags", "timeout_seconds", "is_enabled", "notify_on", "notify_email", "notify_webhook_url", "notify_webhook_enabled", ] widgets = { "name": forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "My Script Name", } ), "description": forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 2, "placeholder": "What does this script do?", } ), "code": forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono text-sm placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 15, "placeholder": '# Your Python code here\nprint("Hello, World!")', } ), "environment": forms.Select( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", } ), "tags": forms.CheckboxSelectMultiple( attrs={ "class": "tag-checkbox", } ), "timeout_seconds": forms.NumberInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "min": 1, "max": 86400, } ), "is_enabled": forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), "notify_on": forms.Select( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", } ), "notify_email": forms.EmailInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "Override default email (optional)", } ), "notify_webhook_url": forms.URLInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "https://your-service.com/webhook", } ), "notify_webhook_enabled": forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), } labels = { "name": "Script Name", "description": "Description", "code": "Python Code", "environment": "Environment", "tags": "Tags", "timeout_seconds": "Timeout (seconds)", "is_enabled": "Enabled", "notify_on": "Notify On", "notify_email": "Notification Email", "notify_webhook_url": "Webhook URL", "notify_webhook_enabled": "Enable Webhook", } help_texts = { "timeout_seconds": "Maximum execution time (1 second to 24 hours)", "notify_email": "Leave empty to use global default", "notify_webhook_url": "URL to POST notifications to when script completes", } def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Only show active environments self.fields["environment"].queryset = Environment.objects.filter(is_active=True) def clean_code(self): code = self.cleaned_data.get("code", "").strip() if not code: raise forms.ValidationError("Script code cannot be empty.") return code def clean_timeout_seconds(self): timeout = self.cleaned_data.get("timeout_seconds") if timeout is not None and (timeout < 1 or timeout > 86400): raise forms.ValidationError("Timeout must be between 1 and 86400 seconds (24 hours).") return timeout class TagForm(forms.ModelForm): """Form for creating and editing tags.""" class Meta: model = Tag fields = ["name", "color"] widgets = { "name": forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "Tag name", } ), "color": forms.Select( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", } ), } labels = { "name": "Tag Name", "color": "Color", } def clean_name(self): name = self.cleaned_data.get("name", "").strip() if not name: raise forms.ValidationError("Tag name is required.") # Check uniqueness (excluding current instance for edits) qs = Tag.objects.filter(name__iexact=name) if self.instance.pk: qs = qs.exclude(pk=self.instance.pk) if qs.exists(): raise forms.ValidationError("A tag with this name already exists.") return name class ScheduleForm(forms.ModelForm): """Form for configuring script schedules.""" WEEKDAY_CHOICES = [ (0, "Monday"), (1, "Tuesday"), (2, "Wednesday"), (3, "Thursday"), (4, "Friday"), (5, "Saturday"), (6, "Sunday"), ] MONTHDAY_CHOICES = [(i, str(i)) for i in range(1, 32)] # Custom field for daily times (comma-separated input) daily_times_input = forms.CharField( required=False, widget=forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50", "placeholder": "09:00, 18:00", } ), label="Run Times", help_text="Comma-separated times in HH:MM format (24-hour)", ) # Weekly mode fields weekly_days_input = forms.MultipleChoiceField( required=False, choices=WEEKDAY_CHOICES, widget=forms.CheckboxSelectMultiple( attrs={ "class": "sr-only peer", } ), label="Days of Week", ) weekly_times_input = forms.CharField( required=False, widget=forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50", "placeholder": "09:00, 18:00", } ), label="Run Times", help_text="Comma-separated times in HH:MM format (24-hour)", ) # Monthly mode fields monthly_days_input = forms.MultipleChoiceField( required=False, choices=MONTHDAY_CHOICES, widget=forms.CheckboxSelectMultiple( attrs={ "class": "sr-only peer", } ), label="Days of Month", ) monthly_times_input = forms.CharField( required=False, widget=forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50", "placeholder": "09:00, 18:00", } ), label="Run Times", help_text="Comma-separated times in HH:MM format (24-hour)", ) timezone = forms.ChoiceField( choices=get_timezone_choices, initial="UTC", widget=forms.Select( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50", } ), ) class Meta: model = ScriptSchedule fields = ["run_mode", "interval_minutes", "timezone", "is_active"] widgets = { "run_mode": forms.RadioSelect( attrs={ "class": "sr-only peer", } ), "interval_minutes": forms.Select( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50", } ), "is_active": forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), } labels = { "run_mode": "Run Mode", "interval_minutes": "Interval", "is_active": "Schedule Active", } def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Populate daily_times_input from instance if self.instance and self.instance.pk and self.instance.daily_times: self.fields["daily_times_input"].initial = ", ".join( self.instance.daily_times ) # Populate weekly fields from instance if self.instance and self.instance.pk: if self.instance.weekly_days: self.fields["weekly_days_input"].initial = [ str(d) for d in self.instance.weekly_days ] if self.instance.weekly_times: self.fields["weekly_times_input"].initial = ", ".join( self.instance.weekly_times ) # Populate monthly fields from instance if self.instance and self.instance.pk: if self.instance.monthly_days: self.fields["monthly_days_input"].initial = [ str(d) for d in self.instance.monthly_days ] if self.instance.monthly_times: self.fields["monthly_times_input"].initial = ", ".join( self.instance.monthly_times ) def _parse_times(self, value): """Parse and validate comma-separated times input.""" if not value: return [] value = value.strip() if not value: return [] times = [] for time_str in value.split(","): time_str = time_str.strip() if not time_str: continue # Validate HH:MM format try: parts = time_str.split(":") if len(parts) != 2: raise ValueError hour, minute = int(parts[0]), int(parts[1]) if not (0 <= hour <= 23 and 0 <= minute <= 59): raise ValueError times.append(f"{hour:02d}:{minute:02d}") except ValueError: raise forms.ValidationError( f"Invalid time format: '{time_str}'. Use HH:MM (e.g., 09:00)" ) return times def clean_daily_times_input(self): """Parse and validate daily times input.""" return self._parse_times(self.cleaned_data.get("daily_times_input", "")) def clean_weekly_times_input(self): """Parse and validate weekly times input.""" return self._parse_times(self.cleaned_data.get("weekly_times_input", "")) def clean_monthly_times_input(self): """Parse and validate monthly times input.""" return self._parse_times(self.cleaned_data.get("monthly_times_input", "")) def clean(self): cleaned_data = super().clean() run_mode = cleaned_data.get("run_mode") if run_mode == ScriptSchedule.RunMode.INTERVAL: if not cleaned_data.get("interval_minutes"): self.add_error( "interval_minutes", "Interval is required for interval mode." ) elif run_mode == ScriptSchedule.RunMode.DAILY: daily_times = cleaned_data.get("daily_times_input", []) if not daily_times: self.add_error( "daily_times_input", "At least one time is required for daily mode.", ) elif run_mode == ScriptSchedule.RunMode.WEEKLY: weekly_days = cleaned_data.get("weekly_days_input", []) weekly_times = cleaned_data.get("weekly_times_input", []) if not weekly_days: self.add_error( "weekly_days_input", "At least one day is required for weekly mode.", ) if not weekly_times: self.add_error( "weekly_times_input", "At least one time is required for weekly mode.", ) elif run_mode == ScriptSchedule.RunMode.MONTHLY: monthly_days = cleaned_data.get("monthly_days_input", []) monthly_times = cleaned_data.get("monthly_times_input", []) if not monthly_days: self.add_error( "monthly_days_input", "At least one day is required for monthly mode.", ) if not monthly_times: self.add_error( "monthly_times_input", "At least one time is required for monthly mode.", ) return cleaned_data def save(self, commit=True): instance = super().save(commit=False) # Set daily_times from parsed input instance.daily_times = self.cleaned_data.get("daily_times_input", []) # Set weekly fields from parsed input weekly_days = self.cleaned_data.get("weekly_days_input", []) instance.weekly_days = [int(d) for d in weekly_days] if weekly_days else [] instance.weekly_times = self.cleaned_data.get("weekly_times_input", []) # Set monthly fields from parsed input monthly_days = self.cleaned_data.get("monthly_days_input", []) instance.monthly_days = [int(d) for d in monthly_days] if monthly_days else [] instance.monthly_times = self.cleaned_data.get("monthly_times_input", []) if commit: instance.save() return instance class EnvironmentCreateForm(forms.ModelForm): """Form for creating a new environment.""" python_path = forms.ChoiceField( choices=[], widget=forms.Select( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", } ), label="Python Version", help_text="Select Python installation to use for this environment", ) class Meta: model = Environment fields = ["name", "description"] widgets = { "name": forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "My Environment", } ), "description": forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 2, "placeholder": "Environment description (optional)", } ), } labels = { "name": "Environment Name", "description": "Description", } def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Populate python_path choices from discovered Python installations pythons = EnvironmentService.discover_python_versions() choices = [(p["path"], p["display"]) for p in pythons] if not choices: choices = [("", "No Python installations found")] self.fields["python_path"].choices = choices def clean_name(self): """Validate name and check for path uniqueness.""" name = self.cleaned_data.get("name", "").strip() if not name: raise forms.ValidationError("Environment name is required.") # Generate path from slugified name base_path = slugify(name) if not base_path: base_path = "environment" # Ensure path is unique path = base_path counter = 1 while Environment.objects.filter(path=path).exists(): path = f"{base_path}-{counter}" counter += 1 # Store generated path for use in view self._generated_path = path return name def get_generated_path(self) -> str: """Return the generated path after validation.""" return getattr(self, "_generated_path", slugify(self.cleaned_data.get("name", "env"))) class EnvironmentEditForm(forms.ModelForm): """Form for editing environment details (name/description only).""" class Meta: model = Environment fields = ["name", "description"] widgets = { "name": forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "My Environment", } ), "description": forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 2, "placeholder": "Environment description (optional)", } ), } labels = { "name": "Environment Name", "description": "Description", } class PackageInstallForm(forms.Form): """Form for installing a single package.""" package_spec = forms.CharField( max_length=200, widget=forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "requests==2.31.0", } ), label="Package", help_text="Package name with optional version (e.g., requests, django>=4.0)", ) def clean_package_spec(self): spec = self.cleaned_data.get("package_spec", "").strip() if not spec: raise forms.ValidationError("Package specification is required.") if not EnvironmentService.validate_package_spec(spec): raise forms.ValidationError( "Invalid package specification. Use format: package or package==version" ) return spec class BulkInstallForm(forms.Form): """Form for bulk package installation from requirements.""" requirements = forms.CharField( widget=forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono text-sm placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 10, "placeholder": "requests==2.31.0\ndjango>=4.0\nnumpy", } ), label="Requirements", help_text="Paste requirements.txt content (one package per line)", required=False, ) requirements_file = forms.FileField( required=False, widget=forms.FileInput( attrs={ "class": "block w-full text-sm text-code-muted file:mr-4 file:py-2 file:px-4 file:rounded-lg file:border-0 file:text-sm file:font-semibold file:bg-code-accent file:text-white hover:file:bg-code-accent/90 cursor-pointer", "accept": ".txt", } ), label="Or upload requirements.txt", ) def clean(self): cleaned_data = super().clean() text = cleaned_data.get("requirements", "").strip() file = cleaned_data.get("requirements_file") if not text and not file: raise forms.ValidationError( "Provide requirements text or upload a file." ) # If file provided, read its content if file: try: content = file.read().decode("utf-8") cleaned_data["requirements"] = content except UnicodeDecodeError: raise forms.ValidationError( "Could not read file. Ensure it's a valid text file." ) # Validate each line requirements_text = cleaned_data.get("requirements", "") for line in requirements_text.splitlines(): line = line.strip() # Skip empty lines and comments if not line or line.startswith("#") or line.startswith("-"): continue # Extract package spec (first word before any whitespace) pkg_spec = line.split()[0] if line.split() else "" if pkg_spec and not EnvironmentService.validate_package_spec(pkg_spec): raise forms.ValidationError(f"Invalid package specification: {line}") return cleaned_data class SecretCreateForm(forms.Form): """Form for creating a new secret.""" key = forms.CharField( max_length=100, widget=forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent uppercase", "placeholder": "API_KEY", "autocomplete": "off", } ), label="Key Name", help_text="Uppercase letters, numbers, and underscores only. Must start with a letter.", ) value = forms.CharField( widget=forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 3, "placeholder": "sk-your-secret-value-here", "autocomplete": "off", } ), label="Secret Value", help_text="The secret value (will be encrypted at rest)", ) description = forms.CharField( required=False, widget=forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 2, "placeholder": "What is this secret used for?", } ), label="Description", help_text="Optional description to help remember what this secret is for", ) def clean_key(self): """Validate and normalize the key.""" key = self.cleaned_data.get("key", "").strip().upper() if not key: raise forms.ValidationError("Key name is required.") if not SECRET_KEY_PATTERN.match(key): raise forms.ValidationError( "Key must start with a letter and contain only uppercase letters, numbers, and underscores." ) # Check for reserved environment variable names reserved = { "PATH", "HOME", "USER", "SHELL", "PWD", "PYTHONPATH", "VIRTUAL_ENV", "PYTHONHOME", "PYTHONDONTWRITEBYTECODE", "PYTHONUNBUFFERED", } if key in reserved: raise forms.ValidationError( f"'{key}' is a reserved environment variable name." ) # Check if key already exists from core.models import Secret if Secret.objects.filter(key=key).exists(): raise forms.ValidationError(f"A secret with key '{key}' already exists.") return key def clean_value(self): """Validate the secret value.""" value = self.cleaned_data.get("value", "") if not value: raise forms.ValidationError("Secret value is required.") # Reasonable max length for secrets if len(value) > 10000: raise forms.ValidationError( "Secret value is too long (max 10,000 characters)." ) return value class SecretEditForm(forms.Form): """Form for editing an existing secret (value and description only).""" value = forms.CharField( required=False, widget=forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 3, "placeholder": "Leave blank to keep current value", "autocomplete": "off", } ), label="New Secret Value", help_text="Leave blank to keep the current value", ) description = forms.CharField( required=False, widget=forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 2, "placeholder": "What is this secret used for?", } ), label="Description", ) def clean_value(self): """Validate the secret value if provided.""" value = self.cleaned_data.get("value", "") if value and len(value) > 10000: raise forms.ValidationError( "Secret value is too long (max 10,000 characters)." ) return value class NotificationSettingsForm(forms.Form): """Form for global notification settings.""" from core.models import GlobalSettings EMAIL_BACKEND_CHOICES = [ (GlobalSettings.EmailBackend.DISABLED, "Disabled"), (GlobalSettings.EmailBackend.SMTP, "SMTP"), (GlobalSettings.EmailBackend.RESEND, "Resend API"), ] email_backend = forms.ChoiceField( choices=EMAIL_BACKEND_CHOICES, initial=GlobalSettings.EmailBackend.DISABLED, widget=forms.RadioSelect( attrs={ "class": "sr-only peer", } ), label="Email Backend", ) # SMTP Configuration smtp_host = forms.CharField( required=False, max_length=255, widget=forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "smtp.example.com", } ), label="SMTP Host", ) smtp_port = forms.IntegerField( required=False, initial=587, widget=forms.NumberInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "min": 1, "max": 65535, } ), label="SMTP Port", ) smtp_username = forms.CharField( required=False, max_length=255, widget=forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "username@example.com", } ), label="SMTP Username", ) smtp_password = forms.CharField( required=False, widget=forms.PasswordInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "Leave blank to keep current", "autocomplete": "new-password", } ), label="SMTP Password", ) smtp_use_tls = forms.BooleanField( required=False, initial=True, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Use TLS", ) smtp_from_email = forms.EmailField( required=False, widget=forms.EmailInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "noreply@example.com", } ), label="From Email", ) # Resend Configuration resend_api_key = forms.CharField( required=False, widget=forms.PasswordInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "Leave blank to keep current", "autocomplete": "new-password", } ), label="Resend API Key", ) resend_from_email = forms.EmailField( required=False, widget=forms.EmailInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "noreply@yourdomain.com", } ), label="From Email", ) # Default notification email default_notification_email = forms.EmailField( required=False, widget=forms.EmailInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "notifications@example.com", } ), label="Default Notification Email", help_text="All script notifications will be sent here unless overridden per-script.", ) def __init__(self, *args, instance=None, **kwargs): """Initialize form with existing settings.""" super().__init__(*args, **kwargs) if instance: self.fields["email_backend"].initial = instance.email_backend self.fields["smtp_host"].initial = instance.smtp_host self.fields["smtp_port"].initial = instance.smtp_port self.fields["smtp_username"].initial = instance.smtp_username self.fields["smtp_use_tls"].initial = instance.smtp_use_tls self.fields["smtp_from_email"].initial = instance.smtp_from_email self.fields["resend_from_email"].initial = instance.resend_from_email self.fields["default_notification_email"].initial = instance.default_notification_email def clean(self): """Validate configuration based on selected backend.""" cleaned_data = super().clean() backend = cleaned_data.get("email_backend") from core.models import GlobalSettings if backend == GlobalSettings.EmailBackend.SMTP: if not cleaned_data.get("smtp_host"): self.add_error("smtp_host", "SMTP host is required for SMTP backend.") if not cleaned_data.get("smtp_from_email"): self.add_error("smtp_from_email", "From email is required for SMTP backend.") elif backend == GlobalSettings.EmailBackend.RESEND: if not cleaned_data.get("resend_from_email"): self.add_error("resend_from_email", "From email is required for Resend backend.") return cleaned_data def save(self, instance): """Save the notification settings to the GlobalSettings instance.""" from core.services import EncryptionService instance.email_backend = self.cleaned_data["email_backend"] instance.smtp_host = self.cleaned_data.get("smtp_host") or "" instance.smtp_port = self.cleaned_data.get("smtp_port") or 587 instance.smtp_username = self.cleaned_data.get("smtp_username") or "" instance.smtp_use_tls = self.cleaned_data.get("smtp_use_tls", True) instance.smtp_from_email = self.cleaned_data.get("smtp_from_email") or "" instance.resend_from_email = self.cleaned_data.get("resend_from_email") or "" instance.default_notification_email = self.cleaned_data.get("default_notification_email") or "" # Encrypt and save SMTP password if provided smtp_password = self.cleaned_data.get("smtp_password") if smtp_password: instance.smtp_password_encrypted = EncryptionService.encrypt(smtp_password) # Encrypt and save Resend API key if provided resend_api_key = self.cleaned_data.get("resend_api_key") if resend_api_key: instance.resend_api_key_encrypted = EncryptionService.encrypt(resend_api_key) instance.save() return instance class GeneralSettingsForm(forms.Form): """Form for general instance settings.""" from core.models import GlobalSettings DATE_FORMAT_CHOICES = [ (GlobalSettings.DateFormat.ISO, "YYYY-MM-DD (ISO)"), (GlobalSettings.DateFormat.US, "MM/DD/YYYY (US)"), (GlobalSettings.DateFormat.EU, "DD/MM/YYYY (EU)"), (GlobalSettings.DateFormat.DOT, "DD.MM.YYYY"), ] TIME_FORMAT_CHOICES = [ (GlobalSettings.TimeFormat.H24, "24-hour (14:30)"), (GlobalSettings.TimeFormat.H12, "12-hour (2:30 PM)"), ] instance_name = forms.CharField( max_length=100, required=False, widget=forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "PyRunner", } ), label="Instance Name", help_text="Displayed in the header and email notifications", ) timezone = forms.ChoiceField( choices=get_timezone_choices, initial="UTC", widget=forms.Select( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", } ), label="Timezone", help_text="Default timezone for displaying dates and times", ) date_format = forms.ChoiceField( choices=DATE_FORMAT_CHOICES, widget=forms.Select( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", } ), label="Date Format", ) time_format = forms.ChoiceField( choices=TIME_FORMAT_CHOICES, widget=forms.Select( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", } ), label="Time Format", ) admin_url_slug = forms.CharField( max_length=100, required=False, widget=forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "django-admin", } ), label="Django Admin URL", help_text="URL path for Django admin (e.g., 'django-admin' → /django-admin/). Requires app restart.", ) def clean_admin_url_slug(self): """Validate admin URL slug format.""" import re slug = self.cleaned_data.get("admin_url_slug", "django-admin").strip().lower() if not slug: slug = "django-admin" # Remove leading/trailing slashes slug = slug.strip("/") # Validate: alphanumeric, hyphens, underscores only if not re.match(r"^[a-z0-9_-]+$", slug): raise forms.ValidationError( "Admin URL can only contain lowercase letters, numbers, hyphens, and underscores." ) # Prevent conflicts with existing routes reserved = ["setup", "auth", "cpanel", "webhook", "static", "media"] if slug in reserved: raise forms.ValidationError( f"'{slug}' is a reserved URL path. Please choose a different name." ) return slug def __init__(self, *args, instance=None, **kwargs): """Initialize form with existing settings.""" super().__init__(*args, **kwargs) if instance: self.fields["instance_name"].initial = instance.instance_name self.fields["timezone"].initial = instance.timezone self.fields["date_format"].initial = instance.date_format self.fields["time_format"].initial = instance.time_format self.fields["admin_url_slug"].initial = instance.admin_url_slug def save(self, instance): """Save the general settings to the GlobalSettings instance.""" instance.instance_name = self.cleaned_data.get("instance_name") or "PyRunner" instance.timezone = self.cleaned_data.get("timezone") or "UTC" instance.date_format = self.cleaned_data.get("date_format") instance.time_format = self.cleaned_data.get("time_format") instance.admin_url_slug = self.cleaned_data.get("admin_url_slug") or "django-admin" instance.save(update_fields=[ "instance_name", "timezone", "date_format", "time_format", "admin_url_slug", "updated_at" ]) return instance class LogRetentionForm(forms.Form): """Form for log retention settings.""" retention_days = forms.IntegerField( min_value=0, initial=0, widget=forms.NumberInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "min": 0, } ), label="Retention Days", help_text="Delete runs older than this many days (0 = keep forever)", ) retention_count = forms.IntegerField( min_value=0, initial=0, widget=forms.NumberInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "min": 0, } ), label="Retention Count", help_text="Keep only the last N runs per script (0 = unlimited)", ) auto_cleanup_enabled = forms.BooleanField( required=False, initial=False, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Auto Cleanup", help_text="Automatically clean up old runs daily at 2 AM", ) def __init__(self, *args, instance=None, **kwargs): """Initialize form with existing settings.""" super().__init__(*args, **kwargs) if instance: self.fields["retention_days"].initial = instance.retention_days self.fields["retention_count"].initial = instance.retention_count self.fields["auto_cleanup_enabled"].initial = instance.auto_cleanup_enabled def save(self, instance): """Save the retention settings to the GlobalSettings instance.""" from core.services import RetentionService instance.retention_days = self.cleaned_data.get("retention_days") or 0 instance.retention_count = self.cleaned_data.get("retention_count") or 0 # Handle auto cleanup schedule new_auto_cleanup = self.cleaned_data.get("auto_cleanup_enabled", False) old_auto_cleanup = instance.auto_cleanup_enabled instance.auto_cleanup_enabled = new_auto_cleanup instance.save(update_fields=[ "retention_days", "retention_count", "auto_cleanup_enabled", "updated_at" ]) # Manage the django-q2 schedule if new_auto_cleanup and not old_auto_cleanup: RetentionService.enable_auto_cleanup() elif not new_auto_cleanup and old_auto_cleanup: RetentionService.disable_auto_cleanup() return instance class WorkerSettingsForm(forms.Form): """Form for Django-Q2 worker configuration.""" q_workers = forms.IntegerField( min_value=1, max_value=16, initial=2, widget=forms.NumberInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "min": 1, "max": 16, } ), label="Worker Count", help_text="Number of worker processes (1-16). More workers can process more tasks simultaneously.", ) q_timeout = forms.IntegerField( min_value=0, max_value=86400, initial=600, widget=forms.NumberInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "min": 0, "max": 86400, } ), label="Task Timeout (seconds)", help_text="Maximum time a task can run before worker timeout. Use 0 for no timeout (required on Windows). For long-running scripts, also increase the script's own timeout.", ) q_retry = forms.IntegerField( min_value=60, max_value=86400, initial=660, widget=forms.NumberInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "min": 60, "max": 86400, } ), label="Retry Delay (seconds)", help_text="Time before retrying a failed/timed-out task. Should be greater than timeout.", ) q_queue_limit = forms.IntegerField( min_value=5, max_value=100, initial=20, widget=forms.NumberInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "min": 5, "max": 100, } ), label="Queue Limit", help_text="Maximum number of tasks that can be queued at once.", ) def __init__(self, *args, instance=None, **kwargs): """Initialize form with existing settings.""" super().__init__(*args, **kwargs) if instance: self.fields["q_workers"].initial = instance.q_workers self.fields["q_timeout"].initial = instance.q_timeout self.fields["q_retry"].initial = instance.q_retry self.fields["q_queue_limit"].initial = instance.q_queue_limit def clean(self): """Validate that retry > timeout.""" cleaned_data = super().clean() timeout = cleaned_data.get("q_timeout", 0) retry = cleaned_data.get("q_retry", 660) if timeout > 0 and retry <= timeout: self.add_error( "q_retry", f"Retry delay ({retry}s) must be greater than timeout ({timeout}s).", ) return cleaned_data def save(self, instance): """Save the worker settings to the GlobalSettings instance.""" from django.utils import timezone # Check if any values actually changed changed = ( instance.q_workers != self.cleaned_data["q_workers"] or instance.q_timeout != self.cleaned_data["q_timeout"] or instance.q_retry != self.cleaned_data["q_retry"] or instance.q_queue_limit != self.cleaned_data["q_queue_limit"] ) instance.q_workers = self.cleaned_data["q_workers"] instance.q_timeout = self.cleaned_data["q_timeout"] instance.q_retry = self.cleaned_data["q_retry"] instance.q_queue_limit = self.cleaned_data["q_queue_limit"] if changed: instance.worker_settings_updated_at = timezone.now() instance.save( update_fields=[ "q_workers", "q_timeout", "q_retry", "q_queue_limit", "worker_settings_updated_at", "updated_at", ] ) return instance class BackupCreateForm(forms.Form): """Form for configuring backup creation.""" backup_format = forms.ChoiceField( choices=[ ("gzip", "Compressed (recommended) - .json.gz"), ("json", "Plain JSON - .json"), ], initial="gzip", widget=forms.RadioSelect( attrs={ "class": "w-4 h-4 text-code-accent bg-code-bg border-code-border focus:ring-code-accent", } ), label="Backup format", help_text="Compressed backups are 80-95% smaller", ) include_datastores = forms.BooleanField( required=False, initial=True, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Include DataStores", help_text="Include all DataStores and their key-value entries", ) include_runs = forms.BooleanField( required=False, initial=True, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Include run history", help_text="Include execution history (stdout/stderr)", ) max_runs = forms.IntegerField( initial=1000, min_value=0, max_value=10000, widget=forms.NumberInput( attrs={ "class": "w-full px-4 py-2 bg-code-bg text-code-text border border-code-border rounded-lg focus:ring-2 focus:ring-code-accent focus:border-transparent", "placeholder": "1000", } ), label="Maximum runs to include", help_text="Limit run history to most recent N runs (0 = all runs)", ) include_package_operations = forms.BooleanField( required=False, initial=False, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Include package operations", help_text="Include pip installation history", ) class BackupRestoreForm(forms.Form): """Form for restoring from backup.""" backup_file = forms.FileField( widget=forms.FileInput( attrs={ "class": "block w-full text-sm text-code-text file:mr-4 file:py-2 file:px-4 file:rounded-lg file:border-0 file:text-sm file:font-semibold file:bg-code-accent file:text-white hover:file:bg-opacity-90", "accept": ".json,.json.gz,.gz", } ), label="Backup file", help_text="JSON or compressed backup file (.json or .json.gz)", ) restore_runs = forms.BooleanField( required=False, initial=True, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Restore run history", help_text="Import execution history from backup", ) confirm_delete = forms.BooleanField( required=True, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-red-500 bg-code-bg border-code-border rounded focus:ring-red-500 focus:ring-2", } ), label="I understand all existing data will be deleted", help_text="This action cannot be undone without the automatic backup", ) # ============================================================================= # Data Store Forms # ============================================================================= class DataStoreForm(forms.ModelForm): """Form for creating and editing data stores.""" class Meta: model = DataStore fields = ["name", "description"] widgets = { "name": forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "my_data_store", } ), "description": forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 2, "placeholder": "What is this data store used for?", } ), } labels = { "name": "Store Name", "description": "Description", } help_texts = { "name": "Used in scripts as: DataStore(\"name\")", } def clean_name(self): name = self.cleaned_data.get("name", "").strip() if not name: raise forms.ValidationError("Store name is required.") # Check for valid identifier-like name if not name.replace("_", "").replace("-", "").isalnum(): raise forms.ValidationError( "Name can only contain letters, numbers, underscores, and hyphens." ) # Check uniqueness (excluding current instance for edits) qs = DataStore.objects.filter(name__iexact=name) if self.instance.pk: qs = qs.exclude(pk=self.instance.pk) if qs.exists(): raise forms.ValidationError("A data store with this name already exists.") return name class DataStoreEntryForm(forms.Form): """Form for creating and editing data store entries.""" key = forms.CharField( max_length=255, widget=forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "my_key", } ), label="Key", help_text="Unique identifier for this entry", ) value = forms.CharField( widget=forms.Textarea( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono text-sm placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "rows": 6, "placeholder": '{"example": "value"}\nor just a string\nor a number like 42', } ), label="Value (JSON)", help_text="JSON value: string, number, boolean, array, or object", ) def __init__(self, *args, datastore=None, instance=None, **kwargs): super().__init__(*args, **kwargs) self.datastore = datastore self.instance = instance # Pre-populate for editing if instance: self.fields["key"].initial = instance.key self.fields["value"].initial = instance.value_json def clean_key(self): key = self.cleaned_data.get("key", "").strip() if not key: raise forms.ValidationError("Key is required.") if len(key) > 255: raise forms.ValidationError("Key cannot exceed 255 characters.") # Check uniqueness within the data store if self.datastore: qs = DataStoreEntry.objects.filter(datastore=self.datastore, key=key) if self.instance: qs = qs.exclude(pk=self.instance.pk) if qs.exists(): raise forms.ValidationError( f"Key '{key}' already exists in this data store." ) return key def clean_value(self): import json value = self.cleaned_data.get("value", "").strip() if not value: raise forms.ValidationError("Value is required.") try: # Validate it's valid JSON json.loads(value) except json.JSONDecodeError as e: raise forms.ValidationError(f"Invalid JSON: {e}") return value # ============================================================================= # API Token Forms # ============================================================================= class DataStoreAPITokenForm(forms.ModelForm): """Form for creating API tokens.""" class Meta: model = DataStoreAPIToken fields = ["name", "datastore", "expires_at"] widgets = { "name": forms.TextInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "placeholder": "My Dashboard Token", } ), "datastore": forms.Select( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", } ), "expires_at": forms.DateTimeInput( attrs={ "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", "type": "datetime-local", }, format="%Y-%m-%dT%H:%M", ), } labels = { "name": "Token Name", "datastore": "Scope", "expires_at": "Expires At", } help_texts = { "name": "A friendly name to identify this token", "datastore": "Leave empty for access to all datastores, or select a specific datastore", "expires_at": "Optional. Leave empty for no expiration.", } def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Make datastore optional with a clear empty choice self.fields["datastore"].required = False self.fields["datastore"].empty_label = "All Datastores (Global Access)" self.fields["expires_at"].required = False # ============================================================================= # Authentication Forms # ============================================================================= INPUT_CLASS = "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent" class PasswordLoginForm(forms.Form): """Form for password-based login.""" email = forms.EmailField( widget=forms.EmailInput( attrs={ "class": INPUT_CLASS, "placeholder": "you@example.com", "autocomplete": "email", } ), label="Email address", ) password = forms.CharField( widget=forms.PasswordInput( attrs={ "class": INPUT_CLASS, "placeholder": "Your password", "autocomplete": "current-password", } ), label="Password", ) class SetPasswordForm(forms.Form): """Form for setting or changing password.""" password = forms.CharField( min_length=8, widget=forms.PasswordInput( attrs={ "class": INPUT_CLASS, "placeholder": "New password", "autocomplete": "new-password", } ), label="New Password", help_text="Minimum 8 characters", ) password_confirm = forms.CharField( widget=forms.PasswordInput( attrs={ "class": INPUT_CLASS, "placeholder": "Confirm new password", "autocomplete": "new-password", } ), label="Confirm Password", ) def clean(self): cleaned_data = super().clean() password = cleaned_data.get("password") confirm = cleaned_data.get("password_confirm") if password and confirm and password != confirm: raise forms.ValidationError("Passwords do not match.") return cleaned_data class AdminSetupForm(forms.Form): """Form for initial admin setup with password.""" email = forms.EmailField( widget=forms.EmailInput( attrs={ "class": INPUT_CLASS, "placeholder": "admin@example.com", "autocomplete": "email", } ), label="Admin Email", ) password = forms.CharField( min_length=8, widget=forms.PasswordInput( attrs={ "class": INPUT_CLASS, "placeholder": "Create a strong password", "autocomplete": "new-password", } ), label="Password", help_text="Minimum 8 characters", ) password_confirm = forms.CharField( widget=forms.PasswordInput( attrs={ "class": INPUT_CLASS, "placeholder": "Confirm password", "autocomplete": "new-password", } ), label="Confirm Password", ) def clean(self): cleaned_data = super().clean() password = cleaned_data.get("password") confirm = cleaned_data.get("password_confirm") if password and confirm and password != confirm: raise forms.ValidationError("Passwords do not match.") return cleaned_data # ============================================================================= # Services Forms # ============================================================================= class S3SettingsForm(forms.Form): """Form for S3 storage configuration.""" s3_enabled = forms.BooleanField( required=False, initial=False, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Enable S3 Storage", ) s3_endpoint_url = forms.CharField( required=False, max_length=500, widget=forms.TextInput( attrs={ "class": INPUT_CLASS, "placeholder": "https://s3.amazonaws.com or https://minio.example.com:9000", } ), label="Endpoint URL", help_text="Leave empty for AWS S3. Required for MinIO, DigitalOcean Spaces, etc.", ) s3_region = forms.CharField( required=False, max_length=50, initial="us-east-1", widget=forms.TextInput( attrs={ "class": INPUT_CLASS, "placeholder": "us-east-1", } ), label="Region", ) s3_bucket_name = forms.CharField( required=False, max_length=255, widget=forms.TextInput( attrs={ "class": INPUT_CLASS, "placeholder": "my-backup-bucket", } ), label="Bucket Name", ) s3_access_key = forms.CharField( required=False, widget=forms.PasswordInput( attrs={ "class": INPUT_CLASS, "placeholder": "Leave blank to keep current", "autocomplete": "new-password", } ), label="Access Key", ) s3_secret_key = forms.CharField( required=False, widget=forms.PasswordInput( attrs={ "class": INPUT_CLASS, "placeholder": "Leave blank to keep current", "autocomplete": "new-password", } ), label="Secret Key", ) s3_use_ssl = forms.BooleanField( required=False, initial=True, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Use SSL/TLS", help_text="Recommended for security. Disable only for local development.", ) s3_path_style = forms.BooleanField( required=False, initial=False, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Path-style addressing", help_text="Required for MinIO and some S3-compatible providers.", ) def __init__(self, *args, instance=None, **kwargs): """Initialize form with existing settings.""" super().__init__(*args, **kwargs) if instance: self.fields["s3_enabled"].initial = instance.s3_enabled self.fields["s3_endpoint_url"].initial = instance.s3_endpoint_url self.fields["s3_region"].initial = instance.s3_region or "us-east-1" self.fields["s3_bucket_name"].initial = instance.s3_bucket_name self.fields["s3_use_ssl"].initial = instance.s3_use_ssl self.fields["s3_path_style"].initial = instance.s3_path_style def save(self, instance): """Save the S3 settings to the GlobalSettings instance.""" from core.services.encryption_service import EncryptionService instance.s3_enabled = self.cleaned_data.get("s3_enabled", False) instance.s3_endpoint_url = self.cleaned_data.get("s3_endpoint_url") or "" instance.s3_region = self.cleaned_data.get("s3_region") or "us-east-1" instance.s3_bucket_name = self.cleaned_data.get("s3_bucket_name") or "" instance.s3_use_ssl = self.cleaned_data.get("s3_use_ssl", True) instance.s3_path_style = self.cleaned_data.get("s3_path_style", False) # Encrypt and save access key if provided access_key = self.cleaned_data.get("s3_access_key") if access_key: instance.s3_access_key_encrypted = EncryptionService.encrypt(access_key) # Encrypt and save secret key if provided secret_key = self.cleaned_data.get("s3_secret_key") if secret_key: instance.s3_secret_key_encrypted = EncryptionService.encrypt(secret_key) instance.save() return instance class S3BackupScheduleForm(forms.Form): """Form for S3 scheduled backup configuration.""" from core.models import GlobalSettings WEEKDAY_CHOICES = [ (0, "Monday"), (1, "Tuesday"), (2, "Wednesday"), (3, "Thursday"), (4, "Friday"), (5, "Saturday"), (6, "Sunday"), ] s3_backup_enabled = forms.BooleanField( required=False, initial=False, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Enable Scheduled Backups", ) s3_backup_schedule = forms.ChoiceField( choices=GlobalSettings.S3BackupSchedule.choices, initial=GlobalSettings.S3BackupSchedule.DISABLED, widget=forms.Select( attrs={ "class": INPUT_CLASS, } ), label="Schedule Frequency", ) s3_backup_time = forms.TimeField( initial="02:00", widget=forms.TimeInput( attrs={ "class": INPUT_CLASS, "type": "time", } ), label="Backup Time", help_text="Time to run backups (in instance timezone)", ) s3_backup_day = forms.ChoiceField( choices=WEEKDAY_CHOICES, initial=0, widget=forms.Select( attrs={ "class": INPUT_CLASS, } ), label="Day of Week", help_text="For weekly backups", ) s3_backup_prefix = forms.CharField( required=False, max_length=255, initial="pyrunner-backups/", widget=forms.TextInput( attrs={ "class": INPUT_CLASS, "placeholder": "pyrunner-backups/", } ), label="S3 Path Prefix", help_text="Path prefix for backup files in the bucket", ) s3_backup_retention_count = forms.IntegerField( min_value=0, initial=7, widget=forms.NumberInput( attrs={ "class": INPUT_CLASS, "min": "0", } ), label="Retention Count", help_text="Keep the last N backups (0 = keep all)", ) s3_backup_include_runs = forms.BooleanField( required=False, initial=False, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Include Run History", ) s3_backup_max_runs = forms.IntegerField( min_value=0, initial=1000, widget=forms.NumberInput( attrs={ "class": INPUT_CLASS, "min": "0", } ), label="Max Runs", help_text="Maximum runs to include (0 = all)", ) s3_backup_include_datastores = forms.BooleanField( required=False, initial=True, widget=forms.CheckboxInput( attrs={ "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", } ), label="Include DataStores", ) def __init__(self, *args, instance=None, **kwargs): """Initialize form with existing settings.""" super().__init__(*args, **kwargs) if instance: self.fields["s3_backup_enabled"].initial = instance.s3_backup_enabled self.fields["s3_backup_schedule"].initial = instance.s3_backup_schedule self.fields["s3_backup_time"].initial = instance.s3_backup_time self.fields["s3_backup_day"].initial = instance.s3_backup_day self.fields["s3_backup_prefix"].initial = instance.s3_backup_prefix or "pyrunner-backups/" self.fields["s3_backup_retention_count"].initial = instance.s3_backup_retention_count self.fields["s3_backup_include_runs"].initial = instance.s3_backup_include_runs self.fields["s3_backup_max_runs"].initial = instance.s3_backup_max_runs self.fields["s3_backup_include_datastores"].initial = instance.s3_backup_include_datastores def save(self, instance): """Save the backup schedule settings.""" from core.services.backup_schedule_service import BackupScheduleService instance.s3_backup_enabled = self.cleaned_data.get("s3_backup_enabled", False) instance.s3_backup_schedule = self.cleaned_data.get("s3_backup_schedule") instance.s3_backup_time = self.cleaned_data.get("s3_backup_time") instance.s3_backup_day = int(self.cleaned_data.get("s3_backup_day", 0)) instance.s3_backup_prefix = self.cleaned_data.get("s3_backup_prefix") or "pyrunner-backups/" instance.s3_backup_retention_count = self.cleaned_data.get("s3_backup_retention_count", 7) instance.s3_backup_include_runs = self.cleaned_data.get("s3_backup_include_runs", False) instance.s3_backup_max_runs = self.cleaned_data.get("s3_backup_max_runs", 1000) instance.s3_backup_include_datastores = self.cleaned_data.get("s3_backup_include_datastores", True) instance.save() # Sync the django-q2 schedule BackupScheduleService.sync_schedule() return instance