Writing your own Protocol
So you've read what an iroh protocol is and know what an iroh router is, and you're eager to start implementing your own iroh protocol? This is a short guide to show you how.
What we'll build
In this guide we'll implement a very basic echo protocol.
For simplicity, we're not going to implement a CLI for this (unlike what we did in the quickstart). Instead, we'll run both sides of the protocol as a test run in the main() function.
The protocol itself works like this:
- The accepting side waits for the connecting side to open a connection.
- Once a connection is established, the accepting side waits for the connecting side to open a bi-directional stream.
- The connecting side transfers some payload on the bi-directional stream first.
- The accepting side reads the payload and transfers it back on the same bi-directional stream.
- Once the connecting side has finished sending, it reads "the echo" back and then closes the connection.
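The steps above can be sketched without any networking at all. The following is a minimal in-memory stand-in, not iroh code: two std channels play the role of the bi-directional stream, and dropping a sender plays the role of signalling the end of data. The names accept_side and echo_roundtrip are made up for this illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Accepting side: read chunks until the peer signals end of data,
/// echoing each chunk back. Returns the number of bytes copied.
fn accept_side(recv: Receiver<Vec<u8>>, send: Sender<Vec<u8>>) -> usize {
    let mut copied = 0;
    // The loop ends once the connecting side drops its sender
    // (our stand-in for finishing the send half of a stream).
    for chunk in recv {
        copied += chunk.len();
        send.send(chunk).expect("connecting side hung up early");
    }
    // `send` is dropped on return, which signals the end of the echo.
    copied
}

/// Connecting side: send first, signal end of data, then read the echo back.
fn echo_roundtrip(payload: &[u8]) -> (Vec<u8>, usize) {
    let (to_acceptor, acceptor_recv) = channel::<Vec<u8>>();
    let (to_connector, connector_recv) = channel::<Vec<u8>>();
    let acceptor = thread::spawn(move || accept_side(acceptor_recv, to_connector));

    to_acceptor
        .send(payload.to_vec())
        .expect("accepting side hung up");
    // Stand-in for "finished sending": nothing more will arrive on this half.
    drop(to_acceptor);

    // Collect every echoed chunk until the accepting side is done.
    let echo: Vec<u8> = connector_recv.iter().flatten().collect();
    let copied = acceptor.join().expect("accepting side panicked");
    (echo, copied)
}
```

Calling echo_roundtrip(b"Hello, world!") returns the payload unchanged together with the byte count seen by the accepting side. The real protocol below follows the same order of operations, but over a QUIC connection.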
Listening for connections
As established in the router and protocol pages, you'll first need to decide on an "Application-Layer Protocol Negotiation" (ALPN) string. We'll use "iroh-example/echo/0":
const ALPN: &[u8] = b"iroh-example/echo/0";
The easiest way to start listening for incoming connections is by using iroh's Router API.
async fn start_accept_side() -> anyhow::Result<iroh::protocol::Router> {
    let endpoint = iroh::Endpoint::builder().discovery_n0().bind().await?;
    let router = iroh::protocol::Router::builder(endpoint)
        .spawn()
        .await?;
    Ok(router)
}
The router's spawn function is what starts an accept loop.
As you saw in the quickstart, though, we first need to call accept on the router's builder. Otherwise the router rejects every incoming connection attempt.
The accept function expects two arguments:
- The ALPN we defined for ourselves above, and
- something that implements ProtocolHandler.
In the quickstart, we used the Blobs struct from the existing iroh-blobs protocol, which implements ProtocolHandler.
In this example, we'll build our own struct and implement ProtocolHandler ourselves.
Let's call this struct Echo.
#[derive(Debug, Clone)]
struct Echo;
The struct is actually empty, because the protocol is fully stateless.
If we were building a protocol for a database, then this struct would contain a database connection or the database contents directly, so that all connections can access it.
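To illustrate the stateful case, here is a minimal std-only sketch of a handler struct that carries shared state. CountingEcho, on_connection and simulate_connections are hypothetical names for this illustration, and threads stand in for the tasks that would handle real connections. The point is only that clones of the struct share the same state through an Arc.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stateful handler: every clone shares one counter.
#[derive(Debug, Clone)]
struct CountingEcho {
    connections_seen: Arc<AtomicUsize>,
}

impl CountingEcho {
    fn new() -> Self {
        Self {
            connections_seen: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Stand-in for per-connection work: records the connection and
    /// returns the running total.
    fn on_connection(&self) -> usize {
        self.connections_seen.fetch_add(1, Ordering::SeqCst) + 1
    }

    fn total(&self) -> usize {
        self.connections_seen.load(Ordering::SeqCst)
    }
}

/// Simulates `n` connections, each handled on its own thread with its own clone.
fn simulate_connections(handler: &CountingEcho, n: usize) {
    let workers: Vec<_> = (0..n)
        .map(|_| {
            let handler = handler.clone();
            thread::spawn(move || {
                handler.on_connection();
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("worker panicked");
    }
}
```

After simulate_connections(&handler, 4), handler.total() is 4, because all clones update the same counter. A database handle or connection pool would be shared in the same way.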
We'll also stub out an implementation of ProtocolHandler for this struct:
impl iroh::protocol::ProtocolHandler for Echo {
    /// The `accept` method is called for each incoming connection for our ALPN.
    ///
    /// The returned future runs on a newly spawned tokio task, so it can run as long as
    /// the connection lasts without blocking other connections.
    fn accept(&self, connection: iroh::Connection) -> n0_future::boxed::BoxFuture<Result<()>> {
        Box::pin(async move {
            // TODO!
            Ok(())
        })
    }
}
We're using the n0-future crate for the return type of accept here.
This is just a shorthand for std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> (which is a mouthful!).
This shorthand is also provided by futures-lite, futures-util and many more.
We simply use n0-future as it re-exports all the crates we've vetted and commonly use at number 0.
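To see what the shorthand expands to, here is a minimal std-only sketch that writes the alias by hand, following the expansion given above. accept_stub and block_on are hypothetical helpers for this illustration: the former mimics the shape of the stubbed accept, the latter is a toy busy-polling executor that stands in for tokio so the snippet needs no dependencies.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hand-written alias, assumed equivalent to the shorthand described above.
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// Hypothetical handler with the same shape as the stubbed `accept`:
/// it moves its input into an async block and boxes the resulting future.
fn accept_stub(peer: String) -> BoxFuture<Result<usize, String>> {
    Box::pin(async move {
        if peer.is_empty() {
            return Err("peer name must not be empty".to_string());
        }
        Ok(peer.len())
    })
}

/// Waker that does nothing; enough for a busy-polling toy executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future in a loop until it is ready.
/// Real code would hand the future to tokio instead.
fn block_on<T>(mut fut: BoxFuture<T>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}
```

Boxing erases the concrete future type, which is what lets a trait method return "some future" without naming it. The Send + 'static bounds are what allow the future to be moved onto a spawned task.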
The accept function is going to get called once an incoming connection with the correct ALPN is established.
Now, we can modify our router so it handles incoming connections with our newly created custom protocol:
async fn start_accept_side() -> anyhow::Result<iroh::protocol::Router> {
    let endpoint = iroh::Endpoint::builder().discovery_n0().bind().await?;
    let router = iroh::protocol::Router::builder(endpoint)
        .accept(ALPN, Echo) // This makes the router handle incoming connections with our ALPN via Echo::accept!
        .spawn()
        .await?;
    Ok(router)
}
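To make the role of the ALPN in accept(ALPN, Echo) concrete, here is a toy std-only sketch of ALPN-based dispatch. It is not how iroh's Router is implemented: MiniRouter, Handler and dispatch are hypothetical names, and a byte slice stands in for a whole connection. It only shows the idea that each ALPN maps to one handler and that unknown ALPNs are rejected.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a protocol handler: takes the payload of one
/// "connection" and produces a response.
trait Handler {
    fn accept(&self, payload: &[u8]) -> Vec<u8>;
}

struct Echo;

impl Handler for Echo {
    fn accept(&self, payload: &[u8]) -> Vec<u8> {
        payload.to_vec()
    }
}

/// Hypothetical miniature router: maps ALPN byte strings to handlers.
#[derive(Default)]
struct MiniRouter {
    handlers: HashMap<Vec<u8>, Box<dyn Handler>>,
}

impl MiniRouter {
    /// Registers a handler for an ALPN, builder-style.
    fn accept(mut self, alpn: &[u8], handler: impl Handler + 'static) -> Self {
        self.handlers.insert(alpn.to_vec(), Box::new(handler));
        self
    }

    /// Routes one "connection" by ALPN. Returns `None` when no handler was
    /// registered for that ALPN, which models a rejected connection.
    fn dispatch(&self, alpn: &[u8], payload: &[u8]) -> Option<Vec<u8>> {
        self.handlers.get(alpn).map(|handler| handler.accept(payload))
    }
}
```

With MiniRouter::default().accept(b"iroh-example/echo/0", Echo), dispatching on the registered ALPN returns the echoed bytes, while any other ALPN yields None. A router with no registered handlers rejects everything, which matches the behaviour described for the first version of start_accept_side.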
Implementing the Accepting Side
At the moment, the Echo::accept function is still stubbed out.
The way it is currently implemented, it would drop the iroh::Connection immediately, causing the connection to close.
Instead, we need to hold on to either the connection or one of its streams for as long as we want to interact with it.
We'll do that by moving the connection to the future we return from Echo::accept and handling the protocol logic within that future:
impl ProtocolHandler for Echo {
    fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
        Box::pin(async move {
            // We can get the remote's node id from the connection.
            let node_id = connection.remote_node_id()?;
            println!("accepted connection from {node_id}");

            // Our protocol is a simple request-response protocol, so we expect the
            // connecting peer to open a single bi-directional stream.
            let (mut send, mut recv) = connection.accept_bi().await?;

            // Echo any bytes received back directly.
            // This will keep copying until the sender signals the end of data on the stream.
            let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
            println!("Copied over {bytes_sent} byte(s)");

            // By calling `finish` on the send stream we signal that we will not send anything
            // further, which makes the receive stream on the other end terminate.
            send.finish()?;

            // Wait until the remote closes the connection, which it does once it
            // received the response.
            connection.closed().await;

            Ok(())
        })
    }
}
We're using tokio::io::copy here to just copy any bytes we receive via recv to the send side of the bi-directional stream.
Before we drop the connection, we briefly wait for connection.closed().
This effectively allows the connecting side to be the side that acknowledges that it received all data.
Remember: Dropping the connection essentially "interrupts" all work on that connection, including sending or retransmitting lost data.
Calling SendStream::finish() only indicates that we're done sending data, but doesn't wait for all data to be sent.
So instead of closing the connection ourselves, we let the connecting side, as the side that receives data last, be the one to close the connection.
Closing connections properly with QUIC can be quite hard sometimes. We've written about it before, but it trips us up every now and then still.
Implementing the Connecting Side
The connecting side is going to be the mirror image of the accepting side:
- An accept_bi corresponds to an open_bi,
- when data is received, the other side sends data,
- when one side waits for connection.closed(), the other calls connection.close().
Summarizing our protocol again, the connecting side will open a connection, send some data, receive the echo, and finally close the connection.
This is what that looks like:
async fn connect_side(addr: NodeAddr) -> Result<()> {
    let endpoint = Endpoint::builder().discovery_n0().bind().await?;

    // Open a connection to the accepting node
    let conn = endpoint.connect(addr, ALPN).await?;

    // Open a bidirectional QUIC stream
    let (mut send, mut recv) = conn.open_bi().await?;

    // Send some data to be echoed
    send.write_all(b"Hello, world!").await?;

    // Signal the end of data for this particular stream
    send.finish()?;

    // Receive the echo, but limit reading to at most 1000 bytes
    let response = recv.read_to_end(1000).await?;
    assert_eq!(&response, b"Hello, world!");

    // Explicitly close the whole connection.
    conn.close(0u32.into(), b"bye!");

    // The above call only queues a close message to be sent (see how it's not async!).
    // We need to actually call this to make sure this message is sent out.
    endpoint.close().await;
    // If we don't call this but keep using the endpoint, the queued
    // close message will eventually be picked up and sent.
    // But always try to wait for endpoint.close().await to go through before dropping
    // the endpoint to ensure any queued messages are sent through and connections are
    // closed gracefully.
    Ok(())
}
In this example we simply hard-coded the echo message "Hello, world!", and we assert that this is what we receive back.
Note that we also take a NodeAddr as a parameter.
This is the address of the accepting side, so we can use it to tell the Endpoint where in the world to connect to in the endpoint.connect(addr, ALPN) call.
Putting it all together
Now we have both sides of our protocol implemented!
The connecting side is in connect_side and the accepting side is in start_accept_side.
In a simple main function we can start the accepting side and concurrently connect to it before shutting down the accepting side again:
#[tokio::main]
async fn main() -> Result<()> {
    let router = start_accept_side().await?;
    let node_addr = router.endpoint().node_addr().await?;

    connect_side(node_addr).await?;

    // This makes sure the endpoint in the router is closed properly and connections close gracefully
    router.shutdown().await?;

    Ok(())
}
This is what the output can look like when running:
accepted connection from fb970f941d38eb5ef357316f13a6dc24f35f74d3403b1b9de79bd698a6531a79
Copied over 13 byte(s)
You can find all of the code in the echo.rs example in the iroh repo.
Appendix
No router no problem
The router can make writing code with iroh easier, but it's not required.
If the Router API is too limited or perhaps too complex for your use case, it's fairly simple to replace with your own accept loop based on only iroh::Endpoint APIs.
To replace the router accept loop, you need to spawn your own tokio task instead of calling iroh::protocol::RouterBuilder::spawn.
This task then calls iroh::Endpoint::accept in a loop and passes the incoming connections on to the same handler we looked at before.
You also need to make sure to configure the right ALPNs on the endpoint yourself.
Putting it all together, you only need to change the start_accept_side function:
async fn start_accept_side() -> anyhow::Result<iroh::Endpoint> {
    let endpoint = Endpoint::builder()
        .discovery_n0()
        // The accept side needs to opt in to the protocols it accepts,
        // as any connection attempt without a matching ALPN
        // will be rejected.
        .alpns(vec![ALPN.to_vec()])
        .bind()
        .await?;

    // Spawn a task so that `start_accept_side` returns immediately and we can continue in main().
    tokio::spawn({
        let endpoint = endpoint.clone();
        async move {
            // This task won't leak, because we call `endpoint.close()` in `main()`,
            // which causes `endpoint.accept().await` to return `None`.
            // In a more serious environment, we recommend avoiding `tokio::spawn` and using either a
            // `TaskTracker` or `JoinSet` instead to make sure you're not accidentally leaking tasks.
            while let Some(incoming) = endpoint.accept().await {
                // Spawn a task for each incoming connection, so we can serve multiple connections concurrently.
                tokio::spawn(async move {
                    let connection = incoming.await?;
                    // `?` already unwraps the handler's result, so we end the task with an explicit `Ok`.
                    Echo.accept(connection).await?;
                    anyhow::Ok(())
                });
            }

            anyhow::Ok(())
        }
    });

    Ok(endpoint)
}
We also return an iroh::Endpoint instead of an iroh::protocol::Router.
This means our main function would need to call endpoint.close() instead of router.shutdown(), but otherwise it's the same.
Note that in this case, you don't even need to implement the ProtocolHandler trait.
The only reason it exists is to provide an interface between protocols and the Router.
If we're not using the router, then we could replace our Echo.accept(connection) call above with whatever function we'd like.
We could even inline the whole function call instead.
You can see a version of the echo example completely without using a router or protocol handler trait in the echo-no-router.rs example.
General Guidance
The echo example is a very simple protocol. There are many ways in which a protocol in practice is going to be more complex. Here's some advice that might be useful if you write your own protocol:
- Re-use connections: The version of the echo protocol above simply closes the connection after having echoed one stream. This is needlessly wasteful if, for example, you want to echo multiple times or periodically. Instead, you could put a loop around connection.accept_bi() to accept multiple streams to echo on for the same connection. In practice, protocols often re-use the same connection for performance. Opening a QUIC stream is really cheap, as it doesn't need extra round-trips for the stream to get established, which is not the case for connections (unless in special circumstances when you're using the QUIC 0-RTT feature).
- Beware: QUIC streams are lazy: Make sure that when you call connection.open_bi(), you always send first before you receive data. This is because the other side doesn't even know about a stream unless you send data on the stream first. This property is called "laziness", as opposed to being "eager". The other side that accepts the stream will know about it at the same time that it gets the first bits of data.
- Closing QUIC connections can be hard: This was already mentioned above, but it's worth re-iterating. As a general rule of thumb: The side to last read data should be the side to close a connection. Also try to always wait for Endpoint::close before dropping your endpoint, as that's required to make connections close gracefully. For everything else, feel free to read our blog post about closing connections.
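The "re-use connections" advice can be sketched with the same kind of in-memory stand-in, again without iroh or QUIC. Here a channel of stream pairs plays the role of the connection, so that the loop in serve_connection corresponds to a loop around accept_bi. BiStream, serve_connection and echo_many are hypothetical names for this illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// One in-memory "bi-directional stream" as seen by the accepting side.
type BiStream = (Receiver<Vec<u8>>, Sender<Vec<u8>>);

/// Accepting side: keep accepting streams on the same "connection" until the
/// peer closes it, echoing each stream. Returns how many streams were served.
fn serve_connection(streams: Receiver<BiStream>) -> usize {
    let mut served = 0;
    // Stand-in for looping around `accept_bi`.
    for (recv, send) in streams {
        for chunk in recv {
            // Ignore send errors: the peer may have stopped listening.
            let _ = send.send(chunk);
        }
        served += 1;
        // `send` is dropped here, which ends the echo for this stream.
    }
    served
}

/// Connecting side: open one stream per message over a single "connection".
fn echo_many(messages: &[&str]) -> (Vec<String>, usize) {
    let (open_stream, incoming) = channel::<BiStream>();
    let acceptor = thread::spawn(move || serve_connection(incoming));

    let mut echoes = Vec::new();
    for msg in messages {
        let (to_acceptor, acceptor_recv) = channel();
        let (to_connector, connector_recv) = channel();
        open_stream
            .send((acceptor_recv, to_connector))
            .expect("connection closed");
        // Send first, then signal end of data, then read the echo.
        to_acceptor
            .send(msg.as_bytes().to_vec())
            .expect("stream closed");
        drop(to_acceptor);
        let bytes: Vec<u8> = connector_recv.iter().flatten().collect();
        echoes.push(String::from_utf8(bytes).expect("echo was not valid UTF-8"));
    }

    // Stand-in for closing the connection: no more streams will be opened.
    drop(open_stream);
    let served = acceptor.join().expect("accepting side panicked");
    (echoes, served)
}
```

The connection is set up once and each message only costs a new stream, which is the property that makes stream re-use attractive in QUIC. The sketch cannot model laziness or graceful closing, so the other two points above still need care in real code.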
We hope the above helps you write your own iroh protocol. Should you do so, we'd love you to share your new protocol in the iroh discord! Have fun.