SOFA BOLT源码解析之设计要点-线程模型
1 设计要点解析
1.1 线程模型
此部分内容主要介绍蚂蚁为什么选择Netty4作为基础网络编程框架,来源于蚂蚁技术团队发布的一篇文章:
文章名称为:蚂蚁通信框架实践;
链接地址为:https://mp.weixin.qq.com/s/JRsbK1Un2av9GKmJ8DK7IQ?
此处引用该文章的内容,主要是想让大家更好的理解SOFA Bolt设计过程中所考虑的关键点,以便大家在实现自己的私有通信协议时,更加全面的考虑问题。
本文对原文作了一些修改,以更详细的说明SOFA Bolt实现过程中一些细节。
蚂蚁的中间件产品,主要是 Java 语言开发,如果通信产品直接用原生的 Java NIO 接口开发,工作量相当庞大。通常我们会选择一些基础网络编程框架,而在基础网络通信框架上,我们也经历了自研、基于Apache Mina实现。最终,由于 Netty 在网络编程领域的出色表现,我们逐步切换到了 Netty 上。
Netty 在 2008 年就发布了3.0.0 版本,到现在已经经历了 10 年多的发展。而且从 4.x 之后的版本,把无锁化的设计理念放在第一位,然后针对内存分配,高效的 Queue队列,高吞吐的超时机制等,做了各种细节优化。同时 Netty 的核心 Committer 与社区非常活跃,如果发现了缺陷能够及时得到修复。所有这些,使得Netty 性能非常的出色和稳定,成为当下 Java 领域最优秀的网络通信组件。
SOFA Bolt采用Netty4作为底层的网络通信组件,并对其进行了优化和扩展。所以,我们先看一下Netty4的线程模型,如下图所示:
本文没有采用原文的Netty4的线程模型图,而是采用自己以前绘制的Netty4线程模型图,突出了一些关键的细节,并根据SOFA Bolt实现,做了一些修改。
Netty4线程模型有几个关键点:
1. Reactor模型;
2. 串行化处理;
3. 业务线程池;
1.1.1 Reactor模型
Netty4采用Reactor模式。
Reactor模式是基于事件驱动的并发处理模型,将业务组织成各种事件驱动的对象。
Reactor模式主要用来处理来自于一个或多个客户端的并发服务请求。
Reactor模式主要是提高系统的吞吐量,在有限的资源下处理更多的事情。
RpcServer基于Netty4的ServerBootstrap实现了SOFA Bolt服务器端,其主要源码如下:
1. public class RpcServer extendsRemotingServer {
2.
3. ……略
4. /** server bootstrap */
5. privateServerBootstrap bootstrap;
6.
7. /** channelFuture */
8. private ChannelFuture channelFuture;
9.
10. /** global switch */
11. private GlobalSwitch globalSwitch = newGlobalSwitch();
12.
13. /** connection event handler */
14. private ConnectionEventHandler connectionEventHandler;
15.
16. /** connection event listener */
17. private ConnectionEventListener connectionEventListener =new ConnectionEventListener();
18.
19. /** user processors of rpc server */
20. private ConcurrentHashMap<String,UserProcessor<?>> userProcessors = new ConcurrentHashMap<String,UserProcessor<?>>(4);
21.
22. /** boss event loop group*/
23. private final EventLoopGroup bossGroup = new NioEventLoopGroup(1, newNamedThreadFactory("Rpc-netty-server-boss"));
24.
25. /** worker event loop group. Reuse I/Oworker threads between rpc servers. */
26. private final static NioEventLoopGroup workerGroup = newNioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, new NamedThreadFactory("Rpc-netty-server-worker"));
27.
28. /** address parser to get custom args */
29. private RemotingAddressParser addressParser;
30.
31. /** connection manager */
32. private DefaultConnectionManager connectionManager;
33.
34. /** rpc remoting */
35. protected RpcRemoting rpcRemoting;
36.
37. static {
38. workerGroup.setIoRatio(SystemProperties.netty_io_ratio());
39. }
40.
41. /**
42. * Construct a rpc server. <br>
43. */
44. public RpcServer(int port) {
45. super(port);
46. }
47.
48. /**
49. * Construct a rpc server. <br>
50. */
51. public RpcServer(int port, booleanmanageConnection) {
52. this(port);
53. /** server connection managementfeature enabled or not, default value false, means disabled. */
54. if (manageConnection) {
55. this.globalSwitch.turnOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH);
56. }
57. }
58.
59. /**
60. * Construct a rpc server. <br>
61. */
62. public RpcServer(int port, booleanmanageConnection, boolean syncStop) {
63. this(port, manageConnection);
64. if (syncStop) {
65. this.globalSwitch.turnOn(GlobalSwitch.SERVER_SYNC_STOP);
66. }
67. }
68.
69. @Override
70. protected void doInit() {
71. if (this.addressParser == null) {
72. this.addressParser = newRpcAddressParser();
73. }
74. initRpcRemoting(null);
75. this.bootstrap = new ServerBootstrap();
76. this.bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
77. .option(ChannelOption.SO_BACKLOG,SystemProperties.tcp_so_backlog())
78. .option(ChannelOption.SO_REUSEADDR,SystemProperties.tcp_so_reuseaddr())
79. .childOption(ChannelOption.TCP_NODELAY,SystemProperties.tcp_nodelay())
80. .childOption(ChannelOption.SO_KEEPALIVE,SystemProperties.tcp_so_keepalive());
81.
82. // set write buffer water mark
83. initWriteBufferWaterMark();
84.
85. boolean pooledBuffer =SystemProperties.netty_buffer_pooled();
86. if (pooledBuffer) {
87. this.bootstrap.option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT)
88. .childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT);
89. }
90.
91. final boolean idleSwitch =SystemProperties.tcp_idle_switch();
92. final int idleTime =SystemProperties.tcp_server_idle();
93. final ChannelHandler serverIdleHandler= new ServerIdleHandler();
94. final RpcHandler rpcHandler = newRpcHandler(true, this.userProcessors);
95. this.bootstrap.childHandler(newChannelInitializer<SocketChannel>() {
96.
97. protected voidinitChannel(SocketChannel channel) throws Exception {
98. ChannelPipeline pipeline =channel.pipeline();
99. pipeline.addLast("decoder", new RpcProtocolDecoder(
100. RpcProtocolManager.DEFAULT_PROTOCOL_CODE_LENGTH));
101. pipeline.addLast(
102. "encoder",
103. newProtocolCodeBasedEncoder(ProtocolCode
104. .fromBytes(RpcProtocolV2.PROTOCOL_CODE)));
105. if (idleSwitch) {
106. pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0,idleTime,
107. TimeUnit.MILLISECONDS));
108. pipeline.addLast("serverIdleHandler", serverIdleHandler);
109. }
110. pipeline.addLast("connectionEventHandler", connectionEventHandler);
111. pipeline.addLast("handler", rpcHandler);
112. createConnection(channel);
113. }
114.
115. /**
116. * create connectionoperation<br>
117. */
118. private voidcreateConnection(SocketChannel channel) {
119. Url url =addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
120. if(globalSwitch.isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
121. connectionManager.add(newConnection(channel, url), url.getUniqueKey());
122. } else {
123. new Connection(channel,url);
124. }
125. channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
126. }
127. });
128. }
129. ……略
130. @Override
131. protected boolean doStart() throwsInterruptedException {
132. this.channelFuture = this.bootstrap.bind(newInetSocketAddress(this.port)).sync();
133. return this.channelFuture.isSuccess();
134. }
135. ……略
136. }
首先,创建ServerBootstrap所使用的线程池。
RpcServer在初始化 ServerBootstrap 时,提供了两个不同的 EventLoopGroup 实例,从而实现了Reactor的主从模型。
我们通常把负责处理建立连接事件的线程,叫做BossGroup,对应ServerBootstrap 构造方法里的parentGroup参数,即我们常说的 Acceptor 线程。处理已经创建好的 channel相关连IO事件的线程,叫做 WorkerGroup,对应ServerBootstrap构造方法里的childGroup参数,即我们常说的IO线程。
然后,设置ServerSocketChannel基本参数,其实也就是设置底层Socket参数,主要包括:
1. TCP_NODELAY:在SOFA Bolt中默认值为true,表示将Nagle算法关闭后,客户端每发送一次数据,无论数据包的大小都会将这些数据发送出去。否则,当数据包中的数据很少时,如只有1个字节,而数据包的头却有几十个字节(IP头+TCP头)时,系统会在发送之前先将较小的包合并到软大的包后,一起将数据发送出去。在发送下一个数据包时,系统会等待服务器对前一个数据包的响应,当收到服务器的响应后,再发送下一个数据包,这就是所谓的Nagle算法;在默认情况下,Nagle算法是开启的。
2. SO_REUSEADDR:在SOFA Bolt中默认值为true,表示如果端口忙,但TCP状态位于TIME_WAIT,可以重用端口。否则,如果端口忙,而TCP状态位于其他状态,重用端口时得到一个错误信息,抛出“Address already in use: JVM_Bind”。如果你的服务程序停止后想立即重启,不等60秒,而新套接字依旧 使用同一端口,此时 SO_REUSEADDR 选项非常有用。
3. SO_KEEPALIVE:在SOFA Bolt中默认值为true,表示客户端Socket每隔段的时间(大约两个小时)就会利用空闲的连接向服务器发送一个数据包。这个数据包并没有其它的作用,只是为了检测一下服务器是否仍处于活动状态。如果服务器未响应这个数据包,在大约11分钟后,客户端Socket再发送一个数据包,如果在12分钟内,服务器还没响应,那么客户端Socket将关闭。否则,如果将Socket选项关闭,客户端Socket在服务器无效的情况下可能会长时间不会关闭。
4. SO_BACKLOG:在SOFA Bolt中默认值为1024,表示accpet queue队列的长度(在Linux内核中,实际为backlog参数与系统参数somaxconn,取二者的较小值)。accept queue用于保存全连接状态的请求。如果accept queue队列满了,服务端将发送一个ECONNREFUSED错误信息Connection refused到客户端。
接着设置SOFA Bolt实现的ChannelHandler:
1. RpcProtocolDecoder:SOFA Bolt私有协议解码器,后续详细描述;
2. ProtocolCodeBasedEncoder:SOFA Bolt私有协议编码器,后续详细描述;
3. IdleStateHandler:Netty4空闲状态处理器,当Channel在一段时间内没有多、写操作,则触发IdleStateEvent事件。配合ServerIdleHandler处理器使用;
4. ServerIdleHandler:监听IdleStateEvent事件,如果监听到该事件,表明Channel在一段时间内没有多、写操作,处于空闲状态,则关闭Channel;
5. ConnectionEventHandler:连接事件处理器,后续详细描述;
6. RpcHandler:SOFA Bolt业务处理器,此处使用线程池(SOFA Bolt默认提供的线程池或用户提供的线程池)进行业务处理,后续详细描述。
最后,在服务端启动时调用ServerBootstrap的bind方法,监听指定的接口,开始服务。
最佳实践:通常 bossGroup 只需要设置为 1 即可,因为 ServerSocketChannel 在初始化阶段,只会注册到某一个eventLoop 上,而这个 eventLoop 只会有一个线程在运行,所以没有必要设置为多线程(什么时候需要多线程呢,可以参考 Norman Maurer 在 StackOverflow 上的这个回答);而IO 线程,为了充分利用 CPU,同时考虑减少线上下文切换的开销,通常设置为 CPU 核数的两倍,这也是 Netty 提供的默认值。
1.1.2 串行化设计
Netty从 4.x 的版本之后,所推崇的设计理念是串行化处理一个Channel所对应的所有IO事件和异步任务,单线程处理来规避并发问题。
在Netty 中,当创建完Channel以后,通过EventLoopGroup注册到某一个 EventLoop上,之后该 Channel所有读写事件,以及经由ChannelPipeline里各个 Handler 的处理,都是在这一个线程里完成。
一个Channel只会注册到一个 EventLoop 上,而一个 EventLoop可以注册多个Channel。所以,我们在使用时,也需要尽可能避免使用带锁的实现,能无锁化就无锁。
最佳实践:Channel的实现是线程安全的,因此我们通常在运行时,会保存一个 Channel 的引用,同时为了保持 Netty 的无锁化理念,也应该尽可能避免使用带锁的实现,尤其是在 Handler里的处理逻辑。举个例子:这里会有一个比较特殊的容易死锁的场景,比如在业务线程提交异步任务前需要先抢占某个锁,Handler里某个异步任务的处理也需要获取同一把锁。如果某一个时刻业务线程先拿到锁 lock1,同时 Handler 里由于事件机制触发了一个异步任务 A,并在业务线程提交异步任务之前,提交到了 EventLoop 的队列里。之后,业务线程提交任务 B,等待 B 执行完成后才能释放锁 lock1;而任务 A 在队列里排在 B 之前,先被执行,执行过程需要获取锁 lock1 才能完成。这样死锁就发生了,与常见的资源竞争不同,而是任务执行权导致的死锁。要规避这类问题,最好的办法就是不要加锁;如果实在需要用锁,需要格外注意 Netty 的线程模型与任务处理机制。
1.1.3 业务处理
IO密集型的轻计算业务:此时线程的上下文切换消耗,会比IO线程的占用消耗更为突出,所以我们通常会建议在IO线程来处理请求;
CPU密集型的计算业务:比如需要做远程调用,操作DB的业务,此时IO线程的占用远远超过线程上下文切换的消耗,所以我们就会建议在单独的业务线程池里来处理请求,以此来释放IO线程的占用。该模式,也是蚂蚁微服务,消息通信等最常使用的模型。该模式在后面的RPC 协议实现举例部分会详细介绍。
针对不同的场景,我们需要进行合理的设计,将硬件的IO能力,CPU计算能力与内存结合起来,发挥最佳的效果。
针对不同的业务类型,我们会选择不同的处理方式。
以下是SOFA Bolt服务端RpcHandler类进行业务处理相关类的类图:
从上图可以看出,在业务处理过程中,主要涉及以下几个类:
1. Protocol:定义了SOFA Bolt的私有通信协议,后续详细描述;
2. RemotingCommand:远程命令,如负载命令(RpcRequestCommand、RpcResponseCommand等)、控制命令(HeartbeatCommand、HeartbeatAckCommand)等。
3. CommandEncoder:私有通信协议编码器,针对业务负载能支持不同的序列化机制;
4. CommandDecoder:私有通信协议解码器,针对业务负载能支持不同的反序列化机制;
5. CommandFactory:私有通信协议命令工厂,根据不同的业务,构建不同的命令;
6. CommandHandler:命令处理分发器,根据不同的命令码,选择对应的RemotingProcessor进行具体的命令处理;
7. RemotingProcessor:命令处理器,如RPC请求处理器、心跳处理器等。此处需要注意的是,SOFA Bolt通过ProcessorManager管理所有注册的RemotingProcessor,在ProcessorManager中默认创建了一个线程池,当用户没有提供线程池时,在这个默认的线程池中执行业务逻辑处理,从而提高整个网络通信层的处理能力。
8. UserProcessor:用户请求处理器,在RPC的命令处理器中,增加一层映射关系,保存业务传输对象的className与UserProcessor的对应关系。此时服务端只需要简单注册一个className对应processor,并提供一个独立的executor,就可以实现在业务线程处理请求了。
既然RpcHandler是负责业务处理的入口,那么我们就从RpcHandler开始分析业务处理是如何在业务线程池中异步执行的。
首先,看一下业务线程池的初始化的位置和过程:
在SOFA Bolt中,提供业务线程池的类主要有三个:
1. 所有命令处理器公用的业务线程池defaultExecutor:由ProcessorManager类提供,在ProcessorManager类的构造函数中初始化:
1. public ProcessorManager() {
2. defaultExecutor = newThreadPoolExecutor(minPoolSize, maxPoolSize, keepAliveTime,
3. TimeUnit.SECONDS, newArrayBlockingQueue<Runnable>(queueSize), new NamedThreadFactory(
4. "Bolt-default-executor"));
5. }
2. 命令处理器(RemotingProcessor接口实现类)独有的业务线程池executor:创建AbstractRemotingProcessor抽象类的子类时,通过构造函数或setExecutor方法设置:
1. public class RpcRequestProcessor extendsAbstractRemotingProcessor<RpcRequestCommand> {
2. ……略
3.
4. /**
5. * @param executor
6. */
7. public RpcRequestProcessor(ExecutorServiceexecutor) {
8. super(executor);
9. }
10.
11. public void setExecutor(ExecutorServiceexecutor) {
12. this.executor = executor;
13. }
14. ……略
15. }
3. 用户处理器(UserProcessor接口实现类)独有的业务线程池executor:通过ExecutorSelector接口实现类提供具体的业务线程池,或通过构造函数直接提供业务线程池:
1. public interface UserProcessor<T> {
2. ……略
3. voidsetExecutorSelector(ExecutorSelector executorSelector);
4. ……略
5. }
或
1. public class SimpleServerUserProcessor extendsSyncUserProcessor<RequestBody> {
2. ……略
3. private ThreadPoolExecutor executor;
4. publicSimpleServerUserProcessor() {
5. this.delaySwitch = false;
6. this.delayMs = 0;
7. this.executor = new ThreadPoolExecutor(1, 3,60, TimeUnit.SECONDS,
8. newArrayBlockingQueue<Runnable>(4), new NamedThreadFactory("Request-process-pool"));
9. }
10. ……略
11.}
上述三种业务线程池在使用时存在优先级:
1. 优先使用用户处理器的业务线程池。
2. 如果用户处理器没有设置业务线程池,则使用命令处理器的业务线程池。
3. 如果命令处理器也没有设置业务线程池,则默认使用所有命令处理器公用的业务线程池。
其次,看一下服务端在响应客户端请求时,是如何把请求封装成任务,然后提交到业务线程池执行,并返回执行结果的。RpcHandler作为Netty缺省Channel管道DefaultChannelPipeline中Handler执行链倒数第二个ChannelInboundHandler,负责具体的业务处理。所以,从RpcHandler的channelRead方法开始:
1. public voidchannelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
2. ProtocolCode protocolCode =ctx.channel().attr(Connection.PROTOCOL).get();
3. Protocol protocol =ProtocolManager.getProtocol(protocolCode);
4. protocol.getCommandHandler().handleCommand(
5. newRemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
6. }
在channelRead方法中,主要处理逻辑如下:
1. 首先获取协议码(内部其实是协议版本号);
2. 根据协议码,通过协议管理器获取指定的协议,此处以SOFA Bolt第一版协议RpcProtocol为例;
3. 获取RpcProtocol协议的命令处理器RpcCommandHandler;
4. 调用RpcCommandHandler类handleCommand方法,进行业务处理。此时,需要构建RemotingContext实例,除了ChannelHandlerContext、InvokeContext、serverSide参数外,还需要提供用户处理器,此处以SimpleServerUserProcessor为例;
在RpcCommandHandler类handleCommand方法中:
1. public void handleCommand(RemotingContext ctx, Object msg) throwsException {
2. this.handle(ctx, msg);
3. }
4.
5. /*
6. * Handle the request(s).
7. */
8. private void handle(final RemotingContext ctx, final Objectmsg) {
9. try {
10. if (msg instanceof List) {
11. processorManager.getDefaultExecutor().execute(newRunnable() {
12. @Override
13. public voidrun() {
14. if(logger.isDebugEnabled()) {
15. logger.debug("Batch message!size={}", ((List<?>) msg).size());
16. }
17. for (Objectm : (List<?>) msg) {
18. RpcCommandHandler.this.process(ctx,m);
19. }
20. }
21. });
22. } else {
23. process(ctx, msg);
24. }
25. } catch (Throwable t) {
26. processException(ctx,msg, t);
27. }
28. }
29.
30. @SuppressWarnings({"rawtypes", "unchecked" })
31. private void process(RemotingContext ctx, Object msg) {
32. try {
33. RpcCommand cmd =(RpcCommand) msg;
34. RemotingProcessorprocessor = processorManager.getProcessor(cmd.getCmdCode());
35. processor.process(ctx,cmd, processorManager.getDefaultExecutor());
36. } catch (Throwable t) {
37. processException(ctx,msg, t);
38. }
39. }
从handle方法可以看出,如果传入的消息是包含多个消息的消息列表,则使用公用的业务线程池中一个线程,循环调用process方法处理每条消息。否则,如果只是一个消息,则在当前线程中直接调用process方法处理消息。
process方法的主要处理逻辑如下:
1. msg强制类型转换为RpcCommand;
2. 通过ProcessorManager,根据命令码(如:RpcCommandCode.RPC_REQUEST)获取该命令码对应的命令处理器,此外为RpcRequestProcessor。该命令处理器是在创建RpcCommandHandler实例时注册的。
3. 最后,调用RpcRequestProcessor的process方法,进行业务处理。此时,除了RemotingContext,RpcCommand参数外,还需要提供业务线程池,此处使用公用的业务线程池;
在RpcRequestProcessor类process方法中:
1. public void process(RemotingContext ctx, RpcRequestCommand cmd,ExecutorService defaultExecutor) throws Exception {
2. if(!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_CLAZZ)) {
3. return;
4. }
5. UserProcessor userProcessor= ctx.getUserProcessor(cmd.getRequestClass());
6. if (userProcessor == null){
7. String errMsg ="No user processor found for request: " + cmd.getRequestClass();
8. logger.error(errMsg);
9. sendResponseIfNecessary(ctx,cmd.getType(), this.getCommandFactory()
10. .createExceptionResponse(cmd.getId(),errMsg));
11. return;// must endprocess
12. }
13.
14. // set timeout check statefrom user's processor
15. ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());
16.
17. // to check whether toprocess in io thread
18. if(userProcessor.processInIOThread()) {
19. if(!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
20. return;
21. }
22. // process in io thread
23. new ProcessTask(ctx,cmd).run();
24. return;// end
25. }
26.
27. Executor executor = null;
28. // to check whether getexecutor using executor selector
29. if (null ==userProcessor.getExecutorSelector()) {
30. executor =userProcessor.getExecutor();
31. } else {
32. // in case haven'tdeserialized in io thread
33. // it need todeserialize clazz and header before using executor dispath strategy
34. if(!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_HEADER)){
35. return;
36. }
37. //try get executor withstrategy
38. executor =userProcessor.getExecutorSelector().select(cmd.getRequestClass(),
39. cmd.getRequestHeader());
40. }
41.
42. // Till now, if executorstill null, then try default
43. if (executor == null) {
44. executor =(this.getExecutor() == null ? defaultExecutor : this.getExecutor());
45. }
46.
47. // use the final executordispatch process task
48. executor.execute(new ProcessTask(ctx, cmd));
49. }
process方法的主要处理逻辑如下:
1. 反序列化RpcRequestCommand;
2. 根据请求类名,获取用户处理器,此处类名为com.alipay.remoting.rpc.common.RequestBody,用户处理器为SimpleServerUserProcessor的一个实例;
3. 如果SimpleServerUserProcessor为null,表示没有为此类业务请求设置用户处理器,则直接返回异常响应。否则,继续下一步处理。此处不为null,所以继续下一步。
4. 检查该用户处理器是否设置为在IO线程中执行。如果为true,则直接在当前线程中构造ProcessTask任务,并调用run方法进行业务处理。否则,继续下一步处理。此处采用默认值false,所以继续下一步。
5. 判断用户处理器中ExecutorSelector是否为null。如果不为null,表示设置业务线程池选择器,则使用线程池选择器根据请求类信息和请求头信息,查找适用的业务线程池executor(可能存在找不到的情况);如果为null,表示没有设置业务线程池选择器,则直接获取用户处理器中的业务线程池executor(可能没有设置)。
6. 判断executor是否为null。如果不为null,表示用户处理器设置了业务线程池,则直接使用该线程池中线程执行业务处理。如果为null,则表示用户处理器没有设置业务线程池,则继续下一步;
7. 判断命令处理器,即RpcRequestProcessor是否设置了业务线程池。如果设置了,则直接使用该线程池中线程执行业务处理。如果没有设置,则使用公用的线程池中线程执行业务处理。
接下来,看一下ProcessTask任务的具体业务处理过程:
1. class ProcessTask implements Runnable {
2.
3. RemotingContext ctx;
4. RpcRequestCommand msg;
5.
6. publicProcessTask(RemotingContext ctx, RpcRequestCommand msg) {
7. this.ctx = ctx;
8. this.msg = msg;
9. }
10.
11. /**
12. * @seejava.lang.Runnable#run()
13. */
14. @Override
15. public void run() {
16. try {
17. RpcRequestProcessor.this.doProcess(ctx, msg);
18. } catch (Throwable e) {
19. //protect thethread running this task
20. StringremotingAddress = RemotingUtil.parseRemoteAddress(ctx.getChannelContext()
21. .channel());
22. logger
23. .error(
24. "Exception caughtwhen process rpc request command in RpcRequestProcessor, Id="
25. + msg.getId() + "!Invoke source address is [" + remotingAddress
26. +"].", e);
27. }
28. }
29.
30. }
ProcessTask是RpcRequestProcessor的内部类,实现了Runnable接口,可以作为一个任务提交到线程池执行。从ProcessTask的run方法可以看出,其实际是调用RpcRequestProcessor类doProcess方法,进行实际的业务处理:
1. public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd)throws Exception {
2. long currenTimestamp =System.currentTimeMillis();
3.
4. preProcessRemotingContext(ctx, cmd,currenTimestamp);
5. if (ctx.isTimeoutDiscard()&& ctx.isRequestTimeout()) {
6. timeoutLog(cmd,currenTimestamp, ctx);// do some log
7. return;// then, discardthis request
8. }
9. debugLog(ctx, cmd,currenTimestamp);
10. // decode request all
11. if(!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
12. return;
13. }
14. dispatchToUserProcessor(ctx, cmd);
15. }
在doProcess方法中,调用dispatchToUserProcessor方法,把客户端请求发送给用户处理器进行处理:
1. private void dispatchToUserProcessor(RemotingContext ctx,RpcRequestCommand cmd) {
2. final int id = cmd.getId();
3. final byte type =cmd.getType();
4. // processor here must notbe null, for it have been checked before
5. UserProcessor processor =ctx.getUserProcessor(cmd.getRequestClass());
6. if (processor instanceof AsyncUserProcessor) {
7. try {
8. processor.handleRequest(processor.preHandleRequest(ctx,cmd.getRequestObject()),
9. newRpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
10. } catch(RejectedExecutionException e) {
11. logger
12. .warn("RejectedExecutionExceptionoccurred when do ASYNC process in RpcRequestProcessor");
13. sendResponseIfNecessary(ctx,type, this.getCommandFactory()
14. .createExceptionResponse(id,ResponseStatus.SERVER_THREADPOOL_BUSY));
15. } catch (Throwable t) {
16. String errMsg = "AYSNCprocess rpc request failed in RpcRequestProcessor, id=" + id;
17. logger.error(errMsg, t);
18. sendResponseIfNecessary(ctx,type, this.getCommandFactory()
19. .createExceptionResponse(id,t, errMsg));
20. }
21. } else {
22. try {
23. Object responseObject = processor
24. .handleRequest(processor.preHandleRequest(ctx,cmd.getRequestObject()), cmd.getRequestObject());
25.
26. sendResponseIfNecessary(ctx, type,
27. this.getCommandFactory().createResponse(responseObject,cmd));
28. } catch(RejectedExecutionException e) {
29. logger
30. .warn("RejectedExecutionExceptionoccurred when do SYNC process in RpcRequestProcessor");
31. sendResponseIfNecessary(ctx,type, this.getCommandFactory()
32. .createExceptionResponse(id,ResponseStatus.SERVER_THREADPOOL_BUSY));
33. } catch (Throwable t) {
34. String errMsg = "SYNC processrpc request failed in RpcRequestProcessor, id=" + id;
35. logger.error(errMsg, t);
36. sendResponseIfNecessary(ctx,type, this.getCommandFactory()
37. .createExceptionResponse(id,t, errMsg));
38. }
39. }
40. }
dispatchToUserProcessor方法主要处理逻辑如下:
1. 根据请求类名,获取用户处理器,此处类名为com.alipay.remoting.rpc.common.RequestBody,用户处理器为SimpleServerUserProcessor的一个实例;
2. 判断用户处理器是同步的还是异步的,此处SimpleServerUserProcessor是同步的,所以直接调用SimpleServerUserProcessor类handleRequest方法,进行业务处理。由于SimpleServerUserProcessor只是一个Demo类,演示时使用,所以其handleRequest方法只是进行了超时时间的判断、处理次数的更新等操作,没有实际意义,在此不详述了;
3. 用户处理器完成业务处理以后,调用sendResponseIfNecessary方法,返回业务处理结果。
1. public void sendResponseIfNecessary(final RemotingContext ctx, byte type,
2. finalRemotingCommand response) {
3. final int id =response.getId();
4. if (type !=RpcCommandType.REQUEST_ONEWAY) {
5. RemotingCommandserializedResponse = response;
6. try {
7. response.serialize();
8. } catch(SerializationException e) {
9. ……略
10. } catch (Throwable t) {
11. ……略 }
12.
13. ctx.writeAndFlush(serializedResponse).addListener(new ChannelFutureListener() {
14. @Override
15. public voidoperationComplete(ChannelFuture future) throws Exception {
16. ……略
17. if(!future.isSuccess()) {
18. logger.error(……略);
19. }
20. }
21. });
22. } else {
23. ……略 }
24. }
sendResponseIfNecessary方法主要处理逻辑如下:
如果调用方式是单向调用,即oneway,则不需要返回结果给客户端;
如果调用方式不是单向调用,则把处理结果进行序列化,然后调用RemotingContext的writeAndFlush方法,把序列化好的响应结果返回客户端。
1. public ChannelFuturewriteAndFlush(RemotingCommand msg) {
2. returnthis.channelContext.writeAndFlush(msg);
3. }
其实,在RemotingContext的writeAndFlush方法中,最终调用Netty的DefaultChannelHandlerContext的writeAndFlush方法,发送响应结果给客户端。对于Netty如何发送响应结果的过程,在此不详述了。
至此,在业务线程池处理客户端请求的过程完成。
最佳实践:“Never block the event loop, reduce context-swtiching”,引自Netty committer Norman Maurer。
1.1.4 其它实践建议
1. 最小化线程池,能复用EventLoopGroup的地方尽量复用。比如蚂蚁因为历史原因,有过两版RPC协议,在两个协议升级过渡期间,我们会复用Acceptor线程与IO线程在同一个端口处理不同协议的请求;除此,针对多应用合并部署的场景,我们也会复用IO线程防止一个进程开过多的IO线程。
2. 对于无状态的ChannelHandler,设置成共享模式。比如我们的事件处理器,RPC 处理器都可以设置为共享,减少不同的Channel对应的ChannelPipeline里生成的对象个数。
3. 正确使用ChannelHandlerContext的ctx.write()与ctx.channel().write() 方法。前者是从当前 Handler的下一个Handler开始处理,而后者会从tail开始处理。大多情况下使用ctx.write()即可。
4. 在使用Channel写数据之前,建议使用isWritable()方法来判断一下当前ChannelOutboundBuffer里的写缓存水位,防止OOM发生。不过实践下来,正常的通信过程不太会OOM,但当网络环境不好,同时传输报文很大时,确实会出现限流的情况。
5. ByteBuf是Netty中主要用来数据byte[]的封装类,主要分为Heap ByteBuf 和 Direct ByteBuf。为了减少内存的分配回收以及产生的内存碎片,Netty提供了PooledByteBufAllocator 用来分配可回收的ByteBuf,可以把PooledByteBufAllocator看做一个池子,需要的时候从里面获取ByteBuf,用完了放回去,以此提高性能。当然与之对应的还有 UnpooledByteBufAllocator,顾名思义Unpooled就是不会放到池子里,所以根据该分配器分配的ByteBuf,不需要放回池子有JVM自己GC回收。在Netty中,根据ChannelHandlerContext和Channel获取的Allocator默认都是Pooled,所以需要再合适的时机对其进行释放,避免造成内存泄漏:
● Netty默认在ChannelPipline的最后添加一个Tail Handler,帮助完成ByteBuf的release,释放的是channelRead传入的ByteBuf。
● 如果在handlers传递过程中,传递了新值,需要自己手动释放旧值。
● 另外,如果中途没有使用fireChannelRead传递下去也要自己释放。
● 在传递过程中自己通过Channel或ChannelHandlerContext创建的但是没有传递下去的ByteBuf也要手动释放。
本系列文章目录: