Overview

On August 30th, 2024, Brazil blocked Twitter, and as a result Bluesky saw an influx of new users. This gave me the idea to see how much growth the block brought the product.

While looking at their API documentation, I saw that Bluesky provides a firehose: an event stream that anyone can read from. After seeing this, I wanted to build something that reads new-post events off the stream. I had also been wanting to do something with htmx, and I figured this would be an easy way to introduce myself to it.

Getting Started

Since I want to get better at Elixir, all my side projects are written in it. I got started by creating a new project with the code name icky_venus:

mix new icky_venus --sup

I knew this was going to be a website, given that I wanted to use htmx. I also decided not to bother with Phoenix, since I don't think it makes much sense to mix htmx and LiveView. Because of that, I went with just Plug and Bandit to serve the site.
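For reference, the modules used throughout come from a handful of Hex packages. A sketch of deps/0 in mix.exs, assuming these package names and leaving versions unpinned (pin them to whatever you actually install):

```elixir
# mix.exs (sketch): package names are assumptions; pin versions before relying on this.
defp deps do
  [
    {:websockex, ">= 0.0.0"},       # WebSockex client for the Bluesky firehose
    {:cbor, ">= 0.0.0"},            # CBOR.decode/1 for firehose frames
    {:bandit, ">= 0.0.0"},          # HTTP server
    {:plug, ">= 0.0.0"},            # Plug.Router and Plug.Conn
    {:websock_adapter, ">= 0.0.0"}, # WebSockAdapter.upgrade/4
    {:number, ">= 0.0.0"}           # Number.Delimit.number_to_delimited/1
  ]
end
```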

However, before doing any of that, I wanted to first try getting data from Bluesky.

Connecting and Reading from Bluesky Firehose

I started by writing an application that reads from the firehose and notifies subscribed processes for every post-creation event sent over the hose.

lib/blue_sky/application.ex
defmodule BlueSky.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      Registry.child_spec(
        keys: :duplicate,
        name: BlueSky.EventRegistry.PostCreated
      ),
      BlueSky.StreamReader,
      BlueSky.PostCreatedServer
    ]

    opts = [strategy: :one_for_one, name: BlueSky.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Given that this is a simple application, there isn't much going on here. We want a few things:

  • A Registry to dispatch post-created events to interested processes
  • A process to read from the firehose
  • A GenServer that accumulates the counts and serves them to other processes

We don't have to write anything for the registry, since Registry's built-in APIs cover it. That leaves creating the worker that reads from the firehose and the GenServer, along with a small client API for other processes to query it.
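The registry here acts purely as a local pub/sub mechanism. A minimal, self-contained sketch of that pattern (the registry name Demo.PostCreated is hypothetical): a process registers itself under a key in a :duplicate registry, and Registry.dispatch/3 hands the publisher every {pid, value} entry under that key. Because Registry.register/3 always registers the calling process, each subscriber has to register from inside its own process, which is what the :register_event message in the servers does.

```elixir
# Sketch: Registry as a local pub/sub. Demo.PostCreated is a hypothetical name.

# A :duplicate registry lets many processes register under the same key.
{:ok, _registry} = Registry.start_link(keys: :duplicate, name: Demo.PostCreated)

# Subscriber side: Registry.register/3 registers the *calling* process,
# and the entry is removed automatically when that process exits.
{:ok, _owner} = Registry.register(Demo.PostCreated, :post_created, %{})

# Publisher side: dispatch/3 passes the {pid, value} entries for the key to the callback.
:ok =
  Registry.dispatch(Demo.PostCreated, :post_created, fn entries ->
    for {pid, _value} <- entries, do: send(pid, :broadcast)
  end)

received =
  receive do
    :broadcast -> :broadcast
  after
    1_000 -> :timeout
  end

IO.inspect(received)
```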

lib/blue_sky/stream_reader.ex (firehose)
defmodule BlueSky.StreamReader do
  use WebSockex

  def start_link(state \\ []) do
    WebSockex.start_link(
      "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos",
      __MODULE__,
      state
    )
  end

  def handle_frame({_, msg}, state) do
    {:ok, result, payload_raw} = CBOR.decode(msg)

    if Map.get(result, "t", "") == "#commit" do
      {:ok, payload, _} = CBOR.decode(payload_raw)
      operations = Map.get(payload, "ops", [])

      if Enum.any?(operations, fn x ->
          Map.get(x, "action", "") == "create" &&
            String.starts_with?(Map.get(x, "path", ""), "app.bsky.feed.post")
        end) do
        Registry.dispatch(BlueSky.EventRegistry.PostCreated, :post_created, fn entries ->
          for {pid, _} <- entries, do: send(pid, :broadcast)
        end)
      end
    end

    {:ok, state}
  end
end

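A few things to note. This uses WebSockex to connect to Bluesky. Additionally, the payloads sent over the stream are encoded as Concise Binary Object Representation (CBOR), which I decode with the cbor library. Each frame holds two CBOR values back to back, a header whose "t" field names the message type followed by the message body, which is why handle_frame/2 calls CBOR.decode twice.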
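To make the filtering testable in isolation, here's a sketch that pulls the check out of handle_frame/2 into a pure function over the decoded header and body maps. FirehoseFilter and post_created?/2 are hypothetical names, not part of the project. The sketch matches on "app.bsky.feed.post/" with a trailing slash, since record paths have the form "collection/rkey" and the bare prefix would also match records from the separate app.bsky.feed.postgate collection.

```elixir
defmodule FirehoseFilter do
  # Hypothetical helper mirroring the checks in BlueSky.StreamReader.handle_frame/2,
  # operating on the header and body maps after both CBOR.decode/1 calls.

  # Trailing slash so app.bsky.feed.postgate records are not counted as posts.
  @post_prefix "app.bsky.feed.post/"

  @spec post_created?(map(), map()) :: boolean()
  def post_created?(%{"t" => "#commit"}, body) do
    body
    |> Map.get("ops", [])
    |> Enum.any?(fn op ->
      Map.get(op, "action") == "create" and
        String.starts_with?(Map.get(op, "path", ""), @post_prefix)
    end)
  end

  # Every other message type (for example #identity or #account) is ignored.
  def post_created?(_header, _body), do: false
end

commit = %{"t" => "#commit"}
body = %{"ops" => [%{"action" => "create", "path" => "app.bsky.feed.post/3kabc"}]}
IO.inspect(FirehoseFilter.post_created?(commit, body))
# prints true
```

With a helper like this, handle_frame/2 would only need to decode the two values and dispatch when the function returns true.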

GenServer
# lib/blue_sky/post_created_client.ex
defmodule BlueSky.PostCreatedClient do
  def get_totals() do
    GenServer.call(BlueSky.PostCreatedServer, :get_totals)
  end
end
# lib/blue_sky/post_created_server.ex
defmodule BlueSky.PostCreatedServer do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def init(_) do
    Process.send_after(self(), :register_event, 1000)
    {:ok, %{}}
  end

  def handle_call(:get_totals, _from, state) do
    {:reply, state, state}
  end

  def handle_info(:register_event, state) do
    Registry.register(BlueSky.EventRegistry.PostCreated, :post_created, %{})
    {:noreply, state}
  end

  def handle_info(:broadcast, state) do
    # Compute the bucket key once so the read and the write always use the same day.
    today = Date.to_string(Date.utc_today())
    {:noreply, Map.update(state, today, 1, &(&1 + 1))}
  end

  def handle_info({:EXIT, _pid, _reason}, state) do
    {:stop, :normal, state}
  end

  def terminate(_reason, state) do
    {:ok, state}
  end
end

The state is a map keyed by the current UTC date, so each key acts as a daily bucket. This allows a time series of posts per day to be displayed.
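Here's a stripped-down sketch of the same daily-bucket idea that runs on its own (DailyCounter is a hypothetical name, not part of the project). It computes today's key once per message and uses Map.update/4 to either start a bucket at 1 or increment the existing one:

```elixir
defmodule DailyCounter do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, %{}, opts)

  def get_totals(server), do: GenServer.call(server, :get_totals)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:get_totals, _from, state), do: {:reply, state, state}

  @impl true
  def handle_info(:broadcast, state) do
    # One bucket per UTC day, keyed by a date string such as "2024-09-09".
    today = Date.to_string(Date.utc_today())
    {:noreply, Map.update(state, today, 1, &(&1 + 1))}
  end
end

{:ok, pid} = DailyCounter.start_link()
for _ <- 1..3, do: send(pid, :broadcast)

# Messages from one sender arrive in order, so the call is handled after all three sends.
totals = DailyCounter.get_totals(pid)
IO.inspect(totals)
```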

This can be tested by running only the BlueSky application. Since both applications live in the same Mix project, I don't know of a good way to switch between them besides updating application/0 in mix.exs as follows:

def application do
  [
    extra_applications: [:logger],
    mod: {BlueSky.Application, []}
    # mod: {IckyVenus.Application, []}
  ]
end

Then running the following command starts that application inside an IEx shell:

iex -S mix

Once it's running, BlueSky.PostCreatedClient.get_totals() can be invoked to verify that the stream is being read and the metrics are adding up.

➜  icky_venus git:(master) ✗ iex -S mix
Erlang/OTP 27 [erts-15.1.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit] [dtrace]

Interactive Elixir (1.17.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> BlueSky.PostCreatedClient.get_totals()
%{"2024-09-09" => 1948}
iex(2)> 

Serving PostCreated Metrics

With metrics being aggregated, I now want a way for users to view them. This is where htmx and Bandit come in. I'll start by creating the main application's entry point:

lib/icky_venus/application.ex
defmodule IckyVenus.Application do
  use Application
  require Logger

  def start(_type, _args) do
    children = [
      {Bandit, scheme: :http, plug: IckyVenus.Router, port: server_port()},
      Registry.child_spec(
        keys: :duplicate,
        name: BlueSky.EventRegistry.PostCreated
      ),
      BlueSky.StreamReader,
      BlueSky.PostCreatedServer
    ]

    opts = [strategy: :one_for_one, name: IckyVenus.Supervisor]

    Logger.info("Starting application...")

    Supervisor.start_link(children, opts)
  end

  defp server_port, do: Application.get_env(:icky_venus, :server_port, 4000)
end

Since this is a web server built on Plug, a router also needs to be defined to specify the endpoints:

lib/icky_venus/router.ex
defmodule IckyVenus.Router do
  import EEx
  use Plug.Router
  use Plug.ErrorHandler

  plug(:match)
  plug(:dispatch)

  get "/" do
    conn
    |> put_resp_content_type("text/html; charset=utf-8")
    |> send_resp(
      200,
      IckyVenus.HtmlRenderer.render_html("lib/icky_venus/index.html.eex", %{
        total_content:
          eval_file(
            "lib/icky_venus/totals.html.eex",
            assigns: %{totals: BlueSky.PostCreatedClient.get_totals()}
          )
      })
    )
  end

  get "/events/post-created/totals" do
    conn
    |> put_resp_content_type("text/html; charset=utf-8")
    |> send_resp(
      200,
      eval_file(
        "lib/icky_venus/totals.html.eex",
        assigns: %{totals: BlueSky.PostCreatedClient.get_totals()}
      )
    )
  end

  get "/events/post-created/stream" do
    conn
    |> WebSockAdapter.upgrade(IckyVenus.WebSocketServer.PostCreatedServer, [], timeout: 60_000)
    |> halt()
  end

  get "/favicon.ico" do
    conn
    |> put_resp_content_type("image/x-icon")
    |> send_file(200, "priv/static/favicon.ico")
  end

  get "/robots.txt" do
    conn
    |> put_resp_content_type("text/plain; charset=utf-8")
    |> send_file(200, "priv/static/robots.txt")
  end

  match _ do
    send_resp(conn, 404, "Not found")
  end

  @impl Plug.ErrorHandler
  def handle_errors(conn, %{kind: kind, reason: reason, stack: stack}) do
    IO.inspect(kind, label: :kind)
    IO.inspect(reason, label: :reason)
    IO.inspect(stack, label: :stack)
    send_resp(conn, conn.status, "Encountered an error that cannot be handled")
  end
end

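Two things are being done here: the HTML routes render EEx templates, and /events/post-created/stream upgrades the connection to a WebSocket handled by this module: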

lib/icky_venus/web_socket_server/post_created_server.ex
defmodule IckyVenus.WebSocketServer.PostCreatedServer do
  def init(_) do
    Process.send_after(self(), :register_event, 1000)
    {:ok, 0}
  end

  def handle_in(_, state) do
    {:ok, state}
  end

  def handle_info(:broadcast, state) do
    count = state + 1

    {:reply, :ok,
    {:text, "<span id=\"count\">#{Number.Delimit.number_to_delimited(count)}</span>"}, count}
  end

  def handle_info(:register_event, state) do
    Registry.register(BlueSky.EventRegistry.PostCreated, :post_created, %{})
    {:ok, state}
  end

  def handle_info({:EXIT, _pid, _reason}, state) do
    {:stop, :normal, state}
  end

  def terminate(_reason, state) do
    {:ok, state}
  end
end
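Because a WebSock handler is just a module of callbacks, the per-connection counting can be exercised without opening a socket. In this sketch (CountSocketSketch is a hypothetical name), the Registry subscription and the Number.Delimit formatting are left out so it runs with no dependencies, and the handler returns {:push, frame, state}, which WebSock accepts for server-initiated messages:

```elixir
defmodule CountSocketSketch do
  # Same callback shapes as IckyVenus.WebSocketServer.PostCreatedServer,
  # minus the Registry subscription and Number.Delimit formatting.

  # Each connection starts counting from zero.
  def init(_args), do: {:ok, 0}

  # Ignore anything the browser sends.
  def handle_in(_frame, count), do: {:ok, count}

  # Each :broadcast bumps the per-connection count and pushes an HTML fragment.
  # The id="count" is what lets htmx's ws extension swap it into the page.
  def handle_info(:broadcast, count) do
    count = count + 1
    {:push, {:text, ~s(<span id="count">#{count}</span>)}, count}
  end
end

{:ok, s0} = CountSocketSketch.init([])
{:push, {:text, _first}, s1} = CountSocketSketch.handle_info(:broadcast, s0)
{:push, {:text, frame}, s2} = CountSocketSketch.handle_info(:broadcast, s1)
IO.puts(frame)
# prints <span id="count">2</span>
```

Each browser tab gets its own handler process, and therefore its own count starting at zero, which is what makes the number a "since page load" figure.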

The final part is to create the HTML that is served:

lib/icky_venus/root.html.eex
<!DOCTYPE html>
<html lang="en">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1" />
  <title><%= Application.get_env(:icky_venus, :product_name) %></title>
  <script src="https://unpkg.com/[email protected]" integrity="sha384-ujb1lZYygJmzgSwoxRggbCHcjc0rB2XoQrxeTUQyRjrOnlCoYta87iKBWq3EsdM2" crossorigin="anonymous"></script>
  <script src="https://unpkg.com/[email protected]/dist/ext/ws.js"></script>
  <script src="https://cdn.tailwindcss.com"></script>
  <script>
    tailwind.config = {
      theme: {
        extend: {
          colors: {
            brand: "#800080",
            primary: colors.purple,
            secondary: colors.emerald,
            neutral: colors.gray
          }
        },
      }
    }
  </script>
</head>

<body class="bg-white antialiased">
  <header>
    <div class="flex items-center justify-between border-b border-secondary-100 py-3">
      <div class="flex items-center gap-4">
        <a href="/" class="rounded-full bg-primary/5 px-2 font-medium leading-6 text-brand">
          <%= Application.get_env(:icky_venus, :product_name) %>
        </a>
      </div>
      <div class="flex items-center gap-4 text-[0.7525rem] font-semibold leading-6 text-secondary-800 active:text-secondary-800/70">
        <a
          href="https://github.com/jflowaa/icky_venus"
          class="rounded-lg bg-primary-100 px-2 py-1 text-[0.8125rem] hover:bg-secondary-200/80"
        >
          Source Code <span aria-hidden="true">&rarr;</span>
        </a>
      </div>
    </div>
  </header>
  <main class="px-4 py-5 sm:px-6 lg:px-8">
    <div class="mx-auto w-fit">
      <%= @content %>
    </div>
  </main>
</body>
</html>
lib/icky_venus/index.html.eex
<div class="px-4 sm:py-28 sm:px-6 lg:px-8 xl:py-32 xl:px-28">
  <div class="mx-auto max-w-xl lg:mx-0">
    <p class="mt-4 text-xl font-semibold leading-10 tracking-tighter text-primary-900">
      Bluesky Metrics
    </p>
    <p class="mt-4 text-base leading-7 text-neutral-600">
      Metrics on the number of posts created since page load and the total number of posts created by date.
    </p>
    <div class="flex">
      <div class="w-full sm:w-auto">
        <div class="mt-10">
          <div hx-ext="ws" ws-connect="/events/post-created/stream">
            <p>
              Since page load: <span id="count">0</span>
            </p>
          </div>
          <div id="totals" hx-get="/events/post-created/totals" hx-trigger="every 30s">
            <%= @total_content %>
          </div>
        </div>
      </div>
    </div>
  </div>
</div>
lib/icky_venus/totals.html.eex
<div>
  <p>By date:</p>
  <%= for {k, v} <- @totals do %>
    <p><%= k %>: <%= Number.Delimit.number_to_delimited(v) %></p>
  <% end %>
</div>
lib/icky_venus/html_render.ex
defmodule IckyVenus.HtmlRenderer do
  import EEx

  def render_html(html_path, assigns \\ []) do
    content = eval_file(html_path, assigns: assigns)

    eval_file("lib/icky_venus/root.html.eex", assigns: [content: content])
  end
end
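HtmlRenderer.render_html/2 renders a page template first and then feeds the resulting string into the root layout as @content. The same two-pass idea with inline templates and EEx.eval_string/2 (the template strings are made up for illustration):

```elixir
# Inner page template, rendered first with its own assigns.
page = ~S(<p>Since page load: <span id="count"><%= @count %></span></p>)

# Layout template; the rendered page string is injected as @content.
layout = ~S(<main><%= @content %></main>)

content = EEx.eval_string(page, assigns: [count: 0])
html = EEx.eval_string(layout, assigns: [content: content])
IO.puts(html)
# prints <main><p>Since page load: <span id="count">0</span></p></main>
```

Plain EEx does not HTML-escape interpolated values, which is fine here because the templates only interpolate dates, numbers, and already-rendered markup.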

At this point the server is complete, and the page should update in soft real time via htmx.

The htmx part is easy to miss, so here's a callout of the setup:

<!-- lib/icky_venus/root.html.eex -->
<script src="https://unpkg.com/[email protected]" integrity="sha384-ujb1lZYygJmzgSwoxRggbCHcjc0rB2XoQrxeTUQyRjrOnlCoYta87iKBWq3EsdM2" crossorigin="anonymous"></script>
<script src="https://unpkg.com/[email protected]/dist/ext/ws.js"></script>

Since WebSockets are being used, htmx's ws.js extension is also needed.

The HTML that hooks into the WebSocket connection and polls for the totals:

<!-- lib/icky_venus/index.html.eex -->
<div hx-ext="ws" ws-connect="/events/post-created/stream">
  <p>
    Since page load: <span id="count">0</span>
  </p>
</div>
<div id="totals" hx-get="/events/post-created/totals" hx-trigger="every 30s">
  <%= @total_content %>
</div>

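This opens a WebSocket connection to the /events/post-created/stream endpoint, which is handled by the WebSocket server in lib/icky_venus/web_socket_server/post_created_server.ex. Separately, the #totals div re-requests /events/post-created/totals every 30 seconds and replaces its contents with the response.

That server pushes an updated count over the WebSocket connection for every :broadcast message it receives in handle_info/2, and htmx's ws extension swaps the returned <span id="count"> into the element with the same id. A :broadcast is sent for every post-creation event read off the Bluesky firehose, as configured in lib/blue_sky/stream_reader.ex: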

def handle_frame({_, msg}, state) do
  {:ok, result, payload_raw} = CBOR.decode(msg)

  if Map.get(result, "t", "") == "#commit" do
    {:ok, payload, _} = CBOR.decode(payload_raw)
    operations = Map.get(payload, "ops", [])

    if Enum.any?(operations, fn x ->
          Map.get(x, "action", "") == "create" &&
            String.starts_with?(Map.get(x, "path", ""), "app.bsky.feed.post")
        end) do
      Registry.dispatch(BlueSky.EventRegistry.PostCreated, :post_created, fn entries ->
        for {pid, _} <- entries, do: send(pid, :broadcast)
      end)
    end
  end

  {:ok, state}
end

After updating application/0 in mix.exs to start the IckyVenus application:

def application do
  [
    extra_applications: [:logger],
    # mod: {BlueSky.Application, []}
    mod: {IckyVenus.Application, []}
  ]
end

Running mix run --no-halt starts the server at http://localhost:4000/ (the port comes from the :server_port setting and defaults to 4000). All of the post-created metrics are stored in memory, so when the application stops, the accumulated counts are lost.

The site is currently hosted at https://icky-venus.jack-develops.com/.