2
0

🐛 Fix openai total tokens variable set when streaming

This commit is contained in:
Baptiste Arnaud
2024-07-17 14:47:12 +02:00
parent 0ee820b4da
commit c6645d4505
8 changed files with 72 additions and 45 deletions

View File

@ -13345,6 +13345,7 @@
"google sheets",
"stripe",
"whatsApp",
"zemanticAi",
"openai",
"zemantic-ai",
"chat-node",
@ -13382,6 +13383,7 @@
"google sheets",
"stripe",
"whatsApp",
"zemanticAi",
"openai",
"zemantic-ai",
"chat-node",

View File

@ -7,7 +7,6 @@ import { NextResponse } from 'next/dist/server/web/spec-extension/response'
import { getBlockById } from '@typebot.io/schemas/helpers'
import { forgedBlocks } from '@typebot.io/forge-repository/definitions'
import { decryptV2 } from '@typebot.io/lib/api/encryption/decryptV2'
import { VariableStore } from '@typebot.io/forge'
import {
ParseVariablesOptions,
parseVariables,
@ -16,6 +15,7 @@ import { IntegrationBlockType } from '@typebot.io/schemas/features/blocks/integr
import { getChatCompletionStream } from '@typebot.io/bot-engine/blocks/integrations/legacy/openai/getChatCompletionStream'
import { ChatCompletionOpenAIOptions } from '@typebot.io/schemas/features/blocks/integrations/openai/schema'
import { isForgedBlockType } from '@typebot.io/schemas/features/blocks/forged/helpers'
import { AsyncVariableStore } from '@typebot.io/forge/types'
export const preferredRegion = 'lhr1'
export const dynamic = 'force-dynamic'
@ -140,7 +140,7 @@ export async function POST(req: Request) {
credentials.data,
credentials.iv
)
const variables: VariableStore = {
const variables: AsyncVariableStore = {
list: () => state.typebotsQueue[0].typebot.variables,
get: (id: string) => {
const variable = state.typebotsQueue[0].typebot.variables.find(
@ -151,7 +151,7 @@ export async function POST(req: Request) {
parse: (text: string, params?: ParseVariablesOptions) =>
parseVariables(state.typebotsQueue[0].typebot.variables, params)(text),
// eslint-disable-next-line @typescript-eslint/no-unused-vars
set: (_1: string, _2: unknown) => {},
set: async (_1: string, _2: unknown) => {},
}
const { stream } = await action.run.stream.run({
credentials: decryptedCredentials,

View File

@ -3,7 +3,7 @@ import { ChatCompletionOpenAIOptions } from '@typebot.io/schemas/features/blocks
import { OpenAI } from 'openai'
import { decryptV2 } from '@typebot.io/lib/api/encryption/decryptV2'
import { forgedBlocks } from '@typebot.io/forge-repository/definitions'
import { VariableStore } from '@typebot.io/forge'
import { AsyncVariableStore } from '@typebot.io/forge'
import {
ParseVariablesOptions,
parseVariables,
@ -104,7 +104,7 @@ export const getMessageStream = async ({
credentials.iv
)
const variables: VariableStore = {
const variables: AsyncVariableStore = {
list: () => session.state.typebotsQueue[0].typebot.variables,
get: (id: string) => {
const variable = session.state.typebotsQueue[0].typebot.variables.find(

View File

@ -275,7 +275,6 @@ export const ConversationContainer = (props: Props) => {
const processClientSideActions = async (
actions: NonNullable<ContinueChatResponse['clientSideActions']>
) => {
console.log('YES')
if (isRecovered()) return
for (const action of actions) {
if (

View File

@ -125,28 +125,34 @@ export const createChatMessage = createAction({
)
)
},
onMessageEnd({ totalTokens, conversationId }) {
async onMessageEnd({ totalTokens, conversationId }) {
if (
conversationVariableId &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(conversationVariableId, conversationId)
await variables.set(
conversationVariableId,
conversationId
)
if ((responseMapping?.length ?? 0) === 0) return
responseMapping?.forEach((mapping) => {
if (!mapping.variableId) return
for (const mapping of responseMapping ?? []) {
if (!mapping.variableId) continue
if (
mapping.item === 'Conversation ID' &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(mapping.variableId, conversationId)
await variables.set(
mapping.variableId,
conversationId
)
if (mapping.item === 'Total Tokens')
variables.set(mapping.variableId, totalTokens)
})
await variables.set(mapping.variableId, totalTokens)
}
},
})
} catch (e) {
@ -235,7 +241,10 @@ export const createChatMessage = createAction({
onMessage: (message) => {
answer += message
},
onMessageEnd: ({ totalTokens: tokens, conversationId: id }) => {
onMessageEnd: async ({
totalTokens: tokens,
conversationId: id,
}) => {
totalTokens = tokens
conversationId = id
},
@ -302,7 +311,7 @@ const processDifyStream = async (
}: {
totalTokens?: number
conversationId: string
}) => void
}) => Promise<void>
}
) => {
let jsonChunk = ''
@ -317,27 +326,27 @@ const processDifyStream = async (
const chunk = new TextDecoder().decode(value)
const lines = chunk.toString().split('\n') as string[]
lines
.filter((line) => line.length > 0 && line !== '\n')
.forEach((line) => {
jsonChunk += line
if (jsonChunk.startsWith('event: ')) {
jsonChunk = ''
return
}
if (!jsonChunk.startsWith('data: ') || !jsonChunk.endsWith('}')) return
const data = JSON.parse(jsonChunk.slice(6)) as Chunk
for (const line of lines.filter(
(line) => line.length > 0 && line !== '\n'
)) {
jsonChunk += line
if (jsonChunk.startsWith('event: ')) {
jsonChunk = ''
if (data.event === 'message' || data.event === 'agent_message') {
callbacks.onMessage(data.answer)
}
if (data.event === 'message_end') {
callbacks.onMessageEnd?.({
totalTokens: data.metadata.usage?.total_tokens,
conversationId: data.conversation_id,
})
}
})
continue
}
if (!jsonChunk.startsWith('data: ') || !jsonChunk.endsWith('}')) continue
const data = JSON.parse(jsonChunk.slice(6)) as Chunk
jsonChunk = ''
if (data.event === 'message' || data.event === 'agent_message') {
callbacks.onMessage(data.answer)
}
if (data.event === 'message_end') {
await callbacks.onMessageEnd?.({
totalTokens: data.metadata.usage?.total_tokens,
conversationId: data.conversation_id,
})
}
}
}
}

View File

@ -1,4 +1,5 @@
import {
AsyncVariableStore,
LogsStore,
VariableStore,
createAction,
@ -237,7 +238,7 @@ const createAssistantStream = async ({
variableId?: string | undefined
}[]
logs?: LogsStore
variables: VariableStore
variables: AsyncVariableStore | VariableStore
}): Promise<ReadableStream | undefined> => {
if (isEmpty(assistantId)) {
logs?.add('Assistant ID is empty')
@ -277,8 +278,9 @@ const createAssistantStream = async ({
(mapping) => mapping.item === 'Thread ID'
)
if (threadIdResponseMapping?.variableId)
variables.set(threadIdResponseMapping.variableId, currentThreadId)
else if (threadVariableId) variables.set(threadVariableId, currentThreadId)
await variables.set(threadIdResponseMapping.variableId, currentThreadId)
else if (threadVariableId)
await variables.set(threadVariableId, currentThreadId)
}
if (!currentThreadId) {
@ -331,9 +333,9 @@ const createAssistantStream = async ({
args: parameters,
})
newVariables?.forEach((variable) => {
variables.set(variable.id, variable.value)
})
for (const variable of newVariables ?? []) {
await variables.set(variable.id, variable.value)
}
return {
tool_call_id: toolCall.id,

View File

@ -1,4 +1,4 @@
import { VariableStore } from '@typebot.io/forge/types'
import { AsyncVariableStore, VariableStore } from '@typebot.io/forge/types'
import { ChatCompletionOptions } from './parseChatCompletionOptions'
import { APICallError, streamText, ToolCallPart, ToolResultPart } from 'ai'
import { createOpenAI } from '@ai-sdk/openai'
@ -12,7 +12,7 @@ import { appendToolResultsToMessages } from '@typebot.io/ai/appendToolResultsToM
type Props = {
credentials: { apiKey?: string }
options: ChatCompletionOptions
variables: VariableStore
variables: AsyncVariableStore
config: { baseUrl: string; defaultModel?: string }
compatibility?: 'strict' | 'compatible'
}
@ -59,6 +59,7 @@ export const runOpenAIChatCompletionStream = async ({
const response = await streamText(streamConfig)
let totalToolCalls = 0
let totalTokens = 0
let toolCalls: ToolCallPart[] = []
let toolResults: ToolResultPart[] = []
@ -69,6 +70,8 @@ export const runOpenAIChatCompletionStream = async ({
await pumpStreamUntilDone(controller, reader)
totalTokens = (await response.usage).totalTokens
toolCalls = await response.toolCalls
if (toolCalls.length > 0)
toolResults = (await response.toolResults) as ToolResultPart[]
@ -89,11 +92,19 @@ export const runOpenAIChatCompletionStream = async ({
})
const reader = newResponse.toAIStream().getReader()
await pumpStreamUntilDone(controller, reader)
totalTokens += (await newResponse.usage).totalTokens
toolCalls = await newResponse.toolCalls
if (toolCalls.length > 0)
toolResults = (await newResponse.toolResults) as ToolResultPart[]
}
const totalTokenVariableId = options.responseMapping?.find(
(mapping) => mapping.item === 'Total tokens'
)?.variableId
if (totalTokenVariableId)
await variables.set(totalTokenVariableId, totalTokens)
controller.close()
},
}),

View File

@ -13,6 +13,10 @@ export type VariableStore = {
}[]
}
export type AsyncVariableStore = Omit<VariableStore, 'set'> & {
set: (variableId: string, value: unknown) => Promise<void>
}
export type LogsStore = {
add: (
log:
@ -63,7 +67,7 @@ export type ActionDefinition<
run: (params: {
credentials: CredentialsFromAuthDef<A>
options: z.infer<BaseOptions> & z.infer<Options>
variables: VariableStore
variables: AsyncVariableStore
}) => Promise<{
stream?: ReadableStream<any>
httpError?: { status: number; message: string }