
[AssistantResponse] - to save streaming data to external database #1461

Open
ayepRahman opened this issue Apr 28, 2024 · 8 comments
@ayepRahman

Feature Description

Save streaming thread message data to external DB

Use Case

Is there a way to save thread messages to external database after the completion of streaming via AssistantResponse like how OpenAiStream has a callback for completion?

Additional context

No response

@ravvi-kumar

Hi @ayepRahman ,

You can use the "sendMessage" function available on the AssistantResponse process callback to save the current response to your external database:
https://sdk.vercel.ai/docs/api-reference/providers/assistant-response#process-assistantresponsecallback

As of now, the documentation on "sendMessage" has not been updated yet.

And if you want to get the whole chat history, you can use the threadId to fetch all of the thread's messages:
https://platform.openai.com/docs/api-reference/messages/listMessages
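
For example, a minimal sketch using the openai Node SDK (saveMessageToDb is a hypothetical helper standing in for your own database layer):

import OpenAI from 'openai';

const openai = new OpenAI();

// Hypothetical persistence helper for your own database.
declare function saveMessageToDb(message: unknown): Promise<void>;

async function persistThreadHistory(threadId: string) {
  // The openai Node SDK auto-paginates list endpoints with `for await`.
  for await (const message of openai.beta.threads.messages.list(threadId)) {
    await saveMessageToDb(message);
  }
}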

@ayepRahman
Author

Do you by any chance have an example of using sendMessage()?

@ravvi-kumar

Oops, my bad, sendMessage is something else.
You can use the finalMessages() method on the run stream to get the response result:

runStream.finalMessages().then((finalMessages) => {
    console.log("finalMessages", finalMessages);
});

Hope this works for your use case; let me know.
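
For context, a fuller sketch of where that call might live, assuming an assistant route built on AssistantResponse from the ai package (saveMessagesToDb is a hypothetical helper):

import { AssistantResponse } from 'ai';
import OpenAI from 'openai';

const openai = new OpenAI();

// Hypothetical persistence helper for your own database.
declare function saveMessagesToDb(
  threadId: string,
  messages: unknown[]
): Promise<void>;

export async function POST(req: Request) {
  const { threadId, message } = await req.json();

  // Add the user's message to the thread.
  const createdMessage = await openai.beta.threads.messages.create(threadId, {
    role: 'user',
    content: message
  });

  return AssistantResponse(
    { threadId, messageId: createdMessage.id },
    async ({ forwardStream }) => {
      const runStream = openai.beta.threads.runs.stream(threadId, {
        assistant_id: process.env.ASSISTANT_ID!
      });

      // Forward the streamed run to the client.
      await forwardStream(runStream);

      // Once the run has finished, persist the final messages.
      const finalMessages = await runStream.finalMessages();
      await saveMessagesToDb(threadId, finalMessages);
    }
  );
}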

@mattp0123

mattp0123 commented Apr 28, 2024

@ravvi-kumar do you know how to map the response of openai.beta.threads.messages.list to the useAssistant hook's messages properly?

const { setMessages } = useAssistant({
  // ...
})
useEffect(() => {
   if (props.messages) {
     const mapped = props.messages.map((message) => {
       // ...
     })
     setMessages(mapped)
   }
}, [props.messages, setMessages])

@ElectricCodeGuy

I just save the string that the model outputs at the very end of my action.tsx:

 const result = await experimental_streamText({
      model: anthropic('claude-3-opus-20240229'),
      maxTokens: 4000,
      temperature: 0,
      frequencyPenalty: 0.5,
      system: systemPromptTemplate,
      messages: [
        ...aiState.get().map((info: any) => ({
          role: info.role,
          content: info.content,
          name: info.name
        }))
      ]
    });

    // Accumulate the streamed text so the complete response is available
    // once streaming finishes.
    let fullResponse = '';
    for await (const textDelta of result.textStream) {
      fullResponse += textDelta;
      uiStream.update(<BotMessage>{fullResponse}</BotMessage>);
    }

    uiStream.done();
    aiState.done([
      ...aiState.get(),
      {
        role: 'assistant',
        content: fullResponse,
        id: uuidv4()
      }
    ]);

    // Persist the completed exchange to the database.
    saveChatToRedis(
      CurrentChatSessionId,
      session.id,
      currentUserMessage,
      fullResponse,
      Array.from(uniqueReferences)
    );
  })();

  return {
    id: Date.now(),
    display: uiStream.value,
    chatId: CurrentChatSessionId
  };
}

I have not had any issues with saving them.

In an API route I used the following:

// Accumulate tokens as they arrive so a partial response can still be
// saved if the user aborts mid-stream.
let partialCompletion = '';
  const { stream, handlers } = LangChainStream({
    onToken: (token: string) => {
      partialCompletion += token;
    },
    onFinal: (completion) => {
      try {
        saveChatToRedis(
          chatSessionId,
          userId,
          messages[messages.length - 1].content,
          partialCompletion,
          Array.from(uniqueReferences)
        );
        cacheChatCompletionVectorDB(
          messages[messages.length - 1].content,
          completion,
          'cache-chat',
          Array.from(uniqueReferences)
        );
      } catch (error) {
        console.error('Error saving chat to database:', error);
      }
      revalidatePath('/chatai', 'layout');
    }
  });

The partial completion built up in onToken covers the case where the user stops the chat in the middle of the streaming response, so we always have the latest tokens stored in the database :)

Hope this helps

@ElectricCodeGuy

> @ravvi-kumar do you know how to map the response of openai.beta.threads.messages.list to the useAssistant hook's messages properly?

Something like this, assuming you have already split the thread's messages into userMessages and assistantMessages arrays (see the sketch after the loop):

const combinedMessages: { role: 'user' | 'assistant'; id: string; content: string }[] = [];

// Interleave user and assistant messages, preserving turn order.
for (
  let i = 0;
  i < Math.max(userMessages.length, assistantMessages.length);
  i++
) {
  if (userMessages[i]) {
    combinedMessages.push({
      role: 'user',
      id: `user-${i}`,
      content: userMessages[i]
    });
  }
  if (assistantMessages[i]) {
    combinedMessages.push({
      role: 'assistant',
      id: `assistant-${i}`,
      content: assistantMessages[i]
    });
  }
}
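
For completeness, one way the userMessages and assistantMessages arrays above might be built from the list response (a sketch that assumes an openai client and a threadId are in scope; note that the API returns messages newest-first and that each message's content is an array of parts):

const page = await openai.beta.threads.messages.list(threadId);

// Reverse to get chronological order.
const chronological = [...page.data].reverse();

// Keep only the text parts of each message's content array.
const textOf = (m: { content: any[] }) =>
  m.content
    .flatMap((part) => (part.type === 'text' ? [part.text.value] : []))
    .join('\n');

const userMessages = chronological
  .filter((m) => m.role === 'user')
  .map(textOf);
const assistantMessages = chronological
  .filter((m) => m.role === 'assistant')
  .map(textOf);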

@ayepRahman
Author

> Oops, my bad, sendMessage is something else. You can use the finalMessages() method on the run stream to get the response result. Hope this works for your use case; let me know.

Yes, it works. But at the same time I was wondering: how do I properly convert the messages coming from OpenAI into the same format that forwardStream() produces?

@oliviermills

oliviermills commented Apr 29, 2024

The LangChainStream from the Vercel SDK does not work (still, as of today) with LangChain's llm.stream(). For your API route, you can create your own LangChainStream to handle it, like this.

This is what LangChainStreamCustom() looks like. It only implements onCompletion, but you get the gist and can add the other callbacks (onToken, etc.) if you need them.

export const LangChainStreamCustom = (
  stream: any,
  { onCompletion }: { onCompletion: (completion: string) => Promise<void> }
) => {
  let completion = ''
  // Use a single streaming decoder so multi-byte characters that are
  // split across chunk boundaries are decoded correctly.
  const decoder = new TextDecoder('utf-8')
  const transformStream = new TransformStream({
    transform(chunk, controller) {
      // Accumulate the text while passing the raw chunk through untouched.
      completion += decoder.decode(chunk, { stream: true })
      controller.enqueue(chunk)
    },
    flush(controller) {
      // The source stream is done; hand the full completion to the callback.
      onCompletion(completion)
        .then(() => {
          controller.terminate()
        })
        .catch((e: any) => {
          console.error('Error', e)
          controller.terminate()
        })
    }
  })

  return stream.pipeThrough(transformStream)
}

Then in your API route, you get the stream or response from the LangChain .stream() call:

//....
const response = await llm.stream({}) // assuming this comes from a typical LangChain call

Then you can use it like all the other Vercel helpers (OpenAIStream, AnthropicStream, etc.):

const stream = LangChainStreamCustom(response, {
    onCompletion: async (completion: string) => {
      console.log('COMPLETE!', completion)
    }
})

return new StreamingTextResponse(stream)
