1.线程池的使用
线程池一般配合队列一起工作,是线程池限制并发处理任务的数量。然后设置队列的大小,当任务超过队列大小时,通过一定的拒绝策略来处理,这样可以保护系统免受大流量而导致崩溃--只是部分拒绝服务,还是有一部分是可以正常服务的。
线程池一般有核心线程池大小和线程池最大大小配置,当线程池中的线程空闲一段时间时将会回收,而核心线程池中的线程不会被回收。
多少个线程合适呢?建议根据实际业务情况来压测决定,或者根据利特法则来算出一个合理的线程池大小。Java提供了ExecutorService的几种实现:
a.ThreadPoolExecutor:标准线程池。
b.newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
c.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
d.newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
e.newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
f.ForkJoinPool:类似于ThreadPoolExecutor,但是使用work-stealing模式,其会为线程池中的每个线程创建一个队列,从而用work-stealing(任务窃取)算法使得线程可以从其他线程队列里窃取任务来执行。即如果自己的任务处理完成了,则可以去忙碌的工作线程那里窃取任务执行。
2.线程池简单分析
2.1 、创建单线程的线程池:newSingleThreadExecutor
ExecutorService executorService= Executors.newSingleThreadExecutor();
等价于
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
2.2、创建固定数量的线程池
ExecutorService executorService1= Executors.newFixedThreadPool(10);
等价于
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
注意:单线程的线程池与固定数量的线程池使用队列的策略是一样的,如果固定数量的线程池为1,则相当于单线程的线程池。
dubbo源码的使用:
// 文件缓存定时写入 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));//执行代码if (syncSaveFile) { doSaveProperties(version); } else { registryCacheExecutor.execute(new SaveProperties(version)); }
2.3、创建可缓存的线程池,初始大小为0,线程池最大大小为Integer.MAX_VALUE。其使用SynchronousQueue队列,一个没有数据缓冲的阻塞队列。对其执行put操作后必须等待take操作消费该数据,反之亦然。该线程池不限制最大大小,如果线程池有空闲则复用,否则会创建一个新线程。如果线程池中的线程空闲60秒,则将被回收。该线程默认最大大小为Integer.MAX_VALUE,请确定必要后再使用该线程池。
ExecutorService executorService2= Executors.newCachedThreadPool();
等价于
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
dubbo源码:在集群的时候使用到了,因为并不知道,传递过来的集群参数是多少。
private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true)); @SuppressWarnings("rawtypes") public Result invoke(final Invocation invocation) throws RpcException { List> invokers = directory.list(invocation); Map > results = new HashMap >(); for( final Invoker invoker : invokers ) { Future future = executor.submit( new Callable () { public Result call() throws Exception { return invoker.invoke(new RpcInvocation(invocation, invoker)); } } ); results.put( invoker.getUrl().getServiceKey(), future ); }
2.4、支持延迟执行的线程池,其使用DelayedWorkQueue实现任务延迟。
ExecutorService executorService3= Executors.newScheduledThreadPool(10);
等价于
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
使用的案例:
Dubbo源码
//1.检测并连接注册中心,使用的是newScheduledThreadPool //定义一个全局的线程池:// 定时任务执行器private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); // 失败重试定时器,定时检查是否有请求失败,如有,无限次重试 private final ScheduledFuture retryFuture;public FailbackRegistry(URL url) { super(url); int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 检测并连接注册中心 try { retry(); } catch (Throwable t) { // 防御性容错 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }
2.5、work-stealing线程池,默认为并行行数为Runtime.getRuntime().availableProcessors()
ExecutorService executorService4= Executors.newWorkStealingPool(2);
等价于
return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
3.线程池终止
线程池不在使用记得停止线程,可以调用shutdown以确保不接受新任务,并等待线程池中任务处理完成后再退出,或调用shutdownNow清除未执行任务,并用Thread.interrupt停止正在执行的任务。然后调用awaitTermination方法等待终止操作执行完成。
static ExecutorService executorService3= Executors.newScheduledThreadPool(10); public static void main(String[] args) { executorService3.shutdown(); try { executorService3.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
总结:在使用线程池时务必须设置池大小、队列大小并设置相应的拒绝策略(RejectedExcutionHandler)。线程池执行情况下无法捕获堆栈上下文,因此任务要记录相关参数,以方便定位提交任务的源头及定位引起问题的源头。
4.ThreadPoolExecutor六个核心参数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
4.1、corePoolSize
核心池的大小。在创建了线程池之后,默认情况下,线程池中没有任何线程,而是等待有任务到来才创建线程去执行任务。默认情况下,在创建了线程池之后,线程池钟的线程数为0,当有任务到来后就会创建一个线程去执行任务。
4.2、maximumPoolSize
池中允许的最大线程数,这个参数表示了线程池中最多能创建的线程数量,当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,将继续创建线程以处理任务,maximumPoolSize表示的就是wordQueue满了,线程池中最多可以创建的线程数量。
4.3、keepAliveTime
只有当线程池中的线程数大于corePoolSize时,这个参数才会起作用。当线程数大于corePoolSize时,终止前多余的空闲线程等待新任务的最长时间。
4.4、unit
keepAliveTime时间单位。
4.5、workQueue
存储还没来得及执行的任务。
4.6、threadFactory
执行程序创建新线程时使用的工厂。
4.7、handler
由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
总结:上面的内容,其他应该都相对比较好理解,只有corePoolSize和maximumPoolSize需要多思考。这里要特别再举例以四条规则解释一下这两个参数:
1、池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程
2、池中线程数大于等于corePoolSize,workQueue未满,首选将新任务加入workQueue而不是添加新线程
3、池中线程数大于等于corePoolSize,workQueue已满,但是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务
4、池中线程数大于大于corePoolSize,workQueue已满,并且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务
ThreadPoolExecutor的使用很简单,前面的代码也写过例子了。通过execute(Runnable command)方法来发起一个任务的执行,通过shutDown()方法来对已经提交的任务做一个有效的关闭。尽管线程池很好,但我们要注意JDK API的一段话:
强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行线程自动回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。
所以,跳开对ThreadPoolExecutor的关注(还是那句话,有问题查询JDK API),重点关注一下JDK推荐的Executors。
4.8、四种拒绝策略
所谓拒绝策略之前也提到过了,任务太多,超过maximumPoolSize了怎么把?当然是接不下了,接不下那只有拒绝了。拒绝的时候可以指定拒绝策略,也就是一段处理程序。
决绝策略的父接口是RejectedExecutionHandler,JDK本身在ThreadPoolExecutor里给用户提供了四种拒绝策略,看一下:
1、AbortPolicy
直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略。
2、CallerRunsPolicy
尝试直接运行被拒绝的任务,如果线程池已经被关闭了,任务就被丢弃了。
3、DiscardOldestPolicy
移除最晚的那个没有被处理的任务,然后执行被拒绝的任务。同样,如果线程池已经被关闭了,任务就被丢弃了。
4、DiscardPolicy
不能执行的任务将被删除。