深入理解并发和并行

1 并发与并行

为什么操作系统上可以同时运行多个程序而用户感觉不出来?

因为操作系统营造出了可以同时运行多个程序的假象,通过调度进程以及快速切换CPU上下文,每个进程执行一会就停下来,切换到下个被调度到的进程上,这种切换速度非常快,人无法感知到,从而产生了多个任务同时运行的错觉。

并发(concurrent) 是指的在宏观上多个程序或任务在同时运行,而在微观上这些程序交替执行,可以提高系统的资源利用率和吞吐量。

通常一个CPU内核在一个时间片只能执行一个线程(某些CPU采用超线程技术,物理核心数和逻辑核心数形成一个 1:2 的关系,比如4核CPU,逻辑处理器会有8个,可以同时跑8个线程),如果N个内核同时执行N个线程,就叫做并行(parallel)。我们编写的多线程代码具备并发特性,而不一定会并行。因为能否并行取决于操作系统的调度,程序员无法控制,但是调度算法会尽量让不同线程使用不同的CPU核心,所以在实际使用中几乎总是会并行。如果多个任务在一个内核中顺序执行,就是串行(Serial),如下图所示:



并发是多个程序在一段时间内同时执行的现象,而并行是多个任务在同一时刻同时执行,也是多核CPU的重要特性。

这里有一个疑问:并发一定并行吗?

并发并不一定并行。并发是逻辑上的同时发生,而并行是物理上的同时发生。并发可以跑在一个处理器上通过时间片进行切换,而并行需要两个或两个以上的线程跑在不同的处理器上。如果同一个任务的多个线程始终运行在不变的CPU核心上,那就不是并行。

举一个生活中的例子:

  • 你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。
  • 你吃饭吃到一半,电话来了,你停了下来接了电话,接完后继续吃饭,这说明你支持并发。
  • 你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行。

2 多核调度算法

在多核CPU系统中,调度算法的主要目标是有效地利用所有可用的CPU核心,以提高系统的整体性能和资源利用率。下面是一些常见的多核CPU调度算法:

  1. 抢占式调度(Preemptive Scheduling):这种调度算法允许操作系统随时中断当前正在执行的任务,并将处理器分配给其他任务。在多核系统中,抢占式调度器可以将任务迁移到其他核心上,以充分利用系统资源。
  2. 公平调度(Fair Scheduling):公平调度算法旨在公平地分配CPU时间给系统中的所有任务,以确保每个任务都有机会在一定的时间内执行。在多核系统中,公平调度器通常会尝试平衡各个核心上的负载,以避免出现某些核心过载而其他核心处于空闲状态的情况。
  3. 负载均衡调度(Load Balancing):负载均衡调度算法用于在多核系统中平衡各个核心上的任务负载,以确保所有核心都能够充分利用。这可以通过将任务从负载较重的核心迁移到负载较轻的核心来实现,或者通过动态地将新任务分配给负载较轻的核心来实现。
  4. 优先级调度(Priority Scheduling):优先级调度算法允许为每个任务分配一个优先级,并根据优先级来决定任务的执行顺序。在多核系统中,可以根据任务的优先级将其分配给不同的核心,以确保高优先级任务优先得到执行。
  5. 混合调度(Hybrid Scheduling):混合调度算法结合了多种调度策略的优点,以适应不同的应用场景和系统配置。例如,可以将公平调度算法和负载均衡调度算法结合起来,以在系统中实现公平且高效的任务调度。

这些调度算法可以根据系统的需求进行组合和调整,以实现对多核CPU系统资源的有效管理和利用。

抢占式调度(Preemptive Scheduling)的使用最为广泛,它允许操作系统在任何时候中断当前正在执行的任务,并将处理器分配给其他任务。这种调度策略使得操作系统能够及时响应各种事件和请求,从而提高系统的响应性和实时性。

在抢占式调度中,每个任务都被赋予一个优先级,操作系统会根据任务的优先级来决定哪个任务应该在当前时间片执行。如果某个高优先级任务准备就绪并且当前正在执行的任务的优先级低于它,操作系统会中断当前任务的执行,并将处理器分配给高优先级任务,从而实现抢占。抢占式调度的主要优点包括:

  1. 实时性:抢占式调度允许操作系统及时地响应外部事件和请求,从而满足实时性要求。
  2. 灵活性:操作系统可以根据任务的优先级动态地调整任务的执行顺序,以适应不同的系统负载和需求。
  3. 公平性:抢占式调度可以确保高优先级任务得到及时执行,而不会被低优先级任务长时间占用处理器。
  4. 多任务并发:通过在任务之间进行快速的切换,抢占式调度可以实现多任务并发执行,从而提高系统的吞吐量和效率。

抢占式调度也存在一些挑战和限制:

  1. 上下文切换开销:频繁的任务切换会导致上下文切换的开销增加,可能会影响系统的性能。
  2. 优先级反转:如果低优先级任务持有某些资源而高优先级任务需要访问这些资源,可能会导致优先级反转问题,从而影响系统的实时性。
  3. 饥饿问题:如果某个任务的优先级始终较低,并且总是被更高优先级的任务抢占,可能会导致该任务长时间无法执行,出现饥饿问题。

抢占式调度在许多操作系统中得到了广泛应用,包括Windows、Linux、MacOS等,它为实时系统和响应式系统提供了一种高效的任务调度机制。

3 Java并行编程

在编码层面上看,采用Java语言创建多线程代码,不需要程序员打上并行的标记,因为为了充分利用计算资源,操作系统一定会尽可能调度多线程到不同的CPU核心上。并发的任务通常有多线程竞争资源和频繁的CPU上下文切换,这些都会降低执行效率。

在实际的业务场景里,许多计算任务其实互不干扰,最后汇总结果就可以了,比如统计不同用户的每日活动次数。它们不存在竞争资源,并行处理的效率非常高,Java语言提供了多线程并行执行的 API。

3.1 Future

在Java并发编程中,Future是一种用于表示异步计算结果的接口。它允许你提交一个任务并且在将来的某个时候获取任务的结果。Future的原理是通过一个占位符来表示异步操作的结果,在任务完成之前,可以通过Future对象获取占位符,并且在需要的时候等待任务的完成并获取结果。Future接口定义了异步计算结果的标准,具体的异步计算由实现了Future接口的类来执行,比如ExecutorService的submit方法会返回一个Future对象,用于跟踪任务的执行状态和结果。

Future提供了以下主要方法:

  • isDone():判断任务是否已经完成。
  • cancel(boolean mayInterruptIfRunning):取消任务的执行。
  • get():获取任务的执行结果,在任务完成之前会阻塞当前线程。
  • get(long timeout, TimeUnit unit):获取任务的执行结果,但最多等待指定的时间,超时后会抛出TimeoutException。

看看下面这个代码示例:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class FutureParallelExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
 
        Callable<Integer> task1 = () -> {
            // 模拟耗时计算
            Thread.sleep(2000);
            return 10;
        };
 
        Callable<Integer> task2 = () -> {
            // 模拟耗时计算
            Thread.sleep(3000);
            return 20;
        };
 
        Future<Integer> future1 = executor.submit(task1);
        Future<Integer> future2 = executor.submit(task2);
 
        // 异步执行,继续执行下面的代码
        System.out.println("Asynchronous computation is executing.");
 
        // 获取第一个任务的结果
        Integer result1 = future1.get(); // 这将会阻塞直到任务1完成
        System.out.println("Task 1 result: " + result1);
 
        // 获取第二个任务的结果
        Integer result2 = future2.get(); // 这将会阻塞直到任务2完成
        System.out.println("Task 2 result: " + result2);
 
        // 关闭ExecutorService
        executor.shutdown();
    }
}

在这个例子中,启动了两个异步任务,并分别获取了它们的 Future 对象。通过 Future.get() 方法,我们可以等待任务完成并获取结果。ExecutorService 使用了一个固定的线程池,大小为2。这意味着两个任务将会并行执行。

3.2 Fork / Join

Fork / Join 框架是Java 7中新增的并发编程工具,主要有两个步骤,第一是fork:将一个大任务分成很多个小任务;第二是 join:将第一个任务的结果 join 起来,生成最后的结果。如果第一步中并没有任何返回值,join将会等到所有的小任务都结束。

斐波那契数列由意大利数学家斐波那契首次提出,这个数列从第三项开始,每一项都等于前两项之和,通常以递归方式定义,即F(0)=1,F(1)=1,对于n>=2的任何正整数n,F(n)=F(n-1)+F(n-2),数列的前几个数字是1,1,2,3,5,8,13,21,34。我们尝试使用递归计算第n项的数值,代码如下:

/**
* 递归实现斐波那契数列
**/
public class FibonacciRecursion {
    public static int fibonacciRecursive(int n) {
        if (n <= 1)
            return n;
        return fibonacciRecursive(n - 1) + fibonacciRecursive(n - 2);
    }
 
    public static void main(String[] args) {
        int n = 10;
        System.out.println("Fibonacci of " + n + " is " + fibonacciRecursive(n));
    }
}

以上代码输出结果是:55。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* fork/join实现斐波那契数列
**/
public class FibonacciFork extends RecursiveTask<Integer> {
    final int n;
 
    public FibonacciFork(int n) {
        this.n = n;
    }
 
    @Override
    protected Integer compute() {
        if (n <= 1)
            return n;
        FibonacciFork f1 = new FibonacciFork(n - 1);
        FibonacciFork f2 = new FibonacciFork(n - 2);
 
        f1.fork();
        return f2.compute() + f1.join();
    }
 
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        FibonacciFork fib = new FibonacciFork(10);
        Integer result = pool.invoke(fib);
        System.out.println(result);
    }
}

以上代码中,定义了RecursiveTask的子类FibonacciFork类,用于计算斐波那契数列的第n项。在main方法中,创建了一个ForkJoinPool并提交了任务执行。这个任务会递归地分解成更小的子任务,并且使用fork/join模式来并行处理这些子任务,最后通过join方法获取子任务的结果并累加,输出结果是:55。

3.3 Stream API

Java 8 加入了新特性 Stream API(叫做流式计算或并行流),极大地提升了处理集合数据的灵活性与效率。Stream API 简化了集合操作的代码量,还通过 lambda 表达式增强了函数式编程风格,核心逻辑是将数据集合分成多个小块,然后在多个处理器上并行处理,最后将结果合并成一个结果集。Java Stream API 的底层原理主要涉及两个方面:流的管道化操作和惰性求值:

  • 流的管道化操作:Java Stream API 提供了一种功能强大的管道化操作模式,可以通过一系列的中间操作和终端操作对数据进行处理。这些操作可以串联起来形成一个流水线,每个中间操作都会生成一个新的流,而终端操作则会触发实际的计算。这种管道化操作的设计允许开发者通过简单的链式调用实现复杂的数据处理逻辑,同时也方便了 JVM 在内部进行优化,例如进行流的并行处理以提高性能。
  • 惰性求值:Java Stream API 采用了惰性求值的策略,也就是说中间操作并不会立即触发实际的计算,而是在终端操作被调用时才开始进行计算。这种设计使得 Stream API 可以在需要的时候才对数据进行处理,从而避免了不必要的计算开销。另外,惰性求值还使得 Stream API 具备了延迟特性,即使是处理大规模数据时也可以节省内存和计算资源。

我们来看看简单的代码示例:

public class StreamDemo {

    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
              .reduce((a, b) -> a + b)
              .ifPresent(System.out::println); // 输出结果:45
    }
}

上面代码创建了一个包含1到9整数的并行流,然后通过 reduce 方法计算所有数字的和,并打印结果。在默认情况下,这些操作是在单线程中按顺序逐个执行的。

public class StreamParallelDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
              .parallel()
              .reduce((a, b) -> a + b)
              .ifPresent(System.out::println);
    }
}

上面代码调用了 parallel() 后,reduce()方法内部逻辑发生了变化,它会根据当前线程池资源分配任务,并行地在不同的工作线程上执行累加操作,而不是串行执行的。

Java 并行流是基于 Fork/Join 框架实现的,它使用了多线程来处理流操作。在多核环境中,Fork/Join 框架会根据系统资源自动调整任务分配,尽可能多地利用空闲核心,更充分地发挥硬件潜力。比如,当CPU具有8个内核时,并行计算的耗时远小于串行计算耗时的8倍,但是由于线程创建、销毁以及上下文切换等开销,实际性能提升并非线性。

并行计算并不总是适用于所有场景,特别是在数据集较小或者任务分解后产生的子任务粒度较小时,线程管理的开销可能超过并行计算带来的优势。如果硬件只有单核或少核,则并行计算效果有限甚至可能会因线程切换而降低效率。综合考虑以下因素:

  • 数据量:对于大规模数据集,尤其是需要复杂运算的任务,采用并行计算可以显著提高执行速度。
  • 硬件配置:确保运行环境为多核处理器,不适用于 IO 密集型操作,仅适用于 CPU 密集型操作。
  • 任务性质:若任务可以轻松拆分为独立的子任务,并且结果合并相对简单,更适合应用并行计算。
  • 系统负载:在高负载系统中,要避免过度增加并发,以免引发资源竞争和瓶颈问题。

3.4 CompletableFuture

CompletableFuture是一个实现了Future接口的类,它提供了一种更加灵活和强大的方式来进行异步编程。CompletableFuture可以用来表示一个异步计算的结果,并且提供了丰富的方法来处理异步操作的完成、组合多个异步操作、处理异常等。CompletableFuture相比于传统的Future接口,具有以下优势:

  1. 更加灵活的方法链:CompletableFuture提供了一系列的方法,可以链式地进行异步操作,比如thenApply、thenAccept、thenCompose等,使得代码更加简洁清晰。
  2. 组合多个异步操作:CompletableFuture允许你组合多个异步操作,可以按照顺序执行、并行执行,或者根据一定的条件来执行。
  3. 异常处理:CompletableFuture提供了exceptionally和handle等方法来处理异步操作中的异常情况,使得异常处理变得更加灵活。
  4. 支持回调函数:你可以通过thenApply、thenAccept等方法设置回调函数,以便在异步操作完成时执行特定的操作。
  5. 可编程式地完成异步操作:CompletableFuture提供了complete、completeExceptionally等方法,可以手动地完成异步操作,从而更加灵活地控制异步任务的执行过程。

我们看看简单的代码示例:

  • 简单的异步任务
import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {

    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            // 异步任务,返回结果为100
            return 100;
        });

        // 在任务完成后输出结果
        future.thenAccept(result -> System.out.println("异步任务结果为:" + result));
    }
}
  • 组合多个CompletableFuture
import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {

    public static void main(String[] args) {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            // 异步任务1,返回结果为100
            return 100;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            // 异步任务2,返回结果为200
            return 200;
        });

        // 将两个异步任务的结果相加
        CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);

        // 在组合任务完成后输出结果
        combinedFuture.thenAccept(result -> System.out.println("两个异步任务的结果之和为:" + result));
    }
}
  • 处理异常情况
import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {

    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个可能发生异常的异步任务
            if (Math.random() < 0.5) {
                throw new RuntimeException("Oops! Something went wrong.");
            }
            return 100;
        });

        // 处理异常情况
        future.exceptionally(throwable -> {
            System.out.println("异步任务发生异常:" + throwable.getMessage());
            return null; // 返回默认值或者做其他处理
        });

        // 在任务完成后输出结果
        future.thenAccept(result -> System.out.println("异步任务结果为:" + result));
    }
}
  • 自定义线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("执行异步操作。。。");
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, executorService);
        System.out.println("结果:"+voidCompletableFuture.get());

这些示例展示了使用CompletableFuture进行异步编程的一些常见用法,包括简单的异步任务、组合多个CompletableFuture、处理异常情况等。总的来说,CompletableFuture是Java并发编程中一个强大而灵活的工具,它使得异步编程变得更加简单、清晰和可控。

4 总结

要更好地掌握Java并发编程技能,可以采取以下几个步骤:

  1. 学习基础知识: 对Java并发编程的基本概念和术语有清晰的理解,比如线程、锁、同步、并发问题等。可以通过阅读相关的书籍、教程或者在线课程来学习。
  2. 熟悉并发工具类: Java提供了丰富的并发工具类,比如 ThreadRunnableExecutorThreadPoolExecutorSemaphoreCountDownLatch等。深入了解这些工具类的使用方法和特性,以及在不同场景下的应用。
  3. 掌握多线程编程: 多线程编程是Java并发编程的核心,要熟练掌握如何创建线程、管理线程生命周期、线程同步和通信等技术。了解线程的状态、优先级、调度方式等概念,以及如何避免常见的多线程问题,比如死锁、竞态条件等。
  4. 深入理解并发模型: 了解并发模型,比如共享内存模型和消息传递模型,以及它们的优缺点。掌握在这些模型下如何设计和实现并发程序。
  5. 学习并发设计模式: 掌握常见的并发设计模式,比如生产者-消费者模式、读写锁模式、工作窃取模式等。了解这些模式的原理和实现方式,以及在实际项目中的应用。
  6. 实践项目经验: 通过实际项目来锻炼并发编程技能,尝试在项目中应用所学的知识解决实际的并发问题。可以选择一些开源项目或者自己构建小型项目来练习。

参考

https://zhuanlan.zhihu.com/p/622768247
https://www.cnblogs.com/badaoliumangqizhi/p/17021500.html
https://blog.csdn.net/weixin_44073836/article/details/123346035
https://blog.csdn.net/m0_71149992/article/details/125327370
https://www.php.cn/faq/530720.html
https://zhuanlan.zhihu.com/p/424501870
https://zhuanlan.zhihu.com/p/339472446
https://zhuanlan.zhihu.com/p/622218563