手动实现RPC

现在有很多RPC实现框架 如java写的RMI,基于soap协议的restful,谷歌的grpc等等。那我们这篇文章就去自己实现RPC通信(基于socket)

1.建立一个rpc-sever项目(用于发布服务) 

手动实现RPC


2.新建一个接口IHello

手动实现RPC

3.新建一个HelloImpl实现类实现IHello

手动实现RPC

4.创建一个请求对象RpcRequest (需要实现序列化接口,因为这个对象后面是用来通信的实体类)

手动实现RPC

5.创建一个ProcessorHandler用来处理监听的请求

public class ProcessorHandler implements Runnable {

    private Object service;//发布的服务
    private Socket socket;//请求的socket

    public ProcessorHandler(Object service, Socket socket) {
        this.service = service;
        this.socket = socket;
    }

    @Override
    public void run() {
        //请求处理流
        ObjectInputStream objectInputStream=null;
        try {
            objectInputStream=new ObjectInputStream(socket.getInputStream());
            //通过反序列化获取远程对象
            RpcRequest rpcRequest=(RpcRequest)objectInputStream.readObject();
            //通过反射获取结果Object
            Object object=invoke(rpcRequest);

            //通过输出流把结果输出
            ObjectOutputStream objectOutputStream=new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(object);
            objectOutputStream.flush();
            objectInputStream.close();
            objectOutputStream.close();

        }catch (Exception e){
          throw  new RuntimeException(e);
        }finally {
            if(objectInputStream!=null){
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //通过反射调用服务
    public Object invoke(RpcRequest request) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Object[] args=request.getParameters();
        Class<?>[] types=new Class[args.length];
        for (int i=0;i<args.length;i++){
            types[i]=args[i].getClass();
        }
        Method method=service.getClass().getMethod(request.getMethodName(),types);
        return method.invoke(service,args);
    }
}
6.创建RpcServer用来发布服务

 

public class RpcServer {

    //创建一个线程池
    private static final ExecutorService EXECUTOR_SERVICE= Executors.newCachedThreadPool();

    public void publisher(final Object service,int port){
        //创建一个服务端socket,且设置传送过来的端口
        ServerSocket serverSocket=null;
        try {
             serverSocket=new ServerSocket(port);
             //循环监听
            while (true){
                Socket socket=serverSocket.accept();
                //把监听到的请求交给线程池去处理
                EXECUTOR_SERVICE.execute(new ProcessorHandler(service,socket));
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }finally {
            if (serverSocket!=null){
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

7.启动服务

public class ServerDemo {
    public static void main(String[] args) {
        IHello hello=new HelloImpl();
        RpcServer rpcServer=new RpcServer();
        //发布服务
        rpcServer.publisher(hello,8080);
    }
}

8.然后在建立一个rpc-client项目用户调用rpc-server发布的服务进行通信

手动实现RPC

9.需要把IHello和RpcRequest这两个类从服务端拷贝到客户端

10.创建代理类

public class RpcClientProxy {

    public <T> T clientProxy(final Class<T> interfceClass,final String host,final int port){

        return (T) Proxy.newProxyInstance(interfceClass.getClassLoader(),
                new Class[]{interfceClass},new RemoteInvocationHandler(host,port));
    }
}

11.由于需要InvocationHandler实现类,所以新建RemoteInvocationHandler

public class RemoteInvocationHandler implements InvocationHandler {
    private String host;
    private int port;

    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        //组装请求
        RpcRequest request=new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);

        //通过TCP协议进行传输
        TcpTransport tcpTransport=new TcpTransport(host,port);
        Object result=tcpTransport.send(request);

        return result;
    }
}

12.在invoke方法中使用socket去通信 所以新建一个TcpTransprot类去进行通信

public class TcpTransport {
    private String host;
    private int port;

    public TcpTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }

    //创建一个socket连接
    private Socket netSocket(){
        System.out.println("创建一个连接");
        Socket socket=null;

        try {
            socket=new Socket(host,port);
            return socket;
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }
    //发送
    public Object send(RpcRequest rpcRequest){
        Socket socket=null;

        try {
            socket=netSocket();
            ObjectOutputStream objectOutputStream=new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();

            ObjectInputStream objectInputStream=new ObjectInputStream(socket.getInputStream());
            Object result=objectInputStream.readObject();
            objectOutputStream.close();
            objectInputStream.close();
            return result;

        }catch (Exception e){
            throw new RuntimeException(e);
        }finally {
            if(socket!=null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

13.模拟通信

public class RpcClientDemo {

    public static void main(String[] args) {

        RpcClientProxy proxy=new RpcClientProxy();

        IHello iHello=proxy.clientProxy(IHello.class,"localhost",8080);

        System.out.println(iHello.sayHello("xxxx"));
    }
}

14 在控制台可以看到输出

手动实现RPC

OK 自己实现rpc通信基于socket通信已经完成。