Flow DSL: Declarative Business Logic

Focus on What Your Code Does, Not How It Does It

Built for Java 21 with Virtual Thread Support

The "What" Over "How" Concept

In traditional imperative programming, we often focus on how to perform tasks - the specific steps, the order of operations, and the implementation details. This approach can lead to:

Declarative programming, on the other hand, emphasizes what we want to achieve rather than how to achieve it. This paradigm shift brings several advantages:

Why Flow DSL?

Flow DSL is a practical implementation of the "What" over "How" principle. It provides a declarative way to express business workflows while handling the complex "how" details under the hood.

Traditional Approach


try {
    // Validate order
    if (!validateOrder(order)) {
        throw new ValidationException();
    }
    
    // Process payment
    PaymentResult payment = null;
    for (int i = 0; i < 3; i++) {
        try {
            payment = processPayment(order);
            break;
        } catch (Exception e) {
            if (i == 2) throw e;
            Thread.sleep(100 * (i + 1));
        }
    }
    
    // Check inventory
    List inventory = new ArrayList<>();
    ExecutorService executor = Executors.newFixedThreadPool(order.items.size());
    try {
        List> futures = new ArrayList<>();
        for (OrderItem item : order.items) {
            futures.add(executor.submit(() -> checkInventory(item)));
        }
        for (Future future : futures) {
            inventory.add(future.get(5, TimeUnit.SECONDS));
        }
    } finally {
        executor.shutdown();
    }
} catch (Exception e) {
    // Compensation logic
    if (payment != null) {
        rollbackPayment(payment);
    }
    for (InventoryResult result : inventory) {
        rollbackInventory(result);
    }
    throw e;
}
                    

Flow DSL Approach


Flow.of(() -> order)
    .map(OrderService::validateOrder)
    .flatMap(validOrder -> 
        Flow.of(() -> processPayment(validOrder))
            .withRetry(3)
            .withBackoff(Duration.ofMillis(100))
            .withTimeout(Duration.ofSeconds(5))
            .withCompensation(PaymentService::rollback))
    .flatMap(payment -> 
        Flow.of(() -> order.items)
            .parallelMap(InventoryService::checkInventory)
            .withParallelism(order.items.size())
            .withCompensation(InventoryService::rollback))
    .execute();
                    

Key Features

Parallel Processing

  • Virtual thread support for optimal performance
  • Controlled parallelism with configurable thread pools
  • Thread-safe context propagation
  • Automatic resource cleanup

Resilience Patterns

  • Circuit breaker pattern
  • Retry with backoff support
  • Timeout handling
  • Compensation actions

Error Handling

  • Checked exception support
  • Parallel error propagation
  • Fallback mechanisms
  • Comprehensive error context

Examples

Basic Flow


Flow.of(() -> "Hello")
    .map(str -> str + " ")
    .map(str -> str + "World")
    .map(String::toUpperCase)
    .execute();  // Returns "HELLO WORLD"
            

Error Handling


Flow.of(() -> "data")
    .onError(error -> log.error("Flow failed", error))
    .onComplete(result -> log.info("Flow completed with: {}", result))
    .withRetry(3)
    .withBackoff(Duration.ofSeconds(1))
    .execute();
            

Parallel Processing


Flow.parallel(
    () -> "Task 1",
    () -> "Task 2",
    () -> "Task 3"
)
.withParallelism(3)
.execute();
            

Spring Boot Integration

Flow DSL integrates seamlessly with Spring Boot applications. Here's how to use it in your services:

Service Layer Integration


@Service
@Slf4j
public class OrderService {
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final NotificationService notificationService;

    public OrderResult processOrder(Order order) {
        return Flow.of(() -> order)
            .map(this::validateOrder)
            .flatMap(validOrder -> 
                Flow.of(() -> paymentService.processPayment(validOrder))
                    .withRetry(3)
                    .withTimeout(Duration.ofSeconds(5))
                    .withCompensation(paymentService::rollbackPayment)
                    .onError(e -> log.error("Payment failed", e))
                    .onComplete(payment -> log.info("Payment processed: {}", payment)))
            .flatMap(payment -> 
                Flow.of(() -> inventoryService.reserveItems(order.getItems()))
                    .withRetry(2)
                    .withCompensation(inventoryService::releaseItems))
            .thenCompose(inventory -> 
                Flow.of(() -> notificationService.notifyCustomer(order))
                    .withRetry(3)
                    .async())
            .execute();
    }

    @Transactional
    public OrderResult processOrderWithTransaction(Order order) {
        return Flow.of(() -> order)
            .withContextData("transactionId", UUID.randomUUID().toString())
            .onEvent(event -> {
                if (event.getType() == FlowEvent.EventType.FLOW_ERROR) {
                    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                }
            })
            .map(this::validateOrder)
            .flatMap(this::processPayment)
            .flatMap(this::updateInventory)
            .execute();
    }

    private Flow processPayment(Order order) {
        return Flow.of(() -> paymentService.processPayment(order))
            .withRetry(3)
            .withTimeout(Duration.ofSeconds(5))
            .withCompensation(paymentService::rollbackPayment);
    }

    private Flow updateInventory(PaymentResult payment) {
        return Flow.of(() -> inventoryService.updateInventory(payment.getOrder()))
            .withRetry(2)
            .withCompensation(inventoryService::rollback);
    }
}
            

Advanced Examples

Context Sharing


Flow.of(() -> new FlowContext())
    .map(ctx -> {
        ctx.put("userId", "12345");
        return ctx;
    })
    .flatMap(ctx ->
        Flow.of(() -> userService.getUser(ctx.get("userId")))
            .withContext(ctx)
    )
    .execute();
            

Event Monitoring


Flow.of(() -> "process")
    .withEventListener(event -> {
        if (event.getType() == FlowEventType.STEP_START) {
            log.info("Starting step: {}", event.getStepName());
        }
    })
    .map(str -> str + " step 1")
    .map(str -> str + " step 2")
    .execute();
            

Conditional Flows


Flow.of(() -> order)
    .filter(Order::isValid)
    .branch(
        order -> order.getTotal() > 1000,
        premium -> premium
            .map(OrderService::applyPremiumDiscount)
            .map(NotificationService::sendPremiumNotification),
        regular -> regular
            .map(OrderService::applyRegularDiscount)
    )
    .execute();
            

Circuit Breaker Pattern


Flow.of(() -> externalService.call())
    .withCircuitBreaker(CircuitBreakerConfig.builder()
        .failureThreshold(5)
        .resetTimeout(Duration.ofMinutes(1))
        .build())
    .withFallback(() -> "fallback-response")
    .execute();
            

Async Composition


CompletableFuture result = Flow.of(() -> order)
    .thenCompose(order -> 
        Flow.parallel(
            () -> paymentService.processAsync(order),
            () -> inventoryService.checkAsync(order),
            () -> fraudService.validateAsync(order)
        )
        .withParallelism(3)
        .async()
    )
    .thenApply(results -> OrderResult.combine(results))
    .executeAsync();
            

Parallel Processing with Context


// Process items in parallel with shared context
List items = List.of("item1", "item2", "item3");
List results = Flow.just(items)
    .parallelMap(Flow.CheckedFunction.wrap((String item) -> {
        // Access shared context safely in parallel operations
        Integer multiplier = Flow.currentContext()
            .get("multiplier", Integer.class)
            .orElse(1);
        return item.toUpperCase() + "-" + multiplier;
    }))
    .withParallelism(3)
    .withTimeout(Duration.ofSeconds(1))
    .withContextData("multiplier", 42)
    .execute();
            

Controlled Parallel Execution


// Process large datasets with controlled parallelism
List numbers = IntStream.range(0, 100).boxed().toList();
List doubled = Flow.parallel(5, numbers.stream()
    .map(n -> (Supplier) () -> n * 2)
    .toArray(Supplier[]::new))
    .execute();
            

Error Handling in Parallel Operations


Flow.parallel(suppliers)
    .withRetry(3)
    .withBackoff(Duration.ofMillis(100))
    .withFallback(() -> fallbackValue)
    .withCompensation(result -> cleanup(result))
    .execute();
            

Spring Boot Integration with Context


@Service
@Slf4j
public class OrderService {
    public OrderResult processOrdersInParallel(List orders) {
        return Flow.just(orders)
            .withContextData("batchId", UUID.randomUUID().toString())
            .withContextData("timestamp", Instant.now())
            .parallelMap(order -> {
                // Access shared context in parallel threads
                String batchId = Flow.currentContext()
                    .get("batchId", String.class)
                    .orElseThrow();
                    
                return processOrder(order, batchId);
            })
            .withParallelism(5)
            .withTimeout(Duration.ofMinutes(1))
            .withCompensation(this::rollbackOrders)
            .execute();
    }
}
            

Performance and Resource Management

Flow DSL leverages Java 21's virtual threads for optimal performance in parallel operations: