2020-11-23 Writing streaming servers in Perl using Mojo::IOLoop

I’m working on rewriting Phoebe to use the Mojo::IOLoop framework. That framework allows me to create multiple servers listening on multiple ports, for multiple addresses, using different TLS certs… I wrote about a minimal example yesterday. Today I want to look at another problem: the framework gives me a streaming server. That means bytes arrive in bursts and it isn’t entirely clear when the client is done sending.

Phoebe

Yesterday's blog post

Now, if all I was doing was writing a simple Gopher or Gemini server, I could simply assume that the client sends the entire request (the Gopher selector or the Gemini URL) in one burst: one line terminated by CR LF (\r\n). But if I’m accepting uploads (like I need to, for Titan support), then I need to continue listening if the client indicated that it’s going to send more bytes. For Titan, this means that the URL carries a “size” parameter; and parameters are separated from the rest of the URL by a semicolon.

So basically, I need a buffer per connection. I need to append the incoming bytes to the buffer and check: do I have a complete line ending in CR LF in my buffer? If so, process it. If that first line (let’s call it a header) indicated that more bytes will follow, keep listening until the necessary number of bytes have arrived. As long as we’re listening, don’t write to the stream. When we write to the stream, we’re done and we should close the connection.
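The decision described above can be sketched without any networking at all. The helper below is hypothetical (the name “check_buffer” and the returned hash keys are made up for illustration, they are not part of Phoebe or Mojo): it looks at a buffer and reports whether we need to keep waiting, whether the request is complete, or whether the client sent too much. It uses plain strict and warnings so that it runs with core Perl.

```perl
use strict;
use warnings;

# Hypothetical helper: inspect the bytes received so far and decide what to do.
sub check_buffer {
  my ($buffer) = @_;
  # Without a CR LF there is no complete header line yet
  my $end = index($buffer, "\r\n");
  return { state => 'wait' } if $end < 0;
  my $line = substr($buffer, 0, $end);
  my $body = substr($buffer, $end + 2);
  # Split an optional ";size=N" parameter off the end of the header line
  my ($header, $size) = $line =~ /^(.*?)(?:;size=(\d+))?\z/s;
  # No size announced: the request is complete after the first line
  return { state => 'done', header => $header, body => '' } unless $size;
  my $got = length $body;
  return { state => 'wait', missing => $size - $got } if $got < $size;
  return { state => 'error', excess => $got - $size } if $got > $size;
  return { state => 'done', header => $header, body => $body };
}

for my $buffer ("hallo", "hallo\r\n", "hallo;size=3\r\nx", "hallo;size=3\r\nx\r\n") {
  print check_buffer($buffer)->{state}, "\n";
}
```

This prints wait, done, wait, done. Only the “done” and “error” states would lead to a write to the stream, followed by closing the connection.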

OK, fine. But there’s more: in an asynchronous world, where do we keep these buffers? If every connection has an id, we could maintain a big hash map, of course. But how do we make sure these buffers are freed when the connections end, over the weeks and months this server is going to be running? Better to use a language construct that does this for us: a closure.

In Perl, anonymous functions are closures. They keep access to the local variables they had access to at the time they were defined.

A simple example: we have a list of names, and for every name we call “greeting_for”, which returns an anonymous function (a sub with no name). Each anonymous function retains access to the variable $name as it was at the time the function was defined, so when we call the code references later, the right names are still greeted.

use Modern::Perl;

sub greeting_for {
  my $name = shift;
  return sub { say "Hello $name" }
}

my @greetings = map { greeting_for($_) } qw(Alex Berta);
for my $code (@greetings) {
  $code->();
}

Output:

Hello Alex
Hello Berta

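The thing I learned yesterday is that this doesn’t work for named functions! A named sub is compiled only once, so it keeps seeing the $name of the first call, and Perl warns about it.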

use Modern::Perl;

sub greeting_for {
  my $name = shift;
  sub greeting { say "Hello $name" }
  return \&greeting;
}

my @greetings = map { greeting_for($_) } qw(Alex Berta);
for my $code (@greetings) {
  $code->();
}

Output:

Variable "$name" will not stay shared at - line 5.
Hello Alex
Hello Alex

Perl: “Variable will not stay shared”, on StackOverflow
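If the inner function should still have a name of sorts, one way around the warning is to store an anonymous sub in a lexical variable. This is a minimal sketch of that idea; it uses plain strict and warnings instead of Modern::Perl so that it runs with core Perl, and it returns the string instead of printing it inside the sub.

```perl
use strict;
use warnings;

sub greeting_for {
  my $name = shift;
  # The anonymous sub is created anew on every call of greeting_for,
  # so each copy captures its own $name.
  my $greeting = sub { return "Hello $name" };
  return $greeting;
}

my @greetings = map { greeting_for($_) } qw(Alex Berta);
print $_->(), "\n" for @greetings;
```

This prints “Hello Alex” and “Hello Berta”, without any warning.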

Anyway, back to the tiny server I’m trying to write. At its core, I can provide some code anytime there are bytes to read:

  $stream->on(read => sub { my ($stream, $bytes) = @_; ... });

That is, if this sub is going to be a closure, then I can refer to variables it knows about:

  my ($buffer, $length, $header);
  $stream->on(read => sub {
    my ($stream, $bytes) = @_;
    $buffer .= $bytes;
    ...
  });
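To convince myself that every connection really gets its own buffer this way, here is a sketch without Mojo. The function “new_connection” is a hypothetical stand-in for the server callback: each call represents one new connection and returns the read handler for it.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the server callback
sub new_connection {
  my $buffer = '';   # private to this one connection
  return sub {
    my ($bytes) = @_;
    $buffer .= $bytes;
    return $buffer;
  };
}

my $first  = new_connection();
my $second = new_connection();
$first->("hal");
$second->("x\r\n");
my $first_buffer  = $first->("lo\r\n");
my $second_buffer = $second->("");
# The two handlers never see each other's bytes
print length($first_buffer), " ", length($second_buffer), "\n";
```

This prints “7 3”. Once nothing refers to a handler any more, Perl’s reference counting frees its buffer along with it, so there is no hash map to clean up. In the server, the read callback plays the role of the returned sub.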

Sadly, this means that a lot of code ends up in the closure:

use Mojo::IOLoop;
use Modern::Perl;

Mojo::IOLoop->server({address => 'localhost', port => '3000'} => sub {
  my ($loop, $stream) = @_;
  # These variables are private to this connection: the closure below
  # keeps them alive for as long as the stream needs them.
  my ($buffer, $length, $header);
  $stream->on(read => sub {
    my ($stream, $bytes) = @_;
    $buffer .= $bytes;
    if (not defined $header) {
      # Keep listening until the first line has arrived completely
      return unless $buffer =~ /^(.*?)(?:;size=(\d+))?\r\n/;
      $header = $1;
      $length = $2;
      warn "$header (" . ($length // "no size") . ")\n";
      if (not $length) {
        # No upload announced: respond and stop right here
        echo($stream, $header, "none");
        $stream->close_gracefully();
        return;
      }
      # Drop the header line; whatever remains is the start of the upload
      $buffer =~ s/^.*\r\n//;
    }
    my $actual = length($buffer);
    if ($actual == $length) {
      echo($stream, $header, $buffer);
      $stream->close_gracefully();
    } elsif ($actual > $length) {
      $stream->write("ERROR! TOO MUCH DATA: $actual bytes > $length bytes!\r\n");
      $stream->close_gracefully();
    } else {
      warn "Waiting for " . ($length - $actual) . " more bytes\n";
    }
  });
});

sub echo {
  my ($stream, $header, $buffer) = @_;
  # Write response
  $stream->write("Header: $header\n");
  $stream->write("Buffer: $buffer\n");
}

# Start event loop if necessary
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

Once you start it, you can test it with telnet:

$ telnet localhost 3000
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hallo;size=3
x
Header: hallo
Buffer: x

Connection closed by foreign host.

The reason I have a size of 3 and yet I only type the single letter “x” is that telnet appends a CR LF for me, so the server is seeing “x\r\n” as the second line.

​#Perl