Serve Server-Sent Events

Gradle setup

dependencies {
    implementation(platform("org.http4k:http4k-bom:5.16.2.0"))
    implementation("org.http4k:http4k-core")
    implementation("org.http4k:http4k-server-undertow")
    implementation("org.http4k:http4k-server-jetty")
}

http4k provides SSE (Server Sent Events) support using a simple, consistent, typesafe, and testable API on supported server backends (see above). SSE communication consists of 3 main concepts:

  1. SseHandler - represented as a typealias: SseHandler = (Request) -> SseConsumer. This is responsible for matching an HTTP request to an SSE handler.
  2. SseConsumer - represented as a typealias: SseConsumer = (Sse) -> Unit. This function is called on connection of a Sse and allow the API user to receive to events coming from the connected SSE handler.
  3. SseMessage - a message which is sent from the SSE handler SseMessages are immutable data classes.
  4. SseFilter - represented as a interface: SseFilter = (SseConsumer) -> SseConsumer. This allows for the decoration of SseConsumers to add pre or post matching behaviour in the same way as a standard Filter.

SSE as a Function

The simplest possible SSE handler can be mounted as a SseConsumer function onto a server with:

{ sse: Sse -> sse.send(SseMessage.Data("hello")) }.asServer(Undertow(9000)).start()

Mixing HTTP and SSE services

Both SSE and Http handlers in http4k are routed using a similar path-based API. We combine them into a single PolyHandler. SSE handlers react to HTTP traffic which send an Accept header with text/event-stream value:

import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.OK
import org.http4k.lens.Path
import org.http4k.routing.sse
import org.http4k.routing.sse.bind
import org.http4k.server.PolyHandler
import org.http4k.server.Undertow
import org.http4k.server.asServer
import org.http4k.sse.Sse
import org.http4k.sse.SseFilter
import org.http4k.sse.SseMessage
import org.http4k.sse.SseResponse
import org.http4k.sse.then
import kotlin.concurrent.thread

fun main() {
    val namePath = Path.of("name")

    // a filter allows us to intercept the call to the sse and do logging etc...
    val sayHello = SseFilter { next ->
        {
            println("Hello from the sse!")
            next(it)
        }
    }

    val sse = sayHello.then(
        sse(
            "/{name}" bind { req ->
                SseResponse { sse: Sse ->
                    val name = namePath(req)
                    thread {
                        repeat(10) {
                            sse.send(SseMessage.Data("hello $it"))
                            Thread.sleep(100)
                        }
                        sse.close()
                    }
                    sse.onClose { println("$name is closing") }
                }
            }
        )
    )

    val http = { _: Request -> Response(OK).body("hiya world") }

    PolyHandler(http, sse = sse).asServer(Undertow(9000)).start()
}