蚂蚁金服SOFA-Boot整合SOFA-RPC(中篇)
前言
上篇文章简单地介绍了 SOFA-Boot 的功能特性,对 Readiness 健康检查的配置举例说明。重点介绍了如何在 SOFA-Boot 中引入 SOFA-RPC 中间件,给出了基于 bolt、rest 和 dubbo 等不同协议通道的服务发布与消费的全流程。
本文将进一步介绍 SOFA-RPC 中间件提供的丰富而强大的功能,包括单向调用、同步调用、Future调用、回调,泛化调用,过滤器配置等。
正文
1. 调用方式
SOFA-RPC 提供单向调用、同步调用、异步调用和回调四种调用机制。为了区分四者的不同之处,这里给出 SOFA 官方提供的原理图。
下面给出详细阐述和配置说明:
1.1. 单向方式
当前线程发起调用后,不关心调用结果,不做超时控制,只要请求已经发出,就完成本次调用。目前支持 bolt 协议。
配置说明
使用单向方式需要在服务引用的时候通过 sofa:global-attrs
元素的 type
属性声明调用方式为 oneway
,这样使用该服务引用发起调用时就是使用的单向方式了。
<sofa:reference id="helloOneWayServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloOneWayService">
<sofa:binding.bolt>
<sofa:global-attrs type="oneway"/>
</sofa:binding.bolt>
</sofa:reference>
适用场景
单向调用不保证成功,而且发起方无法知道调用结果。因此通常用于可以重试,或者定时通知类的场景,调用过程是有可能因为网络问题,机器故障等原因,导致请求失败。业务场景需要能接受这样的异常场景,才可以使用。
1.2. 同步方式
当前线程发起调用后,需要在指定的超时时间内,等到响应结果,才能完成本次调用。如果超时时间内没有得到结果,那么会抛出超时异常。
配置说明
服务接口与实现类
SOFA-RPC 缺省采用的就是同步调用,可以省略 sofa:global-attrs
配置项。
服务端发布配置
<bean id="helloSyncServiceImpl" class="com.ostenant.sofa.rpc.example.invoke.HelloSyncServiceImpl"/>
<sofa:service ref="helloSyncServiceImpl" interface="com.ostenant.sofa.rpc.example.invoke.HelloSyncService">
<sofa:binding.bolt/>
</sofa:service>
客户端引用配置
<sofa:reference id="helloSyncServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloSyncService">
<sofa:binding.bolt/>
</sofa:reference>
服务端启动入口
SpringApplication springApplication = new SpringApplication(SyncServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
客户端启动入口
SpringApplication springApplication = new SpringApplication(SyncClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
客户端调用
HelloSyncService helloSyncServiceReference = (HelloSyncService) applicationContext.getBean("helloSyncServiceReference");
System.out.println(helloSyncServiceReference.saySync("sync"));
适用场景
同步调用是最常用的方式。注意要根据对端的处理能力,合理设置超时时间。
1.3. Future方式
Future 方式下,客户端发起调用后不会等待服务端的结果,继续执行后面的业务逻辑。服务端返回的结果会被 SOFA-RPC 缓存,当客户端需要结果的时候,需要主动获取。目前支持 bolt 协议。
配置说明
服务接口和实现类
HelloFutureService.java
public interface HelloFutureService {
String sayFuture(String future);
}
HelloFutureServiceImpl.java
public class HelloFutureServiceImpl implements HelloFutureService {
@Override
public String sayFuture(String future) {
return future;
}
}
服务端发布配置
<bean id="helloFutureServiceImpl" class="com.ostenant.sofa.rpc.example.invoke.HelloFutureServiceImpl"/>
<sofa:service ref="helloFutureServiceImpl" interface="com.ostenant.sofa.rpc.example.invoke.HelloFutureService">
<sofa:binding.bolt/>
</sofa:service>
客户端引用配置
使用 Future 方式需要在服务引用的时候通过 sofa:global-attrs
元素的 type
属性声明调用方式为 future
。
<sofa:reference id="helloFutureServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloFutureService">
<sofa:binding.bolt>
<sofa:global-attrs type="future"/>
</sofa:binding.bolt>
</sofa:reference>
这样使用该服务引用发起调用时就是使用的 Future
方式了。
服务端启动入口
SpringApplication springApplication = new SpringApplication(FutureServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
客户端启动入口
SpringApplication springApplication = new SpringApplication(FutureClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
客户端获取返回结果有两种方式:
- 其一,通过
SofaResponseFuture
直接获取结果。第一个参数是获取结果的超时时间,第二个参数表示是否清除线程上下文中的结果。
HelloFutureService helloFutureServiceReference = (HelloFutureService) applicationContext
.getBean("helloFutureServiceReference");
helloFutureServiceReference.sayFuture("future");
try {
String result = (String)SofaResponseFuture.getResponse(1000, true);
System.out.println("Future result: " + result)
} catch (InterruptedException e) {
e.printStackTrace();
}
- 其二,获取原生 Future。该种方式会获取 JDK 原生的 Future ,参数表示是否清除线程上下文中的结果。获取结果的方式就是 JDK Future 的获取方式。
HelloFutureService helloFutureServiceReference = (HelloFutureService) applicationContext
.getBean("helloFutureServiceReference");
helloFutureServiceReference.sayFuture("future");
try {
Future future = SofaResponseFuture.getFuture(true);
String result = (String)future.get(1000, TimeUnit.MILLISECONDS);
System.out.println("Future result: " + result)
} catch (InterruptedException e) {
e.printStackTrace();
}
适用场景
Future 方式适用于非阻塞编程模式。对于客户端程序处理后,不需要立即获取返回结果,可以先完成后续程序代码执行,在后续业务中,主动从当前线程上下文获取调用返回结果。减少了网络 IO 等待造成的代码运行阻塞和延迟。
1.4. 回调方式
当前线程发起调用,则本次调用马上结束,可以马上执行下一次调用。发起调用时需要注册一个回调,该回调需要分配一个异步线程池。待响应返回后,会在回调的异步线程池,来执行回调逻辑。
配置说明
服务接口和实现类
HelloCallbackService.java
public interface HelloCallbackService {
String sayCallback(String callback);
}
HelloCallbackServiceImpl.java
public class HelloCallbackServiceImpl implements HelloCallbackService {
@Override
public String sayCallback(String string) {
return string;
}
}
业务回调类
客户端回调类需要实现 com.alipay.sofa.rpc.core.invoke.SofaResponseCallback
接口。
CallbackImpl.java
public class CallbackImpl implements SofaResponseCallback {
@Override
public void onAppResponse(Object appResponse, String methodName, RequestBase request) {
System.out.println("callback client process:" + appResponse);
}
@Override
public void onAppException(Throwable throwable, String methodName, RequestBase request) {
}
@Override
public void onSofaException(SofaRpcException sofaException, String methodName, RequestBase request) {
}
}
SofaResponseCallback 接口提供了 3 个方法:
- onAppResponse: 程序正常运行,则进入该回调方法。
- onAppException: 服务端程序抛出异常,则进入该回调方法。
- onSofaException: 框架内部出现错误,则进入该回调方法。
服务端发布配置
<bean id="helloCallbackServiceImpl" class="helloFutureServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloCallbackServiceImpl"/>
<sofa:service ref="helloCallbackServiceImpl" interface="helloFutureServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloCallbackService">
<sofa:binding.bolt/>
</sofa:service>
客户端引用配置
在服务引用的时候通过 sofa:global-attrs
元素的 type
属性声明调用方式为 callback
,再通过 callback-ref
声明回调的实现类。
<bean id="callbackImpl" class="com.ostenant.sofa.rpc.example.invoke.CallbackImpl"/>
<sofa:reference id="helloCallbackServiceReference"
interface="com.ostenant.sofa.rpc.example.invoke.HelloCallbackService">
<sofa:binding.bolt>
<sofa:global-attrs type="callback" callback-ref="callbackImpl"/>
</sofa:binding.bolt>
</sofa:reference>
这样使用该服务引用发起调用时,就是使用的回调方式了。在结果返回时,由 SOFA-RPC 自动调用该回调类的相应方法。
服务端启动入口
SpringApplication springApplication = new SpringApplication(CallbackServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
客户端启动入口
SpringApplication springApplication = new SpringApplication(CallbackClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
客户端发起调用
HelloCallbackService helloCallbackServiceReference = (HelloCallbackService) applicationContext
.getBean("helloCallbackServiceReference");
helloCallbackServiceReference.sayCallback("callback");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sayCallback() 的返回值不应该直接获取。在客户端注册的回调类中,返回值会以参数的形式传入正确的方法,以回调的形式完成后续逻辑处理。
适用场景
Callback 方式适用于异步非阻塞编程模式。客户端程序所在线程发起调用后,继续执行后续操作,不需要主动去获取返回值。服务端程序处理完成,将返回值传回一个异步线程池,由子线程通过回调函数进行返回值处理。很大情况的减少了网络 IO 阻塞,解决了单线程的瓶颈,实现了异步编程。
2. 泛化调用
泛化调用方式能够在客户端不依赖服务端的接口情况下发起调用,目前支持 bolt 协议。由于不知道服务端的接口,因此需要通过字符串的方式将服务端的接口,调用的方法,参数及结果类进行描述。
配置说明
泛化参数类
SampleGenericParamModel.java
public class SampleGenericParamModel {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
泛化返回类
SampleGenericResultModel.java
public class SampleGenericResultModel {
private String name;
private String value;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
服务接口和实现类
SampleGenericService.java
public interface SampleGenericService {
SampleGenericResultModel sayGeneric(SampleGenericParamModel sampleGenericParamModel);
}
SampleGenericParamModel:作为 sayGeneric() 的输入参数类型,有一个
name
成员变量,作为真正的方法入参。SampleGenericResultModel:作为 sayGeneric() 的返回结果类型,声明了
name
和value
两个成员变量,作为真实的返回值。
SampleGenericServiceImpl.java
public class SampleGenericServiceImpl implements SampleGenericService {
@Override
public SampleGenericResultModel sayGeneric(SampleGenericParamModel sampleGenericParamModel) {
String name = sampleGenericParamModel.getName();
SampleGenericResultModel resultModel = new SampleGenericResultModel();
resultModel.setName(name);
resultModel.setValue("sample generic value");
return resultModel;
}
}
服务端发布配置
<bean id="sampleGenericServiceImpl" class="com.ostenant.sofa.rpc.example.generic.SampleGenericServiceImpl"/>
<sofa:service ref="sampleGenericServiceImpl" interface="com.ostenant.sofa.rpc.example.generic.SampleGenericService">
<sofa:binding.bolt/>
</sofa:service>
客户端引用配置
<sofa:reference id="sampleGenericServiceReference" interface="com.alipay.sofa.rpc.api.GenericService">
<sofa:binding.bolt>
<sofa:global-attrs generic-interface="com.ostenant.sofa.rpc.example.generic.SampleGenericService"/>
</sofa:binding.bolt>
</sofa:reference>
在泛化调用过程中,客户端配置有两点需要注意:
-
sofa:reference
指向的服务接口需要声明为 SOFA-RPC 提供的泛化接口com.alipay.sofa.rpc.api.GenericService
。 -
sofa:global-attrs
需要声明属性generic-interface
,value 为真实的服务接口名称。
服务端启动入口
SpringApplication springApplication = new SpringApplication(SampleGenericServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
客户端启动入口
SpringApplication springApplication = new SpringApplication(SampleGenericClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
客户端发起调用
- 获取服务的泛化引用
GenericService sampleGenericServiceReference = (GenericService) applicationContext
.getBean("sampleGenericServiceReference");
- 准备方法参数
由于客户端没有调用服务的参数类,因此通过 com.alipay.hessian.generic.model.GenericObjectGenericObject
进行描述。
// 准备方法参数
GenericObject genericParam = new GenericObject(
"com.ostenant.sofa.rpc.example.generic.SampleGenericParamModel");
genericParam.putField("name", "Harrison");
GenericObject
持有一个 Map<String, Object>
类型的变量,你能够通过 GenericObject
提供的 putField()
方法,将参数类的属性和值放到这个 Map
中,以此来描述参数类。
- 发起泛化调用
通过 GenericService
的 $genericInvoke(arg1, agr2, arg3)
方法可以发起服务的泛化调用,各个参数含义如下:
参数 | 含义 | 参数可选 |
---|---|---|
arg1 | 目标方法名称 | 必填 |
arg2 | 参数类型的数组,要求严格遵循先后次序 | 必填 |
arg3 | 参数值的数组,要求与参数类型数组保持一致 | 必填 |
arg4 | 返回值的Class类型 | 可选 |
方式一:
GenericObject genericResult = (GenericObject) sampleGenericServiceReference.$genericInvoke(
// 目标方法名称
"sayGeneric",
// 参数类型名称
new String[] { "com.ostenant.sofa.rpc.example.generic.SampleGenericParamModel" },
// 参数的值
new Object[] { genericParam });
// 验证返回结果
System.out.println("Type: " + genericResult.getType());
System.out.println("Name: " + genericResult.getField("name"));
System.out.println("Value: " + genericResult.getField("value"));
方式二:
SampleGenericResultModel sampleGenericResult = sampleGenericServiceReference.$genericInvoke(
// 目标方法名称
"sayGeneric",
// 参数类型名称
new String[] { "com.ostenant.sofa.rpc.example.generic.SampleGenericParamModel" },
// 参数的值
new Object[] { genericParam },
// 返回值的Class类型
SampleGenericResultModel.class);
// 验证返回结果
System.out.println("Type: " + sampleGenericResult.getClass().getName());
System.out.println("Name: " + sampleGenericResult.getName());
System.out.println("Value: " + sampleGenericResult.getValue());
查看控制台输出
两种方式输出如下:
Type: com.ostenant.sofa.rpc.example.generic.SampleGenericResultModel
Name: Harrison
Value: sample generic value
3. 过滤器配置
SOFA-RPC 通过过滤器 Filter 来实现对请求和响应的拦截处理。用户可以自定义 Filter 实现拦截扩展,目前支持 bolt 协议。开发人员通过继承 com.alipay.sofa.rpc.filter.Filter
实现过滤器的自定义。
配置说明
服务接口与实现类
FilterService.java
public interface FilterService {
String sayFilter(String filter);
}
FilterServiceImpl.java
public class FilterServiceImpl implements FilterService {
@Override
public String sayFilter(String filter) {
return filters;
}
}
服务端过滤器
在 Filter 实现类中,invoke()
方法实现具体的拦截逻辑,通过 FilterInvoker.invoke(SofaRequest)
触发服务的调用,在该方法前后可以实现具体的拦截处理。
public class SampleServerFilter extends Filter {
@Override
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
System.out.println("SampleFilter before server process");
try {
return invoker.invoke(request);
} finally {
System.out.println("SampleFilter after server process");
}
}
}
服务端发布配置
服务端需要配置服务实现类、过滤器,然后在 sofa:service
的 sofa:global-attrs
标签配置 filter
属性,实现两者的绑定。
<bean id="sampleFilter" class="com.ostenant.sofa.rpc.example.filter.SampleServerFilter"/>
<bean id="filterService" class="com.ostenant.sofa.rpc.example.filter.FilterServiceImpl"/>
<sofa:service ref="filterService" interface="com.ostenant.sofa.rpc.example.filter.FilterService">
<sofa:binding.bolt>
<sofa:global-attrs filter="sampleFilter"/>
</sofa:binding.bolt>
</sofa:service>
客户端过滤器
public class SampleClientFilter extends Filter {
@Override
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
System.out.println("SampleFilter before client invoke");
try {
return invoker.invoke(request);
} finally {
System.out.println("SampleFilter after client invoke");
}
}
}
客户端引用配置
同样的,客户端过滤器需要在 sofa:reference
的 sofa:global-attrs
标签中配置 filter
属性,实现客户端引用类的调用拦截。
<bean id="sampleFilter" class="com.alipay.sofa.rpc.samples.filter.SampleClientFilter"/>
<sofa:reference id="filterServiceReference" interface="com.ostenant.sofa.rpc.example.filter.FilterService">
<sofa:binding.bolt>
<sofa:global-attrs filter="sampleFilter"/>
</sofa:binding.bolt>
</sofa:reference>
服务端启动类
SpringApplication springApplication = new SpringApplication(FilterServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
客户端启动类
SpringApplication springApplication = new SpringApplication(FilterClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
客户端调用
FilterService filterServiceReference = (FilterService) applicationContext.getBean("filterServiceReference");
try {
// sleep 5s, 便于观察过滤器效果
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result = filterServiceReference.sayFilter("filter");
System.out.println("Invoke result: " + result);
查看拦截输出
- 服务端打印输出
SampleFilter before server process
SampleFilter after server process
- 客户端打印输出
SampleFilter before client invoke
SampleFilter after client invoke
Invoke result: filter
过滤器配置生效,总结过滤器拦截先后次序如下:
- 客户端发起调用 -> 客户端前置拦截 -> 服务端前置拦截
- 服务端方法执行
- 服务端后置拦截 -> 客户端后置拦截 -> 客户端接收返回值
小结
本文介绍了 SOFA-RPC 的集中调用方式,包括单向调用、同步调用、Future调用、回调,引入了 SOFA-RPC 独有的泛化调用机制,同时对过滤器的配置进行了简单介绍。
欢迎关注技术公众号: 零壹技术栈
本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。