import time import uuid from typing import Optional from sqlalchemy.orm import Session from open_webui.internal.db import Base, JSONField, get_db, get_db_context from open_webui.models.groups import Groups from open_webui.models.users import Users, UserResponse from open_webui.models.prompt_history import PromptHistories from open_webui.models.access_grants import AccessGrantModel, AccessGrants from pydantic import BaseModel, ConfigDict, Field from sqlalchemy import BigInteger, Boolean, Column, String, Text, JSON, or_, func, cast #################### # Prompts DB Schema #################### class Prompt(Base): __tablename__ = "prompt" id = Column(Text, primary_key=True) command = Column(String, unique=True, index=True) user_id = Column(String) name = Column(Text) content = Column(Text) data = Column(JSON, nullable=True) meta = Column(JSON, nullable=True) tags = Column(JSON, nullable=True) is_active = Column(Boolean, default=True) version_id = Column(Text, nullable=True) # Points to active history entry created_at = Column(BigInteger, nullable=True) updated_at = Column(BigInteger, nullable=True) class PromptModel(BaseModel): id: Optional[str] = None command: str user_id: str name: str content: str data: Optional[dict] = None meta: Optional[dict] = None tags: Optional[list[str]] = None is_active: Optional[bool] = True version_id: Optional[str] = None created_at: Optional[int] = None updated_at: Optional[int] = None access_grants: list[AccessGrantModel] = Field(default_factory=list) model_config = ConfigDict(from_attributes=True) #################### # Forms #################### class PromptUserResponse(PromptModel): user: Optional[UserResponse] = None class PromptAccessResponse(PromptUserResponse): write_access: Optional[bool] = False class PromptListResponse(BaseModel): items: list[PromptUserResponse] total: int class PromptAccessListResponse(BaseModel): items: list[PromptAccessResponse] total: int class PromptForm(BaseModel): command: str name: str # Changed from title content: str data: Optional[dict] = None meta: Optional[dict] = None tags: Optional[list[str]] = None access_grants: Optional[list[dict]] = None version_id: Optional[str] = None # Active version commit_message: Optional[str] = None # For history tracking is_production: Optional[bool] = True # Whether to set new version as production class PromptsTable: def _get_access_grants( self, prompt_id: str, db: Optional[Session] = None ) -> list[AccessGrantModel]: return AccessGrants.get_grants_by_resource("prompt", prompt_id, db=db) def _to_prompt_model( self, prompt: Prompt, db: Optional[Session] = None ) -> PromptModel: prompt_data = PromptModel.model_validate(prompt).model_dump( exclude={"access_grants"} ) prompt_data["access_grants"] = self._get_access_grants(prompt_data["id"], db=db) return PromptModel.model_validate(prompt_data) def insert_new_prompt( self, user_id: str, form_data: PromptForm, db: Optional[Session] = None ) -> Optional[PromptModel]: now = int(time.time()) prompt_id = str(uuid.uuid4()) prompt = PromptModel( id=prompt_id, user_id=user_id, command=form_data.command, name=form_data.name, content=form_data.content, data=form_data.data or {}, meta=form_data.meta or {}, tags=form_data.tags or [], access_grants=[], is_active=True, created_at=now, updated_at=now, ) try: with get_db_context(db) as db: result = Prompt(**prompt.model_dump(exclude={"access_grants"})) db.add(result) db.commit() db.refresh(result) AccessGrants.set_access_grants( "prompt", prompt_id, form_data.access_grants, db=db ) if result: current_access_grants = self._get_access_grants(prompt_id, db=db) snapshot = { "name": form_data.name, "content": form_data.content, "command": form_data.command, "data": form_data.data or {}, "meta": form_data.meta or {}, "tags": form_data.tags or [], "access_grants": [ grant.model_dump() for grant in current_access_grants ], } history_entry = PromptHistories.create_history_entry( prompt_id=prompt_id, snapshot=snapshot, user_id=user_id, parent_id=None, # Initial commit has no parent commit_message=form_data.commit_message or "Initial version", db=db, ) # Set the initial version as the production version if history_entry: result.version_id = history_entry.id db.commit() db.refresh(result) return self._to_prompt_model(result, db=db) else: return None except Exception: return None def get_prompt_by_id( self, prompt_id: str, db: Optional[Session] = None ) -> Optional[PromptModel]: """Get prompt by UUID.""" try: with get_db_context(db) as db: prompt = db.query(Prompt).filter_by(id=prompt_id).first() if prompt: return self._to_prompt_model(prompt, db=db) return None except Exception: return None def get_prompt_by_command( self, command: str, db: Optional[Session] = None ) -> Optional[PromptModel]: try: with get_db_context(db) as db: prompt = db.query(Prompt).filter_by(command=command).first() if prompt: return self._to_prompt_model(prompt, db=db) return None except Exception: return None def get_prompts(self, db: Optional[Session] = None) -> list[PromptUserResponse]: with get_db_context(db) as db: all_prompts = ( db.query(Prompt) .filter(Prompt.is_active == True) .order_by(Prompt.updated_at.desc()) .all() ) user_ids = list(set(prompt.user_id for prompt in all_prompts)) users = Users.get_users_by_user_ids(user_ids, db=db) if user_ids else [] users_dict = {user.id: user for user in users} prompts = [] for prompt in all_prompts: user = users_dict.get(prompt.user_id) prompts.append( PromptUserResponse.model_validate( { **self._to_prompt_model(prompt, db=db).model_dump(), "user": user.model_dump() if user else None, } ) ) return prompts def get_prompts_by_user_id( self, user_id: str, permission: str = "write", db: Optional[Session] = None ) -> list[PromptUserResponse]: prompts = self.get_prompts(db=db) user_group_ids = { group.id for group in Groups.get_groups_by_member_id(user_id, db=db) } return [ prompt for prompt in prompts if prompt.user_id == user_id or AccessGrants.has_access( user_id=user_id, resource_type="prompt", resource_id=prompt.id, permission=permission, user_group_ids=user_group_ids, db=db, ) ] def search_prompts( self, user_id: str, filter: dict = {}, skip: int = 0, limit: int = 30, db: Optional[Session] = None, ) -> PromptListResponse: with get_db_context(db) as db: from open_webui.models.users import User, UserModel # Join with User table for user filtering and sorting query = db.query(Prompt, User).outerjoin(User, User.id == Prompt.user_id) query = query.filter(Prompt.is_active == True) if filter: query_key = filter.get("query") if query_key: query = query.filter( or_( Prompt.name.ilike(f"%{query_key}%"), Prompt.command.ilike(f"%{query_key}%"), Prompt.content.ilike(f"%{query_key}%"), User.name.ilike(f"%{query_key}%"), User.email.ilike(f"%{query_key}%"), ) ) view_option = filter.get("view_option") if view_option == "created": query = query.filter(Prompt.user_id == user_id) elif view_option == "shared": query = query.filter(Prompt.user_id != user_id) # Apply access grant filtering query = AccessGrants.has_permission_filter( db=db, query=query, DocumentModel=Prompt, filter=filter, resource_type="prompt", permission="read", ) tag = filter.get("tag") if tag: # Search for tag in JSON array field like_pattern = f'%"{tag.lower()}"%' tags_text = func.lower(cast(Prompt.tags, String)) query = query.filter(tags_text.like(like_pattern)) order_by = filter.get("order_by") direction = filter.get("direction") if order_by == "name": if direction == "asc": query = query.order_by(Prompt.name.asc()) else: query = query.order_by(Prompt.name.desc()) elif order_by == "created_at": if direction == "asc": query = query.order_by(Prompt.created_at.asc()) else: query = query.order_by(Prompt.created_at.desc()) elif order_by == "updated_at": if direction == "asc": query = query.order_by(Prompt.updated_at.asc()) else: query = query.order_by(Prompt.updated_at.desc()) else: query = query.order_by(Prompt.updated_at.desc()) else: query = query.order_by(Prompt.updated_at.desc()) # Count BEFORE pagination total = query.count() if skip: query = query.offset(skip) if limit: query = query.limit(limit) items = query.all() prompts = [] for prompt, user in items: prompts.append( PromptUserResponse( **self._to_prompt_model(prompt, db=db).model_dump(), user=( UserResponse(**UserModel.model_validate(user).model_dump()) if user else None ), ) ) return PromptListResponse(items=prompts, total=total) def update_prompt_by_command( self, command: str, form_data: PromptForm, user_id: str, db: Optional[Session] = None, ) -> Optional[PromptModel]: try: with get_db_context(db) as db: prompt = db.query(Prompt).filter_by(command=command).first() if not prompt: return None latest_history = PromptHistories.get_latest_history_entry( prompt.id, db=db ) parent_id = latest_history.id if latest_history else None current_access_grants = self._get_access_grants(prompt.id, db=db) # Check if content changed to decide on history creation content_changed = ( prompt.name != form_data.name or prompt.content != form_data.content or form_data.access_grants is not None ) # Update prompt fields prompt.name = form_data.name prompt.content = form_data.content prompt.data = form_data.data or prompt.data prompt.meta = form_data.meta or prompt.meta prompt.updated_at = int(time.time()) if form_data.access_grants is not None: AccessGrants.set_access_grants( "prompt", prompt.id, form_data.access_grants, db=db ) current_access_grants = self._get_access_grants(prompt.id, db=db) db.commit() # Create history entry only if content changed if content_changed: snapshot = { "name": form_data.name, "content": form_data.content, "command": command, "data": form_data.data or {}, "meta": form_data.meta or {}, "access_grants": [ grant.model_dump() for grant in current_access_grants ], } history_entry = PromptHistories.create_history_entry( prompt_id=prompt.id, snapshot=snapshot, user_id=user_id, parent_id=parent_id, commit_message=form_data.commit_message, db=db, ) # Set as production if flag is True (default) if form_data.is_production and history_entry: prompt.version_id = history_entry.id db.commit() return self._to_prompt_model(prompt, db=db) except Exception: return None def update_prompt_by_id( self, prompt_id: str, form_data: PromptForm, user_id: str, db: Optional[Session] = None, ) -> Optional[PromptModel]: try: with get_db_context(db) as db: prompt = db.query(Prompt).filter_by(id=prompt_id).first() if not prompt: return None latest_history = PromptHistories.get_latest_history_entry( prompt.id, db=db ) parent_id = latest_history.id if latest_history else None current_access_grants = self._get_access_grants(prompt.id, db=db) # Check if content changed to decide on history creation content_changed = ( prompt.name != form_data.name or prompt.command != form_data.command or prompt.content != form_data.content or form_data.access_grants is not None or (form_data.tags is not None and prompt.tags != form_data.tags) ) # Update prompt fields prompt.name = form_data.name prompt.command = form_data.command prompt.content = form_data.content prompt.data = form_data.data or prompt.data prompt.meta = form_data.meta or prompt.meta if form_data.tags is not None: prompt.tags = form_data.tags if form_data.access_grants is not None: AccessGrants.set_access_grants( "prompt", prompt.id, form_data.access_grants, db=db ) current_access_grants = self._get_access_grants(prompt.id, db=db) prompt.updated_at = int(time.time()) db.commit() # Create history entry only if content changed if content_changed: snapshot = { "name": form_data.name, "content": form_data.content, "command": prompt.command, "data": form_data.data or {}, "meta": form_data.meta or {}, "tags": prompt.tags or [], "access_grants": [ grant.model_dump() for grant in current_access_grants ], } history_entry = PromptHistories.create_history_entry( prompt_id=prompt.id, snapshot=snapshot, user_id=user_id, parent_id=parent_id, commit_message=form_data.commit_message, db=db, ) # Set as production if flag is True (default) if form_data.is_production and history_entry: prompt.version_id = history_entry.id db.commit() return self._to_prompt_model(prompt, db=db) except Exception: return None def update_prompt_metadata( self, prompt_id: str, name: str, command: str, tags: Optional[list[str]] = None, db: Optional[Session] = None, ) -> Optional[PromptModel]: """Update only name, command, and tags (no history created).""" try: with get_db_context(db) as db: prompt = db.query(Prompt).filter_by(id=prompt_id).first() if not prompt: return None prompt.name = name prompt.command = command if tags is not None: prompt.tags = tags prompt.updated_at = int(time.time()) db.commit() return self._to_prompt_model(prompt, db=db) except Exception: return None def update_prompt_version( self, prompt_id: str, version_id: str, db: Optional[Session] = None, ) -> Optional[PromptModel]: """Set the active version of a prompt and restore content from that version's snapshot.""" try: with get_db_context(db) as db: prompt = db.query(Prompt).filter_by(id=prompt_id).first() if not prompt: return None history_entry = PromptHistories.get_history_entry_by_id( version_id, db=db ) if not history_entry: return None # Restore prompt content from the snapshot snapshot = history_entry.snapshot if snapshot: prompt.name = snapshot.get("name", prompt.name) prompt.content = snapshot.get("content", prompt.content) prompt.data = snapshot.get("data", prompt.data) prompt.meta = snapshot.get("meta", prompt.meta) prompt.tags = snapshot.get("tags", prompt.tags) # Note: command and access_grants are not restored from snapshot prompt.version_id = version_id prompt.updated_at = int(time.time()) db.commit() return self._to_prompt_model(prompt, db=db) except Exception: return None def delete_prompt_by_command( self, command: str, db: Optional[Session] = None ) -> bool: """Soft delete a prompt by setting is_active to False.""" try: with get_db_context(db) as db: prompt = db.query(Prompt).filter_by(command=command).first() if prompt: PromptHistories.delete_history_by_prompt_id(prompt.id, db=db) AccessGrants.revoke_all_access("prompt", prompt.id, db=db) prompt.is_active = False prompt.updated_at = int(time.time()) db.commit() return True return False except Exception: return False def delete_prompt_by_id(self, prompt_id: str, db: Optional[Session] = None) -> bool: """Soft delete a prompt by setting is_active to False.""" try: with get_db_context(db) as db: prompt = db.query(Prompt).filter_by(id=prompt_id).first() if prompt: PromptHistories.delete_history_by_prompt_id(prompt.id, db=db) AccessGrants.revoke_all_access("prompt", prompt.id, db=db) prompt.is_active = False prompt.updated_at = int(time.time()) db.commit() return True return False except Exception: return False def hard_delete_prompt_by_command( self, command: str, db: Optional[Session] = None ) -> bool: """Permanently delete a prompt and its history.""" try: with get_db_context(db) as db: prompt = db.query(Prompt).filter_by(command=command).first() if prompt: PromptHistories.delete_history_by_prompt_id(prompt.id, db=db) AccessGrants.revoke_all_access("prompt", prompt.id, db=db) # Delete prompt db.query(Prompt).filter_by(command=command).delete() db.commit() return True return False except Exception: return False def get_tags(self, db: Optional[Session] = None) -> list[str]: try: with get_db_context(db) as db: prompts = db.query(Prompt).filter_by(is_active=True).all() tags = set() for prompt in prompts: if prompt.tags: for tag in prompt.tags: if tag: tags.add(tag) return sorted(list(tags)) except Exception: return [] Prompts = PromptsTable()