2024-06-26 10:13:38 +02:00
|
|
|
import { Block, SessionState } from '@typebot.io/schemas'
|
2023-08-29 10:01:28 +02:00
|
|
|
import {
|
|
|
|
WhatsAppCredentials,
|
|
|
|
WhatsAppIncomingMessage,
|
|
|
|
} from '@typebot.io/schemas/features/whatsapp'
|
2023-09-20 15:26:52 +02:00
|
|
|
import { env } from '@typebot.io/env'
|
|
|
|
import { sendChatReplyToWhatsApp } from './sendChatReplyToWhatsApp'
|
2023-08-29 10:01:28 +02:00
|
|
|
import { startWhatsAppSession } from './startWhatsAppSession'
|
2023-09-20 15:26:52 +02:00
|
|
|
import { getSession } from '../queries/getSession'
|
|
|
|
import { continueBotFlow } from '../continueBotFlow'
|
2023-10-06 16:34:10 +02:00
|
|
|
import { decrypt } from '@typebot.io/lib/api/encryption/decrypt'
|
2023-09-20 15:26:52 +02:00
|
|
|
import { saveStateToDatabase } from '../saveStateToDatabase'
|
|
|
|
import prisma from '@typebot.io/lib/prisma'
|
2023-09-22 17:12:15 +02:00
|
|
|
import { isDefined } from '@typebot.io/lib/utils'
|
2024-01-30 08:02:10 +01:00
|
|
|
import { Reply } from '../types'
|
2024-05-15 17:47:38 +02:00
|
|
|
import { setIsReplyingInChatSession } from '../queries/setIsReplyingInChatSession'
|
|
|
|
import { removeIsReplyingInChatSession } from '../queries/removeIsReplyingInChatSession'
|
2024-06-26 10:13:38 +02:00
|
|
|
import redis from '@typebot.io/lib/redis'
|
|
|
|
import { downloadMedia } from './downloadMedia'
|
|
|
|
import { InputBlockType } from '@typebot.io/schemas/features/blocks/inputs/constants'
|
|
|
|
import { uploadFileToBucket } from '@typebot.io/lib/s3/uploadFileToBucket'
|
|
|
|
import { getBlockById } from '@typebot.io/schemas/helpers'
|
|
|
|
|
|
|
|
const incomingMessageDebounce = 3000
|
2023-09-27 16:45:14 +02:00
|
|
|
|
|
|
|
type Props = {
|
|
|
|
receivedMessage: WhatsAppIncomingMessage
|
|
|
|
sessionId: string
|
|
|
|
credentialsId?: string
|
2024-02-26 13:54:41 +01:00
|
|
|
phoneNumberId?: string
|
2023-09-27 16:45:14 +02:00
|
|
|
workspaceId?: string
|
|
|
|
contact: NonNullable<SessionState['whatsApp']>['contact']
|
|
|
|
}
|
2023-08-29 10:01:28 +02:00
|
|
|
|
|
|
|
export const resumeWhatsAppFlow = async ({
|
|
|
|
receivedMessage,
|
|
|
|
sessionId,
|
|
|
|
workspaceId,
|
2023-09-22 11:08:41 +02:00
|
|
|
credentialsId,
|
2024-02-26 13:54:41 +01:00
|
|
|
phoneNumberId,
|
2023-08-29 10:01:28 +02:00
|
|
|
contact,
|
2023-09-27 16:45:14 +02:00
|
|
|
}: Props): Promise<{ message: string }> => {
|
2023-08-29 10:01:28 +02:00
|
|
|
const messageSendDate = new Date(Number(receivedMessage.timestamp) * 1000)
|
|
|
|
const messageSentBefore3MinutesAgo =
|
|
|
|
messageSendDate.getTime() < Date.now() - 180000
|
|
|
|
if (messageSentBefore3MinutesAgo) {
|
|
|
|
console.log('Message is too old', messageSendDate.getTime())
|
|
|
|
return {
|
|
|
|
message: 'Message received',
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-22 11:08:41 +02:00
|
|
|
const isPreview = workspaceId === undefined || credentialsId === undefined
|
2023-08-29 10:01:28 +02:00
|
|
|
|
2023-09-22 11:08:41 +02:00
|
|
|
const credentials = await getCredentials({ credentialsId, isPreview })
|
|
|
|
|
|
|
|
if (!credentials) {
|
|
|
|
console.error('Could not find credentials')
|
|
|
|
return {
|
|
|
|
message: 'Message received',
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-02-28 15:49:18 +01:00
|
|
|
if (credentials.phoneNumberId !== phoneNumberId && !isPreview) {
|
2024-02-26 13:54:41 +01:00
|
|
|
console.error('Credentials point to another phone ID, skipping...')
|
|
|
|
return {
|
|
|
|
message: 'Message received',
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-03-07 14:26:02 +01:00
|
|
|
const session = await getSession(sessionId)
|
|
|
|
|
2024-06-26 10:13:38 +02:00
|
|
|
const { incomingMessages, isReplyingWasSet } =
|
|
|
|
await aggregateParallelMediaMessagesIfRedisEnabled({
|
|
|
|
receivedMessage,
|
|
|
|
existingSessionId: session?.id,
|
|
|
|
newSessionId: sessionId,
|
|
|
|
})
|
|
|
|
|
|
|
|
if (incomingMessages.length === 0) {
|
|
|
|
if (isReplyingWasSet) await removeIsReplyingInChatSession(sessionId)
|
|
|
|
|
|
|
|
return {
|
|
|
|
message: 'Message received',
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-22 17:12:15 +02:00
|
|
|
const isSessionExpired =
|
|
|
|
session &&
|
|
|
|
isDefined(session.state.expiryTimeout) &&
|
|
|
|
session?.updatedAt.getTime() + session.state.expiryTimeout < Date.now()
|
|
|
|
|
2024-06-26 10:13:38 +02:00
|
|
|
if (!isReplyingWasSet) {
|
|
|
|
if (session?.isReplying) {
|
|
|
|
if (!isSessionExpired) {
|
|
|
|
console.log('Is currently replying, skipping...')
|
|
|
|
return {
|
|
|
|
message: 'Message received',
|
|
|
|
}
|
2024-05-21 14:28:46 +02:00
|
|
|
}
|
2024-06-26 10:13:38 +02:00
|
|
|
} else {
|
|
|
|
await setIsReplyingInChatSession({
|
|
|
|
existingSessionId: session?.id,
|
|
|
|
newSessionId: sessionId,
|
|
|
|
})
|
2024-05-21 14:28:46 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-06-26 10:13:38 +02:00
|
|
|
const currentTypebot = session?.state.typebotsQueue[0].typebot
|
|
|
|
const { block } =
|
|
|
|
(currentTypebot && session?.state.currentBlockId
|
|
|
|
? getBlockById(session.state.currentBlockId, currentTypebot.groups)
|
|
|
|
: undefined) ?? {}
|
|
|
|
|
|
|
|
const reply = await getIncomingMessageContent({
|
|
|
|
messages: incomingMessages,
|
|
|
|
workspaceId,
|
|
|
|
accessToken: credentials?.systemUserAccessToken,
|
|
|
|
typebotId: currentTypebot?.id,
|
|
|
|
resultId: session?.state.typebotsQueue[0].resultId,
|
|
|
|
block,
|
|
|
|
})
|
|
|
|
|
2023-09-22 17:12:15 +02:00
|
|
|
const resumeResponse =
|
2023-09-26 09:50:20 +02:00
|
|
|
session && !isSessionExpired
|
2024-01-30 08:02:10 +01:00
|
|
|
? await continueBotFlow(reply, {
|
2023-10-06 10:14:26 +02:00
|
|
|
version: 2,
|
|
|
|
state: { ...session.state, whatsApp: { contact } },
|
2024-05-23 16:30:56 +02:00
|
|
|
textBubbleContentFormat: 'richText',
|
2023-10-06 10:14:26 +02:00
|
|
|
})
|
2023-09-22 17:12:15 +02:00
|
|
|
: workspaceId
|
|
|
|
? await startWhatsAppSession({
|
2024-01-30 08:02:10 +01:00
|
|
|
incomingMessage: reply,
|
2023-09-22 17:12:15 +02:00
|
|
|
workspaceId,
|
|
|
|
credentials: { ...credentials, id: credentialsId as string },
|
|
|
|
contact,
|
|
|
|
})
|
2023-10-25 14:26:10 +02:00
|
|
|
: { error: 'workspaceId not found' }
|
2023-08-29 10:01:28 +02:00
|
|
|
|
2023-10-25 14:26:10 +02:00
|
|
|
if ('error' in resumeResponse) {
|
2024-05-15 17:47:38 +02:00
|
|
|
await removeIsReplyingInChatSession(sessionId)
|
2023-10-25 14:26:10 +02:00
|
|
|
console.log('Chat not starting:', resumeResponse.error)
|
2023-08-29 10:01:28 +02:00
|
|
|
return {
|
|
|
|
message: 'Message received',
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-11-08 15:34:16 +01:00
|
|
|
const {
|
|
|
|
input,
|
|
|
|
logs,
|
|
|
|
newSessionState,
|
|
|
|
messages,
|
|
|
|
clientSideActions,
|
|
|
|
visitedEdges,
|
2024-05-15 14:24:55 +02:00
|
|
|
setVariableHistory,
|
2023-11-08 15:34:16 +01:00
|
|
|
} = resumeResponse
|
2023-08-29 10:01:28 +02:00
|
|
|
|
2024-01-24 12:03:41 +01:00
|
|
|
const isFirstChatChunk = (!session || isSessionExpired) ?? false
|
2023-08-29 10:01:28 +02:00
|
|
|
await sendChatReplyToWhatsApp({
|
|
|
|
to: receivedMessage.from,
|
|
|
|
messages,
|
|
|
|
input,
|
2024-01-24 12:03:41 +01:00
|
|
|
isFirstChatChunk,
|
2023-08-29 10:01:28 +02:00
|
|
|
typingEmulation: newSessionState.typingEmulation,
|
|
|
|
clientSideActions,
|
|
|
|
credentials,
|
2023-09-27 16:45:14 +02:00
|
|
|
state: newSessionState,
|
2023-08-29 10:01:28 +02:00
|
|
|
})
|
|
|
|
|
|
|
|
await saveStateToDatabase({
|
|
|
|
clientSideActions: [],
|
|
|
|
input,
|
|
|
|
logs,
|
|
|
|
session: {
|
|
|
|
id: sessionId,
|
|
|
|
state: {
|
|
|
|
...newSessionState,
|
2023-11-08 15:34:16 +01:00
|
|
|
currentBlockId: !input ? undefined : newSessionState.currentBlockId,
|
2023-08-29 10:01:28 +02:00
|
|
|
},
|
|
|
|
},
|
2023-11-08 15:34:16 +01:00
|
|
|
visitedEdges,
|
2024-05-15 14:24:55 +02:00
|
|
|
setVariableHistory,
|
2023-08-29 10:01:28 +02:00
|
|
|
})
|
|
|
|
|
|
|
|
return {
|
|
|
|
message: 'Message received',
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
const getIncomingMessageContent = async ({
|
2024-06-26 10:13:38 +02:00
|
|
|
messages,
|
2024-01-30 08:02:10 +01:00
|
|
|
workspaceId,
|
|
|
|
accessToken,
|
2024-06-26 10:13:38 +02:00
|
|
|
typebotId,
|
|
|
|
resultId,
|
|
|
|
block,
|
2023-08-29 10:01:28 +02:00
|
|
|
}: {
|
2024-06-26 10:13:38 +02:00
|
|
|
messages: WhatsAppIncomingMessage[]
|
2024-01-30 08:02:10 +01:00
|
|
|
workspaceId?: string
|
|
|
|
accessToken: string
|
2024-06-26 10:13:38 +02:00
|
|
|
typebotId?: string
|
|
|
|
resultId?: string
|
|
|
|
block?: Block
|
2024-01-30 08:02:10 +01:00
|
|
|
}): Promise<Reply> => {
|
2024-06-26 10:13:38 +02:00
|
|
|
let text: string = ''
|
|
|
|
const attachedFileUrls: string[] = []
|
|
|
|
for (const message of messages) {
|
|
|
|
switch (message.type) {
|
|
|
|
case 'text': {
|
|
|
|
if (text !== '') text += `\n\n${message.text.body}`
|
|
|
|
else text = message.text.body
|
|
|
|
break
|
|
|
|
}
|
|
|
|
case 'button': {
|
|
|
|
if (text !== '') text += `\n\n${message.button.text}`
|
|
|
|
else text = message.button.text
|
|
|
|
break
|
|
|
|
}
|
|
|
|
case 'interactive': {
|
|
|
|
if (text !== '') text += `\n\n${message.interactive.button_reply.id}`
|
|
|
|
else text = message.interactive.button_reply.id
|
|
|
|
break
|
|
|
|
}
|
|
|
|
case 'document':
|
|
|
|
case 'audio':
|
|
|
|
case 'video':
|
|
|
|
case 'image': {
|
|
|
|
let mediaId: string | undefined
|
|
|
|
if (message.type === 'video') mediaId = message.video.id
|
|
|
|
if (message.type === 'image') mediaId = message.image.id
|
|
|
|
if (message.type === 'audio') mediaId = message.audio.id
|
|
|
|
if (message.type === 'document') mediaId = message.document.id
|
|
|
|
if (!mediaId) return
|
|
|
|
const fileVisibility =
|
2024-08-20 14:35:20 +02:00
|
|
|
block?.type === InputBlockType.TEXT &&
|
|
|
|
block.options?.audioClip?.isEnabled &&
|
|
|
|
message.type === 'audio'
|
|
|
|
? block.options?.audioClip.visibility
|
|
|
|
: block?.type === InputBlockType.FILE
|
2024-06-26 10:13:38 +02:00
|
|
|
? block.options?.visibility
|
|
|
|
: block?.type === InputBlockType.TEXT
|
|
|
|
? block.options?.attachments?.visibility
|
|
|
|
: undefined
|
|
|
|
let fileUrl
|
|
|
|
if (fileVisibility !== 'Public') {
|
|
|
|
fileUrl =
|
|
|
|
env.NEXTAUTH_URL +
|
|
|
|
`/api/typebots/${typebotId}/whatsapp/media/${
|
|
|
|
workspaceId ? `` : 'preview/'
|
|
|
|
}${mediaId}`
|
|
|
|
} else {
|
|
|
|
const { file, mimeType } = await downloadMedia({
|
|
|
|
mediaId,
|
|
|
|
systemUserAccessToken: accessToken,
|
|
|
|
})
|
|
|
|
const url = await uploadFileToBucket({
|
|
|
|
file,
|
|
|
|
key:
|
|
|
|
resultId && workspaceId && typebotId
|
|
|
|
? `public/workspaces/${workspaceId}/typebots/${typebotId}/results/${resultId}/${mediaId}`
|
|
|
|
: `tmp/whatsapp/media/${mediaId}`,
|
|
|
|
mimeType,
|
|
|
|
})
|
|
|
|
fileUrl = url
|
|
|
|
}
|
2024-08-20 14:35:20 +02:00
|
|
|
if (message.type === 'audio')
|
|
|
|
return {
|
|
|
|
type: 'audio',
|
|
|
|
url: fileUrl,
|
|
|
|
}
|
2024-06-26 10:13:38 +02:00
|
|
|
if (block?.type === InputBlockType.FILE) {
|
|
|
|
if (text !== '') text += `, ${fileUrl}`
|
|
|
|
else text = fileUrl
|
|
|
|
} else if (block?.type === InputBlockType.TEXT) {
|
|
|
|
let caption: string | undefined
|
|
|
|
if (message.type === 'document' && message.document.caption) {
|
|
|
|
if (!/^[\w,\s-]+\.[A-Za-z]{3}$/.test(message.document.caption))
|
|
|
|
caption = message.document.caption
|
|
|
|
} else if (message.type === 'image' && message.image.caption)
|
|
|
|
caption = message.image.caption
|
|
|
|
else if (message.type === 'video' && message.video.caption)
|
|
|
|
caption = message.video.caption
|
|
|
|
if (caption) text = text === '' ? caption : `${text}\n\n${caption}`
|
|
|
|
attachedFileUrls.push(fileUrl)
|
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
case 'location': {
|
|
|
|
const location = `${message.location.latitude}, ${message.location.longitude}`
|
|
|
|
if (text !== '') text += `\n\n${location}`
|
|
|
|
else text = location
|
|
|
|
break
|
|
|
|
}
|
2023-08-29 10:01:28 +02:00
|
|
|
}
|
2024-06-26 10:13:38 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
return {
|
|
|
|
type: 'text',
|
|
|
|
text,
|
|
|
|
attachedFileUrls,
|
2023-08-29 10:01:28 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-22 11:08:41 +02:00
|
|
|
const getCredentials = async ({
|
|
|
|
credentialsId,
|
|
|
|
isPreview,
|
|
|
|
}: {
|
|
|
|
credentialsId?: string
|
|
|
|
isPreview: boolean
|
|
|
|
}): Promise<WhatsAppCredentials['data'] | undefined> => {
|
|
|
|
if (isPreview) {
|
|
|
|
if (
|
|
|
|
!env.META_SYSTEM_USER_TOKEN ||
|
|
|
|
!env.WHATSAPP_PREVIEW_FROM_PHONE_NUMBER_ID
|
|
|
|
)
|
|
|
|
return
|
2023-08-29 10:01:28 +02:00
|
|
|
return {
|
2023-09-22 11:08:41 +02:00
|
|
|
systemUserAccessToken: env.META_SYSTEM_USER_TOKEN,
|
|
|
|
phoneNumberId: env.WHATSAPP_PREVIEW_FROM_PHONE_NUMBER_ID,
|
2023-08-29 10:01:28 +02:00
|
|
|
}
|
|
|
|
}
|
2023-09-22 11:08:41 +02:00
|
|
|
|
|
|
|
if (!credentialsId) return
|
|
|
|
|
|
|
|
const credentials = await prisma.credentials.findUnique({
|
|
|
|
where: {
|
|
|
|
id: credentialsId,
|
|
|
|
},
|
|
|
|
select: {
|
|
|
|
data: true,
|
|
|
|
iv: true,
|
|
|
|
},
|
|
|
|
})
|
|
|
|
if (!credentials) return
|
|
|
|
const data = (await decrypt(
|
|
|
|
credentials.data,
|
|
|
|
credentials.iv
|
|
|
|
)) as WhatsAppCredentials['data']
|
|
|
|
return {
|
|
|
|
systemUserAccessToken: data.systemUserAccessToken,
|
|
|
|
phoneNumberId: data.phoneNumberId,
|
|
|
|
}
|
|
|
|
}
|
2024-06-26 10:13:38 +02:00
|
|
|
|
|
|
|
const aggregateParallelMediaMessagesIfRedisEnabled = async ({
|
|
|
|
receivedMessage,
|
|
|
|
existingSessionId,
|
|
|
|
newSessionId,
|
|
|
|
}: {
|
|
|
|
receivedMessage: WhatsAppIncomingMessage
|
|
|
|
existingSessionId?: string
|
|
|
|
newSessionId: string
|
|
|
|
}): Promise<{
|
|
|
|
isReplyingWasSet: boolean
|
|
|
|
incomingMessages: WhatsAppIncomingMessage[]
|
|
|
|
}> => {
|
|
|
|
if (redis && ['document', 'video', 'image'].includes(receivedMessage.type)) {
|
|
|
|
const redisKey = `wasession:${newSessionId}`
|
|
|
|
try {
|
|
|
|
const len = await redis.rpush(redisKey, JSON.stringify(receivedMessage))
|
|
|
|
|
|
|
|
if (len === 1) {
|
|
|
|
await setIsReplyingInChatSession({
|
|
|
|
existingSessionId,
|
|
|
|
newSessionId,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
await new Promise((resolve) =>
|
|
|
|
setTimeout(resolve, incomingMessageDebounce)
|
|
|
|
)
|
|
|
|
|
|
|
|
const newMessagesResponse = await redis.lrange(redisKey, 0, -1)
|
|
|
|
|
|
|
|
if (!newMessagesResponse || newMessagesResponse.length > len) {
|
|
|
|
// Current message was aggregated with other messages another webhook handler. Skipping...
|
|
|
|
return { isReplyingWasSet: true, incomingMessages: [] }
|
|
|
|
}
|
|
|
|
|
|
|
|
redis.del(redisKey).then()
|
|
|
|
|
|
|
|
return {
|
|
|
|
isReplyingWasSet: true,
|
|
|
|
incomingMessages: newMessagesResponse.map((msgStr) =>
|
|
|
|
JSON.parse(msgStr)
|
|
|
|
),
|
|
|
|
}
|
|
|
|
} catch (error) {
|
|
|
|
console.error('Failed to process webhook event:', error, receivedMessage)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return {
|
|
|
|
isReplyingWasSet: false,
|
|
|
|
incomingMessages: [receivedMessage],
|
|
|
|
}
|
|
|
|
}
|