Jedis源码分析(二) - Jedis类结构及实现

Jedis源码分析共有四个章节,以下为各章链接:

  1. Jedis源码分析(一) - Jedis介绍
  2. Jedis源码分析(二) - Jedis类结构及实现
  3. Jedis源码分析(三) - JedisCluster类结构及实现
  4. Jedis源码分析(四) - JedisSentinel与ShardedJedis介绍

Jedis的类结构

首先看Jedis的内部结构,图2-1中用橘色框标出了主要支架, 为突出主要架构,或有稍许内容没有标出。

avatar

图1-1 Jedis的类结构

​ Jedis以输入的命令参数是否为二进制,将处理请求的具体实现部署在两个类中,例如JedisBinaryJedisClientBinaryClient。与Redis服务器的连接信息(Socket,host,port……)封装在Client的基类Connection中。BinaryJedis类中提供了Client,Pipeline和Transcation变量,对应3种请求模式。

Jedis的初始化流程

1
2
3
Jedis jedis = new Jedis("localhost", 6379, 15000);
Transaction t = jedis.multi();
Pipeline pipeline = jedis.pipelined();

Jedis通过传入Redis服务器地址(host,port)开始初始化,然后在BinaryJedis里实例化Client。Client通过Socket维持客户端与Redis服务器的连接与沟通。

前文提到Transaction和Pipeline很相似,它们继承同一个基类MultiKeyPipelineBase。区别在于Transaction在实例化的时候,就自动发送MULTI命令,开启事务模式,而Pipeline则按情况手动开启,它们均依靠Client发送命令。以下是Transaction和Pipeline初始化的具体实现:

1
2
3
4
5
6
7
8
9
10
11
//BinaryJedis类
public Transaction multi() {
client.multi();
transaction = new Transaction(client);
return transaction;
}
public Pipeline pipelined() {
pipeline = new Pipeline();
pipeline.setClient(client);
return pipeline;
}

下面通过发送一个get key 的命令,看看,这三种模式是如何运转的。

Jedis工作模式的调用流程

client请求模式

get key 为例,为突出主要步骤,部分代码略有缩减。

用法:

1
jedis.get("foo");

avatar

图3-1 Clinet模式的时序图

avatar

图3-2 Client模式的调用流程

图3-1和3-2是client模式下,发送请求,读取回复的具体实现。可以清楚看到,在每次发送命令前,会先通过connect()方法判断是否已经连接,若未连接则进行如下操作:

  1. 实例化Socket,并配置,
  2. 连接Socket,获取OutputStream和InputStream
  3. 如果是SSL连接,则会通过SSLSocketFactory创建socket连接

Protocol是一个通讯工具类,将Redis的各类执行关键字存储为静态变量,可以直观调用命令,例如Protocol.Command.GET。同时,将命令包装成符合Redis的统一请求协议,回复消息的处理也是在这个类进行,先通过通讯协提取出当次请求的回复消息,将Object类型的消息,格式化为String,List等具体类型,如果回复消息有Error则以异常的形式抛出。

Pipeline和Transaction模式

avatar

图3-3 Transaction和Pipeline的类结构

图3-3是Transaction和Pipeline两个类的的类结构。可以看到Pipeline和Transaction都继承自MultiKeyPipelineBase,其中,MultiKeyPipelineBasePipelineBase的区别在于处理的命令不同,内部均调用Client发送命令。从以下用例也可以看出两者的操作也十分类似。Pipeline有一个内部类对象MultiResponseBuilder,前文提到,当Redis事务结束时,会以List的形式,一次性返回所有命令的执行结果。MultiResponseBuilder对象就是用于,当Pipeline开始其实模式后,在事务结束时,存储所有返回结果。

Queable用一个LinkedList装入每个命令的返回结果,Response<T>是一个泛型,set(Object data)方法传入格式化之前的结果,get()方法返回格式化之后的结果。
Pipeline的使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Pipeline p = jedis.pipelined();
//只发送命令,不读取结果,此时的Response<T>没有数据
Response<String> string = p.get("string");
Response<String> list = p.lpop("list");
Response<String> hash = p.hget("hash", "foo");
Response<Set<String>> zset = p.zrange("zset", 0, -1);
Response<String> set = p.spop("set");
//一次读取所有response.此时的Response<T>有数据
p.sync();

assertEquals("foo", string.get());
assertEquals("foo", list.get());
assertEquals("bar", hash.get());
assertEquals("foo", zset.get().iterator().next());
assertEquals("foo", set.get());

Transactions使用方法:
//开启事务
Transaction t = jedis.multi();
//命令进入服务端的待执行队列
Response<String> string = t.get("string");
Response<String> list = t.lpop("list");
Response<String> hash = t.hget("hash", "foo");
Response<Set<String>> zset = t.zrange("zset", 0, -1);
Response<String> set = t.spop("set");
//发送EXEC指令,执行所有命令,并返回结果
t.exec();

assertEquals("foo", string.get());
assertEquals("foo", list.get());
assertEquals("bar", hash.get());
assertEquals("foo", zset.get().iterator().next());
assertEquals("foo", set.get());

avatar

图3-4 Pipeline的调用时序图

avatar

图3-5 Pipeline的调用流程

图3-4,3-5显示了Pipeline从发送请求到读取回复的具体实现,Transaction的实现与其类似,因而没有另外做图说明。由上图可见,Pipeline通过Client发送命令,Client在sendCommand时,会同时执行pipelinedCommands++,记录发送命令的条数(参见图3-5)。之后,返回一个Response<T>实例,并将这个实例塞入了pipelinedResponses队列中。Response<T>主要有3个属性:

  1. 格式化前的回复消息data,
  2. 格式化后的回复消息response,
  3. 格式化方式builder。

​ 刚发送消息后,Response<T>里面的dataresponse是空值,只有格式化的方式builderSync()用于一次性读取所有回复,首先调用client的getAll()方法,getAll()方法根据之前记录的pipelinedCommands和Redis通讯协议,读取相同条数的回复消息到一个List,并返回给Pipeline。随后遍历这个List,逐个将回复消息赋给pipelinedResponses中每个Response<T>data

在执行Response<T>.get()命令时,Response<T>里面data已经有值了,但是是Object类型的,因而还要调用build()方法,做一次数据转换,返回格式化之后的数据。

以上就是Pipeline的主要工作流程。Transaction的exec()方法和sync()很相似,下文为exec()的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public List<Object> exec() {
// 清空inputstream里面的所有数据,忽略QUEUED or ERROR回复
client.getMany(getPipelinedResponseLength());
//发送EXEC指令,让服务端执行所有命令
client.exec();
//事务结束
inTransaction = false;
//从inputStream中读取所有回复
List<Object> unformatted = client.getObjectMultiBulkReply();
if (unformatted == null) {
return null;
}
//和sync()一样
List<Object> formatted = new ArrayList<Object>();
for (Object o : unformatted) {
try {
formatted.add(generateResponse(o).get());
} catch (JedisDataException e) {
formatted.add(e);
}
}
return formatted;
}

本节虽未将Pipeline和Transaction的方法实现尽述,但也大同小异。关键点在于理解第一章介绍的3类请求逻辑。

0%