跳到主要内容

基于Netty实现简单的Redis客户端

· 阅读需 9 分钟

前言

Netty是Java中非常常用的网络库,它能自定义网络通信协议,从而实现非常丰富多样的网络功能。Redis是开发中非常常用的中间件,常用于缓存、分布式锁等场合,Redis的协议也非常简单。尽管Netty中自带了Redis的客户端,但是,为了学习,我们将不使用Netty自带的实现,而是自己动手写一个基础版本的Redis客户端。

目标

Redis支持字符串、列表、集合等数据类型,考虑到是学习目的,只实现最最基础的功能,也就是设置与获取字符串的值。具体来说,就是下面两个命令:

# 设置指定key的值
SET key value
# 获取指定key的值
GET key

协议

了解Redis协议

可以从 Redis协议详细规范 来了解Redis的协议。Redis的协议是基于字符串的而不是二进制的,这为协议的处理带来了许多便利。

除了通过看文档来了解协议细节,还可以利用Wireshark抓包来查看客户端与服务端具体的通信细节。

首先,启动Redis服务,这里我在本地启动的;然后打开Wireshark,由于Redis服务是在本地启动的,因此选择捕获本地环回网络流量(loopback traffic);由于Redis监听6379端口,过滤器可以填写port 6379,这样就过滤掉了我们不关心的流量,只留下Redis的包;最后打开Redis客户端并测试几个命令,即可看到Wireshark抓到的Redis协议的包:

screenshot-20211024193029

Redis请求

Redis请求时多行字符串的格式,例如客户端发送命令get key,那么实际上客户端发出的是:

*2\r\n$3\r\nget\r\n$3\r\nkey\r\n

*开头,然后是命令的单词数量,之后对于命令的每个单词,先是$+单词长度,然后是单词,并且都用\r\n分隔。

响应字符串

响应字符串也是多行字符串的格式。如果不存在该key,返回nil,也就是:

$-1\r\n

如果值是字符串,例如“hello”,那么返回结果为:

$5\r\nhello\r\n

响应错误与消息

如果出现错误,则会返回错误消息。错误消息是单行字符串,以-开头,如:

-ERR wrong number of arguments for 'get' command\r\n

如果是正常的消息,例如设置值成功,那么消息也是单行字符串,但是以+开头,如:

+OK\r\n

实现

命令行客户端

模仿redis-cli,使用Java写一个这样的命令行客户端。首先新建maven项目,并导入Netty依赖:

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.12.Final</version>
</dependency>

新建类RedisClient,在main方法,初始化RedisClient对象并连接到6379端口。

public class RedisClient {
private final InetSocketAddress address;

public RedisClient(String host, int port) {
address = new InetSocketAddress(host, port);
}

public static void main(String[] args) {
int port = 6379;
RedisClient client = new RedisClient("127.0.0.1", port);
client.start();
}

public void start() {
//...
}
}

start方法中,添加4个handler,分别用于Redis请求编码、Redis响应解码、客户端处理Redis响应以及发出Redis请求。prompt用于命令行提示。

Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
String prompt = String.format("%s:%d> ", address.getHostString(), address.getPort());
bootstrap.group(group)
.channel(NioSocketChannel.class)
.attr(AttributeKey.newInstance("prompt"), prompt)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RedisRequestEncoder());
ch.pipeline().addLast(new RedisResponseDecoder());
ch.pipeline().addLast(new RedisClientResponseHandler());
ch.pipeline().addLast(new RedisClientRequestHandler());
}
});

之后,便可以连接到Redis服务器了。使用标准输入来获取要下发的Redis命令,当发送quit命令时客户端退出。

try {
// 连接Redis服务器
Channel channel = bootstrap.connect(address).sync().channel();
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
System.out.print(prompt);
ChannelFuture future = null;
while (true) {
String s = in.readLine();
if (s == null || "quit".equalsIgnoreCase(s = s.trim())) {
break;
}
if (s.length() == 0) {
System.out.print(prompt);
continue;
}
future = channel.writeAndFlush(s);
}
if (future != null) {
future.sync();
}
} catch (InterruptedException | IOException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}

编码器与请求Handler

尽管Redis支持内联命令,这里仍然将其转为多行字符串的格式,对于编码器只需要按格式构造拼接字符串即可。

public class RedisRequestEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
String[] cmds = msg.split("\\s+");
StringBuilder sb = new StringBuilder();
sb.append("*").append(cmds.length).append("\r\n");
for (String cmd : cmds) {
sb.append("$").append(cmd.length()).append("\r\n")
.append(cmd).append("\r\n");
}
out.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
}
}

而请求也很简单,只需要发送命令即可。

public class RedisClientRequestHandler extends ChannelOutboundHandlerAdapter {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
String message = (String) msg;
ByteBuf buf = ctx.alloc().buffer(message.length());
buf.writeBytes(message.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(message);
ReferenceCountUtil.release(buf);
}
}

写入的命令会流到编码器转为多行字符串的格式再发到Redis服务端。

解码器与响应Handler

对于解码器,需要解析从Redis服务器返回的消息。

首先,根据第一个字符判断是哪种消息类型:如果是$开头,那么就是多行字符串,否则就是单行字符串。对于多行字符串,由于有nil的存在,因此使用Optional<String>类型,这样就能区分nil与空串。在对ByteBuf进行操作时,需要注意其readerIndex,确定读取的位置是想要的位置,以及对一个包的完整解析。

public class RedisResponseDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
byte start = in.readByte();
if (start == '$') {
// 读取长度
int len = 0;
byte cur;
while ((cur = in.readByte()) != '\r') {
if (cur == '-') {
// 说明是空值,跳过后续的1\r\n
out.add(Optional.empty());
in.readBytes(3);
return;
}
len = len * 10 + (cur - '0');
}
// 当前cur位于\r,跳过后面的\n
in.skipBytes(1);

CharSequence sequence = in.readCharSequence(len, StandardCharsets.UTF_8);
String msg = sequence.toString();
out.add(Optional.of("\"" + msg + "\""));
// 跳过结尾的\r\n
in.skipBytes(2);
} else {
// 单行字符串
int index = ByteBufUtil.indexOf(Unpooled.buffer(1).writeByte('\r'), in);
// 注意这里的消息格式应与上面的保持一致为Optional<String>
if (index > 1) {
out.add(Optional.of(in.readCharSequence(index - 1, StandardCharsets.UTF_8).toString()));
} else {
out.add(Optional.empty());
}
// 跳过结尾的\r\n
in.skipBytes(2);
}
}
}

解码之后的处理也比较简单,打印结果即可。为了用户友好,还打印下命令行提示。

public class RedisClientResponseHandler extends SimpleChannelInboundHandler<Optional<String>> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Optional<String> msg) throws Exception {
if (msg.isPresent()) {
System.out.println(msg.get());
} else {
System.out.println("(nil)");
}
String prompt = (String) ctx.channel().attr(AttributeKey.valueOf("prompt")).get();
System.out.print(prompt);
}
}

效果

为了查看效果,先退出本地的redis-cli,并确认本地的redis服务是启动的。然后运行我们自己编写的RedisClient,效果如下:

screenshot-20211024203515

总结

本篇介绍了Redis最基本的协议,并基于Netty做了个非常简单的Redis客户端,从而巩固了Netty的基本用法。

不过,由于是简单的demo,本例还有一些不够完善的地方。例如,没有与服务端建立心跳,在进行解码时没有判断是否有足够的字节可用等等。限于篇幅的原因,留给读者自行尝试。(好吧,是因为笔者也是初学者)

代码GitHub:demo-projects/netty-redis at master · straicat/demo-projects

下一篇:基于Netty实现简单的Redis服务端