Project Reactor: Programación Reactiva en Java/Kotlin

Fecha de publicación: 2023-11-29
Project Reactor es la biblioteca reactiva detrás de Spring WebFlux. Implementa la especificación Reactive Streams y te da dos tipos fundamentales: Mono (0 o 1 elemento) y Flux (0 a N elementos). Si quieres entender WebFlux, primero necesitas entender Reactor.

¿Por qué programación reactiva?

El modelo bloqueante tradicional asigna un hilo por cada request entrante. Ese hilo queda suspendido esperando respuestas de bases de datos, llamadas HTTP externas u otras operaciones de I/O. El resultado: hilos ocupados sin hacer nada útil, recursos desperdiciados y cuellos de botella bajo carga alta.
La programación reactiva propone un modelo basado en flujos de datos asíncronos y no bloqueantes. Un número reducido de hilos atiende muchos más requests porque nunca se quedan esperando: delegan la espera al sistema operativo y se liberan para procesar otros eventos. No es un concepto nuevo: es esencialmente el patrón Observer con la especificación Reactive Streams encima, lo que garantiza interoperabilidad entre bibliotecas.

Mono y Flux: los dos pilares

Todo en Project Reactor gira en torno a estos dos tipos. Entenderlos bien es el primer paso para trabajar con comodidad en el ecosistema reactivo.

Mono: un solo valor (o vacío)

Mono<T> representa una secuencia que emite como máximo un elemento y luego termina, ya sea con éxito o con un error. Es el equivalente reactivo de un valor opcional o de una promesa.
kotlin
1// Mono que emite un valor
2val mono: Mono<String> = Mono.just("Hola Reactor")
3
4// Mono vacío
5val empty: Mono<String> = Mono.empty()
6
7// Mono desde una operación que puede fallar
8val fromCallable: Mono<User> = Mono.fromCallable {
9    userRepository.findById(1) // operación bloqueante envuelta
10}

Flux: múltiples valores

Flux<T> representa una secuencia de 0 a N elementos. Puede emitir valores de forma continua o terminar cuando la fuente se agota.
kotlin
1// Flux con valores fijos
2val flux: Flux<Int> = Flux.just(1, 2, 3, 4, 5)
3
4// Flux desde una colección
5val fromList: Flux<String> = Flux.fromIterable(listOf("a", "b", "c"))
6
7// Flux infinito con intervalo
8val ticker: Flux<Long> = Flux.interval(Duration.ofSeconds(1))

Operadores esenciales

Reactor ofrece un catálogo extenso de operadores para transformar, filtrar y combinar flujos. Estos son los que vas a usar con más frecuencia al principio:
kotlin
1// map: transformar cada elemento
2Flux.just(1, 2, 3)
3    .map { it * 2 }
4    .subscribe { println(it) } // 2, 4, 6
5
6// filter: filtrar elementos
7Flux.range(1, 10)
8    .filter { it % 2 == 0 }
9    .subscribe { println(it) } // 2, 4, 6, 8, 10
10
11// flatMap: transformar cada elemento en otro Mono/Flux
12Flux.just("user-1", "user-2")
13    .flatMap { userId -> fetchUser(userId) } // retorna Mono<User>
14    .subscribe { println(it) }
15
16// zip: combinar múltiples fuentes
17Mono.zip(
18    fetchUser("user-1"),
19    fetchOrders("user-1")
20).map { tuple ->
21    UserWithOrders(tuple.t1, tuple.t2)
22}
La diferencia entre map y flatMap es importante: map transforma cada elemento en otro valor del mismo tipo, mientras que flatMap transforma cada elemento en un nuevo Mono o Flux y luego aplana los resultados en una sola secuencia. Se usa cuando la transformación es en sí misma una operación asíncrona.

El concepto clave: nada se ejecuta hasta el subscribe

Este es el punto que más confunde a quienes vienen del modelo imperativo. Cuando defines una cadena de operadores sobre un Mono o Flux, no estás ejecutando nada todavía. Estás construyendo una descripción del pipeline, un plan de ejecución.
Solo cuando alguien llama a subscribe() (o un operador terminal equivalente), el pipeline se activa y comienza a procesar datos. Este comportamiento se llama evaluación perezosa o lazy evaluation.
La buena noticia es que en Spring WebFlux no tienes que preocuparte por esto en los controladores: cuando retornas un Mono o Flux desde un método de controlador, el framework se suscribe por ti automáticamente. Solo necesitas recordarlo cuando usas Reactor fuera del contexto de un request HTTP, por ejemplo en tareas programadas o procesos en background.

Manejo de errores

En un pipeline reactivo, los errores también son valores del flujo. Reactor ofrece varios operadores para manejarlos de forma declarativa, sin try/catch:
kotlin
1// onErrorReturn: retornar un valor por defecto ante cualquier error
2Mono.fromCallable { riskyOperation() }
3    .onErrorReturn("Valor por defecto")
4    .subscribe { println(it) }
5
6// onErrorResume: retornar un Mono alternativo con lógica personalizada
7Mono.fromCallable { riskyOperation() }
8    .onErrorResume { error ->
9        logger.error("Falló: ${error.message}")
10        Mono.just("Fallback")
11    }
12    .subscribe { println(it) }
13
14// onErrorContinue: ignorar el error y continuar con el siguiente elemento
15Flux.just(1, 2, 3)
16    .map { if (it == 2) throw RuntimeException("Error en $it") else it }
17    .onErrorContinue { error, value ->
18        logger.warn("Ignorando error para $value: ${error.message}")
19    }
20    .subscribe { println(it) } // 1, 3
Un error no manejado en un pipeline reactivo termina el stream completo. Por eso es importante definir una estrategia de manejo de errores explícita. onErrorContinue es útil para flujos donde quieres procesar el mayor número posible de elementos aunque algunos fallen.

Schedulers: controlando en qué hilo se ejecuta

Por defecto, un pipeline reactivo se ejecuta en el hilo que lo suscribe. LosSchedulers permiten cambiar el contexto de ejecución en cualquier punto de la cadena. Reactor incluye tres schedulers principales:
  • Schedulers.boundedElastic(): pool elástico de hilos para operaciones de I/O bloqueante
  • Schedulers.parallel(): pool de tamaño fijo (CPU cores) para procesamiento intensivo
  • Schedulers.single(): un único hilo reutilizable para tareas ligeras
kotlin
1Mono.fromCallable { blockingDatabaseCall() }
2    .subscribeOn(Schedulers.boundedElastic()) // Ejecutar en hilo de I/O
3    .map { processResult(it) } // Sigue en el mismo hilo
4    .publishOn(Schedulers.parallel()) // Cambiar a hilo de procesamiento
5    .map { heavyComputation(it) }
6    .subscribe()
La diferencia entre subscribeOn y publishOn es sutil pero importante:
  • subscribeOn: afecta en qué hilo se inicia la suscripción e impacta toda la cadena hacia arriba, desde la fuente
  • publishOn: cambia el hilo de ejecución desde ese punto hacia abajo en la cadena
En la práctica, subscribeOn se usa para envolver operaciones bloqueantes (como llamadas JDBC) en un pool de I/O, y publishOn para mover el procesamiento posterior a un pool diferente.

Ejemplo práctico: controller con WebFlux

Con estos conceptos claros, veamos cómo se traduce en un controlador real de Spring WebFlux. Observa cómo el código retorna directamente Mono y Flux: el framework maneja la suscripción y la serialización de la respuesta HTTP.
kotlin
1@RestController
2@RequestMapping("/api/products")
3class ProductController(
4    private val productRepository: ReactiveProductRepository,
5    private val priceService: PriceService
6) {
7    @GetMapping
8    fun getAll(): Flux<Product> = productRepository.findAll()
9
10    @GetMapping("/{id}")
11    fun getById(@PathVariable id: String): Mono<ResponseEntity<Product>> =
12        productRepository.findById(id)
13            .map { ResponseEntity.ok(it) }
14            .defaultIfEmpty(ResponseEntity.notFound().build())
15
16    @GetMapping("/{id}/enriched")
17    fun getEnriched(@PathVariable id: String): Mono<EnrichedProduct> =
18        Mono.zip(
19            productRepository.findById(id),
20            priceService.getPrice(id)
21        ).map { (product, price) ->
22            EnrichedProduct(product, price)
23        }
24}
El endpoint /enriched es especialmente interesante: usa Mono.zip para ejecutar las dos consultas en paralelo y combinar sus resultados cuando ambas completan. Esto es algo que en el modelo bloqueante requeriría hilos adicionales o CompletableFuture; en Reactor es una línea de código.

Errores comunes

Estos son los errores que aparecen con más frecuencia cuando se empieza a trabajar con Reactor:
  • Bloquear dentro de un pipeline reactivo: llamar a .block() dentro de un map o flatMap anula los beneficios del modelo reactivo y puede causar deadlocks en schedulers de tamaño fijo.
  • No suscribirse nunca: construir la cadena de operadores y olvidar llamar a subscribe() (o retornar el resultado desde un controlador) hace que nada se ejecute.
  • Confundir subscribeOn con publishOn: usar uno cuando se necesita el otro produce comportamientos de hilo inesperados y difíciles de depurar.
  • Ignorar el manejo de errores: un error no manejado termina el stream completo. Siempre define qué debe ocurrir cuando algo falla.

Conclusión

Reactor tiene una curva de aprendizaje real. El cambio de mentalidad desde el modelo imperativo (pensar en flujos de datos en lugar de pasos secuenciales) requiere práctica. Pero una vez que internalizas Mono y Flux, los operadores básicos y el modelo de suscripción, el código reactivo se vuelve natural y expresivo.
El siguiente paso es ver cómo estos conceptos se aplican en un proyecto Spring WebFlux completo: repositorios reactivos con R2DBC, clientes HTTP no bloqueantes con WebClient y propagación de contexto entre hilos. Eso lo veremos en la siguiente parte de esta serie.

Posts que podrian interesarte