分身有术:异步和多路复用
文章目录
异步客户端
在Unity中,执行Start、Update方法的线程是主线程,更新UI也是主线程。将同步代码,改成异步代码,需要分别在原名称前面加上Begin和End(如BeginConnect和EndConnect)。
异步Connect
Public IAsyncResult BeginConnect{
string host,
int post,
AsyncCallback requestCallback,
object state
}
public void EndConnect{
IAsyncResult asyncResult
}
其中requestCallback是回调函数,形式是void ConnectCallback(IAsyncResult ar)
,因为EndConnect
必定是要在BeginConnect
的回调函数中写入的,才能实现对接受完毕的处理,而EndConnect
需要IAsyncResult
类型的参数,所以需要与回调函数的参数保持一致,而ConnectCallback
的参数又是从哪里来的呢,从第四个参数state
来,这是一个用户自定义的量socket
,在回调函数中可以通过ar.AsyncState
得到。
异步Receive
这部分是类似的
public IAsyncResult BeginReceive{
byte[] buffer,
int offset,
int size,
SocketFlags socketFlags,
AsyncCallback callback,
object state
}
public int EndReceive{
IAsyncResult asyncResult
}
异步Send
这部分也类似
public IAsyncResult BeginSend{
byte[] buffer,
int offset,
int size,
SocketFlags socketFlags,
AsyncCallback callback,
object state
}
public int EndSend{
IAsyncResult asyncResult
}
异步客户端代码
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Net.Sockets;
using UnityEngine.UI;
using System;
public class Echo : MonoBehaviour {
//定义套接字
Socket socket;
//UGUI
public InputField InputFeld;
public Text text;
//接收缓冲区
byte[] readBuff = new byte[1024];
string recvStr = "";
//点击连接按钮
public void Connetion()
{
//Socket
socket = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
//Connect
socket.Connect("127.0.0.1", 8888);
//socket.BeginConnect("127.0.0.1", 8888, ConnectCallback, socket);
}
//Connect回调
public void ConnectCallback(IAsyncResult ar){
try{
Socket socket = (Socket) ar.AsyncState;
socket.EndConnect(ar);
Debug.Log("Socket Connect Succ ");
socket.BeginReceive( readBuff, 0, 1024, 0,
ReceiveCallback, socket);
}
catch (SocketException ex){
Debug.Log("Socket Connect fail" + ex.ToString());
}
}
//Receive回调
public void ReceiveCallback(IAsyncResult ar){
try {
Socket socket = (Socket) ar.AsyncState;
int count = socket.EndReceive(ar);
string s = System.Text.Encoding.UTF8.GetString(readBuff, 0, count);
recvStr = s + "\n" + recvStr;
socket.BeginReceive( readBuff, 0, 1024, 0,
ReceiveCallback, socket);
}
catch (SocketException ex){
Debug.Log("Socket Receive fail" + ex.ToString());
}
}
//点击发送按钮
public void Send()
{
//Send
string sendStr = InputFeld.text;
byte[] sendBytes = System.Text.Encoding.Default.GetBytes(sendStr);
socket.BeginSend(sendBytes, 0, sendBytes.Length, 0, SendCallback, socket);
}
//Send回调
public void SendCallback(IAsyncResult ar){
try {
Socket socket = (Socket) ar.AsyncState;
int count = socket.EndSend(ar);
Debug.Log("Socket Send succ " + count);
}
catch (SocketException ex){
Debug.Log("Socket Send fail" + ex.ToString());
}
}
public void Update(){
text.text = recvStr;
}
}
这只是将第一章的同步代码改成了异步代码,虽然实现了异步的功能,但是还有很多问题。
异步服务端
为了管理客户端信息,需要引入一个类记录客户端的状态,以后还有后续的信息也会加到这个里面,比如hp值,位置信息之类的
class ClientState{
public Socket socket;
public byte[] readBuff = new byte[1024];
}
同时服务端需要一个数据结构存储客户端信息,采用Dictionary<Socket,ClientState>
异步Accept
服务端的异步处理类似于客户端的异步处理
public IAsyncResult BeginAccept{
AysncResult callback,
object state
}
public Socket EndAccept{
IAsyncResult asyncResult
}
服务端异步代码
using System;
using System.Net;
using System.Net.Sockets;
using System.Collections.Generic;
class ClientState
{
public Socket socket;
public byte[] readBuff = new byte[1024];
}
class MainClass
{
//监听Socket
static Socket listenfd;
//客户端Socket及状态信息
static Dictionary<Socket, ClientState> clients = new Dictionary<Socket, ClientState>();
public static void Main (string[] args)
{
//Socket
listenfd = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
//Bind
IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
IPEndPoint ipEp = new IPEndPoint(ipAdr, 8888);
listenfd.Bind(ipEp);
//Listen
listenfd.Listen(0);
Console.WriteLine("[服务器]启动成功");
//Accept
listenfd.BeginAccept(AcceptCallback,listenfd);
//等待
Console.ReadLine();
}
}
//Accept回调
public static void AcceptCallback(IAsyncResult ar){
try
{
Console.WriteLine("[服务器]Accept");
Socket listenfd = (Socket) ar.AsyncState;
Socket clientfd = listenfd.EndAccept(ar);
//clients列表
ClientState state = new ClientState();
state.socket = clientfd;
clients.Add(clientfd,state);
//接收数据BeginReceive
clientfd.BeginReceive(state.readBuff,0,1024,0,ReceiveCallback,state);
//继续Accept
listenfd.BeginAccept(AcceptCallback,listenfd);
}
catch (SocketException ex)
{
Console.WriteLine("Socket Accept fail" + ex.ToString());
}
}
//Receive回调
public static void ReceiveCallback(IAsyncResult ar){
try{
ClientState state = (ClientState) ar.AsyncState;
Socket clientfd = state.socket;
int count = clientfd.EndReceive(ar);
//客户端关闭
if(count == 0){
clientfd.Close();
clients.Remove(clientfd);
Console.WriteLine("Socket Close");
return;
}
string recvStr = System.Text.Encoding.Default.GetString(state.readBuff,0,count);
byte[] sendBytes = System.Text.Encoding.Default.GetBytes("echo" + recvStr);
socket.BeginSend(sendBytes, 0, sendBytes.Length, 0, SendCallback, socket);
clientfd.BeginReceive(state.readBuff,0,1024,0,ReceiveCallback,state);
}
}
catch(SocketException ex){
Console.WriteLine("Socket Receive fail" + ex.ToString());
}
//Send回调
public static void SendCallback(IAsyncResult ar){
try {
Socket socket = (Socket) ar.AsyncState;
int count = socket.EndSend(ar);
Debug.Log("Socket Send succ " + count);
}
catch (SocketException ex){
Debug.Log("Socket Send fail" + ex.ToString());
}
}
}
书上为了节省代码量,省略了Send
的异步操作,我为了完整性,加上了这部分代码,剩下对程序的几点说明。
从这里开始服务端的逻辑有点复杂了,
对比第一章的逻辑
异步的逻辑如下
稍微解释一下这个图的意思,一开始是BeginAccept
,这个自然会触发回调函数AcceptCallback
,然后EndAccept
,接着处理接受逻辑,在该函数内部调用BeginReceive
,然后再调用BeginAccept
,与之前不同的是,之前是在一个循环内部,执行完Send
之后,程序就会从头自动再走一遍流程,而异步的没有用那么明显的循环,而是利用回调函数构成一个循环,BeginReceive
也是类似的逻辑,在ReceiveCallback
内部,先EndReceive
再BeginSend
再回到BeginReceive
.
再提一下SendCallback
,里面EndSend
之后为什么不接着BeginSend
,这部分涉及到缓存部分,主要看要实现的功能,如果多一个循环,就会一直send
,而直接终止就会Receive
之后只Send
一次。
状态检测Poll
poll方法就是在同步的基础上,加上一层判断,这样既可以不阻塞线程,又可以使用同步的代码(毕竟异步的代码逻辑有点复杂)。
poll客户端
public bool Poll{
int microSeconds,
SelectMode mode
}
microSeconds的说明
- 0,表示非阻塞
- -1,表示一直阻塞
mode
的说明
-
SelectRead
:Socket
可读返回true
,否则false
-
SelectWrite
:Socket
可写返回true
,否则false
-
SelectError
:Socket
连接失败返回true
,否则false
所以用Poll的方法改写第一章的代码
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Net.Sockets;
using UnityEngine.UI;
public class Echo : MonoBehaviour {
//定义套接字
Socket socket;
//UGUI
public InputField InputFeld;
public Text text;
//点击连接按钮
public void Connetion()
{
//Socket
socket = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
//Connect
socket.Connect("127.0.0.1", 8888);
}
//点击发送按钮
public void Send()
{
//Send
if(socket.Poll(0,SelectMode.SelectWrite)){
string sendStr = InputFeld.text;
byte[] sendBytes = System.Text.Encoding.Default.GetBytes(sendStr);
socket.Send(sendBytes);
}
}
public void Update(){
if(socket == null){
return;
}
if(socket.Poll(0,SelectMode.SelectRead)){
byte[] readBuff = new byte[1024];
int count = socket.Receive(readBuff);
string recvStr = System.Text.Encoding.Default.GetString(readBuff,0,count);
text.text = recvStr;
}
}
}
感觉有点不好的是,Send
是主动触发的,Receive
是被动触发的,为了不阻塞线性,要放在Update
里面判断,实际上有点浪费资源
poll服务端
using System;
using System.Net;
using System.Net.Sockets;
using System.Collections.Generic;
class ClientState
{
public Socket socket;
public byte[] readBuff = new byte[1024];
}
class MainClass
{
//监听Socket
static Socket listenfd;
//客户端Socket及状态信息
static Dictionary<Socket, ClientState> clients = new Dictionary<Socket, ClientState>();
public static void Main (string[] args)
{
//Socket
listenfd = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
//Bind
IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
IPEndPoint ipEp = new IPEndPoint(ipAdr, 8888);
listenfd.Bind(ipEp);
//Listen
listenfd.Listen(0);
Console.WriteLine("[服务器]启动成功");
//主循环
while(true){
//检查listenfd
if(listenfd.Poll(0, SelectMode.SelectRead)){
ReadListenfd(listenfd);
}
//检查clientfd
foreach (ClientState s in clients.Values){
Socket clientfd = s.socket;
if(clientfd.Poll(0, SelectMode.SelectRead)){
if(!ReadClientfd(clientfd)){
break;
}
}
}
//防止cpu占用过高
System.Threading.Thread.Sleep(1);
}
}
//读取Listenfd
public static void ReadListenfd(Socket listenfd){
Console.WriteLine("Accept");
Socket clientfd = listenfd.Accept();
ClientState state = new ClientState();
state.socket = clientfd;
clients.Add(clientfd, state);
}
//读取Clientfd
public static bool ReadClientfd(Socket clientfd){
ClientState state = clients[clientfd];
int count = clientfd.Receive(state.readBuff);
//客户端关闭
if(count == 0){
clientfd.Close();
clients.Remove(clientfd);
Console.WriteLine("Socket Close");
return false;
}
//广播
string recvStr = System.Text.Encoding.Default.GetString(state.readBuff, 0, count);
Console.WriteLine("Receive " + recvStr);
string sendStr = clientfd.RemoteEndPoint.ToString() + ":" + recvStr;
byte[] sendBytes = System.Text.Encoding.Default.GetBytes(sendStr);
foreach (ClientState cs in clients.Values){
cs.socket.Send(sendBytes);
}
return true;
}
}
和客户端一样,要放在循环里面一直判断。关于poll服务端的几点说明。主要实现的功能是接受多个客户端的消息,并转发到多个客户端。其中listenfd
是服务端用来监听的socket
,而clientfd
是再监听到客户端之后,服务端为该客户端生成的socket
,可以看到ReadListenfd
函数内部有一句Socket client = listenfd.Accept();
,表现了这种关系,所以这两个socket
要区分开来。由于外层加了poll
判断,所以内部用的都是同步算法。其实可以这么想和第一章的区别,就拿客户端的Receive
函数举例子,第一章的时候,当执行到Receive
时候,如果没有数据发过来,那么程序就处于阻塞状态了,这时候是没法做其他操作的,而在poll
中写,没有数据发过来,就不会处理里面的Receive
函数,所以也就不会阻塞,但是由于poll
在循环中一直判断能否执行内部的函数,所以会消耗过多的cpu资源。
多路复用Select(重点)
Select
相当于是Poll
高阶版本,本质上并没有解决Update
的问题,但是是同时处理多路信号,而不是一个一个处理,所以可以有效的解决CPU占用率过高的问题。对比之前的Poll
程序,clients
中的每一个用户,都要进行poll
判断,而Select
是直接对用户列表判断,并且修改列表,仅使满足条件的socket
留下来,如果列表中没有socket
那么程序阻塞,不占用CPU资源。
public static void Select{
IList checkRead,
IList checkWrite,
IList checkError,
int microSecond
}
类似Poll
Select客户端
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Net.Sockets;
using UnityEngine.UI;
public class Echo : MonoBehaviour {
//定义套接字
Socket socket;
//UGUI
public InputField InputFeld;
public Text text;
List<Socket> checkRead = new List<Socket>();
//点击连接按钮
public void Connetion()
{
//Socket
socket = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
//Connect
socket.Connect("127.0.0.1", 8888);
}
//点击发送按钮
public void Send()
{
//Send
if(socket.Poll(0,SelectMode.SelectWrite)){
string sendStr = InputFeld.text;
byte[] sendBytes = System.Text.Encoding.Default.GetBytes(sendStr);
socket.Send(sendBytes);
}
}
public void Update(){
if(socket == null){
return;
}
//填充checkRead列表
checkRead.Clear();
checkRead.Add(socket);
//select
Socket.Select(checkRead,null,null,0);
//check
foreach (Socket s in checkRead)
{
byte[] readBuff = new byte[1024];
int count = socket.Receive(readBuff);
string recvStr = System.Text.Encoding.Default.GetString(readBuff,0,count);
text.text = recvStr;
}
}
}
实际上,客户端只有一个socket
,该代码只是用来展示写法,实际开发客户端采用异步的方法。
Select服务端
using System;
using System.Net;
using System.Net.Sockets;
using System.Collections.Generic;
class ClientState
{
public Socket socket;
public byte[] readBuff = new byte[1024];
}
class MainClass
{
//监听Socket
static Socket listenfd;
//客户端Socket及状态信息
static Dictionary<Socket, ClientState> clients = new Dictionary<Socket, ClientState>();
public static void Main (string[] args)
{
//Socket
listenfd = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
//Bind
IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
IPEndPoint ipEp = new IPEndPoint(ipAdr, 8888);
listenfd.Bind(ipEp);
//Listen
listenfd.Listen(0);
Console.WriteLine("[服务器]启动成功");
//checkRead
List<Socket> checkRead = new List<Socket>();
//主循环
while(true){
//填充checkRead列表
checkRead.Clear();
checkRead.Add(listenfd);
foreach (ClientState s in clients.Values){
checkRead.Add(s.socket);
}
}
//select
Socket.Select(checkRead,null,null,1000);
//检查可读对象
foreach(Socket s in checkRead){
if(s == listenfd){
ReadListenfd(s);
}
else{
ReadClientfd(s);
}
}
}
//读取Listenfd
public static void ReadListenfd(Socket listenfd){
Console.WriteLine("Accept");
Socket clientfd = listenfd.Accept();
ClientState state = new ClientState();
state.socket = clientfd;
clients.Add(clientfd, state);
}
//读取Clientfd
public static bool ReadClientfd(Socket clientfd){
ClientState state = clients[clientfd];
int count = clientfd.Receive(state.readBuff);
//客户端关闭
if(count == 0){
clientfd.Close();
clients.Remove(clientfd);
Console.WriteLine("Socket Close");
return false;
}
//广播
string recvStr = System.Text.Encoding.Default.GetString(state.readBuff, 0, count);
Console.WriteLine("Receive " + recvStr);
string sendStr = clientfd.RemoteEndPoint.ToString() + ":" + recvStr;
byte[] sendBytes = System.Text.Encoding.Default.GetBytes(sendStr);
foreach (ClientState cs in clients.Values){
cs.socket.Send(sendBytes);
}
return true;
}
}
这里是把listenfd
也放到clients
,实际上本身并不是client
,所以才会有最后的if-else
判断,后续的开发服务端也采用Select
方式.