Serve Server-Sent Events

Gradle setup

dependencies {
    implementation(platform("org.http4k:http4k-bom:4.45.0.0"))
    implementation("org.http4k:http4k-core")
    implementation("org.http4k:http4k-server-undertow")
    implementation("org.http4k:http4k-server-jetty")
}

http4k provides SSE (Server Sent Events) support using a simple, consistent, typesafe, and testable API on supported server backends (see above). SSE communication consists of 3 main concepts:

  1. SseHandler - represented as a typealias: SseHandler = (Request) -> SseConsumer. This is responsible for matching an HTTP request to an SSE handler.
  2. SseConsumer - represented as a typealias: SseConsumer = (Sse) -> Unit. This function is called on connection of a Sse and allow the API user to receive to events coming from the connected SSE handler.
  3. SseMessage - a message which is sent from the SSE handler SseMessages are immutable data classes.
  4. SseFilter - represented as a interface: SseFilter = (SseConsumer) -> SseConsumer. This allows for the decoration of SseConsumers to add pre or post matching behaviour in the same way as a standard Filter.

SSE as a Function

The simplest possible SSE handler can be mounted as a SseConsumer function onto a server with:

{ sse: Sse -> sse.send(SseMessage.Data("hello")) }.asServer(Undertow(9000)).start()

Mixing HTTP and SSE services

Both SSE and Http handlers in http4k are routed using a similar path-based API. We combine them into a single PolyHandler. SSE handlers react to HTTP traffic which send an Accept header with text/event-stream value:

import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.OK
import org.http4k.lens.Path
import org.http4k.routing.bind
import org.http4k.routing.sse
import org.http4k.server.PolyHandler
import org.http4k.server.Undertow
import org.http4k.server.asServer
import org.http4k.sse.Sse
import org.http4k.sse.SseFilter
import org.http4k.sse.SseMessage
import org.http4k.sse.then
import kotlin.concurrent.thread

fun main() {
    val namePath = Path.of("name")

    // a filter allows us to intercept the call to the sse and do logging etc...
    val sayHello = SseFilter { next ->
        {
            println("Hello from the sse!")
            next(it)
        }
    }

    val sse = sayHello.then(
        sse(
            "/{name}" bind { sse: Sse ->
                val name = namePath(sse.connectRequest)
                thread {
                    repeat(10) {
                        sse.send(SseMessage.Data("hello $it"))
                        Thread.sleep(100)
                    }
                    sse.close()
                }
                sse.onClose { println("$name is closing") }
            }
        )
    )

    val http = { _: Request -> Response(OK).body("hiya world") }

    PolyHandler(http, sse = sse).asServer(Undertow(9000)).start()
}