Resilience4J
Installation (Gradle)
dependencies {
// import the http4k BOM as a platform; the module below is declared without a version of its own
implementation(platform("org.http4k:http4k-bom:5.30.0.0"))
// no version given here - presumably it is resolved from the BOM above
implementation("org.http4k:http4k-resilience4j")
}
About
This module provides configurable Filters for CircuitBreaking, RateLimiting, Retrying and Bulkheading, by integrating with the awesome Resilience4J library.
Circuit Breaking
A CircuitBreak Filter detects failures and then opens the circuit for a set period, to allow the underlying system to recover.
package guide.reference.resilience4j
import io.github.resilience4j.circuitbreaker.CircuitBreaker
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.SlidingWindowType.COUNT_BASED
import org.http4k.core.Method.GET
import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.INTERNAL_SERVER_ERROR
import org.http4k.core.Status.Companion.OK
import org.http4k.core.then
import org.http4k.filter.ResilienceFilters
import java.time.Duration
import java.util.ArrayDeque
// Circuit state transition: CLOSED (ok) -> OPEN (dead) -> HALF_OPEN (test) -> CLOSED (ok)
// Demo: replays a fixed queue of responses through a circuit-breaking filter and prints,
// after every call, the response status together with the current circuit breaker state.
fun main() {
    // canned responses, handed out in order by the stub handler, to drive the state changes
    val cannedResponses = ArrayDeque<Response>().apply {
        add(Response(INTERNAL_SERVER_ERROR))
        add(Response(OK))
        add(Response(OK))
    }

    // circuit breaker settings: count-based sliding window, 2 calls permitted while
    // half-open, and a 1 second wait in the open state
    val breakerConfig = CircuitBreakerConfig.custom()
        .slidingWindow(2, 2, COUNT_BASED)
        .permittedNumberOfCallsInHalfOpenState(2)
        .waitDurationInOpenState(Duration.ofSeconds(1))
        .build()
    val breaker = CircuitBreaker.of("circuit", breakerConfig)

    // any non-successful status counts as an error here (the filter's default is >= 500)
    val circuited = ResilienceFilters.CircuitBreak(
        breaker,
        isError = { response: Response -> !response.status.successful }
    ).then { cannedResponses.removeFirst() }

    // makes one request, then reports its status and the breaker state observed afterwards
    fun callAndReport() {
        val status = circuited(Request(GET, "/")).status
        println("Result: " + status + " Circuit is: " + breaker.state)
    }

    repeat(2) { callAndReport() }
    Thread.sleep(1100) // wait for reset
    repeat(2) { callAndReport() }
}
Rate Limiting
A RateLimit Filter limits the number of requests permitted within a set time window.
package guide.reference.resilience4j
import io.github.resilience4j.ratelimiter.RateLimiter
import io.github.resilience4j.ratelimiter.RateLimiterConfig
import org.http4k.core.Method.GET
import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.OK
import org.http4k.core.then
import org.http4k.filter.ResilienceFilters
import java.time.Duration
// Demo: sends two requests back-to-back through a rate-limiting filter and prints
// the status of each response.
fun main() {
    // rate limiter settings: a limit of 1 per 1-second refresh period, with a 10ms timeoutDuration
    val limiter = RateLimiter.of(
        "ratelimiter",
        RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .timeoutDuration(Duration.ofMillis(10))
            .build()
    )

    // wrap a stub handler that always answers OK in the rate-limiting filter
    val rateLimits = ResilienceFilters.RateLimit(limiter).then { Response(OK) }

    // two immediate calls against a limit of one per period
    repeat(2) { println(rateLimits(Request(GET, "/")).status) }
}
Retrying
A Retrying Filter retries a request when its response is judged to be a failure, up to a configured maximum number of attempts.
package guide.reference.resilience4j
import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
import org.http4k.core.Method.GET
import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.INTERNAL_SERVER_ERROR
import org.http4k.core.Status.Companion.OK
import org.http4k.core.then
import org.http4k.filter.ResilienceFilters
import java.util.ArrayDeque
// Demo: sends one request through a retrying filter whose stub handler fails first
// and succeeds second, logging each attempt and printing the final response.
fun main() {
    // retry settings: at most 3 attempts, with the interval derived from the attempt number
    val retryConfig = RetryConfig.custom<RetryConfig>()
        .maxAttempts(3)
        .intervalFunction { attemptNumber: Int -> (attemptNumber * 2).toLong() }
        .build()
    val retry = Retry.of("retrying", retryConfig)

    // canned responses, handed out in order: an error followed by a success
    val cannedResponses = ArrayDeque<Response>().apply {
        add(Response(INTERNAL_SERVER_ERROR))
        add(Response(OK))
    }

    // any non-successful status is treated as a failure worth retrying
    val retrying = ResilienceFilters.RetryFailures(
        retry,
        isError = { response: Response -> !response.status.successful }
    ).then {
        cannedResponses.removeFirst().also { next ->
            println("trying request, will return " + next.status)
        }
    }

    println(retrying(Request(GET, "/")))
}
Bulkheading
A Bulkhead Filter limits the number of parallel calls that can be executed.
package guide.reference.resilience4j
import io.github.resilience4j.bulkhead.Bulkhead
import io.github.resilience4j.bulkhead.BulkheadConfig
import org.http4k.core.Method.GET
import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.OK
import org.http4k.core.then
import org.http4k.filter.ResilienceFilters
import java.time.Duration
import kotlin.concurrent.thread
// Demo: fires 10 requests from separate threads at a bulkhead-protected handler
// and prints the status of each response.
fun main() {
    // bulkhead settings: 5 concurrent calls at most, with a 1000ms maxWaitDuration
    val bulkhead = Bulkhead.of(
        "bulkhead",
        BulkheadConfig.custom()
            .maxConcurrentCalls(5)
            .maxWaitDuration(Duration.ofMillis(1000))
            .build()
    )

    // the stub handler holds each call for 100ms before answering OK
    val bulkheading = ResilienceFilters.Bulkheading(bulkhead).then {
        Thread.sleep(100)
        Response(OK)
    }

    // throw a bunch of requests at the filter, one thread per request.
    // NOTE(review): the original comment said "only 5 should pass", but with a 1000ms
    // maxWaitDuration and a 100ms handler the later callers may simply wait and then be
    // admitted - confirm the intended output against an actual run.
    repeat(10) {
        thread { println(bulkheading(Request(GET, "/")).status) }
    }
}