RPC框架底层通信原理
RPC(Remote Procedure Call)即远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
实现技术方案:
下面使用比较原始的方案实现RPC框架,采用Socket通信、动态代理与反射与Java原生的序列化。
源码可在github找到
原理:
1.创建一个公共的api
/**
* 暴露的服务接口
* @author XXJ
*
* 2019年3月12日 下午7:34:38
*/
public interface ISayHello {
String sayHello(String name);
}
2.包装一个发送的数据包
public class RequestData implements Serializable{
/**
*
*/
private static final long serialVersionUID = -3866469063627295476L;
private String interfaceName;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
public RequestData() {
}
/**
* @return the interfaceName
*/
public String getInterfaceName() {
return interfaceName;
}
/**
* @param interfaceName the interfaceName to set
*/
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
/**
* @return the methodName
*/
public String getMethodName() {
return methodName;
}
/**
* @param methodName the methodName to set
*/
public void setMethodName(String methodName) {
this.methodName = methodName;
}
/**
* @return the parameterTypes
*/
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
/**
* @param parameterTypes the parameterTypes to set
*/
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
/**
* @return the parameters
*/
public Object[] getParameters() {
return parameters;
}
/**
* @param parameters the parameters to set
*/
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
}
3.创建客户端代理
public class RPCClient {
@SuppressWarnings("unchecked")
public<T> T importor(Class<?> serviceClass,InetSocketAddress address){
return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class[]{serviceClass}, new InvocationHandler() {
@SuppressWarnings("resource")
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = new Socket();
socket.connect(address);
try(ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream input = new ObjectInputStream(socket.getInputStream())){
RequestData data = new RequestData();
data.setInterfaceName(serviceClass.getName());
data.setMethodName(method.getName());
data.setParameters(args);
data.setParameterTypes(method.getParameterTypes());
output.writeObject(data);
return input.readObject();
}catch(Exception e){
e.printStackTrace();
}
return null;
}
});
}
}
4.创建服务端实现
/**
* 具体实现服务的类
* @author XXJ
*
* 2019年3月12日 下午7:41:46
*/
public class SayHelloImpl implements ISayHello {
@Override
public String sayHello(String name) {
return "Hello," + name;
}
}
/**
* 服务注册与发布
* @author XXJ
*
* 2019年3月12日 下午7:46:39
*/
public class ServerRegister {
private Map<String,Object> map = new ConcurrentHashMap<>();
private static Executor executor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
public ServerRegister(String intefaceName,Object instance){
map.put(intefaceName, instance);
}
@SuppressWarnings("resource")
public void publishServer(InetSocketAddress address) throws IOException{
//监听服务端口
ServerSocket serversocket = new ServerSocket();
serversocket.bind(address);
System.out.println("服务已启动。。。。。。");
while(true){
executor.execute(new Task(serversocket.accept()));
}
}
//具体执行过程
private class Task implements Runnable {
private Socket socket;
public Task(Socket socket){
this.socket = socket;
}
@Override
public void run() {
try(ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())){
RequestData data = (RequestData) input.readObject();
Method method = map.get(data.getInterfaceName()).getClass().getMethod(data.getMethodName(), data.getParameterTypes());
Object result = method.invoke(map.get(data.getInterfaceName()), data.getParameters());
output.writeObject(result);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
5.启动服务端
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
ISayHello say = new SayHelloImpl();
ServerRegister server = new ServerRegister(ISayHello.class.getName(),say);
try {
server.publishServer(new InetSocketAddress("localhost", 12345));
} catch (IOException e) {
e.printStackTrace();
}
}
}
6.客户端调用
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
RPCClient client = new RPCClient();
ISayHello say = client.importor(ISayHello.class, new InetSocketAddress("localhost", 12345));
System.out.println(say.sayHello("Tom"));
}
}