Thrift应用(java)
Thrift(java版)
1. 简单介绍
Thrift是什么?能做什么?
Thrift是Facebook于2007年开发的跨语言的rpc服框架,提供多语言(C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 等)的编译功能,并提供多种服务器工作模式;用户通过Thrift的IDL(接口定义语言)来描述接口函数及数据类型,然后通过Thrift的编译环境生成各种语言类型的接口文件,用户可以根据自己的需要采用不同的语言开发客户端代码和服务器端代码。
例如,我想开发一个快速计算的RPC服务,它主要通过接口函数getInt对外提供服务,这个RPC服务的getInt函数使用用户传入的参数,经过复杂的计算,计算出一个整形值返回给用户;服务器端使用java语言开发,而调用客户端可以是java、c、python等语言开发的程序,在这种应用场景下,我们只需要使用Thrift的IDL描述一下getInt函数(以.thrift为后缀的文件),然后使用Thrift的多语言编译功能,将这个IDL文件编译成C、java、python几种语言对应的“特定语言接口文件”(每种语言只需要一条简单的命令即可编译完成),这样拿到对应语言的“特定语言接口文件”之后,就可以开发客户端和服务器端的代码了,开发过程中只要接口不变,客户端和服务器端的开发可以独立的进行。
Thrift为服务器端程序提供了很多的工作模式,例如:线程池模型、非阻塞模型等等,可以根据自己的实际应用场景选择一种工作模式高效地对外提供服务;
2. 环境配置
1. 如果是Maven构建项目的,在pom.xml 中添加如下内容:
|
<dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.9.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.5.8</version> </dependency> |
2.如果自己编译lib包,把下载的压缩包解压到X:盘,然后在X:\thrift-0.9.0\lib\java 目录下运行ant进行自动编译,会在X:\thrift-0.9.0\lib\java\build\ 目录下看到编译好的lib包:libthrift-0.9.0.jar
3. 基本概念
3.1 数据类型
基本类型:
bool: 布尔值,true 或 false,对应 Java 的 Boolean
byte: 8 位有符号整数,对应 Java 的 byte
i16:16 位有符号整数,对应 Java 的 short
i32:32 位有符号整数,对应 Java 的 int
i64:64 位有符号整数,对应 Java 的 long
double:64 位浮点数,对应 Java 的 double
string:utf-8编码的字符串,对应 Java 的 String
结构体类型:
Struct: 定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean
容器类型:
list:对应 Java 的 ArrayList
set: 对应 Java 的 HashSet
map: 对应 Java 的 HashMap
异常类型:
exception:对应 Java 的 Exception
服务类型:
service:对应服务的类
枚举类型:
3.2 数据传输协议
Thrift 可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本 (text) 和二进制 (binary) 传输协议,为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这需要根据项目 / 产品中的实际需求。常用协议有以下几种:
TBinaryProtocol : 二进制格式.
TCompactProtocol : 压缩格式
TJSONProtocol : JSON格式
TSimpleJSONProtocol : 提供JSON只写协议,生成的文件很容易通过脚本语言解析
3.3 传输层
常用的传输层有以下几种:
TSocket—使用阻塞式 I/O 进行传输,是最常见的模式
TFramedTransport—非阻塞方式,按块的大小进行传输,类似于 Java 中的 NIO
若使用 TFramedTransport 传输层,其服务器必须修改为非阻塞的服务类型,客户端只需替换 TTransport 部分
TNonblockingTransport —— 使用非阻塞方式,用于构建异步客户端
3.4编码基本步骤
服务端编码步骤:
实现服务处理接口impl
创建TProcessor
创建TServerTransport
创建TProtocol
创建TServer
启动Server
客户端编码步骤:
创建Transport
创建TProtocol
基于TTransport和TProtocol创建 Client
调用Client的相应方法
Tips: 客户端和服务端的协议要一致
4. 实例演示
4.1 生成Thrift代码
4.1.1. 创建thrift文件
/*userinfo.thrift*/
namespace java com.maociyuan.cnblogs.home.userinfo
/*结构体类型:*/
struct UserInfo
{
1:i32 userid,
2:string username,
3:string userpwd,
4:string sex,
5:string age,
}
/*服务类型:*/
service UserInfoService{
UserInfo lg_userinfo_getUserInfoById(1:i32 userid),
string lg_userinfo_getUserNameById(1:i32 userid),
i32 lg_userinfo_getUserCount(),
bool lg_userinfo_checkUserById(1:i32 userid),
}
4.1.2. 编译thrift文件
在命令窗口,进入thrift文件目录执行如下命令
thrift-0.8.0.exe -r -gen java ./userinfo.thrift
生成两个java文件
UserInfo.java
UserInfoService.java
导入工程中
4.2 实现接口Iface
package com.maociyuan.cnblogs.home.userinfo;
import java.util.HashMap;
import java.util.Map;
import org.apache.thrift.TException;
public class UserInfoServiceImpl implements UserInfoService.Iface {
private static Map<Integer, UserInfo> userMap = new HashMap<Integer, UserInfo>();
static {
userMap.put(1, new UserInfo(1,"mao","mao","男","2016"));
userMap.put(2, new UserInfo(2,"ci","ci","女","07"));
userMap.put(3, new UserInfo(3,"yuan","yuan","男","28"));
}
public UserInfo lg_userinfo_getUserInfoById(int userid) throws TException {
// TODO Auto-generated method stub
return userMap.get(userid);
}
public String lg_userinfo_getUserNameById(int userid) throws TException {
// TODO Auto-generated method stub
return userMap.get(userid).getUsername();
}
public int lg_userinfo_getUserCount() throws TException {
// TODO Auto-generated method stub
return userMap.size();
}
public boolean lg_userinfo_checkUserById(int userid) throws TException {
// TODO Auto-generated method stub
return userMap.containsKey(userid);
}
}
4.3 服务端代码
package com.maociyuan.cnblogs.home.service;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerSocket;
import com.maociyuan.cnblogs.home.userinfo.UserInfoService;
import com.maociyuan.cnblogs.home.userinfo.UserInfoServiceImpl;
public class UserInfoServiceDemo {
public static final int SERVER_PORT = 8090;
public static final int SERVER_PORT1 = 8091;
public static final int SERVER_PORT2 = 8092;
public static final int SERVER_PORT3 = 8093;
// 简单的单线程服务模型,一般用于测试
public void startSimleServer() {
try {
System.out.println("UserInfoServiceDemo TSimpleServer start ....");
TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());
TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
TServer.Args tArgs = new TServer.Args(serverTransport);
tArgs.processor(tprocessor);
tArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TSimpleServer(tArgs);
server.serve();
} catch (Exception e) {
System.out.println("Server start error!!!");
e.printStackTrace();
}
}
//线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求。
public void startTThreadPoolServer() {
try {
System.out.println("UserInfoServiceDemo TThreadPoolServer start ....");
TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());
TServerSocket serverTransport = new TServerSocket(SERVER_PORT1);
TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
ttpsArgs.processor(tprocessor);
ttpsArgs.protocolFactory(new TBinaryProtocol.Factory());
// 线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求。
TServer server = new TThreadPoolServer(ttpsArgs);
server.serve();
} catch (Exception e) {
System.out.println("Server start error!!!");
e.printStackTrace();
}
}
// 线程池服务模型,使用标准的阻塞式IO,使用非阻塞式IO
public void startTNonblockingServer() {
try {
System.out.println("UserInfoServiceDemo TNonblockingServer start ....");
TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT2);
TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
tnbArgs.processor(tprocessor);
// 使用非阻塞式IO,服务端和客户端需要指定TFramedTransport数据传输的方式
tnbArgs.transportFactory(new TFramedTransport.Factory());
tnbArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TNonblockingServer(tnbArgs);
server.serve();
} catch (Exception e) {
System.out.println("Server start error!!!");
e.printStackTrace();
}
}
//半同步半异步的服务端模型,需要指定为: TFramedTransport 数据传输的方式。
public void startTHsHaServer() {
try {
System.out.println("HelloWorld THsHaServer start ....");
TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT3);
THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
thhsArgs.processor(tprocessor);
thhsArgs.transportFactory(new TFramedTransport.Factory());
thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new THsHaServer(thhsArgs);
server.serve();
} catch (Exception e) {
System.out.println("Server start error!!!");
e.printStackTrace();
}
}
public static void main(String[] args) {
UserInfoServiceDemo server = new UserInfoServiceDemo();
//server.startSimleServer();
//server.startTThreadPoolServer();
//server.startTNonblockingServer();
//server.startTHsHaServer();
}
}
4.4 客户端代码
UserInfoClientDemo.java
package com.maociyuan.cnblogs.home.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.maociyuan.cnblogs.home.userinfo.UserInfoService;
import com.maociyuan.cnblogs.home.userinfo.UserInfoService.AsyncClient.lg_userinfo_getUserNameById_call;
public class UserInfoClientDemo {
public static final String SERVER_IP = "localhost";
public static final int SERVER_PORT = 8090;
public static final int SERVER_PORT1 = 8091;
public static final int SERVER_PORT2 = 8092;
public static final int SERVER_PORT3 = 8093;
public static final int TIMEOUT = 30000;
public void startClient(int userid) {
TTransport transport = null;
try {
transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
// 协议要和服务端一致
TProtocol protocol = new TBinaryProtocol(transport);
// TProtocol protocol = new TCompactProtocol(transport);
// TProtocol protocol = new TJSONProtocol(transport);
UserInfoService.Client client = new UserInfoService.Client(protocol);
transport.open();
String result = client.lg_userinfo_getUserNameById(userid);
System.out.println("Thrify client result =: " + result);
} catch (TTransportException e) {
e.printStackTrace();
} catch (TException e) {
//处理服务端返回值为null问题
if (e instanceof TApplicationException
&& ((TApplicationException) e).getType() ==
TApplicationException.MISSING_RESULT) {
System.out.println("The result of lg_userinfo_getUserNameById function is NULL");
}
} finally {
if (null != transport) {
transport.close();
}
}
}
public void startClientAsync(int userid,int port) {
TNonblockingTransport transport = null;
try {
TAsyncClientManager clientManager = new TAsyncClientManager();
transport = new TNonblockingSocket(SERVER_IP,port, TIMEOUT);
TProtocolFactory tprotocol = new TCompactProtocol.Factory();
UserInfoService.AsyncClient asyncClient = new UserInfoService.AsyncClient(
tprotocol, clientManager, transport);
System.out.println("Client start .....");
CountDownLatch latch = new CountDownLatch(1);
AsynCallback callBack = new AsynCallback(latch);
System.out.println("call method sayHello start ...");
asyncClient.lg_userinfo_getUserNameById(userid, callBack);
System.out.println("call method sayHello .... end");
boolean wait = latch.await(30, TimeUnit.SECONDS);
System.out.println("latch.await =:" + wait);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("startClient end.");
}
public class AsynCallback implements AsyncMethodCallback<lg_userinfo_getUserNameById_call>{
private CountDownLatch latch;
public AsynCallback(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void onComplete(lg_userinfo_getUserNameById_call response) {
System.out.println("onComplete");
try {
Thread.sleep(1000L * 1);
System.out.println("AsynCall result =:" + response.getResult().toString());
} catch (TException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
@Override
public void onError(Exception exception) {
System.out.println("onError :" + exception.getMessage());
latch.countDown();
}
}
public static void main(String[] args) {
UserInfoClientDemo client = new UserInfoClientDemo();
client.startClient(1);
client.startClient(2);
client.startClient(3);
client.startClientAsync(1,SERVER_PORT2);
client.startClientAsync(2,SERVER_PORT3);
client.startClientAsync(3,SERVER_PORT2);
}
}
5. 服务端工作模式
常见的服务端工作模式有5种:TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadedSelectorServer
5.1 TSimpleServer模式
TSimpleServer单线程服务器端使用标准的阻塞式 I/O,只有一个工作线程,循环监听新请求的到来并完成对请求的处理,它只是在简单的演示时候使用,它的工作方式如图
5.2 TThreadPoolServer模式
TThreadPoolServer模式--多线程服务器端使用标准的阻塞式 I/O,采用阻塞socket方式工作,主线程负责阻塞式监听“监听socket”中是否有新socket到来,业务处理交由一个线程池来处理,如下图所示:
TThreadPoolServer模式优点:
线程池模式中,数据读取和业务处理都交由线程池完成,主线程只负责监听新连接,因此在并发量较大时新连接也能够被及时接受。线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池及时处理,性能也非常高。
TThreadPoolServer模式缺点:
线程池模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。
5.3 TNonblockingServer模式
TNonblockingServer工作模式,该模式也是单线程工作,但是该模式采用NIO的方式,所有的socket都被注册到selector中,在一个线程中通过seletor循环监控所有的socket,每次selector结束时,处理所有的处于就绪状态的socket,对于有数据到来的socket进行数据读取操作,对于有数据发送的socket则进行数据发送,对于监听socket则产生一个新业务socket并将其注册到selector中,如下图所示:
上图中读取数据之后的业务处理就是根据读取到的调用请求,调用具体函数完成处理,只有完成函数处理才能进行后续的操作;
TNonblockingServer模式优点:
相比于TSimpleServer效率提升主要体现在IO多路复用上,TNonblockingServer采用非阻塞IO,同时监控多个socket的状态变化;
TNonblockingServer模式缺点:
TNonblockingServer模式在业务处理上还是采用单线程顺序来完成,在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行。
5.4 THsHaServer模式(半同步半异步)
THsHaServer类是TNonblockingServer类的子类,在5.2节中的TNonblockingServer模式中,采用一个线程来完成对所有socket的监听和业务处理,造成了效率的低下,THsHaServer模式的引入则是部分解决了这些问题。THsHaServer模式中,引入一个线程池来专门进行业务处理,如下图所示;
THsHaServer的优点:
与TNonblockingServer模式相比,THsHaServer在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升;
THsHaServer的缺点:
由图5.3可以看出,主线程需要完成对所有socket的监听以及数据读写的工作,当并发请求数较大时,且发送数据量较多时,监听socket上新连接请求不能被及时接受。
5.5 TThreadedSelectorServer模式
TThreadedSelectorServer模式是目前Thrift提供的最高级的模式,它内部有如果几个部分构成:
(1) 一个AcceptThread线程对象,专门用于处理监听socket上的新连接;
(2) 若干个SelectorThread对象专门用于处理业务socket的网络I/O操作,所有网络数据的读写均是有这些线程来完成;
(3) 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
(4) 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求读取之后,交个ExecutorService线程池中的线程完成此次调用的具体执行;
如上图所示,TThreadedSelectorServer模式中有一个专门的线程AcceptThread用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread线程中来完成,因此能够快速对网络I/O进行读写操作,能够很好地应对网络I/O较多的情况;TThreadedSelectorServer对于大部分应用场景性能都不会差,因此,如果实在不知道选择哪种工作模式,使用TThreadedSelectorServer就可以。
6. 实现原理
5.1 架构
Thrift 包含一个完整的堆栈结构用于构建客户端和服务器端。下图描绘了 Thrift 的整体架构
如图所示,图中黄色部分是用户实现的业务逻辑,褐色部分是根据 Thrift 定义的服务接口描述文件生成的客户端和服务器端代码框架,红色部分是根据 Thrift 文件生成代码实现数据的读写操作。红色部分以下是 Thrift 的传输体系、协议以及底层 I/O 通信,使用 Thrift 可以很方便的定义一个服务并且选择不同的传输协议和传输层而不用重新生成代码。
Thrift 服务器包含用于绑定协议和传输层的基础架构,它提供阻塞、非阻塞、单线程和多线程的模式运行在服务器上,可以配合服务器 / 容器一起运行,可以和现有的 J2EE 服务器 /Web 容器无缝的结合。
服务端和客户端具体的调用流程如下:
图.服务端启动、服务时序图:
该图所示是 HelloServiceServer 启动的过程以及服务被客户端调用时,服务器的响应过程。从图中我们可以看到,程序调用了 TThreadPoolServer 的 serve 方法后,server 进入阻塞监听状态,其阻塞在 TServerSocket 的 accept 方法上。当接收到来自客户端的消息后,服务器发起一个新线程处理这个消息请求,原线程再次进入阻塞状态。在新线程中,服务器通过 TBinaryProtocol 协议读取消息内容,调用 HelloServiceImpl 的 helloVoid 方法,并将结果写入 helloVoid_result 中传回客户端。
图.客户端调用服务时序图:
该图所示是 HelloServiceClient 调用服务的过程以及接收到服务器端的返回值后处理结果的过程。从图中我们可以看到,程序调用了 Hello.Client 的 helloVoid 方法,在 helloVoid 方法中,通过 send_helloVoid 方法发送对服务的调用请求,通过 recv_helloVoid 方法接收服务处理请求后返回的结果。
5.2 调用流程
Thrift框架的远程过程调用的工作过程如下:
(1) 通过IDL定义一个接口的thrift文件,然后通过thrift的多语言编译功能,将接口定义的thrift文件翻译成对应的语言版本的接口文件;
(2) Thrift生成的特定语言的接口文件中包括客户端部分和服务器部分;
(3) 客户端通过接口文件中的客户端部分生成一个Client对象,这个客户端对象中包含所有接口函数的存根实现,然后用户代码就可以通过这个Client对象来调用thrift文件中的那些接口函数了,但是,客户端调用接口函数时实际上调用的是接口函数的本地存根实现,
(4) 接口函数的存根实现将调用请求发送给thrift服务器端,然后thrift服务器根据调用的函数名和函数参数,调用实际的实现函数来完成具体的操作
(5) Thrift服务器在完成处理之后,将函数的返回值发送给调用的Client对象;
(6) Thrift的Client对象将函数的返回值再交付给用户的调用函数
5.3 源码分析
源码分析主要分析thrift生成的java接口文件,并以TestThriftService.java为例,以该文件为线索,逐渐分析文件中遇到的其他类和文件;在thrift生成的服务接口文件中,共包含以下几部分:
(1)异步客户端类AsyncClient和异步接口AsyncIface,本节暂不涉及这些异步操作相关内容;
(2)同步客户端类Client和同步接口Iface,Client类继承自TServiceClient,并实现了同步接口Iface;Iface就是根据thrift文件中所定义的接口函数所生成;Client类是在开发Thrift的客户端程序时使用,Client类是Iface的客户端存根实现, Iface在开发Thrift服务器的时候要使用,Thrift的服务器端程序要实现接口Iface。
(3)Processor类,该类主要是开发Thrift服务器程序的时候使用,该类内部定义了一个map,它保存了所有函数名到函数对象的映射,一旦Thrift接到一个函数调用请求,就从该map中根据函数名字找到该函数的函数对象,然后执行它;
(4)参数类,为每个接口函数定义一个参数类,例如:为接口getInt产生一个参数类:getInt_args,一般情况下,接口函数参数类的命名方式为:接口函数名_args;
(5)返回值类,每个接口函数定义了一个返回值类,例如:为接口getInt产生一个返回值类:getInt_result,一般情况下,接口函数返回值类的命名方式为:接口函数名_result;
参数类和返回值类中有对数据的读写操作,在参数类中,将按照协议类将调用的函数名和参数进行封装,在返回值类中,将按照协议规定读取数据。
Thrift调用过程中,Thrift客户端和服务器之间主要用到传输层类、协议层类和处理类三个主要的核心类,这三个类的相互协作共同完成rpc的整个调用过程。在调用过程中将按照以下顺序进行协同工作:
(1) 将客户端程序调用的函数名和参数传递给协议层(TProtocol),协议层将函数名和参数按照协议格式进行封装,然后封装的结果交给下层的传输层。此处需要注意:要与Thrift服务器程序所使用的协议类型一样,否则Thrift服务器程序便无法在其协议层进行数据解析;
(2) 传输层(TTransport)将协议层传递过来的数据进行处理,例如传输层的实现类TFramedTransport就是将数据封装成帧的形式,即“数据长度+数据内容”,然后将处理之后的数据通过网络发送给Thrift服务器;此处也需要注意:要与Thrift服务器程序所采用的传输层的实现类一致,否则Thrift的传输层也无法将数据进行逆向的处理;
(3) Thrift服务器通过传输层(TTransport)接收网络上传输过来的调用请求数据,然后将接收到的数据进行逆向的处理,例如传输层的实现类TFramedTransport就是将“数据长度+数据内容”形式的网络数据,转成只有数据内容的形式,然后再交付给Thrift服务器的协议类(TProtocol);
(4) Thrift服务端的协议类(TProtocol)将传输层处理之后的数据按照协议进行解封装,并将解封装之后的数据交个Processor类进行处理;
(5) Thrift服务端的Processor类根据协议层(TProtocol)解析的结果,按照函数名找到函数名所对应的函数对象;
(6) Thrift服务端使用传过来的参数调用这个找到的函数对象;
(7) Thrift服务端将函数对象执行的结果交给协议层;
(8) Thrift服务器端的协议层将函数的执行结果进行协议封装;
(9) Thrift服务器端的传输层将协议层封装的结果进行处理,例如封装成帧,然后发送给Thrift客户端程序;
(10) Thrift客户端程序的传输层将收到的网络结果进行逆向处理,得到实际的协议数据;
(11) Thrift客户端的协议层将数据按照协议格式进行解封装,然后得到具体的函数执行结果,并将其交付给调用函数;
客户端协议类和服务端协议类都是指具体实现了TProtocol接口的协议类,在实际开发过程中二者必须一样,否则便不能进行通信;同样,客户端传输类和服务端传输类是指TTransport的子类,二者也需保持一致;
在上述开发thrift客户端和服务器端程序时需要用到三个类:传输类(TTransport)、协议接口(TProtocol)和处理类(Processor),其中TTransport是抽象类,在实际开发过程中可根据具体清空选择不同的实现类;TProtocol是个协议接口,每种不同的协议都必须实现此接口才能被thrift所调用。例如TProtocol类的实现类就有TBinaryProtocol等;在Thrift生成代码的内部,还需要将待传输的内容封装成消息类TMessage。处理类(Processor)主要在开发Thrift服务器端程序的时候使用。
1、TMessage
Thrift在客户端和服务器端传递数据的时候(包括发送调用请求和返回执行结果),都是将数据按照TMessage进行组装,然后发送;TMessage包括三部分:消息的名称、消息的***和消息的类型,消息名称为字符串类型,消息的***为32位的整形,消息的类型为byte类型,消息的类型:
public final classTType {
public staticfinal byte STOP =0;
public staticfinal byte VOID =1;
public staticfinal byte BOOL =2;
public staticfinal byte BYTE =3;
public staticfinal byte DOUBLE = 4;
public staticfinal byte I16 =6;
public staticfinal byte I32 =8;
public staticfinal byte I64 =10;
public staticfinal byte STRING = 11;
public staticfinal byte STRUCT = 12;
public staticfinal byte MAP =13;
public staticfinal byte SET =14;
public staticfinal byte LIST =15;
public staticfinal byte ENUM =16;
}
2. 传输类(TTransport)
传输类或其各种实现类,都是对I/O层的一个封装,可更直观的理解为它封装了一个socket,不同的实现类有不同的封装方式,例如TFramedTransport类,它里面还封装了一个读写buf,在写入的时候,数据都先写到这个buf里面,等到写完调用该类的flush函数的时候,它会将写buf的内容,封装成帧再发送出去;
TFramedTransport是对TTransport的继承,由于tcp是基于字节流的方式进行传输,因此这种基于帧的方式传输就要求在无头无尾的字节流中每次写入和读出一个帧,TFramedTransport是按照下面的方式来组织帧的:每个帧都是按照4字节的帧长加上帧的内容来组织,帧内容就是我们要收发的数据,如下:
+---------------+---------------+
| 4字节的帧长 | 帧的内容 |
+---------------+---------------+
3. 协议接口(TProtocol)
提供了一组操作协议接口,主要用于规定采用哪种协议进行数据的读写,它内部包含一个传输类(TTransport)成员对象,通过TTransport对象从输入输出流中读写数据;它规定了很多读写方式,例如:
readByte()
readDouble()
readString()
…
每种实现类都根据自己所实现的协议来完成TProtocol接口函数的功能,例如实现了TProtocol接口的TBinaryProtocol类,对于readDouble()函数就是按照二进制的方式读取出一个Double类型的数据。
类TBinaryProtocol是TProtocol的一个实现类,TBinaryProtocol协议规定采用这种协议格式的进行消息传输时,需要为消息内容封装一个首部,TBinaryProtocol协议的首部有两种操作方式:一种是严格读写模式,一种值普通的读写模式;这两种模式下消息首部的组织方式不一样,在创建时也可以自己指定使用那种模式,但是要注意,如果要指定模式,Thrift的服务器端和客户端都需要指定。
严格读写模型下的消息首部的前16字节固定为版本号:0x8001,如图1所示;
图1二进制协议中严格读写模式下的消息组织方式
在严格读写模式下,首部中前32字节包括固定的16字节协议版本号0x8001,8字节的0x00,8字节的消息类型;然后是若干字节字符串格式的消息名称,消息名称的组织方式也是“长度+内容”的方式;再下来是32位的消息***;在***之后的才是消息内容。
普通读写模式下,没有版本信息,首部的前32字节就是消息的名称,然后是消息的名字,接下来是32为的消息***,最后是消息的内容。
图2 二进制协议中普通读写模式下的消息组织方式
通信过程中消息的首部在TBinaryProtocol类中进行通过readMessageBegin读取,通过writeMessageBegin写入;但是消息的内容读取在返回值封装类(例如:getStr_result)中进行;
(1) TBinaryProtocol的读取数据过程:
在Client中调用TBinaryProtocol读取数据的过程如下:
readMessageBegin()
…
读取数据
…
readMessageEnd()
readMessageBegin详细过程如下:
[1]首先从传输过来的网络数据中读取32位数据,然后根据首位是否为1来判断当前读到的消息是严格读写模式还是普通读写模式;如果是严格读写模式则这32位数据包括版本号和消息类型,否则这32位保存的是后面的消息名称
[2]读取消息的名称,如果是严格读写模式,则消息名称为字符串格式,保存方式为“长度+内容”的方式,如果是普通读写模式,则消息名称的长度直接根据[1]中读取的长度来读取消息名称;
[3]读取消息类型,如果是严格读写模式,则消息类型已经由[1]中读取出来了,在其32位数据中的低8位中保存着;如果是普通读写模式,则要继续读取一字节的消息类型;
[4]读取32为的消息***;
读取数据的过程是在函数返回值的封装类中来完成,根据读取的数值的类型来具体读取数据的内容;在TBinaryProtocol协议中readMessageEnd函数为空,什么都没有干。
(2) TBinaryProtocol的写入数据过程:
在sendBase函数调用TBinaryProtocol将调用函数和参数发送到Thrift服务器的过程如下:
writeMessageBegin(TMessage m)
…
写入数据到TTransport实现类的buf中
…
writeMessageEnd();
getTransport().flush();
writeMessageBegin函数需要一个参数TMessage作为消息的首部,写入过程与读取过程类似,首先判断需要执行严格读写模式还是普通读写模式,然后分别按照读写模式的具体消息将消息首部写入TBinaryProtocol的TTransport成员的buf中;
4. Thrift客户端存根代码追踪调试
下面通过跟踪附件中thrift客户端代码的test()函数,在该函数中调用了Thrift存根函数getStr,通过追踪该函数的执行过程来查看整个Thrift的调用流程:
(1)客户端代码先打开socket,然后调用存根对象的
m_transport.open();
String res = testClient.getStr("test1","test2");
(2)在getStr的存根实现中,首先发送调用请求,然后等待Thrift服务器端返回的结果:
send_getStr(srcStr1, srcStr2);
return recv_getStr();
(3)发送调用请求函数send_getStr中主要将参数存储到参数对象中,然后把参数和函数名发送出去:
getStr_args args = new getStr_args();//创建一个该函数的参数对象
args.setSrcStr1(srcStr1);//将参数值设置带参数对象中
args.setSrcStr2(srcStr2);
sendBase("getStr", args);//将函数名和参数对象发送出去
(4)sendBase函数,存根类Client继承自基类TServiceClient,sendBase函数即是在TServiceClient类中实现的,它的主要功能是调用协议类将调用的函数名和参数发送给Thrift服务器:
oprot_.writeMessageBegin(new TMessage(methodName,TMessageType.CALL, ++seqid_));//将函数名,消息类型,序号等信息存放到oprot_的TFramedTransport成员的buf中
args.write(oprot_);//将参数存放到oprot_的TFramedTransport成员的buf中
oprot_.writeMessageEnd();
oprot_.getTransport().flush();//将oprot_的TFramedTransport成员的buf中的存放的消息发送出去;
这里的oprot_就是在TProtocol的子类,本例中使用的是TBinaryProtocol,在调用TBinaryProtocol的函数时需要传入一个TMessage对象(在本节第2小节中有对TMessage的描述),这个TMessage对象的名字就是调用函数名,消息的类型为TMessageType.CALL,调用序号使用在客户端存根类中(实际上是在基类TServiceClient)中保存的一个***,每次调用时均使用此***,使用完再将序号加1。
在TBinaryProtocol中包含有一个TFramedTransport对象,而TFramedTransport对象中维护了一个缓存,上述代码中,写入函数名、参数的时候都是写入到TFramedTransport中所维护的那个缓存中,在调用TFramedTransport的flush函数的时候,flush函数首先计算缓存中数据的长度,将长度和数据内容组装成帧,然后发送出去,帧的格式按照“长度+字段”的方式组织,如:
+---------------+---------------+
| 4字节的帧长 | 帧的内容 |
+---------------+---------------+
(5)recv_getStr,在调用send_getStr将调用请求发送出去之后,存根函数getStr中将调用recv_getStr等待Thrift服务器端返回调用结果,recv_getStr的代码为:
getStr_resultresult = new getStr_result();//为接收返回结果创建一个返回值对象
receiveBase(result, "getStr");//等待Thrift服务器将结果返回
(6)receiveBase,在该函数中,首先通过协议层读取消息的首部,然后由针对getStr生成的返回值类getStr_result读取返回结果的内容;最后由协议层对象结束本次消息读取操作;如下所示:
iprot_.readMessageBegin();//通过协议层对象读取消息的首部
……
result.read(iprot_);//通过返回值类对象读取具体的返回值;
……
iprot_.readMessageEnd();//调用协议层对象结束本次消息读取
在本节第4小节中有对readMessageBegin函数的描述;
5. 处理类(Processor)
该类主要由Thrift服务器端程序使用,它是由thrift编译器根据IDL编写的thrift文件生成的具体语言的接口文件中所包含的类,例如2.5节中提到的TestThriftService.java文件,处理类(Processor)主要由thrift服务器端使用,它继承自基类TBaseProcessor。
例如,2.5节中提到服务器端程序的如下代码:
TProcessor tProcessor =
New TestThriftService.Processor<TestThriftService.Iface>(m_myService);
这里的TestThriftService.Processor就是这里提到的Processor类,包括尖括号里面的接口TestThriftService.Iface也是由thrift编译器自动生成。Processor类主要完成函数名到对应的函数对象的映射,它内部维护一个map,map的key就是接口函数的名字,而value就是接口函数所对应的函数对象,这样服务器端从网络中读取到客户端发来的函数名字的时候,就通过这个map找到该函数名对应的函数对象,然后再用客户端发来的参数调用该函数对象;在Thrift框架中,每个接口函数都有一个函数对象与之对应,这里的函数对象继承自虚基类ProcessFunction。
ProcessFunction类,它采用类似策略模式的实现方法,该类有一个字符串的成员变量,用于存放该函数对象对应的函数名字,在ProcessFunction类中主要实现了process方法,此方法的功能是通过协议层从传输层中读取并解析出调用的参数,然后再由具体的函数对象提供的getResult函数计算出结果;每个继承自虚基类ProcessFunction的函数对象都必须实现这个getResult函数,此函数内部根据函数调用参数,调用服务器端的函数,并获得执行结果;process在通过getResult函数获取到执行结果之后,通过协议类对象将结果发送给Thrift客户端程序。
Thrift服务器端程序在使用Thrrift服务框架时,需要提供以下几个条件:
(1)定义一个接口函数实现类的对象,在开发Thrift服务程序时,最主要的功能就是开发接口的实现函数,这个接口函数的实现类implements接口Iface,并实现了接口中所有函数;
(2)创建一个监听socket,Thrift服务框架将从此端口监听新的调用请求到来;
(3)创建一个实现了TProtocol接口的协议类对象,在与Thrift客户端程序通信时将使用此协议进行网络数据的封装和解封装;
(4)创建一个传输类的子类,用于和Thrift服务器之间进行数据传输
7. 参考资料
学习
Apache Thrift 官网:可下载 Thrift 工具和源码。
Thrift Features and Non-features:Thrift 的功能特点和不足之处。
Apache Thrift 介绍:介绍 Thrift 架构、协议、传输层和服务端类型,并与其他构建服务的方法 ( 如:REST) 进行比较分析。
Thrift 的安装部署:Thrift 的安装部署说明
Thrift: Scalable Cross-Language Services Implementation:Thrift 官方文档,详细介绍 Thrift 的
Thrift IDL 官方的IDL示例文件
设计
Thrift API:关于 Apache Thrift 0.6.1 构建服务端和客户端的 API 手册
Thrift 实例:Thrift 的简单应用实例
Fully async Thrift client in Java:关于 Thrift 异步客户端的介绍
developerWorks Java 技术专区:这里有数百篇关于 Java 编程各个方面的文章。
参考引用博客
http://www.ibm.com/developerworks/cn/java/j-lo-apachethrift/
http://houjixin.blog.163.com/blog/static/35628410201501654039437/
http://www.micmiu.com/soa/rpc/thrift-sample/