(openai) Stream chat completion to avoid serverless timeout (#526)

Closes #520
This commit is contained in:
Baptiste Arnaud
2023-05-25 10:32:35 +02:00
committed by GitHub
parent 6bb6a2b0e3
commit 56364fd863
39 changed files with 556 additions and 121 deletions

View File

@@ -88,7 +88,10 @@ const getStripeInfo = async (
where: { id: credentialsId },
})
if (!credentials) return
return decrypt(credentials.data, credentials.iv) as StripeCredentials['data']
return (await decrypt(
credentials.data,
credentials.iv
)) as StripeCredentials['data']
}
// https://stripe.com/docs/currencies#zero-decimal

View File

@@ -5,7 +5,6 @@ import {
ChatReply,
SessionState,
Variable,
VariableWithUnknowValue,
VariableWithValue,
} from '@typebot.io/schemas'
import {
@@ -13,17 +12,25 @@ import {
OpenAICredentials,
modelLimit,
} from '@typebot.io/schemas/features/blocks/integrations/openai'
import { OpenAIApi, Configuration, ChatCompletionRequestMessage } from 'openai'
import { isDefined, byId, isNotEmpty, isEmpty } from '@typebot.io/lib'
import { decrypt } from '@typebot.io/lib/api/encryption'
import type {
ChatCompletionRequestMessage,
CreateChatCompletionRequest,
CreateChatCompletionResponse,
} from 'openai'
import { byId, isNotEmpty, isEmpty } from '@typebot.io/lib'
import { decrypt, isCredentialsV2 } from '@typebot.io/lib/api/encryption'
import { saveErrorLog } from '@/features/logs/saveErrorLog'
import { updateVariables } from '@/features/variables/updateVariables'
import { parseVariables } from '@/features/variables/parseVariables'
import { saveSuccessLog } from '@/features/logs/saveSuccessLog'
import { parseVariableNumber } from '@/features/variables/parseVariableNumber'
import { encoding_for_model } from '@dqbd/tiktoken'
import got from 'got'
import { resumeChatCompletion } from './resumeChatCompletion'
import { isPlaneteScale } from '@/helpers/api/isPlanetScale'
import { isVercel } from '@/helpers/api/isVercel'
const minTokenCompletion = 200
const createChatEndpoint = 'https://api.openai.com/v1/chat/completions'
export const createChatCompletionOpenAI = async (
state: SessionState,
@@ -52,13 +59,10 @@ export const createChatCompletionOpenAI = async (
console.error('Could not find credentials in database')
return { outgoingEdgeId, logs: [noCredentialsError] }
}
const { apiKey } = decrypt(
const { apiKey } = (await decrypt(
credentials.data,
credentials.iv
) as OpenAICredentials['data']
const configuration = new Configuration({
apiKey,
})
)) as OpenAICredentials['data']
const { variablesTransformedToList, messages } = parseMessages(
newSessionState.typebot.variables,
options.model
@@ -71,52 +75,39 @@ export const createChatCompletionOpenAI = async (
)
try {
const openai = new OpenAIApi(configuration)
const response = await openai.createChatCompletion({
model: options.model,
messages,
temperature,
})
const messageContent = response.data.choices.at(0)?.message?.content
const totalTokens = response.data.usage?.total_tokens
if (
isPlaneteScale() &&
isVercel() &&
isCredentialsV2(credentials) &&
newSessionState.isStreamEnabled
)
return {
clientSideActions: [{ streamOpenAiChatCompletion: { messages } }],
outgoingEdgeId,
newSessionState,
}
const response = await got
.post(createChatEndpoint, {
headers: {
Authorization: `Bearer ${apiKey}`,
},
json: {
model: options.model,
messages,
temperature,
} satisfies CreateChatCompletionRequest,
})
.json<CreateChatCompletionResponse>()
const messageContent = response.choices.at(0)?.message?.content
const totalTokens = response.usage?.total_tokens
if (isEmpty(messageContent)) {
console.error('OpenAI block returned empty message', response)
return { outgoingEdgeId, newSessionState }
}
const newVariables = options.responseMapping.reduce<
VariableWithUnknowValue[]
>((newVariables, mapping) => {
const existingVariable = newSessionState.typebot.variables.find(
byId(mapping.variableId)
)
if (!existingVariable) return newVariables
if (mapping.valueToExtract === 'Message content') {
newVariables.push({
...existingVariable,
value: Array.isArray(existingVariable.value)
? existingVariable.value.concat(messageContent)
: messageContent,
})
}
if (mapping.valueToExtract === 'Total tokens' && isDefined(totalTokens)) {
newVariables.push({
...existingVariable,
value: totalTokens,
})
}
return newVariables
}, [])
if (newVariables.length > 0)
newSessionState = await updateVariables(newSessionState)(newVariables)
state.result &&
(await saveSuccessLog({
resultId: state.result.id,
message: 'OpenAI block successfully executed',
}))
return {
return resumeChatCompletion(newSessionState, {
options,
outgoingEdgeId,
newSessionState,
}
})(messageContent, totalTokens)
} catch (err) {
const log: NonNullable<ChatReply['logs']>[number] = {
status: 'error',

View File

@@ -0,0 +1,103 @@
import { parseVariableNumber } from '@/features/variables/parseVariableNumber'
import { Connection } from '@planetscale/database'
import { decrypt } from '@typebot.io/lib/api/encryption'
import {
ChatCompletionOpenAIOptions,
OpenAICredentials,
} from '@typebot.io/schemas/features/blocks/integrations/openai'
import { SessionState } from '@typebot.io/schemas/features/chat'
import {
ParsedEvent,
ReconnectInterval,
createParser,
} from 'eventsource-parser'
import type {
ChatCompletionRequestMessage,
CreateChatCompletionRequest,
} from 'openai'
export const getChatCompletionStream =
(conn: Connection) =>
async (
state: SessionState,
options: ChatCompletionOpenAIOptions,
messages: ChatCompletionRequestMessage[]
) => {
if (!options.credentialsId) return
const credentials = (
await conn.execute('select data, iv from Credentials where id=?', [
options.credentialsId,
])
).rows.at(0) as { data: string; iv: string } | undefined
if (!credentials) {
console.error('Could not find credentials in database')
return
}
const { apiKey } = (await decrypt(
credentials.data,
credentials.iv
)) as OpenAICredentials['data']
const temperature = parseVariableNumber(state.typebot.variables)(
options.advancedSettings?.temperature
)
const encoder = new TextEncoder()
const decoder = new TextDecoder()
let counter = 0
const res = await fetch('https://api.openai.com/v1/chat/completions', {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${apiKey}`,
},
method: 'POST',
body: JSON.stringify({
messages,
model: options.model,
temperature,
stream: true,
} satisfies CreateChatCompletionRequest),
})
const stream = new ReadableStream({
async start(controller) {
function onParse(event: ParsedEvent | ReconnectInterval) {
if (event.type === 'event') {
const data = event.data
if (data === '[DONE]') {
controller.close()
return
}
try {
const json = JSON.parse(data) as {
choices: { delta: { content: string } }[]
}
const text = json.choices.at(0)?.delta.content
if (counter < 2 && (text?.match(/\n/) || []).length) {
return
}
const queue = encoder.encode(text)
controller.enqueue(queue)
counter++
} catch (e) {
controller.error(e)
}
}
}
// stream response (SSE) from OpenAI may be fragmented into multiple chunks
// this ensures we properly read chunks & invoke an event for each SSE event stream
const parser = createParser(onParse)
// https://web.dev/streams/#asynchronous-iteration
// eslint-disable-next-line @typescript-eslint/no-explicit-any
for await (const chunk of res.body as any) {
parser.feed(decoder.decode(chunk))
}
},
})
return stream
}

View File

@@ -0,0 +1,52 @@
import { saveSuccessLog } from '@/features/logs/saveSuccessLog'
import { updateVariables } from '@/features/variables/updateVariables'
import { byId, isDefined } from '@typebot.io/lib'
import { SessionState } from '@typebot.io/schemas'
import { ChatCompletionOpenAIOptions } from '@typebot.io/schemas/features/blocks/integrations/openai'
import { VariableWithUnknowValue } from '@typebot.io/schemas/features/typebot/variable'
export const resumeChatCompletion =
(
state: SessionState,
{
outgoingEdgeId,
options,
}: { outgoingEdgeId?: string; options: ChatCompletionOpenAIOptions }
) =>
async (message: string, totalTokens?: number) => {
let newSessionState = state
const newVariables = options.responseMapping.reduce<
VariableWithUnknowValue[]
>((newVariables, mapping) => {
const existingVariable = newSessionState.typebot.variables.find(
byId(mapping.variableId)
)
if (!existingVariable) return newVariables
if (mapping.valueToExtract === 'Message content') {
newVariables.push({
...existingVariable,
value: Array.isArray(existingVariable.value)
? existingVariable.value.concat(message)
: message,
})
}
if (mapping.valueToExtract === 'Total tokens' && isDefined(totalTokens)) {
newVariables.push({
...existingVariable,
value: totalTokens,
})
}
return newVariables
}, [])
if (newVariables.length > 0)
newSessionState = await updateVariables(newSessionState)(newVariables)
state.result &&
(await saveSuccessLog({
resultId: state.result.id,
message: 'OpenAI block successfully executed',
}))
return {
outgoingEdgeId,
newSessionState,
}
}

View File

@@ -193,7 +193,10 @@ const getEmailInfo = async (
where: { id: credentialsId },
})
if (!credentials) return
return decrypt(credentials.data, credentials.iv) as SmtpCredentials['data']
return (await decrypt(
credentials.data,
credentials.iv
)) as SmtpCredentials['data']
}
const getEmailBody = async ({

View File

@@ -61,6 +61,11 @@ const evaluateSetVariableExpression =
const evaluating = parseVariables(variables, { fieldToParse: 'id' })(
str.includes('return ') ? str : `return ${str}`
)
console.log(
variables.map((v) => v.id),
...variables.map((v) => parseGuessedValueType(v.value)),
evaluating
)
try {
const func = Function(...variables.map((v) => v.id), evaluating)
return func(...variables.map((v) => parseGuessedValueType(v.value)))

View File

@@ -154,6 +154,7 @@ const startSession = async (startParams?: StartParams, userId?: string) => {
},
currentTypebotId: typebot.id,
dynamicTheme: parseDynamicThemeInState(typebot.theme),
isStreamEnabled: startParams.isStreamEnabled,
}
const { messages, input, clientSideActions, newSessionState, logs } =

View File

@@ -9,12 +9,13 @@ import {
ChatReply,
InputBlock,
InputBlockType,
IntegrationBlockType,
LogicBlockType,
ResultInSession,
SessionState,
SetVariableBlock,
} from '@typebot.io/schemas'
import { isInputBlock, isNotDefined, byId } from '@typebot.io/lib'
import { isInputBlock, isNotDefined, byId, isDefined } from '@typebot.io/lib'
import { executeGroup } from './executeGroup'
import { getNextGroup } from './getNextGroup'
import { validateEmail } from '@/features/blocks/inputs/email/validateEmail'
@@ -23,6 +24,8 @@ import { validatePhoneNumber } from '@/features/blocks/inputs/phone/validatePhon
import { validateUrl } from '@/features/blocks/inputs/url/validateUrl'
import { updateVariables } from '@/features/variables/updateVariables'
import { parseVariables } from '@/features/variables/parseVariables'
import { OpenAIBlock } from '@typebot.io/schemas/features/blocks/integrations/openai'
import { resumeChatCompletion } from '@/features/blocks/integrations/openai/resumeChatCompletion'
export const continueBotFlow =
(state: SessionState) =>
@@ -57,6 +60,16 @@ export const continueBotFlow =
}
newSessionState = await updateVariables(state)([newVariable])
}
} else if (
isDefined(reply) &&
block.type === IntegrationBlockType.OPEN_AI &&
block.options.task === 'Create chat completion'
) {
const result = await resumeChatCompletion(state, {
options: block.options,
outgoingEdgeId: block.outgoingEdgeId,
})(reply)
newSessionState = result.newSessionState
} else if (!isInputBlock(block))
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
@@ -236,7 +249,10 @@ const computeStorageUsed = async (reply: string) => {
const getOutgoingEdgeId =
({ typebot: { variables } }: Pick<SessionState, 'typebot'>) =>
(block: InputBlock | SetVariableBlock, reply: string | null) => {
(
block: InputBlock | SetVariableBlock | OpenAIBlock,
reply: string | null
) => {
if (
block.type === InputBlockType.CHOICE &&
!block.options.isMultipleChoice &&

View File

@@ -73,6 +73,10 @@ export const executeGroup =
: null
if (!executionResponse) continue
if (executionResponse.logs)
logs = [...(logs ?? []), ...executionResponse.logs]
if (executionResponse.newSessionState)
newSessionState = executionResponse.newSessionState
if (
'clientSideActions' in executionResponse &&
executionResponse.clientSideActions
@@ -83,7 +87,8 @@ export const executeGroup =
]
if (
executionResponse.clientSideActions?.find(
(action) => 'setVariable' in action
(action) =>
'setVariable' in action || 'streamOpenAiChatCompletion' in action
)
) {
return {
@@ -101,10 +106,6 @@ export const executeGroup =
}
}
if (executionResponse.logs)
logs = [...(logs ?? []), ...executionResponse.logs]
if (executionResponse.newSessionState)
newSessionState = executionResponse.newSessionState
if (executionResponse.outgoingEdgeId) {
nextEdgeId = executionResponse.outgoingEdgeId
break

View File

@@ -0,0 +1,2 @@
export const isPlaneteScale = () =>
process.env.DATABASE_URL?.includes('pscale_pw')

View File

@@ -0,0 +1,3 @@
import { isDefined } from '@typebot.io/lib/utils'
export const isVercel = () => isDefined(process.env.NEXT_PUBLIC_VERCEL_ENV)

View File

@@ -12,10 +12,10 @@ export const getAuthenticatedGoogleClient = async (
where: { id: credentialsId },
})) as CredentialsFromDb | undefined
if (!credentials) return
const data = decrypt(
const data = (await decrypt(
credentials.data,
credentials.iv
) as GoogleSheetsCredentials['data']
)) as GoogleSheetsCredentials['data']
const oauth2Client = new OAuth2Client(
process.env.GOOGLE_CLIENT_ID,
@@ -43,7 +43,7 @@ const updateTokens =
expiry_date: credentials.expiry_date,
access_token: credentials.access_token,
}
const { encryptedData, iv } = encrypt(newCredentials)
const { encryptedData, iv } = await encrypt(newCredentials)
await prisma.credentials.update({
where: { id: credentialsId },
data: { data: encryptedData, iv },

View File

@@ -0,0 +1,78 @@
import { getChatCompletionStream } from '@/features/blocks/integrations/openai/getChatCompletionStream'
import { connect } from '@planetscale/database'
import { IntegrationBlockType, SessionState } from '@typebot.io/schemas'
import { ChatCompletionRequestMessage } from 'openai'
export const config = {
runtime: 'edge',
regions: ['lhr1'],
}
const handler = async (req: Request) => {
if (req.method === 'OPTIONS') {
return new Response('ok', {
headers: {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'POST',
'Access-Control-Expose-Headers': 'Content-Length, X-JSON',
'Access-Control-Allow-Headers':
'apikey,X-Client-Info, Content-Type, Authorization, Accept, Accept-Language, X-Authorization',
},
})
}
const { sessionId, messages } = (await req.json()) as {
sessionId: string
messages: ChatCompletionRequestMessage[]
}
if (!sessionId) return new Response('No session ID provided', { status: 400 })
if (!messages) return new Response('No messages provided', { status: 400 })
const conn = connect({ url: process.env.DATABASE_URL })
const chatSession = await conn.execute(
'select state from ChatSession where id=?',
[sessionId]
)
const state = (chatSession.rows.at(0) as { state: SessionState } | undefined)
?.state
if (!state) return new Response('No state found', { status: 400 })
const group = state.typebot.groups.find(
(group) => group.id === state.currentBlock?.groupId
)
const blockIndex =
group?.blocks.findIndex(
(block) => block.id === state.currentBlock?.blockId
) ?? -1
const block = blockIndex >= 0 ? group?.blocks[blockIndex ?? 0] : null
if (!block || !group)
return new Response('Current block not found', { status: 400 })
if (
block.type !== IntegrationBlockType.OPEN_AI ||
block.options.task !== 'Create chat completion'
)
return new Response('Current block is not an OpenAI block', { status: 400 })
const stream = await getChatCompletionStream(conn)(
state,
block.options,
messages
)
return new Response(stream, {
status: 200,
headers: {
'Content-Type': 'application/json; charset=utf-8',
'Access-Control-Allow-Origin': '*',
},
})
}
export default handler

View File

@@ -112,7 +112,10 @@ const getStripeInfo = async (
where: { id: credentialsId },
})
if (!credentials) return
return decrypt(credentials.data, credentials.iv) as StripeCredentials['data']
return (await decrypt(
credentials.data,
credentials.iv
)) as StripeCredentials['data']
}
// https://stripe.com/docs/currencies#zero-decimal

View File

@@ -170,7 +170,10 @@ const getEmailInfo = async (
where: { id: credentialsId },
})
if (!credentials) return
return decrypt(credentials.data, credentials.iv) as SmtpCredentials['data']
return (await decrypt(
credentials.data,
credentials.iv
)) as SmtpCredentials['data']
}
const getEmailBody = async ({

View File

@@ -5,11 +5,11 @@ import { proWorkspaceId } from '@typebot.io/lib/playwright/databaseSetup'
const prisma = new PrismaClient()
export const createSmtpCredentials = (
export const createSmtpCredentials = async (
id: string,
smtpData: SmtpCredentials['data']
) => {
const { encryptedData, iv } = encrypt(smtpData)
const { encryptedData, iv } = await encrypt(smtpData)
return prisma.credentials.create({
data: {
id,