Not winning with Akka II: Won!

I got some help on Akka’s Gitter channel, and finally got to the bottom of the problem, and it wasn’t me!

It was all down to the TLSClosing parameter that is used when creating the TLS listener. The listener uses IgnoreComplete, and that was breaking my code.

The explanation:

All streams in Akka are unidirectional: while in a complex flow graph data may flow in multiple directions these individual flows are independent from each other. The difference between two half-duplex connections in opposite directions and a full-duplex connection is that the underlying transport is shared in the latter and tearing it down will end the data transfer in both directions.

When integrating a full-duplex transport medium that does not support half-closing (which means ending one direction of data transfer without ending the other) into a stream topology, there can be unexpected effects. Feeding a finite Source into this medium will close the connection after all elements have been sent, which means that possible replies may not be received in full. To support this type of usage, the sending and receiving of data on the same side (e.g. on the Client) need to be coordinated such that it is known when all replies have been received. Only then should the transport be shut down.

So that’s why they had to change the behaviour on the TLS wrapper.

The thing is that you don’t need to use queues (most of the time, at least), because Akka is smart enough to do the right thing if you define your flow correctly.

For example:

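    import akka.stream.scaladsl.{Flow, Framing, Tcp}
    import akka.util.ByteString

    // Needs an implicit ActorSystem in scope; conf, logger, maxReqLen,
    // Request and handleReq are defined elsewhere in the server.
    Tcp()
      .bind(conf.address, conf.port) // not using TLS!
      .runForeach { connection =>
        logger.debug(s"new connection ${connection.remoteAddress}")

        val handler = Flow[ByteString]
          // a request is a single line terminated by CRLF
          .via(
            Framing
              .delimiter(
                ByteString("\r\n"),
                maximumFrameLength = maxReqLen,
                allowTruncation = true
              )
          )
          .map(b => Request(b.utf8String))
          // the handler maps the Request into a Response
          .map { req =>
            handleReq(connection.remoteAddress.getHostName(), req)
          }
          // only one response per connection
          .take(1)
          // stream the response back to the client as bytes
          .flatMapConcat(_.toSource)

        connection.handleWith(handler)
      }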

I’m using framing to define what a request is and I map each frame into a Request. Then my handler does its thing and maps that request into a Response.

At that point the element being handled by the flow is a response, so I say that I want to send back only one (using take), and I convert that response into a source (using flatMapConcat) whose elements are streamed to the client as bytes.

Akka is smart enough to keep the connection open until the response is sent, and then close automatically (I said I wanted to send back only one response).
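To see that completion on its own, here is a minimal sketch that leaves TCP out entirely. The object name, the strings and the timeout are made up for illustration; only take and flatMapConcat come from the handler above.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object TakeOneDemo {
  // Returns everything the stream emitted before it completed.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("take-one-demo")
    try {
      val result = Source(List("first", "second", "third"))
        // only the first element gets through, then upstream is cancelled
        .take(1)
        // the single element is expanded into its own small stream
        .flatMapConcat(r => Source(List(s"$r: header", s"$r: body")))
        .runWith(Sink.seq)
      // the future only resolves because the stream completes by itself
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }
}
```

The stream finishes once the inner source is exhausted, which is the signal that ends up closing a plain TCP connection.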

That’s managed with a completion signal inside the flow, and the TLS wrapper was ignoring it. So my connection was stuck after I had finished sending the response, no matter how I did it.

I found that using IgnoreCancel is what preserves the behaviour that I need:

IgnoreCancel means to not react to cancellation of the receiving side unless the sending side has already completed.

So basically, if the client has finished sending data, we don’t close until we have sent all our data (the completed signal). The fact that the completed signal closes the connection is OK, because the Gemini protocol doesn’t allow the client to reply to a response.
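As a rough sketch of how that can be wired, assuming the Akka 2.6 overload of bindWithTls that accepts a closing parameter. The object name, the listen function, the backlog value and the SSLEngine factory are placeholders of mine, not the server’s real code.

```scala
import javax.net.ssl.{SSLContext, SSLEngine}
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.IgnoreCancel
import akka.stream.scaladsl.{Flow, Tcp}
import akka.util.ByteString
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.Success

object TlsListener {
  // Placeholder: a real server builds its SSLContext from its own
  // certificate and key; the JVM default context has no server key.
  def createSSLEngine(): SSLEngine = {
    val engine = SSLContext.getDefault.createSSLEngine()
    engine.setUseClientMode(false)
    engine
  }

  // Binds a TLS listener and runs `handler` on every incoming connection.
  def listen(host: String, port: Int, handler: Flow[ByteString, ByteString, NotUsed])(
      implicit system: ActorSystem): Future[Done] =
    Tcp()
      .bindWithTls(
        host,
        port,
        () => createSSLEngine(),
        backlog = 100, // assumed value
        options = Nil,
        idleTimeout = Duration.Inf,
        verifySession = _ => Success(()),
        closing = IgnoreCancel // instead of IgnoreComplete
      )
      .runForeach(connection => connection.handleWith(handler))
}
```

The only line that matters here is the closing argument; everything else is filler to make the call complete.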

This is a bit more complicated than what I’ve shown in the example, because it is possible that the client sends more data than the request. That happens, for example, if we try to access the Gemini server using curl, which speaks HTTP and sends multi-line requests. In those cases we have to drain that input (I’m using prefixAndTail, which splits one element from the input and provides a source that I can drain to essentially consume the data, ignoring it).
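A minimal sketch of that idea, not the server’s actual code: firstRequestOnly and respond are hypothetical names, and respond stands in for the real request handling.

```scala
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Framing, Sink, Source}
import akka.util.ByteString

object DrainExtraInput {
  // Answers the first CRLF-terminated line and discards anything after it.
  def firstRequestOnly(maxReqLen: Int)(respond: String => Source[ByteString, NotUsed])(
      implicit mat: Materializer): Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString]
      .via(
        Framing.delimiter(
          ByteString("\r\n"),
          maximumFrameLength = maxReqLen,
          allowTruncation = true
        )
      )
      // emits one pair: (first line if any, source with the remaining lines)
      .prefixAndTail(1)
      .flatMapConcat { case (head, tail) =>
        // the tail has to be consumed by someone, so drain it and ignore it
        tail.runWith(Sink.ignore)
        head.headOption match {
          case Some(line) => respond(line.utf8String)
          case None       => Source.empty[ByteString] // client sent nothing
        }
      }
}
```

Something like `DrainExtraInput.firstRequestOnly(1024)(line => Source.single(ByteString(line)))` can then be passed to handleWith, as long as an implicit Materializer (or ActorSystem) is in scope.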

I can’t tell how many different solutions I tried for this problem, and the issue was in the TLS layer all along. Anyway, it looks like the server may work in the end!