0%

[TOC]

cgroups学习笔记

一、名词解释

  • hierarchy 类似文件书树的树形结构,树上的节点表示cgroup,hierarchy与一个或多个subsystem,即资源控制器关联,子节点的资源不能超过父节点的资源限制
  • cgroup 树形结构上的节点,表示共享资源限制的一组task
  • task cgroup中的进程
  • subsystem 资源控制器,控制不同系统硬件资源
    • blkio 限制对磁盘,固态硬盘,usb等存储设备的访问速率
    • cpu 限制cpu时间片占比
    • cpuacct 生成cgroup中进程(task)的cpu资源使用情况
    • cpuset 将独立的cpu和内存结点分配给cgroup
    • devices 控制cgroup对设备的访问权限
    • freezer 挂起或恢复cgroup中进程
    • memory 限制cgroup的内存使用限制并生成使用情况报告
    • net_cls 给网络包打赏标签classid,用于区分不同cgroup的网络包
    • net_prio 动态设置网络流量的优先级
    • ns namespace subsystem,后续单独讲到
    • perf_event 识别task所属的cgroup,并用于性能分析

二、hierarchy、cgroup、subsystem、task的规则

规则 示意图
同一个hierarchy可以关联多个subsystem Rule 1
一个subsystem不能同时关联到两个hierarchy上,前提是其中任意一个已经关联了一个subsystem Rule 2—The numbered bullets represent a time sequence in which the subsystems are attached.
一个task可以同时是两个hierarchy中的cgroup的成员,但不能同时是同一个hierarchy中两个cgroup的成员 Rule 3
进程fork出的子进程继承原进程的cgroup设置,fork完成后可独立修改 Rule 4—The numbered bullets represent a time sequence in which the task forks.

上述规则带来的结果:

  • 由于一个task在一个hierarchy内只能属于一个cgroup,也就意味着这个hierarchy关联的资源控制器对于这个task只有一种限制方式
  • 可以将多个subsystem组合在一起对一个hierarchy起作用
  • 可以将一个hierarchy关联的多个subsystem拆分出去并关联其他的hierarchy,类似的也可以进行合并
  • 可以对一个task在所有subsystem管理的资源上进行指定的配置

三、cgroups的使用

通过安装libcgroup可以方便的配置cgroups。cgconfig服务利用/etc/cgconfig.conf配置文件和/etc/cgconfig.d/文件夹中的配置文件控制hierarchy的创建,subsystem的关联,cgroup的创建。
hierarchy的挂载示例如下:

1
mount {
2
	cpuset	= /cgroup/cpuset;
3
	cpu	= /cgroup/cpu;
4
	cpuacct	= /cgroup/cpuacct;
5
	memory	= /cgroup/memory;
6
	devices	= /cgroup/devices;
7
	freezer	= /cgroup/freezer;
8
	net_cls	= /cgroup/net_cls;
9
	blkio	= /cgroup/blkio;
10
}

cgroup创建的示例如下:

1
group daemons {
2
     cpuset {
3
         cpuset.mems = 0;
4
         cpuset.cpus = 0;
5
     }
6
}
7
group daemons/sql {
8
     perm {
9
         task {
10
             uid = root;
11
             gid = sqladmin;
12
         } admin {
13
             uid = root;
14
             gid = root;
15
         }
16
     }
17
     cpuset {
18
         cpuset.mems = 0;
19
         cpuset.cpus = 0;
20
     }
21
}

lssubsys命令可以列出所有subsystem以及其目前关联的hierarchy。

使用mount命令控制subsystem与hierarchy的关联,包括新增,删除等。

使用unmount可以删掉hierarchy,使用cgclear可以在hierarchy仍然有cgroup的情况下将其删除。

其他命令不再赘述,可参考参考资料。

四、subsystem调参

由于subsystem数量较多,下面主要写一下cpu和memory这两个。

4.1 cpu

假设现有一个叫/cgroup/cpu的hierarchy,关联了cpu subsyatem。

限制一个cgroup使用25%cpu,另一个cgroup使用75%cpu,示例如下:

1
~]# echo 250 > /cgroup/cpu/blue/cpu.shares
2
~]# echo 750 > /cgroup/cpu/red/cpu.shares

限制一个cgroup使用整个cpu示例如下:

1
~]# echo 10000 > /cgroup/cpu/red/cpu.cfs_quota_us
2
~]# echo 10000 > /cgroup/cpu/red/cpu.cfs_period_us

限制一个cgroup使用单个cpu的10%示例如下

1
~]# echo 10000 > /cgroup/cpu/red/cpu.cfs_quota_us
2
~]# echo 100000 > /cgroup/cpu/red/cpu.cfs_period_us

在多核机器上允许一个cgroup使用两个cpu示例如下:

1
~]# echo 200000 > /cgroup/cpu/red/cpu.cfs_quota_us
2
~]# echo 100000 > /cgroup/cpu/red/cpu.cfs_period_us

4.2 memory

下面的示例展示在一个限制了memory使用上限的cgroup中的进程如何被oom killer杀死并发送通知的。

  1. 创建一个cgroup并关联memory subsystem

    1
    ~]# mount -t cgroup -o memory memory /cgroup/memory
    2
    ~]# mkdir /cgroup/memory/blue
  2. 设置cgroup中的进程使用内存的上线为100m

    1
    ~]# echo 104857600 > memory.limit_in_bytes
  3. 进入cgroup目录确认oom killer已开启

    1
    ~]# cd /cgroup/memory/blue
    2
    blue]# cat memory.oom_control
    3
    oom_kill_disable 0
    4
    under_oom 0
  4. 将当前shell进程移动至cgroup中的tasks文件中,这样在这个shell中启动的程序就自动加入了这个cgroup

    1
    blue]# echo $$ > tasks
  5. 启动一个测试程序,测试程序尝试分配大量内存超出限制。一旦达到cgroup的内存上线就会被oom killer杀死。

    1
    blue]# ~/mem-hog
    2
    Killed

    测试程序如下:

    1
    #include <stdio.h>
    2
    #include <stdlib.h>
    3
    #include <string.h>
    4
    #include <unistd.h>
    5
    6
    #define KB (1024)
    7
    #define MB (1024 * KB)
    8
    #define GB (1024 * MB)
    9
    10
    int main(int argc, char *argv[])
    11
    {
    12
    	char *p;
    13
    14
    again:
    15
    	while ((p = (char *)malloc(GB)))
    16
    		memset(p, 0, GB);
    17
    18
    	while ((p = (char *)malloc(MB)))
    19
    		memset(p, 0, MB);
    20
    21
    	while ((p = (char *)malloc(KB)))
    22
    		memset(p, 0,
    23
    				KB);
    24
    25
    	sleep(1);
    26
    27
    	goto again;
    28
    29
    	return 0;
    30
    }
  6. 关闭oom killer,然后再启动测试程序,测试程序用完内存后会一直暂停,直到有新的可用内存出现

    1
    blue]# echo 1 > memory.oom_control
    2
    blue]# ~/mem-hog
  7. 尽管测试程序没有被杀死,但cgroup的under_oom状态已经表明达到了内存上限。重新启动oom killer会将测试进程立刻杀死

    1
    ~]# cat /cgroup/memory/blue/memory.oom_control
    2
    oom_kill_disable 1
    3
    under_oom 1
  8. 为了获得oom的通知,创建一个接收通知的程序oom_notification

    1
    #include <sys/types.h>
    2
    #include <sys/stat.h>
    3
    #include <fcntl.h>
    4
    #include <sys/eventfd.h>
    5
    #include <errno.h>
    6
    #include <string.h>
    7
    #include <stdio.h>
    8
    #include <stdlib.h>
    9
    10
    static inline void die(const char *msg)
    11
    {
    12
    	fprintf(stderr, "error: %s: %s(%d)\n", msg, strerror(errno), errno);
    13
    	exit(EXIT_FAILURE);
    14
    }
    15
    16
    static inline void usage(void)
    17
    {
    18
    	fprintf(stderr, "usage: oom_eventfd_test <cgroup.event_control> <memory.oom_control>\n");
    19
    	exit(EXIT_FAILURE);
    20
    }
    21
    22
    #define BUFSIZE 256
    23
    24
    int main(int argc, char *argv[])
    25
    {
    26
    	char buf[BUFSIZE];
    27
    	int efd, cfd, ofd, rb, wb;
    28
    	uint64_t u;
    29
    30
    	if (argc != 3)
    31
    		usage();
    32
    33
    	if ((efd = eventfd(0, 0)) == -1)
    34
    		die("eventfd");
    35
    36
    	if ((cfd = open(argv[1], O_WRONLY)) == -1)
    37
    		die("cgroup.event_control");
    38
    39
    	if ((ofd = open(argv[2], O_RDONLY)) == -1)
    40
    		die("memory.oom_control");
    41
    42
    	if ((wb = snprintf(buf, BUFSIZE, "%d %d", efd, ofd)) >= BUFSIZE)
    43
    		die("buffer too small");
    44
    45
    	if (write(cfd, buf, wb) == -1)
    46
    		die("write cgroup.event_control");
    47
    48
    	if (close(cfd) == -1)
    49
    		die("close cgroup.event_control");
    50
    51
    	for (;;) {
    52
    		if (read(efd, &u, sizeof(uint64_t)) != sizeof(uint64_t))
    53
    			die("read eventfd");
    54
    55
    		printf("mem_cgroup oom event received\n");
    56
    	}
    57
    58
    	return 0;
    59
    }
  9. 再另一个控制台运行接收通知的程序oom_notification ,再在另一个控制台运行测试程序,可以看到在oom_notification 程序的std output中展示字符串”mem_cgroup oom event received“

    1
    ~]$ ./oom_notification /cgroup/memory/blue/cgroup.event_control /cgroup/memory/blue/memory.oom_control
    1
    blue]# ~/mem-hog

五、使用场景

  • 使用blkio subsyatem提升数据量io的优先级
  • 使用net_prio subsystem修改网络流量的优先级
  • 给不同的用户组按比例分配cpu和内存

参考资料

https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/index

JAVA-从异步编程到响应式编程

Q:响应式编程带来了什么好处?
A:当并发连接增加,很多时候我们是通过多线程、线程池的方式来提升性能,然而线程数增加后系统开销也增加了,而且很多场景下线程大部分是执行IO操作,其大部分时间是处于idle状态的,资源利用率不高。响应式编程通过引入非阻塞non-blocking的特性,实现了线程资源的充分利用。

这是一幅盗来的图^1,展示了响应式编程的发展历程,尤其是在JVM或者说JAVA领域。这里面比较核心的就是Reactive Streams规范、ReacticeX项目及其包含的RxJava、Project Reactor项目、JAVA方面的CompletableFuture API和Flow API。

CompletableFuture API and Reactive Programming Timeline

一、CompletableFuture

首先,CompletableFuture是Future,意味着它支持任务异步执行。在此基础上,它还支持异步任务之间的依赖关系、异步任务的回调等。对于日常业务逻辑,使用CompletableFuture能够通过将业务逻辑异步化,来提升cpu利用率、缩短响应时间,达到提升性能的目的。

那么CompletableFuture和响应式编程的区别是什么呢?

关键区别在于Stream这个概念上,CompletableFuture描述的是单次执行的结果,尽管可以通过异步任务之间的依赖关系构建一串异步任务组成的执行图,本质上面向的是单次操作,例如一次rpc请求的处理^1

Java CompletableFuture API Example Invoice Path

响应式编程则面向的是Stream,或者说叫“流”,也即是数据或事件的序列,有点类似JAVA的Stream API。

当然用CompletableFuture也能够实现对数据/事件流的处理,但是有一个关键的特性是响应式编程具备而CompletableFuture则需要重新实现的,并且实现起来很复杂,那就是back pressure。back pressure就是在下游无法承载上游的压力时,采取一些措施。

至于Java中提供的CallBack、Future机制在实现响应式编程中的问题和缺点,例如“callback hell”,不支持lazy computation等,可以从https://projectreactor.io/docs/core/release/reference/#getting-started中找到答案。

所以,我们需要一套框架或者说类库来更好的实现响应式编程,需要具备以下特性:

  • 支持异步任务的封装及组装,需要API来对异步任务进行包装,并且需要很多种算子来对异步任务进行组装,包括过滤、组装、异常处理等算子
  • 减少异步任务的嵌套,减少代码的复杂性,增加可读性,避免“callback hell”等复杂度很高的代码
  • 支持背压back pressure,也就是下游和上游之间能协商数据流的速度

二、响应式编程-从ReactiveX到RxJava

项目主页:http://reactivex.io/

github:https://github.com/ReactiveX?page=2

官网定义如下:

ReactiveX is a collection of open source projects. The content of this page is licensed under Creative Commons Attribution 3.0 License, and code samples are licensed under the BSD License.

ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

ReactiveX基于观察者模式、迭代者模式和函数式编程的组合,为构建异步的基于事件的应用提供了一套类库,并且它是一个多语言的项目,例如,JAVA中对应的就是RxJava类库。

2.1 核心概念

2.1.1 Observable^2

img

如上图所示,一个Observable其实就是一个产生数据流的对象,它能够转换为另一个Observable。

Observable可以被Observer订阅(subscribe),一个完整的Observer订阅Observable的例子的伪代码如下:

1
def myOnNext     = { item -> /* do something useful with item */ };
2
def myError      = { throwable -> /* react sensibly to a failed call */ };
3
def myComplete   = { /* clean up after the final response */ };
4
def myObservable = someMethod(itsParameters);
5
myObservable.subscribe(myOnNext, myError, myComplete);
6
// go on about my business

其中onNext, onError, onComplete三个方法是Observer需要实现的。分别用于对数据流的正常处理,异常处理,完成操作。

2.1.2 Observable的各种算子(Operator)

算子可用于连接Observable,或者改变Observable的行为。Operator能够将Observable串联起来,形成一条chain。

ReactiveX支持的算子数量较多,这里就不展开了,可以到 http://reactivex.io/documentation/operators.html 这里查看。

2.1.3 Single

Single是RxJava中引进的类似Observable,但只产生一个数据的对象,因此订阅它的Observer只需要实现onSuccess,onError方法即可。

Single也有很多对应的算子,可用于创建Single、对Single进行转换、组装、连接、延时等,具体可查看:http://reactivex.io/documentation/single.html

2.1.4 Subject

http://reactivex.io/documentation/subject.html

一个Subject既是Observable,又是Observer,表明它既可以订阅一个或多个Observable,又可以作为Observable将其订阅到的数据流重新发出,因此它类似一种桥梁或者说代理。

2.1.5 Scheduler

http://reactivex.io/documentation/scheduler.html

Rx默认是单线程的,整条链路上的调用都在Subscribe方法被调用的线程中执行。不过一些算子接收Scheduler作为参数,控制算子在哪个Scheduler上执行,这里可以认为Scheduler是某种线程池,提供了多线程资源。

SubscribeOn算子指定Observable开始执行的线程,而SubscribeOn算子在链路的什么地方被调用并不重要。

ObserveOn算子指定了从该调用开始往后的算子所在的执行线程,如果要改变线程,则需要重新调用ObserveOn,指定一个新的Scheduler。

2.2 RxJava

github:https://github.com/ReactiveX/RxJava

github wiki:https://github.com/ReactiveX/RxJava/wiki

RxJava是ReactiveX项目的一部分,它也遵守Reactive Streams规范。不要以为RxJava是只针对Java语言的,RxJava本身也是支持多语言的,支持一堆基于JVM的语言。下面是RxJava的最最最入门级demo,包括Observable的创建,组装,订阅:

1
public class RxJavaDemo {
2
    
3
    public static void main(String[] args) {
4
        RxJavaDemo demo = new RxJavaDemo();
5
        demo.hello("pcl", "j");
6
        demo.customObservableBlocking().subscribe(s -> System.out.println(s));
7
        demo.customObservableNonBlocking().subscribe(s -> System.out.println(s));
8
        demo.simpleComposition();
9
    }
10
    /**
11
     * hello world
12
     *
13
     * @param args
14
     */
15
    private void hello(String... args) {
16
        Flowable.fromArray(args).subscribe(s -> System.out.println("Hello " + s + "!"));
17
    }
18
    /**
19
     * Creating an Observable from an Existing Data Structure
20
     */
21
    private void createObservables() {
22
        Observable<String> o1 = Observable.fromArray(new String[]{"a", "b", "c"});
23
24
        Integer[] list = {5, 6, 7, 8};
25
        Observable<Integer> o2 = Observable.fromArray(list);
26
27
        Observable<String> o3 = Observable.just("one object");
28
    }
29
    /**
30
     * This example shows a custom Observable that blocks
31
     * when subscribed to (does not spawn an extra thread).
32
     *
33
     * @return
34
     */
35
    private Observable<String> customObservableBlocking() {
36
        return Observable.<String>create(emitter -> {
37
            for (int i = 0; i < 50; i++) {
38
                emitter.onNext("count_" + i);
39
            }
40
            emitter.onComplete();
41
        });
42
    }
43
    /**
44
     * 创建一个非阻塞的Observable
45
     *
46
     * @return
47
     */
48
    private Observable<String> customObservableNonBlocking() {
49
        return Observable.<String>create(emitter -> {
50
            new Thread() {
51
                @Override
52
                public void run() {
53
                    for (int i = 0; i < 50; i++) {
54
                        emitter.onNext("count_" + i);
55
                    }
56
                    emitter.onComplete();
57
                }
58
            }.start();
59
        });
60
    }
61
		/**
62
     * 用算子对Observable进行组装,或者转变
63
     */
64
    private void simpleComposition() {
65
        customObservableNonBlocking().skip(10).take(5)
66
                .map(stringValue -> {return stringValue + "_xform";})
67
                .subscribe(s -> System.out.println("onNext => " + s));
68
    }
69
}

其中simpleComposition()方法的marble diagram如下图所示^3,在reactive编程模型中,常常用marble diagram表示数据流的流转。

img

2.2.1 异常处理

Observable一般不抛出异常,而是发出一个onError通知,不过有些情况下,例如调用onError()方法本身失败的话,会抛出RuntimeException、OnErrorFailedException、OnErrorNotImplementedException异常。

可以使用异常处理算子来处理onError()方法通知的异常。subscribe的时候可以传入第二个方法处理异常。

2.2.1 RxJava的算子

只能说RxJava支持的算子很多,就不多说了,当然你也可以自己写新的算子,这就属于比较难的操作了。

2.2.2 back pressure

其实从上面一路写下来,有没有发现RxJava和Stream API非常相似,我是这么觉得的。但是这里就要说一说RxJava所具备的背压这一特性,是响应式编程的非常重要的一个特性。

back pressure就是在下游无法承载上游的压力时,采取一些措施,通知上游放慢速度。

1
PublishProcessor<Integer> source = PublishProcessor.create();
2
3
    source
4
    .observeOn(Schedulers.computation())
5
    .subscribe(v -> compute(v), Throwable::printStackTrace);
6
7
    for (int i = 0; i < 1_000_000; i++) {
8
        source.onNext(i);
9
    }
10
11
    Thread.sleep(10_000);

这段代码会抛出MissingBackpressureException异常,因为PublishProcessor不支持背压,而observeOn算子内部的buffer是有边界的,当PublishProcessor产生数据的速度超过计算的速度时,数据会存在observeOn的内部buffer中,当溢出时就抛出MissingBackpressureException异常。

如果改成下面的代码则可以正常运行,因为Flowable.range支持背压,range可以和observeOn之间通过类似协程的机制,协商应该以什么样的速度产生数据。具体的机制是,range通过调用observeOn的onSubscribe方法发送一个callback方法(org.reactivestreams.Subscription接口的实现)给订阅者,observeOn回调Subscription.request(n)方法告诉range产生n个数据。

1
Flowable.range(1, 1_000_000)
2
.observeOn(Schedulers.computation())
3
.subscribe(v -> compute(v), Throwable::printStackTrace);
4
5
Thread.sleep(10_000);

下面通过一个更显式的例子来说明,先调用onStart方法要求range发送一个数据,然后异步调用onNext方法进行数据计算,并再发一个request要求一个数据,注意onStart中request执行完后,range就会发送数据,会异步触发onNext,若此时onStart中初始化操作尚未完成,则可能产生异常

1
Flowable.range(1, 1_000_000)
2
                .subscribe(new DisposableSubscriber<Integer>() {
3
                    @Override
4
                    public void onStart() {
5
                        request(1);
6
                    }
7
                    public void onNext(Integer v) {
8
                        v = v * 2;
9
                        System.out.println(v);
10
                        request(1);
11
                    }
12
                    @Override
13
                    public void onError(Throwable ex) {
14
                        ex.printStackTrace();
15
                    }
16
                    @Override
17
                    public void onComplete() {
18
                        System.out.println("Done!");
19
                    }
20
                });
2.2.2.1 解决背压问题-增加buffer size
1
PublishProcessor<Integer> source = PublishProcessor.create();
2
3
    source.observeOn(Schedulers.computation(), 1024 * 1024)
4
          .subscribe(e -> { }, Throwable::printStackTrace);
5
6
    for (int i = 0; i < 1_000_000; i++) {
7
        source.onNext(i);
8
    }

仍然有机会产生MissingBackpressureException

2.2.2.2 解决背压问题-批量算子/采样算子
1
PublishProcessor<Integer> source = PublishProcessor.create();
2
3
    source
4
          .buffer(1024)
5
          .observeOn(Schedulers.computation(), 1024)
6
          .subscribe(list -> { 
7
              list.parallelStream().map(e -> e * e).first();
8
          }, Throwable::printStackTrace);
9
10
    for (int i = 0; i < 1_000_000; i++) {
11
        source.onNext(i);
12
    }
13
    
14
PublishProcessor<Integer> source = PublishProcessor.create();
15
16
    source
17
          .sample(1, TimeUnit.MILLISECONDS)
18
          .observeOn(Schedulers.computation(), 1024)
19
          .subscribe(v -> compute(v), Throwable::printStackTrace);
20
21
    for (int i = 0; i < 1_000_000; i++) {
22
        source.onNext(i);
23
    }

仍然有机会产生MissingBackpressureException

2.2.2.3 解决背压问题-onBackpressureXXX 算子

onBackpressureBuffer()
onBackpressureBuffer(int capacity)
onBackpressureBuffer(int capacity, Action onOverflow)
onBackpressureBuffer(int capacity, Action onOverflow, BackpressureOverflowStrategy strategy)
onBackpressureDrop()
onBackpressureLatest()

还有一个办法是创建支持背压的数据源。

三、响应式编程-从Reactive Streams到Project Reactor

上文提到RxJava属于ReactiveX项目,也遵守Reactive Streams规范,这里就介绍一下Reactive Streams规范,以及在其基础上的Project Reactor。

3.1 Reactive Streams

主页:https://www.reactive-streams.org/

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

Reactive Streams是一套规范,为异步流处理及非阻塞的背压提供了标准。

对于Java程序员,Reactive Streams是一个API。Reactive Streams为我们提供了Java中的Reactive Programming的通用API。Reactive Streams API中仅仅包含了如下四个接口^4

1
//发布者
2
public  interface  Publisher < T > {
3
	public  void  subscribe(Subscriber <?super  T >  s);
4
}
5
6
//订阅者
7
public  interface  Subscriber < T > {
8
	public  void  onSubscribe(Subscription  s);
9
	public  void  onNext(T  t);
10
	public  void  onError(Throwable  t);
11
	public  void  onComplete();
12
}
13
14
//表示Subscriber消费Publisher发布的一个消息的生命周期
15
public interface Subscription {
16
	public void request(long n);
17
	public void cancel();
18
}
19
20
//处理器,表示一个处理阶段,它既是订阅者也是发布者,并且遵守两者的契约
21
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
22
	
23
}

https://github.com/reactive-streams/reactive-streams-jvm 定义了Reactive Streams的JVM规范。

Java9中Flow类下的内容与Reactive Streams完全一致,这部分留到后面研究。

其他还有AKKA Streams支持Reactive Streams。

Publisher-Subscriber接口既有观察者模式,又包含了迭代器模式。Subscriber通过订阅的方式实现了观察者模式。由Publisher通过push元素实现对元素的迭代,而相应的Java中的Iterator是通过pull的方式实现迭代。push的方式是响应式编程的关键,Publisher通过调用Subscriber的onNext方法通知Subscriber下一个元素,通过onError通知异常,通过onComplete通知完成。

3.2 Project Reactor

主页:https://projectreactor.io/

Reactor is a fourth-generation reactive library, based on the Reactive Streams
specification, for building non-blocking applications on the JVM

Reactor文档:

https://projectreactor.io/docs/core/release/reference/

Reactor Core运行需要Java8+环境,可以直接和Java8的函数式API交互,包括CompletableFuture,Stream和Duration。它实现了Reactive Streams规范,并提供Flux和Mono两个异步序列API,Flux是支持N个元素的序列,Mono是支持0个或1个的序列(类似RxJava中的Single)。reactor-netty项目支持非阻塞的IPC(inter-process communication),它还提供了支持HTTP(包括WebSockets)、TCP、UDP协议的带有背压特性的网络引擎,支持响应式的边界码,适合微服务框架。

Reactor实现了Reactive Streams规范,因此实现了Publisher, Subscriber接口。需要注意的一点是Publisher创建后,并不开始产生数据,各种算子对Publisher的包装也不会导致数据开始产生,只有当调用链由Subscriber订阅后,内部调用request方法才会触发数据开始流动。

Reactor背压的实现机制和RxJava类似。也是通过上面提到的request机制来进行协商。

3.2.1 Flux与Mono

Flux与Mono是Publisher的实现,其创建代码示例如下:

1
private void createFluxMono() {
2
        Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
3
4
        List<String> iterable = Arrays.asList("foo", "bar", "foobar");
5
        Flux<String> seq2 = Flux.fromIterable(iterable);
6
7
        Mono<String> noData = Mono.empty();
8
9
        Mono<String> data = Mono.just("foo");
10
11
        Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
12
    }

其subscribe代码示例如下:

1
private void subscribe() {
2
  			//最后一个函数表示对每一个元素执行的操作
3
        Flux<Integer> ints1 = Flux.range(1, 3);
4
        ints1.subscribe(i -> System.out.println(i));
5
				//最后一个函数表示异常时执行的操作
6
        Flux<Integer> ints2 = Flux.range(1, 4)
7
                .map(i -> {
8
                    if (i <= 3) return i;
9
                    throw new RuntimeException("Got to 4");
10
                });
11
        ints2.subscribe(i -> System.out.println(i),
12
                error -> System.err.println("Error: " + error));
13
				//最后一个函数表示完成时执行的操作
14
        Flux<Integer> ints3 = Flux.range(1, 4);
15
        ints3.subscribe(i -> System.out.println(i),
16
                error -> System.err.println("Error " + error),
17
                () -> System.out.println("Done"));
18
  			//最后一个函数表示订阅时执行的操作
19
  			Flux<Integer> ints4 = Flux.range(1, 4);
20
        ints4.subscribe(i -> System.out.println(i),
21
                error -> System.err.println("Error " + error),
22
                () -> System.out.println("Done"),
23
                sub -> sub.request(2));
24
    }

以上是Reactor的一些基操,其他部分的内容包括算子的使用、背压的实现、异步的实现、异常处理、测试、Debug以及一些高级特性等,建议还是看官方文档,不是一篇文章能讲完的。

总的来说,Reactor的实现和RxJava在很多方面是相似的。

3.3 Spring WebFlux

Spring Reactive diagram

在Project Reactor提供的类库的基础上,Spring构建了自己的reactive技术栈。Spring WebFlux可以运行在Netty, Undertow服务器上或者Servlet 3.1+ 的web容器中。

再一次推荐官方文档:

https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux

参考文献

[TOC]

JAVA并发基础

一、多线程基础

1. 多线程的风险

  • 安全性问题:竞态条件race condition、指令重排序等
  • 活跃性问题:死锁、饥饿、活锁
  • 性能问题:上下文切换、同步机制抑制编译器优化,使内存缓存数据无效,增加共享内存总线的同步流量

2. 线程安全性

线程安全本质上就是对共享且可变的状态进行安全的访问,共享指可以被多个线程访问,可变指状态可以被修改。线程安全有以下特性:

  • 无状态的对象一定是线程安全的
  • 不可变对象一定是线程安全的
  • 有状态的可变对象在并发情况下存在竞态条件,即多个线程可能同时修改同一个变量,可通过原子操作或锁实现线程安全
  • 原子性:对状态的原子性操作可以保证线程安全,例如jdk自带的原子类,或者加锁
  • 锁:通过加锁/解锁操作避免竞态条件,保证原子性,java中的锁有:内部锁/监视器锁(synchronized关键字),监视器锁是可重入的,通过在锁对象上的一个计数器实现;显式锁

2.1 原子性、可见性、顺序性

  • 原子性上面已经提到过,主要是解决了并发情况下的竞态条件
  • 可见性:同步带来的另一个好处是内存可见性,可以避免并发情况下的过期数据、非原子的64位操作等问题
  • 过期数据问题:例如,指令指令重排序会造成一个线程读取到的共享变量是过期的,指令重排序本质上是一个顺序性问题;还有一种可能是CPU多级缓存和内存间的不一致造成读到过期数据
  • 非原子的64位操作问题:当读写非volatile的double或long时,若处理器不支持64位算数原子操作,JVM允许将其分为两个32位的操作,如果没有通过加volatile关键字或没有加同步锁保护,那么可能得到一个值的高32位和另一个值的低32位
  • volatile只能保证可见性,加锁可以保证可见性与原子性
  • 以上提到的原子性、可见性、顺序性问题,会在下一节对JMM的描述中展开来说

2.2 对象的发布publish与逸出escape

一个对象能被外部代码使用称为发布,未准备好的对象被发布称为逸出,例如,未构造完毕的对象被传递给外部引用,对象的发布需要注意以下问题:

  • 不要让this引用在构造期间逸出,this引用在构造期间逸出可能导致对象未构造完毕即能被使用

  • 局部创建对象:例如,多线程共享的对象,构造时由于可见性的问题,可能只完成了部分创建,即其他线程看见的是未完成构造的对象,例如:在double-check-locking(DCL)机制实现的懒汉单例模式中,如果instance变量不加volatile关键字,那么由于步骤(5)对象初始化并赋值给引用过程的指令重排序,可能造成instance指向的是未创建完成的对象时被另一个线程使用,代码如下^5

1
public class LazySingleton {
2
    private int id;
3
    private static LazySingleton instance;
4
    private LazySingleton() {
5
        this.id= new Random().nextInt(200)+1;                 // (1)
6
    }
7
    public static LazySingleton getInstance() {
8
        if (instance == null) {                               // (2)
9
            synchronized(LazySingleton.class) {               // (3)
10
                if (instance == null) {                       // (4)
11
                    instance = new LazySingleton();           // (5)
12
                }
13
            }
14
        }
15
        return instance;                                      // (6)
16
    }
17
    public int getId() {
18
        return id;                                            // (7)
19
    }
20
}

2.3 变量的线程封闭

不需要共享的变量可通过线程封闭来实现线程安全,这样对象不会被其他线程访问到,即不发布对象。

  • 规范的方式是使用ThreadLocal:线程首次调用ThreadLocal.get方法时,会请求initialValue提供一个初始值

2.4 不可变性

  • 不可变对象的条件:不可修改的状态,所有field都是final类型的,正确的构造(没有在构造时发生this引用的逸出)。不可变对象天生是线程安全的。
  • 不可变对象保证了初始化安全性,因为final关键字确保了初始化安全性,即对象发布时不会发生局部创建对象的问题,即保证了可见性
  • 使用volatile发布不可变对象,可实现线程安全,即使用一个不可变对象保存所有变量,而用一个volatile引用指向不可变对象,当需要更新时,直接替换不可变对象,保证了可见性与原子性

2.5 对象的安全发布

非不可变对象需要被安全的发布,对象的安全发布意味着对象的状态必须在被其他线程(除发布线程)引用的同时可见,一个正确创建的对象可以通过以下方式安全发布:

  • 通过静态初始化器初始化对该对象的引用,利用JVM在类加载时的同步机制保证安全发布
  • 将该对象的引用存储到volatile域或AutomicReference:volatile保证了对象的可见性,AutomicReference保证对该对象的操作满足原子性,即存在同步机制,这里说下个人理解:原子性保证了可见性,如果可见性无法保证,那么原子性也无法实现
  • 将该对象的引用存储到正确创建的对象的final域中:final保证了不可变性
  • 将该对象的引用存储到由锁正确保护的域中,例如线程安全的容器类,其他线程在获取放入容器的对象时可以通过同步机制保证对象的安全发布

3. 基本组件

3.1 同步容器

  • Vector、HashTable、Collections.synchronizedXxx
  • 对同步容器的复合操作,例如,put-if-abscent语义的手动实现是不保证线程安全的
  • 迭代器使用过程中如果容器被修改会抛出ConcurrentModificationException,它是fail-fast的,通过维护一个计数器,当计数器被修改则抛出异常
  • 隐藏迭代器:容器的toString、hashCode、equals的方法,容器本身作为元素或作为另一个容器的key时,containsAll、removeAll、retainAll方法以及把容器做为构造函数参数,都会隐式迭代,可能抛出异常

3.2 并发容器

  • ConcurrentHashMap:提供了不会抛出ConcurrentModificationException的迭代器;附加了put-if-abscent、remove-if-equal、replace-if-equal的原子操作
  • ConcurrentSkipListMap, ConcurrentSkipListSet
  • CopyOnWriteArrayList, CopyOnWriteArraySet:copy-on-write容器在每次需要修改时创建并重新发布一个新的容器拷贝
  • ConcurrentLinkedQueue

3.3 阻塞队列与生产者-消费者模式

  • LinkedBlockingQueue, ArrayBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue:不存储队列元素,适合消费者充足的场景

3.4 双端队列与工作窃取

这个在Fork/Join框架中展开。

3.5 阻塞和可中断的方法

线程阻塞挂起时,被设置成某个状态(BLOCKED, WAITING, TIMED_WAITING)。

BlockingQueue的put和take方法会抛出一个InterruptedException,Thread.sleep也会抛出这个异常,当一个方法能够抛出这个异常,说明这是一个可阻塞的方法,并且如果被中断,可以提前结束阻塞状态。Thread提供interrupt方法,用来中断一个线程或者查询某线程是否已被中断,具体实现是:线程维护一个bool类型属性,代表中断状态,中断时设置这个值。

3.6 Synchronizer

  • latch闭锁:用于保证特定活动直到其他活动结束后才开始。CountDownLatch是一个实现。
  • FutureTask:描述一个可携带结果的计算,通过Callable执行计算,通过Future.get获取结果或者异常,异常统一封装为ExecutionException,如果计算被取消,返回CancellationException。
  • 信号量Semaphore:用来控制能够同时访问某资源的并发数量,可用于实现资源池或者给容器限定边界
  • 关卡barrier:用于限制所有线程同时到达关卡点。CyclicBarrier,Exchanger是具体实现。

CountDownLatch基于AQS实现,CyclicBarrier基于ReentrantLock,而ReentrantLock基于AQS实现。AQS提供了一个基于队列的同步器框架,许多同步器可以基于AQS实现,AQS的原理在第五节详细展开描述。

二、Java内存模型 (Java Memory Model)

在可共享内存的多处理器架构中,存在CPU多级缓存与内存的缓存一致性问题,不同的架构由不同的缓存一致性协议,本质上是通过内存屏障来实现。另外还有指令重排序问题,指令重排序提升了性能,然而在多线程环境中如果无法确认代码的执行顺序,就无法确认代码的正确性。

JMM(Java Memory Model)通过提供自己的存储模型,屏蔽了java虚拟机与底层硬件存储模型的差异化,在语言层面定义了内存屏障,用来屏蔽不同硬件存储模型的内存屏障的不同实现。

1. 从硬件平台的存储模型到Java存储模型

1.1 缓存一致性问题 cache coherence

处理器Cache模型

CPU一般有多级缓存,与主内存之间通过同步协议保证一致性,比较经典的是MESI协议,参考https://blog.csdn.net/muxiqingyang/article/details/6615199、https://www.cnblogs.com/yanlong300/p/8986041.html。

1.2 指令重排序

指令重排序可能是编译器指令重排序(编译器级别)或CPU指令重排序(处理器级别,out-of-order execution),指令重排序可以使计算性能得到提升。
即使指令没有重排序,由于CPU缓存的存在,缓存刷新至内存的时许不同也会导致重排序问题。

1.3 memory barrier 内存屏障

内存屏障(Memory Barrier,或有时叫做内存栅栏,Memory Fence)是一种CPU指令,用于控制特定条件下的重排序和内存可见性问题。Java编译器也会根据内存屏障的规则禁止重排序。

内存屏障可以被分为以下几种类型:

  • LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
  • StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
  • LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
  • StoreLoad屏障:对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。它的开销是四种屏障中最大的。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能。

内存屏障示意表

以上关于指令重排序、内存屏障的描述参考了https://tech.meituan.com/2014/09/23/java-memory-reordering.html,希望深入的了解的可直接阅读原文。

1.4 Java存储模型的happens-before法则^4

JMM为程序中的所有动作定义了一种happens-before关系,两个操作如果满足happens-before关系,则前者的结果一定对后者可见,保证了顺序性及可见性,而不满足happens-before关系的动作之间可以任意重排序。

  • 程序次序法则:同一线程中,代码中先出现的动作happens-before代码中后出现的动作,这只保证最终执行结果与顺序执行一致,并不能保证指令不重排序,只是结果上表现为happens-before。这个可以解释为,同一线程中前面的写操作对后面操作可见。
  • 监视器锁法则:对同一个监视器锁的解锁happens-before后续对该锁的加锁,显式锁同样适用。
  • volatile法则:对volatile修饰的域的写happens-before后续对其的读操作,原子变量同样适用。
  • 线程启动法则:一个线程内,对Thread.start的调用happens-before每一个被启动线程中的动作。
  • 线程终结法则:线程中的任何动作happens-before其他线程监测到这个线程已经终结,或者从Thread.join调用中成功返回,或者Thread.isAlive返回false
  • 中断法则:一个线程调用另一个线程的interrupt happens-before被中断的线程发现中断
  • 终结法则:一个对象的构造函数的结束happens-before这个对象的finalizer的开始
  • 传递性:如果A happens-before B,且B happens-before C,则A happens-before C

基于happens-before可以推断出多线程情况下代码的执行顺序,当然如果正确没有使用相应的同步机制,大部分操作是无法推断的😣。

1.5 volatile、synchorized、final^3

  • volatile关键字:volatile关键字可以保证直接从主存中读取一个变量,如果这个变量被修改后,总是会被写回到主存中去。普通变量与volatile变量的区别是:volatile的特殊规则保证了新值能立即同步到主内存,以及每个线程在每次使用volatile变量前都立即从主内存刷新。因此我们可以说volatile保证了多线程操作时变量的可见性,而普通变量则不能保证这一点。
  • synchorized关键字:同步块的可见性是由以下机制保证的:
    “如果对一个变量执行lock操作,将会清空工作内存中变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值”
    “对一个变量执行unlock操作之前,必须先把此变量的值同步到主内存中(执行store和write操作)
  • final关键字:final关键字的可见性是指,被final修饰的字段在构造器中一旦被初始化完成,并且构造器没有把“this”的引用传递出去(this引用逃逸是一件很危险的事情,其他线程有可能通过这个引用访问到“初始化了一半”的对象),那么在其他线程就能看见final字段的值(无须同步)

2. CPU缓存的伪共享问题

CPU缓存的最小单位是缓存行 (Cache Line) ,一个缓存行的大小通常是 64 字节(取决于 CPU),它有效地引用主内存中的一块地址。一个 Java 的 long 类型是 8 字节,因此在一个缓存行中可以存 8 个 long 类型的变量。

假设两个线程A和B运行在两个CPU上,每个CPU都有一个缓存行中存放了两个volatile long类型的变量X和Y,A更新X后,由于X是volatile的,B所在CPU的缓存行就失效了,需要重新加载,即使B想要更新的是Y,两者逻辑上不存在竞争关系,但在缓存行这个层次上发生了冲突。这是一个伪共享问题的典型场景。

上述场景中,假如A和B交替执行,那么伪共享问题一直发生,对性能影响会很大。

2.1 @sun.misc.Contended注解

在Java 7之前,可以在属性的前后进行padding来避免伪共享问题。

在Java 8中,提供了@sun.misc.Contended注解来避免伪共享,原理是在使用此注解的对象或字段的前后各增加128字节大小的padding,使用2倍于大多数硬件缓存行的大小来避免相邻扇区预取导致的伪共享冲突。

以上关于伪共享问题的内容参考了https://www.jianshu.com/p/c3c108c3dcfd。

3. 重新理解对象的安全发布与初始化安全性

在了解了JMM之后,可以回顾一下之前提到的对象的安全发布以及初始化安全性。

理解对象安全发布的一个很好的例子就是懒汉单例模式(当然现在已经不推荐使用懒汉单例模式了,它复杂且节约的性能有限),理解为何instance变量必须被volatile修饰才能保证安全。推荐思考一下。

三、原子变量类

1. CAS

Java内部很多机制以及很多标准类库中都用到了CAS机制,Java的CAS操作依赖硬件对CAS的支持,主流处理器基本都有自己的CAS实现。使用CAS相比于使用锁,可以减少线程上下文切换,减小竞争的颗粒度,一般来说性能优于锁,但是基于CAS的无锁算法实现上会更复杂,相关例子可以参考ConcurrentLinkedQueue的算法。

2. 原子变量类

原子变量类保证了可见性与原子性,相比volatile只能保证可见性,功能更为强大。以下是一些常用的原子变量类。

原子变量类 详情
AtomicBoolean 原子化的boolean
AtomicInteger、AtomicLong 原子化的int、long
AtomicIntegerArray、AtomicLongArray 数组内的元素可以原子化的更新
AtomicReference 可以被原子化更新的对象引用
AtomicReferenceArray
AtomicStampedReference、AtomicMarkableReference 支持原子化的更新引用及附带的stamp integer或mark bit,相当于版本号,可防止ABA问题
DoubleAccumulator、LongAccumulator
DoubleAdder、LongAdder

另外还要介绍一下原子化的域更新器:

原子化的域更新器 作用
AtomicIntegerFieldUpdater 原子化的更新指定类的volatile int 变量
AtomicLongFieldUpdater 原子化的更新指定类的volatile long 变量
AtomicReferenceFieldUpdater 原子化的更新指定类的volatile引用的field

四、锁

1. synchronized关键字

关于synchronized关键字的文章已经比较多了,可参考以下文章:

2. Lock

2.1 ReentrantLock

ReentrantLock是Lock接口的实现,ReentrantLock支持与synchronized一样的语义,包括可重入性,之所以创建ReentrantLock这么一个显式锁机制,主要是synchronized存在一些局限性,例如:无法在获取锁时取消或设置超时或获取失败立即返回,不支持公平锁(虽然绝大多数情况下出于性能考虑使用非公平锁)等。需要注意的是ReentrantLock和synchronized的性能上差距很小,因此出于简化程序的目的,应尽量避免使用ReentrantLock。

Lock的使用规范如下,一定要在finally块中释放锁,否则可能由于异常导致锁无法释放。

1
class ReentrantLockDemo {
2
   private final ReentrantLock lock = new ReentrantLock();
3
   public void demoMethod() { 
4
     lock.lock();  // block until condition holds
5
     try {
6
       // ... method body
7
     } finally {
8
       lock.unlock()
9
     }
10
   }
11
}

ReentrantLock底层是基于AQS实现的,具体在第五节中描述。

2.2 ReadWriteLock与ReentrantReadWriteLock

ReentrantReadWriteLock是ReadWriteLock接口的实现,是一个读写锁,在读多写少的并发场景下,使用读写锁可以提升性能。ReentrantReadWriteLock有以下特性:

  • 写锁可以降级为读锁,读锁不能升级为写锁
  • 写锁是互斥的,读锁是共享的

ReentrantReadWriteLock底层是基于AQS实现的,它使用AQS的state变量的高16位用作读锁,低16位用作写锁。

3. 条件队列

线程在某个条件不满足的情况下进入条件队列并释放锁,由另一个线程在某个条件满足的情况下唤醒处于条件队列中的线程。类似于提供了synchronized和Lock两种锁的实现,Java也提供了两种条件队列的实现。

Object的内部条件队列 Lock的Condition
Object的wait、notify、notifyAll方法 Condition的await、signal、signalAll方法
一个对象只有一个内部条件队列,多个条件的情况下使用一个对象进行wait、notify、notifyAll操作 一个Lock可以new多个Condition,对应不同的条件,分别进行await、signal、signalAll操作

注意Condition只是一个接口,需要Lock的具体实现类的newCondition方法提供实现。ReentrantLock的newCondition方法返回的是一个AQS中的ConditionObject类型的对象,第五节中会对ConditionObject的原理有解释,可以看到是通过一个链表实现的条件队列。

Condition的官方使用示例如下:

1
class BoundedBuffer {
2
   final Lock lock = new ReentrantLock();
3
   final Condition notFull  = <b>lock.newCondition(); 
4
   final Condition notEmpty = <b>lock.newCondition(); 
5
6
   final Object[] items = new Object[100];
7
   int putptr, takeptr, count;
8
9
   public void put(Object x) throws InterruptedException {
10
     lock.lock();
11
     try {
12
       while (count == items.length)
13
         <b>notFull.await();
14
       items[putptr] = x;
15
       if (++putptr == items.length) putptr = 0;
16
       ++count;
17
       notEmpty.signal();
18
     } finally {
19
       lock.unlock();
20
     }
21
   }
22
23
   public Object take() throws InterruptedException {
24
     lock.lock();
25
     try {
26
       while (count == 0)
27
         notEmpty.await();
28
       Object x = items[takeptr];
29
       if (++takeptr == items.length) takeptr = 0;
30
       --count;
31
       notFull.signal();
32
       return x;
33
     } finally {
34
       lock.unlock();
35
     }
36
   }
37
 }

4. 关于锁的总结

参考文献【1】^1做了非常好的总结,推荐直接阅读该文章。

五、AQS(AbstractQueuedSynchronizer)原理及应用

1. AQS原理

aqs

上面在讲ReentrantLock等的过程中说到它是基于AQS实现的。AQS提供了一个框架,该框架可用于实现基于FIFO队列的阻塞锁或相关同步器(semaphores, events, etc)。由于只是提供了一个框架,其子类需要提供具体实现,一般来说子类应该被定义为non-public的内部辅助类(例如ReentrantLock类内部的Sync类,如上面类图所示),用于实现其外部类的同步性质。AQS框架支持独占模式和共享模式,供具体实现来选择。下面列出其子类需要具体实现的方法列表。

需要子类实现的方法 作用
tryAcquire 尝试在互斥模式下acquire
tryRelease 尝试在互斥模式下release
tryAcquireShared 尝试在共享模式下acquire
tryReleaseShared 尝试在共享模式下acquire
isHeldExclusively 判断是否被当前线程独占

子类通过实现上面这些方法决定了同步器在获取同步时的行为。

下面来解释一下AQS的运行机制。

1.1 Node类

从上面的类图可以看到AQS类内部有一个Node类,该类用于实现一个双向链表,表示等待获取的线程队列,该类有5个成员变量
| 变量 | 含义 |
| —————– | ———————– |
| waitStatus | 该节点的状态:CANCELLED(acquire取消,在锁的场景下可以理解为取消加锁), SIGNAL(等待唤醒), CONDITION(等待一个condition的唤醒), PROPAGATE(共享模式下), 0(初始状态) |
| thread | 该等待节点对应的线程 |
| prev | 等待队列的前一个节点 |
| next | 等待队列的后一个节点 |
| nextWaiter | 若该节点在等待一个condition,则nextWaiter指向等待该condition的下一个节点 |

1.2 ConditionObject类

ConditionObject类实现了Condition接口,Condition一般是配合Lock使用,这里ConditionObject用于配合AQS实现类似的效果,例如,可以创建多个ConditionObject类表示不同的条件,满足某一个条件则唤醒该ConditionObject对应的等待队列中的节点,并将其加入AQS的等待队列,去尝试获取锁。

变量 含义
firstWaiter 该ConditionObject的等待队列的头节点
nextWaiter 该ConditionObject的等待队列的尾节点

1.3 AQS类

有3个成员变量

变量 含义
state 保存状态的变量,在锁的场景下可以是锁的状态,如0表示未加锁,1表示加锁;或者可重入锁的情况下保存锁重入的次数,0表示未加锁,3表示已加锁并重入了3次
head 等待获取锁的队列的头节点
tail 等待获取锁的队列的尾节点

通过上面对3个类以及AQS源码的分析,我们可以得出AQS的运行时数据结构,当尝试获取锁时,将对应线程加入等待队列,释放锁时,将其移出队列。若要支持Condition,则可以利用ConditionObject,ConditionObject实现了条件队列。

aqs

1.4 基于AQS实现的锁及其他同步器

基于AQS实现的锁及其他同步器如下:

使用了AQS的同步器实现 使用场景
ReentrantLock 可重入锁,类似synchronized
ReentrantReadWriteLock 读写锁,适用于需要加锁的读多写少的场景
Semaphore 信号量,用于控制并发量
CountdownLatch 闭锁,让线程等待其他线程的完成

2. ReentrantLock

ReentrantLock的特性如下:

  • 可重入:同一个线程最大重入次数2147483647

  • 支持公平锁与非公平锁:公平锁是指线程获取锁时要先判断当前排队的线程队列是否为空,为空则直接通过CAS机制尝试获取锁,不为空则排队;非公平锁是指线程获取锁时先尝试获取锁,失败则再次尝试获取锁(自旋),再次失败则进入排队队列,进入排队队列后,所有线程都排队,死循环至获取锁成功或中断。

ReentrantLock是利用AQS实现的,具体的分析可以查看美团技术团队的这篇文章

https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html[^2]

非常详细,示意图清晰。

六、多线程框架

1. Executor框架(JAVA 5)

executor

Executor接口提供了execute方法,该方法将任务提交,并在之后的某个时间点执行该任务,具体执行策略取决于其具体实现。ExecutorService在Executor的基础上提供了管理Executor生命周期的方法,如shutDown, shutDownNow方法。ThreadPoolExecutor是Executor的实现类,实现了基于线程池的Executor框架。

1.1 线程池

Executors类提供了生成ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool的工厂方法:

生成线程池方法 描述
newFixedThreadPool new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
newSingleThreadExecutor new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
newCachedThreadPool new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
newScheduledThreadPool new ScheduledThreadPoolExecutor(corePoolSize);
newSingleThreadScheduledExecutor new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1));
newWorkStealingPool new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);

可以看到newFixedThreadPool,newSingleThreadExecutor,newCachedThreadPool方法返回的都是ThreadPoolExecutor对象,只不过配置不同;newScheduledThreadPool,newSingleThreadScheduledExecutor方法返回的都是ScheduledThreadPoolExecutor对象;newWorkStealingPool返回的是ForkJoinPool。

1.2 线程池配置

1.2.1 ThreadPoolExecutor

ForkJoinPool这个后面单独讲,先讲一下ThreadPoolExecutor,其构造函数即配置参数如下:

1
public ThreadPoolExecutor(int corePoolSize,
2
                          int maximumPoolSize,
3
                          long keepAliveTime,
4
                          TimeUnit unit,
5
                          BlockingQueue<Runnable> workQueue,
6
                          ThreadFactory threadFactory,
7
                          RejectedExecutionHandler handler)
参数 含义
corePoolSize 线程池中的最小线程数,线程池初始化后核心线程并不开始,除非调用了prestartAllCoreThreads
maximumPoolSize 最大线程数
keepAliveTime 超过corePoolSize数量的线程的最长空闲时间
workQueue 任务的排队队列:有限队列、无限队列、同步移交
threadFactory 创建线程的工厂方法,一般直接采用默认的Executors类提供的DefaultThreadFactory
handler 当线程池已满且队列已满时的任务提交时的处理策略,有CallerRunsPolicy、AbortPolicy、DiscardPolicy、DiscardOldestPolicy
1.2.2 最佳的线程池大小

首先,如果存在不同类型的任务,且差别很大,比如计算密集型和I/O密集型任务,那么最好使用多个线程池。

最佳线程池大小的配置可以根据任务的类型大致如下计算:

  • 计算密集型任务线程池大小:CPU个数+1
  • I/O密集型任务线程池大小:CPU个数 * CPU目标使用率 * (1+等待时间与计算时间的比)

其他比如连接池大小、内存、文件句柄、套接字句柄等都会限制线程池大小

1.3 Future, Callable

future

Runnable提供了run方法用于执行计算,Callable的call方法可以返回计算的执行结果。

Future描述了任务的生命周期,提供了方法获取任务执行的结果,取消任务、检查任务是否完成或取消。ExecutorService的submit方法接受Runnable或Callable并返回一个Future。FutureTask是Future的具体实现。

1.4 CompletionService及其实现ExecutorCompletionService

CompletionService整合了Executor和BlockingQueue的功能,ExecutorCompletionService是其具体实现。提交到ExecutorCompletionService的任务被包装为一个QueueingFuture,覆盖了done方法,该方法在任务完成时将结果放入其BlockingQueue中。

1.5 线程取消、线程池关闭、JVM关闭

1.5.1 线程取消

线程取消有以下方式:

  • 循环检查取消标志
  • 中断:对于处于阻塞状态中的线程无法通过设置取消标志实现取消,中断机制提供了这种情况下的取消机制。每个线程有一个boolean类型的中断状态,中断时设置为true,即线程B调用线程A的interrupt方法时,线程A的中断状态被设置为true。阻塞库函数,例如,Thread.sleep或Object.wait通过native方法检测线程是否被中断,其对中断的响应表现为:清除中断状态并抛出异常InterruptedException,表示阻塞操作因中断提前结束
  • 通过Future.cancel取消
1.5.2 线程异常处理
  • 可以在线程内部catch异常
  • 线程API提供了UncaughtExceptionHandler,当线程因为未捕获异常退出时,该handler处理异常,如果handler不存在,默认行为是像System.err打印stack trace
  • 只有通过execute方法提交的任务才能将抛出的异常传给异常处理器,通过submit方法提交的任务,只会被Future.get方法重新抛出为ExecutionException
1.5.3 JVM的关闭
  • Shutdown Hook

    Shutdown Hook是通过Runtime.addShutdownHook方法注册的尚未开始的线程。如果是通过调用Runtime.halt或者kill -9的方式强行关闭JVM,那么除了关闭JVM之外不需要完成任何其他动作,也不会运行Shutdown Hook;

    Shutdown Hook之间并发执行,不保证顺序,Shutdown Hook之行结束后,如果runFinalizersOnExit为true,JVM可以选择运行finalizer,之后停止;

    JVM不会停止或者中断应用线程,应用线程在JVM停止时强制退出。

  • daemon线程

    当只有daemon线程时,JVM会发起退出,daemon线程会被抛弃,不会执行finally块,也不会释放栈

  • Finalizer

    参见我的文章《Object.finalize()方法与Finalizer类浅析》,推荐的操作是:不要使用Finalier

2. Fork/Join框架

原本想在此针对Fork/Join框架写一小节的,由于源码阅读工作量较大,暂时搁置,以后再说。

3. CompletableFuture(JAVA 8)

CompletableFuture相关可先参考以下文章:

https://colobu.com/2016/02/29/Java-CompletableFuture/

https://www.nurkiewicz.com/2013/05/java-8-definitive-guide-to.html

七、活跃性问题、性能问题

1. 死锁

死锁最常见的场景是出现了环路的锁依赖关系。在数据库系统中,一般设计了死锁监测,通过检查表示锁依赖关系的有向图上是否存在环路,如果存在死锁,会选择一个事务退出。

不光是锁的使用会造成死锁,资源的使用也可能造成死锁,例如有两个数据库D1、D2的连接池,线程A持有到数据库D1的连接,等待D2的连接,线程B持有到数据库D2的连接,等待D1的连接,这就有可能造成死锁,若连接池大小为1,则一定发生死锁。

通过使用显示Lock的tryLock方法的带有timeout的版本,能够一定程度上避免死锁,至少在死锁发生的情况下能够通过超时进行回退。

thread dump能够进行死锁检测,可用于线上诊断。

2. 饥饿

饥饿问题是指当线程访问它需要的资源时被拒绝,不能继续进行,常见的是CPU资源的饥饿问题。线程优先级的使用不当、死循环都会造成CPU资源的饥饿问题

非公平锁也会造成线程的饥饿问题,特殊情况下,先尝试获取锁的线程反而没法抢到锁。

3. 其他

  • 弱响应性问题:即响应时间较长

  • 活锁:活锁问题一般发生在错误恢复机制中,例如,在消息处理应用程序中,如果对某种特定类型的消息处理存在bug,每次处理都会失败,失败后又被放回队首,下次还是处理这个消息,形成了死循环。可以通过在错误恢复机制中引入一定的随机性来避免着问题。

3. 性能

  • 线程上下文切换:包括线程调度的花费、线程换入后CPU缓存数据的加载都是线程上下文切换带来的花费。

  • 内存同步:synchronized、volatile等提供的可见性保证是通过使用内存屏障使CPU缓存无效化实现的,不能使用CPU缓存使得性能下降,并且内存屏障还能防止指令重排序,这就导致了编译器不能对代码执行进行优化。

    现在JVM中的JIT编译器通过逃逸分析能够实现锁消除的优化,如果一个变量不从线程内逸出,对其的加锁操作会被省略,或者通过锁粗化,即将邻近的synchronized块用相同的锁合并起来。这些JVM的优化机制表明对于没有竞争的同步代码,其开销已经经过很好的优化了,真正影响性能的是真正发生了锁竞争的代码。

  • 阻塞

    获取锁的时候如果存在竞争会发生阻塞,这时候可以选择自旋或者线程挂起,取决于具体的场景。

  • 如何减少锁的竞争

    • 缩小锁的范围:避免很大的同步块
    • 减小锁的粒度:通过锁的分拆或分离,将一个粗粒度的锁拆分为多个锁,例如ConcurrentHashMap相比HashTable通过更细粒度的锁提升了性能
    • 使用非独占锁:读写锁相比独占锁带来了性能提升

参考文献

[TOC]

Object.finalize()方法与Finalizer类浅析

这部分内容属于GC相关的内容,可以参考 https://www.jianshu.com/p/9d2788fffd5f ^2,此文写的比较深入,涉及到了hotspot的源码,强烈推荐。

一、finalize()^1

1
public class FinalizerDemo {
2
3
    static AtomicInteger aliveCount = new AtomicInteger(0);
4
5
    FinalizerDemo() {
6
        aliveCount.incrementAndGet();
7
    }
8
9
    @Override
10
    protected void finalize() throws Throwable {
11
        FinalizerDemo.aliveCount.decrementAndGet();
12
    }
13
14
    public static void main(String args[]) {
15
        for (int i = 0; ; i++) {
16
            FinalizerDemo f = new FinalizerDemo();
17
            if ((i % 100_000) == 0) {
18
                System.out.format("After creating %d objects, %d are still alive.%n", new Object[]{i, FinalizerDemo.aliveCount.get()});
19
            }
20
        }
21
    }
22
}

运行这段代码,加上-XX:+PrintGCDetails参数查看GC详情,可以发现很快就开始full gc,而去掉对finalize()方法的重写则不会触发full gc。这是为什么呢?这就涉及到重写了finalize()方法的类的回收了,下面详细说一下,或者也可以直接阅读上面推荐的文章。

列一下finalize()方法相关的特性:

  • 当GC判断没有其他对该对象的引用时,会调用该方法,一般用于执行资源释放或其他清理工作
  • finalize()方法抛出的异常会被忽略
  • finalize()方法中可以使该对象重新关联到引用链上,也即是“复活”该对象,例如,让一个静态全局变量指向该对象,使其免于被回收
  • finalize()方法最多执行一次

二、Finalizer

阅读上面推荐的文章就知道finalize()方法与Finalizer类脱不开关系。下面分析一下这个类的源码。

Finalizer类有3个静态变量,2个成员变量,分别是:

变量 作用
ReferenceQueue queue 全局静态变量,是一个引用队列,保存Finalizer对象
Finalizer unfinalized 全局静态变量,Finalizer对象以双向链表的形式保存,该变量指向当前链表的头部,每次新创建一个Finalizer对象会将其放在链表头部
Object lock 全局静态变量,用于在对双向链表操作时加锁
next 链表中当前对象的下一个
prev 链表中当前对象的前一个

注意到Finalizer类的源码中有一个register方法,注释是该方法由VM调用,推测是当重写了finalize()的对象初始化时会调用register方法(推荐文章中有对这部分的hotspot源码分析),该方法会将该对象作为构造器参数创建一个Finalizer对象,Finalizer继承了FinalReference,因此构造器会将finalizee(即实现了finalize()方法的对象)和全局的ReferenceQueue作为构造参数构造Reference

1
private Finalizer(Object finalizee) {
2
    super(finalizee, queue);
3
    add();
4
}
5
6
/* Invoked by VM */
7
static void register(Object finalizee) {
8
    new Finalizer(finalizee);
9
}

Finalizer类的runFinalizer方法就是最终用来调用对象的finalize()方法的。先判断是否已执行过finalize()方法,若没有,则从双向链表中移除自身,然后调用finalizee的finalize()方法,最后finalizee = null;这一步注释说的是清理包含这个变量的栈帧,减少false retention概率,关于false retention后面单开文章说。

1
private void runFinalizer(JavaLangAccess jla) {
2
        synchronized (this) {
3
            if (hasBeenFinalized()) return;
4
            remove();//不去掉的话,会导致对象仍然在引用链上,无法回收
5
        }
6
        try {
7
            Object finalizee = this.get();
8
            if (finalizee != null && !(finalizee instanceof java.lang.Enum)) {
9
                jla.invokeFinalize(finalizee);
10
11
                /* Clear stack slot containing this variable, to decrease
12
                   the chances of false retention with a conservative GC */
13
                finalizee = null;
14
            }
15
        } catch (Throwable x) { }
16
        super.clear();
17
    }

FinalizerThread

Finalizer类通过静态初始化代码块启动一个FinalizerThread守护线程,该线程优先级较低。注意线程优先级较低这一点,正是这一点导致在上面的示例中来不及执行对象的finalize()方法,导致垃圾回收很慢,内存泄漏,触发full gc,甚至OOM。

1
static {
2
    ThreadGroup tg = Thread.currentThread().getThreadGroup();
3
    for (ThreadGroup tgn = tg;
4
         tgn != null;
5
         tg = tgn, tgn = tg.getParent());
6
    Thread finalizer = new FinalizerThread(tg);
7
    finalizer.setPriority(Thread.MAX_PRIORITY - 2);
8
    finalizer.setDaemon(true);
9
    finalizer.start();
10
}

下面看下FinalizerThread的逻辑:

1
private static class FinalizerThread extends Thread {
2
    private volatile boolean running;
3
    FinalizerThread(ThreadGroup g) {
4
        super(g, "Finalizer");
5
    }
6
    public void run() {
7
        if (running)
8
            return;
9
10
        // Finalizer thread starts before System.initializeSystemClass
11
        // is called.  Wait until JavaLangAccess is available
12
        while (!VM.isBooted()) {
13
            // delay until VM completes initialization
14
            try {
15
                VM.awaitBooted();
16
            } catch (InterruptedException x) {
17
                // ignore and continue
18
            }
19
        }
20
        final JavaLangAccess jla = SharedSecrets.getJavaLangAccess();
21
        running = true;
22
        for (;;) {
23
            try {
24
                Finalizer f = (Finalizer)queue.remove();
25
                f.runFinalizer(jla);
26
            } catch (InterruptedException x) {
27
                // ignore and continue
28
            }
29
        }
30
    }
31
}

主要关注最后的死循环部分,调用queue的remove()方法,取出队列中的Finalizer对象,执行其runFinalizer方法。

到这里就剩下一个疑问点,Finalizer对象是什么时候加入queue队列的?这就要说到Reference类了。

三、Reference

前面说到Finalizer继承了FinalReference,而FinalReference类继承了Reference。看一下这个类的静态初始化代码块,里面启动了一个ReferenceHandler守护线程,该线程优先级较高。

ReferenceHandler

1
static {
2
    ThreadGroup tg = Thread.currentThread().getThreadGroup();
3
    for (ThreadGroup tgn = tg;
4
         tgn != null;
5
         tg = tgn, tgn = tg.getParent());
6
    Thread handler = new ReferenceHandler(tg, "Reference Handler");
7
    /* If there were a special system-only priority greater than
8
     * MAX_PRIORITY, it would be used here
9
     */
10
    handler.setPriority(Thread.MAX_PRIORITY);
11
    handler.setDaemon(true);
12
    handler.start();
13
14
    // provide access in SharedSecrets
15
    SharedSecrets.setJavaLangRefAccess(new JavaLangRefAccess() {
16
        @Override
17
        public boolean tryHandlePendingReference() {
18
            return tryHandlePending(false);
19
        }
20
    });
21
}

ReferenceHandle线程死循环执行一个方法tryHandlePending,源码如下:

1
static boolean tryHandlePending(boolean waitForNotify) {
2
        Reference<Object> r;
3
        Cleaner c;
4
        try {
5
            synchronized (lock) {
6
                if (pending != null) {
7
                    r = pending;
8
                    // 'instanceof' might throw OutOfMemoryError sometimes
9
                    // so do this before un-linking 'r' from the 'pending' chain...
10
                    c = r instanceof Cleaner ? (Cleaner) r : null;
11
                    // unlink 'r' from 'pending' chain
12
                    pending = r.discovered;
13
                    r.discovered = null;
14
                } else {
15
                    // The waiting on the lock may cause an OutOfMemoryError
16
                    // because it may try to allocate exception objects.
17
                    if (waitForNotify) {
18
                        lock.wait();
19
                    }
20
                    // retry if waited
21
                    return waitForNotify;
22
                }
23
            }
24
        } catch (OutOfMemoryError x) {
25
            // Give other threads CPU time so they hopefully drop some live references
26
            // and GC reclaims some space.
27
            // Also prevent CPU intensive spinning in case 'r instanceof Cleaner' above
28
            // persistently throws OOME for some time...
29
            Thread.yield();
30
            // retry
31
            return true;
32
        } catch (InterruptedException x) {
33
            // retry
34
            return true;
35
        }
36
37
        // Fast path for cleaners
38
        if (c != null) {
39
            c.clean();
40
            return true;
41
        }
42
43
        ReferenceQueue<? super Object> q = r.queue;
44
        if (q != ReferenceQueue.NULL) q.enqueue(r);
45
        return true;
46
    }

主要逻辑就是将pending链表的头节点取出,并将其加入队列中(最后3行代码),其中pending链表是GC扫描出的pending状态的Reference对象链表,Finalizer实现了Reference,也会被扫描出来,至于具体逻辑就要查看jdk源码了,这个正好在参考文献【2】也就是最开头推荐的文章中有提到。

四、小结

回到最开始的例子,再分析一下。main函数死循环创建一个重写了finalize()方法的对象,JVM会为每一个对象创建一个Finalizer对象,ReferenceHandle线程负责将Finalizer对象加入ReferenceQueue,该线程优先级较高,而FinalizerThread线程负责消费ReferenceQueue,执行对象的finalize()方法,优先级较低。只有当对象的finalize()方法执行完,对象才能被回收。

那么当FinalizerThread线程由于优先级低而抢不到CPU资源,或者finalize()方法执行较慢等原因来不及消费ReferenceQueue时,就会出现GC无法回收垃圾,从而导致full gc。

finalize()方法最多执行一次的原理也在此说一下我个人的理解,不一定正确。上面说到,当对象创建时,如果该对象重写了finalize()方法,JVM会调用Finalizer对象的register方法,即该对象对应的Finalizer对象只会被创建一次,而一旦这个Finalizer对象被移出ReferenceQueue就不会再回到ReferenceQueue。

参考文献

[TOC]

Spring Boot/Spring的启动过程分析

一、Spring Boot启动

先来一段众所周知的spring boot的启动入口代码。

1
@SpringBootApplication
2
public class Application {
3
    public static void main(String[] args) {
4
        SpringApplication.run(Application.class, args);
5
    }
6
}

里面的核心代码如下,各步骤已添加注释:

1
/**
2
	 * Run the Spring application, creating and refreshing a new
3
	 * {@link ApplicationContext}.
4
	 * @param args the application arguments (usually passed from a Java main method)
5
	 * @return a running {@link ApplicationContext}
6
	 */
7
	public ConfigurableApplicationContext run(String... args) {
8
		StopWatch stopWatch = new StopWatch();
9
		stopWatch.start();
10
		ConfigurableApplicationContext context = null;
11
		Collection<SpringBootExceptionReporter> exceptionReporters = new ArrayList<>();
12
		configureHeadlessProperty();
13
    /**
14
		* 获取META-INF/spring.factories中配置的SpringApplicationRunListener类型的监听器
15
		* 默认只有一个listener是org.springframework.boot.context.event.EventPublishingRunListener
16
    **/
17
		SpringApplicationRunListeners listeners = getRunListeners(args);
18
    //EventPublishingRunListener发布ApplicationStartingEvent事件
19
		listeners.starting();
20
		try {
21
			ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
22
			/**
23
			* 创建并配置Environment(主要是PropertySource和Profile)
24
			*之后调用监听器的environmentPrepared方法,触发EventPublishingRunListener发布ApplicationEnvironmentPreparedEvent事件
25
			* 将Environment与Application绑定
26
			**/
27
      ConfigurableEnvironment environment = prepareEnvironment(listeners, applicationArguments);
28
      //设置spring.beaninfo.ignore系统属性
29
			configureIgnoreBeanInfo(environment);
30
      //打印banner
31
			Banner printedBanner = printBanner(environment);
32
      /**
33
      * 根据应用类型创建ApplicationContext
34
      * 例如web应用创建的是AnnotationConfigServletWebServerApplicationContext
35
      **/
36
			context = createApplicationContext();
37
      //获取配置的异常上报类,用于后续异常上报
38
			exceptionReporters = getSpringFactoriesInstances(SpringBootExceptionReporter.class,
39
					new Class[] { ConfigurableApplicationContext.class }, context);
40
      /**
41
      * prepareContext内部逻辑:
42
      * 1. postProcessApplicationContext:配置bean工厂的beanNameGenerator,ConversionService及context的resourceLoader
43
      * 2. applyInitializers使用META-INF/spring.factories中配置的ApplicationContextInitializer初始化ApplicationContext,默认配了7个
44
      * 3. 调用监听器的contextPrepared方法,触发EventPublishingRunListener发布ApplicationContextInitializedEvent
45
      * 4. 获取beanFactory并配置(默认是DefaultListableBeanFactory,此时已有几个spring内部的bean已注册),如注册springApplicationArguments单例类,设置allowBeanDefinitionOverriding参数等,配置懒加载LazyInitializationBeanFactoryPostProcessor
46
      * 5. 加载bean,看源码是支持xml读取,annotation扫描,甚至groovy,实际上只加载了入口类也就是上面的Application类,将其注册为单例并获取了其注解原数据,即@SpringBootApplication注解
47
      * 6. 调用监听器的contextLoaded方法,触发EventPublishingRunListener发布ApplicationPreparedEvent
48
      **/
49
			prepareContext(context, environment, listeners, applicationArguments, printedBanner);
50
			/**
51
			* 调用AbstractApplicationContext的refresh方法,然后注册shutdownhook,这一步核心其实就是spring的启动了,这个在第3节展开
52
			**/
53
      refreshContext(context);
54
			afterRefresh(context, applicationArguments);
55
			stopWatch.stop();
56
			if (this.logStartupInfo) {
57
				new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), stopWatch);
58
			}
59
      //调用监听器的started方法,触发EventPublishingRunListener发布ApplicationStartedEvent
60
			listeners.started(context);
61
      //调用ApplicationRunner,然后调用CommandLineRunner
62
			callRunners(context, applicationArguments);
63
		} catch (Throwable ex) {
64
      //里面有一步是:调用监听器的failed方法,触发EventPublishingRunListener发布ApplicationFailedEvent
65
			handleRunFailure(context, ex, exceptionReporters, listeners);
66
			throw new IllegalStateException(ex);
67
		}
68
		try {
69
      //调用监听器的running方法,触发EventPublishingRunListener发布ApplicationReadyEvent
70
			listeners.running(context);
71
		} catch (Throwable ex) {
72
			handleRunFailure(context, ex, exceptionReporters, null);
73
			throw new IllegalStateException(ex);
74
		}
75
		return context;
76
	}

可以看到spring boot的启动是在spring启动的基础上增加了一些特性,例如根据应用类型推断并指定ApplicationContext的具体实现,各阶段事件的发布(提供了一个扩展点),调用ApplicationRunner,CommandLineRunner(又一个扩展点)等。下面来一张详细的流程图,个人觉得这个图画的很好^1,可以参考该图一步步看源码。

PS:推荐先粗读一遍源码,然后对应用进行debug,一步步看执行逻辑以及每个步骤执行完毕后的数据以及状态,可以对细节有更好的理解。例如,有些变量是在构造器初始化时加载的配置,这个光看源码很难看到。

img

二、Spring Boot自动化配置

1. @EnableAutoConfiguration

img

spring boot提供了自动配置,@SpringBootApplication注解中的@EnableAutoConfiguration注解是spring boot自动配置的关键,原理可阅读文章【2】^2,以后有机会再展开详细描述。

2. META-INF/spring.factories

上面说到@EnableAutoConfiguration注解通过读取META-INF/spring.factories配置文件实现自动配置,这里说一下META-INF/spring.factories配置文件。

  1. 配置是K/V对,key是接口的全限定名,value是该接口的实现类的名字,可以是逗号分隔的多个值,示例如下:
1
example.MyService=example.MyServiceImpl1,example.MyServiceImpl2
  1. META-INF/spring.factories配置文件可在多个jar中配置,基于这个我们可以实现自己的自动配置类,并将其配置在自己的jar的META-INF/spring.factories配置文件中,spring boot会帮我们完成自动配置。

  2. spring boot自带的META-INF/spring.factories配置文件里已经配置了许多这样的配置项。

  3. 借助spring框架的SpringFactoriesLoader可以加载META-INF/spring.factories配置文件,SpringFactoriesLoader属于Spring框架私有的一种扩展方案,其主要功能就是从指定的配置文件META-INF/spring.factories加载配置。

是不是有点像java的SPI加载机制

三、Spring的启动

上面提到调用了AbstractApplicationContext的refresh()方法,其实就是spring启动时执行的方法,里面的执行顺序是固定的。

1. AbstractApplicationContext的refresh()方法

1
public void refresh() throws BeansException, IllegalStateException {
2
   synchronized (this.startupShutdownMonitor) {
3
      // Prepare this context for refreshing.
4
      prepareRefresh();
5
      // Tell the subclass to refresh the internal bean factory.
6
     	/**
7
     	* obtainFreshBeanFactory()方法内部调用refreshBeanFactory(),其默认实现由AbstractRefreshableApplicationContext提供,逻辑是:销毁原来的bean,关闭原来的bean factory,创建新的DefaultListableBeanFactory,调用loadBeanDefinitions()方法加载bean,这部分在后面详细展开;
8
     	不过在spring boot的情况下这里的实现是AnnotationConfigServletWebServerApplicationContext类,并不会销毁原来的bean以及bean factory。
9
     	**/
10
      ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
11
      // Prepare the bean factory for use in this context.
12
     	/**
13
     	* 摘自该方法的注释,主要是配置bean factory
14
    	* Configure the factory's standard context characteristics,
15
	 		* such as the context's ClassLoader and post-processors.
16
	 		**/
17
      prepareBeanFactory(beanFactory);
18
      try {
19
         // Allows post-processing of the bean factory in context subclasses.
20
       	 /**
21
       	 * 摘自该方法的注释,主要是注册BeanPostProcessors
22
         * Modify the application context's internal bean factory after its standard
23
         * initialization. All bean definitions will have been loaded, but no beans
24
         * will have been instantiated yet. This allows for registering special
25
         * BeanPostProcessors etc in certain ApplicationContext implementations.
26
         * @param beanFactory the bean factory used by the application context
27
         */
28
         postProcessBeanFactory(beanFactory);
29
         // Invoke factory processors registered as beans in the context.
30
         // 这一步会调用BeanFactoryPostProcessor,其中有一个是ConfigurationClassPostProcessor,扫描@Configuration注解标注的类,并完成相应的bean definition的注册
31
         invokeBeanFactoryPostProcessors(beanFactory);
32
         // Register bean processors that intercept bean creation.
33
         registerBeanPostProcessors(beanFactory);
34
         // Initialize message source for this context.
35
         initMessageSource();
36
         // Initialize event multicaster for this context.
37
         initApplicationEventMulticaster();
38
         // Initialize other special beans in specific context subclasses.
39
       	 // web应用的情况下会检查是否有WebServer,若没有则新建一个WebServer,spring boot默认是tomcat
40
         onRefresh();
41
         // Check for listener beans and register them.
42
         // 注册ApplicationListener类型的bean,并将之前发过的ApplicationEvent再发一遍
43
         registerListeners();
44
         // Instantiate all remaining (non-lazy-init) singletons.
45
         // 完成bean factory的初始化,并将剩余的未实例化的单例bean实例化
46
         finishBeanFactoryInitialization(beanFactory);
47
         // Last step: publish corresponding event.
48
         /*
49
         * Finish the refresh of this context, invoking the LifecycleProcessor's
50
         * onRefresh() method and publishing the
51
         * {@link org.springframework.context.event.ContextRefreshedEvent}.
52
         */
53
         finishRefresh();
54
      } catch (BeansException ex) {
55
         if (logger.isWarnEnabled()) {
56
            logger.warn("Exception encountered during context initialization - " +
57
                  "cancelling refresh attempt: " + ex);
58
         }
59
         // Destroy already created singletons to avoid dangling resources.
60
         destroyBeans();
61
         // Reset 'active' flag.
62
         cancelRefresh(ex);
63
         // Propagate exception to caller.
64
         throw ex;
65
      } finally {
66
         // Reset common introspection caches in Spring's core, since we
67
         // might not ever need metadata for singleton beans anymore...
68
         resetCommonCaches();
69
      }
70
   }
71
}

这块每一步的逻辑都相当复杂,涉及spring bean容器的很多特性以及扩展点,这些都需要具体去了解,后续再补上每一步的详情,先mark一下。

小结

本文基于spring-boot:2.2.1.RELEASE版本,只是粗略过了一遍启动流程,有些部分可能理解有误,内部细节还有待继续研究。另外不得不说,看spring源码是真的累。

参考文献

[TOC]

Tomcat中部署spring web application

一、Servlet容器

java的web应用是基于Servlet的,Tomcat是比较有名的Servlet容器,一个Tomcat中可以部署多个Servlet。下图是Tomcat的容器模型。

图 1 . Tomcat 容器模型

有关Servlet与Tomcat的原理推荐阅读下面几篇文章进行了解:

https://www.ibm.com/developerworks/cn/java/j-lo-servlet/index.html

https://www.ibm.com/developerworks/cn/java/j-lo-tomcat1/index.html

https://www.ibm.com/developerworks/cn/java/j-lo-tomcat2/

http://objcoding.com/2019/05/30/tomcat-architecture/

1. Tomcat中war的部署

web应用以war包的形式部署在Tomcat中,web.xml是web应用的部署配置文件,下面说说其中的一些配置元素。

元素 功能
servlet 指定servlet
servlet-mapping 指定servlet与url之间的映射关系
filter 指定filter
filter-mapping 指定filter与url之间的映射关系
listener 指定监听器监听servlet上下文事件
context-param servlet上下文初始化参数
welcome-file-list 一般用于指定欢迎页,如index.jsp

以上是对servlet容器的简单介绍,以及在Tomcat中以war部署web应用时,web.xml的作用。

二、Tomcat中启动spring

在Tomcat中启动spring也就意味着如何启动并初始化spring上下文,在web应用中该上下文就是WebApplicationContext。

1. ContextLoadListener

WebApplicationContext可以通过在web.xml中配置ContextLoadListener这个listener实现初始化。

1
<listener>
2
   <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
3
</listener>

ContextLoaderListener实现了ServletContextListener,该接口定义了两个方法可监听servlet的初始化与销毁,该接口的实现类也就是ContextLoaderListener有3种方式可接收到这两个通知事件:

  1. 定义在web.xml配置文件中,也就是本文描述的方式;

  2. 加上javax.servlet.annotation.WebListener注解;

  3. 通过ServletContext的addListener方法添加;

ContextLoaderListener在配置时需要注意:如果使用了org.springframework.web.util.Log4jConfigListener,需要在web.xml中将该listener配置在其之后。Log4jConfigListener用于初始化定制化的log4j。

2. WebApplicationContext的创建

ContextLoaderListener将具体的创建WebApplicationContext的操作委派给ContextLoader执行,其关键方法是

1
public WebApplicationContext initWebApplicationContext(ServletContext servletContext)

initWebApplicationContext先创建一个WebApplicationContext,然后完成配置并调用refresh()方法。

  1. WebApplicationContext的默认实现是XmlWebApplicationContext(配置在spring-web包中的ContextLoader.properties文件中),可以通过指定contextClass参数指定WebApplicationContext的实现,当然一般不会这么做。

  2. WebApplicationContext初始化的配置文件地址可以通过在web.xml文件中的context-param配置节中配置contextConfigLocation参数指定,如:

1
<context-param>
2
		<param-name>contextConfigLocation</param-name>
3
		<param-value>
4
			classpath*:config/spring/local/appcontext-*.xml,
5
			classpath*:config/spring/appcontext-*.xml
6
		</param-value>
7
</context-param>

​ PS:存在多个配置文件地址的情况下,后面文件中定义的的bean definition会覆盖前面的。

还有一些其他的可配置参数可以影响WebApplicationContext的创建,这里没有全部列出,这些一般也是非常用的功能。

  1. 调用refresh()方法实际上是调用了AbstractApplicationContext的refresh()方法,里面就是标准的spring的ApplicationContext的启动流程,主要是BeanFactory的初始化以及相应的bean的加载与初始化,具体可参考我的另一篇文章《Spring Boot/Spring的启动过程分析》。

3. DispatchServlet

DispatchServlet是将HTTP请求分发至handlers或者controllers的中央分发器,提供了便利的映射和异常处理功能。下面列一下它的一些特性:

  1. 一个web应用可以配置多个DispatcherServlet,每个DispatcherServlet在自己的命名空间内运行,加载自己私有的包含了mapping的应用上下文。只有通过ContextLoaderListener加载的上下文是共享的。
  2. 通过HandlerMapping的实现类来实现请求request到相应的handler的映射,可覆盖默认实现,默认实现是
    1
    org.springframework.web.servlet.handler.BeanNameUrlHandlerMapping
    2
    org.springframework.web.servlet.mvc.annotation.DefaultAnnotationHandlerMapping
  3. HandlerAdapter接口,用来适配请求和handler,可覆盖默认实现,默认实现是
    1
    org.springframework.web.servlet.mvc.HttpRequestHandlerAdapter对应org.springframework.web.HttpRequestHandler
    2
    3
    org.springframework.web.servlet.mvc.SimpleControllerHandlerAdapter对应org.springframework.web.servlet.mvc.Controller
    4
    另外还有一个默认的
    5
    org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter
  4. HandlerExceptionResolver负责dispatcher的异常处理策略,例如,发生某些特定异常时映射到某个错误页面。
    这里只列出一些基础的,还有一些关于ViewResolver,MultipartResolver,LocaleResolver,ThemeResolver的没有列出,感兴趣的可以研究一下。

DispatchServlet的创建

DispatchServlet继承了FrameworkServlet,FrameworkServlet继承了HttpServletBean,HttpServletBean继承了HttpServlet。
servlet容器启动的时候会调用HttpServlet的init()方法,也就是说会调用HttpServletBean的init()方法,其中会调用FrameworkServlet的initServletBean()方法,其中会调用它的initWebApplicationContext()方法,创建这个DispatchServlet的ApplicationContext,并且会将ContextLoaderListener加载的ApplicationContext作为其parent。

那么如何才能让servlet容器去创建一个DispatchServlet呢?答案就是在web.xml中进行配置,示例如下:

1
<servlet>
2
        <servlet-name>springmvc</servlet-name>
3
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
4
        <init-param>
5
            <param-name>contextConfigLocation</param-name>
6
            <param-value>classpath:config/spring.mvc/appcontext-servlet.xml</param-value>
7
        </init-param>
8
        <load-on-startup>1</load-on-startup>
9
    </servlet>

其中contextConfigLocation参数指定这个DispatcherServlet的ApplicationContext的配置,<load-on-startup>1</load-on-startup>表示在容器启动时就加载。

好了,再往下就是spring mvc的内容了,还有待研究。

小结

本文基于对spring 3.2.5版本的源码阅读,目前spring最新版本已到5.2.x,内容上可能有些过时,但理论上不影响对spring框架的理解。

[TOC]

Java 函数式编程

一、函数式接口

1. @FunctionalInterface注解

@FunctionalInterface注解:作用于接口,表示该接口是函数式接口(functional interface)。

函数式接口有以下几个特点:

  1. 函数式接口有且只有一个抽象方法

  2. 默认方法不计入抽象方法的个数,静态方法也不计入抽象方法个数

  3. 覆盖java.lang.Object的方法的方法也不计入抽象方法的个数

  4. 函数式接口可以通过lambda表达式、方法引用、构造器引用来创建实例

  5. 对于编译器来说,只要一个接口符合函数式接口的定义就会将其当作函数式接口,不一定需要@FunctionalInterface注解标注,注解用于检查该接口是否只包含一个抽象方法

代码示例如下:

1
@FunctionalInterface
2
public interface HelloFuncInterface {
3
4
    String hello();
5
6
    default String helloWorld() {
7
        return "hello world";
8
    }
9
  
10
  	static void printHello() {
11
        System.out.println("Hello");
12
    }
13
}

2. JDK中的函数式接口

java.lang.Runnable

java.awt.event.ActionListener

java.util.Comparator

java.util.concurrent.Callable

java.util.function包下的接口,如Consumer、Predicate、Supplier等

二、lambda表达式

lambda表达式用来实现函数式接口。

1. lambda表达式形式

(params) -> expression
(params) -> statement
(params) -> { statements }

1
/**
2
 * 通过lambda表达式实现自定义的函数式接口(注:JAVA 8 之前一般是用匿名类实现的)
3
 */
4
private void hello() {
5
    HelloFuncInterface helloFuncInterface = () -> {
6
        System.out.println("hello");
7
        return "hello";
8
    };
9
    helloFuncInterface.hello();
10
}

2. lambda表达式替换匿名类,减少冗余代码

1
new Thread(() -> {
2
    System.out.println(Thread.currentThread().getName() + ":runnable by lambda expression");
3
}).start();
4
5
new Thread(new Runnable() {
6
    @Override
7
    public void run() {
8
        System.out.println(Thread.currentThread().getName() + ":runnable by anonymous class");
9
    }
10
}).start();

三、方法引用

方法引用由::双冒号操作符标示,看起来像C++的作用域解析运算符。实现抽象方法的参数列表,必须与方法引用方法的参数列表保持一致!至于返回值就不作要求

1
/**
2
 * 方法引用
3
 */
4
private void methodReference() {
5
    List<String> pets = Arrays.asList("cat", "dog", "snake", "squirrel");
6
    pets.forEach((a) -> System.out.println(a));//lambda表达式
7
    pets.forEach(System.out::println);//方法引用
8
}

1. 引用方法^1

  • 对象引用::实例方法名

System.out是一个静态成员变量对象

1
Consumer<String> consumer = System.out::println;
2
consumer.accept("hello");
  • 类名::静态方法名
1
Function<Long, Long> abs = Math::abs;
2
System.out.println(abs.apply(-3L));
  • 类名::实例方法名
1
BiPredicate<String, String> b = String::equals;
2
System.out.println(b.test("abc", "abcd"));

2. 引用构造器

在引用构造器的时候,构造器参数列表要与接口中抽象方法的参数列表一致,格式为 类名::new。如

1
Function<Integer, StringBuffer> fun = StringBuffer::new;
2
StringBuffer buffer = fun.apply(10);

Function接口的apply方法接收一个参数,并且有返回值。在这里接收的参数是Integer类型,与StringBuffer类的一个构造方法StringBuffer(int capacity)对应,而返回值就是StringBuffer类型。上面这段代码的功能就是创建一个Function实例,并把它apply方法实现为创建一个指定初始大小的StringBuffer对象。

3. 引用数组

引用数组和引用构造器很像,格式为 类型[]::new,其中类型可以为基本类型也可以是类。如

1
Function<Integer, int[]> fun = int[]::new;
2
int[] arr = fun.apply(10);
3
4
Function<Integer, Integer[]> fun2 = Integer[]::new;
5
Integer[] arr2 = fun2.apply(10);

四、java.util.function

1. Predicate接口

1
@FunctionalInterface
2
public interface Predicate<T> {
3
4
    /**
5
     * Evaluates this predicate on the given argument.
6
     *
7
     * @param t the input argument
8
     * @return {@code true} if the input argument matches the predicate,
9
     * otherwise {@code false}
10
     */
11
    boolean test(T t);
12
}

Predicate是一个函数式接口,也是一个泛型接口,作用是对传入的一个泛型参数进行判断,并返回一个boolean值,任何符合这一条件的lambda表达式都可以转为Predicate接口,该接口的实现一般用于集合过滤等。下面提供一些例子:

1
/**
2
 * lambda表达式加Predicate接口
3
 */
4
private void predicateDemo() {
5
    List<String> pets = Arrays.asList("cat", "dog", "snake", "squirrel");
6
    //简单的Predicate接口示例
7
    pets.stream().filter(s -> s.startsWith("s")).forEach(System.out::println);
8
9
    Predicate<String> startWith = s -> s.startsWith("s");
10
    Predicate<String> lengthPredicate = s -> s.length() > 2;
11
    //Predicate支持and()操作将两个Predicate接口的逻辑取与
12
    System.out.println("Predicate and() demo:");
13
    pets.stream().filter(startWith.and(lengthPredicate)).forEach(System.out::println);
14
    //Predicate支持or()操作将两个Predicate接口的逻辑取或
15
    System.out.println("Predicate or() demo:");
16
    pets.stream().filter(startWith.or(lengthPredicate)).forEach(System.out::println);
17
    //Predicate支持negate()操作将Predicate接口的逻辑取反
18
    System.out.println("Predicate negate() demo:");
19
    pets.stream().filter(startWith.negate()).forEach(System.out::println);
20
    //Predicate静态方法isEqual()
21
    System.out.println("Predicate isEqual() demo:");
22
    pets.stream().filter(Predicate.isEqual("cat")).forEach(System.out::println);
23
}

从示例中可以看到,Predicate接口支持and(), or(), negate()等,并且有一个静态方法isEqual。

2. Consumer接口

1
@FunctionalInterface
2
public interface Consumer<T> {
3
4
    /**
5
     * Performs this operation on the given argument.
6
     *
7
     * @param t the input argument
8
     */
9
    void accept(T t);
10
11
    /**
12
     * Returns a composed {@code Consumer} that performs, in sequence, this
13
     * operation followed by the {@code after} operation. If performing either
14
     * operation throws an exception, it is relayed to the caller of the
15
     * composed operation.  If performing this operation throws an exception,
16
     * the {@code after} operation will not be performed.
17
     *
18
     * @param after the operation to perform after this operation
19
     * @return a composed {@code Consumer} that performs in sequence this
20
     * operation followed by the {@code after} operation
21
     * @throws NullPointerException if {@code after} is null
22
     */
23
    default Consumer<T> andThen(Consumer<? super T> after) {
24
        Objects.requireNonNull(after);
25
        return (T t) -> { accept(t); after.accept(t); };
26
    }
27
}

Consumer是一个函数式接口,也是一个泛型接口,作用是对传入的一个泛型参数进行操作,可能会对原数据产生side effect,即改变数据。常见的例子有:

  1. Iterable接口的forEach方法接受一个Comsumer接口类型的操作,对集合中的元素进行操作,例如打印等。

3. Function接口

1
@FunctionalInterface
2
public interface Function<T, R> {
3
4
    /**
5
     * Applies this function to the given argument.
6
     *
7
     * @param t the function argument
8
     * @return the function result
9
     */
10
    R apply(T t);
11
}

该接口代表接受一个参数并返回一个参数的操作类型。

4. Supplier接口

1
@FunctionalInterface
2
public interface Supplier<T> {
3
4
    /**
5
     * Gets a result.
6
     *
7
     * @return a result
8
     */
9
    T get();
10
}

该接口表示调用后返回一个参数的操作类型,返回结果不要求一定是新创建的或者不同的。

五、StreamAPI原理

1. Collection接口的stream(), parallelStream()方法

Collection接口,即各种集合类的最上层的接口,支持通过stream(), parallelStream()方法产生Stream。源码如下:

1
default Stream<E> stream() {
2
    return StreamSupport.stream(spliterator(), false);
3
}
4
5
default Stream<E> parallelStream() {
6
        return StreamSupport.stream(spliterator(), true);
7
}

其中spliterator()返回Spliterator接口,该接口用于对collection进行遍历或分片(parallelStream的情况下)。

2. Stream接口及其实现ReferencePipeline

Stream接口支持一系列操作,包括filter, map, distinct, sorted, peak, limit, skip等方法,这些方法仍然返回Stream接口,还支持reduce, forEach, min, max, collect, count, toArray等方法,这些方法不再返回Stream接口,是对流式操作的结束操作,是TerminalOp。

Stream接口的具体实现是ReferencePipeline类,执行具体的操作。具体可查看源码,此处不展开。

对于parallelStream不同的操作有不同的实现,各自的实现会决定是否真正并行操作。例如,reduce操作会调用ReduceOps执行具体操作,ReduceOps中ReduceOp类的部分源码如下:

1
@Override
2
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
3
                                 Spliterator<P_IN> spliterator) {
4
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
5
}

其中ReduceTask实现了ForkJoinTask,表明它是通过fork/join框架实现的并行处理。

参考文献【3】^3【4】^4中对StreamAPI原理有更详细的解释,这块后续我再更新。

3. generator 生成器

Stream为生成器的创建提供了便捷,生成器的好处是只有在你需要的时候才生成数据或对象,不需要提前生成。

例如,IntStream的generate()方法就生成一个流,可以无限调用生成整数。

1
public static IntStream generate(IntSupplier s) {
2
    Objects.requireNonNull(s);
3
    return StreamSupport.intStream(
4
            new StreamSpliterators.InfiniteSupplyingSpliterator.OfInt(Long.MAX_VALUE, s), false);
5
}

参考文献

HTTPS(HTTP over SSL)

HTTPS 实际上是HTTP over SSL,即在原来的HTTP协议层与TCP协议层之间加入SSL协议层,负责对数据通道进行加密。

TLS/SSL产生的背景是HTTP明文传输带来的几个问题:

(1) 窃听风险(eavesdropping):第三方可以获知通信内容。

(2) 篡改风险(tampering):第三方可以修改通信内容。

(3) 冒充风险(pretending):第三方可以冒充他人身份参与通信。

因此要达成的目标是:加密传输(防窃听)、数字签名校验(防篡改)、身份证书(防冒充),基于这三个目标来了解TLS/SSL的工作原理就比较清晰了,能明确每一个部分设计的原因。

PS : HTTPS 默认端口为443,而HTTP默认端口为80

TLS/SSL

先来了解一下TLS/SSL的前世今生。

SSL(Secure Socket Layer),由网景公司开发,3.0版本开始标准化为TLS。

TLS(Transport Layer Security)是SSL的标准化后的产物,有1.0 1.1 1.2三个版本,默认使用1.0

TLS是SSL的标准化后的产物,现在实际使用的是TLS,由于习惯原因仍经常称之为SSL。

协议工作流程

安全传输层协议(TLS)用于在两个通信应用程序之间提供保密性和数据完整性。该协议由两层组成: TLS 记录协议(TLS Record)和 TLS 握手协议(TLS Handshake),协议栈如下:

img

其中记录协议负责数据的分片、压缩、验证、加密等,握手协议负责客户端与服务端的握手,秘钥交换,告警等。整体流程如下图所示。

img

或者如下图所示:

img

简单的SSL握手连接过程(仅server端交换证书给client),详情可见参考文献[3]:

  • 1.client发送ClientHello,指定版本,随机数(RN),所有支持的密码套件(CipherSuites)

    ClientHello附带的数据随机数据RN,会在生成session key时使用,Cipher suite列出了client支持的所有加密算法组合,每一组包含3种算法,一个是非对称算法,如RSA,一个是对称算法如DES,3DES,RC4等,一个是Hash算法,如MD5,SHA等,server会从这些算法组合中选取一组,作为本次SSL连接中使用。

  • 2.server回应ServerHello,指定版本,RN,选择CipherSuites,会话ID(Session ID)

    ServerHello包含一个session id,如果SSL连接断开,再次连接时,可以使用该属性重新建立连接,在双方都有缓存的情况下可以省略握手的步骤。server端也会生成随机的RN,用于生成session key使用。server会从client发送的Cipher suite列表中挑出一个,例如RSA+RC4+MD5。

client接到ServerHello并处理后,双发状态是已经协商好了一组算法,包括对称加密、非对称加密、摘要,以及后续计算对称秘钥用的随机数

  • 3.server发送Certificate
    server的证书信息,只包含public key,server保存有public key对应的private key,用于证明server是该证书的实际拥有者。

    那么如何验证呢?原理很简单:client随机生成一串数,用server的public key加密(显然是RSA算法),发给server,server用private key解密后返回给client,client与原文比较,如果一致,则说明server拥有private key,也就说明与client通信的正是证书的拥有者。

    利用这个原理,就可以实现session key的交换,加密前的那串随机数就可用作session key,因为除了client和server,没有第三方能获得该数据了。实际上session key的交换也是这么做的。
    原理很简单,实际使用时会复杂很多,数据经过多次hash,伪随机等的运算,前面提到的client和server端得RN都会参与计算。这个原理用于下面client发送ClientKeyExchange前进行session key的计算。

client拿到了server的public key。

  • 4.Server发送ServerHelloDone

  • 5.client发送ClientKeyExchange,用于与server交换session key
    client随机生成48字节的pre-master secret,padding后用public key加密得到130字节的数据发送给server,server解密也能得到pre-master secret。

双方使用pre-master secret、”master secret”常量字节流、前期交换的server端RN和client的RN作为参数,使用一个伪随机函数PRF,其实就是hash之后再hash,最后得到48字节的master secret。master secret再与”key expansion”常量,双方RN经过伪随机函数运算得到key_block,PRF伪随机函数可以循环输出数据,因此我们想得到多少字节都可以,就如Random伪随机函数,给它一个种子,后续用hash计算能得到无数个随机数,如果每次种子相同,得到的序列是一样的,但是这里的输入时48字节的master secret,2个28字节的RN和一个字符串常量,碰撞的可能性是很小的。得到key block后,算法就从中取出session key,IV(对称算法中使用的初始化向量)等。client和server使用的session key是不一样的,但只要双方都知道对方使用的是什么就行了。这里会取出4个:client/server加密正文的key,client/server计算handshake数据hash的key。

注意双方只交换了pre-master key,后续的计算都是独立完成的,由于算法和种子都一样,所以得到的session key也一致。

server接到ClientKeyExchange处理后,双方状态是已经协商好了对称秘钥,后续再利用对称秘钥进行一次验证

  • 6.client发送ChangeCipherSpec,指示Server从现在开始发送的消息都是加密过的

  • 7.client发送Finishd,包含了前面所有握手消息的hash,可以让server验证握手过程是否被第三方篡改

    Finishd:client发送的加密数据,这个消息非常关键,一是能证明握手数据没有被篡改过,二能证明自己确实是session key的拥有者(这里是单边验证,只有server有certificate,server发送的Finished能证明自己含有private key,原理是一样的)。

    client将之前发送的所有握手消息存入handshake messages缓存,进行MD5和SHA-1两种hash运算,再与前面的master secret和一串常量”client finished”进行PRF伪随机运算得到12字节的verify data,还要经过改进的MD5计算得到加密信息。为什么能证明上述两点呢,前面说了只有密钥的拥有者才能解密得到pre-master key,master key,最后得到key block后,进行hash运算得到的结果才与发送方的一致。

  • 8.server发送ChangeCipherSpec,指示Client从现在开始发送的消息都是加密过的

  • 9.server发送Finishd,包含了前面所有握手消息的hash,可以让client验证握手过程是否被第三方篡改,并且证明自己是Certificate密钥的拥有者(因为如果不是,那么无法得到pre-master key,也就无法计算得到session key),即证明自己的身份。

握手完成后,客户端和服务端完成了对称秘钥session key的交换,数据通过对称秘钥加密后进行传输,实现了加密的数据传输通道。

CA证书

img

上面的握手流程讲完并没有描述数字证书扮演的角色,以及服务端向客户端返回证书,客户端进行验证的过程,这部分过程见上图。

服务端,即站点先向CA申请证书,申请信息中带有自己生成的公钥信息,CA审核通过后签发证书,证书包含服务端公钥、证书签名,证书签名为将证书明文部分计算数字摘要后,用CA的私钥加密的签名,防止证书被篡改(后面会讲到数字签名这块)。证书申请与签发是由站点先行完成的。

客户端在与服务端握手的过程中拿到证书,由于证书明文部分带有证书的公钥信息,能用CA的公钥对签名进行解密,并通过对比解密后的数字摘要与自己用明文计算的数字摘要来验证信息是否被篡改,从而保证了证书的安全性。后续通过证书中的server公钥来验证该server为证书拥有者,原理见上面的握手流程。

CA证书的安全使得客户端可以验证服务端的身份,防止站点被冒充,即钓鱼网站。

对称加密算法

加密和解密使用相同的密钥,计算较快,安全性较差。在SSL中常用的对称加密算法有RC4,AES,3DES,Camellia等。

非对称加密算法

加密和解密使用不同的密钥,数据在一端用公钥加密后,在另一端用私钥解密,安全性较高。由于性能问题,非对称加密一般用于数字签名(前面提到的证书的签名)和秘钥(对称加密算法的秘钥)的交换。常见的算法有RSA、DSA、ECC。

数字签名

数字签名就是“非对称加密+摘要算法”,其目的不是为了加密,而是用来防止他人篡改数据。
其核心思想是:比如A要给B发送数据,A先用摘要算法得到数据的指纹,然后用A的私钥加密指纹,加密后的指纹就是A的签名,B收到数据和A的签名后,也用同样的摘要算法计算指纹,然后用A公开的公钥解密签名,比较两个指纹,如果相同,说明数据没有被篡改,确实是A发过来的数据。假设C想改A发给B的数据来欺骗B,因为篡改数据后指纹会变,要想跟A的签名里面的指纹一致,就得改签名,但由于没有A的私钥,所以改不了,如果C用自己的私钥生成一个新的签名,B收到数据后用A的公钥根本就解不开。

至于为什么要使用摘要算法是因为非对称加密算法对原文长度有要求,所以先通过摘要算法生成一段较短的指纹,再进行非对称加密

摘要算法

摘要算法不是用来加密的,其输出长度固定,相当于计算数据的指纹,主要用来做数据校验,验证数据的完整性和正确性。常见的算法有CRC、MD5、SHA1、SHA256。
CRC32:CRC本身是“冗余校验码”的意思,CRC32则表示会产生一个32bit(8位十六进制数)的校验值。由于CRC32产生校验值时源数据块的每一个bit(位)都参与了计算,所以数据块中即使只有一位发生了变化,也会得到不同的CRC32值。

HTTP 短连接 vs 长连接

下面来聊一下短连接与长连接。

HTTP是一个无状态的面向连接的协议,无状态不代表HTTP不能保持TCP连接,更不能代表HTTP使用的是UDP协议(无连接)。HTTP的短连接、长连接实际上指的是TCP协议的短连接、长连接,长连接即是指多个HTTP请求复用一个TCP连接。

在HTTP/1.0中,默认使用的是短连接。也就是说,浏览器和服务器每进行一次HTTP操作,就建立一次连接,但任务结束就中断连接。如果客户端浏览器访问的某个HTML或其他类型的 Web页中包含有其他的Web资源,如JavaScript文件、图像文件、CSS文件等,当浏览器每遇到这样一个Web资源,就会建立一个HTTP会话。但从 HTTP/1.1起,默认使用长连接,用以保持连接特性。使用长连接的HTTP协议,会在响应头加入这行代码:

Connection:keep-alive 服务器和客户端都要设置

TCP 协议

三次握手与四次挥手

TCP连接的建立与释放分别对应的是三次握手与四次挥手,具体如下图:

img img

HTTP/2 多路复用

详细可参考此文:https://blog.wangriyu.wang/2018/05-HTTP2.html,作者写的非常详细:)

参考文献:

[1] http://www.ruanyifeng.com/blog/2014/02/ssl_tls.html

[2] https://blog.csdn.net/ustccw/article/details/76691248

[3] https://www.cnblogs.com/piyeyong/archive/2010/07/02/1770208.html

[4] https://blog.csdn.net/huangyuhuangyu/article/details/78220005

[5] https://blog.wangriyu.wang/2018/05-HTTP2.html