Concurrent Utilities

Project: A Concurrent Task Processor

15 min Lesson 10 of 13

Project: A Concurrent Task Processor

Throughout this tutorial you have built up a toolkit: thread pools, Callable and Future, CompletableFuture, concurrent collections, locks, and synchronizers. In this capstone project you will combine those pieces into a small but realistic application — a Concurrent Task Processor — that accepts a batch of work items, processes them in parallel using a bounded thread pool, aggregates the results, and reports progress as work completes.

The project has three layers:

  1. Task definition — a record that describes one unit of work.
  2. Processor — submits tasks to an ExecutorService, gathers CompletableFuture handles, and streams the finished results.
  3. Driver — wires everything together, feeds tasks in, and prints a final summary.

Step 1: Model the Work

A Java record makes the task description concise and immutable. Each WorkItem has an id and a simulated payload:

import java.util.concurrent.ThreadLocalRandom; public record WorkItem(int id, String payload) { /** Simulate CPU-bound or IO-bound work; may throw on bad input. */ public String process() { int delay = ThreadLocalRandom.current().nextInt(50, 300); try { Thread.sleep(delay); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Work interrupted", e); } if (payload.isBlank()) { throw new IllegalArgumentException("Empty payload for item " + id); } return "Item-%d: processed '%s' in %d ms".formatted(id, payload, delay); } }
Why a record? Records give you equals, hashCode, toString, and final fields for free. Immutable data flowing through concurrent code is always safer — no shared mutable state to protect.

Step 2: Build the Processor

The processor class owns a fixed-size thread pool and exposes a single method that accepts a list of work items and returns a list of CompletableFuture<String>. Using CompletableFuture instead of raw Future lets us chain callbacks without blocking the calling thread:

import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; public final class TaskProcessor implements AutoCloseable { private final ExecutorService pool; public TaskProcessor(int parallelism) { this.pool = Executors.newFixedThreadPool(parallelism, Thread.ofVirtual().factory()); // virtual threads (Java 21+) } /** * Submit all items for concurrent processing. * Each future resolves to the result string or completes exceptionally. */ public List<CompletableFuture<String>> submitAll(List<WorkItem> items) { return items.stream() .map(item -> CompletableFuture .supplyAsync(item::process, pool) .exceptionally(ex -> "Item-%d FAILED: %s".formatted(item.id(), ex.getMessage()))) .collect(Collectors.toList()); } @Override public void close() { pool.shutdown(); try { if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { pool.shutdownNow(); } } catch (InterruptedException e) { pool.shutdownNow(); Thread.currentThread().interrupt(); } } }
Java 21 virtual threads: Thread.ofVirtual().factory() creates a virtual-thread factory. Passing it to newFixedThreadPool means each task gets a lightweight virtual thread. For IO-heavy workloads this scales far better than platform threads — the JVM parks virtual threads during blocking calls without occupying an OS thread. For pure CPU work, keep the pool size at Runtime.getRuntime().availableProcessors().

Step 3: Aggregate Results with a Concurrent Counter

As futures complete we want a live success/failure count. LongAdder is designed exactly for high-contention increments — faster than AtomicLong under heavy writes because it stripes the counter across cells:

import java.util.concurrent.atomic.LongAdder; public final class ResultAggregator { private final LongAdder successes = new LongAdder(); private final LongAdder failures = new LongAdder(); public void record(String result) { if (result.contains("FAILED")) { failures.increment(); } else { successes.increment(); } } public void printSummary(int total) { System.out.printf("%n=== Summary ===%n"); System.out.printf("Total : %d%n", total); System.out.printf("OK : %d%n", successes.sum()); System.out.printf("FAILED: %d%n", failures.sum()); } }

Step 4: Drive the Pipeline

The driver creates the work items, submits them, attaches a per-item callback to print progress immediately as each future finishes, then waits for the whole batch before printing the summary:

import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.IntStream; public final class Main { public static void main(String[] args) throws Exception { int batchSize = 20; int parallelism = 8; List<WorkItem> batch = IntStream.rangeClosed(1, batchSize) .mapToObj(i -> new WorkItem(i, i % 7 == 0 ? "" : "task-" + i)) .toList(); ResultAggregator agg = new ResultAggregator(); try (TaskProcessor processor = new TaskProcessor(parallelism)) { List<CompletableFuture<String>> futures = processor.submitAll(batch); // attach a non-blocking callback to each future List<CompletableFuture<Void>> reporters = futures.stream() .map(f -> f.thenAccept(result -> { System.out.println(result); agg.record(result); })) .toList(); // wait for every reporter to finish — does NOT block the pool CompletableFuture.allOf(reporters.toArray(new CompletableFuture[0])).join(); } agg.printSummary(batchSize); } }
Why thenAccept instead of looping over future.get()? Calling get() in a loop processes results in submission order — if item 1 takes 300 ms but item 2 finishes in 50 ms, you still wait. thenAccept fires the callback on whichever thread completes the future first, so you see output arrive in completion order, not submission order. allOf(...).join() then waits only for the slowest one.

Key Trade-offs in the Design

  • Pool size vs. workload shape: For IO-heavy tasks (network, disk), virtual threads or a large pool beats a small fixed pool. For CPU-bound tasks, a pool of size availableProcessors() maximises throughput without over-subscribing the CPU.
  • Error isolation: exceptionally converts an exception into a sentinel string so one bad item never cancels the batch. An alternative is handle(), which receives both result and exception together and lets you return a richer error object.
  • Back-pressure: If tasks are produced faster than they are consumed the pool queue grows unboundedly. For high-throughput systems consider a ThreadPoolExecutor with a bounded ArrayBlockingQueue and a CallerRunsPolicy rejection handler — that slows the producer naturally.
  • Observability: In production you would replace System.out.println with structured logging and expose the aggregator counters via a metrics endpoint (Micrometer, Prometheus).
Do not share the pool outside the try-with-resources block. The AutoCloseable implementation shuts the pool down when the block exits. If a reference escapes, callers can submit tasks to a shut-down executor and get RejectedExecutionException at runtime.

Extending the Project

Once the baseline works, consider these exercises to deepen your understanding:

  1. Timeout per task — wrap each supplyAsync with .orTimeout(2, TimeUnit.SECONDS). Tasks that run too long complete exceptionally with TimeoutException.
  2. Priority queue — replace the list with a PriorityBlockingQueue<WorkItem> sorted by a priority field. Feed items from the queue into submitAll in batches.
  3. Phased processing — chain a second CompletableFuture stage after the first that persists the result to a database. Use thenApplyAsync with a second, smaller executor to avoid blocking the compute pool with IO.

Summary

This project brought together every concept from the tutorial: an ExecutorService for pooled execution, CompletableFuture for non-blocking composition, exceptionally for fault isolation, LongAdder for high-performance aggregation, and allOf for barrier synchronisation. The key insight is that each tool has a job — combine them by role, keep shared state minimal and well-typed, and let the functional pipeline carry data through the stages cleanly.