Apache Mina2.x网络通信框架使用入门
-
开发服务器和客户端网络应用
使用Mina开发,客户端连接器和服务端接收器有更多的相似之处,Mina在API设计的时候使用了更高的抽象如:IoService,对于创建服务器端接收器我们将关注IoAccepor,而客户端连接器则是IoConnector.
下面图1是关于Mina的基本使用的描述:
图1
这里对图1做简单的说明:
图1由大的三部分组成,通过颜色就很容易区分和理解器表示的意思。对于服务器(Server)和客户端(Client)而言它们都需要中间最大一块的组成部分,其中包含了配置(Configure),会话数据工厂(SessionDataFactory),过滤器链(FilterChina,由多个过滤器组成),监听器组(有多个Listener组成),处理器(IoHandler);第三部分则可以看出服务器对应的是绑定(bind),客户端对应的是连接(connect)由此区分了服务器和客户端。
说了这么多,就中间部分而言,Mina框架最大程度的解放了开发过程要进行的会话管理,会话数据管理,服务监听处理,过滤器,服务配置等的实现,其都提供了默认实现,当然可以根据使用情况实现对应的接口来自行处理。
2.过滤器
Mina中的过滤器的顶层接口是IoFilter,其自身提供了多种过滤器,比如:LoggingFilter,ExecutorFilter,BlacklistFilter(请求地址黑名单过滤),SSLFilter,编码和解密过滤器等等(更多参考API http://mina.apache.org/mina-project/apidocs/index.html)。
过滤器是Mina框架中极其重要和有价值的部分,其提供了日志,安全,权限控制,统计等过滤器,并且其是可插拔的,可以通过不同的过滤器以不同的顺序组成过滤器链将实现不同的功能。另外可以通过实现IoFilter接口或者继承IoFilterAdapter来创建更多具体业务中需要的IoFilter,当然这么做之前,可以仔细看看Mina的org.apache.mina.filter包下是否已经提供了实现。
3.编码和解码
编码和解码作为网络应用开发必须面对的问题,而Mina作为全功能的网络通讯框架,实现对数据报文的编码和解码自然是其分内之事,具体使用者可更多关注IoHandler,即具体处理接收和发送报文的相关业务。
在org.apache.mina.filter.codec包下有更多的关于编码和解码的实现。
关于编码其方式有很多种,比如Protobuf,serialization(对象序列化),JSON,BASE64等,而解码则涉及到字节流分割的问题,下图2是三种常用的字节流分割的方式:
上面三种方式中2和3在Mina中都有对应的实现,比如3特殊字符结尾标记对应的实现有TextLineEncoder和TextLineDecoder,两者组成了TextLineCodecFactory; 2固定字节的head表示数据字节数有PrefixedStringEncoder和PrefixedStringDecoder,两者组成了PrefixedStringCodecFactory。
第一种固定长度字节数这种主要应用在传输命令的场景中,其传输的字节数是固定,应用中可以自己根据具体情况来实现对应的编码和解码类。
4.一个具体案例来贯穿全文
本案例通过客户端发送短信信息到服务器,然后服务器将其短信信息的发送者和接受者对调,短信内容设置"OK",发回给客户端。
4.1定义短信格式(protobuf):
1
2
3
4
5
6
7
8
9
10
11
|
package secondriver.mina.bo.protobuf; option java_package = "secondriver.mina.bo.protobuf"; option java_outer_classname = "SmsDataProtocal"; message Sms { required string protocol = 1;
required string sender = 2;
required string receiver = 3;
required string content = 4;
} |
使用protoc命令将定义个消息生成Java类(使用方式可以参考:)。
4.2编写Sms对象的编码和解密类,这里我们直接编写编码解密工程类,其由编码和解密类组合而成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
package secondriver.mina.bo.protobuf;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import com.google.protobuf.ByteString;
import secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;
public class SmsDataCodecFactory implements ProtocolCodecFactory {
private final Charset charset = StandardCharsets.UTF_8;
private int prefixLength = 4 ;
private int maxDataLength = 1024 ;
@Override
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
return new ProtocolEncoderAdapter() {
@Override
public void encode(IoSession session, Object message,
ProtocolEncoderOutput out) throws Exception {
Sms sms = (Sms) message;
String content = sms.toByteString().toStringUtf8();
IoBuffer buffer = IoBuffer.allocate(content.length())
.setAutoExpand( true );
buffer.putPrefixedString(content, prefixLength,
charset.newEncoder());
if (buffer.position() > maxDataLength) {
throw new IllegalArgumentException( "Data length: "
+ buffer.position());
}
buffer.flip();
out.write(buffer);
}
};
}
@Override
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
return new CumulativeProtocolDecoder() {
@Override
protected boolean doDecode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
if (in.prefixedDataAvailable(prefixLength, maxDataLength)) {
String msg = in.getPrefixedString(prefixLength,
charset.newDecoder());
Sms sms = Sms.parseFrom(ByteString.copyFrom(msg,
charset.name()));
out.write(sms);
return true ;
}
return false ;
}
};
}
} |
4.3参见文中1端来写服务端
创建IoAccptor对象->设置过滤器->设置IoHandler->配置->绑定到指定IP和端口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
package secondriver.mina.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import secondriver.mina.bo.protobuf.SmsDataCodecFactory;
import secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;
public class SmsServer {
public static final int PORT = 9001 ;
public static void main(String[] args) throws IOException {
// 接收器
IoAcceptor acceptor = new NioSocketAcceptor();
// 过滤器链
DefaultIoFilterChainBuilder builder = new DefaultIoFilterChainBuilder();
LoggingFilter loggingFilter = new LoggingFilter();
loggingFilter.setExceptionCaughtLogLevel(LogLevel.DEBUG);
builder.addLast( "logging" , loggingFilter);
builder.addLast( "codec" , new ProtocolCodecFilter(
new SmsDataCodecFactory()));
builder.addLast( "threadPool" ,
new ExecutorFilter(Executors.newCachedThreadPool()));
acceptor.setFilterChainBuilder(builder);
// 设置处理器IoHandler
acceptor.setHandler( new IoHandlerAdapter() {
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
Sms sms = (Sms) message;
System.out.println( "客户端发来:" );
System.out.println(sms.toString());
// 服务器发送
Sms serverSms = Sms.newBuilder().setProtocol(sms.getProtocol())
.setContent( "OK" ).setReceiver(sms.getSender())
.setSender(sms.getSender()).build();
session.write(serverSms);
}
});
// 配置服务器(IoAccptor)
acceptor.getSessionConfig().setReadBufferSize( 2048 );
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10 );
// 绑定到指定IP和端口
acceptor.bind( new InetSocketAddress(PORT));
}
} |
4.4 参见文中1端编写客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
package secondriver.mina.client;
import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import secondriver.mina.bo.protobuf.SmsDataCodecFactory;
import secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;
public class SmsClient {
private static InetSocketAddress server = new InetSocketAddress(
"127.0.0.1" , 9001 );
public static void main(String[] args) throws InterruptedException {
// 客户端连接器
IoConnector connector = new NioSocketConnector();
// 过滤器
connector.getFilterChain().addLast( "codec" ,
new ProtocolCodecFilter( new SmsDataCodecFactory()));
connector.getFilterChain().addLast( "threadPool" ,
new ExecutorFilter(Executors.newCachedThreadPool()));
// 处理器
connector.setHandler( new IoHandlerAdapter() {
@Override
public void sessionCreated(IoSession session) throws Exception {
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
System.out.println( "服务器响应:" );
System.out.println(((Sms) message).toString());
}
});
// 建立会话Session
IoSession session = null ;
while ( true ) {
try {
ConnectFuture future = connector.connect(server);
future.awaitUninterruptibly( 100 , TimeUnit.SECONDS);
session = future.getSession();
if ( null != session) {
break ;
}
} catch (RuntimeIoException e) {
System.err.println( "Failed to connect with "
+ server.toString());
e.printStackTrace();
try {
Thread.sleep( 5000 );
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
// 客户端输入
try (Scanner scanner = new Scanner(System.in);) {
while ( true ) {
String sender = "1814453211" ;
System.out.println( "请输入收信息手机号:" );
String receiver = scanner.nextLine();
System.out.println( "请输入信息内容:" );
String content = scanner.nextLine();
Sms sms = Sms.newBuilder()
.setProtocol( "ip.weixin.com TC-C/2.0" )
.setSender(sender).setReceiver(receiver)
.setContent(content).build();
session.write(sms);
Thread.sleep( 10000 );
System.out.println( "是否继续,回车继续 , q or quit 退出:" );
String line = scanner.nextLine();
if (line.trim().equalsIgnoreCase( "q" )
|| line.trim().equalsIgnoreCase( "quit" )) {
break ;
}
}
}
session.close( false );
connector.dispose();
}
} |
4.5 启动服务,启动客户端
图3是运行的结果:
图3
说明:上面客户端和服务器端的关于IoHandler直接使用了匿名类的方式对数据的接收做了相应的简单处理。Sms对象转换成UTF-8编码的字符串,采用了3端中编码和解密的第2中方式,并且传输的数据最大长度为1024byte(1k)。另外,Potobuf-java和Mina集成,mina3.x提供了对protobuf定义的消息的编码和解码提供了实现支持。
为了需要更多关注Mina3.x,另外Netty的发展势头正旺,netty有种子承父业的感觉,也值得拥有!
本文转自 secondriver 51CTO博客,原文链接:http://blog.51cto.com/aiilive/1589561,如需转载请自行联系原作者