Java中的一些并发工具

1. 【转载】类CountDownLatch

1.1 CountDownLatch介绍

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

1.2 CountDownLatch原理

CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

712573813fd4c09376b3e84.jpg

1.2.1 CountDownLatch的伪代码

Main thread start
Create CountDownLatch for N threads
Create and start N threads
Main thead wait on latch
N threads completes there tasks are returns
Main thread resume execution

1.2.2 CountDownLatch.java中定义的构造函数

//用等待的线程数量来进行初始化
public void CountDownLatch(int count){...}

计数器count是闭锁需要等待的线程数量,只能被设置一次,且CountDownLatch没有提供任何机制去重新设置计数器count。

与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他N个线程必须引用CountDownLatch闭锁对象,因为它们需要通知CountDownLatch对象,它们各自完成了任务;这种通知机制是通过CountDownLatch.countDown()方法来完成的;每调用一次,count的值就减1,因此当N个线程都调用这个方法,count的值就等于0,然后主线程就可以通过await()方法,恢复执行自己的任务。

1.3 在实时系统中的使用场景

  • 实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数器为1的CountDownLatch,并让其他所有线程都在这个锁上等待,只需要调用一次countDown()方法就可以让其他所有等待的线程同时恢复执行。
  • 开始执行前等待N个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统都已经启动和运行了。
  • 死锁检测:一个非常方便的使用场景是你用N个线程去访问共享资源,在每个测试阶段线程数量不同,并尝试产生死锁。

1.4 CountDownLatch使用例子

原博客写的过于冗长,我这里举一个简单的例子:
比如在动漫中那些机甲运行的时候,会出现系统启动成功

package com.test.excellearn.demo.learn;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class MachineArmorSystem {
    /**定义要启动的系统*/
    private static List<String> auxiliarySystem = Arrays.asList("防御系统","电源系统","视野系统","战斗系统");

    public static void main(String[] args) throws InterruptedException {
        //不同的线程启动不同的系统
        Thread[] systemThreads = new Thread[auxiliarySystem.size()];
        //定义CountDownLatch,传入要启动的系统数量
        CountDownLatch cdl = new CountDownLatch(auxiliarySystem.size());
        for (int i = 0; i < auxiliarySystem.size(); i++) {
            String name = auxiliarySystem.get(i);
            systemThreads[i] = new Thread(()->{
                System.out.printf("%s 系统开始启动\n",name);
                //模拟启动时间
                int val = new Random().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(val);
                    System.out.printf("%s 系统启动成功!耗时:%s s\n",name,val);
                    cdl.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            systemThreads[i].start();
        }
        //唤醒主线程,当然这里还可以做判断,假如之前的系统如果有启动不成功的,那么就不唤醒主系统
        cdl.await();
        System.out.println("进入系统!!!");

    }


}

注意!
CountDownLatch的await()方法实际并不是唤醒主线程!!在类中的描述是
image.png,这几句话什么意思呢

这个方法可以导致当前线程进入等待状态直到这个latch减为0

也就是说当CountDownLatch的变量数量为0的时候,当前线程才会被唤醒,而不是说这个await()方法唤醒了主线程,这里面的逻辑完全不一样!!!

运行结果
image.png

1.5 工作中的使用(优雅地完成初始化)

在移动应用开发中(以Android为例),随着功能的增多,应用初始化工作开始增多,网络,账号,推送服务,预加载数据等依次登场,开发人员都会临时在Application中找到现有初始化逻辑,将自己的代码插在其中。随着版本的迭代,新老员工的交替,几乎没人能对应用的初始化过程完全了解,删除一行初始化代码甚至移动位置都可能造成严重的后果。

应用初始化过程极其重要,它是应用后续平稳运行的前提和保证。开发初始化配置模块(公司内部开源不宜公开),更好地管理初始化逻辑,对初始化地工作进行分层,分优先级,多线程地规划,进而在大幅提升初始化效率,同时还有完整地日志监控体系功能。有了它,规划整个初始化工作将简单而优雅

参考

  1. 什么时候使用CountDownLatch
  2. Java concurrency – CountDownLatch Example

作者:码农历险记
链接:https://www.jianshu.com/p/4b6fbdf5a08f
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

2 类CyclicBarrier

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier当调用await()方法之后,线程就处于barrier了。
使用

package com.test.excellearn.demo.learn;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class Test2 {

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(5);
        for (int i = 0; i <5; i++) {
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                    System.out.println(Thread.currentThread().getName()+":准备好了!");
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"起跑,他的起跑时间是:"+System.currentTimeMillis());
            },"运动员"+i).start();
        }
    }

}

执行结果
image.png

3 【转载】类Semaphore

11464886a58b677a59d8ca6c.jpg
SemaphoreCountDownLatch相似,不同的地方在于Semaphore的值被获取到后是可以释放的,并不像CountDownLatch那样一直减到底。它也被更多地用来限制流量类似阀门的 功能。如果限定某些资源最多有N个线程可以访问,那么超过N个主不允许再有线程来访问,同时当现有线程结束后,就会释放,然后允许新的线程进来。有点类似于锁的lock与 unlock过程。相对来说他也有两个主要的方法:

  • 用于获取权限的acquire(),其底层实现与CountDownLatch.countdown()类似;
  • 用于释放权限的release(),其底层实现与acquire()是一个互逆的过程。

3.1 Semaphore信号量的同步控制

信号量表示可以使用的资源数量

使用的时候步骤分为3个

  1. 请求资源acquire,获取许可
  2. 使用资源(业务处理)
  3. 释放资源release()

使用时应该注意的点:
release()说的是
Releases a permit, returning it to the semaphore.
翻译过来就是释放许可证,将其返回到信号
意味着及时semaphore初始化的时候是0 ,但是你 直接调用release()会导致semaphore的容量增加1
可以做一个测试进行验证

public class TestSemaphore {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(0);
        new Thread(()->{
            semaphore.release();
            System.out.println("进入线程执行了"+semaphore.tryAcquire());
        }).start();
        System.out.println("程序结束");
    }
}

当执行到semaphore.tryAcquire()这一行代码的时候,会semaphore中的state进行
-1,所以在semaphore中的实现的AQS执行CAS-1的时候查看state状态就行
image.png
执行结果,竟然是1!
image.png

意味着这产生一种新的使用方法,也就是如果你想让同一个对象的A方法必须在B方法之前进行执行,那么如下伪代码

class {
	semaphore semaphore = new semaphore(0);
	A(){  
    		semaphore.release();
	}  
	  
	B(){  
		//假如那个线程先进入的B方法,那么就会进入等待状态,
		//等待A方法执行完成使得semaphore中的state=1才执行B方法
  		semaphore.acquire();
	}

}

3.1 使用方法

package com.test.excellearn.demo.learn;

import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreCarDemo {

    public static void main(String[] args) {
        //创建Semaphore
        Semaphore semaphore = new Semaphore(10);
        //请求许可
        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(2);
                    semaphore.acquire();
                    System.out.println("汽车"+Thread.currentThread().getName()+"可以进停车场");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //使用资源
                try {
                    TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                //释放资源
                semaphore.release();
                System.out.println(Thread.currentThread().getName()+"离开停车场");

            },"car"+i).start();
        }

    }

}

------------------------------------------------------------------------------------------------------------------------

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();
       // 每次最多三个线程获取许可
        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(); // 获取一个许可
                    test(threadNum);
                    semaphore.release(); // 释放一个许可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
public class SemaphoreExample2 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(3); // 获取多个许可
                    test(threadNum);
                    semaphore.release(3); // 释放多个许可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@Slf4j
public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire()) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@Slf4j
public class SemaphoreExample4 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

作者:一行代码一首诗
链接:https://www.jianshu.com/p/bb5105303d85
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

# Java  并发  工具 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×