From 58f2e3bdeb5a9f08d60a4fdeb4c8313b8f3cffad Mon Sep 17 00:00:00 2001 From: Baptiste Arnaud Date: Thu, 23 May 2024 16:09:07 +0200 Subject: [PATCH] :zap: (difyAi) Enable streaming with Dify.AI block Closes #1281 --- .../difyAi/actions/createChatMessage.ts | 206 ++++++++++++++---- 1 file changed, 161 insertions(+), 45 deletions(-) diff --git a/packages/forge/blocks/difyAi/actions/createChatMessage.ts b/packages/forge/blocks/difyAi/actions/createChatMessage.ts index b7d66feea..d5d4d505b 100644 --- a/packages/forge/blocks/difyAi/actions/createChatMessage.ts +++ b/packages/forge/blocks/difyAi/actions/createChatMessage.ts @@ -49,6 +49,98 @@ export const createChatMessage = createAction({ getSetVariableIds: ({ responseMapping }) => responseMapping?.map((r) => r.variableId).filter(isDefined) ?? [], run: { + stream: { + getStreamVariableId: ({ responseMapping }) => + responseMapping?.find((r) => !r.item || r.item === 'Answer') + ?.variableId, + run: async ({ + credentials: { apiEndpoint, apiKey }, + options: { + conversation_id, + conversationVariableId, + query, + user, + inputs, + responseMapping, + }, + variables, + }) => { + const existingDifyConversationId = conversationVariableId + ? variables.get(conversationVariableId) + : conversation_id + const response = await ky( + (apiEndpoint ?? defaultBaseUrl) + '/v1/chat-messages', + { + method: 'POST', + headers: { + Authorization: `Bearer ${apiKey}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + inputs: + inputs?.reduce((acc, { key, value }) => { + if (isEmpty(key) || isEmpty(value)) return acc + return { + ...acc, + [key]: value, + } + }, {}) ?? {}, + query, + response_mode: 'streaming', + conversation_id: existingDifyConversationId, + user, + files: [], + }), + } + ) + const reader = response.body?.getReader() + + if (!reader) return + + return new ReadableStream({ + async start(controller) { + try { + await processDifyStream(reader, { + onDone: () => { + controller.close() + }, + onMessage: (message) => { + controller.enqueue( + new TextEncoder().encode('0:"' + message + '"\n') + ) + }, + onMessageEnd({ totalTokens, conversationId }) { + if ( + conversationVariableId && + isNotEmpty(conversationId) && + isEmpty(existingDifyConversationId?.toString()) + ) + variables.set(conversationVariableId, conversationId) + + if ((responseMapping?.length ?? 0) === 0) return + responseMapping?.forEach((mapping) => { + if (!mapping.variableId) return + + if ( + mapping.item === 'Conversation ID' && + isNotEmpty(conversationId) && + isEmpty(existingDifyConversationId?.toString()) + ) + variables.set(mapping.variableId, conversationId) + + if (mapping.item === 'Total Tokens') + variables.set(mapping.variableId, totalTokens) + }) + }, + }) + } catch (e) { + console.error(e) + controller.error(e) // Properly closing the stream with an error + } + }, + }) + }, + }, server: async ({ credentials: { apiEndpoint, apiKey }, options: { @@ -105,55 +197,35 @@ export const createChatMessage = createAction({ conversationId: string | undefined totalTokens: number | undefined }>(async (resolve, reject) => { - let jsonChunk = '' let answer = '' let conversationId: string | undefined let totalTokens: number | undefined try { - while (true) { - const { value, done } = await reader.read() - if (done) { + await processDifyStream(reader, { + onMessage: (message) => { + answer += message + }, + onMessageEnd: ({ totalTokens: tokens, conversationId: id }) => { + totalTokens = tokens + conversationId = id + }, + onDone: () => { resolve({ answer, conversationId, totalTokens }) - return - } - - const chunk = new TextDecoder().decode(value) - - const lines = chunk.toString().split('\n') as string[] - lines - .filter((line) => line.length > 0 && line !== '\n') - .forEach((line) => { - jsonChunk += line - if (jsonChunk.startsWith('event: ')) { - jsonChunk = '' - return - } - if ( - !jsonChunk.startsWith('data: ') || - !jsonChunk.endsWith('}') - ) - return - - const data = JSON.parse(jsonChunk.slice(6)) as Chunk - jsonChunk = '' - if ( - data.event === 'message' || - data.event === 'agent_message' - ) { - answer += data.answer - } - if (data.event === 'message_end') { - totalTokens = data.metadata.usage.total_tokens - conversationId = data.conversation_id - } - }) - } + }, + }) } catch (e) { reject(e) } }) + if ( + conversationVariableId && + isNotEmpty(conversationId) && + isEmpty(existingDifyConversationId?.toString()) + ) + variables.set(conversationVariableId, conversationId) + responseMapping?.forEach((mapping) => { if (!mapping.variableId) return @@ -168,13 +240,6 @@ export const createChatMessage = createAction({ ) variables.set(mapping.variableId, conversationId) - if ( - conversationVariableId && - isNotEmpty(conversationId) && - isEmpty(existingDifyConversationId?.toString()) - ) - variables.set(conversationVariableId, conversationId) - if (item === 'Total Tokens') variables.set(mapping.variableId, totalTokens) }) @@ -193,3 +258,54 @@ const convertNonMarkdownLinks = (text: string) => { const nonMarkdownLinks = /(? `[${match}](${match})`) } + +const processDifyStream = async ( + reader: ReadableStreamDefaultReader, + callbacks: { + onDone: () => void + onMessage: (message: string) => void + onMessageEnd?: ({ + totalTokens, + conversationId, + }: { + totalTokens: number + conversationId: string + }) => void + } +) => { + let jsonChunk = '' + + while (true) { + const { value, done } = await reader.read() + if (done) { + callbacks.onDone() + return + } + + const chunk = new TextDecoder().decode(value) + + const lines = chunk.toString().split('\n') as string[] + lines + .filter((line) => line.length > 0 && line !== '\n') + .forEach((line) => { + jsonChunk += line + if (jsonChunk.startsWith('event: ')) { + jsonChunk = '' + return + } + if (!jsonChunk.startsWith('data: ') || !jsonChunk.endsWith('}')) return + + const data = JSON.parse(jsonChunk.slice(6)) as Chunk + jsonChunk = '' + if (data.event === 'message' || data.event === 'agent_message') { + callbacks.onMessage(data.answer) + } + if (data.event === 'message_end') { + callbacks.onMessageEnd?.({ + totalTokens: data.metadata.usage.total_tokens, + conversationId: data.conversation_id, + }) + } + }) + } +}