
Not winning with Akka

I got to a blocker with my TCP server using Akka Streams.

Looking for some examples, I found a few references to this piece in the Streaming TCP section of the docs:

Closing connections is possible by cancelling the incoming connection Flow from your server logic (e.g. by connecting its downstream to a Sink.cancelled and its upstream to a Source.empty). It is also possible to shut down the server’s socket by cancelling the IncomingConnection source connections.

That sounds complicated, and after a couple of days of looking at the problem it still doesn’t make a lot of sense to me. It didn’t make sense to this fellow programmer 5 years ago either (his solution is around the server, and I think the answer should come from cleanly terminating the stream).
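Taken literally, the suggestion in the docs amounts to a handler whose input side cancels immediately and whose output side is empty. Below is a minimal sketch of that, run against an in-memory source instead of a real connection so it can be tried in isolation. `ClosingHandlerSketch`, `closingHandler` and `run` are names made up for the illustration, and it assumes Akka 2.6 with a classic `ActorSystem`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object ClosingHandlerSketch {
  // Input goes to a sink that cancels straight away; output comes from a
  // source that completes without emitting anything.
  val closingHandler: Flow[ByteString, ByteString, NotUsed] =
    Flow.fromSinkAndSource(Sink.cancelled[ByteString], Source.empty[ByteString])

  // In-memory stand-in for a connection: feed one chunk through the handler
  // and collect whatever comes out the other side.
  def run(): Seq[ByteString] = {
    implicit val system: ActorSystem = ActorSystem("closing-sketch")
    try {
      val out = Source
        .single(ByteString("hello"))
        .via(closingHandler)
        .runWith(Sink.seq)
      Await.result(out, 3.seconds)
    } finally system.terminate()
  }
}
```

On a real connection the same handler would be joined with `connection.flow`, which by the docs' description should drop the client without ever answering it, so on its own it doesn't cover the respond-then-close case.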

And you know it is a tough problem when the only examples of TCP servers you can find online are pretty much the ones in the Akka docs (if this wasn’t a learning exercise, I think I would rather use a different technology).

The problem, I think, is that when you are processing the request you are inside the flow, and it is not possible to do anything from there to complete the stream. I'm not sure if there is a reason to force this by design.

Sounds to me like a common pattern: client makes a request, the server sends back a response, and then it closes the connection. With Akka streams you can’t do that from the place where you are processing the request, at least as far as I can tell.
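For reference, here is a sketch of that pattern on a plain in-memory stream, outside of TCP. It doesn't complete the stream from inside the request-processing code; it bounds the flow around it with `take(1)`. `ReplyOnceSketch`, `respond` and `replyOnce` are hypothetical names, and it assumes the whole request arrives as a single `ByteString` chunk, which real TCP doesn't guarantee without framing. Whether this completion is enough to close a TLS connection is not something the sketch shows.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object ReplyOnceSketch {
  // Hypothetical request handler: answers a single request chunk.
  def respond(req: ByteString): ByteString = ByteString("echo: ") ++ req

  // take(1) lets one element through, then completes downstream and
  // cancels upstream, so the flow finishes after one request/response.
  val replyOnce: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].take(1).map(respond)

  // Send two "requests" through the flow; only the first is answered.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("reply-once-sketch")
    try {
      val out = Source(List(ByteString("first"), ByteString("second")))
        .via(replyOnce)
        .map(_.utf8String)
        .runWith(Sink.seq)
      Await.result(out, 3.seconds)
    } finally system.terminate()
  }
}
```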

So it goes on forever, unless the client (or the server in case of an exception, for example an idle timeout) closes the connection.

My current code uses a Source.queue because it is supposed to complete the stream when complete() is called on the queue.

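    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{Flow, Keep, Sink, Source, Tcp}
    import akka.util.ByteString

    import scala.util.{Failure, Success}

    // An implicit ActorSystem (and ExecutionContext) is assumed to be in scope.
    Tcp()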
      .bindWithTls(conf.address, conf.port, () => createSSLEngine)
      .runForeach { connection =>
        logger.debug(s"new connection ${connection.remoteAddress}")

        val (queue, source) =
          Source
            .queue[ByteString](1024, OverflowStrategy.backpressure)
            .preMaterialize()

        // Each incoming chunk is handled as one request; the response is
        // offered to the queue that feeds the outgoing side of the connection.
        val sink = Sink.foreach { b: ByteString =>
          handleReq(connection.remoteAddress.getHostName(), b.utf8String)
            .map(_.compact)
            .map(b => queue.offer(b))
            .run()
            .onComplete {
              case Failure(error) => logger.error(error)("Internal error")
              case Success(_) =>
                logger.debug("setting queue to complete")
                queue.complete()
            }
        }

        val handler = Flow.fromSinkAndSource(sink, source)

        connection.flow
          .idleTimeout(conf.idleTimeout)
          .joinMat(handler)(Keep.right)
          .run()
      }

Akka's terminology is very specific, but basically I join the incoming connection flow with a handler flow built from a sink and a source. The sink reads the input and adds the response to a queue, which is where the source gets the data to send back to the client.

This seems to work, but the call to complete() is not completing the stream and closing the connection. If I change it for a call to fail, it does work, by cancelling the stream.
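To narrow down where the completion gets lost, it may help to check `Source.queue` on its own, away from the connection. The sketch below (with the made-up name `QueueCompleteSketch`, and assuming Akka 2.6) offers two elements, calls `complete()`, and waits for the stream to finish.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object QueueCompleteSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("queue-sketch")
    import system.dispatcher // ExecutionContext for the Future chain below
    try {
      // Materialize both the queue (input) and the collected elements (output).
      val (queue, collected) =
        Source
          .queue[Int](16, OverflowStrategy.backpressure)
          .toMat(Sink.seq)(Keep.both)
          .run()

      // Offer sequentially, then complete once both offers are accepted.
      val offered = for {
        _ <- queue.offer(1)
        _ <- queue.offer(2)
      } yield queue.complete()

      Await.result(offered, 3.seconds)
      // Only returns if complete() really terminated the stream.
      Await.result(collected, 3.seconds)
    } finally system.terminate()
  }
}
```

In isolation the stream does finish after `complete()`, which suggests looking at how that completion travels through the rest of the graph. One documented detail that may or may not be relevant here: the sink and source sides of a flow built with `Flow.fromSinkAndSource` complete independently of each other, and there is a `Flow.fromSinkAndSourceCoupled` variant that ties them together.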

And this is where I am at. I asked for help on Akka’s Gitter channel, without an answer so far, but at least someone solved the issue with queues, so I must be doing something wrong!

Update: it is important that you read the second part.
