Building a Real-time Chat Interface with Phoenix LiveView and OpenAI
Building a Real-time Chat Interface with Phoenix LiveView and OpenAI
In this comprehensive guide, I’ll walk you through building a real-time chat application using Phoenix LiveView and OpenAI’s API. We’ll focus on implementing streaming responses, async processing, and error handling to create a smooth user experience.
Why Phoenix LiveView for Chat?
Phoenix LiveView is an excellent choice for building real-time chat applications because it provides:
- Real-time updates without complex WebSocket management
- Efficient server-side rendering
- Minimal JavaScript footprint
- Built-in streaming support
- Robust error handling and state management
Core Implementation
Let’s break down the implementation into key components:
1. LiveView Setup
First, we set up our LiveView module with necessary assigns:
defmodule ChatLive do
  use Phoenix.LiveView
  @timeout 30_000  # 30 second timeout for responses
  @retry_attempts 3
  def mount(_params, _session, socket) do
    socket =
      socket
      |> assign(:message, "")
      |> assign(:loading, false)
      |> assign(:error, nil)
      |> assign(:streaming_message, nil)
      |> assign(:messages, [])
      |> assign(:message_queue, :queue.new())
      |> assign(:processing, false)
      |> stream(:messages, [])
    {:ok, socket}
  end
end2. Message Processing with Async Tasks
Following the pattern described in LiveView Async Task, we implement async processing using LiveView’s start_async:
def handle_event("submit", %{"message" => message}, socket) when message != "" do
  if can_submit?(socket) do
    user_message = Chat.create_message("user", message)
    assistant_message = Chat.create_message("assistant", "")
    previous_messages = socket.assigns.messages ++ [user_message]
    socket =
      socket
      |> stream_insert(:messages, user_message)
      |> stream_insert(:messages, assistant_message)
      |> assign(:loading, true)
      |> assign(:messages, previous_messages)
      |> assign(:message, "")
      |> assign(:streaming_message, assistant_message)
      |> assign(:processing, true)
      |> start_async(:process_message, fn ->
        Chat.process_message(message, previous_messages)
      end)
    Process.send_after(self(), {:timeout, message}, @timeout)
    {:noreply, socket}
  else
    {:noreply, socket}
  end
end3. Handling Async Results
We handle the async results using handle_async callbacks:
def handle_async(:process_message, {:ok, {:ok, _user_msg, response}}, socket) do
  ChatAPI.stream_message(response, fn
    {:chunk, content} -> send(self(), {:stream_chunk, content})
    {:done} -> send(self(), :stream_complete)
    {:error, error} -> send(self(), {:stream_error, error, @retry_attempts - 1, socket.assigns.message})
  end)
  {:noreply, socket}
end
def handle_async(:process_message, {:ok, {:error, error}}, socket) do
  {:noreply,
   socket
   |> assign(:loading, false)
   |> assign(:processing, false)
   |> assign(:streaming_message, nil)
   |> assign(:error, error)
   |> maybe_process_next_message()}
end4. Streaming Response Handling
The streaming implementation follows a pattern where we:
- Start streaming in the async handler
- Process chunks as they arrive
- Update the UI in real-time
def handle_info({:stream_chunk, content}, socket) do
  if socket.assigns.streaming_message do
    updated_message = Chat.update_message(socket.assigns.streaming_message, content)
    {:noreply,
     socket
     |> stream_insert(:messages, updated_message, at: -1)
     |> assign(:streaming_message, updated_message)
     |> assign(:messages, update_messages(socket.assigns.messages, updated_message))
     |> push_event("scroll", %{to: "bottom"})}
  else
    {:noreply, socket}
  end
end
def handle_info(:stream_complete, socket) do
  {:noreply,
   socket
   |> assign(:loading, false)
   |> assign(:processing, false)
   |> assign(:streaming_message, nil)
   |> maybe_process_next_message()
   |> push_event("scroll", %{to: "bottom"})}
end5. Error Handling and Recovery
We implement a comprehensive error handling system with retries:
def handle_info({:stream_error, error, attempts, message}, socket) do
  if attempts > 0 do
    Process.send_after(self(), {:retry, message, attempts}, 1000)
    {:noreply, socket}
  else
    {:noreply,
     socket
     |> assign(:loading, false)
     |> assign(:processing, false)
     |> assign(:streaming_message, nil)
     |> assign(:error, error)
     |> maybe_process_next_message()}
  end
end6. Message Queue Management
To handle multiple messages reliably:
defp queue_message(socket, message) do
  update(socket, :message_queue, &:queue.in({message, @retry_attempts}, &1))
end
defp maybe_process_next_message(%{assigns: %{processing: true}} = socket), do: socket
defp maybe_process_next_message(%{assigns: %{message_queue: queue}} = socket) do
  case :queue.out(queue) do
    {{:value, {message, attempts}}, new_queue} ->
      process_message_with_retry(socket, message, attempts, new_queue)
    {:empty, _} ->
      socket
  end
endKey Features and Benefits
- Real-time Updates: Messages appear instantly with typing indicators
- Streaming Responses: OpenAI responses stream in real-time
- Error Resilience: Automatic retries and graceful error handling
- Queue Management: Ordered processing of multiple messages
- State Management: Clean handling of loading and processing states
Best Practices
- 
    Extract Messages Before Async # Before async operation previous_messages = socket.assigns.messages ++ [user_message] # Use in async function start_async(:process_message, fn -> Chat.process_message(message, previous_messages) end)
- 
    Clear States at Right Time # Only clear states after streaming is complete def handle_info(:stream_complete, socket) do {:noreply, socket |> assign(:loading, false) |> assign(:processing, false) |> assign(:streaming_message, nil)} end
- 
    Handle Timeouts # Set timeout when starting async operation Process.send_after(self(), {:timeout, message}, @timeout)
References
- LiveView Async Task - Great overview of async processing in LiveView
- Phoenix LiveView Documentation
- OpenAI Streaming Best Practices
What’s Next?
Consider these enhancements for your chat application:
- [ ] Implement rate limiting for API calls
- [ ] Add conversation persistence with Ecto
- [ ] Include user authentication
- [ ] Add typing indicators between messages
- [ ] Implement message reactions
The complete code for this implementation is available in my GitHub repository.
For more Elixir deployment tips, check out my guide on deploying Phoenix apps to Fly.io.
Happy coding! 🚀
