Introduction
The Java Executor Framework, introduced in Java 5 as part of the java.util.concurrent package, provides a powerful and flexible thread pool implementation for running asynchronous tasks. It separates task submission from task execution mechanics, making concurrent programming more manageable and efficient.
Core Components
The Executor Framework is built around several key interfaces and classes:
Main Interfaces
- Executor: The base interface with a single execute(Runnable) method
- ExecutorService: Extends Executor with more methods for managing tasks and lifecycle
- ScheduledExecutorService: Adds scheduling capabilities for delayed and periodic tasks
- Future: Represents the result of an asynchronous computation
- Callable: Similar to Runnable, but can return a result and throw exceptions
Important Implementation Classes
- Executors: Factory class with static methods to create different executor types
- ThreadPoolExecutor: The primary implementation of ExecutorService
- ScheduledThreadPoolExecutor: Implementation of ScheduledExecutorService
- FutureTask: Implementation of Future that also implements Runnable
- CompletableFuture: Enhanced Future with support for composition, chaining, and exception handling
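As a rough sketch of how these pieces fit together, the snippet below wraps a Callable in a FutureTask and hands it to an executor through the plain execute(Runnable) method. The class name FutureTaskSketch and the helper sumTo are made up for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskSketch {

    // Hypothetical helper: sums 1..n on a pool thread and returns the result.
    static int sumTo(int n) {
        Callable<Integer> work = () -> {
            int total = 0;
            for (int i = 1; i <= n; i++) {
                total += i;
            }
            return total;
        };

        // FutureTask is both a Runnable (so execute() accepts it)
        // and a Future (so the caller can wait for the result).
        FutureTask<Integer> task = new FutureTask<>(work);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(task);
            return task.get(); // blocks until the Callable has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum of 1..100 = " + sumTo(100));
    }
}
```

In everyday code, ExecutorService.submit(Callable) creates the FutureTask for you; building it by hand here only makes the relationship between the interfaces visible.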
Basic Usage
Let’s start with a simple example:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BasicExecutorExample {
public static void main(String[] args) {
// Create a fixed thread pool with 3 threads
ExecutorService executor = Executors.newFixedThreadPool(3);
// Submit tasks to the executor
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Task " + taskId + " is running on " +
Thread.currentThread().getName());
try {
// Simulate some work
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task " + taskId + " completed");
});
}
// Shut down the executor: no new tasks are accepted, but submitted tasks still run
executor.shutdown();
// shutdown() does not block; the JVM exits once the non-daemon worker threads finish
System.out.println("All tasks submitted. Waiting for completion...");
}
}
This example creates a fixed thread pool with three threads and submits ten tasks to it. The thread pool will reuse the three threads to execute all tasks.
Thread Pool Types
The Executors factory class provides several static methods to create different types of thread pools:
1. Fixed Thread Pool
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
- Creates a thread pool with a fixed number of threads
- If all threads are active when a new task is submitted, the task waits in an unbounded queue until a thread becomes free
- Good for limiting resource usage and when you have a known number of tasks
2. Cached Thread Pool
ExecutorService executor = Executors.newCachedThreadPool();
- Creates an expandable thread pool
- Creates new threads as needed and reuses idle threads
- Threads that remain idle for 60 seconds are terminated
- Good for many short-lived tasks
3. Single Thread Executor
ExecutorService executor = Executors.newSingleThreadExecutor();
- Creates an executor with a single worker thread
- Tasks are guaranteed to execute sequentially
- Good for tasks that must be executed in a specific order
4. Scheduled Thread Pool
ScheduledExecutorService executor = Executors.newScheduledThreadPool(corePoolSize);
- Creates a thread pool that can schedule tasks to run after a delay or periodically
- Good for tasks that need to run on a schedule
5. Work Stealing Pool (Java 8+)
ExecutorService executor = Executors.newWorkStealingPool(parallelism);
- Creates a thread pool using the work-stealing algorithm
- Automatically adjusts the number of threads to maintain parallelism level
- Good for tasks that split into smaller subtasks
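To make the ordering guarantee of the single thread executor concrete, here is a small sketch that records the order in which tasks actually run. The class SingleThreadOrderSketch and the helper executionOrder are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderSketch {

    // Hypothetical helper: runs taskCount tasks on the given executor and
    // returns the task ids in the order in which they actually executed.
    static List<Integer> executionOrder(ExecutorService executor, int taskCount) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            executor.execute(() -> order.add(taskId));
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return order;
    }

    public static void main(String[] args) {
        // One worker thread: the tasks run in submission order.
        System.out.println(executionOrder(Executors.newSingleThreadExecutor(), 5));
        // Several worker threads: all five ids appear, but the order may vary.
        System.out.println(executionOrder(Executors.newFixedThreadPool(3), 5));
    }
}
```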
Working with Callable and Future
While Runnable is useful for tasks that don’t return results, Callable allows you to execute tasks that return values and throw checked exceptions.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class CallableFutureExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Future<Integer>> futures = new ArrayList<>();
// Submit Callable tasks
for (int i = 0; i < 5; i++) {
final int taskId = i;
Future<Integer> future = executor.submit(() -> {
System.out.println("Computing task " + taskId);
// Simulate work
Thread.sleep(1000);
return taskId * 100;
});
futures.add(future);
}
// Retrieve results from futures
for (Future<Integer> future : futures) {
try {
// get() blocks until the result is available
Integer result = future.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
}
}
Key Future Methods
- get(): Blocks until the task completes
- get(long timeout, TimeUnit unit): Blocks with a timeout
- isDone(): Checks if the task is complete
- cancel(boolean mayInterruptIfRunning): Attempts to cancel execution
- isCancelled(): Checks if the task was cancelled
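The timed get() and cancel() methods are often used together. The following sketch, with the made-up names FutureTimeoutSketch and runWithTimeout, waits a limited time for a result and cancels the task if the limit is exceeded.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutSketch {

    // Hypothetical helper: waits at most timeoutMillis for a task that takes
    // taskMillis, and reports what happened.
    static String runWithTimeout(long taskMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            Thread.sleep(taskMillis); // simulate work
            return "done";
        });
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up on the task; true asks the pool to interrupt the worker thread.
            future.cancel(true);
            return future.isCancelled() ? "cancelled" : "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(10, 2000));  // task finishes well within the limit
        System.out.println(runWithTimeout(5000, 100)); // limit expires first, task is cancelled
    }
}
```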
CompletableFuture API
Java 8 introduced CompletableFuture, which enhances the Future interface with support for composing asynchronous operations:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
// Create a CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
}, executor);
// Chain transformations
CompletableFuture<String> transformedFuture = future
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);
// Add a callback and block until the whole chain, including the callback, has finished
transformedFuture.thenAccept(result ->
System.out.println("Result: " + result)).join();
executor.shutdown();
}
}
Key CompletableFuture Features
- Chaining operations: thenApply(), thenCompose(), thenCombine()
- Error handling: exceptionally(), handle()
- Callbacks: thenAccept(), thenRun()
- Combining futures: allOf(), anyOf()
- Timeouts: completeOnTimeout(), orTimeout() (both added in Java 9)
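A few of these methods in action, as a minimal sketch. The class CompletableFutureFeaturesSketch and its helper methods are invented for illustration, and the tasks run on the default common pool rather than on a dedicated executor.

```java
import java.util.concurrent.CompletableFuture;

public class CompletableFutureFeaturesSketch {

    // thenCombine: merge the results of two independent futures.
    static int combinedPrice(int basePrice, int shipping) {
        CompletableFuture<Integer> base = CompletableFuture.supplyAsync(() -> basePrice);
        CompletableFuture<Integer> ship = CompletableFuture.supplyAsync(() -> shipping);
        return base.thenCombine(ship, Integer::sum).join();
    }

    // exceptionally: replace a failure anywhere upstream with a fallback value.
    static String parseOrFallback(String input) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(input))
                .thenApply(n -> "parsed " + n)
                .exceptionally(ex -> "fallback")
                .join();
    }

    // handle: inspect both the success and the failure case in one callback.
    static String describe(String input) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(input))
                .handle((value, ex) -> ex == null ? "ok: " + value : "error")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(combinedPrice(40, 2));
        System.out.println(parseOrFallback("7"));
        System.out.println(parseOrFallback("not a number"));
        System.out.println(describe("5"));
    }
}
```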
Scheduled Tasks
The ScheduledExecutorService interface allows you to schedule tasks to run after a delay or periodically:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledTasksExample {
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// Schedule a task to run after a delay
scheduler.schedule(() ->
System.out.println("Delayed task executed"),
2, TimeUnit.SECONDS);
// Schedule a task to run periodically
scheduler.scheduleAtFixedRate(() ->
System.out.println("Fixed rate task - " + System.currentTimeMillis()/1000),
1, 2, TimeUnit.SECONDS);
// Schedule with fixed delay between tasks
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("Fixed delay task - " + System.currentTimeMillis()/1000);
try {
Thread.sleep(1000); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 0, 2, TimeUnit.SECONDS);
// Let the tasks run for a while
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduler.shutdown();
}
}
Key Scheduling Methods
- schedule(Runnable, delay, unit): Executes once after the delay
- schedule(Callable, delay, unit): Executes once after the delay and makes the result available through the returned ScheduledFuture
- scheduleAtFixedRate(Runnable, initialDelay, period, unit): Executes periodically at a fixed rate
- scheduleWithFixedDelay(Runnable, initialDelay, delay, unit): Executes with a fixed delay between tasks
The key difference between scheduleAtFixedRate and scheduleWithFixedDelay:
- scheduleAtFixedRate: Attempts to maintain a constant execution frequency, measured from start to start; if a run takes longer than the period, the next run starts late rather than concurrently
- scheduleWithFixedDelay: Maintains a delay period between the end of one execution and the start of the next
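One way to see the difference is to count how often the same task runs under each mode. The sketch below is an illustration under assumed timings; the names RateVersusDelaySketch, countRuns, and sleepQuietly are hypothetical, and the exact counts depend on the machine.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RateVersusDelaySketch {

    // Hypothetical helper: runs a task that takes workMillis under both
    // scheduling modes for runMillis and returns {fixedRateRuns, fixedDelayRuns}.
    static int[] countRuns(long intervalMillis, long workMillis, long runMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        AtomicInteger rateRuns = new AtomicInteger();
        AtomicInteger delayRuns = new AtomicInteger();

        scheduler.scheduleAtFixedRate(
                () -> { rateRuns.incrementAndGet(); sleepQuietly(workMillis); },
                0, intervalMillis, TimeUnit.MILLISECONDS);
        scheduler.scheduleWithFixedDelay(
                () -> { delayRuns.incrementAndGet(); sleepQuietly(workMillis); },
                0, intervalMillis, TimeUnit.MILLISECONDS);

        sleepQuietly(runMillis);
        scheduler.shutdownNow(); // stop both periodic tasks
        return new int[] { rateRuns.get(), delayRuns.get() };
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 100 ms interval, 80 ms of work: fixed rate starts a run roughly every
        // 100 ms, fixed delay roughly every 180 ms (80 ms work + 100 ms pause).
        int[] runs = countRuns(100, 80, 1000);
        System.out.println("fixed rate: " + runs[0] + ", fixed delay: " + runs[1]);
    }
}
```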
Thread Pool Configuration
When using ThreadPoolExecutor directly, you have fine-grained control over thread pool behavior:
import java.util.concurrent.*;
public class ThreadPoolConfigExample {
public static void main(String[] args) {
// Create a custom thread pool
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
5, // maximumPoolSize
60, TimeUnit.SECONDS, // keepAliveTime
new LinkedBlockingQueue<>(10), // workQueue
new CustomThreadFactory(), // threadFactory
new CustomRejectionHandler() // rejectionHandler
);
// Set additional properties
executor.allowCoreThreadTimeOut(true);
// Submit tasks
for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " running on " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return taskId;
});
}
// Shutdown after tasks complete
executor.shutdown();
}
// Custom thread factory
static class CustomThreadFactory implements ThreadFactory {
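// newThread() can be called from several threads, so use an atomic counter
private final java.util.concurrent.atomic.AtomicInteger counter =
new java.util.concurrent.atomic.AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "CustomWorker-" + counter.getAndIncrement());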
thread.setPriority(Thread.NORM_PRIORITY);
thread.setDaemon(false);
return thread;
}
}
// Custom rejection handler
static class CustomRejectionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("Task rejected: " + r.toString());
// Implement custom logic for rejected tasks
// e.g., save to disk, retry later, etc.
}
}
}
Key Configuration Parameters
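- corePoolSize: The number of threads to keep in the pool even when they are idle (unless allowCoreThreadTimeOut is enabled)
- maximumPoolSize: The maximum number of threads allowed; threads beyond corePoolSize are created only when the work queue is full
- keepAliveTime: How long idle threads in excess of corePoolSize wait for new tasks before terminating
- workQueue: The queue that holds tasks once all core threads are busy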
- threadFactory: Factory for creating new threads
- rejectionHandler: Handler for tasks that cannot be executed
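The interplay of corePoolSize, workQueue, and maximumPoolSize is easiest to see with a deliberately tiny pool. The sketch below (PoolGrowthSketch and submitFour are names invented here) submits four blocking tasks and records how the pool reacts to each one.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthSketch {

    // Submits four blocking tasks to a pool with corePoolSize 1, maximumPoolSize 2
    // and a queue of capacity 1, and records what the pool did with each one.
    static String[] submitFour() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until we are done observing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        String[] outcome = new String[4];
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
                outcome[i] = "threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size();
            } catch (RejectedExecutionException e) {
                outcome[i] = "rejected"; // default AbortPolicy
            }
        }
        release.countDown();
        pool.shutdown();
        return outcome;
    }

    public static void main(String[] args) {
        // Task 1 starts the core thread, task 2 is queued, task 3 finds the queue
        // full and starts a second thread, task 4 is rejected.
        for (String line : submitFour()) {
            System.out.println(line);
        }
    }
}
```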
Queue Types
- LinkedBlockingQueue: Optionally bounded; unbounded when created without a capacity (used that way by newFixedThreadPool)
- ArrayBlockingQueue: Bounded queue with fixed capacity
- SynchronousQueue: No capacity; hands each task directly to a thread (used by newCachedThreadPool)
- PriorityBlockingQueue: Unbounded queue that orders tasks by priority
Rejection Policies
- AbortPolicy: Throws RejectedExecutionException (default)
- CallerRunsPolicy: Runs the task in the caller’s thread
- DiscardPolicy: Silently discards the task
- DiscardOldestPolicy: Discards the oldest unexecuted task and tries again
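As an illustration of one of these policies, the sketch below saturates a one-thread pool and then submits one more task with CallerRunsPolicy installed. The class name CallerRunsSketch and the helper threadRunningOverflowTask are assumptions made for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsSketch {

    // Saturates a one-thread pool with a one-slot queue, then submits one more
    // task and returns the name of the thread that ended up running it.
    static String threadRunningOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the only queue slot

        AtomicReference<String> ranOn = new AtomicReference<>();
        // The pool is saturated, so CallerRunsPolicy runs this task right here,
        // on the submitting thread, before execute() returns.
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + threadRunningOverflowTask());
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work in the meantime, which gives CallerRunsPolicy a simple throttling effect.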
Exception Handling
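Exception handling in thread pools requires special attention because a task runs on a different thread from the one that submitted it, so its exceptions never reach the submitter on their own. A task passed to submit() stores its exception in the returned Future, while a task passed to execute() lets the exception terminate the worker thread: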
import java.util.concurrent.*;
public class ExceptionHandlingExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
// Handling exceptions with Future
Future<Integer> future = executor.submit(() -> {
throw new RuntimeException("Deliberate exception");
});
try {
Integer result = future.get(); // This will throw ExecutionException
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Task interrupted: " + e.getMessage());
} catch (ExecutionException e) {
System.out.println("Task threw an exception: " + e.getCause().getMessage());
}
// Using try-catch within the task
executor.execute(() -> {
try {
// Task code that might throw an exception
int x = 1 / 0; // Will throw ArithmeticException
} catch (Exception e) {
System.out.println("Caught exception in task: " + e.getMessage());
}
});
// Using a custom UncaughtExceptionHandler
ThreadFactory threadFactory = r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, throwable) ->
System.out.println("Uncaught exception in thread " +
thread.getName() + ": " + throwable.getMessage()));
return t;
};
ExecutorService customExecutor = Executors.newFixedThreadPool(2, threadFactory);
// Because this task is passed to execute(), its exception escapes the worker thread
// and reaches the UncaughtExceptionHandler; with submit() it would be captured
// in the returned Future instead
customExecutor.execute(() -> {
throw new RuntimeException("Exception in task");
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
customExecutor.shutdown();
}
}
Best Practices for Exception Handling
- Always catch and handle exceptions within tasks when possible
- Use Future.get() to catch exceptions from tasks passed to submit(); they arrive wrapped in ExecutionException
- Consider using CompletableFuture’s exception handling methods like exceptionally() and handle()
- Set an UncaughtExceptionHandler for exceptions thrown by tasks passed to execute()
- Log exceptions properly to aid debugging
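The sketch below shows where a task's exception ends up depending on how the task was handed to the pool. The names ExceptionRoutingSketch and routeFailures are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ExceptionRoutingSketch {

    // Returns {message seen by the UncaughtExceptionHandler, message seen via Future.get()}.
    static String[] routeFailures() {
        AtomicReference<String> seenByHandler = new AtomicReference<>();
        CountDownLatch handlerCalled = new CountDownLatch(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> {
                seenByHandler.set(ex.getMessage());
                handlerCalled.countDown();
            });
            return t;
        };
        ExecutorService executor = Executors.newFixedThreadPool(2, factory);
        String seenByFuture = null;
        try {
            // execute(): the exception escapes the worker and reaches the handler.
            executor.execute(() -> { throw new IllegalStateException("from execute"); });
            handlerCalled.await(5, TimeUnit.SECONDS);

            // submit(): the exception is stored in the Future instead.
            Callable<Void> failing = () -> { throw new IllegalStateException("from submit"); };
            Future<Void> future = executor.submit(failing);
            try {
                future.get();
            } catch (ExecutionException e) {
                seenByFuture = e.getCause().getMessage();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new String[] { seenByHandler.get(), seenByFuture };
    }

    public static void main(String[] args) {
        String[] seen = routeFailures();
        System.out.println("Handler saw: " + seen[0]);
        System.out.println("Future saw: " + seen[1]);
    }
}
```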
Graceful Shutdown
Proper shutdown of thread pools is essential to avoid resource leaks:
import java.util.concurrent.*;
public class GracefulShutdownExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);
// Submit some tasks
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("Starting task " + taskId);
Thread.sleep(taskId * 500); // Variable execution time
System.out.println("Completed task " + taskId);
return taskId;
} catch (InterruptedException e) {
System.out.println("Task " + taskId + " interrupted");
Thread.currentThread().interrupt();
return -1;
}
});
}
// Initiate graceful shutdown
executor.shutdown();
try {
// Wait for tasks to complete, with a timeout
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("Some tasks didn't finish, forcing shutdown...");
// Cancel currently executing tasks
executor.shutdownNow();
// Wait again for tasks to respond to interruption
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("Thread pool did not terminate");
}
} else {
System.out.println("All tasks completed successfully");
}
} catch (InterruptedException e) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
Shutdown Process
- shutdown(): Stops accepting new tasks, but allows existing tasks to complete
- awaitTermination(): Blocks until all tasks complete or the timeout occurs
- shutdownNow(): Attempts to stop all actively executing tasks and returns pending tasks
- Use isTerminated() to check if all tasks have completed following shutdown
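What shutdownNow() returns and how a running task notices it can be sketched as follows. ShutdownNowSketch and stopBusyPool are made-up names, and the 60-second sleep simply stands in for long-running work.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownNowSketch {

    // Returns {number of tasks that never started, 1 if the running task was interrupted else 0}.
    static int[] stopBusyPool() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        executor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // long-running work
            } catch (InterruptedException e) {
                interrupted.set(true); // shutdownNow() interrupts the worker thread
                Thread.currentThread().interrupt();
            }
        });
        // These two wait in the queue behind the long-running task.
        executor.execute(() -> System.out.println("queued task 1"));
        executor.execute(() -> System.out.println("queued task 2"));

        try {
            started.await();
            List<Runnable> pending = executor.shutdownNow(); // drains the queue
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { pending.size(), interrupted.get() ? 1 : 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while shutting down", e);
        }
    }

    public static void main(String[] args) {
        int[] result = stopBusyPool();
        System.out.println("Pending tasks returned: " + result[0]);
        System.out.println("Running task interrupted: " + (result[1] == 1));
    }
}
```

Interruption is cooperative: a task that never blocks and never checks Thread.interrupted() keeps running after shutdownNow().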
Best Practices
- Choose the right pool type based on your workload characteristics
- Size your thread pools appropriately: generally near the number of available processors for CPU-bound tasks, higher for I/O-bound tasks
- Avoid blocking operations in tasks when possible
- Use time limits with Future.get() to avoid indefinite blocking
- Always shut down your executors when they’re no longer needed
- Monitor thread pool metrics in production environments
- Avoid tasks that block while waiting for other tasks submitted to the same pool (can lead to thread pool starvation)
- Use meaningful thread names for easier debugging
- Consider task dependencies and use CompletableFuture for complex flows
- Handle exceptions properly in all tasks
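The starvation problem mentioned above can be reproduced with a very small pool. In this sketch (PoolStarvationSketch and nestedSubmit are illustrative names), an outer task waits for an inner task that it submitted to the same pool; the time limit on get() is the only thing that keeps it from hanging forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PoolStarvationSketch {

    // An outer task submits an inner task to the same pool and waits for it.
    // Returns what the outer task observed.
    static String nestedSubmit(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Future<String> outer = pool.submit(() -> {
                Future<String> inner = pool.submit(() -> "inner result");
                try {
                    // A time limit turns a permanent hang into a visible failure.
                    return inner.get(500, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    return "starved";
                }
            });
            return outer.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // One thread: the outer task occupies it, so the inner task can never start.
        System.out.println(nestedSubmit(1));
        // Two threads: the inner task gets the second thread and completes.
        System.out.println(nestedSubmit(2));
    }
}
```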
Real-World Examples
Web Crawler
import java.util.*;
import java.util.concurrent.*;
import java.net.URL;
public class WebCrawlerExample {
private final ConcurrentHashMap<String, Boolean> visitedUrls = new ConcurrentHashMap<>();
private final ExecutorService executor = Executors.newFixedThreadPool(10);
private final int maxDepth = 3;
public void crawl(String startUrl, int depth) {
// Only the calling thread blocks here; pool threads never wait on other tasks,
// so the fixed-size pool cannot starve itself
crawlAsync(startUrl, depth).join();
}
private CompletableFuture<Void> crawlAsync(String url, int depth) {
if (depth > maxDepth || visitedUrls.putIfAbsent(url, true) != null) {
return CompletableFuture.completedFuture(null);
}
// Download and parse the page (simplified) on a pool thread
return CompletableFuture.supplyAsync(() -> {
System.out.println("Crawling: " + url + " at depth " + depth);
// Simulate getting links from the page
return getLinksFromPage(url);
}, executor).thenCompose(links -> {
// Process each link in parallel without blocking the current thread
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String link : links) {
futures.add(crawlAsync(link, depth + 1));
}
// Completes when all crawl operations below this page have completed
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}).exceptionally(e -> {
System.out.println("Error crawling " + url + ": " + e.getMessage());
return null;
});
}
private List<String> getLinksFromPage(String url) {
// This would actually parse HTML and extract links
// Simplified for the example
try {
Thread.sleep(100); // Simulate network delay
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Return fake links
return Arrays.asList(
url + "/page1",
url + "/page2",
url + "/page3"
);
}
public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
WebCrawlerExample crawler = new WebCrawlerExample();
crawler.crawl("https://example.com", 0);
crawler.shutdown();
}
}
Image Processing Pipeline
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
import java.io.File;
public class ImageProcessingExample {
private final ExecutorService downloadExecutor = Executors.newFixedThreadPool(5);
private final ExecutorService processingExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors());
private final ExecutorService savingExecutor = Executors.newFixedThreadPool(3);
public void processImages(List<String> imageUrls) {
List<CompletableFuture<Void>> pipelines = new ArrayList<>();
for (String url : imageUrls) {
// Create a processing pipeline for each image
CompletableFuture<byte[]> downloadFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Downloading image: " + url);
return downloadImage(url);
}, downloadExecutor);
CompletableFuture<byte[]> processingFuture = downloadFuture.thenApplyAsync(imageData -> {
System.out.println("Processing image: " + url);
return applyFilters(imageData);
}, processingExecutor);
CompletableFuture<Void> savingFuture = processingFuture.thenAcceptAsync(processedData -> {
System.out.println("Saving image: " + url);
saveImage(url, processedData);
}, savingExecutor);
// Add error handling
CompletableFuture<Void> pipeline = savingFuture.exceptionally(ex -> {
System.err.println("Error processing image " + url + ": " + ex.getMessage());
return null;
});
pipelines.add(pipeline);
}
// Wait for all pipelines to complete
CompletableFuture.allOf(pipelines.toArray(new CompletableFuture[0])).join();
}
private byte[] downloadImage(String url) {
// Simulate downloading an image
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new byte[1024]; // Dummy data
}
private byte[] applyFilters(byte[] imageData) {
// Simulate image processing
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return imageData; // Return processed data
}
private void saveImage(String url, byte[] imageData) {
// Simulate saving the image
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
String filename = url.substring(url.lastIndexOf('/') + 1);
System.out.println("Saved image to: " + filename);
}
public void shutdown() {
downloadExecutor.shutdown();
processingExecutor.shutdown();
savingExecutor.shutdown();
}
public static void main(String[] args) {
List<String> imageUrls = List.of(
"https://example.com/image1.jpg",
"https://example.com/image2.jpg",
"https://example.com/image3.jpg",
"https://example.com/image4.jpg",
"https://example.com/image5.jpg"
);
ImageProcessingExample processor = new ImageProcessingExample();
processor.processImages(imageUrls);
processor.shutdown();
}
}
Task Prioritization Example
import java.util.concurrent.*;
public class PriorityTaskExample {
// Define task priority levels
public enum TaskPriority {
HIGH(1),
MEDIUM(2),
LOW(3);
private final int value;
TaskPriority(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}
// Create a task class that implements comparable
static class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final String name;
private final TaskPriority priority;
public PriorityTask(String name, TaskPriority priority) {
this.name = name;
this.priority = priority;
}
@Override
public void run() {
System.out.println("Executing " + priority + " priority task: " + name +
" on thread " + Thread.currentThread().getName());
try {
// Simulate work
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public int compareTo(PriorityTask other) {
// Higher priority (lower value) comes first
return Integer.compare(this.priority.getValue(), other.priority.getValue());
}
}
public static void main(String[] args) {
// Create thread pool with priority queue
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<>());
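// Submit tasks with different priorities. Use execute(), not submit(): submit() wraps
// each task in a FutureTask, which is not Comparable, so the priority queue would
// throw ClassCastException. The first two tasks start at once on the two idle
// threads; only the tasks that have to wait in the queue are ordered by priority.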
executor.execute(new PriorityTask("Task 1", TaskPriority.LOW));
executor.execute(new PriorityTask("Task 2", TaskPriority.HIGH));
executor.execute(new PriorityTask("Task 3", TaskPriority.MEDIUM));
executor.execute(new PriorityTask("Task 4", TaskPriority.HIGH));
executor.execute(new PriorityTask("Task 5", TaskPriority.LOW));
executor.execute(new PriorityTask("Task 6", TaskPriority.MEDIUM));
// Wait for all tasks to complete
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Conclusion
The Java Executor Framework provides a robust and flexible solution for managing concurrent tasks and thread pools. By separating task submission from execution mechanics, it enables cleaner, more maintainable code while offering powerful features like task scheduling, result handling, and composition of asynchronous operations.
When designing concurrent applications, carefully consider your workload characteristics to choose the appropriate thread pool type and configuration. Remember to follow best practices for exception handling and proper shutdown to ensure your applications run reliably.
With the introduction of CompletableFuture in Java 8 and subsequent enhancements, the Executor Framework has become even more powerful for designing complex asynchronous workflows with dependencies, transformations, and robust error handling.
