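SOFA Bolt Source Code Analysis: Design Essentials - Connection Management
1 Design Essentials
1.1 Connection Management
To improve communication efficiency, connections should be reused so that fewer TCP three-way handshakes are needed. This calls for a solid connection-management mechanism.
In addition, some business scenarios have to go through a hardware load balancer (for example, an LVS VIP). If only a single connection is established in such cases, the load may be distributed unevenly, so multiple connections are needed to mitigate the imbalance.
To this end, SOFA Bolt establishes a configurable number of connections to a given address (uniquely identified by IP and port) and keeps them in a connection pool. The pool uses a general-purpose PoolKey and does not restrict the type of the key.
Note that establishing connections raises a concurrency problem: when a client calls the connect API under high concurrency, how do we guarantee that exactly the configured number of connections is created? In keeping with Netty's lock-free philosophy, connection establishment is also lock-free, built on the putIfAbsent method of ConcurrentHashMap.
Beyond that, connection management must support scheduled disconnection, automatic reconnection, and custom connection-selection algorithms to suit different connection scenarios.
The class diagram of the connection-management classes in SOFA Bolt is as follows:
1.1.1 Connection Definition - Connection
Because SOFA Bolt is built on Netty, it wraps io.netty.channel.Channel in a Connection class. Besides exposing the basic capabilities of Channel, Connection adds extra control features required by SOFA Bolt's private communication framework to cover some special scenarios.
First, the source of the Connection class: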
public class Connection {

    // ... (omitted)

    private Channel channel;

    private final ConcurrentHashMap<Integer, InvokeFuture> invokeFutureMap = new ConcurrentHashMap<Integer, InvokeFuture>(4);

    /** Attribute key for connection */
    public static final AttributeKey<Connection> CONNECTION = AttributeKey.valueOf("connection");

    /** Attribute key for heartbeat count */
    public static final AttributeKey<Integer> HEARTBEAT_COUNT = AttributeKey.valueOf("heartbeatCount");

    /** Attribute key for heartbeat switch for each connection */
    public static final AttributeKey<Boolean> HEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch");

    /** Attribute key for protocol */
    public static final AttributeKey<ProtocolCode> PROTOCOL = AttributeKey.valueOf("protocol");
    private ProtocolCode protocolCode;

    /** Attribute key for version */
    public static final AttributeKey<Byte> VERSION = AttributeKey.valueOf("version");
    private byte version = RpcProtocolV2.PROTOCOL_VERSION_1;

    private Url url;

    private final ConcurrentHashMap<Integer/* id */, String/* poolKey */> id2PoolKey = new ConcurrentHashMap<Integer, String>(256);

    private Set<String> poolKeys = new ConcurrentHashSet<String>();

    private AtomicBoolean closed = new AtomicBoolean(false);

    private final ConcurrentHashMap<String/* attr key */, Object/* attr value */> attributes = new ConcurrentHashMap<String, Object>();

    /** the reference count used for this connection. If equals 2, it means this connection has been referenced 2 times */
    private final AtomicInteger referenceCount = new AtomicInteger();

    /** no reference of the current connection */
    private static final int NO_REFERENCE = 0;

    /** Constructor */
    public Connection(Channel channel) {
        this.channel = channel;
        this.channel.attr(CONNECTION).set(this);
    }

    /** Constructor */
    public Connection(Channel channel, Url url) {
        this(channel);
        this.url = url;
        this.poolKeys.add(url.getUniqueKey());
    }

    /** Constructor */
    public Connection(Channel channel, ProtocolCode protocolCode, Url url) {
        this(channel, url);
        this.protocolCode = protocolCode;
        this.init();
    }

    /** Constructor */
    public Connection(Channel channel, ProtocolCode protocolCode, byte version, Url url) {
        this(channel, url);
        this.protocolCode = protocolCode;
        this.version = version;
        this.init();
    }

    /** Initialization. */
    private void init() {
        this.channel.attr(HEARTBEAT_COUNT).set(new Integer(0));
        this.channel.attr(PROTOCOL).set(this.protocolCode);
        this.channel.attr(VERSION).set(this.version);
        this.channel.attr(HEARTBEAT_SWITCH).set(true);
    }

    /** to check whether the connection is fine to use */
    public boolean isFine() {
        return this.channel != null && this.channel.isActive();
    }

    /** increase the reference count */
    public void increaseRef() {
        this.referenceCount.getAndIncrement();
    }

    /** decrease the reference count */
    public void decreaseRef() {
        this.referenceCount.getAndDecrement();
    }

    /** to check whether the reference count is 0 */
    public boolean noRef() {
        return this.referenceCount.get() == NO_REFERENCE;
    }

    /** Get the address of the remote peer. */
    public InetSocketAddress getRemoteAddress() {
        return (InetSocketAddress) this.channel.remoteAddress();
    }

    /** Get the remote IP. */
    public String getRemoteIP() {
        return RemotingUtil.parseRemoteIP(this.channel);
    }

    /** Get the remote port. */
    public int getRemotePort() {
        return RemotingUtil.parseRemotePort(this.channel);
    }

    /** Get the address of the local peer. */
    public InetSocketAddress getLocalAddress() {
        return (InetSocketAddress) this.channel.localAddress();
    }

    /** Get the local IP. */
    public String getLocalIP() {
        return RemotingUtil.parseLocalIP(this.channel);
    }

    /** Get the local port. */
    public int getLocalPort() {
        return RemotingUtil.parseLocalPort(this.channel);
    }

    /** Get the netty channel of the connection. */
    public Channel getChannel() {
        return this.channel;
    }

    /** Get the InvokeFuture with invokeId of id. */
    public InvokeFuture getInvokeFuture(int id) {
        return this.invokeFutureMap.get(id);
    }

    /** Add an InvokeFuture */
    public InvokeFuture addInvokeFuture(InvokeFuture future) {
        return this.invokeFutureMap.putIfAbsent(future.invokeId(), future);
    }

    /** Remove InvokeFuture whose invokeId is id */
    public InvokeFuture removeInvokeFuture(int id) {
        return this.invokeFutureMap.remove(id);
    }

    /** Do something when closing. */
    public void onClose() {
        Iterator<Entry<Integer, InvokeFuture>> iter = invokeFutureMap.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<Integer, InvokeFuture> entry = iter.next();
            iter.remove();
            InvokeFuture future = entry.getValue();
            if (future != null) {
                future.putResponse(future.createConnectionClosedResponse(this.getRemoteAddress()));
                future.cancelTimeout();
                future.tryAsyncExecuteInvokeCallbackAbnormally();
            }
        }
    }

    /** Close the connection. */
    public void close() {
        if (closed.compareAndSet(false, true)) {
            try {
                if (this.getChannel() != null) {
                    this.getChannel().close().addListener(new ChannelFutureListener() {

                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (logger.isInfoEnabled()) {
                                logger.info(
                                    "Close the connection to remote address={}, result={}, cause={}",
                                    RemotingUtil.parseRemoteAddress(Connection.this.getChannel()),
                                    future.isSuccess(), future.cause());
                            }
                        }

                    });
                }
            } catch (Exception e) {
                logger.warn("Exception caught when closing connection {}",
                    RemotingUtil.parseRemoteAddress(Connection.this.getChannel()), e);
            }
        }
    }

    /** Whether invokeFutures is completed */
    public boolean isInvokeFutureMapFinish() {
        return invokeFutureMap.isEmpty();
    }

    /** add a pool key to list */
    public void addPoolKey(String poolKey) {
        poolKeys.add(poolKey);
    }

    /** get all pool keys */
    public Set<String> getPoolKeys() {
        return new HashSet<String>(poolKeys);
    }

    /** remove pool key */
    public void removePoolKey(String poolKey) {
        poolKeys.remove(poolKey);
    }

    /** Getter method for property <tt>url</tt>. */
    public Url getUrl() {
        return url;
    }

    /** add Id to group Mapping */
    public void addIdPoolKeyMapping(Integer id, String poolKey) {
        this.id2PoolKey.put(id, poolKey);
    }

    /** remove id to group Mapping */
    public String removeIdPoolKeyMapping(Integer id) {
        return this.id2PoolKey.remove(id);
    }

    /** Set attribute key=value. */
    public void setAttribute(String key, Object value) {
        attributes.put(key, value);
    }

    /** set attribute if key absent. */
    public Object setAttributeIfAbsent(String key, Object value) {
        return attributes.putIfAbsent(key, value);
    }

    /** Remove attribute. */
    public void removeAttribute(String key) {
        attributes.remove(key);
    }

    /** Get attribute. */
    public Object getAttribute(String key) {
        return attributes.get(key);
    }

    /** Clear attribute. */
    public void clearAttributes() {
        attributes.clear();
    }

    /** Getter method for property <tt>invokeFutureMap</tt>. */
    public ConcurrentHashMap<Integer, InvokeFuture> getInvokeFutureMap() {
        return invokeFutureMap;
    }
}
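The main member variables of the Connection class are:
● channel: of type io.netty.channel.Channel. It references the Netty Channel instance that represents the actual connection from client to server and performs the actual reads and writes;
● invokeFutureMap: of type ConcurrentHashMap<Integer, InvokeFuture>. It caches all client invocations in flight on this connection, keyed by invoke id;
● url: of type com.alipay.remoting.Url, representing a request URL;
● poolKeys: declared as Set<String> and backed by a ConcurrentHashSet<String>. It holds the keys of the connection pools this connection belongs to; the key format is IP:PORT;
● referenceCount: of type AtomicInteger, the number of times this Connection is in use. Atomic operations keep it thread-safe. It is incremented when the Connection is added to a connection pool and decremented when the Connection is removed from one. The Connection may be closed only when referenceCount is 0;
● attributes: of type ConcurrentHashMap<String, Object>. It caches additional attributes associated with the Connection;
● CONNECTION, HEARTBEAT_COUNT, HEARTBEAT_SWITCH, PROTOCOL, VERSION: of type AttributeKey<T>. They record the Connection object, heartbeat count, heartbeat switch, protocol code, and protocol version in the channel's attribute map so that later processing steps can use them.
The methods of Connection are mostly operations on the member variables above. Once the meaning of those variables is clear, the methods are easy to follow, so they are not covered in detail here.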
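The role of invokeFutureMap can be illustrated with a minimal, standalone sketch. It is not SOFA Bolt code: the class name PendingCalls is hypothetical, and CompletableFuture<String> stands in for InvokeFuture. It only shows the bookkeeping pattern seen in addInvokeFuture, removeInvokeFuture, and onClose: register with putIfAbsent, remove on response, and fail everything still pending when the connection closes.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the invokeFutureMap bookkeeping in Connection. */
class PendingCalls {
    // invokeId -> future waiting for the response (CompletableFuture replaces InvokeFuture here)
    private final ConcurrentHashMap<Integer, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>(4);

    /** Registers a call; returns the future already registered under this id, or null if the id was free. */
    CompletableFuture<String> add(int invokeId, CompletableFuture<String> future) {
        return pending.putIfAbsent(invokeId, future);
    }

    /** A response arrived: remove the entry and complete it. Returns false for unknown ids. */
    boolean complete(int invokeId, String response) {
        CompletableFuture<String> future = pending.remove(invokeId);
        return future != null && future.complete(response);
    }

    /** Mirrors onClose(): drain the map and fail every call that is still waiting. */
    void onClose() {
        Iterator<Map.Entry<Integer, CompletableFuture<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            CompletableFuture<String> future = it.next().getValue();
            it.remove();
            future.completeExceptionally(new IllegalStateException("connection closed"));
        }
    }

    boolean isEmpty() {
        return pending.isEmpty();
    }

    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        CompletableFuture<String> answered = new CompletableFuture<>();
        CompletableFuture<String> abandoned = new CompletableFuture<>();
        calls.add(1, answered);
        calls.add(2, abandoned);
        calls.complete(1, "pong");
        calls.onClose();
        System.out.println("call 1 result: " + answered.join());
        System.out.println("call 2 failed: " + abandoned.isCompletedExceptionally());
    }
}
```

Draining the map on close means no caller keeps waiting for a response that can never arrive on a dead connection.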
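1.1.2 Connection Creation - ConnectionFactory
The connection factory is responsible for creating connections.
The ConnectionFactory interface defines the basic capabilities of a connection factory. Its source is as follows: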
public interface ConnectionFactory {
    /**
     * Initialize the factory.
     */
    public void init(ConnectionEventHandler connectionEventHandler);

    /**
     * Create a connection use #BoltUrl
     */
    public Connection createConnection(Url url) throws Exception;

    /**
     * Create a connection according to the IP and port.
     */
    public Connection createConnection(String targetIP, int targetPort, int connectTimeout) throws Exception;

    /**
     * Create a connection according to the IP and port.
     */
    public Connection createConnection(String targetIP, int targetPort, byte version,
                                       int connectTimeout) throws Exception;

    /**
     * Register User processor
     */
    public void registerUserProcessor(UserProcessor<?> processor);
}
The main methods of ConnectionFactory are:
● createConnection(Url url): creates a Connection from a url;
● createConnection(String targetIP, int targetPort, int connectTimeout): creates a Connection from the target IP address, port, and connect timeout;
● createConnection(String targetIP, int targetPort, byte version, int connectTimeout): creates a Connection from the target IP address, port, protocol version, and connect timeout;
● registerUserProcessor(UserProcessor<?> processor): registers a user processor.
RpcConnectionFactory implements the ConnectionFactory interface. Its main source is as follows:
public class RpcConnectionFactory implements ConnectionFactory {

    // ... (omitted)

    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() + 1, new NamedThreadFactory("Rpc-netty-client-worker"));

    private Bootstrap bootstrap;

    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>(4);

    /**
     * @see com.alipay.remoting.ConnectionFactory#init(ConnectionEventHandler)
     */
    @Override
    public void init(final ConnectionEventHandler connectionEventHandler) {
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, SystemProperties.tcp_nodelay())
            .option(ChannelOption.SO_REUSEADDR, SystemProperties.tcp_so_reuseaddr())
            .option(ChannelOption.SO_KEEPALIVE, SystemProperties.tcp_so_keepalive());

        /**
         * init netty write buffer water mark
         */
        initWriteBufferWaterMark();

        boolean pooledBuffer = SystemProperties.netty_buffer_pooled();
        if (pooledBuffer) {
            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        final boolean idleSwitch = SystemProperties.tcp_idle_switch();
        final int idleTime = SystemProperties.tcp_idle();
        final RpcHandler rpcHandler = new RpcHandler(userProcessors);
        final HeartbeatHandler heartbeatHandler = new HeartbeatHandler();
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            protected void initChannel(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast("decoder", new RpcProtocolDecoder(
                    RpcProtocolManager.DEFAULT_PROTOCOL_CODE_LENGTH));
                pipeline.addLast(
                    "encoder",
                    new ProtocolCodeBasedEncoder(ProtocolCode
                        .fromBytes(RpcProtocolV2.PROTOCOL_CODE)));
                if (idleSwitch) {
                    pipeline.addLast("idleStateHandler", new IdleStateHandler(idleTime, idleTime,
                        0, TimeUnit.MILLISECONDS));
                    pipeline.addLast("heartbeatHandler", heartbeatHandler);
                }
                pipeline.addLast("connectionEventHandler", connectionEventHandler);
                pipeline.addLast("handler", rpcHandler);
            }

        });
    }

    @Override
    public Connection createConnection(Url url) throws Exception {
        ChannelFuture future = doCreateConnection(url.getIp(), url.getPort(),
            url.getConnectTimeout());
        Connection conn = new Connection(future.channel(),
            ProtocolCode.fromBytes(url.getProtocol()), url.getVersion(), url);
        future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
        return conn;
    }

    @Override
    public Connection createConnection(String targetIP, int targetPort, int connectTimeout)
                                                                                           throws Exception {
        ChannelFuture future = doCreateConnection(targetIP, targetPort, connectTimeout);
        Connection conn = new Connection(future.channel(),
            ProtocolCode.fromBytes(RpcProtocol.PROTOCOL_CODE), RpcProtocolV2.PROTOCOL_VERSION_1, new Url(targetIP, targetPort));
        future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
        return conn;
    }

    @Override
    public Connection createConnection(String targetIP, int targetPort, byte version,
                                       int connectTimeout) throws Exception {
        ChannelFuture future = doCreateConnection(targetIP, targetPort, connectTimeout);
        Connection conn = new Connection(future.channel(),
            ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE), version, new Url(targetIP, targetPort));
        future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
        return conn;
    }

    protected ChannelFuture doCreateConnection(String targetIP, int targetPort, int connectTimeout) throws Exception {
        // prevent unreasonable value, at least 1000
        connectTimeout = Math.max(connectTimeout, 1000);
        String addr = targetIP + ":" + targetPort;
        if (logger.isDebugEnabled()) {
            logger.debug("connectTimeout of address [{}] is [{}].", addr, connectTimeout);
        }
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));

        future.awaitUninterruptibly();
        if (!future.isDone()) {
            String errMsg = "Create connection to " + addr + " timeout!";
            logger.warn(errMsg);
            throw new Exception(errMsg);
        }
        if (future.isCancelled()) {
            String errMsg = "Create connection to " + addr + " cancelled by user!";
            logger.warn(errMsg);
            throw new Exception(errMsg);
        }
        if (!future.isSuccess()) {
            String errMsg = "Create connection to " + addr + " error!";
            logger.warn(errMsg);
            throw new Exception(errMsg, future.cause());
        }
        return future;
    }

    @Override
    public void registerUserProcessor(UserProcessor<?> processor) {
        if (processor == null || StringUtils.isBlank(processor.interest())) {
            throw new RuntimeException("... (omitted)");
        }
        UserProcessor<?> preProcessor = this.userProcessors.putIfAbsent(processor.interest(), processor);
        if (preProcessor != null) {
            // ... (omitted)
            throw new RuntimeException(errMsg);
        }
    }

    private void initWriteBufferWaterMark() {
        // ... (omitted)
    }
}
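As the code above shows, RpcConnectionFactory creates Connections on top of Netty's Bootstrap. The main logic is:
● Initialize the Bootstrap. This is the standard Netty client setup and is not covered here;
● In doCreateConnection, call the connect method of Bootstrap to connect to the target server and wait for the attempt to finish. If the connection is established within the connect timeout, the ChannelFuture of the operation is returned. If the attempt times out, is cancelled, or fails, an exception is thrown;
● Obtain the channel for this connection from the ChannelFuture, then call the matching Connection constructor with the channel, the Url (built from the target IP address and port where needed), the protocol code, and the protocol version to create the Connection object;
● Finally, fire the ConnectionEventType.CONNECT user event on the channel pipeline so that every handler listening for it runs its processing.
1.1.3 Connection Pool - ConnectionPool
SOFA Bolt provides ConnectionPool to manage connections that have already been created. Its source is as follows: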
public class ConnectionPool implements Scannable {

    // ... (omitted)

    /** connections */
    private CopyOnWriteArrayList<Connection> conns = new CopyOnWriteArrayList<Connection>();

    /** strategy */
    private ConnectionSelectStrategy strategy;

    /** timestamp to record the last time this pool be accessed */
    private volatile long lastAccessTimestamp;

    /** whether async create connection done */
    private volatile boolean asyncCreationDone = true;

    public ConnectionPool(ConnectionSelectStrategy strategy) {
        this.strategy = strategy;
    }

    /** add a connection */
    public void add(Connection connection) {
        markAccess();
        if (null == connection) {
            return;
        }
        boolean res = this.conns.addIfAbsent(connection);
        if (res) {
            connection.increaseRef();
        }
    }

    /** check whether a connection already added */
    public boolean contains(Connection connection) {
        return this.conns.contains(connection);
    }

    /** removeAndTryClose a connection */
    public void removeAndTryClose(Connection connection) {
        if (null == connection) {
            return;
        }
        boolean res = this.conns.remove(connection);
        if (res) {
            connection.decreaseRef();
        }
        if (connection.noRef()) {
            connection.close();
        }
    }

    /** remove all connections */
    public void removeAllAndTryClose() {
        for (Connection conn : this.conns) {
            removeAndTryClose(conn);
        }
        this.conns.clear();
    }

    /** get a connection */
    public Connection get() {
        markAccess();
        if (null != this.conns) {
            List<Connection> snapshot = new ArrayList<Connection>(this.conns);
            if (snapshot.size() > 0) {
                return this.strategy.select(snapshot);
            } else {
                return null;
            }
        } else {
            return null;
        }
    }

    /** get all connections */
    public List<Connection> getAll() {
        markAccess();
        return new ArrayList<Connection>(this.conns);
    }

    /** connection pool size */
    public int size() {
        return this.conns.size();
    }

    /** is connection pool empty */
    public boolean isEmpty() {
        return this.conns.isEmpty();
    }

    /** Getter method for property <tt>lastAccessTimestamp</tt>. */
    public long getLastAccessTimestamp() {
        return this.lastAccessTimestamp;
    }

    /** do mark the time stamp when access this pool */
    private void markAccess() {
        this.lastAccessTimestamp = System.currentTimeMillis();
    }

    /** is async create connection done */
    public boolean isAsyncCreationDone() {
        return this.asyncCreationDone;
    }

    /** do mark async create connection done */
    public void markAsyncCreationDone() {
        this.asyncCreationDone = true;
    }

    /** do mark async create connection start */
    public void markAsyncCreationStart() {
        this.asyncCreationDone = false;
    }

    /**
     * @see com.alipay.remoting.Scannable#scan()
     */
    @Override
    public void scan() {
        if (null != this.conns && !this.conns.isEmpty()) {
            for (Connection conn : conns) {
                if (!conn.isFine()) {
                    logger.warn(
                        "Remove bad connection when scanning conns of ConnectionPool - {}:{}",
                        conn.getRemoteIP(), conn.getRemotePort());
                    conn.close();
                    this.removeAndTryClose(conn);
                }
            }
        }
    }
}
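The main member variables of ConnectionPool are:
● conns: of type CopyOnWriteArrayList<Connection>. It caches the connections that have been created;
● strategy: of type ConnectionSelectStrategy. It selects one connection from the cached connections according to some policy. Different selection policies are provided by implementing the ConnectionSelectStrategy interface; for example, RandomSelectStrategy picks a connection from conns at random;
● lastAccessTimestamp: of type long. It records the last time the pool was accessed. It is declared volatile, which guarantees memory visibility across threads but not the atomicity of compound operations;
● asyncCreationDone: of type boolean. It indicates whether asynchronous connection creation has finished; it is false only while an asynchronous creation task is running. It is also declared volatile, for the same reason.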
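The interplay between conns and strategy can be sketched in a few lines. This is an illustrative sketch, not the library's code: SelectStrategy and RandomSelect are hypothetical generic counterparts of ConnectionSelectStrategy and RandomSelectStrategy, and strings stand in for connections. It mirrors what ConnectionPool.get() does in the listing above: copy the list first, then let the strategy choose from the copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;

class SelectDemo {
    /** Hypothetical generic counterpart of ConnectionSelectStrategy. */
    interface SelectStrategy<T> {
        T select(List<T> candidates);
    }

    /** Picks a uniformly random element; returns null when there is nothing to pick. */
    static final class RandomSelect<T> implements SelectStrategy<T> {
        private final Random random = new Random();

        @Override
        public T select(List<T> candidates) {
            if (candidates == null || candidates.isEmpty()) {
                return null;
            }
            return candidates.get(random.nextInt(candidates.size()));
        }
    }

    /** Mirrors ConnectionPool.get(): take a snapshot, then select from the snapshot. */
    static <T> T get(CopyOnWriteArrayList<T> conns, SelectStrategy<T> strategy) {
        List<T> snapshot = new ArrayList<>(conns);
        return snapshot.isEmpty() ? null : strategy.select(snapshot);
    }

    public static void main(String[] args) {
        CopyOnWriteArrayList<String> conns = new CopyOnWriteArrayList<>();
        conns.add("conn-0");
        conns.add("conn-1");
        conns.add("conn-2");
        System.out.println("selected: " + get(conns, new RandomSelect<String>()));
    }
}
```

Selecting from a private copy means the strategy sees a list whose size cannot change between the size check and the index lookup, even if another thread removes a connection in the meantime.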
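Now the main methods of ConnectionPool:
1. add(Connection connection):
● Set lastAccessTimestamp to the current system time;
● If connection is null, return immediately;
● If connection is not null, add it to the conns list unless it is already present;
● If it was added, increment the connection's referenceCount by 1 to record that this pool is now using the connection;
2. removeAndTryClose(Connection connection):
● If connection is null, return immediately;
● Remove the given connection from the conns list;
● If it was removed, decrement the connection's referenceCount by 1 to record that this pool no longer uses the connection;
● Check whether the connection's referenceCount is 0. If it is, the connection is no longer in use and can be closed, so its close method is called. If it is not 0, the connection is still in use elsewhere and must not be closed, so the method simply returns.
The other methods of ConnectionPool are relatively simple and are not covered here.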
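The reference-counting rule behind add and removeAndTryClose can be tried out with a small standalone sketch. The classes Conn and Pool below are hypothetical simplifications (no Netty channel, no logging); they only reproduce the counting logic described above, under the assumption that one connection may be registered in more than one pool.

```java
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class RefCountDemo {
    /** Hypothetical connection: only the reference count and the closed flag. */
    static final class Conn {
        final AtomicInteger referenceCount = new AtomicInteger();
        private final AtomicBoolean closed = new AtomicBoolean(false);

        void close() {
            closed.compareAndSet(false, true); // closing twice has no further effect
        }

        boolean isClosed() {
            return closed.get();
        }
    }

    /** Hypothetical pool with the same add/remove counting rule as ConnectionPool. */
    static final class Pool {
        private final CopyOnWriteArrayList<Conn> conns = new CopyOnWriteArrayList<>();

        void add(Conn conn) {
            if (conn != null && conns.addIfAbsent(conn)) {
                conn.referenceCount.incrementAndGet(); // count only a real insertion
            }
        }

        void removeAndTryClose(Conn conn) {
            if (conn == null) {
                return;
            }
            if (conns.remove(conn)) {
                conn.referenceCount.decrementAndGet();
            }
            if (conn.referenceCount.get() == 0) {
                conn.close(); // no pool references the connection any more
            }
        }
    }

    public static void main(String[] args) {
        Conn shared = new Conn();
        Pool a = new Pool();
        Pool b = new Pool();
        a.add(shared);
        b.add(shared);
        a.removeAndTryClose(shared);
        System.out.println("closed after first removal:  " + shared.isClosed());
        b.removeAndTryClose(shared);
        System.out.println("closed after second removal: " + shared.isClosed());
    }
}
```

Because addIfAbsent reports whether the element was actually inserted, adding the same connection to one pool twice does not inflate the count.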
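1.1.4 Connection Management - ConnectionManager
The connection manager is the client-facing API. Clients use it to create connection pools, initialize connections, obtain connections, and so on.
First, the source of the ConnectionManager interface: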
public interface ConnectionManager extends Scannable {
    // ... (omitted)

    /**
     * Add a connection to {@link ConnectionPool}.
     * If it contains multiple pool keys, this connection will be added to multiple {@link ConnectionPool} too.
     * ... (omitted)
     */
    public void add(Connection connection);

    /**
     * Add a connection to {@link ConnectionPool} with the specified poolKey.
     * ... (omitted)
     */
    public void add(Connection connection, String poolKey);

    /**
     * Get a connection from {@link ConnectionPool} with the specified poolKey.
     * ... (omitted)
     */
    public Connection get(String poolKey);

    /**
     * Get all connections from {@link ConnectionPool} with the specified poolKey.
     * ... (omitted)
     */
    public List<Connection> getAll(String poolKey);

    /**
     * Get all connections of all poolKey.
     * ... (omitted)
     */
    public Map<String, List<Connection>> getAll();

    /**
     * Remove a {@link Connection} from all {@link ConnectionPool} with the poolKeys in {@link Connection}, and close it.
     * ... (omitted)
     */
    public void remove(Connection connection);

    /**
     * Remove and close a {@link Connection} from {@link ConnectionPool} with the specified poolKey.
     * ... (omitted)
     */
    public void remove(Connection connection, String poolKey);

    /**
     * Remove and close all connections from {@link ConnectionPool} with the specified poolKey.
     * ... (omitted)
     */
    public void remove(String poolKey);

    /**
     * Remove and close all connections from all {@link ConnectionPool}.
     */
    public void removeAll();

    /**
     * check a connection whether available, if not, throw RemotingException
     *
     * @param connection
     * @throws RemotingException
     */
    public void check(Connection connection) throws RemotingException;

    /**
     * Get the number of {@link Connection} in {@link ConnectionPool} with the specified pool key
     */
    public int count(String poolKey);

    /**
     * Get a connection using {@link Url}, if {@code null} then create and add into {@link ConnectionPool}.
     * The connection number of {@link ConnectionPool} is decided by {@link Url#getConnNum()}
     * ... (omitted)
     */
    public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException;

    /**
     * This method can create connection pool with connections initialized and check the number of connections.
     * The connection number of {@link ConnectionPool} is decided by {@link Url#getConnNum()}.
     * Each time call this method, will check the number of connection, if not enough, this will do the healing logic additionally.
     * ... (omitted)
     */
    public void createConnectionAndHealIfNeed(Url url) throws InterruptedException,
                                                       RemotingException;

    /**
     * Create a connection using specified {@link Url}.
     * ... (omitted)
     */
    public Connection create(Url url) throws RemotingException;

    /**
     * Create a connection using specified {@link String} address.
     * ... (omitted)
     */
    public Connection create(String address, int connectTimeout) throws RemotingException;

    /**
     * Create a connection using specified ip and port.
     * ... (omitted)
     */
    public Connection create(String ip, int port, int connectTimeout) throws RemotingException;
}
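The ConnectionManager interface defines the basic methods of a connection manager, such as adding connections, removing connections, and checking the state of a connection.
DefaultConnectionManager is the implementation of the ConnectionManager interface provided by SOFA Bolt. We focus on a few of its more important methods here; the others are relatively simple and are not covered. The source is as follows: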
public class DefaultConnectionManager implements ConnectionManager, ConnectionHeartbeatManager, Scannable {

    protected ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>> connTasks;

    protected ConcurrentHashMap<String, FutureTask<Integer>> healTasks;

    private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey,
                                                              Callable<ConnectionPool> callable) throws RemotingException, InterruptedException {

        RunStateRecordedFutureTask<ConnectionPool> initialTask = null;
        ConnectionPool pool = null;

        int retry = DEFAULT_RETRY_TIMES;

        int timesOfResultNull = 0;
        int timesOfInterrupt = 0;

        for (int i = 0; (i < retry) && (pool == null); ++i) {
            initialTask = this.connTasks.get(poolKey);
            if (null == initialTask) {
                initialTask = new RunStateRecordedFutureTask<ConnectionPool>(callable);
                initialTask = this.connTasks.putIfAbsent(poolKey, initialTask);
                if (null == initialTask) {
                    initialTask = this.connTasks.get(poolKey);
                    initialTask.run();
                }
            }

            try {
                pool = initialTask.get();
                if (null == pool) {
                    if (i + 1 < retry) {
                        timesOfResultNull++;
                        continue;
                    }
                    this.connTasks.remove(poolKey);
                    String errMsg = "Get future task result null for poolKey [" + poolKey
                                    + "] after [" + (timesOfResultNull + 1) + "] times try.";
                    throw new RemotingException(errMsg);
                }
            } catch (InterruptedException e) {
                if (i + 1 < retry) {
                    timesOfInterrupt++;
                    continue; // retry if interrupted
                }
                this.connTasks.remove(poolKey);
                logger.warn(
                    "Future task of poolKey {} interrupted {} times. InterruptedException thrown and stop retry.",
                    poolKey, (timesOfInterrupt + 1), e);
                throw e;
            } catch (ExecutionException e) {
                // DO NOT retry if ExecutionException occurred
                this.connTasks.remove(poolKey);

                Throwable cause = e.getCause();
                if (cause instanceof RemotingException) {
                    throw (RemotingException) cause;
                } else {
                    FutureTaskUtil.launderThrowable(cause);
                }
            }
        }
        return pool;
    }
}
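In DefaultConnectionManager, getConnectionPoolAndCreateIfAbsent is the core method. It creates and initializes the connection pool, and the other methods build on it.
The method takes two parameters:
● poolKey: the unique identifier of a connection pool. The standard format is IP:port; a custom poolKey can also be specified when the Url is created;
● callable: a task that implements the Callable interface. Here it is ConnectionPoolCall, an inner class of DefaultConnectionManager. It creates the connection pool and, if the pool needs to be initialized, also creates connections and puts them into the pool. The source of ConnectionPoolCall is as follows: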
private class ConnectionPoolCall implements Callable<ConnectionPool> {
    private boolean whetherInitConnection;
    private Url     url;

    public ConnectionPoolCall() {
        this.whetherInitConnection = false;
    }

    public ConnectionPoolCall(Url url) {
        this.whetherInitConnection = true;
        this.url = url;
    }

    @Override
    public ConnectionPool call() throws Exception {
        final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
        if (whetherInitConnection) {
            try {
                doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
            } catch (Exception e) {
                pool.removeAllAndTryClose();
                throw e;
            }
        }
        return pool;
    }

}
Pay particular attention to the doCreate method that ConnectionPoolCall invokes. It is responsible for creating the connections in the pool:
private void doCreate(final Url url, final ConnectionPool pool, final String taskName,
                      final int syncCreateNumWhenNotWarmup) throws RemotingException {
    final int actualNum = pool.size();
    final int expectNum = url.getConnNum();
    if (actualNum < expectNum) {
        if (url.isConnWarmup()) {
            // ... (omitted)
            for (int i = actualNum; i < expectNum; ++i) {
                Connection connection = create(url);
                pool.add(connection);
            }
        } else {
            if (syncCreateNumWhenNotWarmup < 0 || syncCreateNumWhenNotWarmup > url.getConnNum()) {
                throw new IllegalArgumentException(/* ... (omitted) */);
            }
            // create connection in sync way
            if (syncCreateNumWhenNotWarmup > 0) {
                for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
                    Connection connection = create(url);
                    pool.add(connection);
                }
                if (syncCreateNumWhenNotWarmup == url.getConnNum()) {
                    return;
                }
            }
            // initialize executor in lazy way
            initializeExecutor();
            pool.markAsyncCreationStart(); // mark the start of async
            try {
                this.asyncCreateConnectionExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            for (int i = pool.size(); i < url.getConnNum(); ++i) {
                                Connection conn = null;
                                try {
                                    conn = create(url);
                                } catch (RemotingException e) {
                                    // ... (omitted)
                                }
                                pool.add(conn);
                            }
                        } finally {
                            pool.markAsyncCreationDone(); // mark the end of async
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                pool.markAsyncCreationDone(); // mark the end of async when reject
                throw e;
            }
        } // end of NOT warm up
    } // end of if
}
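The main logic of doCreate is:
● If the pool holds fewer connections than expected, continue with the steps below and create connections; otherwise return immediately;
● If connWarmup is set to true in the Url (connection warm-up, meaning all connections are created synchronously as soon as the pool is created), the connections in the pool must be initialized up front. In this case the create method is called in a loop; it creates each connection through the createConnection method of the ConnectionFactory, and each connection is put into the pool;
● If connWarmup is false, the number of connections given by syncCreateNumWhenNotWarmup is first created synchronously in the same way. If the pool then still holds fewer connections than expected, the executor is lazily initialized and a task implementing Runnable is submitted to it; the task creates the remaining connections in a loop.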
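The split between synchronous and asynchronous creation can be reproduced with a minimal sketch. Everything below is hypothetical scaffolding rather than SOFA Bolt code: strings stand in for connections, fill corresponds loosely to doCreate, and a single-thread executor replaces asyncCreateConnectionExecutor.

```java
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class WarmupDemo {
    /** Hypothetical pool: strings stand in for connections. */
    static final class Pool {
        final CopyOnWriteArrayList<String> conns = new CopyOnWriteArrayList<>();
        volatile boolean asyncCreationDone = true;
    }

    /** Fills the pool up to expectNum, either fully in sync (warmup) or partly in the background. */
    static void fill(final Pool pool, final int expectNum, boolean warmup, int syncNum,
                     ExecutorService executor) {
        final int actualNum = pool.conns.size();
        if (actualNum >= expectNum) {
            return; // already enough connections
        }
        if (warmup) {
            for (int i = actualNum; i < expectNum; ++i) {
                pool.conns.add("conn-" + i); // create everything on the calling thread
            }
            return;
        }
        if (syncNum < 0 || syncNum > expectNum) {
            throw new IllegalArgumentException("syncNum out of range: " + syncNum);
        }
        for (int i = 0; i < syncNum; ++i) {
            pool.conns.add("conn-" + i); // the synchronous part
        }
        if (syncNum == expectNum) {
            return;
        }
        pool.asyncCreationDone = false; // mark the start of the background work
        try {
            executor.execute(() -> {
                try {
                    for (int i = pool.conns.size(); i < expectNum; ++i) {
                        pool.conns.add("conn-" + i);
                    }
                } finally {
                    pool.asyncCreationDone = true; // always mark the end
                }
            });
        } catch (RejectedExecutionException e) {
            pool.asyncCreationDone = true; // nothing will run, so reset the flag
            throw e;
        }
    }

    /** Runs one scenario with 3 expected connections and returns the final pool size. */
    static int demo(boolean warmup) {
        Pool pool = new Pool();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            fill(pool, 3, warmup, 1, executor);
        } finally {
            executor.shutdown(); // tasks already submitted still run
        }
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.conns.size();
    }

    public static void main(String[] args) {
        System.out.println("warmup size:     " + demo(true));
        System.out.println("non-warmup size: " + demo(false));
    }
}
```

In the non-warmup case the caller gets a usable pool after the first connection, while the remaining connections arrive in the background; the asyncCreationDone flag lets other code tell whether that background work is still running.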
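Next, the main logic of getConnectionPoolAndCreateIfAbsent. The idea is to run the RunStateRecordedFutureTask that creates the connection pool and initializes its connections, retrying up to the configured number of times. If creation succeeds, the pool is returned. Otherwise an exception matching the cause of the failure is thrown, such as a RemotingException, an InterruptedException, or the cause carried by an ExecutionException.
Take a close look at the following piece of code: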
initialTask = this.connTasks.get(poolKey);
if (null == initialTask) {
    initialTask = new RunStateRecordedFutureTask<ConnectionPool>(callable);
    initialTask = this.connTasks.putIfAbsent(poolKey, initialTask);
    if (null == initialTask) {
        initialTask = this.connTasks.get(poolKey);
        initialTask.run();
    }
}
try {
    pool = initialTask.get();
    // ... (omitted)
}
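Connection creation has to cope with highly concurrent clients: when many client threads call this method at the same time to create a connection pool, how can we guarantee that the number of connections established in the pool equals the configured number?
To solve this, SOFA Bolt uses the putIfAbsent method of ConcurrentHashMap to implement a lock-free connection-creation process:
● A connTasks map caches the pool-initialization tasks that have already been created; its type is ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>>.
● Look up the pool-initialization task initialTask associated with poolKey.
● If initialTask is null, no pool has been created for this key yet, so a new pool-initialization task is created and assigned to initialTask.
● Call putIfAbsent on the ConcurrentHashMap to place initialTask into connTasks. The method returns the task previously associated with the key, and that return value is assigned to initialTask.
● If initialTask is now null, no other thread had registered a task for this key, so the current thread won the race. It fetches the task for this key from connTasks again and runs it, which creates the pool and initializes its connections.
● If initialTask is not null, another thread has already created the pool for this key or is creating it right now, so the current thread simply calls get on initialTask to obtain the result or wait for it.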
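The steps above follow a well-known pattern: register a FutureTask with putIfAbsent, let the winner run it, and let everyone else wait on get. A minimal sketch, assuming a plain java.util.concurrent.FutureTask in place of RunStateRecordedFutureTask and a string in place of ConnectionPool (OnceOnlyPoolInit, getOrCreate, and demo are hypothetical names):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one FutureTask per key guarantees the initializer runs only once.
final class OnceOnlyPoolInit {
    private final ConcurrentHashMap<String, FutureTask<String>> tasks = new ConcurrentHashMap<>();
    final AtomicInteger initCount = new AtomicInteger(); // how often a pool was really built

    String getOrCreate(String poolKey) throws InterruptedException, ExecutionException {
        FutureTask<String> task = tasks.get(poolKey);
        if (task == null) {
            Callable<String> initializer = () -> {
                initCount.incrementAndGet();
                return "pool-for-" + poolKey;
            };
            FutureTask<String> candidate = new FutureTask<>(initializer);
            task = tasks.putIfAbsent(poolKey, candidate);
            if (task == null) {
                // No previous mapping: this thread won the race and runs the task itself.
                task = candidate;
                task.run();
            }
        }
        // Winners get their own result; losers block until the winner has finished.
        return task.get();
    }

    /** Calls getOrCreate from several threads and reports how many initializations ran. */
    static int demo(int threads) {
        OnceOnlyPoolInit manager = new OnceOnlyPoolInit();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    manager.getOrCreate("127.0.0.1:12200");
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return manager.initCount.get();
    }
}
```

Losing threads may still allocate a candidate task, but only the task that putIfAbsent actually stored is ever run, so the expensive work happens once per key without any explicit lock.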
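Apart from the core method getConnectionPoolAndCreateIfAbsent, the other methods of DefaultConnectionManager are not complicated and are not covered in detail here.
1.1.5 Reconnection - ReconnectManager
The reconnect manager runs in its own thread and periodically checks the queue of reconnect tasks. If a task is present, it repairs the connections of the specified connection pool so that the pool again meets its initial requirements. If no task is present, the thread blocks and waits for one.
During RpcClient initialization (the init method), the value of the global reconnect switch CONN_RECONNECT_SWITCH determines whether reconnection is enabled: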
if (globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
    reconnectManager = new ReconnectManager(connectionManager);
    connectionEventHandler.setReconnectManager(reconnectManager);
    logger.warn("Switch on reconnect manager");
}
The source of ReconnectManager is as follows:
public class ReconnectManager {

    // ... (omitted)
    class ReconnectTask {
        Url url;
    }

    private final LinkedBlockingQueue<ReconnectTask> tasks = new LinkedBlockingQueue<ReconnectTask>();

    protected final List<Url/* url */> canceled = new CopyOnWriteArrayList<Url>();
    private volatile boolean started = false;

    private int healConnectionInterval = 1000;

    private final Thread healConnectionThreads;

    private ConnectionManager connectionManager;

    public ReconnectManager(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
        this.healConnectionThreads = new Thread(new HealConnectionRunner());
        this.healConnectionThreads.start();
        this.started = true;
    }

    private void doReconnectTask(ReconnectTask task) throws InterruptedException, RemotingException {
        connectionManager.createConnectionAndHealIfNeed(task.url);
    }

    private void addReconnectTask(ReconnectTask task) {
        tasks.add(task);
    }

    public void addCancelUrl(Url url) {
        canceled.add(url);
    }

    public void removeCancelUrl(Url url) {
        canceled.remove(url);
    }

    /**
     * add reconnect task
     *
     * @param url
     */
    public void addReconnectTask(Url url) {
        ReconnectTask task = new ReconnectTask();
        task.url = url;
        tasks.add(task);
    }

    /**
     * Check task whether is valid, if canceled, is not valid
     *
     * @param task
     * @return
     */
    private boolean isValidTask(ReconnectTask task) {
        return !canceled.contains(task.url);
    }

    /**
     * stop reconnect thread
     */
    public void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;
        healConnectionThreads.interrupt();
        this.tasks.clear();
        this.canceled.clear();
    }

    /**
     * heal connection thread
     *
     * @author yunliang.shi
     * @version $Id: ReconnectManager.java, v0.1 Mar 11, 2016 5:24:08 PM yunliang.shi Exp $
     */
    private final class HealConnectionRunner implements Runnable {
        private long lastConnectTime = -1;

        public void run() {
            while (ReconnectManager.this.started) {
                long start = -1;
                ReconnectTask task = null;
                try {
                    if (this.lastConnectTime > 0
                        && this.lastConnectTime < ReconnectManager.this.healConnectionInterval
                        || this.lastConnectTime < 0) {
                        Thread.sleep(ReconnectManager.this.healConnectionInterval);
                    }
                    try {
                        task = ReconnectManager.this.tasks.take();
                    } catch (InterruptedException e) {
                        // ignore
                    }

                    start = System.currentTimeMillis();
                    if (ReconnectManager.this.isValidTask(task)) {
                        try {
                            ReconnectManager.this.doReconnectTask(task);
                        } catch (InterruptedException e) {
                            throw e;
                        }
                    } else {
                        logger.warn("Invalid reconnect request task {}, cancel list size {}",
                            task.url, canceled.size());
                    }
                    this.lastConnectTime = System.currentTimeMillis() - start;
                } catch (Exception e) {
                    retryWhenException(start, task, e);
                }
            }
        }

        private void retryWhenException(long start, ReconnectTask task, Exception e) {
            if (start != -1) {
                this.lastConnectTime = System.currentTimeMillis() - start;
            }
            if (task != null) {
                logger.warn("reconnect target: {} failed.", task.url, e);
                ReconnectManager.this.addReconnectTask(task);
            }
        }
    }
}
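First, look at the ReconnectManager constructor. Its main steps are:
● Store the connection manager.
● Create the connection-repair thread healConnectionThreads, whose task is a HealConnectionRunner.
● Start the connection-repair thread healConnectionThreads.
● Set the reconnect manager's started flag to true to indicate that the thread is running.
Next, focus on the inner class HealConnectionRunner, which repairs the connection pool that corresponds to a given Url.
The reconnect task ReconnectTask is an inner class with a single Url member, which identifies the connection pool whose connections need to be re-established.
The task list tasks has type LinkedBlockingQueue<ReconnectTask> and holds the pending reconnect tasks. A linked blocking queue is used for two reasons: it is efficient when nodes are added and removed frequently, and its take method blocks the reconnect thread when no task is available until a new one arrives.
The run method of HealConnectionRunner defines the main logic of ReconnectManager:
● As long as the reconnect manager is in the started state, the loop keeps executing connection-repair tasks.
● In the following two cases the thread first sleeps for a fixed time (the repair interval healConnectionInterval, in milliseconds, 1000 by default) to avoid wasting CPU:
○ the last repair time is negative (its initial value is -1), meaning no repair task has run yet;
○ the previous repair took less time than healConnectionInterval.
● Take a reconnect task from tasks. If a task is available, call the createConnectionAndHealIfNeed() method of DefaultConnectionManager to repair the connections of the pool that corresponds to the given Url, so that the pool again meets its initial requirements. If no task is available, the thread blocks and waits for one.
● If an exception occurs while a repair task is executing, retryWhenException is called; it puts the task back into the tasks queue so that it is executed again later.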
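The loop described above can be reduced to a small, self-contained sketch. It is a simplification made for illustration, not Bolt's code: HealLoopSketch and the Repair interface are hypothetical, a string stands in for Url, and the worker always pauses before taking a task instead of pausing only after a fast or first attempt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a repair loop: block on a queue, retry failed tasks.
final class HealLoopSketch {
    /** Hypothetical repair action; throwing signals that the attempt failed. */
    interface Repair {
        void heal(String url) throws Exception;
    }

    private final LinkedBlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private volatile boolean started = true;
    private final Thread worker;

    HealLoopSketch(Repair repair, long pauseMillis) {
        worker = new Thread(() -> {
            while (started) {
                String url = null;
                try {
                    Thread.sleep(pauseMillis); // throttle between attempts
                    url = tasks.take();        // blocks while the queue is empty
                    repair.heal(url);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                    // stop() interrupted the worker
                } catch (Exception e) {
                    tasks.add(url);            // attempt failed: queue the task again
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    void addReconnectTask(String url) {
        tasks.add(url);
    }

    void stop() {
        started = false;
        worker.interrupt();
        tasks.clear();
    }

    /** A target that fails twice and then succeeds; returns the number of attempts made. */
    static int demo() {
        AtomicInteger attempts = new AtomicInteger();
        CountDownLatch healed = new CountDownLatch(1);
        HealLoopSketch loop = new HealLoopSketch(url -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("still unreachable: " + url);
            }
            healed.countDown();
        }, 10);
        loop.addReconnectTask("127.0.0.1:12200");
        try {
            healed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            loop.stop();
        }
        return attempts.get();
    }
}
```

Re-queuing the failed task, rather than retrying inline, keeps the worker fair to other pending targets: a permanently unreachable address cannot starve the rest of the queue.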
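Finally, look at the addReconnectTask method, which creates a reconnect task for the given Url and puts it into the tasks list. The logic is simple; the more interesting question is when this method is called.
Because SOFA Bolt uses Netty as its underlying network layer, it decides whether to add a reconnect task by watching Netty channel state changes together with SOFA Bolt's reconnect setting. Concretely, in the channelInactive method of ConnectionEventHandler, when the channel represented by a Connection becomes inactive and the CONN_RECONNECT_SWITCH switch is on, addReconnectTask is called to add a reconnect task.
1.1.6 Closing connections - DefaultConnectionMonitor
SOFA Bolt provides a connection monitor, DefaultConnectionMonitor, which monitors and handles connections according to a given monitoring strategy (a concrete implementation of the ConnectionMonitorStrategy interface).
During RpcClient initialization (the init method), the value of the global connection-monitor switch CONN_MONITOR_SWITCH determines whether connection monitoring is enabled: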
if (globalSwitch.isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
    if (monitorStrategy == null) {
        ScheduledDisconnectStrategy strategy = new ScheduledDisconnectStrategy();
        connectionMonitor = new DefaultConnectionMonitor(strategy, this.connectionManager);
    } else {
        connectionMonitor = new DefaultConnectionMonitor(monitorStrategy,
            this.connectionManager);
    }
    connectionMonitor.start();
    logger.warn("Switch on connection monitor");
}
The source of DefaultConnectionMonitor is as follows:
public class DefaultConnectionMonitor {

    // ... (omitted)
    /** Connection pools to monitor */
    private DefaultConnectionManager connectionManager;

    /** Monitor strategy */
    private ConnectionMonitorStrategy strategy;

    private ScheduledThreadPoolExecutor executor;

    public DefaultConnectionMonitor(ConnectionMonitorStrategy strategy,
                                    DefaultConnectionManager connectionManager) {
        this.strategy = strategy;
        this.connectionManager = connectionManager;
    }

    public void start() {
        /** initial delay to execute schedule task, unit: ms */
        long initialDelay = SystemProperties.conn_monitor_initial_delay();

        /** period of schedule task, unit: ms */
        long period = SystemProperties.conn_monitor_period();

        this.executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(
            "ConnectionMonitorThread"), new ThreadPoolExecutor.AbortPolicy());
        MonitorTask monitorTask = new MonitorTask();
        this.executor.scheduleAtFixedRate(monitorTask, initialDelay, period, TimeUnit.MILLISECONDS);
    }

    public void destroy() {
        executor.purge();
        executor.shutdown();
    }

    private class MonitorTask implements Runnable {
        @Override
        public void run() {
            try {
                if (strategy != null) {
                    Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools = connectionManager.getConnPools();
                    strategy.monitor(connPools);
                }
            } catch (Exception e) {
                logger.warn("MonitorTask error", e);
            }
        }
    }
}
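The DefaultConnectionMonitor class is fairly simple. Its main logic is:
● Create executor, an instance of the scheduled thread pool ScheduledThreadPoolExecutor.
● Create monitorTask, an instance of the monitoring task MonitorTask. The task obtains the set of all connection pools from the connection manager and then processes every connection in those pools through the configured monitoring strategy.
● Execute the connection-monitoring task monitorTask at a fixed rate.
As this shows, the real monitoring work is done by the concrete monitoring strategy.
SOFA Bolt ships ScheduledDisconnectStrategy, a default implementation of the ConnectionMonitorStrategy interface, whose main job is to close surplus connections.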
public class ScheduledDisconnectStrategy implements ConnectionMonitorStrategy {
    // ... (omitted)
    /** the connections threshold of each {@link Url#uniqueKey} */
    private static final int CONNECTION_THRESHOLD = SystemProperties.conn_threshold();

    /** fresh select connections to be closed */
    private Map<String, Connection> freshSelectConnections = new ConcurrentHashMap<String, Connection>();

    /** Retry detect period for ScheduledDisconnectStrategy */
    private static int RETRY_DETECT_PERIOD = SystemProperties.retry_detect_period();

    /** random */
    private Random random = new Random();

    /**
     * Filter connections to monitor
     *
     * @param connections
     */
    @Override
    public Map<String, List<Connection>> filter(List<Connection> connections) {
        List<Connection> serviceOnConnections = new ArrayList<Connection>();
        List<Connection> serviceOffConnections = new ArrayList<Connection>();
        Map<String, List<Connection>> filteredConnections = new ConcurrentHashMap<String, List<Connection>>();

        for (Connection connection : connections) {
            // the status attribute must be read per connection, inside the loop
            String serviceStatus = (String) connection.getAttribute(Configs.CONN_SERVICE_STATUS);
            if (serviceStatus != null) {
                if (connection.isInvokeFutureMapFinish()
                    && !freshSelectConnections.containsValue(connection)) {
                    serviceOffConnections.add(connection);
                }
            } else {
                serviceOnConnections.add(connection);
            }
        }

        filteredConnections.put(Configs.CONN_SERVICE_STATUS_ON, serviceOnConnections);
        filteredConnections.put(Configs.CONN_SERVICE_STATUS_OFF, serviceOffConnections);
        return filteredConnections;
    }

    /**
     * Monitor connections and close connections with status is off
     *
     * @param connPools
     */
    @Override
    public void monitor(Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools) {
        try {
            if (null != connPools && !connPools.isEmpty()) {
                Iterator<Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>>> iter = connPools.entrySet().iterator();

                while (iter.hasNext()) {
                    Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry = iter.next();
                    String poolKey = entry.getKey();
                    ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);

                    List<Connection> connections = pool.getAll();
                    Map<String, List<Connection>> filteredConnectons = this.filter(connections);
                    List<Connection> serviceOnConnections = filteredConnectons
                        .get(Configs.CONN_SERVICE_STATUS_ON);
                    List<Connection> serviceOffConnections = filteredConnectons
                        .get(Configs.CONN_SERVICE_STATUS_OFF);
                    if (serviceOnConnections.size() > CONNECTION_THRESHOLD) {
                        Connection freshSelectConnect = serviceOnConnections.get(random
                            .nextInt(serviceOnConnections.size()));
                        freshSelectConnect.setAttribute(Configs.CONN_SERVICE_STATUS,
                            Configs.CONN_SERVICE_STATUS_OFF);

                        Connection lastSelectConnect = freshSelectConnections.remove(poolKey);
                        freshSelectConnections.put(poolKey, freshSelectConnect);

                        closeFreshSelectConnections(lastSelectConnect, serviceOffConnections);

                    } else {
                        if (freshSelectConnections.containsKey(poolKey)) {
                            Connection lastSelectConnect = freshSelectConnections.remove(poolKey);
                            closeFreshSelectConnections(lastSelectConnect, serviceOffConnections);
                        }
                        // ... (omitted)
                    }

                    for (Connection offConn : serviceOffConnections) {
                        if (offConn.isFine()) {
                            offConn.close();
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error("ScheduledDisconnectStrategy monitor error", e);
        }
    }

    /**
     * close the connection of the fresh select connections
     *
     * @param lastSelectConnect
     * @param serviceOffConnections
     * @throws InterruptedException
     */
    private void closeFreshSelectConnections(Connection lastSelectConnect,
                                             List<Connection> serviceOffConnections)
                                                                                   throws InterruptedException {
        if (null != lastSelectConnect) {
            if (lastSelectConnect.isInvokeFutureMapFinish()) {
                serviceOffConnections.add(lastSelectConnect);
            } else {
                Thread.sleep(RETRY_DETECT_PERIOD);
                if (lastSelectConnect.isInvokeFutureMapFinish()) {
                    serviceOffConnections.add(lastSelectConnect);
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Address={} won't close at this schedule turn",
                            RemotingUtil.parseRemoteAddress(lastSelectConnect.getChannel()));
                    }
                }
            }
        }
    }
}
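The monitor method of ScheduledDisconnectStrategy iterates over all connection pools in connPools and handles each one in turn:
● Get all connections of the pool and call filter to group them by the connection attribute bolt.conn.service.status into on (usable) and off (not usable), yielding serviceOnConnections and serviceOffConnections.
● If the number of usable connections exceeds the threshold, pick one connection from serviceOnConnections at random, set its bolt.conn.service.status to off, and record it in freshSelectConnections under the poolKey. The connection that was selected in the previous round is added to serviceOffConnections once its pending invocations have finished.
● If the number of usable connections is less than or equal to the threshold but freshSelectConnections still contains a connection for the poolKey, that connection also needs to be closed, so it is added to serviceOffConnections.
● Iterate over serviceOffConnections and close every connection in it.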
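The two-round behaviour (mark a connection in one round, close it in the next) is easier to see in a reduced model. The sketch below is an assumption-laden simplification: DisconnectRoundSketch and round are hypothetical names, strings stand in for connections, and the check for unfinished invocations is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical model of one monitoring round per pool; strings stand in for connections.
final class DisconnectRoundSketch {
    private final int threshold;
    private final Random random = new Random();
    /** poolKey -> connection picked in the previous round, waiting to be closed. */
    private final Map<String, String> freshSelected = new HashMap<>();

    DisconnectRoundSketch(int threshold) {
        this.threshold = threshold;
    }

    /**
     * Runs one round for one pool. The connection picked last time is returned for closing;
     * if the pool is still above the threshold, a new victim is picked and taken out of service.
     */
    List<String> round(String poolKey, List<String> onConnections) {
        List<String> toClose = new ArrayList<>();
        String previous = freshSelected.remove(poolKey);
        if (previous != null) {
            toClose.add(previous); // had a full period to drain its in-flight requests
        }
        if (onConnections.size() > threshold) {
            String picked = onConnections.remove(random.nextInt(onConnections.size()));
            freshSelected.put(poolKey, picked); // closed in the next round, not now
        }
        return toClose;
    }
}
```

Delaying the close by one period gives requests that were already sent on the selected connection time to complete before the channel goes away.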
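One point about this method deserves attention. Because MonitorTask is a scheduled task that fires at a fixed rate, one might worry that a new MonitorTask run starts while the previous run is still in progress. That cannot happen here: the task is scheduled on a ScheduledThreadPoolExecutor, so if a run does not finish within the period, the next run starts late, but runs of MonitorTask never execute concurrently. In addition, if a run throws an exception, all subsequent runs of the task are suppressed, which is why MonitorTask.run catches Exception and only logs it.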
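The suppression rule is easy to verify with the JDK alone. In this small experiment (FixedRateFailureDemo and runs are hypothetical names), the same failing task is scheduled twice, once bare and once wrapped in a try/catch the way MonitorTask wraps its body:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical experiment: what happens to a fixed-rate task that throws?
final class FixedRateFailureDemo {

    /** Schedules a task that fails on every run and returns how often it actually ran. */
    static int runs(boolean guarded) {
        AtomicInteger count = new AtomicInteger();
        Runnable failing = () -> {
            count.incrementAndGet();
            throw new IllegalStateException("boom");
        };
        Runnable task = failing;
        if (guarded) {
            task = () -> {
                try {
                    failing.run();
                } catch (RuntimeException e) {
                    // a real task would log here; swallowing keeps the schedule alive
                }
            };
        }
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.scheduleAtFixedRate(task, 0, 20, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
        return count.get();
    }
}
```

The unguarded task runs exactly once and is then silently dropped by the executor, while the guarded one keeps firing every period.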
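1.1.7 Connection keep-alive strategy
In network communication frameworks, the most commonly used transport protocol is TCP.
With TCP, the server and the client must establish a connection before any read or write can take place. When reading and writing are done and neither side needs the connection any longer, they can close it. Establishing a TCP connection takes a three-way handshake and closing it takes a four-way handshake, so every TCP connection costs a noticeable amount of resources and time to set up and tear down.
To reduce the number of connection setups and teardowns and the associated network traffic, we usually use long-lived TCP connections, handling many read and write operations over a single connection, instead of short-lived connections that open a new TCP connection for every operation. Long-lived connections also reduce the latency of subsequent requests, and a connection that stays open gives TCP enough time to assess network congestion and act on it.
For this reason, connection management has to address how to keep connections alive and for how long.
We look at the solution from the server side and from the client side separately.
1.1.7.1 Server side
In a TCP connection, the server mainly listens on a port, receives the data that clients send to that port, processes it, and returns the result to the client.
Overall, the server's keep-alive strategy is simpler than the client's. The guiding principle is: as far as possible, let the client initiate the close, not the server.
The main reason for this principle is to avoid the TIME_WAIT problem that arises when the server closes connections actively.
To understand the TIME_WAIT problem, here is a brief description of the normal TCP close sequence (the four-way handshake):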
1. (FIN_WAIT_1) A ---FIN---> B (CLOSE_WAIT)
2. (FIN_WAIT_2) A <---ACK--- B (CLOSE_WAIT)
3. (TIME_WAIT)  A <---FIN--- B (LAST_ACK)
4. (TIME_WAIT)  A ---ACK---> B (CLOSED)
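The four steps are:
1. A first sends a FIN to B to request the close. After sending it, A's TCP state changes to FIN_WAIT_1; after receiving it, B's TCP state changes to CLOSE_WAIT.
2. After receiving the FIN, B replies to A with an ACK that acknowledges it. After receiving the ACK, A's TCP state changes to FIN_WAIT_2.
3. B then sends its own FIN to A. Unlike the three-way handshake used to open a connection, this FIN is not sent together with the previous ACK. The reason is that TCP is full-duplex: B may send the ACK without sending a FIN right away, which closes only the data flow from A to B while the flow from B to A stays open. After sending this FIN, B's TCP state changes to LAST_ACK; once A receives it, A's state changes to TIME_WAIT.
4. After receiving B's FIN, A replies to B with an ACK that acknowledges it. At that point B's state changes to CLOSED and its socket can be released.
Besides this normal (graceful) close, TCP also offers an abrupt way to close a connection, RST (Reset):
1. (CLOSED) A ---RST--> B (CLOSED)
After A sends an RST, its TCP state becomes CLOSED, and B can enter the CLOSED state as soon as it receives the RST.
Unfortunately, with the first (graceful) way of closing, A cannot reclaim the socket immediately after sending the final ACK, because A cannot be sure that B will receive it. A must therefore keep the socket in the TIME_WAIT state for 2MSL (MSL = Maximum Segment Lifetime; depending on the operating system and TCP implementation, the value is 30 seconds, 60 seconds, or 2 minutes). If A is the client, this is not a problem. If A is the server, it is dangerous: with a very large number of connections, keeping that many sockets in TIME_WAIT can exhaust the available sockets (reported as a "Too many open files" error).
To deal with the TIME_WAIT problem, the server has three options:
1. Make sure the client initiates the close (that is, the server acts as B).
2. Close with an RST.
3. Allow TCP endpoints in the TIME_WAIT state to be reused.
The first option is of course preferred. When there is no alternative, SO_LINGER can be used for the second option and SO_REUSEADDR for the third.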
public void setSoLinger(boolean on, int linger) throws SocketException
public void setReuseAddress(boolean on) throws SocketException
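In the first method, on controls whether the SO_LINGER option is used, and linger (in seconds) specifies how long to wait before the RST is sent, because once the RST is sent, any data still waiting in the send buffer is discarded.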
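Both options are available on the standard java.net socket classes. The sketch below (SocketCloseOptionsDemo and configure are hypothetical names) opens a loopback connection and applies them; on common TCP stacks, SO_LINGER with a linger time of 0 makes close() reset the connection instead of performing the FIN exchange, so use it only when losing unsent data is acceptable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo: apply SO_REUSEADDR and SO_LINGER on a loopback connection.
final class SocketCloseOptionsDemo {

    /** Returns the option values as reported by the sockets after configuration. */
    static String configure() {
        try (ServerSocket server = new ServerSocket()) {
            // Option 3: allow binding while old connections on this port sit in TIME_WAIT.
            // It has to be set before bind() to have a defined effect.
            server.setReuseAddress(true);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 Socket accepted = server.accept()) {
                // Option 2: linger enabled with a timeout of 0 seconds, so closing
                // this socket aborts the connection and discards unsent data.
                accepted.setSoLinger(true, 0);
                return "reuse=" + server.getReuseAddress() + ", linger=" + accepted.getSoLinger();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a Netty-based server the same options would normally be set through channel options on the bootstrap instead of on raw sockets.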
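Following this principle, the server can manage connections along these lines:
1. For connections that definitely have to be closed, for example because the socket was closed by a network problem or a network read or write failed, the server calls the relevant method and closes the connection itself.
2. For all other connections, for example when the client asked for a long-lived connection or the number of requests reached the maximum the connection supports, the server keeps the connection open but sets the relevant parameters in its response, leaving it to the client to decide whether to keep the connection and for how long.
1.1.7.2 Client side
Compared with the server, the client's keep-alive strategy is somewhat more involved.
For deciding whether to keep a connection and for how long, the client can follow the implementation of the HttpClient component, whose principles are as follows:
I. Whether to keep the connection
A client that wants a long-lived connection should tell the server so when it sends the request (in HTTP/1.0 by setting the Connection header to keep-alive; in HTTP/1.1 connections are kept alive by default). It then decides from the server's response whether to keep the connection, using these rules:
1. Check the Transfer-Encoding header of the response. If it is present and its value is not chunked, the connection is not kept and is closed immediately. Otherwise go to the next step.
2. Check the Content-Length header of the response. If it is empty or malformed (several lengths, or a value that is not an integer), the connection is not kept and is closed immediately. Otherwise go to the next step.
3. Check the Connection header of the response (or the Proxy-Connection header if Connection is absent):
● If neither header is present, HTTP/1.1 defaults to keeping the connection, so it is marked as kept; HTTP/1.0 defaults to not keeping it, so it is closed immediately.
● If the header is present and its value is close, the connection is not kept and is closed immediately; if its value is keep-alive, the connection is marked as kept.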
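The three checks can be written as one small function. This is a sketch of the rules as described here, not HttpClient's actual code: KeepAliveDecision and shouldKeepAlive are hypothetical names, headers are passed as a case-insensitive map, and two points are assumptions made for the sketch: Content-Length is only consulted when Transfer-Encoding is absent (a chunked response normally carries no Content-Length), and an unrecognized Connection value falls back to the protocol default.

```java
import java.util.Map;

// Hypothetical sketch of the connection-reuse decision described above.
final class KeepAliveDecision {

    /**
     * @param headers response headers; the map is assumed to be case-insensitive
     * @param http11  true for HTTP/1.1, false for HTTP/1.0
     */
    static boolean shouldKeepAlive(Map<String, String> headers, boolean http11) {
        // Step 1: any transfer coding other than "chunked" leaves the body unframed.
        String transferEncoding = headers.get("Transfer-Encoding");
        if (transferEncoding != null) {
            if (!"chunked".equalsIgnoreCase(transferEncoding.trim())) {
                return false;
            }
        } else {
            // Step 2 (assumption: only when there is no Transfer-Encoding header):
            // the body must be delimited by exactly one valid, non-negative length.
            String contentLength = headers.get("Content-Length");
            if (contentLength == null) {
                return false;
            }
            try {
                if (Long.parseLong(contentLength.trim()) < 0) {
                    return false;
                }
            } catch (NumberFormatException e) {
                return false; // covers "12, 12" and non-numeric values
            }
        }
        // Step 3: explicit Connection / Proxy-Connection token, else the protocol default.
        String connection = headers.get("Connection");
        if (connection == null) {
            connection = headers.get("Proxy-Connection");
        }
        if (connection != null) {
            if ("close".equalsIgnoreCase(connection.trim())) {
                return false;
            }
            if ("keep-alive".equalsIgnoreCase(connection.trim())) {
                return true;
            }
        }
        return http11; // HTTP/1.1 keeps by default, HTTP/1.0 closes by default
    }
}
```

A TreeMap created with String.CASE_INSENSITIVE_ORDER is a convenient way to build the header map for this function, since HTTP header names are case-insensitive.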
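II. How long to keep the connection
When a connection is handed back to the connection manager, it is kept by the manager for a period of time if it is marked as kept; if it is not marked as kept, its entry is removed from the connection pool and closed. For kept connections, the keep time is determined as follows:
1. The keep time starts counting at the moment the connection is handed back to the connection pool.
2. The duration is computed from the timeout parameter of the Keep-Alive header:
● If the parameter is present, the keep time is the timeout value * 1000, in milliseconds.
● If the parameter is absent, the keep time is set to -1, meaning indefinitely.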
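The duration rule amounts to a few lines of parsing. The sketch below assumes the usual header form "timeout=5, max=100"; KeepAliveDuration and keepAliveMillis are hypothetical names, and treating a malformed timeout value like a missing one is a choice made for this sketch.

```java
// Hypothetical sketch: derive the keep time from a Keep-Alive header value.
final class KeepAliveDuration {

    /**
     * Returns the keep time in milliseconds, or -1 (keep indefinitely)
     * when the header carries no usable timeout parameter.
     */
    static long keepAliveMillis(String keepAliveHeader) {
        if (keepAliveHeader == null) {
            return -1;
        }
        for (String part : keepAliveHeader.split(",")) {
            String[] nameAndValue = part.trim().split("=", 2);
            if (nameAndValue.length == 2 && "timeout".equalsIgnoreCase(nameAndValue[0].trim())) {
                try {
                    return Long.parseLong(nameAndValue[1].trim()) * 1000L; // seconds to ms
                } catch (NumberFormatException ignored) {
                    // malformed value: treated like a missing timeout in this sketch
                }
            }
        }
        return -1;
    }
}
```

A connection manager would add this duration to the time at which the connection was returned to the pool and evict the entry once that deadline has passed.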
Table of contents for this series: