Spaces:
Paused
Paused
| """ | |
| Forms for the core app. | |
| """ | |
| import re | |
| from zoneinfo import available_timezones | |
| from django import forms | |
| from django.utils.text import slugify | |
| from core.models import Script, Environment, ScriptSchedule, Tag, DataStore, DataStoreEntry, DataStoreAPIToken | |
| from core.services import EnvironmentService | |
| # Regex pattern for secret key validation (uppercase, numbers, underscores, must start with letter) | |
| SECRET_KEY_PATTERN = re.compile(r"^[A-Z][A-Z0-9_]*$") | |
| # Common timezone choices (sorted, common ones first) | |
| COMMON_TIMEZONES = [ | |
| "UTC", | |
| "America/New_York", | |
| "America/Chicago", | |
| "America/Denver", | |
| "America/Los_Angeles", | |
| "Europe/London", | |
| "Europe/Paris", | |
| "Europe/Berlin", | |
| "Asia/Tokyo", | |
| "Asia/Shanghai", | |
| "Asia/Singapore", | |
| "Australia/Sydney", | |
| ] | |
| def get_timezone_choices(): | |
| """Generate timezone choices with common ones first.""" | |
| all_tz = sorted(available_timezones()) | |
| common = [(tz, tz) for tz in COMMON_TIMEZONES if tz in all_tz] | |
| others = [(tz, tz) for tz in all_tz if tz not in COMMON_TIMEZONES] | |
| return [("", "---")] + common + [("---", "─" * 20)] + others | |
| class ScriptForm(forms.ModelForm): | |
| """Form for creating and editing scripts.""" | |
| class Meta: | |
| model = Script | |
| fields = [ | |
| "name", | |
| "description", | |
| "code", | |
| "environment", | |
| "tags", | |
| "timeout_seconds", | |
| "is_enabled", | |
| "notify_on", | |
| "notify_email", | |
| "notify_webhook_url", | |
| "notify_webhook_enabled", | |
| ] | |
| widgets = { | |
| "name": forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "My Script Name", | |
| } | |
| ), | |
| "description": forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 2, | |
| "placeholder": "What does this script do?", | |
| } | |
| ), | |
| "code": forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono text-sm placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 15, | |
| "placeholder": '# Your Python code here\nprint("Hello, World!")', | |
| } | |
| ), | |
| "environment": forms.Select( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| } | |
| ), | |
| "tags": forms.CheckboxSelectMultiple( | |
| attrs={ | |
| "class": "tag-checkbox", | |
| } | |
| ), | |
| "timeout_seconds": forms.NumberInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "min": 1, | |
| "max": 86400, | |
| } | |
| ), | |
| "is_enabled": forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| "notify_on": forms.Select( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| } | |
| ), | |
| "notify_email": forms.EmailInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "Override default email (optional)", | |
| } | |
| ), | |
| "notify_webhook_url": forms.URLInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "https://your-service.com/webhook", | |
| } | |
| ), | |
| "notify_webhook_enabled": forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| } | |
| labels = { | |
| "name": "Script Name", | |
| "description": "Description", | |
| "code": "Python Code", | |
| "environment": "Environment", | |
| "tags": "Tags", | |
| "timeout_seconds": "Timeout (seconds)", | |
| "is_enabled": "Enabled", | |
| "notify_on": "Notify On", | |
| "notify_email": "Notification Email", | |
| "notify_webhook_url": "Webhook URL", | |
| "notify_webhook_enabled": "Enable Webhook", | |
| } | |
| help_texts = { | |
| "timeout_seconds": "Maximum execution time (1 second to 24 hours)", | |
| "notify_email": "Leave empty to use global default", | |
| "notify_webhook_url": "URL to POST notifications to when script completes", | |
| } | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| # Only show active environments | |
| self.fields["environment"].queryset = Environment.objects.filter(is_active=True) | |
| def clean_code(self): | |
| code = self.cleaned_data.get("code", "").strip() | |
| if not code: | |
| raise forms.ValidationError("Script code cannot be empty.") | |
| return code | |
| def clean_timeout_seconds(self): | |
| timeout = self.cleaned_data.get("timeout_seconds") | |
| if timeout is not None and (timeout < 1 or timeout > 86400): | |
| raise forms.ValidationError("Timeout must be between 1 and 86400 seconds (24 hours).") | |
| return timeout | |
| class TagForm(forms.ModelForm): | |
| """Form for creating and editing tags.""" | |
| class Meta: | |
| model = Tag | |
| fields = ["name", "color"] | |
| widgets = { | |
| "name": forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "Tag name", | |
| } | |
| ), | |
| "color": forms.Select( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| } | |
| ), | |
| } | |
| labels = { | |
| "name": "Tag Name", | |
| "color": "Color", | |
| } | |
| def clean_name(self): | |
| name = self.cleaned_data.get("name", "").strip() | |
| if not name: | |
| raise forms.ValidationError("Tag name is required.") | |
| # Check uniqueness (excluding current instance for edits) | |
| qs = Tag.objects.filter(name__iexact=name) | |
| if self.instance.pk: | |
| qs = qs.exclude(pk=self.instance.pk) | |
| if qs.exists(): | |
| raise forms.ValidationError("A tag with this name already exists.") | |
| return name | |
| class ScheduleForm(forms.ModelForm): | |
| """Form for configuring script schedules.""" | |
| WEEKDAY_CHOICES = [ | |
| (0, "Monday"), | |
| (1, "Tuesday"), | |
| (2, "Wednesday"), | |
| (3, "Thursday"), | |
| (4, "Friday"), | |
| (5, "Saturday"), | |
| (6, "Sunday"), | |
| ] | |
| MONTHDAY_CHOICES = [(i, str(i)) for i in range(1, 32)] | |
| # Custom field for daily times (comma-separated input) | |
| daily_times_input = forms.CharField( | |
| required=False, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50", | |
| "placeholder": "09:00, 18:00", | |
| } | |
| ), | |
| label="Run Times", | |
| help_text="Comma-separated times in HH:MM format (24-hour)", | |
| ) | |
| # Weekly mode fields | |
| weekly_days_input = forms.MultipleChoiceField( | |
| required=False, | |
| choices=WEEKDAY_CHOICES, | |
| widget=forms.CheckboxSelectMultiple( | |
| attrs={ | |
| "class": "sr-only peer", | |
| } | |
| ), | |
| label="Days of Week", | |
| ) | |
| weekly_times_input = forms.CharField( | |
| required=False, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50", | |
| "placeholder": "09:00, 18:00", | |
| } | |
| ), | |
| label="Run Times", | |
| help_text="Comma-separated times in HH:MM format (24-hour)", | |
| ) | |
| # Monthly mode fields | |
| monthly_days_input = forms.MultipleChoiceField( | |
| required=False, | |
| choices=MONTHDAY_CHOICES, | |
| widget=forms.CheckboxSelectMultiple( | |
| attrs={ | |
| "class": "sr-only peer", | |
| } | |
| ), | |
| label="Days of Month", | |
| ) | |
| monthly_times_input = forms.CharField( | |
| required=False, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50", | |
| "placeholder": "09:00, 18:00", | |
| } | |
| ), | |
| label="Run Times", | |
| help_text="Comma-separated times in HH:MM format (24-hour)", | |
| ) | |
| timezone = forms.ChoiceField( | |
| choices=get_timezone_choices, | |
| initial="UTC", | |
| widget=forms.Select( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50", | |
| } | |
| ), | |
| ) | |
| class Meta: | |
| model = ScriptSchedule | |
| fields = ["run_mode", "interval_minutes", "timezone", "is_active"] | |
| widgets = { | |
| "run_mode": forms.RadioSelect( | |
| attrs={ | |
| "class": "sr-only peer", | |
| } | |
| ), | |
| "interval_minutes": forms.Select( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50", | |
| } | |
| ), | |
| "is_active": forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| } | |
| labels = { | |
| "run_mode": "Run Mode", | |
| "interval_minutes": "Interval", | |
| "is_active": "Schedule Active", | |
| } | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| # Populate daily_times_input from instance | |
| if self.instance and self.instance.pk and self.instance.daily_times: | |
| self.fields["daily_times_input"].initial = ", ".join( | |
| self.instance.daily_times | |
| ) | |
| # Populate weekly fields from instance | |
| if self.instance and self.instance.pk: | |
| if self.instance.weekly_days: | |
| self.fields["weekly_days_input"].initial = [ | |
| str(d) for d in self.instance.weekly_days | |
| ] | |
| if self.instance.weekly_times: | |
| self.fields["weekly_times_input"].initial = ", ".join( | |
| self.instance.weekly_times | |
| ) | |
| # Populate monthly fields from instance | |
| if self.instance and self.instance.pk: | |
| if self.instance.monthly_days: | |
| self.fields["monthly_days_input"].initial = [ | |
| str(d) for d in self.instance.monthly_days | |
| ] | |
| if self.instance.monthly_times: | |
| self.fields["monthly_times_input"].initial = ", ".join( | |
| self.instance.monthly_times | |
| ) | |
| def _parse_times(self, value): | |
| """Parse and validate comma-separated times input.""" | |
| if not value: | |
| return [] | |
| value = value.strip() | |
| if not value: | |
| return [] | |
| times = [] | |
| for time_str in value.split(","): | |
| time_str = time_str.strip() | |
| if not time_str: | |
| continue | |
| # Validate HH:MM format | |
| try: | |
| parts = time_str.split(":") | |
| if len(parts) != 2: | |
| raise ValueError | |
| hour, minute = int(parts[0]), int(parts[1]) | |
| if not (0 <= hour <= 23 and 0 <= minute <= 59): | |
| raise ValueError | |
| times.append(f"{hour:02d}:{minute:02d}") | |
| except ValueError: | |
| raise forms.ValidationError( | |
| f"Invalid time format: '{time_str}'. Use HH:MM (e.g., 09:00)" | |
| ) | |
| return times | |
| def clean_daily_times_input(self): | |
| """Parse and validate daily times input.""" | |
| return self._parse_times(self.cleaned_data.get("daily_times_input", "")) | |
| def clean_weekly_times_input(self): | |
| """Parse and validate weekly times input.""" | |
| return self._parse_times(self.cleaned_data.get("weekly_times_input", "")) | |
| def clean_monthly_times_input(self): | |
| """Parse and validate monthly times input.""" | |
| return self._parse_times(self.cleaned_data.get("monthly_times_input", "")) | |
| def clean(self): | |
| cleaned_data = super().clean() | |
| run_mode = cleaned_data.get("run_mode") | |
| if run_mode == ScriptSchedule.RunMode.INTERVAL: | |
| if not cleaned_data.get("interval_minutes"): | |
| self.add_error( | |
| "interval_minutes", "Interval is required for interval mode." | |
| ) | |
| elif run_mode == ScriptSchedule.RunMode.DAILY: | |
| daily_times = cleaned_data.get("daily_times_input", []) | |
| if not daily_times: | |
| self.add_error( | |
| "daily_times_input", | |
| "At least one time is required for daily mode.", | |
| ) | |
| elif run_mode == ScriptSchedule.RunMode.WEEKLY: | |
| weekly_days = cleaned_data.get("weekly_days_input", []) | |
| weekly_times = cleaned_data.get("weekly_times_input", []) | |
| if not weekly_days: | |
| self.add_error( | |
| "weekly_days_input", | |
| "At least one day is required for weekly mode.", | |
| ) | |
| if not weekly_times: | |
| self.add_error( | |
| "weekly_times_input", | |
| "At least one time is required for weekly mode.", | |
| ) | |
| elif run_mode == ScriptSchedule.RunMode.MONTHLY: | |
| monthly_days = cleaned_data.get("monthly_days_input", []) | |
| monthly_times = cleaned_data.get("monthly_times_input", []) | |
| if not monthly_days: | |
| self.add_error( | |
| "monthly_days_input", | |
| "At least one day is required for monthly mode.", | |
| ) | |
| if not monthly_times: | |
| self.add_error( | |
| "monthly_times_input", | |
| "At least one time is required for monthly mode.", | |
| ) | |
| return cleaned_data | |
| def save(self, commit=True): | |
| instance = super().save(commit=False) | |
| # Set daily_times from parsed input | |
| instance.daily_times = self.cleaned_data.get("daily_times_input", []) | |
| # Set weekly fields from parsed input | |
| weekly_days = self.cleaned_data.get("weekly_days_input", []) | |
| instance.weekly_days = [int(d) for d in weekly_days] if weekly_days else [] | |
| instance.weekly_times = self.cleaned_data.get("weekly_times_input", []) | |
| # Set monthly fields from parsed input | |
| monthly_days = self.cleaned_data.get("monthly_days_input", []) | |
| instance.monthly_days = [int(d) for d in monthly_days] if monthly_days else [] | |
| instance.monthly_times = self.cleaned_data.get("monthly_times_input", []) | |
| if commit: | |
| instance.save() | |
| return instance | |
| class EnvironmentCreateForm(forms.ModelForm): | |
| """Form for creating a new environment.""" | |
| python_path = forms.ChoiceField( | |
| choices=[], | |
| widget=forms.Select( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| } | |
| ), | |
| label="Python Version", | |
| help_text="Select Python installation to use for this environment", | |
| ) | |
| class Meta: | |
| model = Environment | |
| fields = ["name", "description"] | |
| widgets = { | |
| "name": forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "My Environment", | |
| } | |
| ), | |
| "description": forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 2, | |
| "placeholder": "Environment description (optional)", | |
| } | |
| ), | |
| } | |
| labels = { | |
| "name": "Environment Name", | |
| "description": "Description", | |
| } | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| # Populate python_path choices from discovered Python installations | |
| pythons = EnvironmentService.discover_python_versions() | |
| choices = [(p["path"], p["display"]) for p in pythons] | |
| if not choices: | |
| choices = [("", "No Python installations found")] | |
| self.fields["python_path"].choices = choices | |
| def clean_name(self): | |
| """Validate name and check for path uniqueness.""" | |
| name = self.cleaned_data.get("name", "").strip() | |
| if not name: | |
| raise forms.ValidationError("Environment name is required.") | |
| # Generate path from slugified name | |
| base_path = slugify(name) | |
| if not base_path: | |
| base_path = "environment" | |
| # Ensure path is unique | |
| path = base_path | |
| counter = 1 | |
| while Environment.objects.filter(path=path).exists(): | |
| path = f"{base_path}-{counter}" | |
| counter += 1 | |
| # Store generated path for use in view | |
| self._generated_path = path | |
| return name | |
| def get_generated_path(self) -> str: | |
| """Return the generated path after validation.""" | |
| return getattr(self, "_generated_path", slugify(self.cleaned_data.get("name", "env"))) | |
| class EnvironmentEditForm(forms.ModelForm): | |
| """Form for editing environment details (name/description only).""" | |
| class Meta: | |
| model = Environment | |
| fields = ["name", "description"] | |
| widgets = { | |
| "name": forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "My Environment", | |
| } | |
| ), | |
| "description": forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 2, | |
| "placeholder": "Environment description (optional)", | |
| } | |
| ), | |
| } | |
| labels = { | |
| "name": "Environment Name", | |
| "description": "Description", | |
| } | |
| class PackageInstallForm(forms.Form): | |
| """Form for installing a single package.""" | |
| package_spec = forms.CharField( | |
| max_length=200, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "requests==2.31.0", | |
| } | |
| ), | |
| label="Package", | |
| help_text="Package name with optional version (e.g., requests, django>=4.0)", | |
| ) | |
| def clean_package_spec(self): | |
| spec = self.cleaned_data.get("package_spec", "").strip() | |
| if not spec: | |
| raise forms.ValidationError("Package specification is required.") | |
| if not EnvironmentService.validate_package_spec(spec): | |
| raise forms.ValidationError( | |
| "Invalid package specification. Use format: package or package==version" | |
| ) | |
| return spec | |
| class BulkInstallForm(forms.Form): | |
| """Form for bulk package installation from requirements.""" | |
| requirements = forms.CharField( | |
| widget=forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono text-sm placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 10, | |
| "placeholder": "requests==2.31.0\ndjango>=4.0\nnumpy", | |
| } | |
| ), | |
| label="Requirements", | |
| help_text="Paste requirements.txt content (one package per line)", | |
| required=False, | |
| ) | |
| requirements_file = forms.FileField( | |
| required=False, | |
| widget=forms.FileInput( | |
| attrs={ | |
| "class": "block w-full text-sm text-code-muted file:mr-4 file:py-2 file:px-4 file:rounded-lg file:border-0 file:text-sm file:font-semibold file:bg-code-accent file:text-white hover:file:bg-code-accent/90 cursor-pointer", | |
| "accept": ".txt", | |
| } | |
| ), | |
| label="Or upload requirements.txt", | |
| ) | |
| def clean(self): | |
| cleaned_data = super().clean() | |
| text = cleaned_data.get("requirements", "").strip() | |
| file = cleaned_data.get("requirements_file") | |
| if not text and not file: | |
| raise forms.ValidationError( | |
| "Provide requirements text or upload a file." | |
| ) | |
| # If file provided, read its content | |
| if file: | |
| try: | |
| content = file.read().decode("utf-8") | |
| cleaned_data["requirements"] = content | |
| except UnicodeDecodeError: | |
| raise forms.ValidationError( | |
| "Could not read file. Ensure it's a valid text file." | |
| ) | |
| # Validate each line | |
| requirements_text = cleaned_data.get("requirements", "") | |
| for line in requirements_text.splitlines(): | |
| line = line.strip() | |
| # Skip empty lines and comments | |
| if not line or line.startswith("#") or line.startswith("-"): | |
| continue | |
| # Extract package spec (first word before any whitespace) | |
| pkg_spec = line.split()[0] if line.split() else "" | |
| if pkg_spec and not EnvironmentService.validate_package_spec(pkg_spec): | |
| raise forms.ValidationError(f"Invalid package specification: {line}") | |
| return cleaned_data | |
| class SecretCreateForm(forms.Form): | |
| """Form for creating a new secret.""" | |
| key = forms.CharField( | |
| max_length=100, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent uppercase", | |
| "placeholder": "API_KEY", | |
| "autocomplete": "off", | |
| } | |
| ), | |
| label="Key Name", | |
| help_text="Uppercase letters, numbers, and underscores only. Must start with a letter.", | |
| ) | |
| value = forms.CharField( | |
| widget=forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 3, | |
| "placeholder": "sk-your-secret-value-here", | |
| "autocomplete": "off", | |
| } | |
| ), | |
| label="Secret Value", | |
| help_text="The secret value (will be encrypted at rest)", | |
| ) | |
| description = forms.CharField( | |
| required=False, | |
| widget=forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 2, | |
| "placeholder": "What is this secret used for?", | |
| } | |
| ), | |
| label="Description", | |
| help_text="Optional description to help remember what this secret is for", | |
| ) | |
| def clean_key(self): | |
| """Validate and normalize the key.""" | |
| key = self.cleaned_data.get("key", "").strip().upper() | |
| if not key: | |
| raise forms.ValidationError("Key name is required.") | |
| if not SECRET_KEY_PATTERN.match(key): | |
| raise forms.ValidationError( | |
| "Key must start with a letter and contain only uppercase letters, numbers, and underscores." | |
| ) | |
| # Check for reserved environment variable names | |
| reserved = { | |
| "PATH", | |
| "HOME", | |
| "USER", | |
| "SHELL", | |
| "PWD", | |
| "PYTHONPATH", | |
| "VIRTUAL_ENV", | |
| "PYTHONHOME", | |
| "PYTHONDONTWRITEBYTECODE", | |
| "PYTHONUNBUFFERED", | |
| } | |
| if key in reserved: | |
| raise forms.ValidationError( | |
| f"'{key}' is a reserved environment variable name." | |
| ) | |
| # Check if key already exists | |
| from core.models import Secret | |
| if Secret.objects.filter(key=key).exists(): | |
| raise forms.ValidationError(f"A secret with key '{key}' already exists.") | |
| return key | |
| def clean_value(self): | |
| """Validate the secret value.""" | |
| value = self.cleaned_data.get("value", "") | |
| if not value: | |
| raise forms.ValidationError("Secret value is required.") | |
| # Reasonable max length for secrets | |
| if len(value) > 10000: | |
| raise forms.ValidationError( | |
| "Secret value is too long (max 10,000 characters)." | |
| ) | |
| return value | |
| class SecretEditForm(forms.Form): | |
| """Form for editing an existing secret (value and description only).""" | |
| value = forms.CharField( | |
| required=False, | |
| widget=forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 3, | |
| "placeholder": "Leave blank to keep current value", | |
| "autocomplete": "off", | |
| } | |
| ), | |
| label="New Secret Value", | |
| help_text="Leave blank to keep the current value", | |
| ) | |
| description = forms.CharField( | |
| required=False, | |
| widget=forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 2, | |
| "placeholder": "What is this secret used for?", | |
| } | |
| ), | |
| label="Description", | |
| ) | |
| def clean_value(self): | |
| """Validate the secret value if provided.""" | |
| value = self.cleaned_data.get("value", "") | |
| if value and len(value) > 10000: | |
| raise forms.ValidationError( | |
| "Secret value is too long (max 10,000 characters)." | |
| ) | |
| return value | |
| class NotificationSettingsForm(forms.Form): | |
| """Form for global notification settings.""" | |
| from core.models import GlobalSettings | |
| EMAIL_BACKEND_CHOICES = [ | |
| (GlobalSettings.EmailBackend.DISABLED, "Disabled"), | |
| (GlobalSettings.EmailBackend.SMTP, "SMTP"), | |
| (GlobalSettings.EmailBackend.RESEND, "Resend API"), | |
| ] | |
| email_backend = forms.ChoiceField( | |
| choices=EMAIL_BACKEND_CHOICES, | |
| initial=GlobalSettings.EmailBackend.DISABLED, | |
| widget=forms.RadioSelect( | |
| attrs={ | |
| "class": "sr-only peer", | |
| } | |
| ), | |
| label="Email Backend", | |
| ) | |
| # SMTP Configuration | |
| smtp_host = forms.CharField( | |
| required=False, | |
| max_length=255, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "smtp.example.com", | |
| } | |
| ), | |
| label="SMTP Host", | |
| ) | |
| smtp_port = forms.IntegerField( | |
| required=False, | |
| initial=587, | |
| widget=forms.NumberInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "min": 1, | |
| "max": 65535, | |
| } | |
| ), | |
| label="SMTP Port", | |
| ) | |
| smtp_username = forms.CharField( | |
| required=False, | |
| max_length=255, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "username@example.com", | |
| } | |
| ), | |
| label="SMTP Username", | |
| ) | |
| smtp_password = forms.CharField( | |
| required=False, | |
| widget=forms.PasswordInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "Leave blank to keep current", | |
| "autocomplete": "new-password", | |
| } | |
| ), | |
| label="SMTP Password", | |
| ) | |
| smtp_use_tls = forms.BooleanField( | |
| required=False, | |
| initial=True, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Use TLS", | |
| ) | |
| smtp_from_email = forms.EmailField( | |
| required=False, | |
| widget=forms.EmailInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "noreply@example.com", | |
| } | |
| ), | |
| label="From Email", | |
| ) | |
| # Resend Configuration | |
| resend_api_key = forms.CharField( | |
| required=False, | |
| widget=forms.PasswordInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "Leave blank to keep current", | |
| "autocomplete": "new-password", | |
| } | |
| ), | |
| label="Resend API Key", | |
| ) | |
| resend_from_email = forms.EmailField( | |
| required=False, | |
| widget=forms.EmailInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "noreply@yourdomain.com", | |
| } | |
| ), | |
| label="From Email", | |
| ) | |
| # Default notification email | |
| default_notification_email = forms.EmailField( | |
| required=False, | |
| widget=forms.EmailInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "notifications@example.com", | |
| } | |
| ), | |
| label="Default Notification Email", | |
| help_text="All script notifications will be sent here unless overridden per-script.", | |
| ) | |
| def __init__(self, *args, instance=None, **kwargs): | |
| """Initialize form with existing settings.""" | |
| super().__init__(*args, **kwargs) | |
| if instance: | |
| self.fields["email_backend"].initial = instance.email_backend | |
| self.fields["smtp_host"].initial = instance.smtp_host | |
| self.fields["smtp_port"].initial = instance.smtp_port | |
| self.fields["smtp_username"].initial = instance.smtp_username | |
| self.fields["smtp_use_tls"].initial = instance.smtp_use_tls | |
| self.fields["smtp_from_email"].initial = instance.smtp_from_email | |
| self.fields["resend_from_email"].initial = instance.resend_from_email | |
| self.fields["default_notification_email"].initial = instance.default_notification_email | |
| def clean(self): | |
| """Validate configuration based on selected backend.""" | |
| cleaned_data = super().clean() | |
| backend = cleaned_data.get("email_backend") | |
| from core.models import GlobalSettings | |
| if backend == GlobalSettings.EmailBackend.SMTP: | |
| if not cleaned_data.get("smtp_host"): | |
| self.add_error("smtp_host", "SMTP host is required for SMTP backend.") | |
| if not cleaned_data.get("smtp_from_email"): | |
| self.add_error("smtp_from_email", "From email is required for SMTP backend.") | |
| elif backend == GlobalSettings.EmailBackend.RESEND: | |
| if not cleaned_data.get("resend_from_email"): | |
| self.add_error("resend_from_email", "From email is required for Resend backend.") | |
| return cleaned_data | |
| def save(self, instance): | |
| """Save the notification settings to the GlobalSettings instance.""" | |
| from core.services import EncryptionService | |
| instance.email_backend = self.cleaned_data["email_backend"] | |
| instance.smtp_host = self.cleaned_data.get("smtp_host") or "" | |
| instance.smtp_port = self.cleaned_data.get("smtp_port") or 587 | |
| instance.smtp_username = self.cleaned_data.get("smtp_username") or "" | |
| instance.smtp_use_tls = self.cleaned_data.get("smtp_use_tls", True) | |
| instance.smtp_from_email = self.cleaned_data.get("smtp_from_email") or "" | |
| instance.resend_from_email = self.cleaned_data.get("resend_from_email") or "" | |
| instance.default_notification_email = self.cleaned_data.get("default_notification_email") or "" | |
| # Encrypt and save SMTP password if provided | |
| smtp_password = self.cleaned_data.get("smtp_password") | |
| if smtp_password: | |
| instance.smtp_password_encrypted = EncryptionService.encrypt(smtp_password) | |
| # Encrypt and save Resend API key if provided | |
| resend_api_key = self.cleaned_data.get("resend_api_key") | |
| if resend_api_key: | |
| instance.resend_api_key_encrypted = EncryptionService.encrypt(resend_api_key) | |
| instance.save() | |
| return instance | |
| class GeneralSettingsForm(forms.Form): | |
| """Form for general instance settings.""" | |
| from core.models import GlobalSettings | |
| DATE_FORMAT_CHOICES = [ | |
| (GlobalSettings.DateFormat.ISO, "YYYY-MM-DD (ISO)"), | |
| (GlobalSettings.DateFormat.US, "MM/DD/YYYY (US)"), | |
| (GlobalSettings.DateFormat.EU, "DD/MM/YYYY (EU)"), | |
| (GlobalSettings.DateFormat.DOT, "DD.MM.YYYY"), | |
| ] | |
| TIME_FORMAT_CHOICES = [ | |
| (GlobalSettings.TimeFormat.H24, "24-hour (14:30)"), | |
| (GlobalSettings.TimeFormat.H12, "12-hour (2:30 PM)"), | |
| ] | |
| instance_name = forms.CharField( | |
| max_length=100, | |
| required=False, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "PyRunner", | |
| } | |
| ), | |
| label="Instance Name", | |
| help_text="Displayed in the header and email notifications", | |
| ) | |
| timezone = forms.ChoiceField( | |
| choices=get_timezone_choices, | |
| initial="UTC", | |
| widget=forms.Select( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| } | |
| ), | |
| label="Timezone", | |
| help_text="Default timezone for displaying dates and times", | |
| ) | |
| date_format = forms.ChoiceField( | |
| choices=DATE_FORMAT_CHOICES, | |
| widget=forms.Select( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| } | |
| ), | |
| label="Date Format", | |
| ) | |
| time_format = forms.ChoiceField( | |
| choices=TIME_FORMAT_CHOICES, | |
| widget=forms.Select( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| } | |
| ), | |
| label="Time Format", | |
| ) | |
| admin_url_slug = forms.CharField( | |
| max_length=100, | |
| required=False, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "django-admin", | |
| } | |
| ), | |
| label="Django Admin URL", | |
| help_text="URL path for Django admin (e.g., 'django-admin' → /django-admin/). Requires app restart.", | |
| ) | |
| def clean_admin_url_slug(self): | |
| """Validate admin URL slug format.""" | |
| import re | |
| slug = self.cleaned_data.get("admin_url_slug", "django-admin").strip().lower() | |
| if not slug: | |
| slug = "django-admin" | |
| # Remove leading/trailing slashes | |
| slug = slug.strip("/") | |
| # Validate: alphanumeric, hyphens, underscores only | |
| if not re.match(r"^[a-z0-9_-]+$", slug): | |
| raise forms.ValidationError( | |
| "Admin URL can only contain lowercase letters, numbers, hyphens, and underscores." | |
| ) | |
| # Prevent conflicts with existing routes | |
| reserved = ["setup", "auth", "cpanel", "webhook", "static", "media"] | |
| if slug in reserved: | |
| raise forms.ValidationError( | |
| f"'{slug}' is a reserved URL path. Please choose a different name." | |
| ) | |
| return slug | |
| def __init__(self, *args, instance=None, **kwargs): | |
| """Initialize form with existing settings.""" | |
| super().__init__(*args, **kwargs) | |
| if instance: | |
| self.fields["instance_name"].initial = instance.instance_name | |
| self.fields["timezone"].initial = instance.timezone | |
| self.fields["date_format"].initial = instance.date_format | |
| self.fields["time_format"].initial = instance.time_format | |
| self.fields["admin_url_slug"].initial = instance.admin_url_slug | |
| def save(self, instance): | |
| """Save the general settings to the GlobalSettings instance.""" | |
| instance.instance_name = self.cleaned_data.get("instance_name") or "PyRunner" | |
| instance.timezone = self.cleaned_data.get("timezone") or "UTC" | |
| instance.date_format = self.cleaned_data.get("date_format") | |
| instance.time_format = self.cleaned_data.get("time_format") | |
| instance.admin_url_slug = self.cleaned_data.get("admin_url_slug") or "django-admin" | |
| instance.save(update_fields=[ | |
| "instance_name", "timezone", "date_format", "time_format", "admin_url_slug", "updated_at" | |
| ]) | |
| return instance | |
| class LogRetentionForm(forms.Form): | |
| """Form for log retention settings.""" | |
| retention_days = forms.IntegerField( | |
| min_value=0, | |
| initial=0, | |
| widget=forms.NumberInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "min": 0, | |
| } | |
| ), | |
| label="Retention Days", | |
| help_text="Delete runs older than this many days (0 = keep forever)", | |
| ) | |
| retention_count = forms.IntegerField( | |
| min_value=0, | |
| initial=0, | |
| widget=forms.NumberInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "min": 0, | |
| } | |
| ), | |
| label="Retention Count", | |
| help_text="Keep only the last N runs per script (0 = unlimited)", | |
| ) | |
| auto_cleanup_enabled = forms.BooleanField( | |
| required=False, | |
| initial=False, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Auto Cleanup", | |
| help_text="Automatically clean up old runs daily at 2 AM", | |
| ) | |
| def __init__(self, *args, instance=None, **kwargs): | |
| """Initialize form with existing settings.""" | |
| super().__init__(*args, **kwargs) | |
| if instance: | |
| self.fields["retention_days"].initial = instance.retention_days | |
| self.fields["retention_count"].initial = instance.retention_count | |
| self.fields["auto_cleanup_enabled"].initial = instance.auto_cleanup_enabled | |
| def save(self, instance): | |
| """Save the retention settings to the GlobalSettings instance.""" | |
| from core.services import RetentionService | |
| instance.retention_days = self.cleaned_data.get("retention_days") or 0 | |
| instance.retention_count = self.cleaned_data.get("retention_count") or 0 | |
| # Handle auto cleanup schedule | |
| new_auto_cleanup = self.cleaned_data.get("auto_cleanup_enabled", False) | |
| old_auto_cleanup = instance.auto_cleanup_enabled | |
| instance.auto_cleanup_enabled = new_auto_cleanup | |
| instance.save(update_fields=[ | |
| "retention_days", "retention_count", "auto_cleanup_enabled", "updated_at" | |
| ]) | |
| # Manage the django-q2 schedule | |
| if new_auto_cleanup and not old_auto_cleanup: | |
| RetentionService.enable_auto_cleanup() | |
| elif not new_auto_cleanup and old_auto_cleanup: | |
| RetentionService.disable_auto_cleanup() | |
| return instance | |
| class WorkerSettingsForm(forms.Form): | |
| """Form for Django-Q2 worker configuration.""" | |
| q_workers = forms.IntegerField( | |
| min_value=1, | |
| max_value=16, | |
| initial=2, | |
| widget=forms.NumberInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "min": 1, | |
| "max": 16, | |
| } | |
| ), | |
| label="Worker Count", | |
| help_text="Number of worker processes (1-16). More workers can process more tasks simultaneously.", | |
| ) | |
| q_timeout = forms.IntegerField( | |
| min_value=0, | |
| max_value=86400, | |
| initial=600, | |
| widget=forms.NumberInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "min": 0, | |
| "max": 86400, | |
| } | |
| ), | |
| label="Task Timeout (seconds)", | |
| help_text="Maximum time a task can run before worker timeout. Use 0 for no timeout (required on Windows). For long-running scripts, also increase the script's own timeout.", | |
| ) | |
| q_retry = forms.IntegerField( | |
| min_value=60, | |
| max_value=86400, | |
| initial=660, | |
| widget=forms.NumberInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "min": 60, | |
| "max": 86400, | |
| } | |
| ), | |
| label="Retry Delay (seconds)", | |
| help_text="Time before retrying a failed/timed-out task. Should be greater than timeout.", | |
| ) | |
| q_queue_limit = forms.IntegerField( | |
| min_value=5, | |
| max_value=100, | |
| initial=20, | |
| widget=forms.NumberInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "min": 5, | |
| "max": 100, | |
| } | |
| ), | |
| label="Queue Limit", | |
| help_text="Maximum number of tasks that can be queued at once.", | |
| ) | |
| def __init__(self, *args, instance=None, **kwargs): | |
| """Initialize form with existing settings.""" | |
| super().__init__(*args, **kwargs) | |
| if instance: | |
| self.fields["q_workers"].initial = instance.q_workers | |
| self.fields["q_timeout"].initial = instance.q_timeout | |
| self.fields["q_retry"].initial = instance.q_retry | |
| self.fields["q_queue_limit"].initial = instance.q_queue_limit | |
| def clean(self): | |
| """Validate that retry > timeout.""" | |
| cleaned_data = super().clean() | |
| timeout = cleaned_data.get("q_timeout", 0) | |
| retry = cleaned_data.get("q_retry", 660) | |
| if timeout > 0 and retry <= timeout: | |
| self.add_error( | |
| "q_retry", | |
| f"Retry delay ({retry}s) must be greater than timeout ({timeout}s).", | |
| ) | |
| return cleaned_data | |
| def save(self, instance): | |
| """Save the worker settings to the GlobalSettings instance.""" | |
| from django.utils import timezone | |
| # Check if any values actually changed | |
| changed = ( | |
| instance.q_workers != self.cleaned_data["q_workers"] | |
| or instance.q_timeout != self.cleaned_data["q_timeout"] | |
| or instance.q_retry != self.cleaned_data["q_retry"] | |
| or instance.q_queue_limit != self.cleaned_data["q_queue_limit"] | |
| ) | |
| instance.q_workers = self.cleaned_data["q_workers"] | |
| instance.q_timeout = self.cleaned_data["q_timeout"] | |
| instance.q_retry = self.cleaned_data["q_retry"] | |
| instance.q_queue_limit = self.cleaned_data["q_queue_limit"] | |
| if changed: | |
| instance.worker_settings_updated_at = timezone.now() | |
| instance.save( | |
| update_fields=[ | |
| "q_workers", | |
| "q_timeout", | |
| "q_retry", | |
| "q_queue_limit", | |
| "worker_settings_updated_at", | |
| "updated_at", | |
| ] | |
| ) | |
| return instance | |
| class BackupCreateForm(forms.Form): | |
| """Form for configuring backup creation.""" | |
| backup_format = forms.ChoiceField( | |
| choices=[ | |
| ("gzip", "Compressed (recommended) - .json.gz"), | |
| ("json", "Plain JSON - .json"), | |
| ], | |
| initial="gzip", | |
| widget=forms.RadioSelect( | |
| attrs={ | |
| "class": "w-4 h-4 text-code-accent bg-code-bg border-code-border focus:ring-code-accent", | |
| } | |
| ), | |
| label="Backup format", | |
| help_text="Compressed backups are 80-95% smaller", | |
| ) | |
| include_datastores = forms.BooleanField( | |
| required=False, | |
| initial=True, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Include DataStores", | |
| help_text="Include all DataStores and their key-value entries", | |
| ) | |
| include_runs = forms.BooleanField( | |
| required=False, | |
| initial=True, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Include run history", | |
| help_text="Include execution history (stdout/stderr)", | |
| ) | |
| max_runs = forms.IntegerField( | |
| initial=1000, | |
| min_value=0, | |
| max_value=10000, | |
| widget=forms.NumberInput( | |
| attrs={ | |
| "class": "w-full px-4 py-2 bg-code-bg text-code-text border border-code-border rounded-lg focus:ring-2 focus:ring-code-accent focus:border-transparent", | |
| "placeholder": "1000", | |
| } | |
| ), | |
| label="Maximum runs to include", | |
| help_text="Limit run history to most recent N runs (0 = all runs)", | |
| ) | |
| include_package_operations = forms.BooleanField( | |
| required=False, | |
| initial=False, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Include package operations", | |
| help_text="Include pip installation history", | |
| ) | |
| class BackupRestoreForm(forms.Form): | |
| """Form for restoring from backup.""" | |
| backup_file = forms.FileField( | |
| widget=forms.FileInput( | |
| attrs={ | |
| "class": "block w-full text-sm text-code-text file:mr-4 file:py-2 file:px-4 file:rounded-lg file:border-0 file:text-sm file:font-semibold file:bg-code-accent file:text-white hover:file:bg-opacity-90", | |
| "accept": ".json,.json.gz,.gz", | |
| } | |
| ), | |
| label="Backup file", | |
| help_text="JSON or compressed backup file (.json or .json.gz)", | |
| ) | |
| restore_runs = forms.BooleanField( | |
| required=False, | |
| initial=True, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Restore run history", | |
| help_text="Import execution history from backup", | |
| ) | |
| confirm_delete = forms.BooleanField( | |
| required=True, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-red-500 bg-code-bg border-code-border rounded focus:ring-red-500 focus:ring-2", | |
| } | |
| ), | |
| label="I understand all existing data will be deleted", | |
| help_text="This action cannot be undone without the automatic backup", | |
| ) | |
| # ============================================================================= | |
| # Data Store Forms | |
| # ============================================================================= | |
| class DataStoreForm(forms.ModelForm): | |
| """Form for creating and editing data stores.""" | |
| class Meta: | |
| model = DataStore | |
| fields = ["name", "description"] | |
| widgets = { | |
| "name": forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "my_data_store", | |
| } | |
| ), | |
| "description": forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 2, | |
| "placeholder": "What is this data store used for?", | |
| } | |
| ), | |
| } | |
| labels = { | |
| "name": "Store Name", | |
| "description": "Description", | |
| } | |
| help_texts = { | |
| "name": "Used in scripts as: DataStore(\"name\")", | |
| } | |
| def clean_name(self): | |
| name = self.cleaned_data.get("name", "").strip() | |
| if not name: | |
| raise forms.ValidationError("Store name is required.") | |
| # Check for valid identifier-like name | |
| if not name.replace("_", "").replace("-", "").isalnum(): | |
| raise forms.ValidationError( | |
| "Name can only contain letters, numbers, underscores, and hyphens." | |
| ) | |
| # Check uniqueness (excluding current instance for edits) | |
| qs = DataStore.objects.filter(name__iexact=name) | |
| if self.instance.pk: | |
| qs = qs.exclude(pk=self.instance.pk) | |
| if qs.exists(): | |
| raise forms.ValidationError("A data store with this name already exists.") | |
| return name | |
| class DataStoreEntryForm(forms.Form): | |
| """Form for creating and editing data store entries.""" | |
| key = forms.CharField( | |
| max_length=255, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "my_key", | |
| } | |
| ), | |
| label="Key", | |
| help_text="Unique identifier for this entry", | |
| ) | |
| value = forms.CharField( | |
| widget=forms.Textarea( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text font-mono text-sm placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "rows": 6, | |
| "placeholder": '{"example": "value"}\nor just a string\nor a number like 42', | |
| } | |
| ), | |
| label="Value (JSON)", | |
| help_text="JSON value: string, number, boolean, array, or object", | |
| ) | |
| def __init__(self, *args, datastore=None, instance=None, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| self.datastore = datastore | |
| self.instance = instance | |
| # Pre-populate for editing | |
| if instance: | |
| self.fields["key"].initial = instance.key | |
| self.fields["value"].initial = instance.value_json | |
| def clean_key(self): | |
| key = self.cleaned_data.get("key", "").strip() | |
| if not key: | |
| raise forms.ValidationError("Key is required.") | |
| if len(key) > 255: | |
| raise forms.ValidationError("Key cannot exceed 255 characters.") | |
| # Check uniqueness within the data store | |
| if self.datastore: | |
| qs = DataStoreEntry.objects.filter(datastore=self.datastore, key=key) | |
| if self.instance: | |
| qs = qs.exclude(pk=self.instance.pk) | |
| if qs.exists(): | |
| raise forms.ValidationError( | |
| f"Key '{key}' already exists in this data store." | |
| ) | |
| return key | |
| def clean_value(self): | |
| import json | |
| value = self.cleaned_data.get("value", "").strip() | |
| if not value: | |
| raise forms.ValidationError("Value is required.") | |
| try: | |
| # Validate it's valid JSON | |
| json.loads(value) | |
| except json.JSONDecodeError as e: | |
| raise forms.ValidationError(f"Invalid JSON: {e}") | |
| return value | |
| # ============================================================================= | |
| # API Token Forms | |
| # ============================================================================= | |
| class DataStoreAPITokenForm(forms.ModelForm): | |
| """Form for creating API tokens.""" | |
| class Meta: | |
| model = DataStoreAPIToken | |
| fields = ["name", "datastore", "expires_at"] | |
| widgets = { | |
| "name": forms.TextInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "placeholder": "My Dashboard Token", | |
| } | |
| ), | |
| "datastore": forms.Select( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| } | |
| ), | |
| "expires_at": forms.DateTimeInput( | |
| attrs={ | |
| "class": "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent", | |
| "type": "datetime-local", | |
| }, | |
| format="%Y-%m-%dT%H:%M", | |
| ), | |
| } | |
| labels = { | |
| "name": "Token Name", | |
| "datastore": "Scope", | |
| "expires_at": "Expires At", | |
| } | |
| help_texts = { | |
| "name": "A friendly name to identify this token", | |
| "datastore": "Leave empty for access to all datastores, or select a specific datastore", | |
| "expires_at": "Optional. Leave empty for no expiration.", | |
| } | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| # Make datastore optional with a clear empty choice | |
| self.fields["datastore"].required = False | |
| self.fields["datastore"].empty_label = "All Datastores (Global Access)" | |
| self.fields["expires_at"].required = False | |
| # ============================================================================= | |
| # Authentication Forms | |
| # ============================================================================= | |
| INPUT_CLASS = "w-full px-4 py-3 bg-code-bg border border-code-border rounded-lg text-code-text placeholder-code-muted/50 focus:outline-none focus:ring-2 focus:ring-code-accent/50 focus:border-code-accent" | |
| class PasswordLoginForm(forms.Form): | |
| """Form for password-based login.""" | |
| email = forms.EmailField( | |
| widget=forms.EmailInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "you@example.com", | |
| "autocomplete": "email", | |
| } | |
| ), | |
| label="Email address", | |
| ) | |
| password = forms.CharField( | |
| widget=forms.PasswordInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "Your password", | |
| "autocomplete": "current-password", | |
| } | |
| ), | |
| label="Password", | |
| ) | |
| class SetPasswordForm(forms.Form): | |
| """Form for setting or changing password.""" | |
| password = forms.CharField( | |
| min_length=8, | |
| widget=forms.PasswordInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "New password", | |
| "autocomplete": "new-password", | |
| } | |
| ), | |
| label="New Password", | |
| help_text="Minimum 8 characters", | |
| ) | |
| password_confirm = forms.CharField( | |
| widget=forms.PasswordInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "Confirm new password", | |
| "autocomplete": "new-password", | |
| } | |
| ), | |
| label="Confirm Password", | |
| ) | |
| def clean(self): | |
| cleaned_data = super().clean() | |
| password = cleaned_data.get("password") | |
| confirm = cleaned_data.get("password_confirm") | |
| if password and confirm and password != confirm: | |
| raise forms.ValidationError("Passwords do not match.") | |
| return cleaned_data | |
| class AdminSetupForm(forms.Form): | |
| """Form for initial admin setup with password.""" | |
| email = forms.EmailField( | |
| widget=forms.EmailInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "admin@example.com", | |
| "autocomplete": "email", | |
| } | |
| ), | |
| label="Admin Email", | |
| ) | |
| password = forms.CharField( | |
| min_length=8, | |
| widget=forms.PasswordInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "Create a strong password", | |
| "autocomplete": "new-password", | |
| } | |
| ), | |
| label="Password", | |
| help_text="Minimum 8 characters", | |
| ) | |
| password_confirm = forms.CharField( | |
| widget=forms.PasswordInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "Confirm password", | |
| "autocomplete": "new-password", | |
| } | |
| ), | |
| label="Confirm Password", | |
| ) | |
| def clean(self): | |
| cleaned_data = super().clean() | |
| password = cleaned_data.get("password") | |
| confirm = cleaned_data.get("password_confirm") | |
| if password and confirm and password != confirm: | |
| raise forms.ValidationError("Passwords do not match.") | |
| return cleaned_data | |
| # ============================================================================= | |
| # Services Forms | |
| # ============================================================================= | |
| class S3SettingsForm(forms.Form): | |
| """Form for S3 storage configuration.""" | |
| s3_enabled = forms.BooleanField( | |
| required=False, | |
| initial=False, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Enable S3 Storage", | |
| ) | |
| s3_endpoint_url = forms.CharField( | |
| required=False, | |
| max_length=500, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "https://s3.amazonaws.com or https://minio.example.com:9000", | |
| } | |
| ), | |
| label="Endpoint URL", | |
| help_text="Leave empty for AWS S3. Required for MinIO, DigitalOcean Spaces, etc.", | |
| ) | |
| s3_region = forms.CharField( | |
| required=False, | |
| max_length=50, | |
| initial="us-east-1", | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "us-east-1", | |
| } | |
| ), | |
| label="Region", | |
| ) | |
| s3_bucket_name = forms.CharField( | |
| required=False, | |
| max_length=255, | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "my-backup-bucket", | |
| } | |
| ), | |
| label="Bucket Name", | |
| ) | |
| s3_access_key = forms.CharField( | |
| required=False, | |
| widget=forms.PasswordInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "Leave blank to keep current", | |
| "autocomplete": "new-password", | |
| } | |
| ), | |
| label="Access Key", | |
| ) | |
| s3_secret_key = forms.CharField( | |
| required=False, | |
| widget=forms.PasswordInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "Leave blank to keep current", | |
| "autocomplete": "new-password", | |
| } | |
| ), | |
| label="Secret Key", | |
| ) | |
| s3_use_ssl = forms.BooleanField( | |
| required=False, | |
| initial=True, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Use SSL/TLS", | |
| help_text="Recommended for security. Disable only for local development.", | |
| ) | |
| s3_path_style = forms.BooleanField( | |
| required=False, | |
| initial=False, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Path-style addressing", | |
| help_text="Required for MinIO and some S3-compatible providers.", | |
| ) | |
| def __init__(self, *args, instance=None, **kwargs): | |
| """Initialize form with existing settings.""" | |
| super().__init__(*args, **kwargs) | |
| if instance: | |
| self.fields["s3_enabled"].initial = instance.s3_enabled | |
| self.fields["s3_endpoint_url"].initial = instance.s3_endpoint_url | |
| self.fields["s3_region"].initial = instance.s3_region or "us-east-1" | |
| self.fields["s3_bucket_name"].initial = instance.s3_bucket_name | |
| self.fields["s3_use_ssl"].initial = instance.s3_use_ssl | |
| self.fields["s3_path_style"].initial = instance.s3_path_style | |
| def save(self, instance): | |
| """Save the S3 settings to the GlobalSettings instance.""" | |
| from core.services.encryption_service import EncryptionService | |
| instance.s3_enabled = self.cleaned_data.get("s3_enabled", False) | |
| instance.s3_endpoint_url = self.cleaned_data.get("s3_endpoint_url") or "" | |
| instance.s3_region = self.cleaned_data.get("s3_region") or "us-east-1" | |
| instance.s3_bucket_name = self.cleaned_data.get("s3_bucket_name") or "" | |
| instance.s3_use_ssl = self.cleaned_data.get("s3_use_ssl", True) | |
| instance.s3_path_style = self.cleaned_data.get("s3_path_style", False) | |
| # Encrypt and save access key if provided | |
| access_key = self.cleaned_data.get("s3_access_key") | |
| if access_key: | |
| instance.s3_access_key_encrypted = EncryptionService.encrypt(access_key) | |
| # Encrypt and save secret key if provided | |
| secret_key = self.cleaned_data.get("s3_secret_key") | |
| if secret_key: | |
| instance.s3_secret_key_encrypted = EncryptionService.encrypt(secret_key) | |
| instance.save() | |
| return instance | |
| class S3BackupScheduleForm(forms.Form): | |
| """Form for S3 scheduled backup configuration.""" | |
| from core.models import GlobalSettings | |
| WEEKDAY_CHOICES = [ | |
| (0, "Monday"), | |
| (1, "Tuesday"), | |
| (2, "Wednesday"), | |
| (3, "Thursday"), | |
| (4, "Friday"), | |
| (5, "Saturday"), | |
| (6, "Sunday"), | |
| ] | |
| s3_backup_enabled = forms.BooleanField( | |
| required=False, | |
| initial=False, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Enable Scheduled Backups", | |
| ) | |
| s3_backup_schedule = forms.ChoiceField( | |
| choices=GlobalSettings.S3BackupSchedule.choices, | |
| initial=GlobalSettings.S3BackupSchedule.DISABLED, | |
| widget=forms.Select( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| } | |
| ), | |
| label="Schedule Frequency", | |
| ) | |
| s3_backup_time = forms.TimeField( | |
| initial="02:00", | |
| widget=forms.TimeInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "type": "time", | |
| } | |
| ), | |
| label="Backup Time", | |
| help_text="Time to run backups (in instance timezone)", | |
| ) | |
| s3_backup_day = forms.ChoiceField( | |
| choices=WEEKDAY_CHOICES, | |
| initial=0, | |
| widget=forms.Select( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| } | |
| ), | |
| label="Day of Week", | |
| help_text="For weekly backups", | |
| ) | |
| s3_backup_prefix = forms.CharField( | |
| required=False, | |
| max_length=255, | |
| initial="pyrunner-backups/", | |
| widget=forms.TextInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "placeholder": "pyrunner-backups/", | |
| } | |
| ), | |
| label="S3 Path Prefix", | |
| help_text="Path prefix for backup files in the bucket", | |
| ) | |
| s3_backup_retention_count = forms.IntegerField( | |
| min_value=0, | |
| initial=7, | |
| widget=forms.NumberInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "min": "0", | |
| } | |
| ), | |
| label="Retention Count", | |
| help_text="Keep the last N backups (0 = keep all)", | |
| ) | |
| s3_backup_include_runs = forms.BooleanField( | |
| required=False, | |
| initial=False, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Include Run History", | |
| ) | |
| s3_backup_max_runs = forms.IntegerField( | |
| min_value=0, | |
| initial=1000, | |
| widget=forms.NumberInput( | |
| attrs={ | |
| "class": INPUT_CLASS, | |
| "min": "0", | |
| } | |
| ), | |
| label="Max Runs", | |
| help_text="Maximum runs to include (0 = all)", | |
| ) | |
| s3_backup_include_datastores = forms.BooleanField( | |
| required=False, | |
| initial=True, | |
| widget=forms.CheckboxInput( | |
| attrs={ | |
| "class": "w-5 h-5 text-code-accent bg-code-bg border-code-border rounded focus:ring-code-accent focus:ring-2", | |
| } | |
| ), | |
| label="Include DataStores", | |
| ) | |
| def __init__(self, *args, instance=None, **kwargs): | |
| """Initialize form with existing settings.""" | |
| super().__init__(*args, **kwargs) | |
| if instance: | |
| self.fields["s3_backup_enabled"].initial = instance.s3_backup_enabled | |
| self.fields["s3_backup_schedule"].initial = instance.s3_backup_schedule | |
| self.fields["s3_backup_time"].initial = instance.s3_backup_time | |
| self.fields["s3_backup_day"].initial = instance.s3_backup_day | |
| self.fields["s3_backup_prefix"].initial = instance.s3_backup_prefix or "pyrunner-backups/" | |
| self.fields["s3_backup_retention_count"].initial = instance.s3_backup_retention_count | |
| self.fields["s3_backup_include_runs"].initial = instance.s3_backup_include_runs | |
| self.fields["s3_backup_max_runs"].initial = instance.s3_backup_max_runs | |
| self.fields["s3_backup_include_datastores"].initial = instance.s3_backup_include_datastores | |
| def save(self, instance): | |
| """Save the backup schedule settings.""" | |
| from core.services.backup_schedule_service import BackupScheduleService | |
| instance.s3_backup_enabled = self.cleaned_data.get("s3_backup_enabled", False) | |
| instance.s3_backup_schedule = self.cleaned_data.get("s3_backup_schedule") | |
| instance.s3_backup_time = self.cleaned_data.get("s3_backup_time") | |
| instance.s3_backup_day = int(self.cleaned_data.get("s3_backup_day", 0)) | |
| instance.s3_backup_prefix = self.cleaned_data.get("s3_backup_prefix") or "pyrunner-backups/" | |
| instance.s3_backup_retention_count = self.cleaned_data.get("s3_backup_retention_count", 7) | |
| instance.s3_backup_include_runs = self.cleaned_data.get("s3_backup_include_runs", False) | |
| instance.s3_backup_max_runs = self.cleaned_data.get("s3_backup_max_runs", 1000) | |
| instance.s3_backup_include_datastores = self.cleaned_data.get("s3_backup_include_datastores", True) | |
| instance.save() | |
| # Sync the django-q2 schedule | |
| BackupScheduleService.sync_schedule() | |
| return instance | |