ScheduledThreadPoolExecutor 任务调度与取消机制深度解析
引言
在Java并发编程中,ScheduledThreadPoolExecutor
是一个非常重要的线程池实现,它提供了定时任务和周期性任务的调度功能。而Future.cancel()
方法则是控制任务执行的关键机制,允许我们在任务执行过程中取消任务。
本文将深入分析ScheduledThreadPoolExecutor
的工作原理,以及Future.cancel()
方法的实现机制,帮助开发者更好地理解和使用这些并发工具。
ScheduledThreadPoolExecutor概述
1. 基本概念
ScheduledThreadPoolExecutor
是ThreadPoolExecutor
的子类,专门用于处理定时任务和周期性任务。它实现了ScheduledExecutorService
接口,提供了以下核心功能:
- 延迟执行:任务在指定延迟后执行
- 周期性执行:任务按照固定间隔重复执行
- 固定频率执行:任务按照固定频率执行,不受任务执行时间影响
- 固定延迟执行:任务按照固定延迟执行,考虑任务执行时间
2. 核心特性
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
// 核心数据结构
private final DelayQueue<RunnableScheduledFuture<?>> delayedWorkQueue;
// 线程工厂
private final ThreadFactory threadFactory;
// 拒绝策略
private final RejectedExecutionHandler handler;
}
Future.cancel()方法详解
1. Future接口定义
public interface Future<V> {
// 尝试取消任务的执行
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已被取消
boolean isCancelled();
// 判断任务是否已完成
boolean isDone();
// 获取任务结果(阻塞)
V get() throws InterruptedException, ExecutionException;
// 获取任务结果(带超时)
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
2. cancel()方法参数说明
cancel(boolean mayInterruptIfRunning)
方法的参数含义:
- mayInterruptIfRunning = true:如果任务正在运行,尝试中断执行线程
- mayInterruptIfRunning = false:如果任务正在运行,不中断执行线程,等待任务自然完成
3. 返回值说明
- true:任务被成功取消(包括任务还未开始执行的情况)
- false:任务无法取消(通常是因为任务已经完成或已经被取消)
ScheduledThreadPoolExecutor内部实现
1. 核心数据结构
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor {
// 延迟队列,存储待执行的任务
private final DelayQueue<RunnableScheduledFuture<?>> delayedWorkQueue;
// 线程工厂
private final ThreadFactory threadFactory;
// 拒绝策略
private final RejectedExecutionHandler handler;
// 是否在关闭时继续执行已调度的任务
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
// 是否在关闭时继续执行延迟任务
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
}
2. 任务包装类
private class ScheduledFutureTask<V> extends FutureTask<V>
implements RunnableScheduledFuture<V> {
// 任务序列号,用于排序
private final long sequenceNumber;
// 下次执行时间
private long time;
// 执行周期(0表示非周期性任务)
private final long period;
// 实际任务
RunnableScheduledFuture<V> outerTask = this;
// 在延迟队列中的索引
int heapIndex;
}
3. 任务调度机制
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 创建ScheduledFutureTask
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
// 延迟执行任务
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
// 将任务添加到延迟队列
super.getQueue().add(task);
// 如果线程池已关闭且不允许执行延迟任务,则移除任务
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 确保有工作线程运行
ensurePrestart();
}
}
Future.cancel()实现机制
1. ScheduledFutureTask中的cancel实现
public boolean cancel(boolean mayInterruptIfRunning) {
// 调用父类FutureTask的cancel方法
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
// 从延迟队列中移除任务
remove(this);
return cancelled;
}
2. FutureTask中的cancel实现
public boolean cancel(boolean mayInterruptIfRunning) {
// 使用CAS操作设置状态
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
// 设置最终状态为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒所有等待的线程
finishCompletion();
}
return true;
}
3. 状态转换机制
// FutureTask的状态定义
private static final int NEW = 0; // 新建
private static final int COMPLETING = 1; // 完成中
private static final int NORMAL = 2; // 正常完成
private static final int EXCEPTIONAL = 3; // 异常完成
private static final int CANCELLED = 4; // 已取消
private static final int INTERRUPTING = 5; // 中断中
private static final int INTERRUPTED = 6; // 已中断
实际应用示例
1. 基本使用示例
import java.util.concurrent.*;
public class ScheduledExecutorExample {
public static void main(String[] args) {
// 创建调度线程池
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
try {
// 延迟执行任务
ScheduledFuture<?> future1 = scheduler.schedule(() -> {
System.out.println("延迟任务执行: " + System.currentTimeMillis());
}, 2, TimeUnit.SECONDS);
// 周期性任务(固定频率)
ScheduledFuture<?> future2 = scheduler.scheduleAtFixedRate(() -> {
System.out.println("固定频率任务: " + System.currentTimeMillis());
try {
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 0, 3, TimeUnit.SECONDS);
// 周期性任务(固定延迟)
ScheduledFuture<?> future3 = scheduler.scheduleWithFixedDelay(() -> {
System.out.println("固定延迟任务: " + System.currentTimeMillis());
try {
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 0, 3, TimeUnit.SECONDS);
// 等待一段时间后取消任务
Thread.sleep(10000);
// 取消任务
System.out.println("取消任务1: " + future1.cancel(false));
System.out.println("取消任务2: " + future2.cancel(true));
System.out.println("取消任务3: " + future3.cancel(true));
// 检查任务状态
System.out.println("任务1是否取消: " + future1.isCancelled());
System.out.println("任务2是否取消: " + future2.isCancelled());
System.out.println("任务3是否取消: " + future3.isCancelled());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 关闭线程池
scheduler.shutdown();
}
}
}
2. 高级使用示例
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class AdvancedScheduledExecutorExample {
private static final AtomicInteger taskCounter = new AtomicInteger(0);
public static void main(String[] args) {
// 创建自定义的ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(2) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("任务开始执行: " + t.getName());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("任务执行完成");
if (t != null) {
System.out.println("任务执行异常: " + t.getMessage());
}
}
};
// 设置拒绝策略
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 设置线程工厂
scheduler.setThreadFactory(r -> {
Thread t = new Thread(r, "ScheduledTask-" + taskCounter.incrementAndGet());
t.setDaemon(false);
return t;
});
try {
// 创建可取消的任务
CancellableTask task = new CancellableTask();
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task, 0, 2, TimeUnit.SECONDS);
// 让任务运行一段时间
Thread.sleep(8000);
// 取消任务
System.out.println("尝试取消任务...");
boolean cancelled = future.cancel(true);
System.out.println("任务取消结果: " + cancelled);
// 等待任务完全停止
Thread.sleep(2000);
// 检查任务状态
System.out.println("任务是否取消: " + future.isCancelled());
System.out.println("任务是否完成: " + future.isDone());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// 可取消的任务实现
static class CancellableTask implements Runnable {
private volatile boolean cancelled = false;
@Override
public void run() {
if (cancelled) {
return;
}
System.out.println("执行任务: " + System.currentTimeMillis());
// 检查中断状态
if (Thread.currentThread().isInterrupted()) {
System.out.println("任务被中断");
cancelled = true;
return;
}
try {
// 模拟任务执行
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println("任务执行过程中被中断");
Thread.currentThread().interrupt();
cancelled = true;
}
}
public void cancel() {
this.cancelled = true;
}
}
}
3. 任务取消的最佳实践
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class TaskCancellationBestPractices {
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
try {
// 示例1:优雅取消任务
gracefulCancellationExample(scheduler);
// 示例2:超时取消任务
timeoutCancellationExample(scheduler);
// 示例3:批量取消任务
batchCancellationExample(scheduler);
} finally {
scheduler.shutdown();
}
}
// 优雅取消任务示例
private static void gracefulCancellationExample(ScheduledExecutorService scheduler) {
System.out.println("=== 优雅取消任务示例 ===");
AtomicBoolean shouldStop = new AtomicBoolean(false);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
if (shouldStop.get()) {
System.out.println("任务收到停止信号,准备退出");
return;
}
System.out.println("执行任务...");
// 检查中断状态
if (Thread.currentThread().isInterrupted()) {
System.out.println("任务被中断,准备退出");
shouldStop.set(true);
return;
}
}, 0, 1, TimeUnit.SECONDS);
// 等待一段时间后优雅取消
try {
Thread.sleep(3000);
shouldStop.set(true);
future.cancel(false); // 不中断,让任务自然完成
System.out.println("任务已优雅取消");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 超时取消任务示例
private static void timeoutCancellationExample(ScheduledExecutorService scheduler) {
System.out.println("=== 超时取消任务示例 ===");
Future<?> future = scheduler.submit(() -> {
try {
System.out.println("开始执行长时间任务...");
Thread.sleep(10000); // 模拟长时间任务
System.out.println("长时间任务完成");
} catch (InterruptedException e) {
System.out.println("任务被中断");
Thread.currentThread().interrupt();
}
});
try {
// 等待5秒,如果任务未完成则取消
future.get(5, TimeUnit.SECONDS);
System.out.println("任务在超时前完成");
} catch (TimeoutException e) {
System.out.println("任务超时,开始取消...");
boolean cancelled = future.cancel(true);
System.out.println("任务取消结果: " + cancelled);
} catch (Exception e) {
System.out.println("任务执行异常: " + e.getMessage());
}
}
// 批量取消任务示例
private static void batchCancellationExample(ScheduledExecutorService scheduler) {
System.out.println("=== 批量取消任务示例 ===");
List<ScheduledFuture<?>> futures = new ArrayList<>();
// 创建多个任务
for (int i = 0; i < 5; i++) {
final int taskId = i;
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
System.out.println("任务" + taskId + "执行中...");
}, 0, 1, TimeUnit.SECONDS);
futures.add(future);
}
try {
Thread.sleep(3000);
// 批量取消所有任务
System.out.println("开始批量取消任务...");
for (ScheduledFuture<?> future : futures) {
boolean cancelled = future.cancel(true);
System.out.println("任务取消结果: " + cancelled);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
性能优化与注意事项
1. 线程池大小设置
// 根据任务类型设置合适的线程池大小
public class ThreadPoolSizeOptimization {
public static void main(String[] args) {
// CPU密集型任务:线程数 = CPU核心数 + 1
int cpuCores = Runtime.getRuntime().availableProcessors();
ScheduledExecutorService cpuIntensiveScheduler =
Executors.newScheduledThreadPool(cpuCores + 1);
// IO密集型任务:线程数 = CPU核心数 * 2
ScheduledExecutorService ioIntensiveScheduler =
Executors.newScheduledThreadPool(cpuCores * 2);
// 混合型任务:根据实际情况调整
ScheduledExecutorService mixedScheduler =
Executors.newScheduledThreadPool(cpuCores * 3 / 2);
}
}
2. 内存管理
public class MemoryManagementExample {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(2);
// 设置队列大小限制,避免内存溢出
scheduler.setMaximumPoolSize(4);
// 设置线程工厂,控制线程创建
scheduler.setThreadFactory(r -> {
Thread t = new Thread(r);
t.setDaemon(true); // 设置为守护线程
return t;
});
// 设置拒绝策略
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
}
}
3. 异常处理
public class ExceptionHandlingExample {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(2) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
// 记录异常日志
System.err.println("任务执行异常: " + t.getMessage());
t.printStackTrace();
// 可以在这里添加告警机制
sendAlert(t);
}
}
};
// 提交任务时添加异常处理
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
try {
// 任务逻辑
riskyOperation();
} catch (Exception e) {
// 任务内部异常处理
System.err.println("任务内部异常: " + e.getMessage());
// 可以选择重新抛出或记录日志
}
}, 0, 5, TimeUnit.SECONDS);
}
private static void riskyOperation() {
// 模拟可能抛出异常的操作
if (Math.random() < 0.3) {
throw new RuntimeException("随机异常");
}
}
private static void sendAlert(Throwable t) {
// 发送告警的逻辑
System.out.println("发送告警: " + t.getMessage());
}
}
常见问题与解决方案
1. 任务无法取消的问题
public class CancellationProblemExample {
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 问题:任务在阻塞操作中无法响应中断
ScheduledFuture<?> problematicFuture = scheduler.scheduleAtFixedRate(() -> {
try {
// 阻塞操作,无法响应中断
System.in.read(); // 这会导致任务无法取消
} catch (IOException e) {
e.printStackTrace();
}
}, 0, 5, TimeUnit.SECONDS);
// 解决方案:使用可中断的阻塞操作
ScheduledFuture<?> correctFuture = scheduler.scheduleAtFixedRate(() -> {
try {
// 使用可中断的阻塞操作
while (!Thread.currentThread().isInterrupted()) {
if (System.in.available() > 0) {
System.in.read();
break;
}
Thread.sleep(100); // 短暂睡眠,可以响应中断
}
} catch (IOException | InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 0, 5, TimeUnit.SECONDS);
}
}
2. 内存泄漏问题
public class MemoryLeakPreventionExample {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(2);
// 问题:长时间运行的任务可能导致内存泄漏
ScheduledFuture<?> leakyFuture = scheduler.scheduleAtFixedRate(() -> {
// 任务逻辑
}, 0, 1, TimeUnit.HOURS);
// 解决方案:定期清理和监控
scheduler.scheduleAtFixedRate(() -> {
// 清理过期的任务引用
cleanupExpiredTasks();
// 监控内存使用情况
monitorMemoryUsage();
}, 0, 10, TimeUnit.MINUTES);
}
private static void cleanupExpiredTasks() {
// 清理逻辑
System.out.println("清理过期任务");
}
private static void monitorMemoryUsage() {
Runtime runtime = Runtime.getRuntime();
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
long maxMemory = runtime.maxMemory();
double memoryUsage = (double) usedMemory / maxMemory;
if (memoryUsage > 0.8) {
System.out.println("内存使用率过高: " + (memoryUsage * 100) + "%");
}
}
}
总结
ScheduledThreadPoolExecutor
和Future.cancel()
是Java并发编程中非常重要的工具,它们提供了强大的任务调度和取消功能。
关键要点:
- 理解机制:深入理解任务调度和取消的内部实现机制
- 正确使用:根据实际需求选择合适的取消策略
- 性能优化:合理设置线程池大小和队列参数
- 异常处理:妥善处理任务执行过程中的异常
- 资源管理:注意内存泄漏和资源清理
最佳实践:
- 优雅取消:优先使用
cancel(false)
,让任务自然完成 - 超时控制:为长时间任务设置超时机制
- 监控告警:建立完善的监控和告警机制
- 资源清理:及时清理不再需要的任务引用
- 测试验证:充分测试任务取消的各种场景
通过深入理解这些机制,开发者可以更好地构建健壮、高效的并发应用程序。
参考资料:
- Java并发编程实战
- Java并发编程的艺术
- Java官方文档
- 相关源码分析