| import type { PushPayload } from '@n8n/api-types'; |
| import type { User } from '@n8n/db'; |
| import { UserRepository } from '@n8n/db'; |
| import { Service } from '@n8n/di'; |
| import { ErrorReporter } from 'n8n-core'; |
| import type { Workflow } from 'n8n-workflow'; |
| import { UnexpectedError } from 'n8n-workflow'; |
|
|
| import { CollaborationState } from '@/collaboration/collaboration.state'; |
| import { Push } from '@/push'; |
| import type { OnPushMessage } from '@/push/types'; |
| import { AccessService } from '@/services/access.service'; |
|
|
| import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message'; |
| import { parseWorkflowMessage } from './collaboration.message'; |
|
|
| |
| |
| |
| |
/**
 * Tracks which users currently have a workflow open and notifies those users
 * whenever the set of collaborators on that workflow changes.
 *
 * Incoming push messages are parsed with `parseWorkflowMessage`; the
 * collaborator list itself is kept in `CollaborationState`.
 */
@Service()
export class CollaborationService {
  constructor(
    private readonly errorReporter: ErrorReporter,
    private readonly push: Push,
    private readonly state: CollaborationState,
    private readonly userRepository: UserRepository,
    private readonly accessService: AccessService,
  ) {}

  /**
   * Subscribes to `message` events on the push connection and forwards each
   * one to {@link handleUserMessage}.
   *
   * Failures are never rethrown from the listener: they are wrapped in an
   * `UnexpectedError` (carrying the message and user id) and handed to the
   * error reporter.
   */
  init(): void {
    this.push.on('message', async (event: OnPushMessage) => {
      try {
        await this.handleUserMessage(event.userId, event.msg);
      } catch (error) {
        this.errorReporter.error(
          new UnexpectedError('Error handling CollaborationService push message', {
            extra: {
              msg: event.msg,
              userId: event.userId,
            },
            cause: error,
          }),
        );
      }
    });
  }

  /**
   * Parses a raw push message and dispatches it by type.
   *
   * Only `workflowOpened` and `workflowClosed` are acted upon; any other
   * parsed message type is ignored.
   *
   * @param userId - Id of the user the message was received from.
   * @param msg - Untrusted payload; validated by `parseWorkflowMessage`.
   */
  async handleUserMessage(userId: User['id'], msg: unknown): Promise<void> {
    const workflowMessage = await parseWorkflowMessage(msg);

    if (workflowMessage.type === 'workflowOpened') {
      await this.handleWorkflowOpened(userId, workflowMessage);
    } else if (workflowMessage.type === 'workflowClosed') {
      await this.handleWorkflowClosed(userId, workflowMessage);
    }
  }

  /**
   * Registers the user as a collaborator on the workflow and broadcasts the
   * updated collaborator list. Does nothing when the user lacks read access.
   */
  private async handleWorkflowOpened(
    userId: User['id'],
    msg: WorkflowOpenedMessage,
  ): Promise<void> {
    const { workflowId } = msg;

    if (!(await this.accessService.hasReadAccess(userId, workflowId))) {
      return;
    }

    await this.state.addCollaborator(workflowId, userId);

    await this.sendWorkflowUsersChangedMessage(workflowId);
  }

  /**
   * Removes the user from the workflow's collaborators and broadcasts the
   * updated collaborator list. Does nothing when the user lacks read access.
   */
  private async handleWorkflowClosed(
    userId: User['id'],
    msg: WorkflowClosedMessage,
  ): Promise<void> {
    const { workflowId } = msg;

    if (!(await this.accessService.hasReadAccess(userId, workflowId))) {
      return;
    }

    await this.state.removeCollaborator(workflowId, userId);

    await this.sendWorkflowUsersChangedMessage(workflowId);
  }

  /**
   * Sends a `collaboratorsChanged` push message, containing the current
   * collaborators of the workflow, to each of those collaborators.
   *
   * Nothing is sent when the workflow has no collaborators left.
   */
  private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']): Promise<void> {
    // Access is not re-validated here: within this service a user is only
    // added as a collaborator after passing the read-access check.
    const collaborators = await this.state.getCollaborators(workflowId);
    const userIds = collaborators.map((collaborator) => collaborator.userId);

    if (userIds.length === 0) {
      return;
    }

    // Index the presence entries once instead of scanning the list for every
    // user. The first entry per id is kept, matching `Array.prototype.find`.
    const collaboratorByUserId = new Map<User['id'], (typeof collaborators)[number]>();
    for (const collaborator of collaborators) {
      if (!collaboratorByUserId.has(collaborator.userId)) {
        collaboratorByUserId.set(collaborator.userId, collaborator);
      }
    }

    const users = await this.userRepository.getByIds(this.userRepository.manager, userIds);

    // A user without a matching presence entry is skipped rather than
    // dereferenced through a non-null assertion, so one inconsistent record
    // cannot abort the notification for everyone else.
    const activeCollaborators = users.flatMap((user) => {
      const collaborator = collaboratorByUserId.get(user.id);
      return collaborator === undefined
        ? []
        : [{ user: user.toIUser(), lastSeen: collaborator.lastSeen }];
    });

    const msgData: PushPayload<'collaboratorsChanged'> = {
      workflowId,
      collaborators: activeCollaborators,
    };

    this.push.sendToUsers({ type: 'collaboratorsChanged', data: msgData }, userIds);
  }
}
|
|