Netty基础入门学习
【1】Netty是什么
Netty官网:https://netty.io/
4.x用户指南:https://netty.io/wiki/user-guide-for-4.x.html
GitHub地址:https://github.com/netty/netty
Netty官网推荐使用4.x版本。这里使用的是依赖如下:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.31.Final</version>
</dependency>
① 有这样几个描述:
- Netty是一个异步事件驱动的网络应用程序框架, 用于快速开发可维护的高性能协议服务器和客户端。它极大地简化了TCP和UDP套接字服务器等网络编程。
- Netty是基于Java NIO的网络应用框架,client-server框架。
- Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
- 作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用。
② NIO通信模型
如下图所示:
Selector 一般称 为选择器 ,也可以翻译为 多路复用器。四种状态:Connect(连接就绪)、Accept(接受就绪)、Read(读就绪)、Write(写就绪)。
③ Netty基本原理
如下图所示:
【2】丢弃服务器discard
实例如下:
package com.netty.hanler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
* Created by Janus on 2018/11/19.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {// (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
try {
((ByteBuf) msg).release(); // (3)
} catch (Exception e) {
e.printStackTrace();
}finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
① DiscardServerHandler extends ChannelInboundHandlerAdapter,这是一个实现ChannelInboundHandler。ChannelInboundHandler提供可以覆盖的各种事件处理程序方法。目前,只需自己扩展ChannelInboundHandlerAdapter而不是实现处理程序接口。
ChannelInboundHandlerAdapter继承图如下:
② 我们在channelRead()这里覆盖事件处理程序方法。每当从客户端接收到新数据时,都会使用收到的消息调用此方法。在此示例中,接收消息的类型是ByteBuf。
③ 要实现该DISCARD协议,处理程序必须忽略收到的消息。ByteBuf是一个引用计数对象,必须通过该release()方法显式释放。请记住,处理程序有责任释放传递给处理程序的任何引用计数对象。通常,channelRead()handler方法实现如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
④ 该exceptionCaught()事件处理方法被Netty在抛出异常时调用,异常可能由于I / O错误或处理器在处理事件时抛出。在大多数情况下,应记录捕获的异常并在此处关闭其关联的通道,尽管此方法的实现可能会有所不同,具体取决于您要处理特殊情况的操作。例如,您可能希望在关闭连接之前发送带有错误代码的响应消息。
DiscardServer 类和main方法如下:
package com.netty.server;
import com.netty.hanler.DiscardServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Created by Janus on 2018/11/19.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}
① NioEventLoopGroup是一个处理I / O操作的多线程事件循环。Netty EventLoopGroup为不同类型的传输提供各种实现。我们在此示例中实现了服务器端应用程序,因此NioEventLoopGroup将使用两个。第一个,通常称为“boss”,接受传入连接。第二个,通常称为“worker”,一旦"Boss"接受连接并将接受的连接注册到"worker",就处理被接受连接。使用了多少个线程以及它们如何映射到创建的Channels取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。
② ServerBootstrap是一个设置服务器的帮助程序类。可以直接使用一个Channel设置服务器。但请注意,这是一个繁琐的过程,在大多数情况下您不需要这样做。
③ 在这里,我们指定使用NioServerSocketChannel用于实例化一个新的Channel来接受传入连接。
④ 此处指定的处理程序将始终由新接受的Channel进行评估。这ChannelInitializer是一个特殊的处理程序,旨在帮助用户配置新的Channel。您最有可能希望通过添加一些处理程序(例如DiscardServerHandler去实现网络应用)来配置ChannelPipeline of the new Channel。随着应用程序变得复杂,您可能会向管道添加更多处理程序,并最终将此匿名类提取到顶级类中。
⑤ 您还可以设置特定于Channel实现的参数。我们正在编写TCP / IP服务器,因此我们可以设置套接字选项,如tcpNoDelay和keepAlive。请参考ChannelOption 和详细的ChannelConfig 实现的接口文档以此可以对ChannelOption 的有一个大概的认识。
⑥ 你有没有注意到option()和childOption()?option()英语于NioServerSocketChannel来接受传入的连接。 childOption() 是提供给由父管道ServerChannel 接收到的连接,在这个例子中也是 NioServerSocketChannel。
⑦ 剩下的就是绑定到端口并启动服务器。在这里,我们绑定到机器8080中所有NIC(网络接口卡)的端口。现在可以bind()根据需要多次调用该方法(使用不同的绑定地址。)
这里可以使用telnet测试8080端口数据发送,修改channelRead方法,打印其接受到的内容:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
使用telnet测试步骤与效果参考博文:Telnet测试8080端口发送数据。
【3】编写Echo应答服务器
到目前为止,我们一直在使用数据而没有响应。但是,服务器通常应该响应请求。让我们学习如何通过实现ECHO协议将响应消息写入客户端,其中任何接收的数据都被发回。
与我们在前面部分中实现的丢弃服务器的唯一区别在于它将接收的数据发回,而不是将接收的数据打印到控制台。因此,再次修改channelRead()方法就足够了:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
① ChannelHandlerContext 对象提供了许多操作,使你能够触发各种各样的 I/O 事件和操作。这里我们调用了 write(Object) 方法来逐字地把接受到的消息写入。请注意不同于 DISCARD 的例子我们并没有释放接受到的消息,这是因为当写入的时候 Netty 已经帮我们释放了。
② ctx.write(Object) 方法不会使消息写入到通道上,他被缓冲在了内部,你需要调用 ctx.flush() 方法来把缓冲区中数据强行输出。或者你可以用更简洁的cxt.writeAndFlush(msg) 以达到同样的目的。
总结:通过【2】【3】我们了解了一个基本的请求、应答netty server简单流程。使用了 EventLoopGroup bossGroup , EventLoopGroup workerGroup = new NioEventLoopGroup();及ServerBootstrap等关键基础组件。并通过继承ChannelInboundHandlerAdapter覆盖其channelRead方法来读取消息。接下来继续学习其他事件方法,如channelActive()。
【4】Time Server
在这个部分被实现的协议是TIME 协议。和之前的例子不同的是在不接受任何请求时他会发送一个含32位的整数的消息,并且一旦消息发送就会立即关闭连接。在这个例子中,你会学习到如何构建和发送一个消息,然后在完
成时关闭连接。
因为我们将会忽略任何接收到的数据,而只是在连接被创建发送一个消息,所以这次我们不能使用 channelRead() 方法了,代替他的是,我们需要覆盖 channelActive() 方法,下面的就是实现的内容:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Created by Janus on 2018/11/19.
*/
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
① channelActive() 方法将会在连接被建立并且准备进行通信时被调用。因此让我们在这个方法里完成一个代表当前时间的32位整数消息的构建工作。
② 为了发送一个新的消息,我们需要分配一个包含这个消息的新的缓冲。因为我们需要写入一个32位的整数,因此我们需要一个至少有4个字节的ByteBuf。通过 ChannelHandlerContext.alloc() 得到一个当前的ByteBufAllocator,然后分配一个新的缓冲。
③ 和往常一样我们需要编写一个构建好的消息。但是等一等,flip 在哪?难道我们使用 NIO 发送消息时不是调用 java.nio.ByteBuffer.flip() 吗?ByteBuf 之所以没有这个方法因为有两个指针,一个对应读操作一个对应写操作。当你向 ByteBuf 里写入数据的时候写指针的索引就会增加,同时读指针的索引没有变化。读指针索引和写指针索引分别代表了消息的开始和结束。
比较起来,NIO 缓冲并没有提供一种简洁的方式来计算出消息内容的开始和结尾,除非你调用 flip 方法。当你忘记调用 flip 方法而引起没有数据或者错误数据被发送时,你会陷入困境。这样的一个错误不会发生在 Netty上,因为我们对于不同的操作类型有不同的指针。你会发现这样的使用方法会让你过程变得更加的容易,因为你已经习惯一种没有使用 flip 的方式。
另外一个点需要注意的是 ChannelHandlerContext.write() (和 writeAndFlush() )方法会返回一个ChannelFuture 对象,一个 ChannelFuture 代表了一个还没有发生的 I/O 操作。这意味着任何一个请求操作都不会马上被执行,因为在 Netty 里所有的操作都是异步的。举个例子下面的代码中在消息被发送之前可能会先关闭连接。
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此你需要在 write() 方法返回的 ChannelFuture 完成后调用 close() 方法,然后当他的写操作已经完成他会通知他的监听者。请注意,close() 方法也可能不会立马关闭,他也会返回一个ChannelFuture。
④ 当一个写请求已经完成是如何通知到我们?这个只需要简单地在返回的 ChannelFuture 上增加一个ChannelFutureListener。这里我们构建了一个匿名的 ChannelFutureListener 类用来在操作完成时关闭 Channel。
或者,你可以使用简单的预定义监听器代码:
f.addListener(ChannelFutureListener.CLOSE);
为了测试我们的time服务如我们期望的一样工作,你可以使用 UNIX 的 rdate 命令
$ rdate -o <port> -p <host>
Port 是你在main()函数中指定的端口,host 使用 locahost 就可以了。
【5】Time Client
不像 DISCARD 和 ECHO 的服务端,对于 TIME 协议我们需要一个客户端,因为人们不能把一个32位的二进制数据翻译成一个日期或者日历。
在 Netty 中,编写服务端和客户端最大的并且唯一不同的使用了不同的BootStrap 和Channel的实现。
TimeClient实例如下:
import com.netty.hanler.TimeClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Created by Janus on 2018/11/19.
*/
public class TimeClient {
public static void main(String[] args) throws Exception {
// String host = args[0];
// int port = Integer.parseInt(args[1]);
String host="localhost";
//端口与Server保持一致
int port=8080;
// 这里只有一个workerGroup
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
//这里为NioSocketChannel not NioServerSocketChannel
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
① BootStrap 和ServerBootstrap 类似,不过他是对非服务端的 channel 而言,比如客户端或者无连接传输模式的 channel。
② 如果你只指定了一个EventLoopGroup,那他就会即作为一个 boss group ,也会作为一个 workder group,尽管客户端不需要使用到 boss worker 。
③ 代替NioServerSocketChannel的是NioSocketChannel,这个类在客户端channel 被创建时使用。
④ 不像在使用 ServerBootstrap 时需要用 childOption() 方法,因为客户端的SocketChannel 没有父亲。
⑤ 我们用 connect() 方法代替了 bind() 方法。
TimeClientHandler
正如你看到的,他和服务端的代码是不一样的。ChannelHandler 是如何实现的?他应该从服务端接受一个32位的整数消息,把他翻译成人们能读懂的格式,并打印翻译好的时间,最后关闭连接:
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
① 在TCP/IP中,Netty 会把读到的数据放到 ByteBuf 的数据结构中。
这样看起来非常简单,并且和服务端的那个例子的代码也相差不多。然而,处理器有时候会因为抛出 IndexOutOfBoundsException 而拒绝工作。在下个部分我们会讨论为什么会发生这种情况。
【6】处理一个基于流的传输
① 关于 Socket Buffer的一个小警告
基于流的传输比如 TCP/IP, 接收到数据是存在 socket 接收的 buffer 中。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。意味着,即使你发送了2个独立的数据包,操作系统也不会作为2个消息处理而仅仅是作为一连串的字节而言。因此这是不能保证你远程写入的数据就会准确地读取。
举个例子,让我们假设操作系统的 TCP/TP 协议栈已经接收了3个数据包:
由于基于流传输的协议的这种普通的性质,在你的应用程序里读取数据的时候会有很高的可能性被分成下面的片段:
因此,一个接收方不管他是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意思并且能够让程序的业务逻辑更好理解的数据。在上面的例子中,接收到的数据应该被构造成下面的格式:
② The First Solution
回到 TIME 客户端例子。同样也有类似的问题。一个32位整型是非常小的数据,他并不见得会被经常拆分到到不同的数据段内。然而,问题是他确实可能会被拆分到不同的数据段内,并且拆分的可能性会随着通信量的增加而增加。
最简单的方案是构造一个内部的可积累的缓冲,直到4个字节全部接收到了内部缓冲。下面的代码修改了 TimeClientHandler 的实现类修复了这个问题:
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
① ChannelHandler 有2个生命周期的监听方法:handlerAdded()和 handlerRemoved()。你可以完成任意初始化任务只要他不会被阻塞很长的时间。
② 首先,所有接收的数据都应该被累积在 buf 变量里。
③ 然后,处理器必须检查 buf 变量是否有足够的数据,在这个例子中是4个字节,然后处理实际的业务逻辑。否则,Netty 会重复调用channelRead() 当有更多数据到达直到4个字节的数据被积累。
③ The Second Solution
尽管第一个解决方案已经解决了 TIME 客户端的问题了,但是修改后的处理器看起来不那么的简洁,想象一下如果由多个字段比如可变长度的字段组成的更为复杂的协议时,你的ChannelInboundHandler 的实现将很快地变得难以维护。
正如你所知的,你可以增加多个ChannelHandler 到ChannelPipeline ,因此你可以把一整个ChannelHandler拆分成多个模块以减少应用的复杂程度,比如你可以把TimeClientHandler 拆分成2个处理器:
- TimeDecoder 处理数据拆分的问题
- TimeClientHandler 原始版本的实现
幸运地是,Netty 提供了一个可扩展的类,帮你完成 TimeDecoder 的开发。
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
① ByteToMessageDecoder 是ChannelInboundHandler 的一个实现类,他可以在处理数据拆分的问题上变得很简单。
② 每当有新数据接收的时候,ByteToMessageDecoder 都会调用 decode() 方法来处理内部的那个累积缓冲。
③ Decode() 方法可以决定当累积缓冲里没有足够数据时可以往 out 对象里放任意数据。当有更多的数据被接收了 ByteToMessageDecoder 会再一次调用 decode() 方法。
④ 如果在 decode() 方法里增加了一个对象到 out 对象里,这意味着解码器解码消息成功。ByteToMessageDecoder 将会丢弃在累积缓冲里已经被读过的数据。请记得你不需要对多条消息调用 decode(),ByteToMessageDecoder 会持续调用 decode() 直到不放任何数据到 out 里。
现在我们有另外一个处理器插入到ChannelPipeline 里,我们应该在 TimeClient 里修改 ChannelInitializer 的实现:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
}
});
如果你是一个大胆的人,你可能会尝试使用更简单的解码类ReplayingDecoder。不过你还是需要参考一下 API文档来获取更多的信息。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty还提供了更多开箱即用的解码器使你可以更简单地实现更多的协议,帮助你避免开发一个难以维护的处理器实现。请参考下面的包以获取更多更详细的例子:
- 对于二进制协议请看io.netty.example.factorial
- 对于基于文本协议请看io.netty.example.telnet
【7】用POJO代替ByteBuf
我们回顾了迄今为止的所有例子使用ByteBuf 作为协议消息的主要数据结构。在本节中,我们将改善的 TIME 协议客户端和服务器例子,使用 POJO 代替 ByteBuf。
在ChannelHandler 使用 POIO 的好处很明显:通过从ChannelHandler 中提取出 ByteBuf 的代码,将会使 ChannelHandler的实现变得更加可维护和可重用。
在 TIME 客户端和服务器的例子中,我们读取的仅仅是一个32位的整形数据,直接使用 ByteBuf 不会是一个主要的问题。然而,你会发现当你需要实现一个真实的协议,分离代码变得非常的必要。
首先,让我们定义一个新的类型叫做 UnixTime。
UnixTime实例如下:
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
现在我们可以修改下 TimeDecoder 类,返回一个 UnixTime,以替代ByteBuf:
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
System.out.println("进入TimeDecoder.decode方法");
if (in.readableBytes() < 4) {
System.out.println("Bytes is not reach 4");
return; // (3)
}
// out.add(in.readBytes(4)); // (4)
out.add(new UnixTime(in.readUnsignedInt())); // (4)
}
}
当更新TimeDecoder 后,TimeClientHandler 不再使用任何的 ByteBuf 代码了。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
是不是变得更加简单和优雅了?相同的技术可以被运用到服务端。让我们修改一下 TimeServerHandler 的代码。
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
现在,唯一缺少的功能是一个编码器,是ChannelOutboundHandler的实现,用来将 UnixTime 对象重新转化为一个 ByteBuf。这比编写一个解码器简单得多,因为编码消息时不需要处理数据包拆分和组装。
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
① 在这几行代码里还有几个重要的事情:
- 第一,通过ChannelPromise,当编码后的数据被写到了通道上 Netty
可以通过这个对象标记是成功还是失败。 - 第二, 我们不需要调用 cxt.flush()。因为处理器已经单独分离出了一个
方法 void flush(ChannelHandlerContext cxt),如果像自己实现 flush() 方法内容可以自行覆盖这个方法。
进一步简化操作,你可以使用MessageToByteEncode:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
最后的任务就是在 TimeServerHandler 之前把 TimeEncoder 插入到ChannelPipeline。 但这是不那么重要的工作。
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeEncoder(),new TimeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
【8】关闭你的应用
关闭一个 Netty 应用往往只需要简单地通过 shutdownGracefully() 方法来关闭你构建的所有的EventLoopGroup。
当EventLoopGroup 被完全地终止,并且对应的所有channel 都已经被关闭时,Netty 会返回一个Future对象来通知你。
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();