手动实现RPC
现在有很多RPC实现框架,如Java自带的RMI、基于SOAP协议的WebService、谷歌的gRPC等等。这篇文章我们就基于socket自己动手实现RPC通信。
1.建立一个rpc-server项目(用于发布服务)
2.新建一个接口IHello
3.新建一个HelloImpl实现类实现IHello
4.创建一个请求对象RpcRequest (需要实现序列化接口,因为这个对象后面是用来通信的实体类)
5.创建一个ProcessorHandler用来处理监听的请求
public class ProcessorHandler implements Runnable { private Object service;//发布的服务 private Socket socket;//请求的socket public ProcessorHandler(Object service, Socket socket) { this.service = service; this.socket = socket; } @Override public void run() { //请求处理流 ObjectInputStream objectInputStream=null; try { objectInputStream=new ObjectInputStream(socket.getInputStream()); //通过反序列化获取远程对象 RpcRequest rpcRequest=(RpcRequest)objectInputStream.readObject(); //通过反射获取结果Object Object object=invoke(rpcRequest); //通过输出流把结果输出 ObjectOutputStream objectOutputStream=new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(object); objectOutputStream.flush(); objectInputStream.close(); objectOutputStream.close(); }catch (Exception e){ throw new RuntimeException(e); }finally { if(objectInputStream!=null){ try { objectInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } } //通过反射调用服务 public Object invoke(RpcRequest request) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { Object[] args=request.getParameters(); Class<?>[] types=new Class[args.length]; for (int i=0;i<args.length;i++){ types[i]=args[i].getClass(); } Method method=service.getClass().getMethod(request.getMethodName(),types); return method.invoke(service,args); } }
6.创建RpcServer用来发布服务
public class RpcServer { //创建一个线程池 private static final ExecutorService EXECUTOR_SERVICE= Executors.newCachedThreadPool(); public void publisher(final Object service,int port){ //创建一个服务端socket,且设置传送过来的端口 ServerSocket serverSocket=null; try { serverSocket=new ServerSocket(port); //循环监听 while (true){ Socket socket=serverSocket.accept(); //把监听到的请求交给线程池去处理 EXECUTOR_SERVICE.execute(new ProcessorHandler(service,socket)); } }catch (Exception e){ throw new RuntimeException(e); }finally { if (serverSocket!=null){ try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
7.启动服务
public class ServerDemo { public static void main(String[] args) { IHello hello=new HelloImpl(); RpcServer rpcServer=new RpcServer(); //发布服务 rpcServer.publisher(hello,8080); } }
8.然后再建立一个rpc-client项目,用于调用rpc-server发布的服务进行通信
9.需要把IHello和RpcRequest这两个类从服务端拷贝到客户端
10.创建代理类
/**
 * Factory for client-side stubs: builds a JDK dynamic proxy for a service
 * interface whose calls are handled by a {@code RemoteInvocationHandler}.
 */
public class RpcClientProxy {

    /**
     * Creates a dynamic proxy implementing {@code interfceClass}.
     *
     * @param interfceClass the service interface the proxy should implement
     * @param host          host passed to the {@code RemoteInvocationHandler}
     * @param port          port passed to the {@code RemoteInvocationHandler}
     * @param <T>           the service interface type
     * @return a proxy instance of type {@code T}
     */
    public <T> T clientProxy(final Class<T> interfceClass, final String host, final int port) {
        final ClassLoader loader = interfceClass.getClassLoader();
        final Class<?>[] exposedInterfaces = {interfceClass};
        final RemoteInvocationHandler handler = new RemoteInvocationHandler(host, port);
        final Object stub = Proxy.newProxyInstance(loader, exposedInterfaces, handler);
        // The proxy always implements interfceClass, so this cast cannot fail.
        return interfceClass.cast(stub);
    }
}
11.由于需要InvocationHandler实现类,所以新建RemoteInvocationHandler
public class RemoteInvocationHandler implements InvocationHandler { private String host; private int port; public RemoteInvocationHandler(String host, int port) { this.host = host; this.port = port; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //组装请求 RpcRequest request=new RpcRequest(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameters(args); //通过TCP协议进行传输 TcpTransport tcpTransport=new TcpTransport(host,port); Object result=tcpTransport.send(request); return result; } }
12.在invoke方法中需要使用socket去通信,所以新建一个TcpTransport类来负责通信
public class TcpTransport { private String host; private int port; public TcpTransport(String host, int port) { this.host = host; this.port = port; } //创建一个socket连接 private Socket netSocket(){ System.out.println("创建一个连接"); Socket socket=null; try { socket=new Socket(host,port); return socket; }catch (Exception e){ throw new RuntimeException(e); } } //发送 public Object send(RpcRequest rpcRequest){ Socket socket=null; try { socket=netSocket(); ObjectOutputStream objectOutputStream=new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(rpcRequest); objectOutputStream.flush(); ObjectInputStream objectInputStream=new ObjectInputStream(socket.getInputStream()); Object result=objectInputStream.readObject(); objectOutputStream.close(); objectInputStream.close(); return result; }catch (Exception e){ throw new RuntimeException(e); }finally { if(socket!=null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
13.模拟通信
/**
 * Entry point of the client side: obtains a proxy for {@code IHello} that
 * targets localhost:8080 and prints the result of one {@code sayHello} call.
 */
public class RpcClientDemo {

    /**
     * Builds the proxy, performs one call and prints the reply.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final RpcClientProxy proxyFactory = new RpcClientProxy();
        final IHello helloStub = proxyFactory.clientProxy(IHello.class, "localhost", 8080);
        final Object greeting = helloStub.sayHello("xxxx");
        System.out.println(greeting);
    }
}
14 在控制台可以看到输出
OK,基于socket的RPC通信到这里就手动实现完成了。