基于Netty实现简单的Redis客户端
前言
Netty是Java中非常常用的网络库,它能自定义网络通信协议,从而实现非常丰富多样的网络功能。Redis是开发中非常常用的中间件,常用于缓存、分布式锁等场合,Redis的协议也非常简单。尽管Netty中自带了Redis的客户端,但是,为了学习,我们将不使用Netty自带的实现,而是自己动手写一个基础版本的Redis客户端。
目标
Redis支持字符串、列表、集合等数据类型,考虑到是学习目的,只实现最最基础的功能,也就是设置与获取字符串的值。具体 来说,就是下面两个命令:
# 设置指定key的值
SET key value
# 获取指定key的值
GET key
协议
了解Redis协议
可以从 Redis协议详细规范 来了解Redis的协议。Redis的协议是基于字符串的而不是二进制的,这为协议的处理带来了许多便利。
除了通过看文档来了解协议细节,还可以利用Wireshark
抓包来查看客户端与服务端具体的通信细节。
首先,启动Redis服务,这里我在本地启动的;然后打开Wireshark
,由于Redis服务是在本地启动的,因此选择捕获本地环回网络流量(loopback traffic);由于Redis监听6379端口,过滤器可以填写port 6379
,这样就过滤掉了我们不关心的流量,只留下Redis的包;最后打开Redis客户端并测试几个命令,即可看到Wireshark
抓到的Redis协议的包:
Redis请求
Redis请求时多行字符串的格式,例如客户端发送命令get key
,那么实际上客户端发出的是:
*2\r\n$3\r\nget\r\n$3\r\nkey\r\n
以*
开头,然后是命令的单词数量,之后对于命令的每个单词,先是$
+单词长度,然后是单词,并且都用\r\n
分隔。
响应字符串
响应字符串也是多行字符串的格式。如果不存在该key,返回nil
,也就是:
$-1\r\n
如果值是字符串,例如“hello”,那么返回结果为:
$5\r\nhello\r\n
响应错误与消息
如果出现错误,则会返回错误消息。错误消息是单行字符串,以-
开头,如:
-ERR wrong number of arguments for 'get' command\r\n
如果是正常的消息,例如设置值成功,那么消息也是单行字符串,但是以+
开头,如:
+OK\r\n
实现
命令行客户端
模仿redis-cli
,使用Java写一个这样的命令行 客户端。首先新建maven项目,并导入Netty依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.12.Final</version>
</dependency>
新建类RedisClient
,在main
方法,初始化RedisClient
对象并连接到6379端口。
public class RedisClient {
private final InetSocketAddress address;
public RedisClient(String host, int port) {
address = new InetSocketAddress(host, port);
}
public static void main(String[] args) {
int port = 6379;
RedisClient client = new RedisClient("127.0.0.1", port);
client.start();
}
public void start() {
//...
}
}
在start
方法中,添加4个handler
,分别用于Redis请求编码、Redis响应解码、客户端处理Redis响应以及发出Redis请求。prompt用于命令行提示。
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
String prompt = String.format("%s:%d> ", address.getHostString(), address.getPort());
bootstrap.group(group)
.channel(NioSocketChannel.class)
.attr(AttributeKey.newInstance("prompt"), prompt)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RedisRequestEncoder());
ch.pipeline().addLast(new RedisResponseDecoder());
ch.pipeline().addLast(new RedisClientResponseHandler());
ch.pipeline().addLast(new RedisClientRequestHandler());
}
});
之后,便可以连接到Redis服务器了。使用标准输入来获取要下发的Redis命令,当发送quit
命令时客户端退出。
try {
// 连接Redis服务器
Channel channel = bootstrap.connect(address).sync().channel();
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
System.out.print(prompt);
ChannelFuture future = null;
while (true) {
String s = in.readLine();
if (s == null || "quit".equalsIgnoreCase(s = s.trim())) {
break;
}
if (s.length() == 0) {
System.out.print(prompt);
continue;
}
future = channel.writeAndFlush(s);
}
if (future != null) {
future.sync();
}
} catch (InterruptedException | IOException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
编码器与请求Handler
尽管Redis支持内联命令,这里仍然将其转为多行字符串的格式,对于编码器只需要按格式构造拼接字符串即可。
public class RedisRequestEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
String[] cmds = msg.split("\\s+");
StringBuilder sb = new StringBuilder();
sb.append("*").append(cmds.length).append("\r\n");
for (String cmd : cmds) {
sb.append("$").append(cmd.length()).append("\r\n")
.append(cmd).append("\r\n");
}
out.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
}
}
而请求也很简单,只需要发送命令即可。
public class RedisClientRequestHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
String message = (String) msg;
ByteBuf buf = ctx.alloc().buffer(message.length());
buf.writeBytes(message.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(message);
ReferenceCountUtil.release(buf);
}
}
写入的命令会流到编码器转为多行字符串的格式再发到Redis服务端。
解码器与响应Handler
对于解码器,需要解析从Redis服务器返回的消息。
首先,根据第一个字符判断是哪种消息类型:如果是$
开头,那么就是多行字符串,否则就是单行字符串。对于多行字符串,由于有nil
的存在,因此使用Optional<String>
类型,这样就能区分nil
与空串。在对ByteBuf
进行操作时,需要注意其readerIndex
,确定读取的位置是想要的位置,以及对一个包的完整解析。
public class RedisResponseDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
byte start = in.readByte();
if (start == '$') {
// 读取长度
int len = 0;
byte cur;
while ((cur = in.readByte()) != '\r') {
if (cur == '-') {
// 说明是空值,跳过后续的1\r\n
out.add(Optional.empty());
in.readBytes(3);
return;
}
len = len * 10 + (cur - '0');
}
// 当前cur位于\r,跳过后面的\n
in.skipBytes(1);
CharSequence sequence = in.readCharSequence(len, StandardCharsets.UTF_8);
String msg = sequence.toString();
out.add(Optional.of("\"" + msg + "\""));
// 跳过结尾的\r\n
in.skipBytes(2);
} else {
// 单行字符串
int index = ByteBufUtil.indexOf(Unpooled.buffer(1).writeByte('\r'), in);
// 注意这里的消息格式应与上面的保持一致为Optional<String>
if (index > 1) {
out.add(Optional.of(in.readCharSequence(index - 1, StandardCharsets.UTF_8).toString()));
} else {
out.add(Optional.empty());
}
// 跳过结尾的\r\n
in.skipBytes(2);
}
}
}
解码之后的处理也比较简单,打印结果即可。为了用户友好,还打印下命令行提示。
public class RedisClientResponseHandler extends SimpleChannelInboundHandler<Optional<String>> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Optional<String> msg) throws Exception {
if (msg.isPresent()) {
System.out.println(msg.get());
} else {
System.out.println("(nil)");
}
String prompt = (String) ctx.channel().attr(AttributeKey.valueOf("prompt")).get();
System.out.print(prompt);
}
}
效果
为了查看效果,先退出本地的redis-cli
,并确认本地的redis
服务是启动的。然后运行我们自己编写的RedisClient
, 效果如下:
总结
本篇介绍了Redis最基本的协议,并基于Netty做了个非常简单的Redis客户端,从而巩固了Netty的基本用法。
不过,由于是简单的demo,本例还有一些不够完善的地方。例如,没有与服务端建立心跳,在进行解码时没有判断是否有足够的字节可用等等。限于篇幅的原因,留给读者自行尝试。(好吧,是因为笔者也是初学者)
代码GitHub:demo-projects/netty-redis at master · straicat/demo-projects