`
粟谷_sugu
  • 浏览: 25376 次
社区版块
存档分类
最新评论

java的JUC包系列文章(一): 聊一聊countdownLatch

阅读更多
    从今天开始,尽量多花一点时间在这个技术博客上,去成为一个优秀程序员 dloading...

    好了这个系列是想要系统的研究一下java的几乎可以是最重要的并发包(java.util.concurrent 以下简称JUC)里的一些东西,聊到JUC就不可避免的要聊到AbstractQueuedSynchronizer(以下简称AQS)这个关键的类,在我们本系列的文章中也会反复的出现。但是作为本系列的第一篇文章,我并不想先对AQS这个东西做详细的介绍,我会先从一个我认为比较简单的实现方法中进入到整个的java并发的世界,所以开始我们的并发之旅吧
介绍我们本文的主人公CountdownLatch(以下简称CDL)。
首先这个CDL是干什么的呢?为什么java的作者大神会把他放到并发的范围内呢。通俗的话来说,他的作用是使主任务可以在指定的几个其他任务完成之后再开始运行,CDL在场景里的作用就相当于一个计数器,ok,三个任务都完成了,那我就开始我的主要任务了。看到这里,可能会有人问,这种描述的场景好像跟join这个方法的用法很类似,也可以直接将三个任务都join到主任务不就行了?这里先留个问题,在文章的最后会有解答
好的,现在直接看一下CDL的源码。
    
/**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

构建方法也非常简单,只需要传入一个count,传入的这个count会作为内部实现的AQS的阈值参数
countdownLatch除了构造函数和toString方法外,总共只提供了3个方法,一个await方法,一个带超时时间的await方法,一个countdown方法,一个getcount方法,接下来我会主要对着几个方法主要进行介绍。
类似上面的例子中,主流程调用await方法,等待其他线程中调用countdown方法,满足条件后,主流程中的堵塞才解除。那await到底在等什么呢?
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

可以看到里面只有一行代码,就是调用之前定义的AQS的子类的tryAcquireSharedNanos方法。再看一下AQS这个方法里面做了什么
    //AbstractQueuedSynchronizer.java
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
        protected final int getState() {
        return state;
    }
    // CountDownLatch.Sync.java
            protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

    因为这是本系列第一篇文章,暂时先不对AQS做太多解释,可以从上面的代码中看到调用了AQS的acquireSharedInterruptibly方法,这里先提一句,在AQS中,资源抢占模式主要有两种,一个是Share模式,一个是exclusive模式,两个模式的的抢占方法都有所不同,在方法名中可以看出,这个方法明显是share模式下的获取资源方法。这个方法会先调用tryAcquireShared方法,这个方法在AQS中是一个抽象方法,由每个继承的子类自己实现具体 的方法,从此可以看出,AQS是实现了一个并发抢占资源的框架,抽象出了几个方法待具体实现,本系列中接下来会提到的各种实现方式的不同,造就了他们具体功能的不同。继续回到我们的countDownLatch方法中,他自己内部继承实现的Sync类中,实现了这个tryAcquireShared。
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

方法看起来很简单,判断竞争的资源还有没有,如果已经已经为0,则返回1,否则返回-1.这个返回值在上面的方法中是进行判断是否要进行堵塞的标志位,如果返回的为1,则不会进入if判断内的方法,如果返回-1,则会再调用这个doAcquireSharedInterruptibly方法
        //AbstractQueuedSynchronizer.java
        private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    在AQS框架内实现的这个方法,很明显看到里面有一个永真循环,暂时可以简单理解为会一直去尝试查看资源是否已经获取到,若最终获取到,则逃出这个永真循环,否则会一直进行堵塞。
    以上就是await方法在堵塞时和AQS框架之间进行调用的关系。
    再看一下另一个countdown方法
    public void countDown() {
        sync.releaseShared(1);
    }
    //AQS.java
        public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
        public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

   // countDownLatch.sync.java
    protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

    调用的是AQS中releaseShared方法,同样的在框架中预留了一个tryReleaseShared的抽象方法由继承的子类去实现,在countDownLatch实现的方法中,是用CAS的方法去将资源释放掉,若释放成功,则在AQS框架中
/**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    这个方法留在下一篇文章中详细解释。
    以上就是对countDownLatch这个同步类的简单解释,主要还是引出本系列中最重要的AQS框架,所以本文有部分概念并没有过多的解释,下一篇会进行详细解释AQS的内容.


    最后简单附一个使用CountDownLatch类的小例子
/**
 * Created by sugu on 2018/12/24.
 */
public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountTo50 countTo50 = new CountTo50("a",countDownLatch, 5);
        CountTo50 countTo100 = new CountTo50("b",countDownLatch, 10);
        countTo50.start();
        countTo100.start();
        countDownLatch.await();
        for (int i = 0; i < 10; i++) {
            System.out.println("current_thread:[" + Thread.currentThread().getId() + "] : " + i);
        }
    }
}
class CountTo50 extends Thread{
    private CountDownLatch countDownLatch;
    private int max;
    private String name;
    public CountTo50(String name, CountDownLatch countDownLatch,int max) {
        this.countDownLatch = countDownLatch;
        this.max = max;
        this.name = name;
    }

    public void run() {
        for(int i=0 ; i<20 ; i++ ){
            System.out.println("current_thread:["+ name+"] : "+ i );
            if(i == this.max){
                System.out.println("current_thread:["+ name+"] : "+ i +": count down");
                countDownLatch.countDown();
                try {
                    Thread.sleep(1000l);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

分享到:
评论

相关推荐

    Java JUC包 思维导图总结1

    自己学习过程对Java JUC包知识的总结,看完并理解基本能应付各种基本的问题点,墙裂推荐分享 博客跳转链接: https://blog.csdn.net/qq_35642036/article/details/82767070

    【小家java】JUC并发编程之:虚假唤醒以及推荐的解决方案.docx

    【小家java】JUC并发编程之:虚假唤醒以及推荐的解决方案.docx

    java之JUC系列.pdf

    java之JUC系列.pdf

    Java 高频面试题:聊一聊 JUC 下的 CopyOnWriteArrayList

    ArrayList 是我们常用的工具类之一,但是在多线程的情况下,ArrayList 作为共享变量时,并不是线程安全的。主要有以下两个原因: 1、ArrayList 自身的 elementData、size、modCount 在进行操作的时候,都没有加锁 ...

    Java后端开发,JUC并发编程Java后端开发,JUC并发编程

    Java后端开发,JUC并发编程Java后端开发,JUC并发编程Java后端开发,JUC并发编程Java后端开发,JUC并发编程Java后端开发,JUC并发编程Java后端开发,JUC并发编程Java后端开发,JUC并发编程Java后端开发,JUC并发编程...

    JUC多线程学习个人笔记

    JUC(Java Util Concurrent)是Java中用于并发编程的工具包,提供了一组接口和类,用于处理多线程和并发操作。JUC提供了一些常用的并发编程模式和工具,如线程池、并发集合、原子操作等。 JUC的主要特点包括: ...

    java编发编程:JUC综合讲解

    原子操作是不可再分割的基本操作,JUC 提供了一系列原子操作类,如 AtomicInteger、AtomicLong 等。 4. 同步器(Synchronizers):JUC 中的同步器主要通过 AQS(AbstractQueuedSynchronizer)提供支持。

    22 尚硅谷Java JUC线程高级视频

    教程视频:在 Java 5.0 提供了 java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类, 用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;还提供了设计用于多线程上下文...

    Java——JUC

    用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;包含下载地址

    Java-JUC学习PPT

    Java基础之JUC学习!!!

    java8集合源码分析-JUC:高并发与多线程

    Java虚拟机中并没有严格规定synchronized需要如何实现,只要能满足锁住一个对象,一个一个线程的去执行其中的代码块即可。 sync(Object)锁住一个对象,这时会markWord,记录这个线程的ID,这时只有一个线程来,其实...

    尚硅谷Java视频_JUC 视频教程

    尚硅谷_JUC线程高级_CountDownLatch 闭锁 ·6. 实现 Callable 接口 ·7. 尚硅谷_JUC线程高级_同步锁 Lock ·8. 尚硅谷_JUC线程高级_生产者消费者案例-虚假唤醒 ·9. 尚硅谷_JUC线程高级_Condition 线程通信 ·10. ...

    java并发编程:juc线程池

    而了解 Java 并发编程以及其中的 JUC(java.util.concurrent)线程池,对于构建高性能、高可伸缩性的应用程序具有重要意义。 多核处理器的出现使得并发执行成为一种重要的优化手段。了解并发编程和线程池的工作原理...

    Java多线程和JUC.pdf

    java多线程的一些知识与问题的总结

    java并发编程:juc、aqs

    Java 并发编程中的 JUC(java.util.concurrent)库以及其核心组件 AQS(AbstractQueuedSynchronizer)在构建高性能、可伸缩性的多线程应用方面具有重要的地位。 AQS 是 JUC 中的核心组件,它提供了一个框架,让...

    谈一谈对JUC的理解Java系列2021.pdf

    谈一谈对JUC的理解Java系列2021.pdf

    juc-demo:JUC包下常用工具练习Demo

    juc-demo JUC包下常用工具练习Demo 内容: 1、Semaphore 2、CountDownLatch 3、CyclicBarrier 4、ReentrantLock + Condition实现阻塞队列 Created by @minghui.y.

    JUC_CSDN.md

    此篇文章对应周阳老师的大厂面试教程,几乎包含了大厂所有JUC部分的面试题,虽然自己无缘大厂,但是这些知识也让我在面试中得到了不错的评价

    JUC2019.6V1.5.zip

    首先我们知道,JUC就是java.util.concurrent包,俗称java并发包,那首先我们要知道java并发包是用来干嘛 的,然后要知道java并发包包括哪些知识点,这些知识点在平常中有哪些重要的运用,简单来说,这个学习 方法...

    JUC线程锁框架

    JUC线程锁框架深度解析,包含整套视频和源码以及开发文档。

Global site tag (gtag.js) - Google Analytics