Tutorial: Message Framing with iroh

by ramfox, matheus23, b5

This tutorial is a written version of the YouTube tutorial published on the n0computer YouTube channel.

Intro

The first time you dig into iroh to write your own protocol, you quickly realize that what you mostly interact with is the QUIC API.

The QUIC API can feel extremely low level: you are dealing with raw streams and sometimes need to use methods that involve literally reading and writing a single byte at a time.

In this post, we are going to talk about message framing, a technique that lets you stop thinking about your protocol in terms of bytes and start thinking about it in terms of logical messages.

Message framing is a generally applicable concept, and by no means limited to working with protocols on iroh!

Framed Messages

QUIC connections are made of streams and those streams can be bi-directional or uni-directional. Bi-directional streams are streams you can read from and write to in both directions. Uni-directional streams are streams that can only be written to on one side and read from by the other.

A single QUIC connection to another node can have many streams, some uni-directional, some bi-directional.

For the purposes of this tutorial, we are going to focus on a single, uni-directional stream, where one side writes to the stream and the other side reads from it.

Streams are made up of bytes. You write bytes to the stream and you read bytes out of it.

But the big question here is: once side A has written to the stream, how does side B know how much data to read?
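
To see why this question matters, here is a small sketch that uses an in-memory byte buffer as a stand-in for a stream. The buffer, the function name, and the chunk size are assumptions for illustration only; this is not the iroh API. It shows that the way the reader chunks its reads has nothing to do with how the writer split its writes:

```rust
use std::io::{Cursor, Read, Write};

/// Writes each message back-to-back into an in-memory "stream", then reads
/// the bytes out again in chunks of at most `chunk_size` bytes.
/// The `Vec<u8>` / `Cursor` pair is only a stand-in for a real network stream.
fn write_then_read_chunks(messages: &[&str], chunk_size: usize) -> Vec<String> {
    // Sender: separate writes end up as one undivided run of bytes.
    let mut wire: Vec<u8> = Vec::new();
    for msg in messages {
        wire.write_all(msg.as_bytes())
            .expect("writing to a Vec cannot fail");
    }

    // Receiver: reads whatever fits in its buffer, ignoring message boundaries.
    let mut reader = Cursor::new(wire);
    let mut buf = vec![0u8; chunk_size];
    let mut chunks = Vec::new();
    loop {
        let n = reader
            .read(&mut buf)
            .expect("reading from a Cursor cannot fail");
        if n == 0 {
            break; // end of stream
        }
        chunks.push(String::from_utf8_lossy(&buf[..n]).into_owned());
    }
    chunks
}
```

Calling write_then_read_chunks(&["hi", "fren!"], 3) yields the chunks "hif", "ren", and "!": all the bytes arrive in order, but the message boundaries are gone.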

The first thing you might do when getting familiar with streams is use APIs like write_all and read_to_end to write a chunk of data to the stream, read that full chunk off of it, and then close the stream.

This is fine while you are getting familiar, but when you go to write your protocols you will want something more sophisticated.

What we really want to do is send multiple logical messages on the stream, and have some format for figuring out how to separate out those messages. We also must deal with the fact that not all messages are going to be the same length.

That's where message framing comes in. In message framing, you prepend the length, in bytes, of your message data to your message before sending over the data. On the receiving side, you read that length data first, then you create or adjust your buffer to accommodate that length, and finally read the exact number of bytes off your stream and into your buffer.

The next time you read off the stream, you read off another length, and repeat the process.
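
The scheme described above can be sketched on plain byte slices, before any networking is involved. This is a minimal illustration that assumes a one-byte length prefix; the function names encode_frame and split_frames are made up for this sketch:

```rust
/// Prepends a one-byte length to `payload`.
/// Returns `None` if the payload is longer than 255 bytes and so
/// does not fit in a one-byte length.
fn encode_frame(payload: &[u8]) -> Option<Vec<u8>> {
    if payload.len() > u8::MAX as usize {
        return None;
    }
    let mut frame = Vec::with_capacity(1 + payload.len());
    frame.push(payload.len() as u8);
    frame.extend_from_slice(payload);
    Some(frame)
}

/// Splits a buffer of back-to-back frames into their payloads.
/// Returns `None` if the buffer ends in the middle of a frame.
fn split_frames<'a>(mut wire: &'a [u8]) -> Option<Vec<&'a [u8]>> {
    let mut payloads = Vec::new();
    // Each iteration reads one length byte, then exactly that many payload bytes.
    while let Some((&len, rest)) = wire.split_first() {
        let len = len as usize;
        if rest.len() < len {
            return None; // truncated frame
        }
        let (payload, remaining) = rest.split_at(len);
        payloads.push(payload);
        wire = remaining;
    }
    Some(payloads)
}
```

Framing "hi" and "fren!" this way produces the bytes 2, 'h', 'i', 5, 'f', 'r', 'e', 'n', '!', and split_frames recovers the two payloads from that single run of bytes.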

For an in-depth look at message framing, including a look into other forms of framing that already exist in networking, watch the YouTube video that this tutorial is based on.

Code setup

First, head to your coding directory and run cargo init frame_me, which is what we are calling this project locally.

Open up your IDE and take a look at your main.rs, which should simply show a main function that prints "Hello, world!" For sanity, run cargo run in your terminal to ensure everything is set up correctly.

Now, let's add a few crates that we will be using in this example:

cargo add tokio iroh anyhow

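Tokio is our async runtime, iroh is our networking library, and the anyhow crate gives us easy error handling. The code below uses #[tokio::main] and tokio's I/O extension traits, which need tokio's macros, rt-multi-thread, and io-util features. If the build complains that any of them are missing, enable them with cargo add tokio --features macros,rt-multi-thread,io-util.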

Setting up our main function

Heading back to main.rs, let's prep our main function to work with tokio:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    Ok(())
}

This doesn't do anything yet, but run cargo run to make sure everything is peachy!

For this example, we are going to show the send and receive sides in the same main function. This just makes the example easier to illustrate and keeps it down to a single file. Normally, we would create a whole CLI with different commands associated with the send and receive sides. Our examples of dumbpipe and sendme show this pattern off nicely.

For this demo, we are going to set up the receiving side, set up the sending side, write on the sending side, read from the receiving side, and then clean up and close down.

use iroh::{Endpoint, Watcher, protocol::Router};

const ALPN: &[u8] = b"iroh/smol/0";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // create the receive side
    let recv_ep = Endpoint::builder().discovery_n0().bind().await?;
    let recv_router = Router::builder(recv_ep).spawn();
    let addr = recv_router.endpoint().node_addr().initialized().await;

    // create a send side
    let send_ep = Endpoint::builder().discovery_n0().bind().await?;
    let conn = send_ep.connect(addr, ALPN).await?;

    Ok(())
}

The iroh::Endpoint is the endpoint on which we make connections. The discovery_n0 method on the iroh::EndpointBuilder allows us to dial by node_id without having to manually send address information.

We use the EndpointBuilder to create the receive endpoint, recv_ep.

The iroh::protocol::Router is where we will eventually add our custom protocol. We've already defined the ALPN for our custom protocol in the ALPN constant.

The ALPN string is formatted based on the conventions we recommend. The first section indicates this is an iroh protocol, the second indicates that the protocol is called smol, and the last section shows this is version 0 of the protocol, all delimited by a "/".
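
As a small illustration of that convention, the following sketch splits an ALPN into its three sections. The AlpnParts type and the parse_alpn helper are hypothetical and not part of iroh; iroh itself treats the ALPN as opaque bytes:

```rust
/// The three "/"-separated sections of an ALPN that follows the
/// `<namespace>/<protocol>/<version>` convention described above.
#[derive(Debug, PartialEq)]
struct AlpnParts<'a> {
    namespace: &'a str,
    protocol: &'a str,
    version: &'a str,
}

/// Hypothetical helper (not part of iroh): splits an ALPN into its sections.
/// Returns `None` if the ALPN is not valid UTF-8 or does not have
/// exactly three sections.
fn parse_alpn<'a>(alpn: &'a [u8]) -> Option<AlpnParts<'a>> {
    let text = std::str::from_utf8(alpn).ok()?;
    let mut sections = text.split('/');
    let parts = AlpnParts {
        namespace: sections.next()?,
        protocol: sections.next()?,
        version: sections.next()?,
    };
    // Reject trailing sections such as "iroh/smol/0/extra".
    if sections.next().is_some() {
        return None;
    }
    Some(parts)
}
```

For b"iroh/smol/0" this gives the namespace "iroh", the protocol "smol", and the version "0".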

We use the RouterBuilder to create the recv_router, which won't do much for us until we add our custom protocol later.

The addr is the NodeAddr of the receive endpoint, which contains all the address information we know about ourselves, including our node_id. We will use the addr to connect from the send endpoint to the receive endpoint, using the Endpoint::connect method.

Scaffolding out write_frame and read_frame

Now that we have a connection, let's open a stream and send some data!

We can use Connection::open_bi to create a bi-directional stream, which gives us both a SendStream and a RecvStream, or Connection::open_uni to create a uni-directional stream, which gives us only a SendStream. In this example, we will use open_uni.

Once we have our SendStream, we have many different options for how we want to write to the stream, but we will cover that in a later section. For now, let us continue with our main function, and we'll scaffold out write_frame and read_frame functions that we will include in the code for now, but actually implement later.

use iroh::{
    Endpoint, Watcher,
    endpoint::{RecvStream, SendStream},
    protocol::Router,
};

const ALPN: &[u8] = b"iroh/smol/0";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // create the receive side
    let recv_ep = Endpoint::builder().discovery_n0().bind().await?;
    let recv_router = Router::builder(recv_ep).spawn();
    let addr = recv_router.endpoint().node_addr().initialized().await;

    // create a send side & send some messages :)
    let send_ep = Endpoint::builder().discovery_n0().bind().await?;
    let conn = send_ep.connect(addr, ALPN).await?;
    let mut stream = conn.open_uni().await?;

    write_frame(&mut stream, "hi").await?;
    write_frame(&mut stream, "fren!").await?;

    // This ends the `read_frame` loop on the receiving side
    // once it has read all frames currently "in flight" to the other end.
    stream.finish()?;

    // We're done with the connection and will wait for the other end to tell us
    // when it finished reading everything :)
    conn.closed().await;

    // Close both ends gracefully:
    send_ep.close().await;
    recv_router.shutdown().await?;

    Ok(())
}

async fn write_frame(stream: &mut SendStream, message: &str) -> anyhow::Result<()> {
    todo!();
}

async fn read_frame(stream: &mut RecvStream) -> anyhow::Result<Option<String>> {
    todo!();
}

As you can see, we're using todo! in the write_frame and read_frame functions. This lets the program compile, but if write_frame or read_frame is called at runtime, it will panic. It's very useful in development!

In our main function, we call write_frame to send "hi" and then "fren!" to the receiving side. This is the data that will get framed.

Then we do some clean up!

stream.finish()? tells the stream "hey, I'm done sending data".

conn.closed().await waits until the connection has been closed, which in this example happens when the other side calls conn.close(). While we wait, we don't send any more data. conn.close (without the "d") informs the other side of the connection "hey, I'm closing the connection, and here is why".

send_ep.close().await closes the endpoint. The endpoint won't close until all the connections have closed and cleaned up.

recv_router.shutdown().await? stops the accept loop, closes all the connections, and closes the underlying endpoint.

If you call cargo run at this point, the example will compile, but it will error at runtime: the Router is expecting at least one protocol and will error when it builds.

Smol Protocol

We've already defined the ALPN. The next thing we want is to create a SmolProtocol that implements the ProtocolHandler trait. This requires us to implement the accept method on the ProtocolHandler trait.

use iroh::{endpoint::Connection, protocol::{ProtocolHandler, AcceptError}};

const ALPN: &[u8] = b"iroh/smol/0";

#[derive(Debug, Clone)]
struct SmolProtocol;

impl ProtocolHandler for SmolProtocol {
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        todo!();
    }
}

Again, we are using todo! here, but we will come back to it.

We can now add the SmolProtocol to our router!

    let recv_router = Router::builder(recv_ep)
        .accept(ALPN, SmolProtocol)
        .spawn();

Our code so far:

use iroh::{
    Endpoint, Watcher,
    endpoint::{Connection, RecvStream, SendStream},
    protocol::{AcceptError, ProtocolHandler, Router},
};

const ALPN: &[u8] = b"iroh/smol/0";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // create the receive side
    let recv_ep = Endpoint::builder().discovery_n0().bind().await?;
    let recv_router = Router::builder(recv_ep).accept(ALPN, SmolProtocol).spawn();
    let addr = recv_router.endpoint().node_addr().initialized().await;

    // create a send side & send some messages :)
    let send_ep = Endpoint::builder().discovery_n0().bind().await?;
    let conn = send_ep.connect(addr, ALPN).await?;
    let mut stream = conn.open_uni().await?;

    write_frame(&mut stream, "hi").await?;
    write_frame(&mut stream, "fren!").await?;

    // This ends the `read_frame` loop on the receiving side
    // once it has read all frames currently "in flight" to the other end.
    stream.finish()?;

    // We're done with the connection and will wait for the other end to tell us
    // when it finished reading everything :)
    conn.closed().await;

    // Close both ends gracefully:
    send_ep.close().await;
    recv_router.shutdown().await?;

    Ok(())
}

async fn write_frame(stream: &mut SendStream, message: &str) -> anyhow::Result<()> {
    todo!();
}

async fn read_frame(stream: &mut RecvStream) -> anyhow::Result<Option<String>> {
    todo!();
}

#[derive(Debug, Clone)]
struct SmolProtocol;

impl ProtocolHandler for SmolProtocol {
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        todo!();
    }
}

Implementing write_frame and read_frame

Now we get to the actual message framing part! When we go to write data to the stream, we first need to write the size of the data, then the data itself.

But what write methods should we use?

Thankfully, message framing is so popular that tokio provides extension traits for reading and writing that allow you to read and write specific integers to the stream. We are going to use write_u8 to do our length prefixing.

You'll note that by using a u8, we are limiting our message size to 255 bytes. In a protocol that sends larger messages, you would want to use a larger integer size. (Check out varints that use space-saving techniques when sending smaller numbers, but allow you to express very large numbers as well.)
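
As a rough idea of what a varint looks like, here is a sketch of unsigned LEB128, one common varint encoding. It is chosen here only because it is simple to show; it is not necessarily the encoding that iroh or QUIC uses, and the function names are made up for this sketch:

```rust
/// Encodes `value` as an unsigned LEB128 varint: 7 bits per byte,
/// least significant group first, with the high bit set on every
/// byte except the last.
fn encode_varint(mut value: u64) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            out.push(byte); // last byte: high bit clear
            return out;
        }
        out.push(byte | 0x80); // more bytes follow: high bit set
    }
}

/// Decodes a varint from the front of `input`.
/// Returns the value and the number of bytes consumed, or `None` if the
/// input is truncated or encodes more bits than a `u64` can hold.
fn decode_varint(input: &[u8]) -> Option<(u64, usize)> {
    let mut value = 0u64;
    for (i, &byte) in input.iter().enumerate() {
        let shift = 7 * i as u32;
        // The tenth byte may only contribute the single remaining bit.
        if shift > 63 || (shift == 63 && byte & 0x7e != 0) {
            return None;
        }
        value |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None // ran out of input before the final byte
}
```

With this encoding, a length of 5 takes a single byte, a length of 300 takes two bytes, and even u64::MAX fits in ten.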

But it's u8s for us in this example! So first, we check that the length of the message fits in a u8, that is, that it is at most 255 bytes. Then we write the length using the write_u8 method, and finally we use write_all to write the entire contents of the message onto the stream:

use iroh::endpoint::SendStream;
use tokio::io::AsyncWriteExt;

async fn write_frame(stream: &mut SendStream, message: &str) -> anyhow::Result<()> {
    assert!(message.len() <= u8::MAX as usize);

    stream.write_u8(message.len() as u8).await?;
    stream.write_all(message.as_bytes()).await?;

    Ok(())
}
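
If 255 bytes is too tight for your protocol, the same shape works with a wider prefix. The following is a synchronous sketch against std::io::Write rather than an iroh SendStream, so that it can be tried without a network. It assumes a 4-byte big-endian length (the byte order that tokio's write_u32 also uses) and returns an error instead of panicking when the message is too long. The name write_frame_u32 and the MAX_FRAME_LEN limit are made up for this example:

```rust
use std::io::{self, Write};

/// Illustrative cap on a single frame; pick a limit that suits your protocol.
const MAX_FRAME_LEN: usize = 1024 * 1024;

/// Writes `message` with a 4-byte big-endian length prefix.
/// Oversized messages produce an `InvalidInput` error instead of a panic.
fn write_frame_u32<W: Write>(writer: &mut W, message: &str) -> io::Result<()> {
    if message.len() > MAX_FRAME_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "message exceeds MAX_FRAME_LEN",
        ));
    }
    // MAX_FRAME_LEN fits in a u32, so this cast cannot truncate.
    let len = message.len() as u32;
    writer.write_all(&len.to_be_bytes())?;
    writer.write_all(message.as_bytes())?;
    Ok(())
}
```

Writing "hi" into a Vec<u8> with this function produces the six bytes 0, 0, 0, 2, 'h', 'i'. Enforcing a maximum on the writing side is a design choice; the reading side should enforce its own limit too, since it cannot trust the length it receives.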

The read side first reads a u8 from the stream. If there is no u8 to read because the sender has finished the stream, there are no more messages, and read_frame returns Ok(None). The Ok indicates that nothing went wrong during this read, and the None indicates that no message was read off of the stream.

Next, we create a buffer the size of the u8 that we read. Then, we read that number of bytes from the stream and into the buffer using read_exact, which will fill the buffer with data from the stream.

We then need to create a UTF-8 string from the bytes in the buffer. Finally, we return the String:

use iroh::endpoint::RecvStream;
use tokio::io::AsyncReadExt;

async fn read_frame(stream: &mut RecvStream) -> anyhow::Result<Option<String>> {
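    let frame_len = match stream.read_u8().await {
        Ok(len) => len,
        // The sender finished the stream: no more frames to read.
        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
        // Any other read error is a real failure, not "no more messages".
        Err(e) => return Err(e.into()),
    };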

    let mut buf = vec![0u8; frame_len as usize];
    stream.read_exact(&mut buf).await?;

    let smol_msg = String::from_utf8(buf)?;
    Ok(Some(smol_msg))
}
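
One way to exercise this read logic without a network is a synchronous twin that works on any std::io::Read. The sketch below is an assumption-laden stand-in (the name read_frame_sync is made up, and it is not the iroh API), but it makes the three possible outcomes easy to check: a complete frame, a clean end of stream, and a stream that ends in the middle of a frame:

```rust
use std::io::{self, ErrorKind, Read};

/// Reads one u8-length-prefixed frame from `reader`.
/// - `Ok(Some(msg))`: a complete frame was read.
/// - `Ok(None)`: the stream ended cleanly before a new frame started.
/// - `Err(_)`: the stream ended mid-frame, the payload was not UTF-8,
///   or another I/O error occurred.
fn read_frame_sync<R: Read>(reader: &mut R) -> io::Result<Option<String>> {
    let mut len_byte = [0u8; 1];
    match reader.read_exact(&mut len_byte) {
        Ok(()) => {}
        // No length byte at all: a clean end of stream.
        Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e),
    }

    // A length byte promises a payload, so hitting EOF here is a real error.
    let mut buf = vec![0u8; len_byte[0] as usize];
    reader.read_exact(&mut buf)?;

    String::from_utf8(buf)
        .map(Some)
        .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))
}
```

Because &[u8] implements Read, a byte slice such as [2, 'h', 'i', 5, 'f', 'r', 'e', 'n', '!'] can play the role of the stream: successive calls return "hi", then "fren!", then None.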

Implementing SmolProtocol

Time to implement the accept handler on SmolProtocol. This is what will get called each time we get an incoming connection to the Router.

First, we will take the given connection and accept the uni-directional stream that the other side opened, using the accept_uni method. This will give us a RecvStream.

Then we will use the read_frame function from above to read messages from the stream and print them to stdout, looping until there are no more messages (i.e., read_frame returns None). If any errors occur while reading, we convert them from an anyhow::Error to a boxed standard library error before returning.

Then we will close the connection using connection.close, giving an integer close code and a close reason. In a production-grade protocol, you may have a list of close codes and close reasons to communicate to the other side of the connection why the connection was closed. Sending 0 typically means that the connection closed without error.

Finally, we return Ok:

const ALPN: &[u8] = b"iroh/smol/0";

#[derive(Debug, Clone)]
struct SmolProtocol;

impl ProtocolHandler for SmolProtocol {
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        let mut stream = connection.accept_uni().await?;

        while let Some(msg) = read_frame(&mut stream)
            .await
            .map_err(anyhow::Error::into_boxed_dyn_error)?
        {
            println!("Got a smol message! {msg}");
        }

        println!("Done reading smol messages");
        connection.close(0u32.into(), b"thx for all the msgs :)");
        Ok(())
    }
}
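
For the list of close codes mentioned above, one possible shape is a small enum. This is only a sketch: apart from 0 meaning "closed without error", the variants, numbers, and reason strings are invented for illustration and are not defined by iroh:

```rust
/// Hypothetical close codes for the smol protocol. Only `0` meaning
/// "closed without error" follows the convention mentioned above;
/// the other values are made up for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CloseCode {
    Done = 0,
    MalformedFrame = 1,
    FrameTooLarge = 2,
}

impl CloseCode {
    /// Numeric code to send to the peer.
    fn code(self) -> u32 {
        self as u32
    }

    /// Human-readable reason bytes to send alongside the code.
    fn reason(self) -> &'static [u8] {
        match self {
            CloseCode::Done => b"thx for all the msgs :)",
            CloseCode::MalformedFrame => b"could not decode frame",
            CloseCode::FrameTooLarge => b"frame exceeds size limit",
        }
    }

    /// Maps a received numeric code back to a known variant, if any.
    fn from_code(code: u32) -> Option<CloseCode> {
        match code {
            0 => Some(CloseCode::Done),
            1 => Some(CloseCode::MalformedFrame),
            2 => Some(CloseCode::FrameTooLarge),
            _ => None,
        }
    }
}
```

With something like this in place, the close call in the handler could take the same shape as before, for example connection.close(CloseCode::Done.code().into(), CloseCode::Done.reason()), and both sides would share one definition of what each code means.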

The full example:

use iroh::{
    Endpoint, Watcher,
    endpoint::{Connection, RecvStream, SendStream},
    protocol::{AcceptError, ProtocolHandler, Router},
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

const ALPN: &[u8] = b"iroh/smol/0";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // create the receive side
    let recv_ep = Endpoint::builder().discovery_n0().bind().await?;
    let recv_router = Router::builder(recv_ep).accept(ALPN, SmolProtocol).spawn();
    let addr = recv_router.endpoint().node_addr().initialized().await;

    // create a send side & send some messages :)
    let send_ep = Endpoint::builder().discovery_n0().bind().await?;
    let conn = send_ep.connect(addr, ALPN).await?;
    let mut stream = conn.open_uni().await?;

    write_frame(&mut stream, "hi").await?;
    write_frame(&mut stream, "fren!").await?;

    // This ends the `read_frame` loop on the receiving side
    // once it has read all frames currently "in flight" to the other end.
    stream.finish()?;

    // We're done with the connection and will wait for the other end to tell us
    // when it finished reading everything :)
    conn.closed().await;

    // Close both ends gracefully:
    send_ep.close().await;
    recv_router.shutdown().await?;

    Ok(())
}

async fn write_frame(stream: &mut SendStream, message: &str) -> anyhow::Result<()> {
    assert!(message.len() <= u8::MAX as usize);

    stream.write_u8(message.len() as u8).await?;
    stream.write_all(message.as_bytes()).await?;

    Ok(())
}

async fn read_frame(stream: &mut RecvStream) -> anyhow::Result<Option<String>> {
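    let frame_len = match stream.read_u8().await {
        Ok(len) => len,
        // The sender finished the stream: no more frames to read.
        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
        // Any other read error is a real failure, not "no more messages".
        Err(e) => return Err(e.into()),
    };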

    let mut buf = vec![0u8; frame_len as usize];
    stream.read_exact(&mut buf).await?;

    let smol_msg = String::from_utf8(buf)?;
    Ok(Some(smol_msg))
}

#[derive(Debug, Clone)]
struct SmolProtocol;

impl ProtocolHandler for SmolProtocol {
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        let mut stream = connection.accept_uni().await?;

        while let Some(msg) = read_frame(&mut stream)
            .await
            .map_err(anyhow::Error::into_boxed_dyn_error)?
        {
            println!("Got a smol message! {msg}");
        }

        println!("Done reading smol messages");
        connection.close(0u32.into(), b"thx for all the msgs :)");
        Ok(())
    }
}

Further exploring

We hope you've learned a bit about writing protocols on this journey, specifically how framed messages are an incredibly useful technique.

In this example, we sent simple strings on our streams, but in a real-world use case, we often send structured data. For a more in-depth example exploring how you might send structured data, including how we at n0 like to serialize and deserialize data to and from the wire, take a look at the framed messages example in iroh-examples.
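
To give a feel for what framing structured data involves, here is a hand-rolled sketch using only the standard library. The ChatMsg type and its byte layout are invented for this illustration; a real protocol would more likely use a serialization crate, as the linked example does, rather than encode fields by hand:

```rust
/// Hypothetical structured message for this sketch.
#[derive(Debug, PartialEq)]
struct ChatMsg {
    sender_id: u32,
    text: String,
}

impl ChatMsg {
    /// Encodes the message as one frame:
    /// [payload length: u8][sender_id: u32 big-endian][text: UTF-8].
    /// Returns `None` if the payload would exceed 255 bytes.
    fn to_frame(&self) -> Option<Vec<u8>> {
        let payload_len = 4 + self.text.len();
        if payload_len > u8::MAX as usize {
            return None;
        }
        let mut frame = vec![payload_len as u8];
        frame.extend_from_slice(&self.sender_id.to_be_bytes());
        frame.extend_from_slice(self.text.as_bytes());
        Some(frame)
    }

    /// Decodes one frame from the front of `wire`.
    /// Returns the message and the unread remainder, or `None` if the
    /// frame is truncated or malformed.
    fn from_frame(wire: &[u8]) -> Option<(ChatMsg, &[u8])> {
        let (&len, rest) = wire.split_first()?;
        let len = len as usize;
        // The payload must at least hold the 4-byte sender id.
        if len < 4 || rest.len() < len {
            return None;
        }
        let (payload, remaining) = rest.split_at(len);
        let sender_id = u32::from_be_bytes([payload[0], payload[1], payload[2], payload[3]]);
        let text = String::from_utf8(payload[4..].to_vec()).ok()?;
        Some((ChatMsg { sender_id, text }, remaining))
    }
}
```

The framing layer only needs the payload length; what goes inside the payload is up to the serialization format, which is why the two concerns can be swapped independently.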
