太原天气预报:若何让ThreadPoolExecutor更早地建立非焦点线程

admin 6个月前 (04-29) 科技 43 0

最近在项目中遇到一个需要用线程池来处置义务的需求,于是我用ThreadPoolExecutor来实现,然则在实现过程中我发现提交大量义务时它的处置逻辑是这样的(提交义务另有一个submit方式内部也挪用了execute方式):

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

注释中已经写的异常明了:

  1. 若是线程数目小于corePoolSize,直接建立新线程处置义务
  2. 若是线程数目即是corePoolSize,实验将义务放到守候行列里
  3. 若是守候行列已满,实验建立非焦点线程处置义务(若是maximumPoolSIze > corePoolSize

然则在我的项目中一个线程启动需要10s左右的时间(需要启动一个浏览器工具),因此我希望实现一个更精致的逻辑提升资源的利用率:

  1. 线程池保持corePoolSize个线程确保有新义务到来时可以立刻获得执行
  2. 当没有空闲线程时,先把义务放到守候行列中(由于开启一个线程需要10s,以是若是在守候行列比较小的时刻,守候其他义务完成比守候新线程建立更快)
  3. 当守候行列的巨细大于设定的阈值threshold时,说明聚积的义务已经太多了,这个时刻最先建立非焦点线程直到线程数目已经即是maximumPoolSize
  4. 当线程数目已经即是maximumPoolSize,再将新来的义务放回到义务行列中守候(直到行列满后最先拒绝义务)
  5. 长时间空闲后退出非焦点线程接纳浏览器占用的内存资源

当我研究了常见的CachedThreadPoolFixedThreadPool以及实验自己设置ThreadPoolExecutor的组织函数后,发现无论如何都不能实现上面提到的逻辑,由于默认的实现只有在workQueue到达容量上限后才会最先建立非焦点线程,因此需要通过继续的方式实现一个新的类来完成需求。

怎么实现在workQueue到达容量上限前就建立非焦点线程?还要回首下execute函数的代码

					//实验将义务插入守候行列,若是返回false
					//说明行列已经到达容量上限,进入else if逻辑
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
//实验建立非焦点线程
else if (!addWorker(command, false))
    reject(command);

那么只要改变workQueue.offer()的逻辑,在线程数目还小于maximumPoolSize的时刻就返回false拒绝插入,让线程池挪用addWoker,等不能再建立更多线程时再允许添加到行列即可。

可以通过子类重写offer方式来实现添加逻辑的改变

@Override
public boolean offer(E e) {
    if (threadPoolExecutor == null) {
        throw new NullPointerException();
    }
    //当挪用该方式时,已经确定了workerCountOf(c) > corePoolSize
    //当数目小于threshold,在行列里守候
    if (size() < threshold) {
        return super.offer(e);
	//当数目大于即是threshold,说明聚积的义务太多,返回false
	//让线程池来建立新线程处置
    } else {
        //此处可能会由于多线程导致错误的拒绝
        if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
            return false;
        //线程池中的线程数目已经到达上限,只能添加到义务行列中
        } else {
            return super.offer(e);
        }
    }
}

这样就实现了基本实现了我需要的功效,然则在写代码的过程中我找到了一个可能失足的地方:ThreadPoolExecutor是线程平安的,那么重写的offer方式也可能遇到多线程挪用的情形

//设想当poolSize = maximumPoolSize-1时,两个义务到达此处同时返回false
if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
	return false;
}

由于添加到行列返回falseexecute方式进入到else if (!addWorker(command, false))

if (isRunning(c) && workQueue.offer(command)) {
	int recheck = ctl.get();
	if (! isRunning(recheck) && remove(command))
		reject(command);
	else if (workerCountOf(recheck) == 0)
		addWorker(null, false);
}
//添加到行列失败后进入addWorker方式中
else if (!addWorker(command, false))
	reject(command);
}

再来看一下addWorker方式的代码,这里只截取需要的一部门

for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
    	//两个线程都以为还可以建立再建立一个新线程
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
        //两个线程同时挪用cas方式只有一个能够乐成
        //乐成的线程break retry;进入后面的建立线程的逻辑
        //失败的线程重新回到上面的检查并返回false
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();  // Re-read ctl
    if (runStateOf(c) != rs)
        continue retry;
    // else CAS failed due to workerCount change; retry inner loop
}

最终,在竞争中失败的线程由于addWorker方式返回了false最终挪用了reject(command)。在前面写的要实现的逻辑里提到了,只有在守候行列容量到达上限无法再插入时才拒绝义务,然则由于多线程的缘故原由,这里只是跨越了threshold但没有跨越capacity的时刻就拒绝义务了,以是要对拒绝计谋的触发做出修改:第一次触发Reject时,实验重新添加到义务行列中(不举行poolSize的检测),若是仍然不能添加,再拒绝义务
这里通过对execute方式举行重写来实现重试

@Override
public void execute(Runnable command) {
    try {
        super.execute(command);
    } catch (RejectedExecutionException e) {
    	/*
    	这里参考源码中将义务添加到义务行列的实现
    	然则其中通过(workerCountOf(recheck) == 0)
    	检查当义务添加到行列后是否另有线程存活的部门
    	由于是private权限的,无法实现类似的逻辑,因此需要做一定的特殊处置
		if (isRunning(c) && workQueue.offer(command)) {
		     int recheck = ctl.get();
		     if (! isRunning(recheck) && remove(command))
		         reject(command);
		     else if (workerCountOf(recheck) == 0)
		         addWorker(null, false);
		 }
		*/
        if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
            if (this.isShutdown() && remove(command))
                //二次检查
                realRejectedExecutionHandler.rejectedExecution(command, this);
            } else {
                //插入失败,行列已经满了
                realRejectedExecutionHandler.rejectedExecution(command, this);
            }
        }
    }
}

这里有两个小问题

  1. 初始化线程池传入的RejectedExecutionHandler纷歧定会抛出异常(事实上,ThreadPoolExecutor自己实现的4中拒绝计谋中只有AbortPolicy能够抛出异常并被捕捉到),因此需要在初始化父类时传入AbortPolicy拒绝计谋并将组织函数中传入的自定义拒绝计谋保留下来,在重试失败后才挪用自己的rejectedExecution
  2. corePoolSize = 0 的极端情形下,可能泛起一个义务刚被插入行列的同时,所有的线程都结束义务然后被销毁了,此使这个被加入的义务就无法被执行,在ThreadPoolExecutor中是通过
    else if (workerCountOf(recheck) == 0)
    	addWorker(null, false);
    
    在添加后再检查工作线程是否为0来确保义务可以被执行,然则其中使用的方式是私有的,无法在子类中实现类似的逻辑,因此在初始化时只能强制corePoolSize至少为1来解决这个问题。

所有代码如下

public class MyThreadPool extends ThreadPoolExecutor {

    private RejectedExecutionHandler realRejectedExecutionHandler;

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        int queueCapacity) {
        this(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                queueCapacity,
                new AbortPolicy());
    }

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        int queueCapacity,
                        RejectedExecutionHandler handler) {
        super(corePoolSize == 0 ? 1 : corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new MyLinkedBlockingQueue<>(queueCapacity),
                new AbortPolicy());
        ((MyLinkedBlockingQueue)this.getQueue()).setThreadPoolExecutor(this);
        realRejectedExecutionHandler = handler;
    }

    @Override
    public void execute(Runnable command) {
        try {
            super.execute(command);
        } catch (RejectedExecutionException e) {
            if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
                if (this.isShutdown() && remove(command))
                    //二次检查
                    realRejectedExecutionHandler.rejectedExecution(command, this);
            } else {
                //插入失败,行列已经满了
                realRejectedExecutionHandler.rejectedExecution(command, this);
            }
        }
    }
}


public class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private int threshold = 20;

    private ThreadPoolExecutor threadPoolExecutor = null;

    public MyLinkedBlockingQueue(int queueCapacity) {
        super(queueCapacity);
    }

    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    @Override
	public boolean offer(E e) {
	    if (threadPoolExecutor == null) {
	        throw new NullPointerException();
	    }
	    //当挪用该方式时,已经确定了workerCountOf(c) > corePoolSize
	    //当数目小于threshold,在行列里守候
	    if (size() < threshold) {
	        return super.offer(e);
		//当数目大于即是threshold,说明聚积的义务太多,返回false
		//让线程池来建立新线程处置
	    } else {
	        //此处可能会由于多线程导致错误的拒绝
	        if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
	            return false;
	        //线程池中的线程数目已经到达上限,只能添加到义务行列中
	        } else {
	            return super.offer(e);
	        }
	    }
	}

    public boolean offerWithoutCheck(E e) {
        return super.offer(e);
    }
}

最后举行简朴的测试

corePoolSize:2
maximumPoolSize:5
queueCapacity:10
threshold:7
义务2
线程数目:2
守候行列巨细:0
守候行列巨细小于阈值,继续守候。
义务3
线程数目:2
守候行列巨细:1
守候行列巨细小于阈值,继续守候。
义务4
线程数目:2
守候行列巨细:2
守候行列巨细小于阈值,继续守候。
义务5
线程数目:2
守候行列巨细:3
守候行列巨细小于阈值,继续守候。
义务6
线程数目:2
守候行列巨细:4
守候行列巨细小于阈值,继续守候。
义务7
线程数目:2
守候行列巨细:5
守候行列巨细小于阈值,继续守候。
义务8
线程数目:2
守候行列巨细:6
守候行列巨细小于阈值,继续守候。
义务9
线程数目:2
守候行列巨细:7
守候行列巨细大于即是阈值,线程数目小于MaximumPoolSize,建立新线程处置。
义务10
线程数目:3
守候行列巨细:7
守候行列巨细大于即是阈值,线程数目小于MaximumPoolSize,建立新线程处置。
义务11
线程数目:4
守候行列巨细:7
守候行列巨细大于即是阈值,线程数目小于MaximumPoolSize,建立新线程处置。
义务12
线程数目:5
守候行列巨细:7
守候行列巨细大于即是阈值,但线程数目大于即是MaximumPoolSize,只能添加到行列中。
义务13
线程数目:5
守候行列巨细:8
守候行列巨细大于即是阈值,但线程数目大于即是MaximumPoolSize,只能添加到行列中。
义务14
线程数目:5
守候行列巨细:9
守候行列巨细大于即是阈值,但线程数目大于即是MaximumPoolSize,只能添加到行列中。
义务15
线程数目:5
守候行列巨细:10
守候行列巨细大于即是阈值,但线程数目大于即是MaximumPoolSize,只能添加到行列中。
行列已满
义务16
线程数目:5
守候行列巨细:10
守候行列巨细大于即是阈值,但线程数目大于即是MaximumPoolSize,只能添加到行列中。
行列已满

再重新温习一遍要实现的功效:

  1. 线程池保持corePoolSize个线程确保有新义务到来时可以立刻获得执行
  2. 当没有空闲线程时,先把义务放到守候行列中(由于开启一个线程需要10s,以是若是在守候行列比较小的时刻,守候其他义务完成比守候新线程建立更快)
  3. 当守候行列的巨细大于设定的阈值threshold时,说明聚积的义务已经太多了,这个时刻最先建立非焦点线程直到线程数目已经即是maximumPoolSize
  4. 当线程数目已经即是maximumPoolSize,再将新来的义务放回到义务行列中守候(直到行列满后最先拒绝义务)
  5. 长时间空闲后退出非焦点线程接纳浏览器占用的内存资源

可以看出,线程池运行的逻辑和要实现的目的是相同的。

,

sunbet 申博

Sunbet 申博www.1888ss.com秉承以客为先的理念,多年运营、专业团队、专注服务、值得信赖。

皇冠体育声明:该文看法仅代表作者自己,与本平台无关。转载请注明:太原天气预报:若何让ThreadPoolExecutor更早地建立非焦点线程

网友评论

  • (*)

最新评论

文章归档

站点信息

  • 文章总数:755
  • 页面总数:0
  • 分类总数:8
  • 标签总数:1285
  • 评论总数:395
  • 浏览总数:23318