跳到文章开头
  1. 编程语言s/

并发进阶

·4 分钟
并发编程
春江花朝秋月夜
作者
春江花朝秋月夜
重湖叠𪩘清嘉。有三秋桂子,十里荷花。羌管弄晴,菱歌泛夜,嬉嬉钓叟莲娃。千骑拥高牙。乘醉听箫鼓,吟赏烟霞。异日图将好景,归去凤池夸。
 ·  页面点击量:
目录

线程池
#

new Thread缺点:频繁创建和销毁,浪费了线程资源,短时间创建大量线程和销毁 线程池是有限可重复使用的线程,用完需要归还

img.png

//构造方法,含有七个参数
public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
                              int maximumPoolSize,//最大线程池大小
                              long keepAliveTime,//线程最大空闲时间
                              TimeUnit unit,//最大空闲时间单位
                              BlockingQueue<Runnable> workQueue//线程等待队列,当超出最大容量时,任务进入等待队列) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(),//线程创建工厂,可自定义
		defaultHandler//拒绝策略,实在不能加入新的任务时,拒绝任务);
    }

根据CPU类型分配线程池大小
#

ThreadPoolExecutor
#

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 4,   //2个核心线程,最大线程数为4个
                        3, TimeUnit.SECONDS,        //最大空闲时间为3秒钟
                        new ArrayBlockingQueue<>(2));     //这里使用容量为2的ArrayBlockingQueue队列

        for (int i = 0; i < 6; i++) {   //开始6个任务
            int finalI = i;
            executor.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName()+" 开始执行!("+ finalI);
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+" 已结束!("+finalI);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        TimeUnit.SECONDS.sleep(1);    //看看当前线程池中的线程数量
        System.out.println("线程池中线程数量:"+executor.getPoolSize());
        TimeUnit.SECONDS.sleep(5);     //等到超过空闲时间
        System.out.println("线程池中线程数量:"+executor.getPoolSize());

        executor.shutdownNow();    //使用完线程池记得关闭,不然程序不会结束,它会取消所有等待中的任务以及试图中断正在执行的任务,关闭后,无法再提交任务,一律拒绝
        //executor.shutdown();     同样可以关闭,但是会执行完等待队列中的任务再关闭
    }
}

ArrayBlockingQueue
#

作为有界的阻塞队列,具有响应的容量,可以自行设置,对比SynchronousQueue没有容量,选这个会出现爆栈的问题,原因 是线程池ThreadPoolExecutor发现容量超出时会poll线程,但SynchronousQueue没有容量poll无意义。 在线程池里面的等待队列需要具有容量。

线程池的拒绝策略
#

线程池超过最大的容量时,需要拒绝这个任务。

  • AbortPolicy(default): 直接抛异常
  • CallerRunsPolicy: 直接让提交任务的线程运行这个任务,如果向主线程提交了任务就让主线程去执行这个任务,谁提交谁执行
  • DiscardOldestPolicy: 丢弃队列中最近的任务,替换为当前任务
  • DiscardPolicy: Do nothing

自定义拒绝策略和线程生产工厂

img_1.png

任务过程出现异常,线程池当中执行任务时发生异常,线程将会自动销毁

ExecutorService
#

可以使用Executors工具类来快速创建线程池:

    ExecutorService executor = Executors.newFixedThreadPool(2);   //直接创建一个固定容量的线程池

ExecutorService本质内部实现其实是ThreadPoolExecutor,所以创建的线程都不是核心线程

//内部实现
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

直接将最大线程和核心线程数量设定为一样的,并且等待时间为0,因为压根不需要,并且采用的是一个无界的LinkedBlockingQueue作为等待队列。

//创建单个线程
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

这里并不是直接创建的一个ThreadPoolExecutor对象,而是套了一层FinalizableDelegatedExecutorService,目的是内部套了一层,外部不可见,不可修改,保证安全性

由于executorService内部直接最大容量为Integer. MAX_VALUE,非常大,不安全

newCachedThreadPool
#

img
注意生成的线程池里面都不是核心线程,同时最大容量非常大,需要慎用

执行带返回值的任务:
#

ExecutorService执行任务有两种方法:submit和execute

接受返回值的只有submit

img

public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService=Executors.newSingleThreadExecutor();
        Future<String> future= executorService.submit(()->{
            TimeUnit.SECONDS.sleep(3);
            return "你好世界";
        });
        System.out.println(future.cancel(true));//取消任务,这时候里面的字串将不会再返回出来
        executorService.shutdown();
        System.out.println(future.get());
    }

定时任务ScheduledThreadPoolExecutor
#

        ScheduledThreadPoolExecutor executor=new ScheduledThreadPoolExecutor(1);
//        executor.schedule(()-> System.out.println("开始定时任务"),3,TimeUnit.SECONDS);
//        ScheduledFuture<String> scheduledFuture=executor.schedule(()-> "开始定时任务",3,TimeUnit.SECONDS); 
//        System.out.println(scheduledFuture.isDone());
        //固定频率进行定时计划:
        executor.scheduleAtFixedRate(()-> System.out.println("hello!"),3,1,TimeUnit.SECONDS);
//        executor.shutdown();

线程延迟线程池:ScheduledWithFixedDelay

protected void finalize() {    //在GC时,会执行finalize方法,此方法中会关闭掉线程池,释放资源
        super.shutdown();
    }

线程池实现原理
#

//这个就是我们指定的阻塞队列
private final BlockingQueue<Runnable> workQueue;

//再次提醒,这里没加锁!!该有什么意识不用我说了吧,所以说ctl才会使用原子类。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();     //如果任务为null,那执行个寂寞,所以说直接空指针
    int c = ctl.get();      //获取ctl的值,一会要读取信息的
    if (workerCountOf(c) < corePoolSize) {   //判断工作线程数量是否小于核心线程数
        if (addWorker(command, true))    //如果是,那不管三七二十一,直接加新的线程执行,然后返回即可
            return;
        c = ctl.get();    //如果线程添加失败(有可能其他线程也在对线程池进行操作),那就更新一下c的值
    }
    if (isRunning(c) && workQueue.offer(command)) {   //继续判断,如果当前线程池是运行状态,那就尝试向阻塞队列中添加一个新的等待任务
        int recheck = ctl.get();   //再次获取ctl的值
        if (! isRunning(recheck) && remove(command))   //这里是再次确认当前线程池是否关闭,如果添加等待任务后线程池关闭了,那就把刚刚加进去任务的又拿出来
            reject(command);   //然后直接拒绝当前任务的提交(会根据我们的拒绝策略决定如何进行拒绝操作)
        else if (workerCountOf(recheck) == 0)   //如果这个时候线程池依然在运行状态,那么就检查一下当前工作线程数是否为0,如果是那就直接添加新线程执行
            addWorker(null, false);   //添加一个新的非核心线程,但是注意没添加任务
      	//其他情况就啥也不用做了
    }
    else if (!addWorker(command, false))   //这种情况要么就是线程池没有运行,要么就是队列满了,按照我们之前的规则,核心线程数已满且队列已满,那么会直接添加新的非核心线程,但是如果已经添加到最大数量,这里肯定是会失败的
        reject(command);   //确实装不下了,只能拒绝
}

addWorker实现
#

private boolean addWorker(Runnable firstTask, boolean core) {
  	//这里给最外层循环打了个标签,方便一会的跳转操作
    retry:
    for (;;) {    //无限循环,老套路了,注意这里全程没加锁
        int c = ctl.get();     //获取ctl值
        int rs = runStateOf(c);    //解析当前的运行状态

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&   //判断线程池是否不是处于运行状态
            ! (rs == SHUTDOWN &&   //如果不是运行状态,判断线程是SHUTDOWN状态并、任务不为null、等待队列不为空,只要有其中一者不满足,直接返回false,添加失败
               firstTask == null &&   
               ! workQueue.isEmpty()))
            return false;

        for (;;) {   //内层又一轮无限循环,这个循环是为了将线程计数增加,然后才可以真正地添加一个新的线程
            int wc = workerCountOf(c);    //解析当前的工作线程数量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))    //判断一下还装得下不,如果装得下,看看是核心线程还是非核心线程,如果是核心线程,不能大于核心线程数的限制,如果是非核心线程,不能大于最大线程数限制
                return false;
            if (compareAndIncrementWorkerCount(c))    //CAS自增线程计数,如果增加成功,任务完成,直接跳出继续
                break retry;    //注意这里要直接跳出最外层循环,所以用到了标签(类似于goto语句)
            c = ctl.get();  // 如果CAS失败,更新一下c的值
            if (runStateOf(c) != rs)    //如果CAS失败的原因是因为线程池状态和一开始的不一样了,那么就重新从外层循环再来一次
                continue retry;    //注意这里要直接从最外层循环继续,所以用到了标签(类似于goto语句)
            // 如果是其他原因导致的CAS失败,那只可能是其他线程同时在自增,所以重新再来一次内层循环
        }
    }

  	//好了,线程计数自增也完了,接着就是添加新的工作线程了
    boolean workerStarted = false;   //工作线程是否已启动
    boolean workerAdded = false;    //工作线程是否已添加
    Worker w = null;     //暂时理解为工作线程,别急,我们之后会解读Worker类
    try {
        w = new Worker(firstTask);     //创建新的工作线程,传入我们提交的任务
        final Thread t = w.thread;    //拿到工作线程中封装的Thread对象
        if (t != null) {      //如果线程不为null,那就可以安排干活了
            final ReentrantLock mainLock = this.mainLock;      //又是ReentrantLock加锁环节,这里开始就是只有一个线程能进入了
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());    //获取当前线程的运行状态

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {    //只有当前线程池是正在运行状态,或是SHUTDOWN状态且firstTask为空,那么就继续
                    if (t.isAlive()) // 检查一下线程是否正在运行状态
                        throw new IllegalThreadStateException();   //如果是那肯定是不能运行我们的任务的
                    workers.add(w);    //直接将新创建的Work丢进 workers 集合中
                    int s = workers.size();   //看看当前workers的大小
                    if (s > largestPoolSize)   //这里是记录线程池运行以来,历史上的最多线程数
                        largestPoolSize = s;
                    workerAdded = true;   //工作线程已添加
                }
            } finally {
                mainLock.unlock();   //解锁
            }
            if (workerAdded) {
                t.start();   //启动线程
                workerStarted = true;  //工作线程已启动
            }
        }
    } finally {
        if (! workerStarted)    //如果线程在上面的启动过程中失败了
            addWorkerFailed(w);    //将w移出workers并将计数器-1,最后如果线程池是终止状态,会尝试加速终止线程池
    }
    return workerStarted;   //返回是否成功
}

并发工具类
#

计数器锁(CountDownLatch)
#

多任务同步神器。它允许一个或多个线程,等待其他线程完成工作,典型应用场景:多个线程分段相加最后统计所有线程的和

要实现这个需求,那么有一个很麻烦的地方,我们不知道任务到底什么时候执行完毕,那么可否将最终统计延迟一定时间进行呢?但是最终统计无论延迟多久进行, 要么不能保证所有任务都完成,要么可能所有任务都完成了而这里还在等。

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch=new CountDownLatch(20);
        for (int i = 0; i < 20; i++) {
        int finalI=i;
        new Thread(()->{
        try {
        Thread.sleep((long) (2000 * new Random().nextDouble()));
        System.out.println("任务"+finalI+"完成");
        }catch (InterruptedException e){
        e.printStackTrace();
        }
        latch.countDown();   //每执行一次计数器都会-1
        }).start();
        }
        //开始等待所有的线程完成,当计数器为0时,恢复运行
        latch.await();   //这个操作可以同时被多个线程执行,一起等待,这里只演示了一个
        System.out.println("所有子任务都完成!任务完成!!!");
        }

例子: 用四个线程实现对1到10000完成累计加和

    public static void main(String[] args) throws InterruptedException {
        long begin=System.currentTimeMillis();
       CountDownLatch latch=new CountDownLatch(4);
       int res[]=new int[4];
        for (int i = 0; i < 4; i++) {
            int finalI=i;
            new Thread(()->{
                int sum=0;
                int numbersPerThread = (int) Math.ceil((double) 100/ 4);
                int startNumber = finalI * numbersPerThread + 1;
                int endNumber = (finalI + 1) * numbersPerThread;
                for (int j =startNumber ; j <=endNumber; j++) {
                  sum+=j;
                }
                System.out.println(sum);
                res[finalI]=sum;
                latch.countDown();   //每执行一次计数器都会-1
            }).start();
        }
        //开始等待所有的线程完成,当计数器为0时,恢复运行
        latch.await();   //这个操作可以同时被多个线程执行,一起等待,这里只演示了一个
        int result=0;
        for (Integer integer:res) {
            result+=integer;
        }
        long end=System.currentTimeMillis();
        System.out.println("所有子任务都完成!任务完成,结果是"+result+"花费时间:"+(end-begin)+"ms");
    }

实现效果:

img

  • 共享锁是线程共享的,同一时刻能有多个线程拥有共享锁。
  • 如果一个线程刚获取了共享锁,那么在其之后等待的线程也很有可能能够获取到锁,所以得传播下去继续尝试唤醒后面的结点,不像独占锁,独占的压根不需要考虑这些。
  • 如果一个线程刚释放了锁,不管是独占锁还是共享锁,都需要唤醒后续等待结点的线程。

实现原理:

在工具构建的时候创建多个共享锁,调用countDown()调用时就可以减去一把锁,当state减为0时即可获取共享锁,实现方式是通过 链表向后继节点一个个从等待状态唤醒

循环屏障CyclicBarrier
#

循环屏障会不断阻挡线程,直到被阻挡的线程足够多时,才能一起冲破屏障,并且在 冲破屏障时,我们也可以做一些其他的任务。这和人多力量大的道理是差不多的,当人足够多时方能冲破阻碍, 到达美好的明天。当然,屏障由于是可循环的,所以它在被冲破后,会重新开始计数,继续阻挡后续的线程:

CyclicBarrier barrier = new CyclicBarrier(5);  //创建一个初始值为5的循环屏障

屏障最大容量:parties 特点:等待线程冲破阻碍的时候才可以 一起完成任务,完成任务之后继续等待,直到下一次达到最大值 。

当await状态下的线程被中断,屏障将会被破坏,这一轮不能使用,除非重新开始

信号量 Semaphore
#

限制信号容量,在一段任务中,要求 只能部分线程完成这段工作的时候完成

线程之间进行数据交换 Exchanger
#

实现两个线程之间进行数据交换的通信,只有当两个线程完成交换之后才可以完成任务,否则线程阻塞。

Fork/Join框架
#

ForkJoinPool pool = new ForkJoinPool();

  • fork指的是划分任务,任务约分越细
  • join指的是加入任务,任务开始执行

END
#

相关文章

Java-Concurrent
·3 分钟
并发编程
如何通过hugo搭建自己的博客
·1 分钟
杂记
科学上网
·1 分钟
杂记