JUC包下的并发工具类

# JUC包下的并发工具类

# 一、CountDownLatch

# CountDownLatch 的作用

作用: CountDownLatch 是一个同步辅助类,它允许一个或多个线程一直等待,直到其他线程中的操作执行完毕后再继续执行。主要用于一个线程需要等待其他线程完成某些操作之后再继续执行的场景。

使用示例: 假设有一只狗名字叫秀逗,它需要等待主人给他狗盆里面倒满谁,加满狗粮,然后才能开始吃饭。这种场景就可以使用 CountDownLatch 来实现。

import java.util.concurrent.CountDownLatch;

public class TestA {
    public static void main(String[] args) throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(2);

        Thread t1 = new Thread(() -> {
            System.out.println("倒水");
            countDownLatch.countDown();
        }, "t1");

        Thread t2 = new Thread(() -> {
            System.out.println("倒狗粮");
            countDownLatch.countDown();
        }, "t2");

        t1.start();
        t2.start();

        countDownLatch.await(); // 等待 倒好水和狗粮才能开始吃饭
        System.out.println("秀逗开始吃饭");

    }
}

运行结果:

倒水
倒狗粮
秀逗开始吃饭

# CountDownLatch 的实现原理

CountDownLatch 的核心是一个计数器(count),这个计数器被初始化为一个给定的值,且后续不能再重新设置值。每次调用 countDown() 方法时,这个值会减一。当计数器的值变为零时,所有等待在这个计数器上的线程都会被唤醒并继续执行。

CountDownLatch 是通过 AbstractQueuedSynchronizer(AQS)的共享模式实现的。 请参考 AQS详解 (opens new window)

# 二、CyclicBarrier

# CyclicBarrier的作用

作用: CyclicBarrier 也是一个同步辅助类,它允许一组线程相互等待,直到所有线程都到达一个共同的屏障点。它的应用场景通常是需要多个线程并发执行,并且在某个点上需要彼此等待,之后再继续执行后续任务。CyclicBarrier 与 CountDownLatch 不同的是,它可以被重用,而 CountDownLatch 是一次性的。

使用示例: 假设狗狗们去玩漂流,一个漂流船必须等坐满3只小狗才能开始漂流,使用 CyclicBarrier可以实现这个需求。

import java.util.concurrent.CyclicBarrier;

public class TestA {
    public static void main(String[] args) throws Exception {

        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        Thread t1 = new Thread(() -> {
            System.out.println("秀逗准备中");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("秀逗开始漂流");
        }, "t1");

        Thread t2 = new Thread(() -> {
            System.out.println("四眼准备中");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("四眼开始漂流");
        }, "t2");

        Thread t3 = new Thread(() -> {
            System.out.println("大黄准备中");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("大黄开始漂流");
        }, "t3");

        Thread t4 = new Thread(() -> {
            System.out.println("小黑准备中");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("小黑开始漂流");
        }, "t4");


        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

运行结果:

mixureSecure

可以看到,三个准备好的已经开始漂流了,还剩一个四眼需要等船上再上2只狗子才能开始漂流。

# CyclicBarrier的实现原理

CyclicBarrier 的核心是一个计数器和一个可重用的栅栏点。当线程调用 await() 方法时,这个计数器减一,当计数器到达零时,表示所有线程都已到达栅栏点,所有线程被唤醒并继续执行。然后,计数器重置以便再次使用。

CyclicBarrier 通过一个内部类 Generation 来表示当前的栅栏点。每个 Generation 对象包含一个标志位,表示栅栏点是否被打破(broken)。 并且通过 Lock + Condition 实现线程的等待和通知。 具体实现细节这里就不详细看了,可以先看AQS的细节然后再看CyclicBarrier的源码。

# 三、Semaphore

# Semaphore的作用

作用: Semaphore 是一个计数信号量,用于控制同时访问某特定资源的操作数量,或者同时执行某个指定操作的数量。Semaphore 可以用来实现资源池、限流等功能。它维护了一组许可证,通过获取和释放许可证来控制资源的访问。
acquire(): 获取一个许可证,如果没有许可证可用,则阻塞直到有许可证可用。
release(): 释放一个许可证,将其归还给信号量。

使用示例: 假设有一个大狗盆,狗盆最多能让3只狗同时吃食,可以使用 Semaphore 实现这个场景。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TestA {
    public static void main(String[] args) throws Exception {

        Semaphore semaphore = new Semaphore(3);

        Thread t1 = new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("秀逗开始吃");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            semaphore.release();
            System.out.println("秀逗吃饱离开了");
        }, "t1");

        Thread t2 = new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("四眼开始吃");
        }, "t2");

        Thread t3 = new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("大黄开始吃");
        }, "t3");

        Thread t4 = new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("小黑开始吃");
        }, "t4");


        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

运行结果:

秀逗开始吃
四眼开始吃
大黄开始吃
秀逗吃饱离开了
小黑开始吃

# Semaphore的实现原理

Semaphore 通过计数器来实现,其内部维护了一个许可数(permits)。当一个线程请求许可证时,计数器减一;当一个线程释放许可证时,计数器加一。如果计数器的值为零,表示没有可用的许可证,线程将进入等待状态,直到有线程释放许可证。

Semaphore 的实现依赖于 AbstractQueuedSynchronizer(AQS)。细节还是参考 AQS详解 (opens new window) 和Semaphore源码。

# 四、Exchanger

# Exchanger 的作用

作用:
Exchanger 是一个用于线程之间交换数据的同步点。它允许两个线程在某个同步点交换数据。两个线程通过调用 exchange 方法来交换彼此的数据。当这两个线程都到达同步点时,它们交换数据并继续执行。

使用示例:
假设秀逗在玩扔网球的游戏,它的主人在玩扔飞盘的游戏,当秀逗和主人一块奔跑到相遇的时候,彼此交换玩具。

import java.util.concurrent.Exchanger;

public class TestA {
    public static void main(String[] args) throws Exception {

        Exchanger<String> exchanger = new Exchanger<>();

        Thread t1 = new Thread(() -> {
            System.out.println("秀逗在玩扔网球游戏");
            try {
                String exchangeData = exchanger.exchange("网球");
                System.out.println("秀逗从主人那里获得了:" + exchangeData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            System.out.println("主人在玩扔飞盘游戏");
            try {
                String exchangeData = exchanger.exchange("飞盘");
                System.out.println("主人从秀逗那里获得了:" + exchangeData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2");

        t1.start();
        t2.start();
    }
}

运行结果:

秀逗在玩扔网球游戏
主人在玩扔飞盘游戏
秀逗从主人那里获得了:飞盘
主人从秀逗那里获得了:网球

# Exchanger 的实现原理

Exchanger的核心是一个同步点,两个线程在此同步点上交换数据。每个线程通过 exchange 方法进入同步点,等待另一个线程到达。当两个线程都到达时,它们交换数据并继续执行。 Exchanger是可复用的。

Exchanger 的实现依赖于内部类 Node 和一个原子引用 AtomicReference<Node>,用于表示和管理交换点。Node 对象包含线程交换的数据和同步状态。 具体需要参考源码,这里就不再整理源码了。

# 伪共享(false sharing)引起的内存争用问题

/**
     * Nodes hold partially exchanged data, plus other per-thread
     * bookkeeping. Padded via @sun.misc.Contended to reduce memory
     * contention.
     */
    @sun.misc.Contended static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }

注意点:
@sun.misc.Contended注解,用于减少伪共享(false sharing)引起的内存争用问题。伪共享是一种现象,发生在多个线程同时访问和修改属于同一个缓存行(cache line)的不同数据时。由于缓存行的粒度通常大于单个变量的大小,不同的变量可能会共享同一个缓存行。如果多个线程频繁地修改位于同一缓存行内的不同变量,会导致缓存行不断地在多个处理器之间传输,降低性能。
关于处理器缓存行 可以参考 Java中volatile关键字详解 (opens new window)

使用 @sun.misc.Contended 注解可以在内存中填充额外的字节,将这些变量分散到不同的缓存行中,从而减少伪共享问题。具体到Exchanger 中的 Node 类,@sun.misc.Contended 注解的作用如下:

  • 减少内存争用:将 Node 类的实例变量分散到不同的缓存行中,减少多个线程访问和修改这些变量时的冲突。
  • 提高并发性能:减少由于缓存行无效化导致的性能损失,提高多线程访问和修改这些变量时的性能。

这种方式在JDK中使用的地方挺多的。
比如JDK8的ConcurrentHashMap也有用到:

 /**
     * A padded cell for distributing counts.  Adapted from LongAdder
     * and Striped64.  See their internal docs for explanation.
     */
    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

# 五、Phaser

# Phaser的作用

作用: Phaser 同样是 Java 并发库(java.util.concurrent)中的一个同步辅助类,用于多个线程之间的相互同步。与 CountDownLatch 和 CyclicBarrier 类似,Phaser 也用于协调多个线程的执行,但它更为灵活,支持动态增加或减少参与者(parties)。Phaser 可以用来处理多阶段任务,每个阶段结束时,所有参与的线程都必须等待其他线程完成当前阶段,然后一起进入下一个阶段。

# Phaser的基本概念

在使用Phaser 之前先了解一下Phaser 的几个基本概念帮助理解:

①、Parties(参与者):
参与者是指在每个阶段需要同步的线程数量。
每个线程在开始工作前都需要向 Phaser 注册(使用 register() 方法),增加 Phaser 的参与者数量。
当一个参与者完成当前阶段的工作并调用 arrive() 或 arriveAndAwaitAdvance() 方法时,Phaser 会记录该参与者已完成当前阶段。

②、Phases(阶段):
Phaser 将同步过程划分为多个阶段,每个阶段表示一轮所有参与者的同步操作。 当所有参与者都完成一个阶段并调用 arriveAndAwaitAdvance() 方法时,Phaser 会进入下一个阶段。

③、Arrive(到达):
当一个参与者完成当前阶段的工作时,它需要调用 arrive() 方法通知 Phaser 自己已完成。 该方法只通知 Phaser 自己已完成,不会等待其他参与者完成。

④、Await Advance(等待推进):
当一个参与者完成当前阶段的工作并希望等待其他参与者完成时,它可以调用 arriveAndAwaitAdvance() 方法。 该方法会阻塞当前线程,直到所有参与者都完成当前阶段,Phaser 进入下一个阶段。

⑤、Deregister(注销):
当一个参与者完成所有阶段的工作并希望退出同步过程时,它可以调用 arriveAndDeregister() 方法。 该方法会通知 Phaser 自己已完成并减少参与者数量。

稍微理解下上面的基本概念。
下面使用示例:
秀逗和四眼两只狗的主人给他们准备了三种美食,分别是骨头、牛肉、鸡排。但是主人要求秀逗和四眼必须每种食物都吃好之后再一起吃下一种食物。假如秀逗吃了一分钟骨头吃好了,此时秀逗想吃牛肉但是它必须等待四眼也吃好骨头之后一块去吃牛肉。
这种场景可以用Phaser来实现。

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class TestA {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3); // 初始化参与者数量为3,包括主线程和两个子线程

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                String food = "";
                switch (i) {
                    case 0:
                        food = "骨头";
                        break;
                    case 1:
                        food = "牛肉";
                        break;
                    case 2:
                        food = "鸡排";
                        break;
                }
                System.out.println("秀逗开始吃" + food);
                try {
                    TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                phaser.arriveAndAwaitAdvance(); // 到达并等待
            }
            phaser.arriveAndDeregister(); // 完成后注销
        }, "t1");
        t1.start();

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                String food = "";
                switch (i) {
                    case 0:
                        food = "骨头";
                        break;
                    case 1:
                        food = "牛肉";
                        break;
                    case 2:
                        food = "鸡排";
                        break;
                }
                System.out.println("四眼开始吃" + food);
                try {
                    TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 3000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                phaser.arriveAndAwaitAdvance(); // 到达并等待
            }
            phaser.arriveAndDeregister(); // 完成后注销
        }, "t2");
        t2.start();

        // 主线程等待所有狗完成三轮吃食
        for (int i = 0; i < 3; i++) {
            // 这里等待 只是为了打印不被其他线程干扰
            try {
                TimeUnit.MILLISECONDS.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            phaser.arriveAndAwaitAdvance(); // 主线程到达并等待
            System.out.println(2 == i ? "都吃完了" : "=== 狗主人:你俩可以去吃下一个东西了");
        }

        // 主线程注销,表示所有轮次都已完成
        phaser.arriveAndDeregister();
        System.out.println();
        System.out.println("===========狗吃食活动结束===========");
    }
}

如果Phases(阶段)是确定的,就比如上面狗只吃3各阶段,那么可以重写onAdvance来在每个阶段(phase)结束时执行特定操作,同时决定是否终止 Phaser。
代码示例:

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class TestA {
    public static void main(String[] args) {
        // 执行3轮
        int phases = 3;

        // 初始化 2个参与者
        Phaser phaser = new Phaser(2) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("===== 第[" + (phase + 1) + "]轮吃好了" + " 参与者有[" + registeredParties + "]个");
                if((phase + 1) >= phases || registeredParties == 0){
                    System.out.println("===========狗吃食活动结束===========");
                }
                return (phase + 1) >= phases || registeredParties == 0;
            }
        };

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                String food = "";
                switch (i) {
                    case 0:
                        food = "骨头";
                        break;
                    case 1:
                        food = "牛肉";
                        break;
                    case 2:
                        food = "鸡排";
                        break;
                }
                System.out.println("秀逗开始吃" + food);
                try {
                    TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                phaser.arriveAndAwaitAdvance(); // 到达并等待
            }
            phaser.arriveAndDeregister(); // 完成后注销
        }, "t1");
        t1.start();

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                String food = "";
                switch (i) {
                    case 0:
                        food = "骨头";
                        break;
                    case 1:
                        food = "牛肉";
                        break;
                    case 2:
                        food = "鸡排";
                        break;
                }
                System.out.println("四眼开始吃" + food);
                try {
                    TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 3000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                phaser.arriveAndAwaitAdvance(); // 到达并等待
            }
            phaser.arriveAndDeregister(); // 完成后注销
        }, "t2");
        t2.start();

    }
}

执行结果:

秀逗开始吃骨头
四眼开始吃骨头
=====[1]轮吃好了 参与者有[2]个
四眼开始吃牛肉
秀逗开始吃牛肉
=====[2]轮吃好了 参与者有[2]个
四眼开始吃鸡排
秀逗开始吃鸡排
=====[3]轮吃好了 参与者有[2]===========狗吃食活动结束===========

# Phaser的实现原理

Phaser 内部通过一个 long 类型的状态变量来跟踪阶段、注册的参与者和到达的参与者。这个状态变量的高位部分用于表示当前的阶段,中间部分表示注册的参与者数量,低位部分表示已经到达的参与者数量。

注册和注销 register(): 注册一个新的参与者。会增加注册的参与者计数。
arriveAndDeregister(): 表示当前线程到达并注销。会减少注册的参与者计数。

同步
arrive(): 表示当前线程到达当前阶段。增加到达的参与者计数。
awaitAdvance(int phase): 等待所有参与者到达当前阶段。阻塞直到所有参与者到达。

到达并等待
arriveAndAwaitAdvance(): 组合了 arrive() 和 awaitAdvance(int phase) 的功能。表示当前线程到达并等待其他线程到达。

onAdvance 方法在每个阶段结束时被调用,可以通过子类化 Phaser 并覆盖该方法来实现自定义的行为。返回 true 表示 Phaser 终止,返回 false 表示继续进入下一阶段。

Phaser的源码实现是比较复杂的,内部使用QNode 节点( static final class QNode implements ForkJoinPool.ManagedBlocker )来封装线程,并构造成队列。利用evenQ(偶数阶段) 和 oddQ(奇数阶段)来交替处理线程,达到平衡负载,提高处理效率的目的。

建议了解了Fork/Join的设计模式后再去看源码。了解了一些前置知识后建议看这篇文档
https://www.skjava.com/series/article/8170507912 (opens new window),我觉得算是找的资料中对Phaser讲解的非常详细的一篇博客了。

# 五种并发工具类的对比总结

并发工具类 主要功能 用法示例 特点与注意事项
CountDownLatch (倒计数器) 允许一个或多个线程等待其他线程完成操作后再继续执行 CountDownLatch latch = new CountDownLatch(1); latch.await(); latch.countDown(); - 只能使用一次,不能重置 - 适用于一次性事件,例如等待多线程计算完成后汇总结果
CyclicBarrier (循环屏障) 使一组线程到达一个屏障点时被阻塞,直到最后一个线程到达屏障点,所有线程才继续执行 CyclicBarrier barrier = new CyclicBarrier(3); barrier.await(); - 可以重用,适用于多次执行的场景- 适用于需要多个线程同时开始某些任务的场景,例如多线程处理数据后同时汇总
Semaphore (信号量) 控制访问某资源的线程数量,允许多个线程并发访问,但限制同时访问的线程数 Semaphore semaphore = new Semaphore(3); semaphore.acquire(); semaphore.release(); - 可用于限流控制,控制对资源的并发访问量- 适用于限制同时访问某些资源(如数据库连接、文件访问)的线程数
Exchanger (交换器) 允许两个线程在同步点交换数据,两个线程都必须到达同步点,交换才会发生 Exchanger<Object> exchanger = new Exchanger<>(); exchanger.exchange(data); - 适用于两个线程之间的数据交换- 线程间的交换必须是同步的,两个线程都到达交换点才会继续
Phaser (相位器) 灵活的线程同步工具,可以动态增加或减少参与同步的线程数,适用于分阶段执行的任务 Phaser phaser = new Phaser(1); phaser.arriveAndAwaitAdvance(); phaser.arriveAndDeregister(); - 比 CyclicBarrier 更加灵活,可动态调整线程数- 适用于复杂的多阶段任务,可以动态增加或减少参与线程