diff --git a/apps/docs/openapi/builder.json b/apps/docs/openapi/builder.json index f6f679ba7..2da6f2fcf 100644 --- a/apps/docs/openapi/builder.json +++ b/apps/docs/openapi/builder.json @@ -13345,6 +13345,7 @@ "google sheets", "stripe", "whatsApp", + "zemanticAi", "openai", "zemantic-ai", "chat-node", @@ -13382,6 +13383,7 @@ "google sheets", "stripe", "whatsApp", + "zemanticAi", "openai", "zemantic-ai", "chat-node", diff --git a/apps/viewer/src/app/api/integrations/openai/streamer/route.ts b/apps/viewer/src/app/api/integrations/openai/streamer/route.ts index 07ab21068..0e4606d9a 100644 --- a/apps/viewer/src/app/api/integrations/openai/streamer/route.ts +++ b/apps/viewer/src/app/api/integrations/openai/streamer/route.ts @@ -7,7 +7,6 @@ import { NextResponse } from 'next/dist/server/web/spec-extension/response' import { getBlockById } from '@typebot.io/schemas/helpers' import { forgedBlocks } from '@typebot.io/forge-repository/definitions' import { decryptV2 } from '@typebot.io/lib/api/encryption/decryptV2' -import { VariableStore } from '@typebot.io/forge' import { ParseVariablesOptions, parseVariables, @@ -16,6 +15,7 @@ import { IntegrationBlockType } from '@typebot.io/schemas/features/blocks/integr import { getChatCompletionStream } from '@typebot.io/bot-engine/blocks/integrations/legacy/openai/getChatCompletionStream' import { ChatCompletionOpenAIOptions } from '@typebot.io/schemas/features/blocks/integrations/openai/schema' import { isForgedBlockType } from '@typebot.io/schemas/features/blocks/forged/helpers' +import { AsyncVariableStore } from '@typebot.io/forge/types' export const preferredRegion = 'lhr1' export const dynamic = 'force-dynamic' @@ -140,7 +140,7 @@ export async function POST(req: Request) { credentials.data, credentials.iv ) - const variables: VariableStore = { + const variables: AsyncVariableStore = { list: () => state.typebotsQueue[0].typebot.variables, get: (id: string) => { const variable = state.typebotsQueue[0].typebot.variables.find( @@ -151,7 +151,7 @@ export async function POST(req: Request) { parse: (text: string, params?: ParseVariablesOptions) => parseVariables(state.typebotsQueue[0].typebot.variables, params)(text), // eslint-disable-next-line @typescript-eslint/no-unused-vars - set: (_1: string, _2: unknown) => {}, + set: async (_1: string, _2: unknown) => {}, } const { stream } = await action.run.stream.run({ credentials: decryptedCredentials, diff --git a/packages/bot-engine/apiHandlers/getMessageStream.ts b/packages/bot-engine/apiHandlers/getMessageStream.ts index 65008b159..6c58a6c34 100644 --- a/packages/bot-engine/apiHandlers/getMessageStream.ts +++ b/packages/bot-engine/apiHandlers/getMessageStream.ts @@ -3,7 +3,7 @@ import { ChatCompletionOpenAIOptions } from '@typebot.io/schemas/features/blocks import { OpenAI } from 'openai' import { decryptV2 } from '@typebot.io/lib/api/encryption/decryptV2' import { forgedBlocks } from '@typebot.io/forge-repository/definitions' -import { VariableStore } from '@typebot.io/forge' +import { AsyncVariableStore } from '@typebot.io/forge' import { ParseVariablesOptions, parseVariables, @@ -104,7 +104,7 @@ export const getMessageStream = async ({ credentials.iv ) - const variables: VariableStore = { + const variables: AsyncVariableStore = { list: () => session.state.typebotsQueue[0].typebot.variables, get: (id: string) => { const variable = session.state.typebotsQueue[0].typebot.variables.find( diff --git a/packages/embeds/js/src/components/ConversationContainer/ConversationContainer.tsx b/packages/embeds/js/src/components/ConversationContainer/ConversationContainer.tsx index bf8279903..6260d6e3e 100644 --- a/packages/embeds/js/src/components/ConversationContainer/ConversationContainer.tsx +++ b/packages/embeds/js/src/components/ConversationContainer/ConversationContainer.tsx @@ -275,7 +275,6 @@ export const ConversationContainer = (props: Props) => { const processClientSideActions = async ( actions: NonNullable ) => { - console.log('YES') if (isRecovered()) return for (const action of actions) { if ( diff --git a/packages/forge/blocks/difyAi/actions/createChatMessage.ts b/packages/forge/blocks/difyAi/actions/createChatMessage.ts index 7ffaea7fe..ee22b022e 100644 --- a/packages/forge/blocks/difyAi/actions/createChatMessage.ts +++ b/packages/forge/blocks/difyAi/actions/createChatMessage.ts @@ -125,28 +125,34 @@ export const createChatMessage = createAction({ ) ) }, - onMessageEnd({ totalTokens, conversationId }) { + async onMessageEnd({ totalTokens, conversationId }) { if ( conversationVariableId && isNotEmpty(conversationId) && isEmpty(existingDifyConversationId?.toString()) ) - variables.set(conversationVariableId, conversationId) + await variables.set( + conversationVariableId, + conversationId + ) if ((responseMapping?.length ?? 0) === 0) return - responseMapping?.forEach((mapping) => { - if (!mapping.variableId) return + for (const mapping of responseMapping ?? []) { + if (!mapping.variableId) continue if ( mapping.item === 'Conversation ID' && isNotEmpty(conversationId) && isEmpty(existingDifyConversationId?.toString()) ) - variables.set(mapping.variableId, conversationId) + await variables.set( + mapping.variableId, + conversationId + ) if (mapping.item === 'Total Tokens') - variables.set(mapping.variableId, totalTokens) - }) + await variables.set(mapping.variableId, totalTokens) + } }, }) } catch (e) { @@ -235,7 +241,10 @@ export const createChatMessage = createAction({ onMessage: (message) => { answer += message }, - onMessageEnd: ({ totalTokens: tokens, conversationId: id }) => { + onMessageEnd: async ({ + totalTokens: tokens, + conversationId: id, + }) => { totalTokens = tokens conversationId = id }, @@ -302,7 +311,7 @@ const processDifyStream = async ( }: { totalTokens?: number conversationId: string - }) => void + }) => Promise } ) => { let jsonChunk = '' @@ -317,27 +326,27 @@ const processDifyStream = async ( const chunk = new TextDecoder().decode(value) const lines = chunk.toString().split('\n') as string[] - lines - .filter((line) => line.length > 0 && line !== '\n') - .forEach((line) => { - jsonChunk += line - if (jsonChunk.startsWith('event: ')) { - jsonChunk = '' - return - } - if (!jsonChunk.startsWith('data: ') || !jsonChunk.endsWith('}')) return - - const data = JSON.parse(jsonChunk.slice(6)) as Chunk + for (const line of lines.filter( + (line) => line.length > 0 && line !== '\n' + )) { + jsonChunk += line + if (jsonChunk.startsWith('event: ')) { jsonChunk = '' - if (data.event === 'message' || data.event === 'agent_message') { - callbacks.onMessage(data.answer) - } - if (data.event === 'message_end') { - callbacks.onMessageEnd?.({ - totalTokens: data.metadata.usage?.total_tokens, - conversationId: data.conversation_id, - }) - } - }) + continue + } + if (!jsonChunk.startsWith('data: ') || !jsonChunk.endsWith('}')) continue + + const data = JSON.parse(jsonChunk.slice(6)) as Chunk + jsonChunk = '' + if (data.event === 'message' || data.event === 'agent_message') { + callbacks.onMessage(data.answer) + } + if (data.event === 'message_end') { + await callbacks.onMessageEnd?.({ + totalTokens: data.metadata.usage?.total_tokens, + conversationId: data.conversation_id, + }) + } + } } } diff --git a/packages/forge/blocks/openai/actions/askAssistant.tsx b/packages/forge/blocks/openai/actions/askAssistant.tsx index bc3f4f3e8..14693b1bb 100644 --- a/packages/forge/blocks/openai/actions/askAssistant.tsx +++ b/packages/forge/blocks/openai/actions/askAssistant.tsx @@ -1,4 +1,5 @@ import { + AsyncVariableStore, LogsStore, VariableStore, createAction, @@ -237,7 +238,7 @@ const createAssistantStream = async ({ variableId?: string | undefined }[] logs?: LogsStore - variables: VariableStore + variables: AsyncVariableStore | VariableStore }): Promise => { if (isEmpty(assistantId)) { logs?.add('Assistant ID is empty') @@ -277,8 +278,9 @@ const createAssistantStream = async ({ (mapping) => mapping.item === 'Thread ID' ) if (threadIdResponseMapping?.variableId) - variables.set(threadIdResponseMapping.variableId, currentThreadId) - else if (threadVariableId) variables.set(threadVariableId, currentThreadId) + await variables.set(threadIdResponseMapping.variableId, currentThreadId) + else if (threadVariableId) + await variables.set(threadVariableId, currentThreadId) } if (!currentThreadId) { @@ -331,9 +333,9 @@ const createAssistantStream = async ({ args: parameters, }) - newVariables?.forEach((variable) => { - variables.set(variable.id, variable.value) - }) + for (const variable of newVariables ?? []) { + await variables.set(variable.id, variable.value) + } return { tool_call_id: toolCall.id, diff --git a/packages/forge/blocks/openai/shared/runOpenAIChatCompletionStream.ts b/packages/forge/blocks/openai/shared/runOpenAIChatCompletionStream.ts index 9031c583e..067cfe24a 100644 --- a/packages/forge/blocks/openai/shared/runOpenAIChatCompletionStream.ts +++ b/packages/forge/blocks/openai/shared/runOpenAIChatCompletionStream.ts @@ -1,4 +1,4 @@ -import { VariableStore } from '@typebot.io/forge/types' +import { AsyncVariableStore, VariableStore } from '@typebot.io/forge/types' import { ChatCompletionOptions } from './parseChatCompletionOptions' import { APICallError, streamText, ToolCallPart, ToolResultPart } from 'ai' import { createOpenAI } from '@ai-sdk/openai' @@ -12,7 +12,7 @@ import { appendToolResultsToMessages } from '@typebot.io/ai/appendToolResultsToM type Props = { credentials: { apiKey?: string } options: ChatCompletionOptions - variables: VariableStore + variables: AsyncVariableStore config: { baseUrl: string; defaultModel?: string } compatibility?: 'strict' | 'compatible' } @@ -59,6 +59,7 @@ export const runOpenAIChatCompletionStream = async ({ const response = await streamText(streamConfig) let totalToolCalls = 0 + let totalTokens = 0 let toolCalls: ToolCallPart[] = [] let toolResults: ToolResultPart[] = [] @@ -69,6 +70,8 @@ export const runOpenAIChatCompletionStream = async ({ await pumpStreamUntilDone(controller, reader) + totalTokens = (await response.usage).totalTokens + toolCalls = await response.toolCalls if (toolCalls.length > 0) toolResults = (await response.toolResults) as ToolResultPart[] @@ -89,11 +92,19 @@ export const runOpenAIChatCompletionStream = async ({ }) const reader = newResponse.toAIStream().getReader() await pumpStreamUntilDone(controller, reader) + totalTokens += (await newResponse.usage).totalTokens toolCalls = await newResponse.toolCalls if (toolCalls.length > 0) toolResults = (await newResponse.toolResults) as ToolResultPart[] } + const totalTokenVariableId = options.responseMapping?.find( + (mapping) => mapping.item === 'Total tokens' + )?.variableId + + if (totalTokenVariableId) + await variables.set(totalTokenVariableId, totalTokens) + controller.close() }, }), diff --git a/packages/forge/core/types.ts b/packages/forge/core/types.ts index 1a7b88056..8d246be7f 100644 --- a/packages/forge/core/types.ts +++ b/packages/forge/core/types.ts @@ -13,6 +13,10 @@ export type VariableStore = { }[] } +export type AsyncVariableStore = Omit & { + set: (variableId: string, value: unknown) => Promise +} + export type LogsStore = { add: ( log: @@ -63,7 +67,7 @@ export type ActionDefinition< run: (params: { credentials: CredentialsFromAuthDef options: z.infer & z.infer - variables: VariableStore + variables: AsyncVariableStore }) => Promise<{ stream?: ReadableStream httpError?: { status: number; message: string }