How to use Server-Sent Events with FastAPI, React, and Langgraph

Let's start with a simple challenge. Can you spot the difference between the two demos below?

Comparing streaming vs non-streaming in AI data analysis and visualization tool DataLine

While it may look like one is faster than the other (and it is not), this is none other than the illusion of streaming. For apps where long waits are inevitable, for e.g. due to AI nowadays, it's a great way to keep your users hooked by giving them the final result in bits and pieces. The trick is to immediately show any intermediate output instead of waiting for the entire process to finish.

To better understand this, let's implement streaming in a straightforward manner using tools that need no introduction: FastAPI, React, and Server-Sent Events—okay, this last one might actually need an intro.

What is SSE?

Server-Sent Events are a way for servers to push updates to a web page over a single, long-lived HTTP connection. Think of it like a one-way street where the server sends new information to the browser whenever there's an update, without the browser having to keep asking for it. This is perfect for things like live sports scores, live news feeds, or any other real-time updates. It's simple to use and integrates well with many web technologies.

Why SSE?

Using SSE is less complex compared to WebSockets, making it easier to implement and maintain. It is ideal for scenarios where the server needs to push updates to the client without requiring a response since it is unidirectional by nature (Server -> Client only).

SSE works by sending strings that start with a field name, usually event: or data:, then the data for that field, followed by a new line. Two new lines mark the end of a chunk.

Strings and new lines – really? Guess if it ain't broken...

Here's an example:

# chunk 1:
event: my_custom_event_name
data: 30


# chunk 2:
event: another_custom_event_name
data: {'answer': 42}

Notice that the spacing between chunk 1 and chunk 2 isn't to make it visually appealing, those new lines are actually what separate the events apart! For those interested in the nitty-gritty details, this MDN documentation offers a deeper dive.

Streaming with FastAPI and React

We'll start with a toy example to get familiar with the concept. FastAPI makes streaming responses straightforward with its StreamingResponse class. Here's a simple example:

 from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time app = FastAPI() def event_stream(): for i in range(10): event_str = "event: stream_event" data_str = f"data: Message {i}" yield f"{event_str}\n{data_str}\n\n" time.sleep(1) @app.get("/stream")
async def stream(): return StreamingResponse(event_stream(), media_type="text/event-stream")

The snippet above exposes an endpoint that returns streaming responses from FastAPI.

On the frontend, we'll be streaming results with React. We'll make use of Microsoft's fetch-event-source library because the native EventSource API doesn't support POST with payloads. Here's a quick setup:

// StreamComponent.jsx

import React, { useEffect, useState } from "react";
import { fetchEventSource } from "@microsoft/fetch-event-source";

function StreamComponent() {
  const [messages, setMessages] = useState([]);

  useEffect(() => {
    const fetchData = async () => {
      await fetchEventSource("/stream", {
        onmessage(ev) {
          console.log(`Received event: ${ev.event}`); // for debugging purposes
          setMessages((prev) => [...prev, ev.data]);
        },
      });
    };
    fetchData();
  }, []);

  return (
    <div>
      {messages.map((msg, index) => (
        <div key={index}>{msg}</div>
      ))}
    </div>
  );
}

export default StreamComponent;

The component above consumes the streaming endpoint and displays new messages as they arrive.

Streaming with Langgraph and Langchain

Let's add some AI into the mix.

We won't go into the details of compiling a Langgraph Graph object, as this isn't the focus of the article and we're keeping things simple:

 import operator
from typing import Annotated, Sequence from langgraph.graph import END, StateGraph
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage
from langchain_core.pydantic_v1 import BaseModel class MyAppState(BaseModel): messages: Annotated[Sequence[BaseMessage], operator.add] graph_builder = StateGraph(MyAppState)
model = ChatOpenAI(model="gpt-4o", temperature=0) def brainstorm(state: MyAppState): brainstorming_prompt = ( "You are a helpful assistant. Brainstorm one idea given the user's input. " "Your idea will be turned into a blog by an AI system, keep it short." ) user_message = state.messages[-1] response = model.invoke([SystemMessage(content=brainstorming_prompt), HumanMessage(content=user_message.content)]) return {"messages": [response]} def write_outline(state: MyAppState): outlining_prompt = ( "Write an outline for a short article given a prompt by the user. A brainstorming AI was used to elaborate" ) response = model.invoke([SystemMessage(content=outlining_prompt), *state.messages]) return {"messages": [response]} def write_article(state: MyAppState): writing_prompt = "Write a short article following an outline of the user's prompt." response = model.invoke([SystemMessage(content=writing_prompt), *state.messages]) return {"messages": [response]} graph_builder.add_node("brainstorm", brainstorm)
graph_builder.add_node("write_outline", write_outline)
graph_builder.add_node("write_article", write_article) graph_builder.add_edge("brainstorm", "write_outline")
graph_builder.add_edge("write_outline", "write_article")
graph_builder.add_edge("write_article", END) graph_builder.set_entry_point("brainstorm") graph = graph_builder.compile()

The code above is a 3-step article generator pipeline. Given a word or sentence from the user:

  • Brainstorm article ideas
  • Write an outline for the article
  • Write the article

Then, all we need to do is to yield the outputs of graph.stream(), and the rest of the code can be left unchanged:

 from typing import Annotated from fastapi import FastAPI, Body
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage, ToolMessage
from myapp.llm_flow import graph app = FastAPI() def event_stream(query: str): initial_state = {"messages": [HumanMessage(content=query)]} for output in graph.stream(initial_state): for node_name, node_results in chunk.items(): chunk_messages = node_results.get("messages", []) for message in chunk_messages: if not message.content: continue if isinstance(message, ToolMessage): event_str = "event: tool_event" else: event_str = "event: ai_event" data_str = f"data: {message.content}" yield f"{event_str}\n{data_str}\n\n" @app.post("/stream")
async def stream(query: Annotated[str, Body(embed=True)]): return StreamingResponse(event_stream(query), media_type="text/event-stream")

And on the frontend, modify the React component:

// StreamComponent.jsx

function StreamComponent(query) {
  ...
  useEffect(() => {
    const fetchData = async () => {
      await fetchEventSource("/stream", {
        headers: {
          "Content-Type": "application/json",
        },
        method: "POST",
        body: JSON.stringify({ query }),
        
        onmessage(ev) {
          ... // unchanged
        },
      });
    };
    fetchData();
  }, []);
  ...
}

And that's it! The component will send a POST request with the given query, and the backend will stream the messages generated by Langgraph as they come in.

What about error handling?

Sometimes s**t happens when you're executing complex logic. Streaming does make error handling a little more complicated. That's cause the built-in Starlette StreamingResponse classes aren't built to handle all kinds of errors that come out of your code.

But throwing a generic error message on the frontend is not desirable in production-grade applications - we need to communicate what's happening to our users to provide the best experience.

Let's say you're building an OpenAI application and your user's token runs out of credits. You'll want to forward that error to the user so that they know to resolve it on their own instead of sending you emails frustrated and complaining, requiring you to dig into logs only to find the same issue you could've communicated automatically! (Sounds very specific right?..) So let's build something custom.

One solution, that also happens to be the simplest, is to just wrap your generator with another generator that captures these exceptions! Then you can package that exception into a nice, well-structured event and send it on its way.

Let's add the error handling logic for OpenAI rate limit errors:

 from typing import AsyncGenerator, cast
from openai import RateLimitError async def stream_with_errors(generator: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]: try: async for chunk in generator: yield chunk except RateLimitError as e: body = cast(dict, e.body) error_msg = body.get("message", "OpenAI API rate limit exceeded") yield f"event: error_event\ndata: {error_msg}\n\n" except Exception as e: error_msg = "An error occurred and our developers were notified" yield f"event: error_event\ndata: {error_msg}\n\n"

Now we only have to make a very small change on the backend endpoint:


...
from myapp.utils import stream_with_errors ... @app.post("/stream")
async def stream(query: Annotated[str, Body(embed=True)]): return StreamingResponse(stream_with_errors(event_stream(query)), media_type="text/event-stream")

And on the frontend we handle this new error event as follows:

// StreamComponent.jsx

function StreamComponent(query) {
  ...
  useEffect(() => {
    const fetchData = async () => {
      await fetchEventSource("/stream", {
        ...
        onmessage(ev) {
          console.log(`Received event: ${ev.event}`); // for debugging purposes
          if (ev.event === "error_event") {
            alert(`An error occured: ${ev.data}`);
          } else {
            setMessages((prev) => [...prev, ev.data]);
          }
        },
      });
    };
    fetchData();
  }, []);
  ...
}

Voila 🎉! Exceptions handled like a pro. Users understand what went wrong, and in this case, they can fix it themselves!

Want more code? Here's a production app that does this.

The open-source project Dataline integrates streaming results into the React frontend. Previously, users had to wait for the entire Langgraph execution to finish before seeing any results, which was... painful to say the least. Now, you can see results in real time, it’s a real game-changer. You can try it out here for free.

We hope you enjoyed this read, give us a sub if you like topics like this!

Главная - Вики-сайт
Copyright © 2011-2024 iteam. Current version is 2.139.0. UTC+08:00, 2024-12-25 16:16
浙ICP备14020137号-1 $Гость$