2
0

🚸 (dify) Improve error display when streaming

This commit is contained in:
Baptiste Arnaud
2024-05-25 19:59:34 +02:00
parent e015385a7c
commit e1f1b58c1c
10 changed files with 126 additions and 90 deletions

View File

@ -153,7 +153,7 @@ export async function POST(req: Request) {
// eslint-disable-next-line @typescript-eslint/no-unused-vars // eslint-disable-next-line @typescript-eslint/no-unused-vars
set: (_1: string, _2: unknown) => {}, set: (_1: string, _2: unknown) => {},
} }
const stream = await action.run.stream.run({ const { stream } = await action.run.stream.run({
credentials: decryptedCredentials, credentials: decryptedCredentials,
options: block.options, options: block.options,
variables, variables,

View File

@ -23,7 +23,14 @@ type Props = {
messages: OpenAI.Chat.ChatCompletionMessage[] | undefined messages: OpenAI.Chat.ChatCompletionMessage[] | undefined
} }
export const getMessageStream = async ({ sessionId, messages }: Props) => { export const getMessageStream = async ({
sessionId,
messages,
}: Props): Promise<{
stream?: ReadableStream<any>
status?: number
message?: string
}> => {
const session = await getSession(sessionId) const session = await getSession(sessionId)
if (!session?.state || !session.state.currentBlockId) if (!session?.state || !session.state.currentBlockId)
@ -130,13 +137,15 @@ export const getMessageStream = async ({ sessionId, messages }: Props) => {
}) })
}, },
} }
const stream = await action.run.stream.run({ const { stream, httpError } = await action.run.stream.run({
credentials: decryptedCredentials, credentials: decryptedCredentials,
options: deepParseVariables( options: deepParseVariables(
session.state.typebotsQueue[0].typebot.variables session.state.typebotsQueue[0].typebot.variables
)(block.options), )(block.options),
variables, variables,
}) })
if (httpError) return httpError
if (!stream) return { status: 500, message: 'Could not create stream' } if (!stream) return { status: 500, message: 'Could not create stream' }
return { stream } return { stream }

View File

@ -168,7 +168,7 @@ export const createChatMessage = createAction({
stream: true, stream: true,
}) })
return AnthropicStream(response) return { stream: AnthropicStream(response) }
}, },
}, },
}, },

View File

@ -3,7 +3,7 @@ import { isDefined, isEmpty, isNotEmpty } from '@typebot.io/lib'
import { auth } from '../auth' import { auth } from '../auth'
import { defaultBaseUrl } from '../constants' import { defaultBaseUrl } from '../constants'
import { Chunk } from '../types' import { Chunk } from '../types'
import ky from 'ky' import ky, { HTTPError } from 'ky'
import { deprecatedCreateChatMessageOptions } from '../deprecated' import { deprecatedCreateChatMessageOptions } from '../deprecated'
export const createChatMessage = createAction({ export const createChatMessage = createAction({
@ -68,77 +68,94 @@ export const createChatMessage = createAction({
const existingDifyConversationId = conversationVariableId const existingDifyConversationId = conversationVariableId
? variables.get(conversationVariableId) ? variables.get(conversationVariableId)
: conversation_id : conversation_id
const response = await ky( try {
(apiEndpoint ?? defaultBaseUrl) + '/v1/chat-messages', const response = await ky(
{ (apiEndpoint ?? defaultBaseUrl) + '/v1/chat-messages',
method: 'POST', {
headers: { method: 'POST',
Authorization: `Bearer ${apiKey}`, headers: {
'Content-Type': 'application/json', Authorization: `Bearer ${apiKey}`,
}, 'Content-Type': 'application/json',
body: JSON.stringify({ },
inputs: body: JSON.stringify({
inputs?.reduce((acc, { key, value }) => { inputs:
if (isEmpty(key) || isEmpty(value)) return acc inputs?.reduce((acc, { key, value }) => {
return { if (isEmpty(key) || isEmpty(value)) return acc
...acc, return {
[key]: value, ...acc,
} [key]: value,
}, {}) ?? {}, }
query, }, {}) ?? {},
response_mode: 'streaming', query,
conversation_id: existingDifyConversationId, response_mode: 'streaming',
user, conversation_id: existingDifyConversationId,
files: [], user,
files: [],
}),
}
)
const reader = response.body?.getReader()
if (!reader) return {}
return {
stream: new ReadableStream({
async start(controller) {
try {
await processDifyStream(reader, {
onDone: () => {
controller.close()
},
onMessage: (message) => {
controller.enqueue(
new TextEncoder().encode(
'0:"' + message.replace(/"/g, '\\"') + '"\n'
)
)
},
onMessageEnd({ totalTokens, conversationId }) {
if (
conversationVariableId &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(conversationVariableId, conversationId)
if ((responseMapping?.length ?? 0) === 0) return
responseMapping?.forEach((mapping) => {
if (!mapping.variableId) return
if (
mapping.item === 'Conversation ID' &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(mapping.variableId, conversationId)
if (mapping.item === 'Total Tokens')
variables.set(mapping.variableId, totalTokens)
})
},
})
} catch (e) {
console.error(e)
controller.error(e) // Properly closing the stream with an error
}
},
}), }),
} }
) } catch (err) {
const reader = response.body?.getReader() if (err instanceof HTTPError) {
return {
if (!reader) return httpError: {
status: err.response.status,
return new ReadableStream({ message: await err.response.text(),
async start(controller) { },
try {
await processDifyStream(reader, {
onDone: () => {
controller.close()
},
onMessage: (message) => {
controller.enqueue(
new TextEncoder().encode('0:"' + message + '"\n')
)
},
onMessageEnd({ totalTokens, conversationId }) {
if (
conversationVariableId &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(conversationVariableId, conversationId)
if ((responseMapping?.length ?? 0) === 0) return
responseMapping?.forEach((mapping) => {
if (!mapping.variableId) return
if (
mapping.item === 'Conversation ID' &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(mapping.variableId, conversationId)
if (mapping.item === 'Total Tokens')
variables.set(mapping.variableId, totalTokens)
})
},
})
} catch (e) {
console.error(e)
controller.error(e) // Properly closing the stream with an error
} }
}, }
}) console.error(err)
return {}
}
}, },
}, },
server: async ({ server: async ({
@ -243,12 +260,15 @@ export const createChatMessage = createAction({
if (item === 'Total Tokens') if (item === 'Total Tokens')
variables.set(mapping.variableId, totalTokens) variables.set(mapping.variableId, totalTokens)
}) })
} catch (error) { } catch (err) {
logs.add({ if (err instanceof HTTPError) {
status: 'error', return logs.add({
description: 'Failed to create chat message', status: 'error',
}) description: err.message,
console.error(error) details: await err.response.text(),
})
}
console.error(err)
} }
}, },
}, },

View File

@ -139,7 +139,7 @@ export const createChatCompletion = createAction({
(res) => res.item === 'Message content' || !res.item (res) => res.item === 'Message content' || !res.item
)?.variableId, )?.variableId,
run: async ({ credentials: { apiKey }, options, variables }) => { run: async ({ credentials: { apiKey }, options, variables }) => {
if (!options.model) return if (!options.model) return {}
const model = createMistral({ const model = createMistral({
apiKey, apiKey,
})(options.model) })(options.model)
@ -149,7 +149,7 @@ export const createChatCompletion = createAction({
messages: parseMessages({ options, variables }), messages: parseMessages({ options, variables }),
}) })
return response.toAIStream() return { stream: response.toAIStream() }
}, },
}, },
}, },

View File

@ -62,11 +62,12 @@ export const createChatCompletion = createAction({
}), }),
stream: { stream: {
getStreamVariableId: getChatCompletionStreamVarId, getStreamVariableId: getChatCompletionStreamVarId,
run: (params) => run: async (params) => ({
runChatCompletionStream({ stream: await runChatCompletionStream({
...params, ...params,
config: { baseUrl: defaultOpenRouterOptions.baseUrl }, config: { baseUrl: defaultOpenRouterOptions.baseUrl },
}), }),
}),
}, },
}, },
}) })

View File

@ -142,8 +142,8 @@ export const askAssistant = createAction({
getStreamVariableId: ({ responseMapping }) => getStreamVariableId: ({ responseMapping }) =>
responseMapping?.find((m) => !m.item || m.item === 'Message') responseMapping?.find((m) => !m.item || m.item === 'Message')
?.variableId, ?.variableId,
run: async ({ credentials, options, variables }) => run: async ({ credentials, options, variables }) => ({
createAssistantStream({ stream: await createAssistantStream({
apiKey: credentials.apiKey, apiKey: credentials.apiKey,
assistantId: options.assistantId, assistantId: options.assistantId,
message: options.message, message: options.message,
@ -154,6 +154,7 @@ export const askAssistant = createAction({
functions: options.functions, functions: options.functions,
responseMapping: options.responseMapping, responseMapping: options.responseMapping,
}), }),
}),
}, },
server: async ({ server: async ({
credentials: { apiKey }, credentials: { apiKey },

View File

@ -86,14 +86,15 @@ export const createChatCompletion = createAction({
}), }),
stream: { stream: {
getStreamVariableId: getChatCompletionStreamVarId, getStreamVariableId: getChatCompletionStreamVarId,
run: (params) => run: async (params) => ({
runChatCompletionStream({ stream: await runChatCompletionStream({
...params, ...params,
config: { config: {
baseUrl: defaultOpenAIOptions.baseUrl, baseUrl: defaultOpenAIOptions.baseUrl,
defaultModel: defaultOpenAIOptions.model, defaultModel: defaultOpenAIOptions.model,
}, },
}), }),
}),
}, },
}, },
}) })

View File

@ -45,11 +45,12 @@ export const createChatCompletion = createAction({
}), }),
stream: { stream: {
getStreamVariableId: getChatCompletionStreamVarId, getStreamVariableId: getChatCompletionStreamVarId,
run: (params) => run: async (params) => ({
runChatCompletionStream({ stream: await runChatCompletionStream({
...params, ...params,
config: { baseUrl: defaultTogetherOptions.baseUrl }, config: { baseUrl: defaultTogetherOptions.baseUrl },
}), }),
}),
}, },
}, },
}) })

View File

@ -64,7 +64,10 @@ export type ActionDefinition<
credentials: CredentialsFromAuthDef<A> credentials: CredentialsFromAuthDef<A>
options: z.infer<BaseOptions> & z.infer<Options> options: z.infer<BaseOptions> & z.infer<Options>
variables: VariableStore variables: VariableStore
}) => Promise<ReadableStream<any> | undefined> }) => Promise<{
stream?: ReadableStream<any>
httpError?: { status: number; message: string }
}>
} }
web?: { web?: {
displayEmbedBubble?: { displayEmbedBubble?: {