2
0

🐛 (openai) Fix ask assistant not correctly referencing uploaded files (#1469)

Closes #1468, closes #1467, closes #1211
This commit is contained in:
Baptiste Arnaud
2024-04-24 16:11:06 +02:00
committed by GitHub
parent a45e8ec8a8
commit dc1929e15b
57 changed files with 1576 additions and 448 deletions

View File

@ -1,6 +1,6 @@
import { Anthropic } from '@anthropic-ai/sdk'
import { options as createMessageOptions } from '../actions/createChatMessage'
import { ReadOnlyVariableStore } from '@typebot.io/forge'
import { VariableStore } from '@typebot.io/forge'
import { isNotEmpty } from '@typebot.io/lib'
import { z } from '@typebot.io/forge/zod'
@ -9,7 +9,7 @@ export const parseChatMessages = ({
variables,
}: {
options: Pick<z.infer<typeof createMessageOptions>, 'messages'>
variables: ReadOnlyVariableStore
variables: VariableStore
}): Anthropic.Messages.MessageParam[] => {
const parsedMessages = messages
?.flatMap((message) => {

View File

@ -11,10 +11,10 @@
"@typebot.io/lib": "workspace:*",
"@typebot.io/tsconfig": "workspace:*",
"@types/react": "18.2.15",
"typescript": "5.3.2"
"typescript": "5.4.5"
},
"dependencies": {
"@anthropic-ai/sdk": "0.18.0",
"ai": "3.0.12"
"@anthropic-ai/sdk": "0.20.6",
"ai": "3.0.31"
}
}

View File

@ -9,7 +9,7 @@
"@typebot.io/forge": "workspace:*",
"@typebot.io/tsconfig": "workspace:*",
"@types/react": "18.2.15",
"typescript": "5.3.2",
"typescript": "5.4.5",
"@typebot.io/lib": "workspace:*"
}
}

View File

@ -10,7 +10,7 @@
"@typebot.io/lib": "workspace:*",
"@typebot.io/tsconfig": "workspace:*",
"@types/react": "18.2.15",
"typescript": "5.3.2",
"typescript": "5.4.5",
"ky": "1.2.3"
}
}

View File

@ -11,6 +11,6 @@
"@typebot.io/tsconfig": "workspace:*",
"@types/react": "18.2.15",
"ky": "1.2.3",
"typescript": "5.3.2"
"typescript": "5.4.5"
}
}

View File

@ -10,7 +10,7 @@
"@typebot.io/lib": "workspace:*",
"@typebot.io/tsconfig": "workspace:*",
"@types/react": "18.2.15",
"typescript": "5.3.2"
"typescript": "5.4.5"
},
"dependencies": {
"ky": "1.2.3",

View File

@ -80,6 +80,7 @@ export const createChatCompletion = createAction({
blockId: 'anthropic',
transform: (options) => ({
...options,
model: undefined,
action: 'Create Chat Message',
responseMapping: options.responseMapping?.map((res: any) =>
res.item === 'Message content'

View File

@ -1,5 +1,5 @@
import { options as createChatCompletionOption } from '../actions/createChatCompletion'
import { ReadOnlyVariableStore } from '@typebot.io/forge'
import { VariableStore } from '@typebot.io/forge'
import { isDefined, isNotEmpty } from '@typebot.io/lib'
import { z } from '@typebot.io/forge/zod'
@ -8,7 +8,7 @@ export const parseMessages = ({
variables,
}: {
options: Pick<z.infer<typeof createChatCompletionOption>, 'messages'>
variables: ReadOnlyVariableStore
variables: VariableStore
}) =>
messages
?.flatMap((message) => {

View File

@ -11,9 +11,9 @@
"@typebot.io/tsconfig": "workspace:*",
"@types/node": "^20.12.4",
"@types/react": "18.2.15",
"typescript": "5.3.2"
"typescript": "5.4.5"
},
"dependencies": {
"ai": "3.0.12"
"ai": "3.0.31"
}
}

View File

@ -24,6 +24,7 @@ export const createChatCompletion = createAction({
blockId: 'anthropic',
transform: (options) => ({
...options,
model: undefined,
action: 'Create Chat Message',
responseMapping: options.responseMapping?.map((res: any) =>
res.item === 'Message content'

View File

@ -9,7 +9,7 @@
"@typebot.io/forge": "workspace:*",
"@typebot.io/tsconfig": "workspace:*",
"@types/react": "18.2.15",
"typescript": "5.3.2",
"typescript": "5.4.5",
"@typebot.io/lib": "workspace:*",
"@typebot.io/openai-block": "workspace:*",
"ky": "1.2.3"

View File

@ -1,54 +1,68 @@
import { createAction, option } from '@typebot.io/forge'
import { isDefined, isEmpty } from '@typebot.io/lib'
import {
LogsStore,
VariableStore,
createAction,
option,
} from '@typebot.io/forge'
import { isDefined, isEmpty, isNotEmpty } from '@typebot.io/lib'
import { auth } from '../auth'
import { ClientOptions, OpenAI } from 'openai'
import { baseOptions } from '../baseOptions'
import { executeFunction } from '@typebot.io/variables/executeFunction'
import { readDataStream } from 'ai'
import { deprecatedAskAssistantOptions } from '../deprecated'
import { OpenAIAssistantStream } from '../helpers/OpenAIAssistantStream'
export const askAssistant = createAction({
auth,
baseOptions,
name: 'Ask Assistant',
options: option.object({
assistantId: option.string.layout({
label: 'Assistant ID',
placeholder: 'Select an assistant',
moreInfoTooltip: 'The OpenAI assistant you want to ask question to.',
fetcher: 'fetchAssistants',
}),
threadId: option.string.layout({
label: 'Thread ID',
moreInfoTooltip:
'Used to remember the conversation with the user. If empty, a new thread is created.',
}),
message: option.string.layout({
label: 'Message',
inputType: 'textarea',
}),
functions: option
.array(
option.object({
name: option.string.layout({
fetcher: 'fetchAssistantFunctions',
label: 'Name',
}),
code: option.string.layout({
inputType: 'code',
label: 'Code',
lang: 'javascript',
moreInfoTooltip:
'A javascript code snippet that can use the defined parameters. It should return a value.',
withVariableButton: false,
}),
})
)
.layout({ accordion: 'Functions', itemLabel: 'function' }),
responseMapping: option
.saveResponseArray(['Message', 'Thread ID'] as const)
.layout({
accordion: 'Save response',
options: option
.object({
assistantId: option.string.layout({
label: 'Assistant ID',
placeholder: 'Select an assistant',
moreInfoTooltip: 'The OpenAI assistant you want to ask question to.',
fetcher: 'fetchAssistants',
}),
}),
threadVariableId: option.string.layout({
label: 'Thread ID',
moreInfoTooltip:
'Used to remember the conversation with the user. If empty, a new thread is created.',
inputType: 'variableDropdown',
}),
message: option.string.layout({
label: 'Message',
inputType: 'textarea',
}),
functions: option
.array(
option.object({
name: option.string.layout({
fetcher: 'fetchAssistantFunctions',
label: 'Name',
}),
code: option.string.layout({
inputType: 'code',
label: 'Code',
lang: 'javascript',
moreInfoTooltip:
'A javascript code snippet that can use the defined parameters. It should return a value.',
withVariableButton: false,
}),
})
)
.layout({ accordion: 'Functions', itemLabel: 'function' }),
responseMapping: option
.saveResponseArray(['Message', 'Thread ID'] as const, {
item: { hiddenItems: ['Thread ID'] },
})
.layout({
accordion: 'Save response',
}),
})
.merge(deprecatedAskAssistantOptions),
fetchers: [
{
id: 'fetchAssistants',
@ -121,6 +135,23 @@ export const askAssistant = createAction({
getSetVariableIds: ({ responseMapping }) =>
responseMapping?.map((r) => r.variableId).filter(isDefined) ?? [],
run: {
stream: {
getStreamVariableId: ({ responseMapping }) =>
responseMapping?.find((m) => !m.item || m.item === 'Message')
?.variableId,
run: async ({ credentials, options, variables }) =>
createAssistantStream({
apiKey: credentials.apiKey,
assistantId: options.assistantId,
message: options.message,
baseUrl: options.baseUrl,
apiVersion: options.apiVersion,
threadVariableId: options.threadVariableId,
variables,
functions: options.functions,
responseMapping: options.responseMapping,
}),
},
server: async ({
credentials: { apiKey },
options: {
@ -130,143 +161,188 @@ export const askAssistant = createAction({
message,
responseMapping,
threadId,
threadVariableId,
functions,
},
variables,
logs,
}) => {
if (isEmpty(assistantId)) {
logs.add('Assistant ID is empty')
return
}
if (isEmpty(message)) {
logs.add('Message is empty')
return
}
const config = {
const stream = await createAssistantStream({
apiKey,
baseURL: baseUrl,
defaultHeaders: {
'api-key': apiKey,
},
defaultQuery: apiVersion
? {
'api-version': apiVersion,
}
: undefined,
} satisfies ClientOptions
const openai = new OpenAI(config)
// Create a thread if needed
const currentThreadId = isEmpty(threadId)
? (await openai.beta.threads.create({})).id
: threadId
// Add a message to the thread
const createdMessage = await openai.beta.threads.messages.create(
currentThreadId,
{
role: 'user',
content: message,
}
)
const run = await openai.beta.threads.runs.create(currentThreadId, {
assistant_id: assistantId,
assistantId,
logs,
message,
baseUrl,
apiVersion,
threadVariableId,
variables,
threadId,
functions,
})
async function waitForRun(run: OpenAI.Beta.Threads.Runs.Run) {
// Poll for status change
while (run.status === 'queued' || run.status === 'in_progress') {
await new Promise((resolve) => setTimeout(resolve, 500))
if (!stream) return
run = await openai.beta.threads.runs.retrieve(currentThreadId, run.id)
}
let writingMessage = ''
// Check the run status
if (
run.status === 'cancelled' ||
run.status === 'cancelling' ||
run.status === 'failed' ||
run.status === 'expired'
) {
throw new Error(run.status)
}
if (run.status === 'requires_action') {
if (run.required_action?.type === 'submit_tool_outputs') {
const tool_outputs = (
await Promise.all(
run.required_action.submit_tool_outputs.tool_calls.map(
async (toolCall) => {
const parameters = JSON.parse(toolCall.function.arguments)
const functionToExecute = functions?.find(
(f) => f.name === toolCall.function.name
)
if (!functionToExecute) return
const name = toolCall.function.name
if (!name || !functionToExecute.code) return
const { output, newVariables } = await executeFunction({
variables: variables.list(),
body: functionToExecute.code,
args: parameters,
})
newVariables?.forEach((variable) => {
variables.set(variable.id, variable.value)
})
return {
tool_call_id: toolCall.id,
output,
}
}
)
)
).filter(isDefined)
run = await openai.beta.threads.runs.submitToolOutputs(
currentThreadId,
run.id,
{ tool_outputs }
)
await waitForRun(run)
}
for await (const { type, value } of readDataStream(stream.getReader())) {
if (type === 'text') {
writingMessage += value
}
}
await waitForRun(run)
const responseMessages = (
await openai.beta.threads.messages.list(currentThreadId, {
after: createdMessage.id,
order: 'asc',
})
).data
responseMapping?.forEach((mapping) => {
if (!mapping.variableId) return
if (!mapping.item || mapping.item === 'Message') {
let message = ''
const messageContents = responseMessages[0].content
for (const content of messageContents) {
switch (content.type) {
case 'text':
message +=
(message !== '' ? '\n\n' : '') +
content.text.value.replace(/【.+】/g, '')
break
}
}
variables.set(mapping.variableId, message)
variables.set(
mapping.variableId,
writingMessage.replace(/【.+】/g, '')
)
}
if (mapping.item === 'Thread ID')
variables.set(mapping.variableId, currentThreadId)
})
},
},
})
const createAssistantStream = async ({
apiKey,
assistantId,
logs,
message,
baseUrl,
apiVersion,
threadVariableId,
variables,
threadId,
functions,
responseMapping,
}: {
apiKey?: string
assistantId?: string
message?: string
baseUrl?: string
apiVersion?: string
threadVariableId?: string
threadId?: string
functions?: { name?: string; code?: string }[]
responseMapping?: {
item?: 'Thread ID' | 'Message' | undefined
variableId?: string | undefined
}[]
logs?: LogsStore
variables: VariableStore
}): Promise<ReadableStream | undefined> => {
if (isEmpty(assistantId)) {
logs?.add('Assistant ID is empty')
return
}
if (isEmpty(message)) {
logs?.add('Message is empty')
return
}
const config = {
apiKey,
baseURL: baseUrl,
defaultHeaders: {
'api-key': apiKey,
},
defaultQuery: apiVersion
? {
'api-version': apiVersion,
}
: undefined,
} satisfies ClientOptions
const openai = new OpenAI(config)
let currentThreadId: string | undefined
if (
threadVariableId &&
isNotEmpty(variables.get(threadVariableId)?.toString())
) {
currentThreadId = variables.get(threadVariableId)?.toString()
} else if (isNotEmpty(threadId)) {
currentThreadId = threadId
} else {
currentThreadId = (await openai.beta.threads.create({})).id
const threadIdResponseMapping = responseMapping?.find(
(mapping) => mapping.item === 'Thread ID'
)
if (threadIdResponseMapping?.variableId)
variables.set(threadIdResponseMapping.variableId, currentThreadId)
if (threadVariableId) variables.set(threadVariableId, currentThreadId)
}
if (!currentThreadId) {
logs?.add('Could not get thread ID')
return
}
// Add a message to the thread
const createdMessage = await openai.beta.threads.messages.create(
currentThreadId,
{
role: 'user',
content: message,
}
)
return OpenAIAssistantStream(
{ threadId: currentThreadId, messageId: createdMessage.id },
async ({ forwardStream }) => {
const runStream = openai.beta.threads.runs.createAndStream(
currentThreadId,
{
assistant_id: assistantId,
}
)
let runResult = await forwardStream(runStream)
while (
runResult?.status === 'requires_action' &&
runResult.required_action?.type === 'submit_tool_outputs'
) {
const tool_outputs = (
await Promise.all(
runResult.required_action.submit_tool_outputs.tool_calls.map(
async (toolCall) => {
const parameters = JSON.parse(toolCall.function.arguments)
const functionToExecute = functions?.find(
(f) => f.name === toolCall.function.name
)
if (!functionToExecute) return
const name = toolCall.function.name
if (!name || !functionToExecute.code) return
const { output, newVariables } = await executeFunction({
variables: variables.list(),
body: functionToExecute.code,
args: parameters,
})
newVariables?.forEach((variable) => {
variables.set(variable.id, variable.value)
})
return {
tool_call_id: toolCall.id,
output,
}
}
)
)
).filter(isDefined)
runResult = await forwardStream(
openai.beta.threads.runs.submitToolOutputsStream(
currentThreadId,
runResult.id,
{ tool_outputs }
)
)
}
}
)
}

View File

@ -31,6 +31,7 @@ export const createChatCompletion = createAction({
blockId: 'anthropic',
transform: (options) => ({
...options,
model: undefined,
action: 'Create Chat Message',
responseMapping: options.responseMapping?.map((res: any) =>
res.item === 'Message content'

View File

@ -0,0 +1,10 @@
import { option } from '@typebot.io/forge'
export const deprecatedAskAssistantOptions = option.object({
threadId: option.string.layout({
label: 'Thread ID',
moreInfoTooltip:
'Used to remember the conversation with the user. If empty, a new thread is created.',
isHidden: true,
}),
})

View File

@ -0,0 +1,145 @@
import { AssistantMessage, DataMessage, formatStreamPart } from 'ai'
import { AssistantStream } from 'openai/lib/AssistantStream'
import { Run } from 'openai/resources/beta/threads/runs/runs'
/**
You can pass the thread and the latest message into the `AssistantResponse`. This establishes the context for the response.
*/
type AssistantResponseSettings = {
/**
The thread ID that the response is associated with.
*/
threadId: string
/**
The ID of the latest message that the response is associated with.
*/
messageId: string
}
/**
The process parameter is a callback in which you can run the assistant on threads, and send messages and data messages to the client.
*/
type AssistantResponseCallback = (options: {
/**
@deprecated use variable from outer scope instead.
*/
threadId: string
/**
@deprecated use variable from outer scope instead.
*/
messageId: string
/**
Forwards an assistant message (non-streaming) to the client.
*/
sendMessage: (message: AssistantMessage) => void
/**
Send a data message to the client. You can use this to provide information for rendering custom UIs while the assistant is processing the thread.
*/
sendDataMessage: (message: DataMessage) => void
/**
Forwards the assistant response stream to the client. Returns the `Run` object after it completes, or when it requires an action.
*/
forwardStream: (stream: AssistantStream) => Promise<Run | undefined>
}) => Promise<void>
export const OpenAIAssistantStream = (
{ threadId, messageId }: AssistantResponseSettings,
process: AssistantResponseCallback
) =>
new ReadableStream({
async start(controller) {
const textEncoder = new TextEncoder()
const sendMessage = (message: AssistantMessage) => {
controller.enqueue(
textEncoder.encode(formatStreamPart('assistant_message', message))
)
}
const sendDataMessage = (message: DataMessage) => {
controller.enqueue(
textEncoder.encode(formatStreamPart('data_message', message))
)
}
const sendError = (errorMessage: string) => {
controller.enqueue(
textEncoder.encode(formatStreamPart('error', errorMessage))
)
}
const forwardStream = async (stream: AssistantStream) => {
let result: Run | undefined = undefined
for await (const value of stream) {
switch (value.event) {
case 'thread.message.created': {
controller.enqueue(
textEncoder.encode(
formatStreamPart('assistant_message', {
id: value.data.id,
role: 'assistant',
content: [{ type: 'text', text: { value: '' } }],
})
)
)
break
}
case 'thread.message.delta': {
const content = value.data.delta.content?.[0]
if (content?.type === 'text' && content.text?.value != null) {
controller.enqueue(
textEncoder.encode(
formatStreamPart('text', content.text.value)
)
)
}
break
}
case 'thread.run.completed':
case 'thread.run.requires_action': {
result = value.data
break
}
}
}
return result
}
// send the threadId and messageId as the first message:
controller.enqueue(
textEncoder.encode(
formatStreamPart('assistant_control_data', {
threadId,
messageId,
})
)
)
try {
await process({
threadId,
messageId,
sendMessage,
sendDataMessage,
forwardStream,
})
} catch (error) {
sendError((error as any).message ?? `${error}`)
} finally {
controller.close()
}
},
pull(controller) {},
cancel() {},
})

View File

@ -1,5 +1,5 @@
import type { OpenAI } from 'openai'
import { ReadOnlyVariableStore } from '@typebot.io/forge'
import { VariableStore } from '@typebot.io/forge'
import { isNotEmpty } from '@typebot.io/lib'
import { ChatCompletionOptions } from '../shared/parseChatCompletionOptions'
@ -8,7 +8,7 @@ export const parseChatCompletionMessages = ({
variables,
}: {
options: ChatCompletionOptions
variables: ReadOnlyVariableStore
variables: VariableStore
}): OpenAI.Chat.ChatCompletionMessageParam[] => {
const parsedMessages = messages
?.flatMap((message) => {

View File

@ -7,14 +7,14 @@
"author": "Baptiste Arnaud",
"license": "ISC",
"dependencies": {
"ai": "3.0.12",
"openai": "4.28.4"
"ai": "3.0.31",
"openai": "4.38.3"
},
"devDependencies": {
"@typebot.io/forge": "workspace:*",
"@typebot.io/tsconfig": "workspace:*",
"@types/react": "18.2.15",
"typescript": "5.3.2",
"typescript": "5.4.5",
"@typebot.io/lib": "workspace:*",
"@typebot.io/variables": "workspace:*"
}

View File

@ -1,4 +1,4 @@
import { LogsStore, ReadOnlyVariableStore } from '@typebot.io/forge/types'
import { VariableStore } from '@typebot.io/forge/types'
import { ChatCompletionOptions } from './parseChatCompletionOptions'
import { executeFunction } from '@typebot.io/variables/executeFunction'
import { OpenAIStream, ToolCallPayload } from 'ai'
@ -10,7 +10,7 @@ import { parseToolParameters } from '../helpers/parseToolParameters'
type Props = {
credentials: { apiKey?: string }
options: ChatCompletionOptions
variables: ReadOnlyVariableStore
variables: VariableStore
config: { baseUrl: string; defaultModel?: string }
}
export const runChatCompletionStream = async ({

View File

@ -11,9 +11,9 @@
"@typebot.io/tsconfig": "workspace:*",
"@types/react": "18.2.15",
"@types/qrcode": "^1.5.3",
"typescript": "5.3.2"
"typescript": "5.4.5"
},
"dependencies": {
"qrcode": "^1.5.3"
}
}
}

View File

@ -26,6 +26,7 @@ export const createChatCompletion = createAction({
blockId: 'anthropic',
transform: (options) => ({
...options,
model: undefined,
action: 'Create Chat Message',
responseMapping: options.responseMapping?.map((res: any) =>
res.item === 'Message content'

View File

@ -12,6 +12,6 @@
"@typebot.io/variables": "workspace:*",
"@typebot.io/openai-block": "workspace:*",
"@types/react": "18.2.15",
"typescript": "5.3.2"
"typescript": "5.4.5"
}
}

View File

@ -9,7 +9,7 @@
"@typebot.io/forge": "workspace:*",
"@typebot.io/tsconfig": "workspace:*",
"@types/react": "18.2.15",
"typescript": "5.3.2",
"typescript": "5.4.5",
"@typebot.io/lib": "workspace:*",
"ky": "1.2.3"
}

View File

@ -30,8 +30,6 @@ export type FunctionToExecute = {
content: string
}
export type ReadOnlyVariableStore = Omit<VariableStore, 'set'>
export type TurnableIntoParam<T = {}> = {
blockId: string
/**
@ -65,7 +63,7 @@ export type ActionDefinition<
run: (params: {
credentials: CredentialsFromAuthDef<A>
options: z.infer<BaseOptions> & z.infer<Options>
variables: ReadOnlyVariableStore
variables: VariableStore
}) => Promise<ReadableStream<any> | undefined>
}
web?: {