2
0

🚸 (openai) Parse stream on client to correctly handle errors

This commit is contained in:
Baptiste Arnaud
2023-06-16 19:26:29 +02:00
parent 83f2a29faa
commit 524f1565d8
11 changed files with 209 additions and 154 deletions

View File

@@ -10,10 +10,9 @@ import { decrypt, isCredentialsV2 } from '@typebot.io/lib/api/encryption'
import { updateVariables } from '@/features/variables/updateVariables'
import { parseVariableNumber } from '@/features/variables/parseVariableNumber'
import { resumeChatCompletion } from './resumeChatCompletion'
import { isPlaneteScale } from '@/helpers/api/isPlanetScale'
import { isVercel } from '@/helpers/api/isVercel'
import { parseChatCompletionMessages } from './parseChatCompletionMessages'
import { executeChatCompletionOpenAIRequest } from './executeChatCompletionOpenAIRequest'
import { isPlaneteScale } from '@/helpers/api/isPlanetScale'
export const createChatCompletionOpenAI = async (
state: SessionState,
@@ -58,7 +57,6 @@ export const createChatCompletionOpenAI = async (
if (
isPlaneteScale() &&
isVercel() &&
isCredentialsV2(credentials) &&
newSessionState.isStreamEnabled
)

View File

@@ -6,11 +6,6 @@ import {
OpenAICredentials,
} from '@typebot.io/schemas/features/blocks/integrations/openai'
import { SessionState } from '@typebot.io/schemas/features/chat'
import {
ParsedEvent,
ReconnectInterval,
createParser,
} from 'eventsource-parser'
import type {
ChatCompletionRequestMessage,
CreateChatCompletionRequest,
@@ -42,11 +37,6 @@ export const getChatCompletionStream =
options.advancedSettings?.temperature
)
const encoder = new TextEncoder()
const decoder = new TextDecoder()
let counter = 0
const res = await fetch('https://api.openai.com/v1/chat/completions', {
headers: {
'Content-Type': 'application/json',
@ -61,43 +51,5 @@ export const getChatCompletionStream =
} satisfies CreateChatCompletionRequest),
})
const stream = new ReadableStream({
async start(controller) {
function onParse(event: ParsedEvent | ReconnectInterval) {
if (event.type === 'event') {
const data = event.data
if (data === '[DONE]') {
controller.close()
return
}
try {
const json = JSON.parse(data) as {
choices: { delta: { content: string } }[]
}
const text = json.choices.at(0)?.delta.content
if (counter < 2 && (text?.match(/\n/) || []).length) {
return
}
const queue = encoder.encode(text)
controller.enqueue(queue)
counter++
} catch (e) {
controller.error(e)
}
}
}
// stream response (SSE) from OpenAI may be fragmented into multiple chunks
// this ensures we properly read chunks & invoke an event for each SSE event stream
const parser = createParser(onParse)
// https://web.dev/streams/#asynchronous-iteration
// eslint-disable-next-line @typescript-eslint/no-explicit-any
for await (const chunk of res.body as any) {
parser.feed(decoder.decode(chunk))
}
},
})
return stream
return res.body
}

View File

@@ -27,6 +27,7 @@ import { prefillVariables } from '@/features/variables/prefillVariables'
import { injectVariablesFromExistingResult } from '@/features/variables/injectVariablesFromExistingResult'
import { deepParseVariables } from '@/features/variables/deepParseVariable'
import { parseVariables } from '@/features/variables/parseVariables'
import { saveLog } from '@/features/logs/saveLog'
export const sendMessage = publicProcedure
.meta({
@@ -42,9 +43,23 @@ export const sendMessage = publicProcedure
.input(sendMessageInputSchema)
.output(chatReplySchema)
.query(
async ({ input: { sessionId, message, startParams }, ctx: { user } }) => {
async ({
input: { sessionId, message, startParams, clientLogs },
ctx: { user },
}) => {
const session = sessionId ? await getSession(sessionId) : null
if (clientLogs) {
for (const log of clientLogs) {
await saveLog({
message: log.description,
status: log.status as 'error' | 'success' | 'info',
resultId: session?.state.result.id,
details: log.details,
})
}
}
if (!session) {
const {
sessionId,

View File

@@ -16,7 +16,7 @@ import {
SetVariableBlock,
WebhookBlock,
} from '@typebot.io/schemas'
import { isInputBlock, isNotDefined, byId, isDefined } from '@typebot.io/lib'
import { isInputBlock, isNotDefined, byId } from '@typebot.io/lib'
import { executeGroup } from './executeGroup'
import { getNextGroup } from './getNextGroup'
import { validateEmail } from '@/features/blocks/inputs/email/validateEmail'
@@ -69,15 +69,16 @@ export const continueBotFlow =
)(JSON.parse(reply))
if (result.newSessionState) newSessionState = result.newSessionState
} else if (
isDefined(reply) &&
block.type === IntegrationBlockType.OPEN_AI &&
block.options.task === 'Create chat completion'
) {
const result = await resumeChatCompletion(state, {
options: block.options,
outgoingEdgeId: block.outgoingEdgeId,
})(reply)
newSessionState = result.newSessionState
if (reply) {
const result = await resumeChatCompletion(state, {
options: block.options,
outgoingEdgeId: block.outgoingEdgeId,
})(reply)
newSessionState = result.newSessionState
}
} else if (!isInputBlock(block))
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',