Overview
On August 30th, 2024, Brazil blocked Twitter, and Bluesky saw an influx of new users as a result. That gave me the idea to measure how much the block contributed to Bluesky's growth.
While reading their API documentation I saw that Bluesky offers a firehose: an event stream that anyone can read from. I wanted to build something that reads new-post events off that stream. I had also been wanting to try htmx, and this seemed like an easy way to get introduced to it.
Getting Started
Since I want to get better at Elixir, all my side projects are written in it. I started by creating a new project with the code name icky_venus:
mix new icky_venus --sup
I knew this was going to be a website, since I wanted to use htmx. I also decided not to bother with Phoenix, as I don't think it makes much sense to mix htmx and LiveView. Instead I went with just Plug and Bandit to serve the site.
Before doing any of that, though, I first wanted to get data out of Bluesky.
Connecting and Reading from Bluesky Firehose
I started with an application that reads from the firehose and publishes an event to subscribers (later, a web socket) for every post-created commit sent over the hose.
lib/blue_sky/application.ex
defmodule BlueSky.Application do
use Application
@impl true
def start(_type, _args) do
children = [
Registry.child_spec(
keys: :duplicate,
name: BlueSky.EventRegistry.PostCreated
),
BlueSky.StreamReader,
BlueSky.PostCreatedServer
]
opts = [strategy: :one_for_one, name: BlueSky.Supervisor]
Supervisor.start_link(children, opts)
end
end
Since this is a simple application there isn't much going on here. We want a few things:
- A Registry that processes subscribe to in order to receive post-created events
- A process to read from the firehose
- A GenServer that keeps the metrics and answers inter-process communication (IPC) calls
We don't have to write anything for the registry, as the generic APIs it already provides are enough. That leaves the worker that reads from the firehose and an API for processes to communicate.
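The registry in the supervision tree above is started with duplicate keys, which lets it act as a small publish/subscribe mechanism. The following is a minimal sketch of that pattern in isolation, not the project's code; the registry name Sketch.PostCreated is hypothetical and exists only for this example.

```elixir
# Hypothetical registry name, used only in this sketch.
{:ok, _} = Registry.start_link(keys: :duplicate, name: Sketch.PostCreated)

# Subscribe: register the current process under the :post_created key.
{:ok, _} = Registry.register(Sketch.PostCreated, :post_created, %{})

# Publish: send :broadcast to every process registered under that key.
Registry.dispatch(Sketch.PostCreated, :post_created, fn entries ->
  for {pid, _value} <- entries, do: send(pid, :broadcast)
end)

# The subscriber (this same process) now has the message in its mailbox.
received =
  receive do
    :broadcast -> :broadcast
  after
    1_000 -> :timeout
  end

IO.inspect(received)
```

With keys: :duplicate, any number of processes can register under the same key, which is what allows both a metrics process and each open web socket to receive the same event.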
lib/blue_sky/stream_reader.ex (firehose)
defmodule BlueSky.StreamReader do
use WebSockex
def start_link(state \\ []) do
WebSockex.start_link(
"wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos",
__MODULE__,
state
)
end
def handle_frame({_, msg}, state) do
{:ok, result, payload_raw} = CBOR.decode(msg)
if Map.get(result, "t", "") == "#commit" do
{:ok, payload, _} = CBOR.decode(payload_raw)
operations = Map.get(payload, "ops", [])
if Enum.any?(operations, fn x ->
Map.get(x, "action", "") == "create" &&
String.starts_with?(Map.get(x, "path", ""), "app.bsky.feed.post")
end) do
Registry.dispatch(BlueSky.EventRegistry.PostCreated, :post_created, fn entries ->
for {pid, _} <- entries, do: send(pid, :broadcast)
end)
end
end
{:ok, state}
end
end
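A few things to note. This uses WebSockex to connect to Bluesky. The payloads sent over the stream are encoded as Concise Binary Object Representation (CBOR), and each frame carries two CBOR objects back to back: a header followed by the payload, which is why CBOR.decode is called twice. To decode them I used the cbor library.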
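For reference, the two libraries named above would be declared in mix.exs roughly as follows. This is only a sketch: the version requirements are assumptions and are not taken from the project.

```elixir
# mix.exs (fragment). Version requirements below are assumptions.
defp deps do
  [
    # WebSocket client used by BlueSky.StreamReader
    {:websockex, "~> 0.4"},
    # Decoder for the CBOR-encoded firehose frames
    {:cbor, "~> 1.0"}
  ]
end
```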
GenServer
# lib/blue_sky/post_created_client.ex
defmodule BlueSky.PostCreatedClient do
def get_totals() do
GenServer.call(BlueSky.PostCreatedServer, :get_totals)
end
end
# lib/blue_sky/post_created_server.ex
defmodule BlueSky.PostCreatedServer do
use GenServer
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def init(_) do
Process.send_after(self(), :register_event, 1000)
{:ok, %{}}
end
def handle_call(:get_totals, _from, state) do
{:reply, state, state}
end
def handle_info(:register_event, state) do
Registry.register(BlueSky.EventRegistry.PostCreated, :post_created, %{})
{:noreply, state}
end
def handle_info(:broadcast, state) do
count = Map.get(state, Date.to_string(Date.utc_today()), 0) + 1
{:noreply, Map.put(state, Date.to_string(Date.utc_today()), count)}
end
def handle_info({:EXIT, _pid, _reason}, state) do
{:stop, :normal, state}
end
def terminate(_reason, state) do
{:ok, state}
end
end
The state is a map keyed by date, so each date acts as a bucket. That makes it possible to display a time series of posts per day.
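The bucketing idea can be sketched as a pair of pure functions. DailyCounts and its function names are hypothetical and are not part of the project; the sketch only assumes the same shape of state, a map from ISO date strings to counts.

```elixir
defmodule DailyCounts do
  @doc "Increments the bucket for `date`, creating it at 1 when missing."
  def increment(state, %Date{} = date) do
    Map.update(state, Date.to_string(date), 1, &(&1 + 1))
  end

  @doc "Returns `{date_string, count}` pairs sorted by date, oldest first."
  def series(state) do
    # ISO 8601 date strings sort chronologically when sorted as strings.
    Enum.sort_by(state, fn {day, _count} -> day end)
  end
end

state =
  %{}
  |> DailyCounts.increment(~D[2024-09-10])
  |> DailyCounts.increment(~D[2024-09-09])
  |> DailyCounts.increment(~D[2024-09-09])

IO.inspect(DailyCounts.series(state))
# => [{"2024-09-09", 2}, {"2024-09-10", 1}]
```

Sorting matters for display because a plain map does not guarantee a chronological iteration order.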
This can be tested by running the blue_sky application on its own. I don't know of a good way to do this in umbrella-style projects besides updating application/0 in mix.exs as follows:
def application do
[
extra_applications: [:logger],
mod: {BlueSky.Application, []}
# mod: {IckyVenus.Application, []}
]
end
Running the following command then starts that application in a shell:
iex -S mix
Once it is running, BlueSky.PostCreatedClient.get_totals() can be invoked to verify that the stream is being read and the metrics are adding up.
➜ icky_venus git:(master) ✗ iex -S mix
Erlang/OTP 27 [erts-15.1.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit] [dtrace]
Interactive Elixir (1.17.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> BlueSky.PostCreatedClient.get_totals()
%{"2024-09-09" => 1948}
iex(2)>
Serving PostCreated Metrics
With metrics being aggregated, I now want a way for users to view them. This is where htmx and Bandit come in. I'll start by creating the main application's entry point:
lib/icky_venus/application.ex
defmodule IckyVenus.Application do
use Application
require Logger
def start(_type, _args) do
children = [
{Bandit, scheme: :http, plug: IckyVenus.Router, port: server_port()},
Registry.child_spec(
keys: :duplicate,
name: BlueSky.EventRegistry.PostCreated
),
BlueSky.StreamReader,
BlueSky.PostCreatedServer
]
opts = [strategy: :one_for_one, name: IckyVenus.Supervisor]
Logger.info("Starting application...")
Supervisor.start_link(children, opts)
end
defp server_port, do: Application.get_env(:icky_venus, :server_port, 4000)
end
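The port is read from the application environment, and the root template later reads a :product_name key the same way. A config file providing both could look like the sketch below; the two values are assumptions, since the post does not show the project's configuration.

```elixir
# config/config.exs (sketch). Both values are assumptions.
import Config

config :icky_venus,
  # Read by server_port/0; 4000 is also its fallback default.
  server_port: 4000,
  # Read by the root template for the page title and header link.
  product_name: "Icky Venus"
```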
Since this is a web server built on Plug, a router also needs to be defined to specify the endpoints:
lib/icky_venus/router.ex
defmodule IckyVenus.Router do
import EEx
use Plug.Router
use Plug.ErrorHandler
plug(:match)
plug(:dispatch)
get "/" do
conn
|> put_resp_content_type("text/html")
|> send_resp(
200,
IckyVenus.HtmlRenderer.render_html("lib/icky_venus/index.html.heex", %{
total_content:
eval_file(
"lib/icky_venus/totals.html.heex",
assigns: %{totals: BlueSky.PostCreatedClient.get_totals()}
)
})
)
end
get "/events/post-created/totals" do
conn
|> put_resp_content_type("text/html")
|> send_resp(
200,
eval_file(
"lib/icky_venus/totals.html.heex",
assigns: %{totals: BlueSky.PostCreatedClient.get_totals()}
)
)
end
get "/events/post-created/stream" do
conn
|> WebSockAdapter.upgrade(IckyVenus.WebSocketServer.PostCreatedServer, [], timeout: 60_000)
|> halt()
end
get "/favicon.ico" do
conn
|> put_resp_content_type("image/x-icon")
|> send_file(200, "priv/static/favicon.ico")
end
get "/robots.txt" do
conn
|> put_resp_content_type("text/plain")
|> send_file(200, "priv/static/robots.txt")
end
match _ do
send_resp(conn, 404, "Not found")
end
@impl Plug.ErrorHandler
def handle_errors(conn, %{kind: kind, reason: reason, stack: stack}) do
IO.inspect(kind, label: :kind)
IO.inspect(reason, label: :reason)
IO.inspect(stack, label: :stack)
send_resp(conn, conn.status, "Encountered an error that cannot be handled")
end
end
Two things are being done here:
- EEx is being used for HTML templating
- Web Socket connection upgrade
lib/icky_venus/web_socket_server/post_created_server.ex
defmodule IckyVenus.WebSocketServer.PostCreatedServer do
def init(_) do
Process.send_after(self(), :register_event, 1000)
{:ok, 0}
end
def handle_in(_, state) do
{:ok, state}
end
def handle_info(:broadcast, state) do
count = state + 1
{:reply, :ok,
{:text, "<span id=\"count\">#{Number.Delimit.number_to_delimited(count)}</span>"}, count}
end
def handle_info(:register_event, state) do
Registry.register(BlueSky.EventRegistry.PostCreated, :post_created, %{})
{:ok, state}
end
def handle_info({:EXIT, _pid, _reason}, state) do
{:stop, :normal, state}
end
def terminate(_reason, state) do
{:ok, state}
end
end
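Each :broadcast results in a small HTML fragment being pushed to the browser, and htmx's ws extension swaps it into the element with the matching id. The sketch below isolates how such a fragment can be built. CountFragment and its hand-rolled delimit/1 are hypothetical stand-ins so the example runs without dependencies; the project itself calls Number.Delimit.number_to_delimited.

```elixir
defmodule CountFragment do
  @doc "Builds the fragment that replaces the element with id=\"count\"."
  def render(count) when is_integer(count) and count >= 0 do
    ~s(<span id="count">#{delimit(count)}</span>)
  end

  # Inserts a comma between every group of three digits, counting from the right.
  defp delimit(count) do
    count
    |> Integer.to_string()
    |> String.reverse()
    |> String.replace(~r/(\d{3})(?=\d)/, "\\1,")
    |> String.reverse()
  end
end

IO.puts(CountFragment.render(1948))
# prints: <span id="count">1,948</span>
```

The id on the fragment has to match the id of the placeholder element in the page, otherwise htmx has nothing to swap it into.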
The final part is to create the HTML that is served
lib/icky_venus/root.html.eex
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title><%= Application.get_env(:icky_venus, :product_name) %></title>
<script src="https://unpkg.com/[email protected]" integrity="sha384-ujb1lZYygJmzgSwoxRggbCHcjc0rB2XoQrxeTUQyRjrOnlCoYta87iKBWq3EsdM2" crossorigin="anonymous"></script>
<script src="https://unpkg.com/[email protected]/dist/ext/ws.js"></script>
<script src="https://cdn.tailwindcss.com"></script>
<script>
tailwind.config = {
theme: {
extend: {
colors: {
brand: "#800080",
primary: colors.purple,
secondary: colors.emerald,
neutral: colors.gray
}
},
}
}
</script>
</head>
<body class="bg-white antialiased">
<header>
<div class="flex items-center justify-between border-b border-secondary-100 py-3">
<div class="flex items-center gap-4">
<a href="/" class="rounded-full bg-primary/5 px-2 font-medium leading-6 text-brand">
<%= Application.get_env(:icky_venus, :product_name) %>
</a>
</div>
<div class="flex items-center gap-4 text-[0.7525rem] font-semibold leading-6 text-secondary-800 active:text-secondary-800/70">
<a
href="https://github.com/jflowaa/icky_venus"
class="rounded-lg bg-primary-100 px-2 py-1 text-[0.8125rem] hover:bg-secondary-200/80"
>
Source Code <span aria-hidden="true">→</span>
</a>
</div>
</div>
</header>
<main class="px-4 py-5 sm:px-6 lg:px-8">
<div class="mx-auto w-fit">
<%= @content %>
</div>
</main>
</body>
</html>
lib/icky_venus/index.html.heex
<div class="px-4 sm:py-28 sm:px-6 lg:px-8 xl:py-32 xl:px-28">
<div class="mx-auto max-w-xl lg:mx-0">
<p class="mt-4 text-xl font-semibold leading-10 tracking-tighter text-primary-900">
Bluesky Metrics
</p>
<p class="mt-4 text-base leading-7 text-neutral-600">
Metrics on the number of posts created since page load and the total number of posts created by date.
</p>
<div class="flex">
<div class="w-full sm:w-auto">
<div class="mt-10">
<div hx-ext="ws" ws-connect="/events/post-created/stream">
<p>
Since page load: <span id="count">0</span>
</p>
</div>
<div id="totals" hx-get="/events/post-created/totals" hx-trigger="every 30s">
<%= @total_content %>
</div>
</div>
</div>
</div>
</div>
</div>
lib/icky_venus/totals.html.heex
<div>
<p>By date:</p>
<%= for {k, v} <- @totals do %>
<p><%= k %>: <%= Number.Delimit.number_to_delimited(v) %></p>
<% end %>
</div>
lib/icky_venus/html_render.ex
defmodule IckyVenus.HtmlRenderer do
import EEx
def render_html(html_path, assigns \\ []) do
content = eval_file(html_path, assigns: assigns)
eval_file("lib/icky_venus/root.html.eex", assigns: [content: content])
end
end
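The renderer works by evaluating the page template first and then handing the result to the root layout as @content. The same nesting can be sketched with inline strings instead of files; the template strings and the total value here are made up for illustration.

```elixir
# Inline stand-ins for the layout and page template files.
layout = "<main><%= @content %></main>"
page = "<p>Posts today: <%= @total %></p>"

# Render the inner page first, then pass it to the layout as @content.
content = EEx.eval_string(page, assigns: [total: 1948])
html = EEx.eval_string(layout, assigns: [content: content])

IO.puts(html)
# prints: <main><p>Posts today: 1948</p></main>
```

Plain EEx does not escape interpolated values, so this approach is only safe when the values being rendered are trusted, as the counts are here.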
At this point the server is complete, and the page should update in soft real time via htmx.
The htmx part is easy to miss, so here is a callout of the setup:
<!-- lib/icky_venus/root.html.eex -->
<script src="https://unpkg.com/[email protected]" integrity="sha384-ujb1lZYygJmzgSwoxRggbCHcjc0rB2XoQrxeTUQyRjrOnlCoYta87iKBWq3EsdM2" crossorigin="anonymous"></script>
<script src="https://unpkg.com/[email protected]/dist/ext/ws.js"></script>
Since web sockets are being used, htmx's ws.js extension is needed.
The HTML that hooks into the web socket connection:
<!-- lib/icky_venus/index.html.heex -->
<div hx-ext="ws" ws-connect="/events/post-created/stream">
<p>
Since page load: <span id="count">0</span>
</p>
</div>
<div id="totals" hx-get="/events/post-created/totals" hx-trigger="every 30s">
<%= @total_content %>
</div>
This opens a connection on the /events/post-created/stream endpoint, which is the web socket server found in lib/icky_venus/web_socket_server/post_created_server.ex. That server sends an updated count over the web socket connection for every :broadcast message it handles in handle_info. A :broadcast is sent for every post-created event read from the Bluesky firehose, which is set up in lib/blue_sky/stream_reader.ex:
def handle_frame({_, msg}, state) do
{:ok, result, payload_raw} = CBOR.decode(msg)
if Map.get(result, "t", "") == "#commit" do
{:ok, payload, _} = CBOR.decode(payload_raw)
operations = Map.get(payload, "ops", [])
if Enum.any?(operations, fn x ->
Map.get(x, "action", "") == "create" &&
String.starts_with?(Map.get(x, "path", ""), "app.bsky.feed.post")
end) do
Registry.dispatch(BlueSky.EventRegistry.PostCreated, :post_created, fn entries ->
for {pid, _} <- entries, do: send(pid, :broadcast)
end)
end
end
{:ok, state}
end
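The filtering step inside handle_frame can also be pulled out into a pure function, which makes it easy to test without a live connection. This is a sketch rather than the project's code: PostFilter is a hypothetical module, and it assumes operation paths have the form collection/record-key, so it matches on "app.bsky.feed.post/" to avoid also matching other collections whose names merely begin with the same prefix.

```elixir
defmodule PostFilter do
  # Assumption: each op path looks like "<collection>/<record key>".
  @post_prefix "app.bsky.feed.post/"

  @doc "Returns true when any operation in the commit creates a post record."
  def post_created?(ops) when is_list(ops) do
    Enum.any?(ops, fn op ->
      Map.get(op, "action") == "create" and
        String.starts_with?(Map.get(op, "path", ""), @post_prefix)
    end)
  end
end

ops = [
  %{"action" => "create", "path" => "app.bsky.feed.like/abc"},
  %{"action" => "create", "path" => "app.bsky.feed.post/xyz"}
]

IO.inspect(PostFilter.post_created?(ops))
# => true
```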
After updating application/0 in mix.exs to start icky_venus' application:
def application do
[
extra_applications: [:logger],
# mod: {BlueSky.Application, []}
mod: {IckyVenus.Application, []}
]
end
Running mix run --no-halt will start the server at http://localhost:4000/. All of the post-created metrics are stored in memory, so when the application stops the accumulated counts are lost.
The site is currently hosted at https://icky-venus.jack-develops.com/.