2
0

(difyAi) Enable streaming with Dify.AI block

Closes #1281
This commit is contained in:
Baptiste Arnaud
2024-05-23 16:09:07 +02:00
parent c2f3c9709f
commit 58f2e3bdeb

View File

@ -49,6 +49,98 @@ export const createChatMessage = createAction({
getSetVariableIds: ({ responseMapping }) => getSetVariableIds: ({ responseMapping }) =>
responseMapping?.map((r) => r.variableId).filter(isDefined) ?? [], responseMapping?.map((r) => r.variableId).filter(isDefined) ?? [],
run: { run: {
stream: {
getStreamVariableId: ({ responseMapping }) =>
responseMapping?.find((r) => !r.item || r.item === 'Answer')
?.variableId,
run: async ({
credentials: { apiEndpoint, apiKey },
options: {
conversation_id,
conversationVariableId,
query,
user,
inputs,
responseMapping,
},
variables,
}) => {
const existingDifyConversationId = conversationVariableId
? variables.get(conversationVariableId)
: conversation_id
const response = await ky(
(apiEndpoint ?? defaultBaseUrl) + '/v1/chat-messages',
{
method: 'POST',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
inputs:
inputs?.reduce((acc, { key, value }) => {
if (isEmpty(key) || isEmpty(value)) return acc
return {
...acc,
[key]: value,
}
}, {}) ?? {},
query,
response_mode: 'streaming',
conversation_id: existingDifyConversationId,
user,
files: [],
}),
}
)
const reader = response.body?.getReader()
if (!reader) return
return new ReadableStream({
async start(controller) {
try {
await processDifyStream(reader, {
onDone: () => {
controller.close()
},
onMessage: (message) => {
controller.enqueue(
new TextEncoder().encode('0:"' + message + '"\n')
)
},
onMessageEnd({ totalTokens, conversationId }) {
if (
conversationVariableId &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(conversationVariableId, conversationId)
if ((responseMapping?.length ?? 0) === 0) return
responseMapping?.forEach((mapping) => {
if (!mapping.variableId) return
if (
mapping.item === 'Conversation ID' &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(mapping.variableId, conversationId)
if (mapping.item === 'Total Tokens')
variables.set(mapping.variableId, totalTokens)
})
},
})
} catch (e) {
console.error(e)
controller.error(e) // Properly closing the stream with an error
}
},
})
},
},
server: async ({ server: async ({
credentials: { apiEndpoint, apiKey }, credentials: { apiEndpoint, apiKey },
options: { options: {
@ -105,55 +197,35 @@ export const createChatMessage = createAction({
conversationId: string | undefined conversationId: string | undefined
totalTokens: number | undefined totalTokens: number | undefined
}>(async (resolve, reject) => { }>(async (resolve, reject) => {
let jsonChunk = ''
let answer = '' let answer = ''
let conversationId: string | undefined let conversationId: string | undefined
let totalTokens: number | undefined let totalTokens: number | undefined
try { try {
while (true) { await processDifyStream(reader, {
const { value, done } = await reader.read() onMessage: (message) => {
if (done) { answer += message
},
onMessageEnd: ({ totalTokens: tokens, conversationId: id }) => {
totalTokens = tokens
conversationId = id
},
onDone: () => {
resolve({ answer, conversationId, totalTokens }) resolve({ answer, conversationId, totalTokens })
return },
} })
const chunk = new TextDecoder().decode(value)
const lines = chunk.toString().split('\n') as string[]
lines
.filter((line) => line.length > 0 && line !== '\n')
.forEach((line) => {
jsonChunk += line
if (jsonChunk.startsWith('event: ')) {
jsonChunk = ''
return
}
if (
!jsonChunk.startsWith('data: ') ||
!jsonChunk.endsWith('}')
)
return
const data = JSON.parse(jsonChunk.slice(6)) as Chunk
jsonChunk = ''
if (
data.event === 'message' ||
data.event === 'agent_message'
) {
answer += data.answer
}
if (data.event === 'message_end') {
totalTokens = data.metadata.usage.total_tokens
conversationId = data.conversation_id
}
})
}
} catch (e) { } catch (e) {
reject(e) reject(e)
} }
}) })
if (
conversationVariableId &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(conversationVariableId, conversationId)
responseMapping?.forEach((mapping) => { responseMapping?.forEach((mapping) => {
if (!mapping.variableId) return if (!mapping.variableId) return
@ -168,13 +240,6 @@ export const createChatMessage = createAction({
) )
variables.set(mapping.variableId, conversationId) variables.set(mapping.variableId, conversationId)
if (
conversationVariableId &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(conversationVariableId, conversationId)
if (item === 'Total Tokens') if (item === 'Total Tokens')
variables.set(mapping.variableId, totalTokens) variables.set(mapping.variableId, totalTokens)
}) })
@ -193,3 +258,54 @@ const convertNonMarkdownLinks = (text: string) => {
const nonMarkdownLinks = /(?<![\([])https?:\/\/\S+/g const nonMarkdownLinks = /(?<![\([])https?:\/\/\S+/g
return text.replace(nonMarkdownLinks, (match) => `[${match}](${match})`) return text.replace(nonMarkdownLinks, (match) => `[${match}](${match})`)
} }
const processDifyStream = async (
reader: ReadableStreamDefaultReader<Uint8Array>,
callbacks: {
onDone: () => void
onMessage: (message: string) => void
onMessageEnd?: ({
totalTokens,
conversationId,
}: {
totalTokens: number
conversationId: string
}) => void
}
) => {
let jsonChunk = ''
while (true) {
const { value, done } = await reader.read()
if (done) {
callbacks.onDone()
return
}
const chunk = new TextDecoder().decode(value)
const lines = chunk.toString().split('\n') as string[]
lines
.filter((line) => line.length > 0 && line !== '\n')
.forEach((line) => {
jsonChunk += line
if (jsonChunk.startsWith('event: ')) {
jsonChunk = ''
return
}
if (!jsonChunk.startsWith('data: ') || !jsonChunk.endsWith('}')) return
const data = JSON.parse(jsonChunk.slice(6)) as Chunk
jsonChunk = ''
if (data.event === 'message' || data.event === 'agent_message') {
callbacks.onMessage(data.answer)
}
if (data.event === 'message_end') {
callbacks.onMessageEnd?.({
totalTokens: data.metadata.usage.total_tokens,
conversationId: data.conversation_id,
})
}
})
}
}