Netty实践(四):心跳检测实现
心跳检测的概念
在分布式架构中,比如Hadoop集群,Storm集群等,或多或少都涉及到Master/Slave的概念,往往是一个或者多个Master和N个Slave之间进行通信。那么通常Master应该需要知道Slave的状态,Slave会定时的向Master进行发送消息,相当于告知Master:“我还活着,我现在在做什么,什么进度,我的CPU/内存情况如何”等,这就是所谓的心跳。Master根据Slave的心跳,进行协调,比如Slave的CPU/内存消耗很大,那么Master可以将任务分配给其他负载小的Slave进行处理;比如Slave一段时间没有发送心跳过来,那么Master可能会将可服务列表中暂时删除该Slave,并可能发出报警,告知运维/开发人员进行处理.如下图所示。
Netty实现心跳检测代码实例
心跳信息对象
主要储存Slave的IP,通信PORT,时间,内存,CPU信息等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package day4;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/** * Created by zhangfengzhe on 2017/2/4.
*/
public class HeartInfo implements Serializable{
private String ip;
private int port;
private Date lasttime;
private Map<String , String> cpuInfo = new HashMap<String,String>();
private Map<String , String> memInfo = new HashMap<String, String>();
public String getIp() {
return ip;
}
public void setIp(String ip) {
this .ip = ip;
}
public int getPort() {
return port;
}
public void setPort( int port) {
this .port = port;
}
public Date getLasttime() {
return lasttime;
}
public void setLasttime(Date lasttime) {
this .lasttime = lasttime;
}
public Map<String, String> getCpuInfo() {
return cpuInfo;
}
public void setCpuInfo(Map<String, String> cpuInfo) {
this .cpuInfo = cpuInfo;
}
public Map<String, String> getMemInfo() {
return memInfo;
}
public void setMemInfo(Map<String, String> memInfo) {
this .memInfo = memInfo;
}
@Override
public String toString() {
return "HeartInfo{" +
"ip='" + ip + '\ '' +
", port=" + port +
", lasttime=" + lasttime +
", cpuInfo=" + cpuInfo +
", memInfo=" + memInfo +
'}' ;
}
} |
JBoss Marshalling编解码处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
package day3;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
/** * Marshalling工厂
*/
public final class MarshallingCodeCFactory {
/**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory( "serial" );
//创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion( 5 );
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 );
return decoder;
}
/**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory( "serial" );
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion( 5 );
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
} |
Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package day4;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
final int port = 8765 ;
final String serverIP = "127.0.0.1" ;
b.group(group)
.channel(NioSocketChannel. class )
.handler( new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast( new ClientHandler(port));
}
});
ChannelFuture cf = b.connect(serverIP, port).sync();
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
} |
Client Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
package day4;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/** * Created by zhangfengzhe on 2017/2/4.
*/
public class ClientHandler extends ChannelHandlerAdapter {
private String ip;
private int port;
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool( 1 );
private ScheduledFuture<?> scheduledFuture;
private static final String SUCCESS = "OK" ;
public ClientHandler(){}
public ClientHandler( int port) {
this .port = port;
//获取本机IP
try {
this .ip = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
//通道建立初始化时 发送信息 准备握手验证
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String authInfo = this .ip + ":" + this .port;
ctx.writeAndFlush(authInfo);
}
//当服务器发送认证信息后,开始启动心跳发送
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof String){
//认证成功
if (SUCCESS.equals((String)msg)){
this .scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay( new HeartTask(ctx,ip,port), 2 , 3 , TimeUnit.SECONDS);
} else {
System.out.println( "服务器发来消息:" + msg);
}
}
ReferenceCountUtil.release(msg);
}
//如果出现异常 取消定时
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if ( this .scheduledFuture != null ){
this .scheduledFuture.cancel( true );
this .scheduledFuture = null ;
}
}
} |
Client和Server建立通道初始化的时候,Client会向服务器发送信息用于认证。在实际开发中,Client在发送心跳前,需要和Server端进行握手验证,会涉及到加解密,这里为了简单起见,省去了这些过程。从上面的代码也可以看到,如果服务端认证成功,那么Client会开始启动定时线程去执行任务,那么接下来,我们看看这个心跳任务。
心跳任务HeartTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
package day4;
import io.netty.channel.ChannelHandlerContext;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import java.util.Date;
import java.util.Random;
/** * Created by zhangfengzhe on 2017/2/4.
*/
public class HeartTask implements Runnable{
//持有引用,方便读写操作
private ChannelHandlerContext ctx;
private HeartInfo heartInfo = new HeartInfo();
public HeartTask(ChannelHandlerContext ctx, String ip, int port) {
this .ctx = ctx;
heartInfo.setIp(ip);
heartInfo.setPort(port);
}
@Override
public void run() {
try {
//利用sigar获取 内存/CPU方面的信息 ; 利用CTX给服务器端发送消息
Sigar sigar = new Sigar();
//内存使用信息memory
Mem mem = sigar.getMem();
heartInfo.getMemInfo().put( "total" ,String.valueOf(mem.getTotal()));
heartInfo.getMemInfo().put( "used" ,String.valueOf(mem.getUsed()));
heartInfo.getMemInfo().put( "free" ,String.valueOf(mem.getFree()));
//CPU使用信息
CpuPerc cpuPerc = sigar.getCpuPerc();
heartInfo.getCpuInfo().put( "user" ,String.valueOf(cpuPerc.getUser()));
heartInfo.getCpuInfo().put( "sys" ,String.valueOf(cpuPerc.getSys()));
heartInfo.getCpuInfo().put( "wait" ,String.valueOf(cpuPerc.getWait()));
heartInfo.getCpuInfo().put( "idle" ,String.valueOf(cpuPerc.getIdle()));
heartInfo.setLasttime( new Date());
ctx.writeAndFlush(heartInfo);
} catch (Exception e){
e.printStackTrace();
}
}
} |
首先,为了方便在心跳任务中进行读写操作,HeartTask持有ChannelHandlerContext的引用。其次,为了方便收集系统的内存、CPU信息,这里使用了Sigar,也是在实际中引用非常广泛的一个工具。
Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
package day4;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel. class )
.option(ChannelOption.SO_BACKLOG, 1024 )
//设置日志
.handler( new LoggingHandler(LogLevel.INFO))
.childHandler( new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast( new ServerHandler());
}
});
ChannelFuture cf = b.bind( 8765 ).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
} |
Server Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package day4;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** * Created by zhangfengzhe on 2017/2/4.
*/
public class ServerHandler extends ChannelHandlerAdapter {
//KEY: ip:port VALUE: HeartInfo
private Map<String,HeartInfo> heartInfoMap = new HashMap<String, HeartInfo>();
private static final List<String> authList = new ArrayList<String>();
static {
//从其他地方加载出来的IP列表
authList.add( "192.168.99.219:8765" );
}
//服务器会接收到2种消息 一个是客户端初始化时发送过来的认证信息 第二个是心跳信息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof String){
if (authList.contains(msg)){ //验证通过
ctx.writeAndFlush( "OK" );
} else {
ctx.writeAndFlush( "不在认证列表中..." );
}
} else if (msg instanceof HeartInfo){
System.out.println((HeartInfo)msg);
ctx.writeAndFlush( "心跳接收成功!" );
HeartInfo heartInfo = (HeartInfo)msg;
heartInfoMap.put(heartInfo.getIp() + ":" + heartInfo.getPort(),heartInfo);
}
}
} |
运行结果
Client端
Server端
到这里,心跳检测就实现了,就这么简单,你会了么,See U~
本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1895031,如需转载请自行联系原作者