分身有术:异步和多路复用

异步客户端

在Unity中,执行Start、Update方法的线程是主线程,更新UI也是主线程。将同步代码,改成异步代码,需要分别在原名称前面加上Begin和End(如BeginConnect和EndConnect)。

异步Connect

Public IAsyncResult BeginConnect{
	string host,
	int post,
	AsyncCallback requestCallback,
	object state
}
public void EndConnect{
	IAsyncResult asyncResult
}

其中requestCallback是回调函数,形式是void ConnectCallback(IAsyncResult ar),因为EndConnect必定是要在BeginConnect的回调函数中写入的,才能实现对接受完毕的处理,而EndConnect需要IAsyncResult类型的参数,所以需要与回调函数的参数保持一致,而ConnectCallback的参数又是从哪里来的呢,从第四个参数state来,这是一个用户自定义的量socket,在回调函数中可以通过ar.AsyncState得到。

异步Receive

这部分是类似的

public IAsyncResult BeginReceive{
	byte[] buffer,
	int offset,
	int size,
	SocketFlags socketFlags,
	AsyncCallback callback,
	object state
}
public int EndReceive{
	IAsyncResult asyncResult
}

异步Send

这部分也类似

public IAsyncResult BeginSend{
	byte[] buffer,
	int offset,
	int size,
	SocketFlags socketFlags,
	AsyncCallback callback,
	object state
}
public int EndSend{
	IAsyncResult asyncResult
}

异步客户端代码

using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Net.Sockets;
using UnityEngine.UI;
using System;

public class Echo : MonoBehaviour {

	//定义套接字
	Socket socket;
	//UGUI
	public InputField InputFeld;
	public Text text;
	//接收缓冲区
	byte[] readBuff = new byte[1024]; 
	string recvStr = "";
	//点击连接按钮
	public void Connetion()
	{
		//Socket
		socket = new Socket(AddressFamily.InterNetwork,
			SocketType.Stream, ProtocolType.Tcp);
		//Connect
		socket.Connect("127.0.0.1", 8888);
		//socket.BeginConnect("127.0.0.1", 8888, ConnectCallback, socket);
	}

	//Connect回调
	public void ConnectCallback(IAsyncResult ar){
		try{
			Socket socket = (Socket) ar.AsyncState;
			socket.EndConnect(ar);
			Debug.Log("Socket Connect Succ ");
			socket.BeginReceive( readBuff, 0, 1024, 0,
				ReceiveCallback, socket);
		}
		catch (SocketException ex){
			Debug.Log("Socket Connect fail" + ex.ToString());
		}
	}

	//Receive回调
	public void ReceiveCallback(IAsyncResult ar){
		try {
			Socket socket = (Socket) ar.AsyncState;
			int count = socket.EndReceive(ar);
			string s = System.Text.Encoding.UTF8.GetString(readBuff, 0, count);
			recvStr = s + "\n" + recvStr;

			socket.BeginReceive( readBuff, 0, 1024, 0,
				ReceiveCallback, socket);
		}
		catch (SocketException ex){
			Debug.Log("Socket Receive fail" + ex.ToString());
		}
	}

	//点击发送按钮
	public void Send()
	{
		//Send
		string sendStr = InputFeld.text;
		byte[] sendBytes = System.Text.Encoding.Default.GetBytes(sendStr);
		socket.BeginSend(sendBytes, 0, sendBytes.Length, 0, SendCallback, socket);
	}

	//Send回调
	public void SendCallback(IAsyncResult ar){
		try {
			Socket socket = (Socket) ar.AsyncState;
			int count = socket.EndSend(ar);
			Debug.Log("Socket Send succ " + count);
		}
		catch (SocketException ex){
			Debug.Log("Socket Send fail" + ex.ToString());
		}

	}


	public void Update(){
		text.text = recvStr;
	}
}

这只是将第一章的同步代码改成了异步代码,虽然实现了异步的功能,但是还有很多问题。

异步服务端

为了管理客户端信息,需要引入一个类记录客户端的状态,以后还有后续的信息也会加到这个里面,比如hp值,位置信息之类的

class ClientState{
	public Socket socket;
	public byte[] readBuff = new byte[1024];
}

同时服务端需要一个数据结构存储客户端信息,采用Dictionary<Socket,ClientState>

异步Accept

服务端的异步处理类似于客户端的异步处理

public IAsyncResult BeginAccept{
	AysncResult callback,
	object state
}

public Socket EndAccept{
	IAsyncResult asyncResult
}

服务端异步代码

using System;
using System.Net;
using System.Net.Sockets;
using System.Collections.Generic;


class ClientState
{
	public Socket socket; 
	public byte[] readBuff = new byte[1024]; 
}

class MainClass
{
	//监听Socket
	static Socket listenfd;
	//客户端Socket及状态信息
	static Dictionary<Socket, ClientState> clients = new Dictionary<Socket, ClientState>();

	public static void Main (string[] args)
	{
		//Socket
		listenfd = new Socket(AddressFamily.InterNetwork,
			SocketType.Stream, ProtocolType.Tcp);
		//Bind
		IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
		IPEndPoint ipEp = new IPEndPoint(ipAdr, 8888);
		listenfd.Bind(ipEp);
		//Listen
		listenfd.Listen(0);
		Console.WriteLine("[服务器]启动成功");
		//Accept
        listenfd.BeginAccept(AcceptCallback,listenfd);
        //等待
        Console.ReadLine();
		}

	}

	//Accept回调
    public static void AcceptCallback(IAsyncResult ar){
        try
        {
            Console.WriteLine("[服务器]Accept");
            Socket listenfd = (Socket) ar.AsyncState;
            Socket clientfd = listenfd.EndAccept(ar);
            //clients列表
            ClientState state = new ClientState();
            state.socket = clientfd;
            clients.Add(clientfd,state);
            //接收数据BeginReceive
            clientfd.BeginReceive(state.readBuff,0,1024,0,ReceiveCallback,state);
            //继续Accept
            listenfd.BeginAccept(AcceptCallback,listenfd);
        }
        catch (SocketException ex)
        {
            Console.WriteLine("Socket Accept fail" + ex.ToString());
        }
    }

    //Receive回调
    public static void ReceiveCallback(IAsyncResult ar){
        try{
            ClientState state = (ClientState) ar.AsyncState;
            Socket clientfd = state.socket;
            int count = clientfd.EndReceive(ar);
            //客户端关闭
            if(count == 0){
                clientfd.Close();
                clients.Remove(clientfd);
                Console.WriteLine("Socket Close");
                return;
            }

            string recvStr = System.Text.Encoding.Default.GetString(state.readBuff,0,count);
            byte[] sendBytes = System.Text.Encoding.Default.GetBytes("echo" + recvStr);
            socket.BeginSend(sendBytes, 0, sendBytes.Length, 0, SendCallback, socket);
            clientfd.BeginReceive(state.readBuff,0,1024,0,ReceiveCallback,state);
        }
    }
    catch(SocketException ex){
        Console.WriteLine("Socket Receive fail" + ex.ToString());
    }

    //Send回调
	public static void SendCallback(IAsyncResult ar){
		try {
			Socket socket = (Socket) ar.AsyncState;
			int count = socket.EndSend(ar);
			Debug.Log("Socket Send succ " + count);
		}
		catch (SocketException ex){
			Debug.Log("Socket Send fail" + ex.ToString());
		}
	}
}

书上为了节省代码量,省略了Send的异步操作,我为了完整性,加上了这部分代码,剩下对程序的几点说明。
从这里开始服务端的逻辑有点复杂了,
对比第一章的逻辑
分身有术:异步和多路复用
异步的逻辑如下
分身有术:异步和多路复用
稍微解释一下这个图的意思,一开始是BeginAccept,这个自然会触发回调函数AcceptCallback,然后EndAccept,接着处理接受逻辑,在该函数内部调用BeginReceive,然后再调用BeginAccept,与之前不同的是,之前是在一个循环内部,执行完Send之后,程序就会从头自动再走一遍流程,而异步的没有用那么明显的循环,而是利用回调函数构成一个循环,BeginReceive也是类似的逻辑,在ReceiveCallback内部,先EndReceiveBeginSend再回到BeginReceive.

再提一下SendCallback,里面EndSend之后为什么不接着BeginSend,这部分涉及到缓存部分,主要看要实现的功能,如果多一个循环,就会一直send,而直接终止就会Receive之后只Send一次。

状态检测Poll

poll方法就是在同步的基础上,加上一层判断,这样既可以不阻塞线程,又可以使用同步的代码(毕竟异步的代码逻辑有点复杂)。

poll客户端

public bool Poll{
	int microSeconds,
	SelectMode mode
}

microSeconds的说明

  • 0,表示非阻塞
  • -1,表示一直阻塞

mode的说明

  • SelectRead:Socket可读返回true,否则false
  • SelectWrite:Socket可写返回true,否则false
  • SelectError:Socket连接失败返回true,否则false

所以用Poll的方法改写第一章的代码

using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Net.Sockets;
using UnityEngine.UI;
public class Echo : MonoBehaviour {

	//定义套接字
	Socket socket;
	//UGUI
	public InputField InputFeld;
	public Text text;

	//点击连接按钮
	public void Connetion()
	{
		//Socket
		socket = new Socket(AddressFamily.InterNetwork,
			SocketType.Stream, ProtocolType.Tcp);
		//Connect
		socket.Connect("127.0.0.1", 8888);
	}

	//点击发送按钮
	public void Send()
	{
		//Send
        if(socket.Poll(0,SelectMode.SelectWrite)){
            string sendStr = InputFeld.text;
		    byte[] sendBytes = System.Text.Encoding.Default.GetBytes(sendStr);
		    socket.Send(sendBytes);
        }
	}

    public void Update(){
        if(socket == null){
            return;
        }

        if(socket.Poll(0,SelectMode.SelectRead)){
            byte[] readBuff = new byte[1024];
            int count = socket.Receive(readBuff);
            string recvStr = System.Text.Encoding.Default.GetString(readBuff,0,count);
            text.text = recvStr;
        }
    }
}

感觉有点不好的是,Send是主动触发的,Receive是被动触发的,为了不阻塞线性,要放在Update里面判断,实际上有点浪费资源

poll服务端

using System;
using System.Net;
using System.Net.Sockets;
using System.Collections.Generic;


class ClientState
{
	public Socket socket; 
	public byte[] readBuff = new byte[1024]; 
}

class MainClass
{
	//监听Socket
	static Socket listenfd;
	//客户端Socket及状态信息
	static Dictionary<Socket, ClientState> clients = new Dictionary<Socket, ClientState>();

	public static void Main (string[] args)
	{
		//Socket
		listenfd = new Socket(AddressFamily.InterNetwork,
			SocketType.Stream, ProtocolType.Tcp);
		//Bind
		IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
		IPEndPoint ipEp = new IPEndPoint(ipAdr, 8888);
		listenfd.Bind(ipEp);
		//Listen
		listenfd.Listen(0);
		Console.WriteLine("[服务器]启动成功");
		//主循环
		while(true){
			//检查listenfd
			if(listenfd.Poll(0, SelectMode.SelectRead)){
				ReadListenfd(listenfd);
			}
			//检查clientfd
			foreach (ClientState s in clients.Values){
				Socket clientfd = s.socket;
				if(clientfd.Poll(0, SelectMode.SelectRead)){
					if(!ReadClientfd(clientfd)){
						break;
					}
				}
			}
			//防止cpu占用过高
			System.Threading.Thread.Sleep(1);
		}

	}

	//读取Listenfd
	public static void ReadListenfd(Socket listenfd){
		Console.WriteLine("Accept");
		Socket clientfd = listenfd.Accept();
		ClientState state = new ClientState();
		state.socket = clientfd;
		clients.Add(clientfd, state);
	}

	//读取Clientfd
	public static bool ReadClientfd(Socket clientfd){
		ClientState state = clients[clientfd];
		int count = clientfd.Receive(state.readBuff);
		//客户端关闭
		if(count == 0){
			clientfd.Close();
			clients.Remove(clientfd);
			Console.WriteLine("Socket Close");
			return false;
		}
		//广播
		string recvStr = System.Text.Encoding.Default.GetString(state.readBuff, 0, count);
		Console.WriteLine("Receive " + recvStr);
		string sendStr = clientfd.RemoteEndPoint.ToString() + ":" + recvStr;
		byte[] sendBytes = System.Text.Encoding.Default.GetBytes(sendStr);
		foreach (ClientState cs in clients.Values){
			cs.socket.Send(sendBytes);
		}
		return true;
	}
}

和客户端一样,要放在循环里面一直判断。关于poll服务端的几点说明。主要实现的功能是接受多个客户端的消息,并转发到多个客户端。其中listenfd是服务端用来监听的socket,而clientfd是再监听到客户端之后,服务端为该客户端生成的socket,可以看到ReadListenfd函数内部有一句Socket client = listenfd.Accept();,表现了这种关系,所以这两个socket要区分开来。由于外层加了poll判断,所以内部用的都是同步算法。其实可以这么想和第一章的区别,就拿客户端的Receive函数举例子,第一章的时候,当执行到Receive时候,如果没有数据发过来,那么程序就处于阻塞状态了,这时候是没法做其他操作的,而在poll中写,没有数据发过来,就不会处理里面的Receive函数,所以也就不会阻塞,但是由于poll在循环中一直判断能否执行内部的函数,所以会消耗过多的cpu资源。

多路复用Select(重点)

Select相当于是Poll高阶版本,本质上并没有解决Update的问题,但是是同时处理多路信号,而不是一个一个处理,所以可以有效的解决CPU占用率过高的问题。对比之前的Poll程序,clients中的每一个用户,都要进行poll判断,而Select是直接对用户列表判断,并且修改列表,仅使满足条件的socket留下来,如果列表中没有socket那么程序阻塞,不占用CPU资源。

public static void Select{
	IList checkRead,
	IList checkWrite,
	IList checkError,
	int microSecond
}

类似Poll

Select客户端

using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Net.Sockets;
using UnityEngine.UI;
public class Echo : MonoBehaviour {

	//定义套接字
	Socket socket;
	//UGUI
	public InputField InputFeld;
	public Text text;
	List<Socket> checkRead = new List<Socket>();
	//点击连接按钮
	public void Connetion()
	{
		//Socket
		socket = new Socket(AddressFamily.InterNetwork,
			SocketType.Stream, ProtocolType.Tcp);
		//Connect
		socket.Connect("127.0.0.1", 8888);
	}

	//点击发送按钮
	public void Send()
	{
		//Send
        if(socket.Poll(0,SelectMode.SelectWrite)){
            string sendStr = InputFeld.text;
		    byte[] sendBytes = System.Text.Encoding.Default.GetBytes(sendStr);
		    socket.Send(sendBytes);
        }
	}

    public void Update(){
        if(socket == null){
            return;
        }

        //填充checkRead列表
        checkRead.Clear();
        checkRead.Add(socket);
        //select
        Socket.Select(checkRead,null,null,0);
        //check
        foreach (Socket s in checkRead)
        {
            byte[] readBuff = new byte[1024];
            int count = socket.Receive(readBuff);
            string recvStr = System.Text.Encoding.Default.GetString(readBuff,0,count);
            text.text = recvStr;
        }
    }
}

实际上,客户端只有一个socket,该代码只是用来展示写法,实际开发客户端采用异步的方法。

Select服务端

using System;
using System.Net;
using System.Net.Sockets;
using System.Collections.Generic;


class ClientState
{
	public Socket socket; 
	public byte[] readBuff = new byte[1024]; 
}

class MainClass
{
	//监听Socket
	static Socket listenfd;
	//客户端Socket及状态信息
	static Dictionary<Socket, ClientState> clients = new Dictionary<Socket, ClientState>();

	public static void Main (string[] args)
	{
		//Socket
		listenfd = new Socket(AddressFamily.InterNetwork,
			SocketType.Stream, ProtocolType.Tcp);
		//Bind
		IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
		IPEndPoint ipEp = new IPEndPoint(ipAdr, 8888);
		listenfd.Bind(ipEp);
		//Listen
		listenfd.Listen(0);
		Console.WriteLine("[服务器]启动成功");
        //checkRead
        List<Socket> checkRead = new List<Socket>();
		//主循环
		while(true){
			//填充checkRead列表
            checkRead.Clear();
            checkRead.Add(listenfd);
            foreach (ClientState s in clients.Values){
                checkRead.Add(s.socket);
            }
		}
        //select
        Socket.Select(checkRead,null,null,1000);
        //检查可读对象
        foreach(Socket s in checkRead){
            if(s == listenfd){
                ReadListenfd(s);
            }
            else{
                ReadClientfd(s);
            }
        }
	}

	//读取Listenfd
	public static void ReadListenfd(Socket listenfd){
		Console.WriteLine("Accept");
		Socket clientfd = listenfd.Accept();
		ClientState state = new ClientState();
		state.socket = clientfd;
		clients.Add(clientfd, state);
	}

	//读取Clientfd
	public static bool ReadClientfd(Socket clientfd){
		ClientState state = clients[clientfd];
		int count = clientfd.Receive(state.readBuff);
		//客户端关闭
		if(count == 0){
			clientfd.Close();
			clients.Remove(clientfd);
			Console.WriteLine("Socket Close");
			return false;
		}
		//广播
		string recvStr = System.Text.Encoding.Default.GetString(state.readBuff, 0, count);
		Console.WriteLine("Receive " + recvStr);
		string sendStr = clientfd.RemoteEndPoint.ToString() + ":" + recvStr;
		byte[] sendBytes = System.Text.Encoding.Default.GetBytes(sendStr);
		foreach (ClientState cs in clients.Values){
			cs.socket.Send(sendBytes);
		}
		return true;
	}
}


这里是把listenfd也放到clients,实际上本身并不是client,所以才会有最后的if-else判断,后续的开发服务端也采用Select方式.