Serve Server-Sent Events
Gradle setup
dependencies {
    implementation(platform("org.http4k:http4k-bom:4.45.0.0"))
    implementation("org.http4k:http4k-core")
    implementation("org.http4k:http4k-server-undertow")
    implementation("org.http4k:http4k-server-jetty")
}
http4k provides SSE (Server-Sent Events) support using a simple, consistent, typesafe, and testable API on supported server backends (see above). SSE communication consists of four main concepts:
- SseHandler - represented as a typealias: SseHandler = (Request) -> SseConsumer. This is responsible for matching an HTTP request to an SSE handler.
- SseConsumer - represented as a typealias: SseConsumer = (Sse) -> Unit. This function is called when an Sse connects, and allows the API user to react to events coming from the connected Sse.
- SseMessage - a message which is sent from the SSE handler. SseMessages are immutable data classes.
- SseFilter - represented as an interface: SseFilter = (SseConsumer) -> SseConsumer. This allows for the decoration of SseConsumers to add pre- or post-matching behaviour in the same way as a standard Filter.
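As a rough illustration of how these concepts fit together, the sketch below declares one value of each function type. It assumes the 4.x API described above, where SseHandler and SseConsumer are typealiases in the org.http4k.sse package. The names greeter, handler, logConnections and loggedGreeter are made up for this example.

```kotlin
import org.http4k.core.Request
import org.http4k.sse.Sse
import org.http4k.sse.SseConsumer
import org.http4k.sse.SseFilter
import org.http4k.sse.SseHandler
import org.http4k.sse.SseMessage

// SseConsumer: called once a client has connected; here it sends one message and closes.
val greeter: SseConsumer = { sse: Sse ->
    sse.send(SseMessage.Data("hello"))
    sse.close()
}

// SseHandler: chooses the consumer for an incoming request (this one ignores the request).
val handler: SseHandler = { _: Request -> greeter }

// SseFilter: wraps a consumer to add behaviour before (or after) it runs.
val logConnections = SseFilter { next ->
    { sse: Sse ->
        println("SSE connection for ${sse.connectRequest.uri}")
        next(sse)
    }
}

// A filter is a function from consumer to consumer, so it can be applied directly.
val loggedGreeter: SseConsumer = logConnections(greeter)
```

Because every piece is a plain function, each one can be composed or swapped without touching the server backend.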
SSE as a Function
The simplest possible SSE handler can be mounted as an SseConsumer function onto a server with:
{ sse: Sse -> sse.send(SseMessage.Data("hello")) }.asServer(Undertow(9000)).start()
Mixing HTTP and SSE services
Both SSE and HTTP handlers in http4k are routed using a similar path-based API. We combine them into a single PolyHandler. SSE handlers react to HTTP traffic which sends an Accept header with a text/event-stream value:
import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.OK
import org.http4k.lens.Path
import org.http4k.routing.bind
import org.http4k.routing.sse
import org.http4k.server.PolyHandler
import org.http4k.server.Undertow
import org.http4k.server.asServer
import org.http4k.sse.Sse
import org.http4k.sse.SseFilter
import org.http4k.sse.SseMessage
import org.http4k.sse.then
import kotlin.concurrent.thread

fun main() {
    val namePath = Path.of("name")

    // a filter allows us to intercept the call to the sse and do logging etc...
    val sayHello = SseFilter { next ->
        {
            println("Hello from the sse!")
            next(it)
        }
    }

    val sse = sayHello.then(
        sse(
            "/{name}" bind { sse: Sse ->
                val name = namePath(sse.connectRequest)
                thread {
                    repeat(10) {
                        sse.send(SseMessage.Data("hello $it"))
                        Thread.sleep(100)
                    }
                    sse.close()
                }
                sse.onClose { println("$name is closing") }
            }
        )
    )

    val http = { _: Request -> Response(OK).body("hiya world") }

    PolyHandler(http, sse = sse).asServer(Undertow(9000)).start()
}
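
To see the Accept header routing in action, a client can request the SSE route with text/event-stream and read the stream line by line. The following is a minimal sketch using the JDK 11+ java.net.http client rather than any http4k client API. The helper names dataPayloads and readEvents are hypothetical, and the parsing only handles single-line "data:" fields as laid out in the Server-Sent Events wire format.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Extracts the payload of every "data:" line from a text/event-stream body.
// A single space after the colon is part of the field syntax and is dropped.
fun dataPayloads(lines: Sequence<String>): List<String> =
    lines.filter { it.startsWith("data:") }
        .map { it.removePrefix("data:").removePrefix(" ") }
        .toList()

// Connects to an SSE endpoint and collects data payloads until the server closes the stream.
fun readEvents(url: String): List<String> {
    val request = HttpRequest.newBuilder(URI.create(url))
        .header("Accept", "text/event-stream") // the header that selects the SSE handler
        .GET()
        .build()
    val lines = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofLines())
        .body()
    try {
        return dataPayloads(lines.iterator().asSequence())
    } finally {
        lines.close()
    }
}
```

With the server above running locally, calling readEvents("http://localhost:9000/bob") would block until the handler calls sse.close() and then return the data payloads it received. A request to the same port without the Accept header is expected to reach the HTTP handler instead.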