聊城seo:CyclicBarrier是若何成为一个"栅栏"的

admin 6个月前 (04-10) 科技 51 0

CyclicBarrier是一种类似于栅栏的存在,意思就是在栅栏开放之前你都只能被挡在栅栏的一侧,当栅栏移除之后,之前被挡在一侧的多个工具则同时最先动起来。

1. 若何使用CyclicBarrier

  在先容其原理之前,先领会一下CyclicBarrier应该若何使用。

  假设现在有这样的场景,我们需要开一个集会,需要张1、张2、张3三小我私家加入,
集会需要三小我私家都到齐之后才气最先,否则只能干等着;这个场景用CyclicBarrier可以很契合的模拟出来。代码如下:

public static void main(String[] args) {
    // 线程池,每个线程代表一小我私家
    ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
    // 集会所需的人数为3
    CyclicBarrier barrier = new CyclicBarrier(3);

    executor.execute(() -> {
        try {
            System.err.println("张1到达集会室");
            barrier.await();
            System.err.println("集会最先,张1最先谈话");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("张2到达集会室");
            barrier.await();
            System.err.println("集会最先,张2最先谈话");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("张3先去个茅厕,内急解决再去开会");
            TimeUnit.SECONDS.sleep(1);
            System.err.println("张3到达集会室");
            barrier.await();
            System.err.println("集会最先,张3最先谈话");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });


    executor.shutdown();
}

效果图:

  通过上方代码可以知道CyclicBarrier的几点:

  1. 使用await()来示意完成了某些事情。(上方例子的表现为到达了集会室)
  2. 使用await()之后当前线程就进入壅闭状态,需要守候完全知足CyclicBarrier的条件后叫醒才气继续接下来的操作。(上方例子中 为3小我私家都到达集会室)
  3. 在最后一个线程到达条件之后,之前壅闭的线程所有铺开,继续接下来的操作。(上方例子为张3到达集会室)

  这个简朴的例子也让我们领会CyclicBarrier的使用方式,那来看看其内部究竟是若何实现栅栏的效果的。

2. CyclicBarrier是若何成为"栅栏"的

  从第一节的代码中我们也能看到,需要关注的就两个地方

  1. 组织函数
  2. await()方式

只要领会这两个方式的内部,相当于领会了CyclicBarrier的内部。
那在深入领会之前,先来看下CyclicBarrier的几个变量,不用刻意去记,看代码的时刻知道这个器械做什么用的就行了:

lock:CyclicBarrier类建立的ReentrantLock实例,关于ReentrantLock不清晰的可以->传送。

trip:lock中的conditionCyclicBarrier使用该变量来实现各线程之间的壅闭和同时叫醒。同样,不明白condition作用的=>传送门。

parties:需要知足条件(挪用await方式)的总数,就是说当有parties个线程await()之后就会叫醒所有线程。

barrierCommand:一个Runnable变量,在await方式的挪用次数到达总数parties之后,在叫醒所有线程之前执行其run()方式

generation:其内部类,可以明白为周期,周期内需要完成n个义务,只要一个义务失败,当前周期的所有义务就算失败,竣事当前周期,再开启下个周期。

count:当前周期剩余需要完成的义务数(剩余挪用await方式的次数)

以下为源码:

public class CyclicBarrier {
    // 内部类,可明白为周期
    private static class Generation {
        // 当前周期是否失败
        boolean broken = false;
    }

    // 锁的实例
    private final ReentrantLock lock = new ReentrantLock();
    // ReentrantLock的condition变量,用来控制线程叫醒和壅闭
    private final Condition trip = lock.newCondition();
    // 需要知足条件的次数,即需要挪用await方式的次数
    private final int parties;
    // 知足条件次数到达parties之后,叫醒所有线程之前执行其 run()方式
    private final Runnable barrierCommand;
    // 当前周期
    private Generation generation = new Generation();
    // 剩余知足条件次数
    private int count;
    
    // ...
}

  看完CyclicBarrier的几个变量后,来看其详细的内部实现。

  首先来看组织函数,其组织函数有两个,一个在到达条件总数(parties)后直接叫醒所有线程;另一个指定一个Runnable在到达条件总数后先执行其run()方式再叫醒。

  • 不指定Runnable,参数只有一个:需要杀青的义务数
public CyclicBarrier(int parties) {
    // 直接挪用另一个组织方式,Runnable传null,示意不执行
    this(parties, null);
}
  • 指定Runnable的组织方式,赋值义务总数、剩余义务数、叫醒操作之前的Runnable
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 义务总数
    this.parties = parties;
    // 剩余需要完成的义务数
    this.count = parties;
    // 叫醒之前执行的Runnable
    this.barrierCommand = barrierAction;
}

  在第一节我们使用的是第一个组织方式,来试试第二个

public static void main(String[] args) throws InterruptedException {

    ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
    /** =======增添Runnable,其他地方保持一致=============*/
    CyclicBarrier barrier = new CyclicBarrier(3, ()-> System.err.println("在集会最先之前,先给人人发下开会资料"));

    executor.execute(() -> {
        try {
            System.err.println("张1到达集会室");
            barrier.await();
            System.err.println("集会最先,张1最先谈话");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("张2到达集会室");
            barrier.await();
            System.err.println("集会最先,张2最先谈话");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("张3先去个茅厕,内急解决再去开会");
            TimeUnit.SECONDS.sleep(1);
            System.err.println("张3到达集会室");
            barrier.await();
            System.err.println("集会最先,张3最先谈话");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });


    executor.shutdown();
}

效果图:

 看完组织函数,就算明白了一半CyclicBarrier了,接下来来看另一半——await();跟踪代码,看到是这样的

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

直接挪用dowait方式,传参为false0,意思就是不限时守候,除非线程被打断或者叫醒。再进入dowait方式,这个方式就是CyclicBarrier的另一半,在下方的代码中很清晰的写了整个执行流程

/** 参数说明, timed:是否限时, nanos:限时时间*/
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
    // 锁
    final ReentrantLock lock = this.lock;
    // 获取锁,若是失败的话线程睡眠,进入同步行列(AQS中的知识)
    lock.lock();
    try {
        /* 拿到锁之后进入代码处置逻辑*/
        
        // 当前周期
        final Generation g = generation;

        // 若是当前周期是失败的,那么直接抛错
        if (g.broken)
            throw new BrokenBarrierException();

        // 若是当前线程被打断了,那么此次周期失败,设置相关参数,然后抛错
        if (Thread.interrupted()) {
            // 实现代码在下行的注释中,设置相关参数来提醒其他线程周期失败了
            breakBarrier();
            /*
             * private void breakBarrier() {
             *     generation.broken = true;
             *     count = parties;
             *     // 叫醒condition中的所有线程
             *     trip.signalAll();
             * }
             */
            throw new InterruptedException();
        }

        // 若是乐成了,那么剩余义务数(count)减1
        int index = --count;
        // 若是为0则示意到达剩余的义务数没有了,到达CyclicBarrier的条件总数了,需要叫醒其他线程
        if (index == 0) {  
            boolean ranAction = false;
            try {
                // 叫醒之前的Runnable
                final Runnable command = barrierCommand;
                // 若是不为空的话执行其run方式
                if (command != null)
                    command.run();
                ranAction = true;
                // 开启下个周期,这个方式是CyclicBarrier可以复用的缘故原由,详细实现在下行注释
                nextGeneration();
                /* private void nextGeneration() {
                 *     // 首先叫醒当前周期的其他线程,告诉其周期竣事了,可以执行接下来的操作了
                 *     trip.signalAll();
                 *     // 然后开启下个周期,剩余义务数重置
                 *     count = parties;
                 *     // 下个周期
                 *     generation = new Generation();
                 * }
                 */
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 若是还不能竣事本周期,就一直守候直到竣事或者周期失败
        for (;;) {
            try {
                // await的过程中是释放锁的
                // 不限时的话就一直守候直到被叫醒或者打断
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    // 否则的话守候一段时间后醒来
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}

  到这里就基本明白CyclicBarrier的内部实现了,其他像带参数的await也是一样逻辑,只不过是多了限时的条件而已。

  实在若是你领会ReentrantLock的话,就知道CyclicBarrier整个就是对ReentrantLockcondition的活用而已。

3.总结

  整体来说CyclicBarrier的实现相对较简朴,说是ReentrantLockcondition的升级版也不为过。其要害点为两个,一个为其组织函数,决议义务个数和叫醒前操作;另外一个点为await方式,在正常情况下每次await都市削减一个义务数(总数由组织方式决议),在义务数变为0的时刻示意周期竣事,需要叫醒condition的其他线程,而途中遇到失败的话当前周期失败,叫醒其他线程一起抛错。



失败不会让你变得弱小,畏惧失败会。

,

sunbet

www.ysycy.com与伊顺源清真餐饮达成战略合作,在伊顺及亚太地区建立直营平台。为Sunbet会员提供线上多种娱乐游戏,将用完善的技术、贴心的服务、雄厚的资金赢取每位Sunbet代理、会员的口碑。

皇冠体育声明:该文看法仅代表作者自己,与本平台无关。转载请注明:聊城seo:CyclicBarrier是若何成为一个"栅栏"的

网友评论

  • (*)

最新评论

文章归档

站点信息

  • 文章总数:755
  • 页面总数:0
  • 分类总数:8
  • 标签总数:1285
  • 评论总数:395
  • 浏览总数:23318