用"IO多路复用",实现每秒百万并发的原理你懂吗?
多路复用其实并不是什么新技术,它的作用是在一个通讯连接的基础上可以同时进行多个请求响应处理。对于网络通讯来其实不存在这一说法,因为网络层面只负责数据传输;由于上层应用协议的制订问题,导致了很多传统服务并不能支持多路复用;如:http1.1,sqlserver和redis等等,虽然有些服务提供批量处理,但这些处理都基于一个RPS下。下面通过图解来了解释单路和多路复用的区别。
单路存在的问题
每个请求响应独占一个连接,并独占连接网络读写;这样导致连接在有大量时间被闲置无法更好地利用网络资源。由于是独占读写IO,这样导致RPS处理量由必须由IO承担,IO操作起来比较损耗性能,这样在高RPS处理就出现性能问题。由于不能有效的合并IO也会导致在通讯中的带宽存在浪费情况,特别对于比较小的请求数据包。通讯上的延时当要持大量的RPS那就必须要有更多连接支撑,连接数增加也对资源的开销有所增加。
多路复用的优点
多路复用可以在一个连接上同时处理多个请求响应,这样可以大大的减少连接的数量,并提高了网络的处理能力。由于是共享连接不同请求响应数据包可以合并到一个IO上处理,这样可以大大降低IO的处理量,让性能表现得更出色。
通过多路复用实现百万级RPS
多路复用是不是真的如此出色呢,以下在.net core上使用多路复用实现单服务百万RPS吞吐,并能达到比较低的延时性。以下是测试流程:
由于基础通讯不具备消息包合并功能,所以在BeetleX的基础上做集成测试,主要BeetleX会自动合并消息到一个Buffer上,从而降低IO的读写。
测试消息结构
本测试使用了Protobuf作为基础交互消息,毕竟Protobuf已经是一个二进制序列化标准了。
请求消息
-
[ProtoMember(1)]
-
public int ID { get; set; }
-
[ProtoMember(2)]
-
public Double RequestTime { get; set; }
响应消息
-
[ProtoMember(1)]
-
public int EmployeeID { get; set; }
-
[ProtoMember(2)]
-
public string LastName { get; set; }
-
[ProtoMember(3)]
-
public string FirstName { get; set; }
-
[ProtoMember(4)]
-
public string Address { get; set; }
-
[ProtoMember(5)]
-
public string City { get; set; }
-
[ProtoMember(6)]
-
public string Region { get; set; }
-
[ProtoMember(7)]
-
public string Country { get; set; }
-
[ProtoMember(8)]
-
public Double RequestTime { get; set; }
* 服务端处理代码*
-
public static void Response(Tuple<IServer, ISession, SearchEmployee> value){
-
Employee emp = Employee.GetEmployee();
-
emp.RequestTime = value.Item3.RequestTime;
-
value.Item1.Send(emp, value.Item2);
-
System.Threading.Interlocked.Increment(ref Count);
-
}
-
public override void SessionPacketDecodeCompleted(IServer server, PacketDecodeCompletedEventArgs e) {
-
SearchEmployee emp = (SearchEmployee)e.Message;
-
multiThreadDispatcher.Enqueue(new Tuple<IServer, ISession, SearchEmployee>(server, e.Session, emp));
-
}
服务响应对象内容
-
Employee result = new Employee();
-
result.EmployeeID = 1;
-
result.LastName = "Davolio";
-
result.FirstName = "Nancy";
-
result.Address = "ja";
-
result.City = "Seattle";
-
result.Region = "WA";
-
result.Country = "USA";
接收消息后放入队列,然后由队列处理响应,设置请求相应请求时间并记录总处理消息计数。
客户端请求代码
-
private static void Response(Tuple<AsyncTcpClient, Employee> data){
-
System.Threading.Interlocked.Increment(ref mCount);
-
if (mCount > 100){
-
if (data.Item2.RequestTime > 0){
-
double tick = mWatch.Elapsed.TotalMilliseconds - data.Item2.RequestTime;
-
AddToLevel(tick);
-
}
-
}
-
var s = new SearchEmployee();
-
s.RequestTime = mWatch.Elapsed.TotalMilliseconds;
-
data.Item1.Send(s);
-
}
客户端测试发起代码
-
for (int i = 0; i < mConnections; i++){
-
var client = SocketFactory.CreateClient<BeetleX.Clients.AsyncTcpClient, TestMessages.ProtobufClientPacket>(mIPAddress, 9090);
-
client.ReceivePacket = (o, e) =>{
-
Employee emp = (Employee)e;
-
multiThreadDispatcher.Enqueue(new Tuple<AsyncTcpClient, Employee>((AsyncTcpClient)o, emp));
-
};
-
client.ClientError = (o, e) =>{
-
Console.WriteLine(e.Message);
-
};
-
mClients.Add(client);
-
}
-
for (int i = 0; i < 200; i++){
-
foreach (var item in mClients){
-
SearchEmployee search = new SearchEmployee();
-
Task.Run(() => { item.Send(search); });
-
}
-
}
整个测试开启了10个连接,在这10个连接的基础上进行请求响应复用。
测试配置
测试环境是两台服务器,配置是阿里云上的12核服务器(对应的物理机应该是6核12线程)
服务和客户端的系统都是:Ubuntu 16.04
Dotnet core版本是:2.14
测试结果
客户端统计结果
服务端统计信息
带宽统计
测试代码
https://github.com/IKende/BeetleX/blob/master/samples/MultiplexingConnectionTest.zip
文章来源:作者 | 泥水佬 来源 | my.oschina.net/ikende/blog/2250622