spring integration-rmi
1.spring integrtion 说明
2.概念
2.1 Message
Header 包含通用的信息例如
- id
- timestamp
- correlation id
- return address
Payload 则是POJO对象,例如User
public class User {
private int id;
private String name;
//getter and setter
}
2.2 Message Channel
Channel 分为
- P2P channel里的消息只能由一个消费者
- Publish/Subscribe channel里的消息会广播给所有的订阅者
Pollable Channels 会缓存消息到一个队列里,避免发送过量的消息给消费者而导致消费者崩溃
2.3 Message Endpoint
消息的终端,就是消息的消费者,分为
- Transformer :将channel里的消息进行转换
例如 xml 转成Json - Filter :过滤哪些消息是想要消费的
- Router :路由,将消息转到哪些管道里
- Splitter :消息分割
- Aggregator :消息汇总
- Service Activator : 比较重要,这个一般配置在我们已经写好的服务,想暴露出去
- Channel Adapter :不是很明白
这些类似流的处理,用官方的sample 点cafe来表示
流程是
- 调用Cafe这个网关类去点cafe(封装成为上面提到的概念里的消息-
Message
) - cafe网关有个关联通道是orders,cafe消息会发送到orders这个通道里
- orderSplitter是上面说的Message Endpoint里的Splitter,将消息组装成List放到drinks通道里
- DrinkRouter会订阅drinks通道里的信息,按冷热进行路由到不同的通道里hotDrinkBarista或coldDrinkBarista
- Barista这个是订阅通道hotDrinkBarista或coldDrinkBarista来处理产生drink,放到preparedDrinks通道里
- Waiter会对所有preparedDrinks通道里的drink集合最后发到deliveries网关
这个例子很好的解释了 message gateway ,channal, message,message endpoint的几个重要概念
3.重要的类
3.1 消息网关(Messaging Gateways)
网关最重要的功能就是屏蔽调用channel细节,比如
MessageChannel 的send(Message msg)方法。
而是改成接口的方式,然后注册成为一个Bean,直接调用即可
4.例子
原来写了个UserService,很简单,只有加user和查找user2个功能
4.1 RMI 调用的例子
现在要暴露出来给其它系统用。原来的代码如下
@Service
public class UserService {
private ConcurrentHashMap<Integer,User> map = new ConcurrentHashMap();
public void addUser(User user){
map.put(user.getId(),user);
}
public User findUserById(int id){
return map.get(id);
}
}
依照spring integration不侵入的原则,只需要配置消息的终端到这个类上,即加上 @ServiceActivator,这个annotation是上面提到的消息终端的一种,监听到addUser和findUserById这2个通道
@Service
public class UserService {
private ConcurrentHashMap<Integer,User> map = new ConcurrentHashMap();
@ServiceActivator(inputChannel = "addUser")
public void addUser(User user){
map.put(user.getId(),user);
}
@ServiceActivator(inputChannel = "findUserById")
public User findUserById(int id){
return map.get(id);
}
}
public class User {
private int id;
private String name;
//getter and setter
}
启动springboot, RmiInboundGateway网关会绑定一个rmi-input。
@SpringBootApplication
public class ServerApplication {
public static void main(String[] args) {
SpringApplicationBuilder builder =new SpringApplicationBuilder(ServerApplication.class);
builder.run(args);
}
@Bean
public RmiInboundGateway inbound(@Qualifier("rmi-input") MessageChannel channel) {
RmiInboundGateway gateway = new RmiInboundGateway();
gateway.setRequestChannel(channel);
//默认就是1099
gateway.setRegistryPort(1099);
return gateway;
}
@Bean("rmi-input")
public MessageChannel requestChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
return channel;
}
}
@Router这个message endpoint监听的rmi-input,然后路由到不同channel,这里是找到message里的头部里的method值
@Component
public class UserServiceRouter {
@Router(inputChannel = "rmi-input")
public String route(Message msg){
System.out.println("msg payload : "+msg.getPayload());
System.out.println("msg method : "+msg.getHeaders().get("method"));
return msg.getHeaders().get("method").toString();
}
}
另个项目里Client需要去调用这个UserService
@SpringBootApplication
public class ClientApplication {
public static void main(String[] args) {
SpringApplicationBuilder builder =new SpringApplicationBuilder(ClientApplication.class);
ConfigurableApplicationContext context = builder.run(args);
RmiOutboundGateway gw = (RmiOutboundGateway)context.getBean("outbound");
Message<User> msg =MessageBuilder.withPayload(new User(1,"rechard"))
.setHeader("method","addUser")
.build();
gw.handleRequestMessage(msg);
Message<Integer> msg2 =MessageBuilder.withPayload(1)
.setHeader("method","findUserById")
.build();
Object obj = gw.handleRequestMessage(msg2);
System.out.println("return obj:"+obj);
}
@Bean
@ServiceActivator(inputChannel="inChannel")
public RmiOutboundGateway outbound() {
//DESKTOP-TTESTAQ 是机器名,也可以是IP地址
RmiOutboundGateway gateway = new RmiOutboundGateway("rmi://DESKTOP-TTESTAQ/org.springframework.integration.rmiGateway.rmi-input");
return gateway;
}
}
整个流程(粗体代表channel)
RmiOutboundGateway->rmi://DESKTOP-TTESTAQ/org.springframework.integration.rmiGateway.rmi-input->RmiInboundGateway->rmi-input->UserServiceRouter->addUser or findUserById->UserService