Project Reactor Interview Questions (Free Preview)
Free sample of 15 from 38 questions available
Testing
What is the purpose of expectNext, expectComplete, and verify?
The 30-Second Answer:
These are the core StepVerifier methods: expectNext() validates that specific values are emitted in order, expectComplete() validates that the stream completes successfully without errors, and verify() triggers the actual subscription and executes all expectations, making the test run.
The 2-Minute Answer (If They Want More): These three methods form the foundation of reactive stream testing and each serves a distinct purpose in the verification lifecycle.
expectNext() is used to assert that the publisher emits specific values in the exact order specified. I can chain multiple calls or pass multiple values to a single call. It validates both the value itself and the sequence position, ensuring that the reactive pipeline produces the expected data transformations. If the actual emitted value doesn't match or arrives out of order, the test fails immediately with a clear error message.
expectComplete() verifies that the publisher sends an onComplete signal, indicating successful termination of the stream. This is crucial for testing finite streams where I need to ensure all data has been processed and the stream closed properly. It's different from just checking values because a stream might emit expected values but then hang indefinitely or error out instead of completing cleanly.
verify() is the trigger method that actually subscribes to the publisher and executes all the defined expectations. Until verify() is called, no subscription happens and the test doesn't run; the expectation methods only build up a test scenario. Called without arguments, verify() blocks until the sequence terminates or an expectation fails, so it can optionally accept a Duration parameter to set a timeout, preventing tests from hanging if the publisher doesn't emit events as expected. There's also a convenience method verifyComplete() that combines expectComplete() and verify() in one call.
Together, these methods provide a declarative API for testing reactive streams: I define what should happen (expectNext, expectComplete), then trigger the execution (verify), and StepVerifier handles all the asynchronous complexity.
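To make the "nothing happens until verify()" point concrete, here is a toy sketch of the build-then-verify idea. It is not how reactor-test implements StepVerifier: `MiniVerifier` and `countingSource` are hypothetical names, and the JDK's `java.util.concurrent.Flow` interfaces (which mirror Reactive Streams) stand in for Reactor types so the sketch runs without extra dependencies. It also assumes a synchronous source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the build-then-verify idea; not how reactor-test implements StepVerifier. */
final class MiniVerifier<T> {
    private final Flow.Publisher<T> publisher;
    private final List<T> expected = new ArrayList<>();
    private boolean completionExpected;

    private MiniVerifier(Flow.Publisher<T> publisher) {
        this.publisher = publisher;
    }

    static <T> MiniVerifier<T> create(Flow.Publisher<T> publisher) {
        return new MiniVerifier<>(publisher);
    }

    /** Records an expected value; does not subscribe. */
    MiniVerifier<T> expectNext(T value) {
        expected.add(value);
        return this;
    }

    /** Records that a completion signal is expected; does not subscribe. */
    MiniVerifier<T> expectComplete() {
        completionExpected = true;
        return this;
    }

    /** Subscribes, collects every signal, then compares them with the recorded script. */
    void verify() {
        List<T> actual = new ArrayList<>();
        boolean[] completed = {false};
        Throwable[] failure = {null};
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { actual.add(item); }
            @Override public void onError(Throwable t) { failure[0] = t; }
            @Override public void onComplete() { completed[0] = true; }
        });
        if (failure[0] != null) {
            throw new AssertionError("unexpected error signal", failure[0]);
        }
        if (!actual.equals(expected)) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
        if (completionExpected && !completed[0]) {
            throw new AssertionError("expected onComplete");
        }
    }

    /** Synchronous test source that counts how often it is subscribed to. */
    static <T> Flow.Publisher<T> countingSource(List<T> items, AtomicInteger subscriptions) {
        return subscriber -> {
            subscriptions.incrementAndGet();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean finished;
                @Override public void request(long n) {
                    // Simplification: emits everything on the first request, which is
                    // only acceptable because verify() asks for Long.MAX_VALUE.
                    if (finished) return;
                    finished = true;
                    for (T item : items) {
                        subscriber.onNext(item);
                    }
                    subscriber.onComplete();
                }
                @Override public void cancel() { finished = true; }
            });
        };
    }
}
```

Building the chain `create(...).expectNext("A").expectNext("B").expectComplete()` leaves the subscription counter at zero; only the final `verify()` call subscribes and checks the recorded script.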
Code Example:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.junit.jupiter.api.Test;
import java.time.Duration;
public class StepVerifierMethodsTest {
@Test
public void demonstrateExpectNext() {
Flux<String> flux = Flux.just("Apple", "Banana", "Cherry");
StepVerifier.create(flux)
.expectNext("Apple") // First emission
.expectNext("Banana") // Second emission
.expectNext("Cherry") // Third emission
.expectComplete()
.verify();
}
@Test
public void expectNextWithMultipleValues() {
Flux<Integer> flux = Flux.range(1, 5);
// Can pass multiple expected values at once
StepVerifier.create(flux)
.expectNext(1, 2, 3, 4, 5) // All values in one call
.expectComplete()
.verify();
}
@Test
public void demonstrateExpectComplete() {
// Empty Mono - only emits completion signal
Mono<Void> mono = Mono.empty();
StepVerifier.create(mono)
.expectComplete() // No values, just completion
.verify();
}
@Test
public void demonstrateVerify() {
Flux<String> flux = Flux.just("A", "B");
// verify() without timeout
StepVerifier.create(flux)
.expectNext("A", "B")
.expectComplete()
.verify(); // Blocks until completion or failure
}
@Test
public void verifyWithTimeout() {
Flux<Integer> slowFlux = Flux.range(1, 3)
.delayElements(Duration.ofMillis(100));
// verify(Duration) sets maximum wait time
StepVerifier.create(slowFlux)
.expectNext(1, 2, 3)
.expectComplete()
.verify(Duration.ofSeconds(2)); // Fail if not complete within 2s
}
@Test
public void verifyCompleteConvenience() {
Flux<String> flux = Flux.just("X", "Y", "Z");
// verifyComplete() = expectComplete() + verify()
StepVerifier.create(flux)
.expectNext("X", "Y", "Z")
.verifyComplete(); // Combines both methods
}
@Test
public void expectNextWithPredicate() {
Flux<Integer> flux = Flux.just(10, 20, 30);
StepVerifier.create(flux)
.expectNext(10)
.expectNextMatches(n -> n > 15 && n < 25) // Predicate-based
.expectNext(30)
.verifyComplete();
}
@Test
public void expectNextCount() {
Flux<Integer> flux = Flux.range(1, 100);
StepVerifier.create(flux)
.expectNextCount(100) // Expect 100 items without checking values
.verifyComplete();
}
}
Mermaid Diagram:
sequenceDiagram
participant Test as Test Code
participant SV as StepVerifier
participant Pub as Publisher
Test->>SV: create(publisher)
Note over SV: Build expectations
Test->>SV: expectNext("A")
Test->>SV: expectNext("B")
Test->>SV: expectComplete()
Note over Test,SV: Nothing happens yet!
Test->>SV: verify()
activate SV
SV->>Pub: subscribe()
Pub->>SV: onNext("A")
Note over SV: ✓ Matches expectNext("A")
Pub->>SV: onNext("B")
Note over SV: ✓ Matches expectNext("B")
Pub->>SV: onComplete()
Note over SV: ✓ Matches expectComplete()
SV->>Test: Test passes
deactivate SV
References:
- StepVerifier API - Project Reactor
- Testing with StepVerifier - Reactor Guide
- Reactor Testing Examples - GitHub
Project Reactor Fundamentals
What is the difference between reactive and imperative programming?
The 30-Second Answer: Imperative programming executes code sequentially and blocks on I/O operations, while reactive programming is event-driven, non-blocking, and handles data as asynchronous streams. Reactive programming uses backpressure to prevent overwhelming consumers and enables better resource utilization through non-blocking operations.
The 2-Minute Answer (If They Want More): Imperative programming follows a traditional, step-by-step execution model where each operation must complete before the next one begins. When you make a database call or HTTP request, the thread blocks and waits for the response, consuming resources even while idle. This approach is intuitive and straightforward but doesn't scale well under high load - if you have 1000 concurrent requests and each blocks a thread, you need 1000 threads consuming memory and context-switching overhead.
Reactive programming, in contrast, is built around asynchronous data streams and the propagation of change. Instead of blocking and waiting, operations emit events when data becomes available. A single thread can handle thousands of concurrent operations by switching between them when I/O operations are pending. This is achieved through an event-driven model where subscribers register interest in data, and publishers notify them when data is ready.
The key differences include: (1) Execution Model - imperative is sequential and blocking, reactive is asynchronous and non-blocking; (2) Data Handling - imperative pulls data (you ask for it and wait), reactive pushes data (it comes to you when ready); (3) Error Handling - imperative uses try-catch blocks, reactive treats errors as first-class events in the stream; (4) Backpressure - imperative has no concept of this, reactive provides mechanisms for consumers to signal how much data they can handle.
In practical terms, reactive programming shines in scenarios with high I/O, such as microservices communicating over networks, real-time data processing, or applications serving many concurrent users. However, it comes with increased complexity - debugging can be harder, and the learning curve is steeper. For CPU-bound operations or simple applications, imperative programming may be more appropriate.
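The contrast between waiting on each call and composing results as they arrive can be sketched with JDK classes alone. This is a minimal illustration, not Reactor code: `CompletableFuture` stands in for a reactive type, and `slowLookup` is a hypothetical placeholder for a database or HTTP call. Note that the lookups here still block worker threads; a truly non-blocking pipeline also needs a non-blocking driver or client underneath.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class BlockingVsAsyncSketch {

    /** Hypothetical slow lookup standing in for a database or HTTP call. */
    static String slowLookup(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "user-" + id;
    }

    /** Imperative style: the caller waits for each lookup before starting the next. */
    static List<String> fetchSequentially(List<Integer> ids) {
        return ids.stream()
                .map(BlockingVsAsyncSketch::slowLookup)
                .collect(Collectors.toList());
    }

    /** Asynchronous style: start every lookup first, then combine the results once all are done. */
    static CompletableFuture<List<String>> fetchConcurrently(List<Integer> ids) {
        List<CompletableFuture<String>> inFlight = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> slowLookup(id)))
                .collect(Collectors.toList());
        // allOf completes when every lookup has finished, so join() below no longer waits.
        return CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0]))
                .thenApply(done -> inFlight.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
```

Both methods produce the same list in the same order; the difference is that `fetchConcurrently` returns immediately and the caller decides when, or whether, to wait for the result.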
Code Example:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class ImperativeVsReactive {
// IMPERATIVE APPROACH
public static List<String> fetchUserDataImperative(List<Integer> userIds) {
// Each call blocks the thread until completion
List<String> results = new ArrayList<>();
for (Integer userId : userIds) {
// Thread blocks here waiting for database
User user = userRepository.findById(userId); // Blocking call
// Thread blocks here waiting for external API
String profile = externalApi.getProfile(user); // Blocking call
results.add(profile);
}
return results; // Returns only when ALL operations complete
}
// REACTIVE APPROACH
public static Flux<String> fetchUserDataReactive(List<Integer> userIds) {
// Nothing runs yet: this only assembles the pipeline
return Flux.fromIterable(userIds)
// Non-blocking database call
.flatMap(userId -> userRepository.findByIdReactive(userId))
// Non-blocking API call - flatMap subscribes to the inner publishers concurrently
.flatMap(user -> externalApi.getProfileReactive(user))
// Fails with a TimeoutException if no item arrives within 5 seconds of the previous one
.timeout(Duration.ofSeconds(5));
// Returns immediately; work starts on subscription and results stream as they arrive
}
// Demonstrating key differences
public static void compareApproaches() {
System.out.println("=== IMPERATIVE ===");
long imperativeStart = System.currentTimeMillis();
// Blocking operations - sequential execution
String result1 = ImperativeOperations.blockingDatabaseCall(); // Waits 1 second
String result2 = ImperativeOperations.blockingDatabaseCall(); // Waits another 1 second
String result3 = ImperativeOperations.blockingDatabaseCall(); // Waits another 1 second
// Total time: ~3 seconds
System.out.println("Imperative time: " +
(System.currentTimeMillis() - imperativeStart) + "ms");
System.out.println("\n=== REACTIVE ===");
long reactiveStart = System.currentTimeMillis();
// Non-blocking operations - a Mono is lazy, so nothing starts on these lines
Mono<String> mono1 = ReactiveOperations.nonBlockingDatabaseCall(); // Only assembled
Mono<String> mono2 = ReactiveOperations.nonBlockingDatabaseCall(); // Only assembled
Mono<String> mono3 = ReactiveOperations.nonBlockingDatabaseCall(); // Only assembled
// zip subscribes to all three at once, so their delays overlap
Mono.zip(mono1, mono2, mono3)
.subscribe(tuple -> {
System.out.println("Reactive time: " +
(System.currentTimeMillis() - reactiveStart) + "ms");
// Total time: ~1 second (parallel execution)
});
}
// PULL vs PUSH model demonstration
public static void pullVsPush() {
// IMPERATIVE (PULL) - you ask for data and wait
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
for (Integer num : numbers) {
// Pull each item and process
int squared = num * num;
System.out.println(squared);
}
// REACTIVE (PUSH) - data comes to you when ready
Flux.just(1, 2, 3, 4, 5)
.map(num -> num * num)
.subscribe(System.out::println); // Data is pushed to subscriber
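// BACKPRESSURE - the subscriber signals how much it can handle
Flux.range(1, 1000)
.onBackpressureBuffer(10) // Holds at most 10 undelivered items; errors on overflow
.subscribe(
item -> {
// Slow consumer. A lambda subscriber requests Long.MAX_VALUE, so on a
// single thread the buffer never actually fills in this snippet.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println(item);
}
);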
}
}
// Mock blocking operations
class ImperativeOperations {
public static String blockingDatabaseCall() {
try {
Thread.sleep(1000); // Blocks thread for 1 second
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "data";
}
}
// Mock non-blocking operations
class ReactiveOperations {
public static Mono<String> nonBlockingDatabaseCall() {
return Mono.delay(Duration.ofSeconds(1))
.map(tick -> "data"); // Doesn't block - returns immediately
}
}
Mermaid Diagram:
flowchart TD
subgraph Imperative["Imperative Programming"]
I1[Request 1] -->|blocks thread| I2[Wait for I/O]
I2 --> I3[Process Result]
I3 --> I4[Request 2]
I4 -->|blocks thread| I5[Wait for I/O]
I5 --> I6[Process Result]
I7[Thread] -.->|dedicated| I1
I8[Thread] -.->|dedicated| I4
I9[High thread count<br/>High memory usage]
end
subgraph Reactive["Reactive Programming"]
R1[Request 1] -->|non-blocking| R2[Register callback]
R3[Request 2] -->|non-blocking| R4[Register callback]
R5[Request 3] -->|non-blocking| R6[Register callback]
R2 -.->|event| R7[Process when ready]
R4 -.->|event| R8[Process when ready]
R6 -.->|event| R9[Process when ready]
R10[Single Thread<br/>Event Loop] -.->|multiplexes| R1
R10 -.-> R3
R10 -.-> R5
R11[Low thread count<br/>Efficient resources]
end
style Imperative fill:#ffcccc
style Reactive fill:#ccffcc
References:
- Reactive Programming vs Imperative - Spring Documentation
- The Reactive Manifesto
- Reactor 3 Reference Guide - Introduction to Reactive Programming
What is Project Reactor and how does it relate to Reactive Streams?
The 30-Second Answer: Project Reactor is a fourth-generation reactive library for building non-blocking applications on the JVM, based on the Reactive Streams specification. It provides the implementation of Reactive Streams APIs through its core types (Mono and Flux) and serves as the foundation for Spring WebFlux.
The 2-Minute Answer (If They Want More): Project Reactor is a fully non-blocking reactive programming foundation for the JVM, developed by VMware (formerly Pivotal). It implements the Reactive Streams specification, which defines a standard for asynchronous stream processing with non-blocking backpressure. Reactive Streams provides four key interfaces: Publisher, Subscriber, Subscription, and Processor, and Reactor implements these through its core types.
The relationship between Reactor and Reactive Streams is foundational - Reactor's Mono and Flux are both Publishers in the Reactive Streams sense. This means they can interoperate with any other Reactive Streams-compliant library, including RxJava 2+, Akka Streams, and Vert.x. The specification ensures that reactive libraries can work together seamlessly, preventing issues like unbounded buffering or resource exhaustion.
Reactor goes beyond the basic Reactive Streams specification by providing a rich vocabulary of operators for transforming, combining, and handling reactive sequences. It also includes advanced features like schedulers for controlling execution contexts, hooks for debugging and monitoring, and Context for passing metadata through the reactive chain. As the reactive engine behind Spring WebFlux, Reactor has become the de facto standard for reactive programming in the Spring ecosystem.
The library is designed for high throughput and low latency scenarios, making it ideal for microservices, real-time data processing, and any application requiring efficient resource utilization under high load.
Code Example:
// Project Reactor implementing Reactive Streams Publisher interface
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class ReactorBasics {
// Flux and Mono are both Publishers (Reactive Streams)
public static void demonstrateReactiveStreams() {
// Create a Publisher (Flux implements Publisher<T>)
Publisher<Integer> publisher = Flux.range(1, 5);
// Subscribe with custom Subscriber (Reactive Streams interface)
publisher.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
// Request items with backpressure control
s.request(2); // Request only 2 items initially
}
@Override
public void onNext(Integer item) {
System.out.println("Received: " + item);
// Request next item after processing
subscription.request(1);
}
@Override
public void onError(Throwable t) {
System.err.println("Error: " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
});
// Simpler subscription using Reactor's lambda-based API
Flux.just("A", "B", "C")
.subscribe(
item -> System.out.println("Item: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Done") // onComplete
);
}
// Interoperability with any Reactive Streams implementation
public static Mono<String> fromAnyPublisher(Publisher<String> publisher) {
// Reactor can wrap any Reactive Streams Publisher
return Mono.from(publisher);
}
}
Mermaid Diagram:
flowchart TD
A[Reactive Streams Specification] -->|defines| B[Publisher Interface]
A -->|defines| C[Subscriber Interface]
A -->|defines| D[Subscription Interface]
A -->|defines| E[Processor Interface]
B -->|implemented by| F[Project Reactor]
C -->|implemented by| F
D -->|implemented by| F
E -->|implemented by| F
F -->|provides| G[Mono<T>]
F -->|provides| H[Flux<T>]
G -->|0..1 elements| I[Publisher<T>]
H -->|0..N elements| I
F -->|foundation for| J[Spring WebFlux]
F -->|interoperates with| K[RxJava 2+]
F -->|interoperates with| L[Akka Streams]
style F fill:#6db33f
style A fill:#87ceeb
References:
- Project Reactor Reference Documentation
- Reactive Streams Specification
- Understanding Reactive Types - Reactor Core
What is backpressure and why is it important in reactive programming?
The 30-Second Answer: Backpressure is a mechanism that allows a slow consumer to signal to a fast producer how much data it can handle, preventing overwhelming the consumer with more data than it can process. It's essential in reactive programming to avoid memory exhaustion, resource starvation, and application crashes when producers emit data faster than consumers can process it.
The 2-Minute Answer (If They Want More): In reactive streams, backpressure is the consumer's ability to control the rate at which it receives data from the producer. Imagine a scenario where a database can return 10,000 records per second, but your application can only process 100 records per second - without backpressure, the extra 9,900 records per second would accumulate in memory, eventually causing an OutOfMemoryError. Backpressure provides a feedback mechanism where the consumer tells the producer: "I can handle N items right now."
The Reactive Streams specification defines backpressure through the request(n) method on the Subscription interface. When a Subscriber subscribes to a Publisher, it receives a Subscription and uses request(n) to signal how many items it's ready to receive. The Publisher should not emit more items than requested. This creates a pull-push hybrid model: the publisher pushes data, but only as much as the subscriber has pulled (requested).
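The request(n) handshake described above can be sketched without Reactor by using the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams ones. The publisher below is a simplified, single-threaded illustration (the `range` helper and the batch size of 2 are assumptions made for this example), but it does enforce the central rule: it never emits more items than the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class RequestNSketch {

    /** Synchronous publisher of 1..count that emits only while demand is outstanding. */
    static Flow.Publisher<Integer> range(int count, List<String> log) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("non-positive request"));
                    return;
                }
                log.add("request(" + n + ")");
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return; // re-entrant call from onNext: the outer loop picks it up
                emitting = true;
                while (!done && demand > 0 && next <= count) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with a consumer that asks for two items at a time and returns the signal log. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        range(5, log).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(2); // initial demand
            }
            @Override public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++receivedInBatch == 2) { // batch consumed: ask for the next two
                    receivedInBatch = 0;
                    subscription.request(2);
                }
            }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }
}
```

Calling `RequestNSketch.run()` yields the log `request(2), onNext(1), onNext(2), request(2), onNext(3), onNext(4), request(2), onNext(5), onComplete`: every emission is preceded by enough demand to cover it.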
Project Reactor provides several strategies for handling backpressure: (1) Buffering - accumulate items in a bounded buffer, error if exceeded; (2) Dropping - discard items that can't be processed; (3) Latest - keep only the most recent item; (4) Error - signal an error when the consumer can't keep up. Each strategy suits different scenarios - for example, dropping might be acceptable for real-time sensor data where the latest reading matters most, while buffering might be needed for critical financial transactions.
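To compare the four strategies side by side, the following plain-Java simulation models what each one does with a burst of items that arrives while the consumer has no outstanding demand. It captures the semantics only and is not Reactor's implementation; `afterBurst`, the `Strategy` enum, and the use of `IllegalStateException` for overflow are choices made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class OverflowStrategySketch {

    enum Strategy { BUFFER, DROP, LATEST, ERROR }

    /**
     * Simulates items 1..burstSize arriving while the consumer has requested nothing,
     * and returns what is waiting for the consumer when it starts requesting again.
     */
    static List<Integer> afterBurst(Strategy strategy, int burstSize, int bufferCapacity) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int item = 1; item <= burstSize; item++) {
            switch (strategy) {
                case BUFFER:
                    // Keep everything until the bounded buffer is full, then fail.
                    if (pending.size() == bufferCapacity) {
                        throw new IllegalStateException("buffer overflow at item " + item);
                    }
                    pending.addLast(item);
                    break;
                case DROP:
                    // No demand right now, so the item is simply discarded.
                    break;
                case LATEST:
                    // Remember only the most recent item.
                    pending.clear();
                    pending.addLast(item);
                    break;
                case ERROR:
                    // Any item without demand is treated as a failure.
                    throw new IllegalStateException("overflow at item " + item);
            }
        }
        return new ArrayList<>(pending);
    }
}
```

With a burst of 5 and capacity 5, BUFFER retains all five items, DROP retains none, LATEST retains only item 5, and ERROR fails on the first item. Raising the burst to 6 makes BUFFER fail as well.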
Understanding backpressure is crucial for building resilient reactive applications. Without it, you essentially have an unbounded queue that can grow indefinitely. With proper backpressure handling, your application can gracefully handle load, apply flow control across service boundaries, and maintain predictable resource usage even under stress.
Code Example:
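import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import org.reactivestreams.Subscription;
import java.time.Duration;
import java.util.List;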
public class BackpressureExamples {
// PROBLEM: Fast producer, slow consumer (without backpressure handling)
public static void demonstrateProblem() {
Flux.range(1, 1000000) // Fast producer: 1 million items
.doOnNext(i -> System.out.println("Produced: " + i))
.subscribe(i -> {
try {
// Slow consumer: takes 100ms per item
Thread.sleep(100);
System.out.println("Consumed: " + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
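// Here producer and consumer share one thread, so range() is paced by the consumer.
// The overflow risk appears once an asynchronous boundary or a hot source
// (e.g. Flux.interval) lets the producer run ahead of the consumer.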
}
// SOLUTION 1: Manual backpressure control
public static void manualBackpressure() {
Flux.range(1, 100)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Request only 1 item initially
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Processing: " + value);
try {
Thread.sleep(100); // Simulate slow processing
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Request next item only after processing current one
request(1);
}
});
}
// SOLUTION 2: onBackpressureBuffer - Buffer with bounded size
public static void bufferStrategy() {
Flux.range(1, 1000)
.onBackpressureBuffer(
100, // Buffer size
i -> System.out.println("Overflowed: " + i) // Called with the item that did not fit
)
.delayElements(Duration.ofMillis(100)) // Simulate slow consumer
.subscribe(
i -> System.out.println("Consumed: " + i),
error -> System.err.println("Error: " + error)
);
// Buffers up to 100 items; on overflow the callback runs and the stream ends with an overflow error
}
// SOLUTION 3: onBackpressureDrop - Drop excess items
public static void dropStrategy() {
Flux.interval(Duration.ofMillis(1)) // Fast producer: 1 item/ms
.onBackpressureDrop(dropped ->
System.out.println("Dropped: " + dropped)
)
.delayElements(Duration.ofMillis(100)) // Slow consumer: 1 item/100ms
.subscribe(i -> System.out.println("Consumed: " + i));
// Consumer gets items it can handle, rest are dropped
// Useful for real-time data where old data becomes irrelevant
}
// SOLUTION 4: onBackpressureLatest - Keep only latest
public static void latestStrategy() {
Flux.interval(Duration.ofMillis(1)) // Fast producer
.onBackpressureLatest() // Keep only the most recent item
.delayElements(Duration.ofMillis(100)) // Slow consumer
.subscribe(i -> System.out.println("Consumed: " + i));
// Consumer always gets the latest available item
// Perfect for UI updates or sensor readings
}
// SOLUTION 5: onBackpressureError - Signal error when overwhelmed
public static void errorStrategy() {
Flux.range(1, 1000)
.onBackpressureError() // Fail fast when consumer can't keep up
.delayElements(Duration.ofMillis(100))
.subscribe(
i -> System.out.println("Consumed: " + i),
error -> System.err.println("Backpressure error: " + error)
);
// Signals OverflowException if consumer can't keep up
// Useful when silently dropping data is unacceptable and failing fast is preferred
}
// PRACTICAL EXAMPLE: Database streaming with backpressure
public static class DatabaseStreamingService {
public Flux<User> streamLargeResultSet() {
return Flux.create(sink -> {
// Simulate database cursor
DatabaseCursor cursor = database.openCursor("SELECT * FROM users");
// Respect backpressure from downstream
sink.onRequest(n -> {
// Fetch exactly what was requested
for (long i = 0; i < n && cursor.hasNext(); i++) { // n is a long and may be Long.MAX_VALUE
sink.next(cursor.next());
}
if (!cursor.hasNext()) {
sink.complete();
}
});
// Cleanup on cancel
sink.onDispose(() -> cursor.close());
});
}
// Consumer controls pace
public void processUsers() {
streamLargeResultSet()
.buffer(100) // Process in batches of 100
.flatMap(batch ->
processUserBatch(batch)
.subscribeOn(Schedulers.parallel()),
4 // Max 4 concurrent batches (backpressure on batches)
)
.subscribe();
}
}
// PRACTICAL EXAMPLE: HTTP client with backpressure
public static class HttpClientService {
public Flux<Response> streamApiResults(List<String> urls) {
return Flux.fromIterable(urls)
.flatMap(
url -> makeHttpRequest(url),
8 // Max 8 concurrent requests (apply backpressure)
)
.onBackpressureBuffer(
100,
BufferOverflowStrategy.DROP_OLDEST
);
}
private Mono<Response> makeHttpRequest(String url) {
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(Response.class);
}
}
// DEMONSTRATING THE REQUEST-RESPONSE PROTOCOL
public static void showRequestProtocol() {
Flux.range(1, 10)
.doOnRequest(n ->
System.out.println("Requested: " + n + " items")
)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(3); // Request 3 items
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Received: " + value);
if (value % 3 == 0) {
request(3); // Request 3 more every 3 items
}
}
});
// Output:
// Requested: 3 items
// Received: 1
// Received: 2
// Received: 3
// Requested: 3 items
// Received: 4
// ...
}
// COMPARING BACKPRESSURE STRATEGIES
public static void compareStrategies() {
// Scenario: Fast producer (1 item/ms), Slow consumer (1 item/100ms)
Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(1)).take(1000);
Duration slowConsumerDelay = Duration.ofMillis(100);
// Buffer: Stores items up to limit, then errors
fastProducer
.onBackpressureBuffer(50)
.delayElements(slowConsumerDelay)
.subscribe(i -> System.out.println("Buffer: " + i));
// Drop: Discards items consumer can't handle
fastProducer
.onBackpressureDrop()
.delayElements(slowConsumerDelay)
.subscribe(i -> System.out.println("Drop: " + i));
// Latest: Keeps only the newest item
fastProducer
.onBackpressureLatest()
.delayElements(slowConsumerDelay)
.subscribe(i -> System.out.println("Latest: " + i));
}
}
Mermaid Diagram:
sequenceDiagram
participant Producer
participant Subscriber
Note over Producer,Subscriber: Subscription Phase
Subscriber->>Producer: subscribe()
Producer->>Subscriber: onSubscribe(subscription)
Note over Producer,Subscriber: Backpressure in Action
Subscriber->>Producer: request(3)
Producer->>Subscriber: onNext(item1)
Producer->>Subscriber: onNext(item2)
Producer->>Subscriber: onNext(item3)
Note over Producer: Waits... (no more requests)
Subscriber->>Producer: request(2)
Producer->>Subscriber: onNext(item4)
Producer->>Subscriber: onNext(item5)
Subscriber->>Producer: request(Long.MAX_VALUE)
Note over Producer: Unbounded mode
Producer->>Subscriber: onNext(item6)
Producer->>Subscriber: onNext(item7)
Producer->>Subscriber: ...
Producer->>Subscriber: onComplete()
Note over Producer,Subscriber: Consumer Controls Flow
flowchart TD
A[Fast Producer] -->|unlimited items| B{Backpressure Strategy}
B -->|onBackpressureBuffer| C[Buffer Strategy]
C -->|buffer full?| C1{Check Size}
C1 -->|No| C2[Store in Buffer]
C1 -->|Yes| C3[Error/Drop based on policy]
B -->|onBackpressureDrop| D[Drop Strategy]
D --> D1[Drop excess items]
D1 --> D2[Consumer gets what it can handle]
B -->|onBackpressureLatest| E[Latest Strategy]
E --> E1[Keep only newest item]
E1 --> E2[Discard older items]
B -->|onBackpressureError| F[Error Strategy]
F --> F1[Signal OverflowException]
F1 --> F2[Terminate stream]
C2 --> G[Slow Consumer]
D2 --> G
E2 --> G
style C fill:#90ee90
style D fill:#ffeb9c
style E fill:#87ceeb
style F fill:#ffcccc
References:
- Reactive Streams Specification - Backpressure
- Project Reactor - Backpressure and Ways to Reshape Requests
- Spring WebFlux - Backpressure Handling
Mono and Flux
What is the difference between Mono.empty() and Mono.just(null)?
The 30-Second Answer: Mono.empty() represents a valid stream that completes without emitting any value, while Mono.just(null) throws a NullPointerException because the just() operator doesn't accept null values. To handle nullable values properly, I use Mono.justOrEmpty(nullableValue) which returns empty for null or a Mono with the value otherwise.
The 2-Minute Answer (If They Want More): This is a common source of confusion that highlights an important principle in reactive programming: null is not a valid signal in reactive streams. The difference is fundamental to understanding how Reactor handles optional values.
Mono.empty() is the correct way to represent "no value" in reactive programming. It's a complete, valid reactive stream that emits zero items and then completes normally by calling onComplete(). This is semantically equivalent to an empty Optional or an empty collection - the absence of a value is explicitly communicated through the stream's lifecycle, not through null.
Mono.just(null), on the other hand, will throw a NullPointerException at creation time because the just() operator explicitly rejects null values. This is by design in the Reactive Streams specification - null is forbidden as a stream element because it would create ambiguity in the protocol. How would downstream operators distinguish between "no value" and "the value null"?
When working with potentially null values from external sources (like database queries or API responses), I use Mono.justOrEmpty(). This method safely handles both cases: if the value is null, it returns Mono.empty(); if the value is non-null, it returns Mono.just(value). This pattern is crucial when integrating with non-reactive code that might return null.
The reactive way to represent optionality is through the stream lifecycle, not through null values. An empty Mono communicates "no result" clearly and allows operators like defaultIfEmpty() and switchIfEmpty() to handle the absence of data in a composable, type-safe way.
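Since the answer compares an empty Mono to an empty Optional, the same rules can be sketched with `java.util.Optional` alone. This is an analogy for the emptiness semantics only: Optional is synchronous and eager, whereas a Mono is lazy and asynchronous. The class and method names are hypothetical, and `Optional.or` requires Java 9 or later.

```java
import java.util.Optional;

final class OptionalAnalogy {

    /** Optional.of(null) is rejected with a NullPointerException, much like Mono.just(null). */
    static boolean ofNullThrows() {
        try {
            Optional.of((String) null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Optional.ofNullable plays the role of Mono.justOrEmpty: null becomes "empty". */
    static Optional<String> wrap(String nullable) {
        return Optional.ofNullable(nullable);
    }

    /** orElse is the counterpart of defaultIfEmpty: a constant fallback value. */
    static String withDefault(String nullable) {
        return wrap(nullable).orElse("default-value");
    }

    /** or(...) is the counterpart of switchIfEmpty: fall back to another source, evaluated lazily. */
    static Optional<String> withFallback(String nullable) {
        return wrap(nullable).or(() -> Optional.of("from-cache"));
    }
}
```

In both APIs the absence of a value is a state of the container rather than a null reference, which is what lets the fallback operators compose cleanly.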
Code Example:
// Correct: Using Mono.empty() for "no value"
Mono<String> emptyMono = Mono.empty();
emptyMono.subscribe(
value -> System.out.println("Got: " + value), // Never called
error -> System.err.println("Error: " + error), // Never called
() -> System.out.println("Completed without value") // This is called
);
// WRONG: This throws NullPointerException at creation time
try {
Mono<String> nullMono = Mono.just(null); // NPE here!
} catch (NullPointerException e) {
System.err.println("Cannot create Mono.just(null)");
}
// Correct: Using Mono.justOrEmpty() for nullable values
String nullableValue = repository.findOptionalValue(); // might be null
Mono<String> safeMono = Mono.justOrEmpty(nullableValue);
// If nullableValue is null -> returns Mono.empty()
// If nullableValue is "data" -> returns Mono.just("data")
// Practical example: Database query that might return null
public Mono<User> findUserByEmail(String email) {
return Mono.fromCallable(() -> database.findByEmail(email)) // A null result becomes an empty Mono
.subscribeOn(Schedulers.boundedElastic())
.switchIfEmpty(Mono.error(new UserNotFoundException(email)));
}
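// Handling optionality the reactive way: pick one of the two, because
// switchIfEmpty placed after defaultIfEmpty would never fire
Mono<String> withDefault = findData()
.defaultIfEmpty("default-value"); // Provide a constant default for empty
Mono<String> withFallback = findData()
.switchIfEmpty(Mono.defer(() -> fetchFromCache())); // Or switch to an alternative source, lazily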
// Comparing with Optional
Optional<String> optional = Optional.ofNullable(nullableValue);
Mono<String> mono = Mono.justOrEmpty(nullableValue);
// Converting between Optional and Mono
Mono<String> fromOptional = Mono.justOrEmpty(optional);
Optional<String> toOptional = mono.blockOptional(); // Blocking!
// Common patterns
public Mono<User> getUserSafely(String id) {
// Assumes a blocking repository whose findById returns Optional<User>
return Mono.defer(() -> Mono.justOrEmpty(userRepository.findById(id)))
.onErrorResume(e -> {
log.error("Error fetching user", e);
return Mono.empty(); // Return empty on error
});
}
// Filtering vs Empty
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
// filter() removes items but stream continues
Flux<Integer> filtered = numbers.filter(n -> n > 10); // Emits nothing, then completes
// This is different from a Mono that starts empty
Mono<Integer> emptyFromStart = Mono.empty(); // Never emits, just completes
Mermaid Diagram:
flowchart TD
A[Need to represent 'no value'] --> B{Do you have the value now?}
B -->|No value exists| C[Use Mono.empty]
B -->|Value might be null| D[Use Mono.justOrEmpty]
B -->|Value is definitely non-null| E[Use Mono.just]
C --> F[Emits: onComplete only]
D --> G{Is value null?}
G -->|Yes| F
G -->|No| H[Emits: onNext + onComplete]
E --> H
I[NEVER use Mono.just null] --> J[Throws NullPointerException]
style C fill:#90EE90
style D fill:#87CEEB
style E fill:#FFD700
style I fill:#FFB6C6
style J fill:#FF6B6B
References:
- Reactive Streams Specification - Rule 2.13
- Project Reactor - Null Safety
- Reactor API - Mono.justOrEmpty
What is a Mono and when would you use it?
The 30-Second Answer: A Mono is a reactive type that represents a stream of 0 or 1 element. I use it when I expect at most one result, like fetching a single user from a database, making an HTTP request that returns one response, or executing an operation that may or may not produce a value.
The 2-Minute Answer (If They Want More): Mono is one of the two core reactive types in Project Reactor, representing an asynchronous sequence that emits at most one item and then completes (or errors). It's conceptually similar to Java's CompletableFuture or Optional, but with the power of reactive operators.
I use Mono when the operation semantically produces zero or one result. Common use cases include database queries for a single entity, REST API calls that return a single object, cache lookups, authentication operations, or any computation that produces a single value. The key advantage over blocking alternatives is that Mono allows non-blocking composition of operations through operators like map, flatMap, filter, and zip.
Because a Mono emits at most one item, backpressure is rarely a concern, which makes it simpler than Flux for single-value scenarios. Operators like defaultIfEmpty() and switchIfEmpty() are a natural fit for handling the case where no value arrives.
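To make the empty-handling operators concrete, here is a minimal sketch. The findNickname lookup and its data are hypothetical, and block() is used only to keep the demo short:

```java
import reactor.core.publisher.Mono;

public class MonoEmptyHandling {

    // Hypothetical lookup: completes empty when the user id is unknown.
    static Mono<String> findNickname(String userId) {
        return "u1".equals(userId) ? Mono.just("neo") : Mono.empty();
    }

    // defaultIfEmpty: substitute a constant when nothing was emitted.
    static String nicknameOrDefault(String userId) {
        return findNickname(userId)
                .defaultIfEmpty("anonymous")
                .block();
    }

    // switchIfEmpty: subscribe to another publisher when nothing was emitted.
    static String nicknameOrGenerated(String userId) {
        return findNickname(userId)
                .switchIfEmpty(Mono.fromSupplier(() -> "generated-" + userId))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(nicknameOrDefault("u1"));    // neo
        System.out.println(nicknameOrDefault("u2"));    // anonymous
        System.out.println(nicknameOrGenerated("u2"));  // generated-u2
    }
}
```

defaultIfEmpty suits a cheap constant, while switchIfEmpty suits a fallback that is itself a reactive call.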
The lifecycle of a Mono is straightforward: it can either complete with a value (onNext + onComplete), complete empty (just onComplete), or terminate with an error (onError). This makes it easier to reason about than multi-value streams.
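The three outcomes can be observed directly by recording the signals a subscriber receives. This is a small sketch with a hypothetical signalsOf helper; the sources are synchronous, so the callbacks run before subscribe() returns:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class MonoSignals {

    // Subscribes and records every signal the Mono sends, in order.
    static List<String> signalsOf(Mono<String> mono) {
        List<String> signals = new ArrayList<>();
        mono.subscribe(
                value -> signals.add("onNext(" + value + ")"),
                error -> signals.add("onError(" + error.getMessage() + ")"),
                () -> signals.add("onComplete"));
        return signals;
    }

    public static void main(String[] args) {
        // Value, then completion
        System.out.println(signalsOf(Mono.just("a")));   // [onNext(a), onComplete]
        // Completion only
        System.out.println(signalsOf(Mono.empty()));     // [onComplete]
        // Error terminates the Mono; no onComplete follows
        System.out.println(signalsOf(Mono.error(new IllegalStateException("boom")))); // [onError(boom)]
    }
}
```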
Code Example:
// Creating Mono from different sources
Mono<String> monoJust = Mono.just("Hello");
Mono<String> monoEmpty = Mono.empty();
Mono<User> monoCallable = Mono.fromCallable(() -> userRepository.findById(123));
Mono<String> monoSupplier = Mono.fromSupplier(() -> "Lazy value");
// Practical example: Database query
public Mono<User> getUserById(String userId) {
return Mono.fromCallable(() -> database.findUser(userId))
.subscribeOn(Schedulers.boundedElastic())
.map(user -> {
user.setLastAccessed(Instant.now());
return user;
})
.defaultIfEmpty(User.anonymous());
}
// Combining multiple Mono sources
public Mono<OrderSummary> getOrderSummary(String orderId) {
Mono<Order> order = orderService.findById(orderId);
Mono<Customer> customer = customerService.findByOrderId(orderId); // hypothetical lookup of the customer who placed the order
return Mono.zip(order, customer)
.map(tuple -> new OrderSummary(tuple.getT1(), tuple.getT2()));
}
// Error handling (order matters: the fallback comes last so it also covers timeouts and exhausted retries)
public Mono<String> getConfigValue(String key) {
return configService.getValue(key)
.timeout(Duration.ofSeconds(5))
.retry(3)
.onErrorReturn("default-value");
}
Mermaid Diagram:
flowchart TD
A[Mono Source] --> B{Has Value?}
B -->|Yes| C[onNext: emit value]
B -->|No| D[Skip onNext]
C --> E[onComplete: signal completion]
D --> E
E --> F[Stream Terminated]
A -->|Error Occurs| G[onError: emit error]
G --> H[Stream Terminated]
style C fill:#90EE90
style E fill:#87CEEB
style G fill:#FFB6C6
References:
- Project Reactor Reference Guide - Mono
- Reactor Core Documentation - Mono API
- Baeldung - Introduction to Reactor Core
What is a Flux and when would you use it?
The 30-Second Answer: A Flux is a reactive type that represents a stream of 0 to N elements. I use it when dealing with multiple items like querying a list of users, streaming real-time events, processing file contents line-by-line, or handling any sequence of data that requires backpressure management.
The 2-Minute Answer (If They Want More): Flux is the second core reactive type in Project Reactor, representing an asynchronous sequence that can emit zero to many items over time. It's the reactive equivalent of a Java Stream or Collection, but designed for asynchronous, non-blocking operations with built-in backpressure support.
I use Flux whenever I'm working with multiple values that arrive over time. This includes database queries returning multiple rows, WebSocket message streams, server-sent events, file processing, batch operations, or any scenario where data is produced and consumed asynchronously. Unlike Java Streams, which are pull-based and synchronous, Flux pushes items to the subscriber as they become available, within the demand the subscriber has requested, and can handle both bounded and unbounded data sources.
Flux shines in scenarios requiring backpressure, when the producer generates data faster than the consumer can process it. Flux implements the Reactive Streams specification, so subscribers signal how many items they are ready to receive, which prevents downstream components from being overwhelmed. This is critical in high-throughput systems like message processing, data pipelines, or real-time analytics.
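As an illustration of demand signalling, the sketch below uses Reactor's BaseSubscriber to request items two at a time and logs each request next to the items it receives. The batch size and class name are arbitrary choices for the example:

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class DemandExample {

    // Consumes 1..5 in batches of two and records every request and item.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flux.range(1, 5)
            .doOnRequest(n -> log.add("request(" + n + ")"))
            .subscribe(new BaseSubscriber<Integer>() {
                private int receivedInBatch = 0;

                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(2); // initial demand instead of the default unbounded request
                }

                @Override
                protected void hookOnNext(Integer value) {
                    log.add("item " + value);
                    if (++receivedInBatch == 2) {
                        receivedInBatch = 0;
                        request(2); // ask for the next batch only once this one is handled
                    }
                }
            });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The source never emits more than the subscriber has asked for, which is the mechanism operators such as limitRate build on.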
Flux provides a rich set of operators for transformation (map, flatMap), filtering (filter, take, skip), combination (merge, concat, zip), and error handling (onErrorResume, retry). These operators are lazy and composable, meaning they only execute when someone subscribes, allowing for efficient pipeline construction.
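Laziness is easy to verify with a counter. In this sketch (names are illustrative), the map function does not run when the pipeline is assembled, and it runs again for every new subscriber because the source is cold:

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class LazyPipeline {

    // Returns the number of map invocations before subscribing,
    // after one subscription, and after a second subscription.
    static int[] run() {
        AtomicInteger mapCalls = new AtomicInteger();

        Flux<Integer> doubled = Flux.range(1, 3)
                .map(n -> {
                    mapCalls.incrementAndGet();
                    return n * 2;
                });

        int beforeSubscribe = mapCalls.get(); // assembling the pipeline runs nothing
        doubled.subscribe();
        int afterFirst = mapCalls.get();
        doubled.subscribe();                  // a second subscriber replays the pipeline
        int afterSecond = mapCalls.get();

        return new int[] {beforeSubscribe, afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + ", " + counts[1] + ", " + counts[2]); // 0, 3, 6
    }
}
```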
Code Example:
// Creating Flux from different sources
Flux<Integer> fluxRange = Flux.range(1, 10);
Flux<String> fluxArray = Flux.fromArray(new String[]{"a", "b", "c"});
Flux<User> fluxIterable = Flux.fromIterable(userList);
Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1));
// Practical example: Database query returning multiple results
public Flux<Product> searchProducts(String category) {
return Flux.defer(() -> Flux.fromIterable(database.findByCategory(category)))
.subscribeOn(Schedulers.boundedElastic())
.filter(product -> product.isAvailable())
.map(this::enrichWithPricing)
.onErrorResume(e -> {
log.error("Error searching products", e);
return Flux.empty();
});
}
// Streaming data from a file (with BUFFER, Flux.create queues lines the subscriber has not requested yet)
public Flux<LogEntry> streamLogs(String filename) {
return Flux.create(sink -> {
try (BufferedReader reader = new BufferedReader(new FileReader(filename))) {
String line;
while ((line = reader.readLine()) != null && !sink.isCancelled()) {
sink.next(new LogEntry(line));
}
sink.complete();
} catch (IOException e) {
sink.error(e);
}
}, FluxSink.OverflowStrategy.BUFFER);
}
// Combining multiple Flux sources
public Flux<Notification> mergeNotificationSources() {
Flux<Notification> emailNotifications = emailService.getNotifications();
Flux<Notification> smsNotifications = smsService.getNotifications();
Flux<Notification> pushNotifications = pushService.getNotifications();
return Flux.merge(emailNotifications, smsNotifications, pushNotifications)
.sort(Comparator.comparing(Notification::getTimestamp)) // sort() waits for completion, so the sources must be finite
.distinct(Notification::getId);
}
// Processing with batching
public Flux<BatchResult> processBatch(Flux<Order> orders) {
return orders
.buffer(100) // Batch 100 orders at a time
.flatMap(batch -> processOrderBatch(batch))
.onBackpressureBuffer(1000)
.publishOn(Schedulers.parallel());
}
Mermaid Diagram:
flowchart TD
A[Flux Source] --> B[onNext: item 1]
B --> C[onNext: item 2]
C --> D[onNext: item 3]
D --> E[onNext: item N]
E --> F[onComplete: signal completion]
F --> G[Stream Terminated]
A -->|Error Occurs| H[onError: emit error]
H --> I[Stream Terminated]
J[Subscriber] -.request N.-> A
A -.emit items.-> J
style B fill:#90EE90
style C fill:#90EE90
style D fill:#90EE90
style E fill:#90EE90
style F fill:#87CEEB
style H fill:#FFB6C6
References:
- Project Reactor Reference Guide - Flux
- Reactor Core Documentation - Flux API
- Spring WebFlux Documentation
Operators
What is the difference between map and flatMap in Reactor?
The 30-Second Answer:
map performs synchronous one-to-one transformations and returns a plain value, while flatMap handles asynchronous transformations that return a Publisher (Mono/Flux) and automatically flattens the result. Use map for simple conversions, flatMap for operations involving async calls or reactive chains.
The 2-Minute Answer (If They Want More):
The distinction between map and flatMap is fundamental to reactive programming. When I use map, I'm transforming each element synchronously into another value - think of it as a simple function application. The transformation happens immediately and returns a concrete value. For example, converting a User object to a UserDTO or multiplying a number by 2.
However, flatMap is designed for asynchronous operations that themselves return reactive types. When I need to make a database call, invoke a REST API, or perform any I/O operation that returns a Mono<T> or Flux<T>, flatMap is the answer. It subscribes to the inner publisher and flattens the results back into the outer stream, preventing nested publishers like Flux<Flux<T>>.
A critical performance consideration: flatMap subscribes to inner publishers eagerly and runs them concurrently (up to 256 at a time by default), which means if you have Flux.range(1,10).flatMap(id -> fetchUser(id)), all 10 user fetches could be in flight simultaneously and their results may arrive out of source order. You can limit this with flatMap(function, concurrency). If you need sequential processing that preserves source order, concatMap is the better choice.
I think of it this way: if my transformation function returns T, use map. If it returns Mono<T> or Flux<T>, use flatMap.
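The ordering difference is easiest to see with inner publishers of different latency. This is a sketch under assumptions: fetch is a hypothetical lookup whose delay grows with the id, and block() is used only to collect the result for printing:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapOrdering {

    // Hypothetical async lookup: larger ids take longer to respond.
    static Mono<String> fetch(int id) {
        return Mono.just("user" + id).delayElement(Duration.ofMillis(id * 100L));
    }

    // flatMap: inner Monos run concurrently, results arrive as they finish.
    static List<String> withFlatMap() {
        return Flux.just(3, 1, 2).flatMap(FlatMapOrdering::fetch).collectList().block();
    }

    // concatMap: one inner Mono at a time, source order preserved.
    static List<String> withConcatMap() {
        return Flux.just(3, 1, 2).concatMap(FlatMapOrdering::fetch).collectList().block();
    }

    // flatMapSequential: concurrent like flatMap, but results are re-ordered to source order.
    static List<String> withFlatMapSequential() {
        return Flux.just(3, 1, 2).flatMapSequential(FlatMapOrdering::fetch).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(withFlatMap());           // [user1, user2, user3]
        System.out.println(withConcatMap());         // [user3, user1, user2]
        System.out.println(withFlatMapSequential()); // [user3, user1, user2]
    }
}
```

concatMap takes roughly the sum of the delays, while the two flatMap variants take roughly the longest single delay.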
Code Example:
// map: Synchronous transformation (returns plain value)
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<String> stringNumbers = numbers
.map(n -> "Number: " + n); // Returns String directly
// Result: "Number: 1", "Number: 2", "Number: 3", ...
// flatMap: Asynchronous transformation (returns Publisher)
Flux<Integer> userIds = Flux.just(101, 102, 103);
Flux<User> users = userIds
.flatMap(id -> userRepository.findById(id)); // Returns Mono<User>
// flatMap subscribes to each Mono and flattens results
// Common mistake - trying to use map with async operation
Flux<Mono<User>> wrong = userIds
.map(id -> userRepository.findById(id)); // Creates Flux<Mono<User>> - WRONG!
// Correct approach with flatMap
Flux<User> correct = userIds
.flatMap(id -> userRepository.findById(id)); // Returns Flux<User> - CORRECT!
// Real-world example: Processing orders
Flux<Order> orders = orderRepository.findPendingOrders();
// Use map for synchronous transformations
orders.map(order -> {
order.setStatus("PROCESSING");
return order; // Returns Order directly
});
// Use flatMap for async operations
orders.flatMap(order -> {
// Each of these returns Mono, so we need flatMap
return paymentService.processPayment(order.getId()) // Returns Mono<Payment>
.flatMap(payment ->
inventoryService.reserveItems(order.getItems())) // Returns Mono<Reservation>
.map(reservation -> order); // Final map returns Order
});
// Controlling concurrency with flatMap
userIds.flatMap(
id -> userRepository.findById(id),
3 // Process max 3 concurrent requests
).subscribe();
Mermaid Diagram:
flowchart TD
subgraph "map: Synchronous"
A1[Input: 1, 2, 3] --> B1[map: n * 2]
B1 --> C1[Output: 2, 4, 6]
D1[Returns: T] -.-> B1
end
subgraph "flatMap: Asynchronous"
A2[Input: id1, id2, id3] --> B2[flatMap: fetchUser]
B2 --> C2[Inner: Mono, Mono, Mono]
C2 --> D2[Flatten]
D2 --> E2[Output: user1, user2, user3]
F2[Returns: Mono/Flux T] -.-> B2
end
style B1 fill:#e1f5ff
style B2 fill:#ffe1e1
style D2 fill:#fff4e1
What is the filter operator and how does it work?
The 30-Second Answer:
The filter operator allows only elements that satisfy a given predicate (boolean condition) to pass through the stream, while dropping all others. It's a non-blocking, synchronous operator that evaluates each element and either propagates it downstream or discards it based on the condition.
The 2-Minute Answer (If They Want More):
The filter operator is one of the simplest yet most essential operators in Reactor. I use it whenever I need to conditionally process only certain elements from a stream. It takes a Predicate<T> - a function that returns true or false - and only emits elements for which the predicate returns true.
What makes filter particularly useful in reactive programming is that it maintains the reactive chain without blocking. Unlike traditional imperative filtering with loops and conditionals, filter processes elements as they arrive in the stream. This is crucial for handling real-time data streams, event processing, or any scenario where backpressure and non-blocking behavior matter.
I commonly use filter for validation scenarios (filtering out invalid data), security checks (allowing only authorized requests), business logic (processing only active users), or data quality (removing null or empty values). It's often one of the first operators in my pipeline to eliminate unwanted data early, reducing downstream processing.
An important consideration: filter can reduce the number of elements in a stream, but each element still passes through the predicate evaluation. For expensive predicates, this could become a performance concern. In such cases, I consider whether the filtering logic should happen earlier (e.g., in the database query) or if I need to optimize the predicate itself.
Code Example:
// Basic filtering: Keep only even numbers
Flux.range(1, 10)
.filter(n -> n % 2 == 0)
.subscribe(System.out::println); // Output: 2, 4, 6, 8, 10
// Real-world example: Filter active users
Flux<User> allUsers = userRepository.findAll();
Flux<User> activeUsers = allUsers
.filter(user -> user.isActive())
.filter(user -> user.getLastLoginDate() != null)
.filter(user -> user.getLastLoginDate().isAfter(
LocalDateTime.now().minusDays(30)
));
// Filtering with method references
Flux<String> messages = messageStream
.filter(Objects::nonNull)
.filter(msg -> !msg.isBlank()) // Keep only non-blank messages
.filter(msg -> msg.length() > 10);
// Chaining filters vs combining conditions
// Option 1: Separate filters (more readable)
userStream
.filter(user -> user.getAge() >= 18)
.filter(user -> user.hasVerifiedEmail())
.filter(user -> user.getCountry().equals("US"));
// Option 2: Combined filter (more efficient)
userStream
.filter(user ->
user.getAge() >= 18 &&
user.hasVerifiedEmail() &&
user.getCountry().equals("US")
);
// Practical example: Order processing
Flux<Order> orders = orderService.streamOrders();
Flux<Order> eligibleOrders = orders
.filter(order -> order.getStatus() == OrderStatus.PENDING)
.filter(order -> order.getTotal().compareTo(BigDecimal.ZERO) > 0)
.filter(order -> !order.getItems().isEmpty())
.filter(order -> order.getCustomer().isCreditApproved())
.doOnNext(order -> logger.info("Processing eligible order: {}", order.getId()));
// Using filter with flatMap for conditional async operations
customerStream
.filter(customer -> customer.isPremium()) // Only premium customers
.flatMap(customer ->
promotionService.getSpecialOffers(customer.getId()) // Async call
)
.subscribe(offer -> sendNotification(offer));
// Dropping empty Optionals (a Flux never carries null, so only the empty case needs filtering)
Flux<Optional<String>> optionalStream = /* some source */;
Flux<String> validValues = optionalStream
.filter(Optional::isPresent)
.map(Optional::get);
// Use filterWhen when the predicate itself is asynchronous
Flux<User> verifiedUsers = userStream
.filterWhen(user ->
verificationService.isVerified(user.getId()) // Returns Mono<Boolean>
);
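When a filter is immediately followed by a map, as in the Optional example above, handle can do both in one step. The sketch below is one way to write it; the presentValues helper is illustrative, not part of Reactor:

```java
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;

public class HandleExample {

    // handle() lets the callback emit zero or one item per input element,
    // so it can drop empty Optionals and unwrap the rest in a single operator.
    static List<String> presentValues(Flux<Optional<String>> source) {
        return source
                .<String>handle((optional, sink) -> optional.ifPresent(sink::next))
                .collectList()
                .block(); // blocking only to print the result in this demo
    }

    public static void main(String[] args) {
        Flux<Optional<String>> source =
                Flux.<Optional<String>>just(Optional.of("a"), Optional.empty(), Optional.of("b"));
        System.out.println(presentValues(source)); // [a, b]
    }
}
```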
Mermaid Diagram:
flowchart TD
A[Input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] --> B{filter: n % 2 == 0}
B -->|true: 2| C[Pass]
B -->|false: 1| D[Drop]
B -->|true: 4| C
B -->|false: 3| D
B -->|true: 6| C
B -->|false: 5| D
B -->|true: 8| C
B -->|false: 7| D
B -->|true: 10| C
B -->|false: 9| D
C --> E[Output: 2, 4, 6, 8, 10]
style B fill:#fff4e1
style C fill:#e1ffe1
style D fill:#ffe1e1
References:
- Project Reactor Reference - Filtering Operators
- Reactor API - Flux.filter
- Baeldung - Reactor Core Filter Operator
What is the zip operator and when would you use it?
The 30-Second Answer:
The zip operator combines elements from multiple publishers by pairing them together based on their emission index, waiting for all sources to emit before producing a combined result. I use it when I need to aggregate data from multiple independent async sources that should be processed together.
The 2-Minute Answer (If They Want More):
The zip operator is essential when I need to coordinate multiple independent reactive streams and combine their results. It waits for each source to emit an element at the same index, then combines them using a combinator function. For example, zipping three publishers will wait until all three have emitted their first element, combine them, then wait for all three to emit their second element, and so on.
I commonly use zip in scenarios like: fetching user profile, preferences, and activity history from different services and combining them into a single view model; aggregating data from multiple microservices before responding to a client; or coordinating parallel API calls where the results need to be processed together.
A critical behavior to understand: zip emits only as many elements as the shortest source. If one publisher emits 3 items and another emits 5, zip will produce only 3 combined results. The operator also maintains strict ordering: it won't emit the second combined result until ALL sources have emitted their second element, so a slow source holds back the faster ones.
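For scenarios where I need to combine the latest values rather than synchronized indexes, I use combineLatest instead. When I need to combine the single results of a dynamic number of Monos, Mono.zip with an Iterable of Monos and a combinator function is the right choice. Note that Mono.zip completes empty if any of its sources completes without a value.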
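For a collection of Monos whose size is only known at runtime, the Iterable overload of Mono.zip passes all results to a combinator as an Object[]. A minimal sketch, where the total helper is a made-up example:

```java
import java.util.List;
import reactor.core.publisher.Mono;

public class ZipIterableExample {

    // Sums the results of any number of Monos once every one of them has produced a value.
    static Mono<Integer> total(List<Mono<Integer>> parts) {
        return Mono.zip(parts, values -> {
            int sum = 0;
            for (Object value : values) {
                sum += (Integer) value; // results arrive as Object[], in source order
            }
            return sum;
        });
    }

    public static void main(String[] args) {
        System.out.println(total(List.of(Mono.just(1), Mono.just(2), Mono.just(3))).block()); // 6

        // If any source completes empty, the zipped Mono completes empty too.
        System.out.println(total(List.of(Mono.just(1), Mono.empty())).block()); // null
    }
}
```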
Code Example:
// Basic zip: Combine two Flux streams
Flux<Integer> numbers = Flux.range(1, 5);
Flux<String> letters = Flux.just("A", "B", "C", "D", "E");
Flux<String> combined = Flux.zip(numbers, letters)
.map(tuple -> tuple.getT1() + "-" + tuple.getT2());
// Result: "1-A", "2-B", "3-C", "4-D", "5-E"
// Using custom combinator function
Flux<String> result = Flux.zip(numbers, letters,
(num, letter) -> letter + num);
// Result: "A1", "B2", "C3", "D4", "E5"
// Real-world example: Fetching user dashboard data
public Mono<UserDashboard> getUserDashboard(Long userId) {
Mono<User> userMono = userService.findById(userId);
Mono<List<Order>> ordersMono = orderService.getRecentOrders(userId);
Mono<AccountBalance> balanceMono = accountService.getBalance(userId);
return Mono.zip(userMono, ordersMono, balanceMono)
.map(tuple -> new UserDashboard(
tuple.getT1(), // User
tuple.getT2(), // Orders
tuple.getT3() // Balance
));
}
// Combinator form of zip (also the way to go beyond the 8-source tuple limit)
public Mono<CompleteProfile> getCompleteProfile(Long userId) {
Mono<User> user = userService.findById(userId);
Mono<Address> address = addressService.getPrimary(userId);
Mono<PaymentInfo> payment = paymentService.getInfo(userId);
Mono<Preferences> prefs = preferencesService.get(userId);
// The combinator receives the results as Object[], in source order
return Mono.zip(
objects -> new CompleteProfile(
(User) objects[0],
(Address) objects[1],
(PaymentInfo) objects[2],
(Preferences) objects[3]
),
user, address, payment, prefs
);
}
// Zip with sources of different lengths (shortest wins)
Flux<Integer> fast = Flux.range(1, 100);
Flux<String> slow = Flux.just("A", "B", "C");
Flux.zip(fast, slow, (num, letter) -> letter + num)
.subscribe(System.out::println);
// Output: Only "A1", "B2", "C3" - stops after slow completes
// Practical example: Parallel API calls
public Mono<OrderConfirmation> processOrder(Order order) {
Mono<Payment> payment = paymentService.charge(order);
Mono<Shipment> shipment = shippingService.createShipment(order);
Mono<Invoice> invoice = invoiceService.generate(order);
return Mono.zip(payment, shipment, invoice)
.map(tuple -> new OrderConfirmation(
order.getId(),
tuple.getT1().getTransactionId(),
tuple.getT2().getTrackingNumber(),
tuple.getT3().getInvoiceNumber()
))
.doOnError(error -> {
// All three must succeed or entire operation fails
logger.error("Order processing failed: {}", error.getMessage());
});
}
// Using zipWith for two sources
Mono<User> user = userService.getCurrentUser();
Mono<Settings> settings = settingsService.getSettings();
Mono<UserView> userView = user
.zipWith(settings)
.map(tuple -> new UserView(tuple.getT1(), tuple.getT2()));
// Alternative with custom combinator
Mono<UserView> userView2 = user
.zipWith(settings, (u, s) -> new UserView(u, s));
Mermaid Diagram:
flowchart TD
subgraph "Zip Operator"
A1[Source 1: 1, 2, 3] --> Z[zip]
A2[Source 2: A, B, C] --> Z
A3[Source 3: X, Y, Z] --> Z
Z --> B1[Wait for all 3 to emit index 0]
B1 --> C1["Combine: (1, A, X)"]
Z --> B2[Wait for all 3 to emit index 1]
B2 --> C2["Combine: (2, B, Y)"]
Z --> B3[Wait for all 3 to emit index 2]
B3 --> C3["Combine: (3, C, Z)"]
C1 --> D[Output]
C2 --> D
C3 --> D
end
style Z fill:#fff4e1
style B1 fill:#e1f5ff
style B2 fill:#e1f5ff
style B3 fill:#e1f5ff
References:
- Project Reactor Reference - zip Operator
- Reactor API - Flux.zip
- Baeldung - Combining Publishers in Reactor
Subscription and Lifecycle
What is the difference between block() and subscribe()?
The 30-Second Answer:
block() is a blocking synchronous operation that waits for the Mono/Flux to complete and returns the result, while subscribe() is asynchronous and returns immediately with a Disposable. block() turns reactive code back into imperative code and should be avoided in production reactive applications.
The 2-Minute Answer (If They Want More):
The fundamental difference lies in the execution model and thread behavior. When you call subscribe(), you stay in the reactive paradigm: you register callbacks, and for an asynchronous source the calling thread continues immediately while the callbacks handle results when they arrive. When you call block(), you break out of the reactive world into the blocking imperative world: the calling thread stops and waits until the publisher emits a value or completes.
subscribe() returns a Disposable immediately, allowing you to cancel the subscription later. block() returns the actual value (or throws an exception) but only after the operation completes. This means block() can freeze your thread for unpredictable amounts of time, defeating the purpose of reactive programming's non-blocking nature.
Using block() in production reactive code is generally an anti-pattern because it negates the benefits of reactive programming - you lose non-blocking execution, efficient resource utilization, and backpressure management. It's acceptable in test code, main methods, or when integrating reactive code with legacy blocking systems, but should never be used in request-handling code paths in reactive applications.
For Flux, there's also blockFirst() and blockLast() for getting the first or last element, but these have the same blocking characteristics and should be used sparingly. If you find yourself needing block() often, it's a sign that reactive programming might not be the right fit for that part of your system.
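Two block() pitfalls are worth seeing in code. The sketch below assumes default Reactor schedulers; the class and method names are illustrative. It shows block(Duration) giving up after a timeout, and Reactor rejecting a block() call made on one of its non-blocking threads:

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockPitfalls {

    // block(Duration) throws IllegalStateException instead of waiting forever.
    static String blockWithTimeout() {
        try {
            return Mono.<String>never().block(Duration.ofMillis(200));
        } catch (IllegalStateException e) {
            return "timed out";
        }
    }

    // Calling block() from a thread of the parallel scheduler is rejected
    // with IllegalStateException, which surfaces here as an error signal.
    static String blockOnParallelThread() {
        return Mono.fromCallable(() ->
                        Mono.delay(Duration.ofMillis(10)).map(tick -> "inner").block())
                .subscribeOn(Schedulers.parallel())
                .onErrorResume(IllegalStateException.class, e -> Mono.just("rejected"))
                .block(); // the outer block() runs on the calling (main) thread, which is allowed
    }

    public static void main(String[] args) {
        System.out.println(blockWithTimeout());       // timed out
        System.out.println(blockOnParallelThread());  // rejected
    }
}
```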
Code Example:
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class BlockVsSubscribeExample {
public static void main(String[] args) throws InterruptedException {
Mono<String> mono = Mono.just("Hello")
.delayElement(Duration.ofSeconds(2));
// Using subscribe() - Non-blocking
System.out.println("Before subscribe: " + System.currentTimeMillis());
mono.subscribe(value -> {
System.out.println("Received: " + value + " at " + System.currentTimeMillis());
});
System.out.println("After subscribe: " + System.currentTimeMillis());
// Output shows "After subscribe" prints immediately
// "Received" prints 2 seconds later
Thread.sleep(3000); // Keep main thread alive to see async result
// Using block() - Blocking
System.out.println("\nBefore block: " + System.currentTimeMillis());
String result = mono.block();
System.out.println("After block: " + System.currentTimeMillis());
System.out.println("Result: " + result);
// Output shows 2-second delay before "After block" prints
// Practical example - Testing
Mono<Integer> calculation = Mono.fromCallable(() -> 2 + 2);
// In tests - block() is acceptable
Integer testResult = calculation.block();
assert testResult == 4;
// In production - subscribe() is correct
calculation.subscribe(
value -> System.out.println("Calculated: " + value),
error -> System.err.println("Error: " + error)
);
// Flux blocking variants
Flux<Integer> numbers = Flux.range(1, 5);
Integer firstElement = numbers.blockFirst(); // Returns 1
Integer lastElement = numbers.blockLast(); // Returns 5
System.out.println("First: " + firstElement + ", Last: " + lastElement);
}
}
Mermaid Diagram:
sequenceDiagram
participant Thread
participant Mono
rect rgb(200, 230, 255)
Note over Thread,Mono: subscribe() - Non-blocking
Thread->>Mono: subscribe()
Mono-->>Thread: Returns Disposable immediately
Thread->>Thread: Continues execution
Note over Thread: Thread is free to do other work
Mono--)Thread: Callback with value (async)
end
rect rgb(255, 230, 230)
Note over Thread,Mono: block() - Blocking
Thread->>Mono: block()
Note over Thread: Thread waits (blocked)
Mono-->>Mono: Processing...
Mono-->>Thread: Returns value after completion
Thread->>Thread: Resumes execution
end
References:
- Project Reactor - Blocking vs Subscribe
- Spring WebFlux Documentation - Reactive Programming Model
- Reactor Core - When to use block()
What is a Disposable and how do you manage subscriptions?
The 30-Second Answer:
A Disposable is an interface representing a cancellable subscription returned by the subscribe() method. It allows you to cancel active subscriptions, prevent memory leaks, and clean up resources by calling dispose(). Proper subscription management prevents resource exhaustion in long-running reactive applications.
The 2-Minute Answer (If They Want More):
When you subscribe to a reactive stream, the subscribe() method returns a Disposable object that represents the active subscription. This object is your handle for lifecycle management - you can check if the subscription is still active with isDisposed() and cancel it with dispose().
Managing Disposables is critical in reactive applications to prevent memory leaks and resource exhaustion. Without proper cleanup, subscriptions can accumulate, holding references to resources like database connections, file handles, or network sockets. This is especially important for infinite streams (like Flux.interval() or WebSocket connections) that won't complete on their own.
Reactor provides several utilities for managing multiple subscriptions. Disposables.composite() creates a container for multiple Disposables that can be disposed together. Disposables.swap() holds a single Disposable that can be replaced over time, disposing the previous one on update(). Disposables.single() creates a simple standalone Disposable. These are particularly useful in lifecycle-aware components like Spring beans or UI components.
In Spring WebFlux, the framework handles subscription management for you in controllers - when a request completes or is cancelled, subscriptions are automatically disposed. However, in custom components, background tasks, or scheduled jobs, you must manage Disposables manually. A common pattern is storing Disposables as instance variables and disposing them in cleanup methods (like @PreDestroy in Spring or componentWillUnmount in React).
Best practice is to avoid manual subscription management when possible by using operators that handle lifecycle automatically (like takeUntil(), timeout(), or take()), but when you must subscribe manually, always store and dispose the Disposable appropriately.
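As a sketch of the operator-based approach, the pipelines below terminate an infinite interval on their own, so there is no Disposable to track. The tick period is arbitrary, and block() is used only to collect the output:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class SelfTerminatingStreams {

    // take(n) cancels the upstream interval after n items.
    static List<Long> firstThreeTicks() {
        return Flux.interval(Duration.ofMillis(50))
                .take(3)
                .collectList()
                .block();
    }

    // takeUntil(predicate) relays items up to and including the first match, then cancels.
    static List<Long> ticksUntilTwo() {
        return Flux.interval(Duration.ofMillis(50))
                .takeUntil(tick -> tick >= 2)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(firstThreeTicks()); // [0, 1, 2]
        System.out.println(ticksUntilTwo());   // [0, 1, 2]
    }
}
```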
Code Example:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.Disposable;
import reactor.core.Disposables;
import java.time.Duration;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy; // jakarta.annotation.PreDestroy on Spring Framework 6+
public class DisposableManagementExample {
// Example 1: Basic Disposable usage
public static void basicExample() {
// Create an infinite stream
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));
// Subscribe and get Disposable
Disposable disposable = intervalFlux.subscribe(
value -> System.out.println("Tick: " + value)
);
// Later... cancel the subscription
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Is disposed? " + disposable.isDisposed()); // false
disposable.dispose(); // Cancel subscription
System.out.println("Is disposed? " + disposable.isDisposed()); // true
// No more ticks will be printed
}
// Example 2: Managing multiple subscriptions
public static class MultiSubscriptionManager {
private final Disposable.Composite disposables = Disposables.composite();
public void startMonitoring() {
// Add multiple subscriptions to composite
Disposable sub1 = Flux.interval(Duration.ofSeconds(1))
.subscribe(i -> System.out.println("Monitor 1: " + i));
Disposable sub2 = Flux.interval(Duration.ofSeconds(2))
.subscribe(i -> System.out.println("Monitor 2: " + i));
Disposable sub3 = Flux.interval(Duration.ofSeconds(3))
.subscribe(i -> System.out.println("Monitor 3: " + i));
// Add each subscription to the composite (addAll expects a Collection, not varargs)
disposables.add(sub1);
disposables.add(sub2);
disposables.add(sub3);
}
public void stopMonitoring() {
// Dispose all subscriptions at once
disposables.dispose();
System.out.println("All monitoring stopped");
}
}
// Example 3: Spring component with proper cleanup
@Component
public static class ScheduledTaskComponent {
private Disposable scheduledTask;
public void startScheduledTask() {
scheduledTask = Flux.interval(Duration.ofMinutes(1))
.flatMap(tick -> performBackgroundWork())
.subscribe(
result -> System.out.println("Task completed: " + result),
error -> System.err.println("Task error: " + error)
);
}
@PreDestroy
public void cleanup() {
// Cleanup when Spring destroys the bean
if (scheduledTask != null && !scheduledTask.isDisposed()) {
scheduledTask.dispose();
System.out.println("Scheduled task disposed");
}
}
private Mono<String> performBackgroundWork() {
return Mono.just("work-result");
}
}
// Example 4: Swap Disposable - replacing subscriptions
public static class DynamicSubscriptionManager {
private final Disposable.Swap swap = Disposables.swap();
public void switchToSource(Flux<String> source) {
// Dispose previous subscription and replace with new one
Disposable newSubscription = source.subscribe(
value -> System.out.println("New source: " + value)
);
swap.update(newSubscription); // Automatically disposes old one
}
public void cleanup() {
swap.dispose(); // Dispose current subscription
}
}
// Example 5: Memory leak prevention
public static class LeakExample {
// ❌ BAD - Memory leak
public void memoryLeak() {
for (int i = 0; i < 1000; i++) {
Flux.interval(Duration.ofSeconds(1))
.subscribe(); // Disposable is lost - leak!
}
// 1000 active subscriptions that can never be cancelled
}
// ✅ GOOD - Proper management
private final Disposable.Composite disposables = Disposables.composite();
public void noMemoryLeak() {
for (int i = 0; i < 1000; i++) {
Disposable d = Flux.interval(Duration.ofSeconds(1))
.subscribe();
disposables.add(d);
}
}
public void cleanup() {
disposables.dispose(); // Clean up all subscriptions
}
}
}
Mermaid Diagram:
flowchart TD
A[subscribe called] --> B[Disposable returned]
B --> C{Store Disposable?}
C -->|Yes| D[Subscription managed]
C -->|No| E[⚠️ Memory leak risk]
D --> F{Lifecycle Event}
F -->|Component destroyed| G[dispose called]
F -->|Timeout reached| H[Auto-disposed]
F -->|takeUntil triggered| H
G --> I[Resources cleaned up]
H --> I
E --> J[Infinite subscription]
J --> K[Resources held forever]
K --> L[Memory exhaustion]
M[Multiple Subscriptions] --> N[Disposable.Composite]
N --> O[Add all Disposables]
O --> P[dispose once]
P --> Q[All cleaned up]
style D fill:#d4edda
style I fill:#d4edda
style Q fill:#d4edda
style E fill:#f8d7da
style L fill:#f8d7da
References:
- Project Reactor - Disposable Interface
- Reactor Core - Programmatically Creating a Sequence
- Spring Blog - Reactive Programming Best Practices
Error Handling
What is the difference between onErrorReturn, onErrorResume, and onErrorMap?
The 30-Second Answer:
onErrorReturn provides a static fallback value when an error occurs, onErrorResume switches to an alternative Publisher (allowing dynamic fallback logic), and onErrorMap transforms one exception type into another without recovering from the error. Use onErrorReturn for simple defaults, onErrorResume for fallback reactive sequences, and onErrorMap for exception wrapping.
The 2-Minute Answer (If They Want More): These three operators represent different error handling strategies in Project Reactor, each serving distinct use cases. Understanding when to use each is critical for building robust reactive applications.
onErrorReturn is the simplest operator—it catches any error (or specific error types) and returns a predefined static value, completing the sequence normally. This is ideal when you have a sensible default value that doesn't require computation. For example, returning an empty list when a database query fails, or a default configuration when loading fails.
onErrorResume is more powerful as it accepts a function that receives the error and returns a new Publisher. This allows you to implement dynamic fallback logic, such as switching to a backup service, retrying with different parameters, or returning a computed fallback. The key difference from onErrorReturn is that you're not limited to static values—you can execute entire reactive chains as fallback.
onErrorMap doesn't actually handle or recover from errors—it transforms them. It's used to wrap low-level exceptions in domain-specific exceptions, add context to errors, or normalize different exception types. The error still propagates downstream, but as a different exception type. This is crucial for maintaining clean architectural boundaries where you don't want infrastructure exceptions leaking into your business layer.
You can also chain these operators to create layered error handling strategies, where you might map certain errors, resume for others, and return defaults for the rest.
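When chaining, operator order matters, because each operator only sees errors coming from upstream. The following is a minimal sketch assuming `reactor-core` is available; `ErrorOperatorOrderSketch`, `StorageException`, `failingRead`, and `outcome` are hypothetical names introduced for illustration. It shows that onErrorMap alone still fails the sequence, and that a typed recovery operator only matches the mapped type when it is placed after the mapping.

```java
import java.io.IOException;
import reactor.core.publisher.Mono;

public class ErrorOperatorOrderSketch {
    // Hypothetical domain exception used only for this illustration.
    static class StorageException extends RuntimeException {
        StorageException(String message, Throwable cause) { super(message, cause); }
    }

    // Hypothetical source that always fails with a low-level exception.
    static Mono<String> failingRead() {
        return Mono.error(new IOException("disk unavailable"));
    }

    // Map only: the error is translated but still propagates.
    static Mono<String> mapOnly() {
        return failingRead()
                .onErrorMap(IOException.class, ex -> new StorageException("read failed", ex));
    }

    // Map, then recover: onErrorReturn sees the mapped StorageException.
    static Mono<String> mapThenRecover() {
        return mapOnly()
                .onErrorReturn(StorageException.class, "default");
    }

    // Recover, then map: onErrorReturn sees an IOException, does not match, and the error passes through.
    static Mono<String> recoverThenMap() {
        return failingRead()
                .onErrorReturn(StorageException.class, "default")
                .onErrorMap(IOException.class, ex -> new StorageException("read failed", ex));
    }

    // Blocks for the result and describes it as a string (for demonstration only).
    static String outcome(Mono<String> pipeline) {
        try {
            return "value: " + pipeline.block();
        } catch (RuntimeException ex) {
            return "error: " + ex.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(mapOnly()));        // error: StorageException
        System.out.println(outcome(mapThenRecover())); // value: default
        System.out.println(outcome(recoverThenMap())); // error: StorageException
    }
}
```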
Code Example:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.List;

public class ErrorOperatorComparison {

    // 1. onErrorReturn - Static fallback value
    public Mono<String> getUserName(Long userId) {
        return fetchUserFromDatabase(userId)
                .map(User::getName)
                // If any error occurs, return "Unknown User"
                .onErrorReturn("Unknown User");
    }

    public Flux<String> getProductList() {
        return fetchProductsFromService()
                // Return empty list on any error
                .onErrorReturn(List.of())
                .flatMapMany(Flux::fromIterable);
    }

    // 2. onErrorResume - Dynamic fallback with alternative Publisher
    public Mono<UserData> getUserWithFallback(Long userId) {
        return fetchFromPrimaryCache(userId)
                .onErrorResume(cacheError -> {
                    System.err.println("Cache miss: " + cacheError.getMessage());
                    return fetchFromDatabase(userId);
                })
                // Catches errors from the database fallback above
                .onErrorResume(dbError -> {
                    System.err.println("Database error: " + dbError.getMessage());
                    return fetchFromBackupService(userId);
                })
                .onErrorReturn(new UserData("guest", "guest@example.com"));
    }

    // Conditional resume based on error type
    public Mono<String> smartErrorRecovery() {
        return riskyOperation()
                .onErrorResume(IOException.class,
                        ex -> Mono.just("IO error - using cached data"))
                .onErrorResume(IllegalArgumentException.class,
                        ex -> Mono.error(new ValidationException("Invalid input", ex)))
                // Exclude ValidationException here; an unconditional onErrorReturn
                // would swallow the exception raised just above
                .onErrorReturn(ex -> !(ex instanceof ValidationException),
                        "Unknown error - using default");
    }

    // 3. onErrorMap - Transform exceptions
    public Mono<Order> processOrder(OrderRequest request) {
        return validateOrder(request)
                // Map validation errors to domain exception
                .onErrorMap(IllegalArgumentException.class,
                        ex -> new InvalidOrderException("Order validation failed", ex))
                .flatMap(this::saveOrder)
                // Map persistence errors to domain exception
                .onErrorMap(IOException.class,
                        ex -> new OrderProcessingException("Failed to save order", ex));
    }

    // Comparison: All three operators in one chain
    public Mono<PaymentResult> processPayment(PaymentRequest payment) {
        return callPaymentGateway(payment)
                // First, map low-level exceptions to domain exceptions
                .onErrorMap(IOException.class,
                        ex -> new PaymentGatewayException("Gateway communication failed", ex))
                // Then, resume with retry logic for gateway errors
                .onErrorResume(PaymentGatewayException.class,
                        ex -> retryPaymentWithBackupGateway(payment))
                // Finally, return a failed result for any remaining errors
                .onErrorReturn(new PaymentResult(false, "Payment failed"));
    }

    // Helper methods (stubs that simulate failures)
    private Mono<User> fetchUserFromDatabase(Long userId) {
        return Mono.error(new RuntimeException("DB connection failed"));
    }

    private Mono<List<String>> fetchProductsFromService() {
        return Mono.error(new RuntimeException("Service unavailable"));
    }

    private Mono<UserData> fetchFromPrimaryCache(Long userId) {
        return Mono.error(new RuntimeException("Cache miss"));
    }

    private Mono<UserData> fetchFromDatabase(Long userId) {
        return Mono.error(new RuntimeException("DB error"));
    }

    private Mono<UserData> fetchFromBackupService(Long userId) {
        return Mono.just(new UserData("backup", "backup@example.com"));
    }

    private Mono<String> riskyOperation() {
        return Mono.error(new IOException("Network error"));
    }

    private Mono<Order> validateOrder(OrderRequest request) {
        return Mono.just(new Order());
    }

    private Mono<Order> saveOrder(Order order) {
        return Mono.just(order);
    }

    // Returns Mono<PaymentResult> so the fallback operators in processPayment type-check
    private Mono<PaymentResult> callPaymentGateway(PaymentRequest payment) {
        return Mono.error(new IOException("Gateway timeout"));
    }

    private Mono<PaymentResult> retryPaymentWithBackupGateway(PaymentRequest payment) {
        return Mono.just(new PaymentResult(true, "Success via backup"));
    }

    // Domain classes
    static class User {
        String getName() { return "John"; }
    }

    static class UserData {
        String name, email;

        UserData(String name, String email) {
            this.name = name;
            this.email = email;
        }
    }

    static class OrderRequest {}
    static class Order {}
    static class PaymentRequest {}

    static class PaymentResult {
        boolean success;
        String message;

        PaymentResult(boolean success, String message) {
            this.success = success;
            this.message = message;
        }
    }

    static class InvalidOrderException extends RuntimeException {
        InvalidOrderException(String msg, Throwable cause) { super(msg, cause); }
    }

    static class OrderProcessingException extends RuntimeException {
        OrderProcessingException(String msg, Throwable cause) { super(msg, cause); }
    }

    static class PaymentGatewayException extends RuntimeException {
        PaymentGatewayException(String msg, Throwable cause) { super(msg, cause); }
    }

    static class ValidationException extends RuntimeException {
        ValidationException(String msg, Throwable cause) { super(msg, cause); }
    }
}
Mermaid Diagram:
flowchart TD
A[Reactive Stream with Error] --> B{Which Operator?}
B -->|onErrorReturn| C[onErrorReturn]
C --> D[Return Static Value]
D --> E[Sequence Completes Successfully]
B -->|onErrorResume| F[onErrorResume]
F --> G[Execute Fallback Function]
G --> H[Returns New Publisher]
H --> I[Subscribe to Alternative Stream]
B -->|onErrorMap| J[onErrorMap]
J --> K[Transform Exception]
K --> L[Wrap in New Exception Type]
L --> M[Error Still Propagates Downstream]
style C fill:#90EE90
style F fill:#87CEEB
style J fill:#FFB6C1
style E fill:#90EE90
style I fill:#87CEEB
style M fill:#FFB6C1
Schedulers and Threading
What is the difference between subscribeOn and publishOn?
The 30-Second Answer:
subscribeOn() controls which thread performs the subscription and typically affects the entire upstream chain, while publishOn() switches threads for downstream operators only from the point it's applied. In simple terms: subscribeOn() affects where the data source starts emitting, publishOn() affects where subsequent operators receive the data.
The 2-Minute Answer (If They Want More):
The distinction between subscribeOn() and publishOn() is fundamental to understanding Reactor's threading model, yet it's one of the most commonly confused concepts.
subscribeOn() influences the execution context of the subscription process itself. When you call subscribe() on a reactive chain, the subscription signal travels upstream to the source. subscribeOn() determines which Scheduler's thread carries that signal the rest of the way and, importantly, where the source emits its data. Wherever you place subscribeOn() in the chain, it takes effect from the source down to the first publishOn(), or to the end of the chain if there is none. If a chain contains several subscribeOn() calls, the one closest to the source decides which thread the source emits on.
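The "closest to the source wins" rule can be checked with a small sketch. It assumes `reactor-core` is on the classpath; the class name and the scheduler names `near-source` and `far-from-source` are made up for this illustration. The source records the name of the thread it runs on, and two subscribeOn() calls compete for it.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnOrderSketch {
    // Returns the name of the thread on which the source callable ran.
    static String sourceThreadName() {
        Scheduler nearSource = Schedulers.newSingle("near-source");
        Scheduler farFromSource = Schedulers.newSingle("far-from-source");
        try {
            return Mono.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(nearSource)     // closest to the source: decides where it runs
                    .subscribeOn(farFromSource)  // only carries the subscribe signal one step upstream
                    .block();
        } finally {
            // Dedicated schedulers hold threads, so release them when done.
            nearSource.dispose();
            farFromSource.dispose();
        }
    }

    public static void main(String[] args) {
        // The printed name starts with "near-source".
        System.out.println("Source ran on: " + sourceThreadName());
    }
}
```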
publishOn(), on the other hand, is a switching operator that affects only the downstream chain from where it's placed. It introduces an asynchronous boundary by using an internal queue to hand off signals from the upstream thread to a thread from the specified Scheduler. You can have multiple publishOn() calls in a chain, and each one switches the execution context for the operators that follow it.
A practical analogy: subscribeOn() is like deciding which factory floor produces your product (affects production start), while publishOn() is like having conveyor belts that hand off the product to different departments for processing (affects downstream processing).
This design allows precise control: you might use subscribeOn(Schedulers.boundedElastic()) to ensure a blocking database call happens on an I/O thread, then publishOn(Schedulers.parallel()) to process the results on a computation thread, and finally another publishOn() to switch to a UI thread for rendering.
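The first two hops of that pipeline can be sketched as follows. This is an illustration under stated assumptions, not a production pattern: `ThreadHopSketch` and `blockingLoad` are hypothetical, the "database call" is simulated with a short sleep, and `reactor-core` is assumed to be available. The sketch records which thread executes the blocking source and which thread executes the operator after publishOn().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ThreadHopSketch {
    // Stand-in for a blocking database call (hypothetical helper).
    static String blockingLoad() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "row";
    }

    // Returns [thread of the blocking call, thread of the downstream map].
    static List<String> run() {
        List<String> threads = new CopyOnWriteArrayList<>();
        Mono.fromCallable(() -> {
                    threads.add(Thread.currentThread().getName());
                    return blockingLoad();
                })
                .subscribeOn(Schedulers.boundedElastic()) // blocking work stays off the caller's thread
                .publishOn(Schedulers.parallel())         // everything below runs on a parallel worker
                .map(row -> {
                    threads.add(Thread.currentThread().getName());
                    return row.toUpperCase();
                })
                .block();
        return threads;
    }

    public static void main(String[] args) {
        List<String> threads = run();
        System.out.println("Blocking call ran on: " + threads.get(0));
        System.out.println("Downstream map ran on: " + threads.get(1));
    }
}
```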
Code Example:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnVsPublishOn {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Main thread: " + Thread.currentThread().getName());

        Flux.range(1, 3)
                .map(i -> {
                    System.out.println("Map 1 (" + i + "): " +
                            Thread.currentThread().getName());
                    return i * 10;
                })
                .publishOn(Schedulers.boundedElastic())
                .map(i -> {
                    System.out.println("Map 2 (" + i + "): " +
                            Thread.currentThread().getName());
                    return i + 1;
                })
                .subscribeOn(Schedulers.parallel()) // Affects upstream from source
                .map(i -> {
                    System.out.println("Map 3 (" + i + "): " +
                            Thread.currentThread().getName());
                    return i * 2;
                })
                .subscribe(value ->
                        System.out.println("Subscribe (" + value + "): " +
                                Thread.currentThread().getName())
                );

        Thread.sleep(1000);

        // Output shows:
        // - Map 1 runs on parallel (due to subscribeOn)
        // - Map 2 and Map 3 run on boundedElastic (due to publishOn)
        // - Subscribe runs on boundedElastic
    }
}
Mermaid Diagram:
flowchart TD
subgraph "subscribeOn Impact"
S1[Source] -->|parallel thread| S2[Operator 1]
S2 -->|parallel thread| S3[Operator 2]
S3 -->|parallel thread| S4[subscribeOn parallel]
S4 -->|parallel thread| S5[Operator 3]
end
subgraph "publishOn Impact"
P1[Source] -->|thread A| P2[Operator 1]
P2 -->|thread A| P3[publishOn elastic]
P3 -->|elastic thread| P4[Operator 2]
P4 -->|elastic thread| P5[Operator 3]
end
style S4 fill:#ffeb3b
style P3 fill:#4caf50
Spring WebFlux Integration
What is the difference between Spring MVC and Spring WebFlux?
The 30-Second Answer: Spring MVC is synchronous and blocking, using one thread per request with traditional servlet containers. Spring WebFlux is asynchronous and non-blocking, using a small thread pool to handle many concurrent requests through reactive streams. WebFlux excels in I/O-heavy scenarios with high concurrency, while MVC is simpler for traditional CRUD applications.
The 2-Minute Answer (If They Want More): The fundamental difference is the threading and I/O model. Spring MVC uses the servlet API and blocking I/O, where each HTTP request gets a thread from the servlet container's thread pool (typically 200-400 threads). That thread blocks on I/O operations like database calls or external HTTP requests, which limits scalability to the number of threads you can allocate. This model is simple, well-understood, and works great for most applications.
Spring WebFlux uses reactive streams and non-blocking I/O, typically running on Netty with an event-loop thread pool sized to the number of CPU cores (often 4-8 threads). Instead of blocking, operations return Mono or Flux publishers that emit data when it becomes available. A small number of threads can serve many thousands of concurrent connections because no thread sits idle waiting for I/O; each one is free to process other requests until the data arrives.
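The core idea, waiting without holding a thread, can be modeled with plain `java.util.concurrent`. The sketch below is not how Netty or WebFlux is implemented; it is a simplified stand-in with hypothetical names (`NonBlockingWaitSketch`, `elapsedMillis`) in which a single thread "serves" many simulated requests because each I/O wait is a timer callback rather than a sleeping thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class NonBlockingWaitSketch {
    // Serves `requests` simulated requests on ONE thread and returns the total elapsed time.
    static long elapsedMillis(int requests, long waitMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> responses = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                CompletableFuture<String> response = new CompletableFuture<>();
                int id = i;
                // The wait is registered as a timer; the thread is not parked per request.
                loop.schedule(() -> {
                    response.complete("response " + id);
                }, waitMillis, TimeUnit.MILLISECONDS);
                responses.add(response);
            }
            // Wait for every simulated response to arrive.
            CompletableFuture.allOf(responses.toArray(new CompletableFuture[0])).join();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        // 100 requests, each waiting 200 ms. One blocked thread per request in sequence
        // would need about 20 seconds; here the waits overlap on a single thread.
        System.out.println("Elapsed: " + elapsedMillis(100, 200) + " ms");
    }
}
```

The elapsed time stays close to one wait period rather than growing with the number of requests, which is the property the WebFlux threading model relies on.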
From a programming perspective, MVC controllers return objects or ResponseEntity, and you can write normal synchronous code with try-catch blocks. WebFlux controllers return Mono/Flux, and you compose operations using reactive operators, with error handling through onErrorResume or similar. MVC works with JDBC and JPA for databases; WebFlux requires R2DBC or reactive drivers. MVC uses RestTemplate or modern blocking HTTP clients; WebFlux uses WebClient.
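The difference in error-handling style can be shown without any Spring annotations. The sketch below assumes `reactor-core` is on the classpath; `ErrorStyleSketch`, `loadBlocking`, and the `"anonymous"` fallback are hypothetical. The same decision is written once as a try-catch block and once as an operator on the pipeline.

```java
import reactor.core.publisher.Mono;

public class ErrorStyleSketch {
    // Hypothetical lookup that fails for negative ids.
    static String loadBlocking(long id) {
        if (id < 0) {
            throw new IllegalArgumentException("negative id");
        }
        return "user-" + id;
    }

    // MVC style: ordinary control flow with try/catch.
    static String blockingStyle(long id) {
        try {
            return loadBlocking(id);
        } catch (IllegalArgumentException ex) {
            return "anonymous";
        }
    }

    // WebFlux style: the exception becomes an error signal handled by an operator.
    static Mono<String> reactiveStyle(long id) {
        return Mono.fromCallable(() -> loadBlocking(id))
                .onErrorResume(IllegalArgumentException.class, ex -> Mono.just("anonymous"));
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle(7));          // user-7
        System.out.println(blockingStyle(-1));         // anonymous
        System.out.println(reactiveStyle(7).block());  // user-7
        System.out.println(reactiveStyle(-1).block()); // anonymous
    }
}
```

The block() calls are there only to print results in a standalone program; in a WebFlux controller the Mono would be returned and subscribed to by the framework.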
The choice depends on your use case: Use WebFlux for microservices with high I/O wait time, streaming APIs, or applications needing massive concurrency. Use MVC when you have blocking dependencies (legacy JDBC, blocking libraries), simpler requirements, or teams unfamiliar with reactive programming. The two can coexist during a gradual migration: Spring MVC controllers can return Mono or Flux and call WebClient, although a Spring Boot application with both starters on the classpath runs on the servlet stack (Spring MVC) by default.
Code Example:
// SPRING MVC - Blocking approach
@RestController
@RequestMapping("/mvc/users")
public class MvcUserController {

    @Autowired
    private JpaUserRepository jpaRepository;

    @Autowired
    private RestTemplate restTemplate;

    // Thread blocks until database returns result
    @GetMapping("/{id}")
    public ResponseEntity<User> getUser(@PathVariable Long id) {
        User user = jpaRepository.findById(id)
                .orElseThrow(() -> new UserNotFoundException(id));

        // Thread blocks during HTTP call
        ExternalData data = restTemplate.getForObject(
                "https://api.example.com/data/" + id,
                ExternalData.class
        );
        user.setExternalData(data);
        return ResponseEntity.ok(user);
    }

    // Thread blocks reading entire request body, then blocks saving
    @PostMapping
    public ResponseEntity<User> createUser(@RequestBody User user) {
        User saved = jpaRepository.save(user);
        return ResponseEntity.status(HttpStatus.CREATED).body(saved);
    }

    // Traditional exception handling
    @ExceptionHandler(UserNotFoundException.class)
    public ResponseEntity<ErrorResponse> handleNotFound(UserNotFoundException ex) {
        return ResponseEntity.status(HttpStatus.NOT_FOUND)
                .body(new ErrorResponse(ex.getMessage()));
    }
}

// Configuration for Spring MVC
@Configuration
public class MvcConfig {

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

// ========================================
// SPRING WEBFLUX - Non-blocking approach
@RestController
@RequestMapping("/webflux/users")
public class WebFluxUserController {

    @Autowired
    private R2dbcUserRepository r2dbcRepository;

    @Autowired
    private WebClient webClient;

    // Returns immediately with Mono, thread doesn't block
    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable Long id) {
        return r2dbcRepository.findById(id)
                .switchIfEmpty(Mono.error(new UserNotFoundException(id)))
                .flatMap(user ->
                        // Non-blocking HTTP call, composed with flatMap
                        webClient.get()
                                .uri("https://api.example.com/data/{id}", id)
                                .retrieve()
                                .bodyToMono(ExternalData.class)
                                .map(data -> {
                                    user.setExternalData(data);
                                    return user;
                                })
                );
    }

    // Streams request body reactively, saves reactively
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<User> createUser(@RequestBody Mono<User> userMono) {
        return userMono.flatMap(r2dbcRepository::save);
    }

    // Reactive error handling
    @ExceptionHandler(UserNotFoundException.class)
    public Mono<ResponseEntity<ErrorResponse>> handleNotFound(UserNotFoundException ex) {
        return Mono.just(
                ResponseEntity.status(HttpStatus.NOT_FOUND)
                        .body(new ErrorResponse(ex.getMessage()))
        );
    }
}

// Configuration for Spring WebFlux
@Configuration
public class WebFluxConfig {

    @Bean
    public WebClient webClient() {
        return WebClient.builder()
                .baseUrl("https://api.example.com")
                .build();
    }
}

// Performance comparison example
@RestController
public class ComparisonController {

    // MVC: 200 threads can handle ~200 concurrent requests (one thread per request)
    // If each request takes 1 second waiting for I/O, throughput = 200 req/sec
    @GetMapping("/mvc/slow")
    public String blockingEndpoint() throws InterruptedException {
        Thread.sleep(1000); // Simulates I/O - thread is blocked!
        return "Done after 1 second";
    }

    // WebFlux: 8 threads can handle thousands of concurrent requests
    // Same 1 second delay, but threads aren't blocked, so throughput >> 8 req/sec
    @GetMapping("/webflux/slow")
    public Mono<String> nonBlockingEndpoint() {
        return Mono.delay(Duration.ofSeconds(1)) // Simulates I/O - thread is free!
                .thenReturn("Done after 1 second");
    }
}
Mermaid Diagram:
flowchart LR
subgraph "Spring MVC - Blocking"
A1[Request 1] --> T1[Thread 1]
A2[Request 2] --> T2[Thread 2]
A3[Request 3] --> T3[Thread 3]
T1 -.Blocked on DB.-> DB1[(Database)]
T2 -.Blocked on DB.-> DB1
T3 -.Blocked on HTTP.-> EXT1[External API]
DB1 --> R1[Response 1]
DB1 --> R2[Response 2]
EXT1 --> R3[Response 3]
end
subgraph "Spring WebFlux - Non-blocking"
B1[Request 1] --> W[Worker Pool<br/>4-8 threads]
B2[Request 2] --> W
B3[Request 3] --> W
B4[Request 100] --> W
W <-->|Non-blocking| DB2[(R2DBC DB)]
W <-->|Non-blocking| EXT2[WebClient]
W --> R4[Response 1]
W --> R5[Response 2]
W --> R6[Response 3]
W --> R7[Response 100]
end
style T1 fill:#ffcccc
style T2 fill:#ffcccc
style T3 fill:#ffcccc
style W fill:#ccffcc