Netty网络编程实战 - 通信框架,私有协议、生产级报文追踪、认证机制、自动空闲检测、断线自动重连
前言
前面我们已经基于不同维度介绍关于Netty的很多知识了,包括通信原理、框架工作机制、核心组件、应用实战,以及不同场景对不同方案的选择等等。那么我们这次就研究一下我们项目中基于Netty端对端开发中如何搭建一个完整的应用框架,以供开发人员嵌入他们关注的各种应用部件等。
实现Netty应用级框架需要考虑哪些因素
很多人问,我们在基于某种网络通信框架构建我们自己的应用框架的时候,究竟需要考虑到哪些方面?我们如何构建一个与业务解耦的应用基础设施、定制协议格式、健壮性的机制等来支撑我们的开发呢?大家也可以在下方的留言讨论,而就个人的理解和相关实践经验,我认为至少应考虑到以下的问题:
-
网络通信协议的选择,方案的比较, 我们应基于TCP?UDP?还是应用层的一些成熟协议?...
-
网络I/O模型该采用何种?BIO?NIO?IO复用?AIO?还是信号驱动IO呢?
-
底层通信框架我们要定制,还是沿用已有的成熟框架?
-
我们场景是否需要统一定制全局可复用的交互协议、报文等?
-
是否应该建设一种高效可靠的消息编解码机制支撑快速通信?
-
来自客户端的连接、业务请求等是否需要有认证和鉴权?
-
当业务通信发生异常了,能否方便看到和追踪通信报文细节(更便于我们一步一步查找问题原因)??
-
当服务端因为非预期原因断开或崩溃,然后发现修复重启后是否每个客户端都要手动再连接一下??
-
当服务器空闲一段时间后,是否该有一种机制自动触发心跳检测网络的健康状况??
... ...
还有很多本文就不一一列举。那我们今天就以上考虑到的问题点来手写实现一个基于Netty通信框架的应用级框架,然后验证我们的问题是否能得以圆满解决。
应用框架实战
在以下应用框架中我们将给出以上问题的解决方案
-
网络通信协议的选择,方案的比较, 我们应基于TCP?UDP?还是应用层的一些成熟协议? 采用TCP/IP
-
网络I/O模型该采用何种?BIO?NIO?IO复用?AIO?还是信号驱动IO呢?采用NIO非阻塞模型
-
底层通信框架我们要定制,还是沿用已有的成熟框架? 基于Netty框架
-
我们场景是否需要统一定制全局可复用的交互协议、报文等? 私有协议定义和交互约定方式
-
是否应该建设一种高效可靠的消息编解码机制支撑快速通信?采用ByteToMessage/MessageToByte/Kryo
-
来自客户端的连接、业务请求等服务端是否需要有认证和鉴权?采用服务端白名单
-
当业务通信发生异常了,能否方便看到和追踪通信报文细节(更便于我们一步一步查找问题原因)??采用Netty内置Logging机制
-
当服务端因为非预期原因断开或崩溃,然后发现修复重启后是否每个客户端都要手动再连接一下?? 设计断线自动尝试重连
-
当服务器空闲一段时间后,是否该有一种机制自动触发心跳检测网络的健康状况??采用IdleState和自动触发机制
一、基础设施部分
/** * @author andychen https://blog.51cto.com/14815984 * @description:Kryo序列化器单例 */ public class KryoBuilder { private KryoBuilder(){} /** * 获取单例 * @return */ public static Kryo getInstance(){ return SingleKryo.builder; } private static class SingleKryo{ private static Kryo builder = new Kryo(); } /** * 构建kryo对象和注册 * @return */ public static Kryo build(){ Kryo kryo = getInstance(); kryo.setRegistrationRequired(false); kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer()); kryo.register(InvocationHandler.class, new JdkProxySerializer()); kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer()); kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer()); kryo.register(Pattern.class, new RegexSerializer()); kryo.register(BitSet.class, new BitSetSerializer()); kryo.register(URI.class, new URISerializer()); kryo.register(UUID.class, new UUIDSerializer()); UnmodifiableCollectionsSerializer.registerSerializers(kryo); SynchronizedCollectionsSerializer.registerSerializers(kryo); kryo.register(HashMap.class); kryo.register(ArrayList.class); kryo.register(LinkedList.class); kryo.register(HashSet.class); kryo.register(TreeSet.class); kryo.register(Hashtable.class); kryo.register(Date.class); kryo.register(Calendar.class); kryo.register(ConcurrentHashMap.class); kryo.register(SimpleDateFormat.class); kryo.register(GregorianCalendar.class); kryo.register(Vector.class); kryo.register(BitSet.class); kryo.register(StringBuffer.class); kryo.register(StringBuilder.class); kryo.register(Object.class); kryo.register(Object[].class); kryo.register(String[].class); kryo.register(byte[].class); kryo.register(char[].class); kryo.register(int[].class); kryo.register(float[].class); kryo.register(double[].class); return kryo; } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:Kryo序列化器 */ public class KryoSerializer { private static final Kryo kryo = KryoBuilder.build(); /** * 序列化 * * @param object * @param buf */ public static void serialize(Object object, ByteBuf buf) { ByteArrayOutputStream stream = new ByteArrayOutputStream(); try{ Output out = new Output(stream); kryo.writeClassAndObject(out, object); out.flush(); out.close(); byte[] bytes = stream.toByteArray(); stream.flush(); stream.close(); /** * 写入buffer */ buf.writeBytes(bytes); } catch (Exception e) { e.printStackTrace(); } } /** * 反序列化 * @param buf 数据缓冲 * @return */ public static Object deserialize(ByteBuf buf) { try(ByteBufInputStream stream = new ByteBufInputStream(buf)) { Input input = new Input(stream); return kryo.readClassAndObject(input); } catch (Exception e) { e.printStackTrace(); } return null; } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:Kryo编码器 */ public class KryoEncoder extends MessageToByteEncoder<Message> { /** * 编码实现 * @param channelHandlerContext 处理器上下文 * @param message 报文 * @param byteBuf 对端数据缓冲 * @throws Exception */ @Override protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception { KryoSerializer.serialize(message, byteBuf); channelHandlerContext.flush(); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:Kryo解码器 */ public class KryoDecoder extends ByteToMessageDecoder { /** * 解码实现 * @param channelHandlerContext 处理器上下文 * @param byteBuf 对端缓冲 * @param list 反序列化列表 * @throws Exception */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { Object object = KryoSerializer.deserialize(byteBuf); list.add(object); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:系统快捷工具类 */ public final class Utility { /** * 构建报文 * @param sessionId 会话Id * @param msg 主体 * @return */ public static Message buildMessage(int sessionId, Object msg, byte type){ //获取校验码 final int OFFSET = 9; int seed = sessionId+(sessionId > OFFSET ? sessionId : sessionId+OFFSET); String crc = CRC.getCRC16(seed); MessageHeader header = new MessageHeader(); header.setCrc(crc); header.setLength(calcBufferLen(msg)); header.setSessionId(sessionId); header.setType(type); Message message = new Message(); message.setHeader(header); message.setBody(msg); return message; } /** * 是否IP认证通过 * @return */ public static boolean isIPPassed(String ip){ for (String p : Constant.WHITELIST){ if(ip.equals(p)){ return true; } } return false; } /** * 计算报文长度 * @param msg 报文对象 * @return int */ private static int calcBufferLen(Object msg){ try(ByteArrayOutputStream stream = new ByteArrayOutputStream(); ObjectOutputStream output = new ObjectOutputStream(stream)){ output.writeObject(msg); return stream.toByteArray().length; }catch (IOException e){ e.printStackTrace(); } return 0; } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:私有报文 */ public final class Message {//<T extends Object> /** * 报文头 */ private MessageHeader header; /** * 报文主体 */ private Object body; public MessageHeader getHeader() { return header; } public void setHeader(MessageHeader header) { this.header = header; } public Object getBody() { return body; } public void setBody(Object body) { this.body = body; } @Override public String toString() { return "Message [header=" + this.header + "][body="+this.body+"]"; } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:报文头 */ public final class MessageHeader { /** * CRC校验码 */ private String crc; /** * 会话id */ private int sessionId; /** * 报文长度 */ private int length; /** * 报文类型码 */ private byte type; /** * 报文优先级 */ private int priority; /** * 报文附件 */ private Map<String,Object> attachment = new HashMap<>(); public String getCrc() { return crc; } public void setCrc(String crc) {this.crc = crc;} public int getSessionId() { return sessionId; } public void setSessionId(int sessionId) { this.sessionId = sessionId; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } public byte getType() { return type; } public void setType(byte type) { this.type = type; } public int getPriority() { return priority; } public void setPriority(int priority) { this.priority = priority; } public Map<String, Object> getAttachment() { return attachment; } public void setAttachment(Map<String, Object> attachment) { this.attachment = attachment; } @Override public String toString() { return "MessageHeader [crc=" + this.crc + ", length=" + this.length + ", sessionId=" + this.sessionId + ", type=" + this.type + ", priority=" + this.priority + ", attachment=" + this.attachment + "]"; } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:数据报文类型 */ public enum MessageType { /** * 认证请求 */ AUTH_REQUEST((byte)0), /** * 认证应答 */ AUTH_REPLY((byte)1), /** * 心跳请求 */ HEARTBEAT_REQUEST((byte)2), /** * 心跳应答 */ HEARTBEAT_REPLY((byte)3), /** * 普通请求 */ REQUEST((byte)4), /** * 普通应答 */ REPLY((byte)5); public byte getValue() { return value; } private final byte value; MessageType(byte b) { this.value = b; }; }
二、客户端部分
/** * @author andychen https://blog.51cto.com/14815984 * @description:框架客户端启动器类 */ public class ClientStarter { /** * 日志处理 */ private static final Log log = LogFactory.getLog(ClientStarter.class); /** * 客户端启动 * @param args */ public static void main(String[] args) throws Exception { ClientTask client = new ClientTask(Constant.SERV_HOST, Constant.SERV_PORT); new Thread(client).start(); while (!client.isConnected()){ synchronized (client){ client.wait(); } } log.info("与服务器连接已建立,准备通信..."); /** * 采用在控制台适时输入消息主体的方式,发送报文 */ Scanner scanner = new Scanner(System.in); for (;;){ String body = scanner.next(); if(null != body && !"".equals(body)){ if(!body.equalsIgnoreCase("exit")){ client.send(body); }else{ client.close(); /** * 等待连接正常关闭通知 */ while (client.isConnected()){ synchronized(client){ client.wait(); } } scanner.close(); System.exit(0);//提示正常退出 } } } } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:客户端封装 */ public class ClientTask implements Runnable { private final String host; private final int port; public ClientTask(String host, int port) { this.host = host; this.port = port; } /** * 日志处理 */ private static final Log log = LogFactory.getLog(ClientTask.class); /** * 报文计数器 */ public final static AtomicInteger counter = new AtomicInteger(0); /** * 这里用1个后台线程,定时执行检测客户端连接是否断开,若非用户断开则自动尝试重连 */ private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); /** * 客户端连接通道 */ private Channel channel; /** * 工作线程池组 */ private EventLoopGroup group = new NioEventLoopGroup(); /** * 是否意外关闭:出异常或网络断开(区别于人为主动关闭) */ private volatile boolean except_closed = true; /** * 是否连接成功 */ private volatile boolean connected = false; private static Object _obj = new Object(); /** * 是否连接 * @return */ public boolean isConnected(){ return this.connected; } /** * 执行客户端 */ @Override public void run() { try { this.connect(); } catch (Exception e) { e.printStackTrace(); } } /** * 连接服务器端 */ public void connect() throws Exception{ try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) //设置TCP底层保温发送不延迟 .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializerExt()); //(1)异步连接服务器端,等待发送和接收报文 ChannelFuture future = bootstrap.connect(new InetSocketAddress(this.host, this.port)).sync(); this.channel = future.sync().channel(); //(2)通知其他等待线程,连接已建立 synchronized(this){ this.connected = true; this.notifyAll(); } this.channel.closeFuture().sync(); } finally { //检测并执行重连接 this.reconnect(); } } /** * 关闭连接:非正常关闭 */ public void close(){ this.except_closed = false; this.channel.close(); } /** * 发送报文 * @param body 报文主体 */ public void send(String body){ if(null != this.channel && this.channel.isActive()){ Message message = Utility.buildMessage(counter.incrementAndGet(), body, MessageType.REQUEST.getValue()); this.channel.writeAndFlush(message); return; } log.info("通信尚未建立,请稍后再试..."); } /** * 执行重连接 * @throws Exception */ private void reconnect() throws Exception{ //主动关闭被检测到 if(this.except_closed){ log.info("连接非正常关闭,准备尝试重连..."); this.executor.execute(new ReconnectTask(this.host, this.port)); }else{ //主动关闭连接:释放资源 this.relese(); } } /** * 关闭连接释放资源,通知其它等待的线程 * @throws Exception */ private void relese() throws Exception{ this.channel = null; this.group.shutdownGracefully().sync(); synchronized (this){ this.connected = false; this.notifyAll(); } } /** * 尝试重连服务器任务 */ private class ReconnectTask implements Runnable{ private final String h; private final int p; public ReconnectTask(String h, int p) { this.h = h; this.p = p; } /** * 尝试重连 */ @Override public void run() { try { //间隔1秒重试一次 Thread.sleep(1000); connect(); } catch (Exception e) { e.printStackTrace(); } } } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:客户端通道初始化器扩展 */ public class ChannelInitializerExt extends ChannelInitializer<SocketChannel> { /** * 初始化通道 * @param channel 通道 * @throws Exception 异常 */ @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //(1)报文粘包处理 pipeline.addLast(new LengthFieldBasedFrameDecoder(Constant.MAX_MSG_LEN,0,2,0,2)); //(2)给报文增加分割长度 pipeline.addLast(new LengthFieldPrepender(2)); //(3)报文解码器 pipeline.addLast(new KryoDecoder()); //(4)报文编码器 pipeline.addLast(new KryoEncoder()); //(5)连接超时检测 pipeline.addLast(new ReadTimeoutHandler(Constant.TIMEOUT_SECONDS)); //(6)认证请求 pipeline.addLast(new AuthenticationHandler()); //(7)心跳处理:发送心跳 pipeline.addLast(new HeartbeatHandler()); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:客户端认证请求 */ public class AuthenticationHandler extends ChannelInboundHandlerAdapter { /** * 日志处理 */ private static final Log log = LogFactory.getLog(AuthenticationHandler.class); /** * 全局计数器 */ private final static AtomicInteger counter = new AtomicInteger(0); /** * 通道开启事件 * @param ctx 处理器上下文 * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { /** * 发起认证请求 */ Message message = Utility.buildMessage(counter.incrementAndGet(), "Auth Request", MessageType.AUTH_REQUEST.getValue()); ctx.writeAndFlush(message); } /** * 处理网络读取事件 * @param ctx 处理器上下文 * @param msg 报文 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Message message = (Message)msg; if(null != message){ MessageHeader header = message.getHeader(); //处理认证应答 if(null != header && header.getType() == MessageType.AUTH_REPLY.getValue()){ String body = message.getBody().toString(); if(body.equals(AuthenticationResult.FAILED)){ log.info("Authentication failed, channel close.."); ctx.close(); return; } log.info("Authentication is ok: "+message); } } ctx.fireChannelRead(msg); } /** * 客户端认证异常处理 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.fireExceptionCaught(cause); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:客户端心跳处理器 */ public class HeartbeatHandler extends ChannelInboundHandlerAdapter { /** * 日志处理 */ private static final Log log = LogFactory.getLog(AuthenticationHandler.class); /** * 心跳定时任务 */ private volatile ScheduledFuture<?> scheduleHeartbeat; /** * 处理客户端心跳请求报文 * @param ctx 处理器上下文 * @param msg 消息对象 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Message message = (Message)msg; if(null != message){ MessageHeader header = message.getHeader(); //处理认证应答 if(header.getType() == MessageType.AUTH_REPLY.getValue()){ //登录完成后,开启客户端对服务端心跳 this.scheduleHeartbeat = ctx.executor().scheduleAtFixedRate(new HeartbeatTask(ctx), 0, Constant.HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS); return; } //处理心跳应答 if(header.getType() == MessageType.HEARTBEAT_REPLY.getValue()){ log.info("Client recevied server heartbeat: "+message); ReferenceCountUtil.release(msg); return; } } ctx.fireChannelRead(msg); } /** * 客户端捕获心跳异常 * @param ctx 处理器上下文 * @param cause 异常 * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if(null != this.scheduleHeartbeat){ this.scheduleHeartbeat.cancel(true); this.scheduleHeartbeat = null; } //传递给TailHandler处理 ctx.fireExceptionCaught(cause); } /** * 定义心跳任务 */ private class HeartbeatTask implements Runnable{ /** * 心跳计数器 */ private final AtomicInteger counter = new AtomicInteger(0); private final ChannelHandlerContext ctx; public HeartbeatTask(ChannelHandlerContext ctx) { this.ctx = ctx; } /** * 心跳任务执行 */ @Override public void run() { //客户端心跳报文 Message heartbeat = Utility.buildMessage(this.counter.incrementAndGet(), Constant.HEARTBEAT_ACK, MessageType.HEARTBEAT_REQUEST.getValue()); this.ctx.writeAndFlush(heartbeat); } } }
三、服务器部分
/** * @author andychen https://blog.51cto.com/14815984 * @description:框架服务端启动器类 */ public class ServerStarter { /** * 日志处理 */ private static final Log log = LogFactory.getLog(ServerStarter.class); /** * 服务端启动 * @param args */ public static void main(String[] args) throws Exception { //主线程池组:负责处理连接 EventLoopGroup main = new NioEventLoopGroup(); //工作线程池组:负责请求对应的业务Handler处理 EventLoopGroup work = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(main, work) .channel(NioServerSocketChannel.class) //设置底层协议接收缓存队列最大长度 .option(ChannelOption.SO_BACKLOG, Constant.TCP_MAX_QUEUE_SIZE) .childHandler(new ChannelInitializerExt()); //绑定端口,等待同步报文 bootstrap.bind(Constant.SERV_PORT).sync(); log.info("Server started and listen port: "+Constant.SERV_PORT+"..."); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:服务器通道初始化器 */ public class ChannelInitializerExt extends ChannelInitializer<SocketChannel> { /** * 初始化处理器 * @param channel 连接通道 * @throws Exception 异常 */ @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //(1)日志打印处理:可以打印报文字节码 pipeline.addLast(new LoggingHandler(LogLevel.INFO)); //(2)处理粘包问题:带长度 pipeline.addLast(new LengthFieldBasedFrameDecoder(Constant.MAX_MSG_LEN,0,2,0,2)); //(3)报文编码器:消息发送增加分隔符 pipeline.addLast(new LengthFieldPrepender(2)); //(4)私有报文解码 pipeline.addLast(new KryoDecoder()); //(5)私有报文编码 pipeline.addLast(new KryoEncoder()); //(6)通道连接超时检测,发送心跳 pipeline.addLast(new ReadTimeoutHandler(Constant.TIMEOUT_SECONDS)); //(7)身份认证应答 pipeline.addLast(new AuthenticationHandler()); //(8)心跳应答 pipeline.addLast(new HeartbeatHandler()); //(9)其他业务处理... pipeline.addLast(new OtherServiceHandler()); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:服务端身份认证处理器 */ public class AuthenticationHandler extends ChannelInboundHandlerAdapter { /** * 日志处理 */ private static final Log log = LogFactory.getLog(AuthenticationHandler.class); /** * 定义认证业务计数器 */ private static final AtomicInteger counter = new AtomicInteger(0); /** * 缓存已认证ip列表 */ private static final List<String> authedIPList = new LinkedList<>(); /** * 认证业务处理 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Message message = (Message)msg; Message authMessage; if(null != message){ MessageHeader header = message.getHeader(); //处理认证请求 if(null != header && header.getType() == MessageType.AUTH_REQUEST.getValue()){ String ip = ctx.channel().remoteAddress().toString(); String result; //重复登录 if(authedIPList.contains(ip)){ result = AuthenticationResult.REPEAT_AUTH.toString(); }else{ //是否ip认证通过 if(Utility.isIPPassed(ip)){ authedIPList.add(ip); result = AuthenticationResult.SUCCESS.toString(); }else{ result = AuthenticationResult.FAILED.toString(); } } authMessage = Utility.buildMessage(counter.incrementAndGet(), result, MessageType.AUTH_REPLY.getValue()); ctx.writeAndFlush(authMessage); //释放对象,不再向后传递 ReferenceCountUtil.release(msg); log.info("Server reply client auth request:"+authMessage); return; } } ctx.fireChannelRead(msg); } /** * 认证处理器捕获异常处理 * @param ctx 处理器上下文 * @param cause 异常 * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); authedIPList.remove(ctx.channel().remoteAddress().toString()); ctx.close(); ctx.fireExceptionCaught(cause); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:服务器端心跳包处理器 */ public class HeartbeatHandler extends ChannelInboundHandlerAdapter { /** * 日志处理 */ private static final Log log = LogFactory.getLog(HeartbeatHandler.class); /** * 会话计数器 */ private final AtomicInteger counter = new AtomicInteger(0); /** * 处理心跳报文 * @param ctx 处理器上下文 * @param msg 消息报文 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Message message = (Message)msg; if(null != message){ MessageHeader header = message.getHeader(); /** * 处理心跳请求 */ if(null != header && header.getType() == MessageType.HEARTBEAT_REQUEST.getValue()){ log.info("Server recevied client heartbeat: "+message); //应答报文 Message heartbeat = Utility.buildMessage(counter.incrementAndGet(), Constant.HEARTBEAT_ACK, MessageType.HEARTBEAT_REPLY.getValue()); ctx.writeAndFlush(heartbeat); //引用计数器释放对象 ReferenceCountUtil.release(msg); return; } } ctx.fireChannelRead(msg); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:其它业务处理 */ public class OtherServiceHandler extends SimpleChannelInboundHandler<Message> { /** * 日志处理 */ private static final Log log = LogFactory.getLog(OtherServiceHandler.class); /** * 读取对端发送的报文 * @param ctx 处理器上下文 * @param message 报文 * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception { log.info(message); } /** * 连接断开事件 * @param ctx 上下文 * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("["+ctx.channel().remoteAddress()+"]断开了连接..."); } }
运行检验
开发小结
以上就是我们基于Netty实现的一整套通信应用框架的和核心代码。所有的业务开发都可定制和构建类似此的基础应用框架,开发Handler处理器的业务人员可任意嵌入被解耦化的业务领域Handler。可采用Handler自动注入Netty管道的方式零侵入框架,支持更多更复杂的业务!希望本文能给大家提供靠谱和一站式的借鉴。大家有任何关于Netty的问题可以在下方留言,谢谢关注!