The "What" Over "How" Concept
In traditional imperative programming, we often focus on how to perform a task: the specific steps, the order of operations, and the implementation details. This approach can lead to:
- Complex, hard-to-maintain code
- Tight coupling between components
- Difficulty in understanding business logic
- Challenges in testing and modification
Declarative programming, on the other hand, emphasizes what we want to achieve rather than how to achieve it. This paradigm shift brings several advantages:
- Clearer expression of business intent
- Better separation of concerns
- Improved maintainability
- Enhanced testability
- Easier parallel processing
- Thread-safe state management
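The contrast can be illustrated without Flow DSL at all. The following is a minimal sketch using only the standard Java Streams API; the class name WhatOverHow and the filtering rule are hypothetical and chosen purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of Flow DSL.
final class WhatOverHow {
    // Imperative: spells out how to iterate, test, and accumulate.
    static List<String> imperative(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (name.length() > 3) {
                result.add(name.toUpperCase());
            }
        }
        return result;
    }

    // Declarative: states what is wanted; iteration is left to the library.
    static List<String> declarative(List<String> names) {
        return names.stream()
                .filter(name -> name.length() > 3)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }
}
```

Both methods return the same list, but only the second reads as a description of the result rather than a recipe for building it.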
Why Flow DSL?
Flow DSL is a practical implementation of the "What" over "How" principle. It provides a declarative way to express business workflows while handling the complex "how" details under the hood.
Traditional Approach
// payment and inventory are declared before the try block so that the
// compensation logic in the catch block can see them
PaymentResult payment = null;
List<InventoryResult> inventory = new ArrayList<>();
try {
    // Validate order
    if (!validateOrder(order)) {
        throw new ValidationException();
    }
    // Process payment: up to 3 attempts with linear backoff
    for (int i = 0; i < 3; i++) {
        try {
            payment = processPayment(order);
            break;
        } catch (Exception e) {
            if (i == 2) throw e;
            Thread.sleep(100 * (i + 1));
        }
    }
    // Check inventory in parallel, one thread per order item
    ExecutorService executor = Executors.newFixedThreadPool(order.items.size());
    try {
        List<Future<InventoryResult>> futures = new ArrayList<>();
        for (OrderItem item : order.items) {
            futures.add(executor.submit(() -> checkInventory(item)));
        }
        for (Future<InventoryResult> future : futures) {
            inventory.add(future.get(5, TimeUnit.SECONDS));
        }
    } finally {
        executor.shutdown();
    }
} catch (Exception e) {
    // Compensation logic
    if (payment != null) {
        rollbackPayment(payment);
    }
    for (InventoryResult result : inventory) {
        rollbackInventory(result);
    }
    throw e;
}
Flow DSL Approach
Flow.of(() -> order)
    .map(OrderService::validateOrder)
    .flatMap(validOrder ->
        Flow.of(() -> processPayment(validOrder))
            .withRetry(3)
            .withBackoff(Duration.ofMillis(100))
            .withTimeout(Duration.ofSeconds(5))
            .withCompensation(PaymentService::rollback))
    .flatMap(payment ->
        Flow.of(() -> order.items)
            .parallelMap(InventoryService::checkInventory)
            .withParallelism(order.items.size())
            .withCompensation(InventoryService::rollback))
    .execute();
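To make the idea behind compensation concrete, here is a minimal plain-Java sketch of one way such bookkeeping could work: each completed step registers an undo action, and on failure the undo actions run in reverse order. The class CompensationSketch and its methods are hypothetical; this is not Flow DSL's actual implementation, which the text above does not describe.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of Flow DSL.
final class CompensationSketch {
    private final Deque<Runnable> undoStack = new ArrayDeque<>();

    // Runs a step; only if it succeeds is its undo action remembered.
    <T> T step(Supplier<T> action, Consumer<T> undo) {
        T result = action.get();
        undoStack.push(() -> undo.accept(result));
        return result;
    }

    // Undoes all completed steps, most recent first.
    void compensate() {
        while (!undoStack.isEmpty()) {
            undoStack.pop().run();
        }
    }
}
```

A caller would wrap its steps in a try block and invoke compensate() from the catch block before rethrowing, which is the same shape as the manual rollback in the traditional approach.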
Key Features
Parallel Processing
- Virtual thread support for optimal performance
- Controlled parallelism with configurable thread pools
- Thread-safe context propagation
- Automatic resource cleanup
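As a rough illustration of controlled parallelism with guaranteed cleanup, the sketch below maps a function over a list using a bounded thread pool from java.util.concurrent. The class BoundedParallelMap is hypothetical and uses platform threads; it only shows the general technique, not how Flow DSL implements parallelMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration class; not part of Flow DSL.
final class BoundedParallelMap {
    // Applies fn to every input using at most `parallelism` threads.
    // Results are returned in the same order as the inputs.
    static <A, B> List<B> map(List<A> inputs, Function<A, B> fn, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Callable<B>> tasks = new ArrayList<>();
            for (A input : inputs) {
                tasks.add(() -> fn.apply(input));
            }
            List<B> results = new ArrayList<>();
            // invokeAll blocks until every task has finished
            for (Future<B> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause()); // a task failed
        } finally {
            pool.shutdown(); // always release the worker threads
        }
    }
}
```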
Resilience Patterns
- Circuit breaker pattern
- Retry with backoff support
- Timeout handling
- Compensation actions
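Retry with backoff is the pattern the traditional example hand-codes in its payment loop. A minimal standalone sketch, assuming a linear backoff like that loop's (the delay grows with the attempt number), might look as follows; RetrySketch is a hypothetical name and not part of Flow DSL.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of Flow DSL.
final class RetrySketch {
    // Calls action up to maxAttempts times. After a failed attempt n,
    // sleeps backoff * n before trying again (linear backoff).
    static <T> T retry(Supplier<T> action, int maxAttempts, Duration backoff) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // out of attempts: rethrow below
                }
                try {
                    Thread.sleep(backoff.toMillis() * attempt);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted during backoff", ie);
                }
            }
        }
        throw last; // the failure from the final attempt
    }
}
```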
Error Handling
- Checked exception support
- Parallel error propagation
- Fallback mechanisms
- Comprehensive error context
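Checked exception support usually means adapting a lambda that throws a checked exception to a standard functional interface. The sketch below shows one common way to do that in plain Java. The names CheckedSketch, CheckedFunction, and wrap are assumptions made for this illustration, and the choice of IllegalStateException as the wrapper type is arbitrary; Flow DSL's own wrapper may behave differently.

```java
import java.util.function.Function;

// Hypothetical illustration class; not part of Flow DSL.
final class CheckedSketch {
    // Like java.util.function.Function, but allowed to throw checked exceptions.
    @FunctionalInterface
    interface CheckedFunction<A, B> {
        B apply(A input) throws Exception;
    }

    // Adapts a CheckedFunction to a plain Function. Unchecked exceptions
    // pass through untouched; checked ones are wrapped with the original as cause.
    static <A, B> Function<A, B> wrap(CheckedFunction<A, B> fn) {
        return input -> {
            try {
                return fn.apply(input);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };
    }
}
```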
Examples
Basic Flow
Flow.of(() -> "Hello")
    .map(str -> str + " ")
    .map(str -> str + "World")
    .map(String::toUpperCase)
    .execute(); // Returns "HELLO WORLD"
Error Handling
Flow.of(() -> "data")
    .onError(error -> log.error("Flow failed", error))
    .onComplete(result -> log.info("Flow completed with: {}", result))
    .withRetry(3)
    .withBackoff(Duration.ofSeconds(1))
    .execute();
Parallel Processing
Flow.parallel(
        () -> "Task 1",
        () -> "Task 2",
        () -> "Task 3"
    )
    .withParallelism(3)
    .execute();
Spring Boot Integration
Flow DSL can be used directly inside Spring Boot services. Here's how to use it in your service layer:
Service Layer Integration
@Service
@Slf4j
@RequiredArgsConstructor // Lombok: generates the constructor Spring uses to inject the final fields
public class OrderService {
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final NotificationService notificationService;

    public OrderResult processOrder(Order order) {
        return Flow.of(() -> order)
            .map(this::validateOrder)
            .flatMap(validOrder ->
                Flow.of(() -> paymentService.processPayment(validOrder))
                    .withRetry(3)
                    .withTimeout(Duration.ofSeconds(5))
                    .withCompensation(paymentService::rollbackPayment)
                    .onError(e -> log.error("Payment failed", e))
                    .onComplete(payment -> log.info("Payment processed: {}", payment)))
            .flatMap(payment ->
                Flow.of(() -> inventoryService.reserveItems(order.getItems()))
                    .withRetry(2)
                    .withCompensation(inventoryService::releaseItems))
            .thenCompose(inventory ->
                Flow.of(() -> notificationService.notifyCustomer(order))
                    .withRetry(3)
                    .async())
            .execute();
    }

    @Transactional
    public OrderResult processOrderWithTransaction(Order order) {
        return Flow.of(() -> order)
            .withContextData("transactionId", UUID.randomUUID().toString())
            .onEvent(event -> {
                // Mark the surrounding Spring transaction for rollback if the flow fails
                if (event.getType() == FlowEvent.EventType.FLOW_ERROR) {
                    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                }
            })
            .map(this::validateOrder)
            .flatMap(this::processPayment)
            .flatMap(this::updateInventory)
            .execute();
    }

    private Flow<PaymentResult> processPayment(Order order) {
        return Flow.of(() -> paymentService.processPayment(order))
            .withRetry(3)
            .withTimeout(Duration.ofSeconds(5))
            .withCompensation(paymentService::rollbackPayment);
    }

    private Flow<OrderResult> updateInventory(PaymentResult payment) {
        return Flow.of(() -> inventoryService.updateInventory(payment.getOrder()))
            .withRetry(2)
            .withCompensation(inventoryService::rollback);
    }
}
Advanced Examples
Context Sharing
Flow.of(() -> new FlowContext())
    .map(ctx -> {
        ctx.put("userId", "12345");
        return ctx;
    })
    .flatMap(ctx ->
        Flow.of(() -> userService.getUser(ctx.get("userId")))
            .withContext(ctx)
    )
    .execute();
Event Monitoring
Flow.of(() -> "process")
    .withEventListener(event -> {
        if (event.getType() == FlowEventType.STEP_START) {
            log.info("Starting step: {}", event.getStepName());
        }
    })
    .map(str -> str + " step 1")
    .map(str -> str + " step 2")
    .execute();
Conditional Flows
Flow.of(() -> order)
    .filter(Order::isValid)
    .branch(
        // Lambda parameter named o so it does not clash with the enclosing order variable
        o -> o.getTotal() > 1000,
        premium -> premium
            .map(OrderService::applyPremiumDiscount)
            .map(NotificationService::sendPremiumNotification),
        regular -> regular
            .map(OrderService::applyRegularDiscount)
    )
    .execute();
Circuit Breaker Pattern
Flow.of(() -> externalService.call())
    .withCircuitBreaker(CircuitBreakerConfig.builder()
        .failureThreshold(5)
        .resetTimeout(Duration.ofMinutes(1))
        .build())
    .withFallback(() -> "fallback-response")
    .execute();
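For readers unfamiliar with the pattern, the following plain-Java sketch shows what a failure threshold and a reset timeout typically mean in a circuit breaker. CircuitBreakerSketch is a hypothetical class written for this explanation; it is deliberately simplistic (it holds a lock during the call) and makes no claim about how Flow DSL's withCircuitBreaker is implemented.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of Flow DSL.
final class CircuitBreakerSketch {
    private final int failureThreshold;
    private final Duration resetTimeout;
    private int consecutiveFailures = 0;
    private Instant openedAt = null; // null means the circuit is closed

    CircuitBreakerSketch(int failureThreshold, Duration resetTimeout) {
        this.failureThreshold = failureThreshold;
        this.resetTimeout = resetTimeout;
    }

    synchronized <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (openedAt != null) {
            if (Instant.now().isBefore(openedAt.plus(resetTimeout))) {
                return fallback.get(); // open: skip the protected call entirely
            }
            openedAt = null; // timeout elapsed: allow one trial call
        }
        try {
            T result = action.get();
            consecutiveFailures = 0; // success closes the circuit fully
            return result;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            if (consecutiveFailures >= failureThreshold) {
                openedAt = Instant.now(); // trip (or re-trip) the breaker
            }
            return fallback.get();
        }
    }
}
```

Once the threshold is reached, further calls return the fallback without touching the failing service until the reset timeout has passed.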
Async Composition
CompletableFuture<OrderResult> result = Flow.of(() -> order)
    // Lambda parameter named o so it does not clash with the enclosing order variable
    .thenCompose(o ->
        Flow.parallel(
                () -> paymentService.processAsync(o),
                () -> inventoryService.checkAsync(o),
                () -> fraudService.validateAsync(o)
            )
            .withParallelism(3)
            .async()
    )
    .thenApply(results -> OrderResult.combine(results))
    .executeAsync();
Parallel Processing with Context
// Process items in parallel with shared context
List<String> items = List.of("item1", "item2", "item3");
List<String> results = Flow.just(items)
    .parallelMap(Flow.CheckedFunction.wrap((String item) -> {
        // Access shared context safely in parallel operations
        Integer multiplier = Flow.currentContext()
            .get("multiplier", Integer.class)
            .orElse(1);
        return item.toUpperCase() + "-" + multiplier;
    }))
    .withParallelism(3)
    .withTimeout(Duration.ofSeconds(1))
    .withContextData("multiplier", 42)
    .execute();
Controlled Parallel Execution
// Process large datasets with controlled parallelism
List<Integer> numbers = IntStream.range(0, 100).boxed().toList();
List<Integer> doubled = Flow.parallel(5, numbers.stream()
        .map(n -> (Supplier<Integer>) () -> n * 2)
        .toArray(Supplier[]::new))
    .execute();
Error Handling in Parallel Operations
Flow.parallel(suppliers)
    .withRetry(3)
    .withBackoff(Duration.ofMillis(100))
    .withFallback(() -> fallbackValue)
    .withCompensation(result -> cleanup(result))
    .execute();
Spring Boot Integration with Context
@Service
@Slf4j
public class OrderService {
    public OrderResult processOrdersInParallel(List<Order> orders) {
        return Flow.just(orders)
            .withContextData("batchId", UUID.randomUUID().toString())
            .withContextData("timestamp", Instant.now())
            .parallelMap(order -> {
                // Access shared context in parallel threads
                String batchId = Flow.currentContext()
                    .get("batchId", String.class)
                    .orElseThrow();
                return processOrder(order, batchId);
            })
            .withParallelism(5)
            .withTimeout(Duration.ofMinutes(1))
            .withCompensation(this::rollbackOrders)
            .execute();
    }
}
Performance and Resource Management
Flow DSL uses the virtual threads introduced in Java 21 to run parallel operations without tying up one platform thread per task:
- Efficient thread utilization with virtual threads
- Automatic thread pool management
- Controlled resource cleanup
- Thread-safe context propagation
- Optimized memory usage
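As a point of reference, the sketch below shows the standard Java 21 API for running one virtual thread per task with automatic cleanup. It requires Java 21 or later. The class VirtualThreadSketch is hypothetical, and the example says nothing about how Flow DSL wires virtual threads internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class; not part of Flow DSL. Requires Java 21+.
final class VirtualThreadSketch {
    static List<Integer> doubleAll(List<Integer> numbers) {
        // try-with-resources: close() waits for submitted tasks and then
        // shuts the executor down, so no thread is leaked
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Integer n : numbers) {
                futures.add(executor.submit(() -> n * 2)); // one virtual thread per task
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // collected in submission order
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```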