| import { useEffect, useState } from 'react'; |
| import { v4 } from 'uuid'; |
| import { SSE } from 'sse.js'; |
| import { useSetRecoilState } from 'recoil'; |
| import { |
| request, |
| Constants, |
| |
| createPayload, |
| LocalStorageKeys, |
| removeNullishValues, |
| } from 'librechat-data-provider'; |
| import type { TMessage, TPayload, TSubmission, EventSubmission } from 'librechat-data-provider'; |
| import type { EventHandlerParams } from './useEventHandlers'; |
| import type { TResData } from '~/common'; |
| import { useGenTitleMutation, useGetStartupConfig, useGetUserBalance } from '~/data-provider'; |
| import { useAuthContext } from '~/hooks/AuthContext'; |
| import useEventHandlers from './useEventHandlers'; |
| import store from '~/store'; |
|
|
/**
 * Removes the persisted text draft and files draft of a conversation from
 * `localStorage`.
 *
 * @param conversationId - Conversation whose drafts are dropped. Any falsy
 *   value (`undefined`, `null`, empty string) targets the drafts stored under
 *   `Constants.NEW_CONVO` instead.
 */
const clearDraft = (conversationId?: string | null) => {
  const draftOwner = conversationId ? conversationId : Constants.NEW_CONVO;
  const draftPrefixes = [LocalStorageKeys.TEXT_DRAFT, LocalStorageKeys.FILES_DRAFT];

  for (const prefix of draftPrefixes) {
    localStorage.removeItem(`${prefix}${draftOwner}`);
  }
};
|
|
/** Names of the `EventHandlerParams` members the caller must supply to `useSSE`. */
type ChatHelperKey =
  | 'getMessages'
  | 'setMessages'
  | 'setConversation'
  | 'newConversation'
  | 'setIsSubmitting'
  | 'resetLatestMessage';

/**
 * Subset of `EventHandlerParams` passed into `useSSE` by its caller; the hook
 * destructures these and forwards them to `useEventHandlers`.
 */
type ChatHelpers = Pick<EventHandlerParams, ChatHelperKey>;
|
|
/**
 * Opens a server-sent-events stream for a chat submission and routes each
 * incoming event to the matching handler returned by `useEventHandlers`.
 *
 * The stream is (re)started whenever `submission` changes; a null or empty
 * submission is ignored. When the effect is torn down while the connection is
 * still connecting/open, a synthetic `cancel` event is dispatched so the
 * conversation is aborted through `abortConversation`.
 *
 * @param submission - The submission to stream, or `null` when idle.
 * @param chatHelpers - Message/conversation accessors forwarded to `useEventHandlers`.
 * @param isAddedRequest - Forwarded unchanged to `useEventHandlers`.
 * @param runIndex - Index used to select the per-run Recoil atoms
 *   (active run id, abort-scroll flag, stop-button flag).
 */
export default function useSSE(
  submission: TSubmission | null,
  chatHelpers: ChatHelpers,
  isAddedRequest = false,
  runIndex = 0,
) {
  const genTitle = useGenTitleMutation();
  const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex));

  const { token, isAuthenticated } = useAuthContext();
  // Lazy initializer so a fresh Set is not allocated (and discarded) on every render.
  const [completed, setCompleted] = useState(() => new Set());
  const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
  const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex));

  const {
    setMessages,
    getMessages,
    setConversation,
    setIsSubmitting,
    newConversation,
    resetLatestMessage,
  } = chatHelpers;

  const {
    clearStepMaps,
    stepHandler,
    syncHandler,
    finalHandler,
    errorHandler,
    messageHandler,
    contentHandler,
    createdHandler,
    attachmentHandler,
    abortConversation,
  } = useEventHandlers({
    genTitle,
    setMessages,
    getMessages,
    setCompleted,
    isAddedRequest,
    setConversation,
    setIsSubmitting,
    newConversation,
    setShowStopButton,
    resetLatestMessage,
  });

  const { data: startupConfig } = useGetStartupConfig();
  const balanceQuery = useGetUserBalance({
    enabled: !!isAuthenticated && startupConfig?.balance?.enabled,
  });

  useEffect(() => {
    if (submission == null || Object.keys(submission).length === 0) {
      return;
    }

    // Reassigned once the server echoes the created user message back.
    let { userMessage } = submission;

    const payloadData = createPayload(submission);
    let { payload } = payloadData;
    payload = removeNullishValues(payload) as TPayload;

    let textIndex = null;
    clearStepMaps();

    /** Refreshes the user's balance, but only when balance tracking is enabled. */
    const refetchBalanceIfEnabled = () => {
      if (startupConfig?.balance?.enabled ?? false) {
        balanceQuery.refetch();
      }
    };

    const sse = new SSE(payloadData.server, {
      payload: JSON.stringify(payload),
      headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` },
    });

    sse.addEventListener('attachment', (e: MessageEvent) => {
      try {
        const data = JSON.parse(e.data);
        attachmentHandler({ data, submission: submission as EventSubmission });
      } catch (error) {
        console.error(error);
      }
    });

    sse.addEventListener('message', (e: MessageEvent) => {
      // Guard the parse (as the 'attachment' listener does) so one malformed
      // frame is logged and skipped instead of throwing out of the listener.
      let data;
      try {
        data = JSON.parse(e.data);
      } catch (error) {
        console.error('Error parsing message event:', error);
        return;
      }

      if (data.final != null) {
        clearDraft(submission.conversation?.conversationId);
        const { plugins } = data;
        try {
          finalHandler(data, { ...submission, plugins } as EventSubmission);
        } catch (error) {
          console.error('Error in finalHandler:', error);
          setIsSubmitting(false);
          setShowStopButton(false);
        }
        refetchBalanceIfEnabled();
        console.log('final', data);
        return;
      }

      if (data.created != null) {
        setActiveRunId(v4());
        // Merge the server's copy of the message, but keep the client-side
        // overrideParentMessageId.
        userMessage = {
          ...userMessage,
          ...data.message,
          overrideParentMessageId: userMessage.overrideParentMessageId,
        };
        createdHandler(data, { ...submission, userMessage } as EventSubmission);
        return;
      }

      if (data.event != null) {
        stepHandler(data, { ...submission, userMessage } as EventSubmission);
        return;
      }

      if (data.sync != null) {
        setActiveRunId(v4());
        syncHandler(data, { ...submission, userMessage } as EventSubmission);
        return;
      }

      if (data.type != null) {
        const { text, index } = data;
        if (text != null && index !== textIndex) {
          textIndex = index;
        }
        contentHandler({ data, submission: submission as EventSubmission });
        return;
      }

      // Plain text chunk: only forwarded when the frame carries a `message` field.
      const text = data.text ?? data.response;
      const { plugin, plugins } = data;

      const initialResponse = {
        ...(submission.initialResponse as TMessage),
        parentMessageId: data.parentMessageId,
        messageId: data.messageId,
      };

      if (data.message != null) {
        messageHandler(text, { ...submission, plugin, plugins, userMessage, initialResponse });
      }
    });

    sse.addEventListener('open', () => {
      setAbortScroll(false);
      console.log('connection is opened');
    });

    sse.addEventListener('cancel', async () => {
      const streamKey = (submission as TSubmission | null)?.initialResponse?.messageId;

      // NOTE(review): `completed` is the value captured when this effect ran
      // (the effect only depends on `submission`) — confirm that is intended.
      if (completed.has(streamKey)) {
        setIsSubmitting(false);
        // Copy before deleting: state updaters must not mutate `prev`.
        setCompleted((prev) => {
          const next = new Set(prev);
          next.delete(streamKey);
          return next;
        });
        return;
      }

      // Copy before adding: state updaters must not mutate `prev`.
      setCompleted((prev) => new Set(prev).add(streamKey));

      const latestMessages = getMessages();
      const conversationId = latestMessages?.[latestMessages.length - 1]?.conversationId;
      try {
        await abortConversation(
          conversationId ??
            userMessage.conversationId ??
            submission.conversation?.conversationId ??
            '',
          submission as EventSubmission,
          latestMessages,
        );
      } catch (error) {
        console.error('Error during abort:', error);
        setIsSubmitting(false);
        setShowStopButton(false);
      }
    });

    sse.addEventListener('error', async (e: MessageEvent) => {
      // `responseCode` is not declared on MessageEvent, so read it through a
      // narrow intersection type.
      const { responseCode } = e as MessageEvent & { responseCode?: number };

      if (responseCode === 401) {
        // Unauthorized: try to refresh the token and restart the stream.
        try {
          const refreshResponse = await request.refreshToken();
          const refreshedToken = refreshResponse?.token ?? '';
          if (!refreshedToken) {
            throw new Error('Token refresh failed.');
          }
          sse.headers = {
            'Content-Type': 'application/json',
            Authorization: `Bearer ${refreshedToken}`,
          };

          request.dispatchTokenUpdatedEvent(refreshedToken);
          sse.stream();
          return;
        } catch (error) {
          // Refresh failed: fall through to the regular error handling below.
          console.log(error);
        }
      }

      console.log('error in server stream.');
      refetchBalanceIfEnabled();

      let data: TResData | undefined = undefined;
      try {
        data = JSON.parse(e.data) as TResData;
      } catch (error) {
        console.error(error);
        console.log(e);
        setIsSubmitting(false);
      }

      // `data` stays undefined when the error payload could not be parsed.
      errorHandler({ data, submission: { ...submission, userMessage } as EventSubmission });
    });

    setIsSubmitting(true);
    sse.stream();

    return () => {
      // readyState <= 1 is treated as "still connecting or open", i.e. the
      // stream is being torn down before it finished.
      const isCancelled = sse.readyState <= 1;
      sse.close();
      if (isCancelled) {
        const e = new Event('cancel');

        sse.dispatchEvent(e);
      }
    };
    // Keyed on `submission` only: every other value is read from the render
    // that started the stream.
  }, [submission]);
}
|
|