博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java多线程知识点整理(线程池)
阅读量:6970 次
发布时间:2019-06-27

本文共 7098 字,大约阅读时间需要 23 分钟。

hot3.png

1.线程池的使用

    线程池一般配合队列一起工作,是线程池限制并发处理任务的数量。然后设置队列的大小,当任务超过队列大小时,通过一定的拒绝策略来处理,这样可以保护系统免受大流量而导致崩溃--只是部分拒绝服务,还是有一部分是可以正常服务的。

    线程池一般有核心线程池大小和线程池最大大小配置,当线程池中的线程空闲一段时间时将会回收,而核心线程池中的线程不会被回收。

    多少个线程合适呢?建议根据实际业务情况来压测决定,或者根据利特法则来算出一个合理的线程池大小。Java提供了ExecutorService的几种实现:

    a.ThreadPoolExecutor:标准线程池。

    b.newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

    c.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    d.newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

    e.newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    f.ForkJoinPool:类似于ThreadPoolExecutor,但是使用work-stealing模式,其会为线程池中的每个线程创建一个队列,从而用work-stealing(任务窃取)算法使得线程可以从其他线程队列里窃取任务来执行。即如果自己的任务处理完成了,则可以去忙碌的工作线程那里窃取任务执行。

2.线程池简单分析

 2.1 、创建单线程的线程池:newSingleThreadExecutor 

ExecutorService  executorService=  Executors.newSingleThreadExecutor();

等价于

return new FinalizableDelegatedExecutorService            (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue
()));

2.2、创建固定数量的线程池

ExecutorService  executorService1=  Executors.newFixedThreadPool(10);

等价于

return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue
());

注意:单线程的线程池与固定数量的线程池使用队列的策略是一样的,如果固定数量的线程池为1,则相当于单线程的线程池。

dubbo源码的使用:

// 文件缓存定时写入    private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));//执行代码if (syncSaveFile) {      doSaveProperties(version); } else {         registryCacheExecutor.execute(new SaveProperties(version)); }

 

2.3、创建可缓存的线程池,初始大小为0,线程池最大大小为Integer.MAX_VALUE。其使用SynchronousQueue队列,一个没有数据缓冲的阻塞队列。对其执行put操作后必须等待take操作消费该数据,反之亦然。该线程池不限制最大大小,如果线程池有空闲则复用,否则会创建一个新线程。如果线程池中的线程空闲60秒,则将被回收。该线程默认最大大小为Integer.MAX_VALUE,请确定必要后再使用该线程池。

ExecutorService  executorService2=  Executors.newCachedThreadPool();

等价于

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue
());

dubbo源码:在集群的时候使用到了,因为并不知道,传递过来的集群参数是多少。

private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));       @SuppressWarnings("rawtypes")	public Result invoke(final Invocation invocation) throws RpcException {        List
> invokers = directory.list(invocation); Map
> results = new HashMap
>(); for( final Invoker
invoker : invokers ) { Future
future = executor.submit( new Callable
() { public Result call() throws Exception { return invoker.invoke(new RpcInvocation(invocation, invoker)); } } ); results.put( invoker.getUrl().getServiceKey(), future ); }

2.4、支持延迟执行的线程池,其使用DelayedWorkQueue实现任务延迟。

ExecutorService  executorService3=  Executors.newScheduledThreadPool(10);

等价于

public ScheduledThreadPoolExecutor(int corePoolSize) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue());    }

使用的案例:

Dubbo源码

//1.检测并连接注册中心,使用的是newScheduledThreadPool  //定义一个全局的线程池:// 定时任务执行器private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));    // 失败重试定时器,定时检查是否有请求失败,如有,无限次重试    private final ScheduledFuture
retryFuture;public FailbackRegistry(URL url) { super(url); int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 检测并连接注册中心 try { retry(); } catch (Throwable t) { // 防御性容错 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }

2.5、work-stealing线程池,默认为并行行数为Runtime.getRuntime().availableProcessors()

ExecutorService  executorService4=  Executors.newWorkStealingPool(2);

等价于

return new ForkJoinPool            (Runtime.getRuntime().availableProcessors(),             ForkJoinPool.defaultForkJoinWorkerThreadFactory,             null, true);

 

3.线程池终止

线程池不在使用记得停止线程,可以调用shutdown以确保不接受新任务,并等待线程池中任务处理完成后再退出,或调用shutdownNow清除未执行任务,并用Thread.interrupt停止正在执行的任务。然后调用awaitTermination方法等待终止操作执行完成。

static ExecutorService  executorService3=  Executors.newScheduledThreadPool(10);	public static void main(String[] args) {		executorService3.shutdown();		try {			executorService3.awaitTermination(30, TimeUnit.SECONDS);		} catch (InterruptedException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}			}

总结:在使用线程池时务必须设置池大小、队列大小并设置相应的拒绝策略(RejectedExcutionHandler)。线程池执行情况下无法捕获堆栈上下文,因此任务要记录相关参数,以方便定位提交任务的源头及定位引起问题的源头。

4.ThreadPoolExecutor六个核心参数

public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue
workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

4.1、corePoolSize

核心池的大小。在创建了线程池之后,默认情况下,线程池中没有任何线程,而是等待有任务到来才创建线程去执行任务。默认情况下,在创建了线程池之后,线程池钟的线程数为0,当有任务到来后就会创建一个线程去执行任务。

4.2、maximumPoolSize

池中允许的最大线程数,这个参数表示了线程池中最多能创建的线程数量,当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,将继续创建线程以处理任务,maximumPoolSize表示的就是wordQueue满了,线程池中最多可以创建的线程数量。

4.3、keepAliveTime

只有当线程池中的线程数大于corePoolSize时,这个参数才会起作用。当线程数大于corePoolSize时,终止前多余的空闲线程等待新任务的最长时间。

4.4、unit

keepAliveTime时间单位。

4.5、workQueue

存储还没来得及执行的任务。

4.6、threadFactory

执行程序创建新线程时使用的工厂。

4.7、handler

由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

总结:上面的内容,其他应该都相对比较好理解,只有corePoolSize和maximumPoolSize需要多思考。这里要特别再举例以四条规则解释一下这两个参数:

1、池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程

2、池中线程数大于等于corePoolSize,workQueue未满,首选将新任务加入workQueue而不是添加新线程

3、池中线程数大于等于corePoolSize,workQueue已满,但是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务

4、池中线程数大于大于corePoolSize,workQueue已满,并且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务

ThreadPoolExecutor的使用很简单,前面的代码也写过例子了。通过execute(Runnable command)方法来发起一个任务的执行,通过shutDown()方法来对已经提交的任务做一个有效的关闭。尽管线程池很好,但我们要注意JDK API的一段话:

强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行线程自动回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。

所以,跳开对ThreadPoolExecutor的关注(还是那句话,有问题查询JDK API),重点关注一下JDK推荐的Executors。

4.8、四种拒绝策略

所谓拒绝策略之前也提到过了,任务太多,超过maximumPoolSize了怎么把?当然是接不下了,接不下那只有拒绝了。拒绝的时候可以指定拒绝策略,也就是一段处理程序。

决绝策略的父接口是RejectedExecutionHandler,JDK本身在ThreadPoolExecutor里给用户提供了四种拒绝策略,看一下:

1、AbortPolicy

直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略。

2、CallerRunsPolicy

尝试直接运行被拒绝的任务,如果线程池已经被关闭了,任务就被丢弃了。

3、DiscardOldestPolicy

移除最晚的那个没有被处理的任务,然后执行被拒绝的任务。同样,如果线程池已经被关闭了,任务就被丢弃了。

4、DiscardPolicy

不能执行的任务将被删除。

 

转载于:https://my.oschina.net/u/2380961/blog/1595372

你可能感兴趣的文章
ipad%E5%A3%81%E7%BA%B81-1000x288.jpg
查看>>
浏览器兼容之旅的第一站:如何创建条件样式
查看>>
docker swarm英文文档学习-5-在swarm模式中运行Docker引擎
查看>>
利用NX Open在NX中创建点并获取点的坐标信息在listing_windows中输出
查看>>
【Erlang新手成长日记】JSON编码与解码
查看>>
Linux下安装tomcat和jdk
查看>>
实验四+065+方绎杰
查看>>
js中函数作用域,作用域链,变量提升
查看>>
c/c++ sizeof运算符详解以及对象大小
查看>>
HDU2046 骨牌铺方格【递推】
查看>>
第二个spring,第一天
查看>>
问题集录--从初级java程序员到架构师,从小工到专家
查看>>
CSS3自定义滚动条样式 -webkit-scrollbar(转)
查看>>
VC下动态数据交换技术
查看>>
docker容器的分层思想
查看>>
决策树
查看>>
【转】app瘦身
查看>>
拓扑排序
查看>>
【转】获取Windows系统明文密码神器
查看>>
Rhel6-keepalived+lvs配置文档
查看>>