春季卡夫卡异步发送呼叫块

问题描述:

我正在使用Spring-Kafka 1.2.1版本,并且当卡夫卡服务器关闭/不可达时,异步发送呼叫块一段时间。它似乎是TCP超时。该代码是这样的:春季卡夫卡异步发送呼叫块

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message); 
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() { 
    @Override 
    public void onSuccess(SendResult<K, V> result) { 
     ... 
    } 

    @Override 
    public void onFailure(Throwable ex) { 
     ... 
    } 
}); 

我在春天,卡夫卡代码采取一个非常快看,它似乎只是传递任务一起卡夫卡客户端库,翻译回调互动的未来对象交互。查看kafka客户端库,代码变得更加复杂,我没有花时间理解这一切,但我想它可能是在同一个线程中进行远程调用(至少是元数据?)。

作为一个用户,我期望Spring-Kafka方法能够立即返回未来,即使远程kafka服务器无法访问。

任何确认,如果我的理解是错误的,或者这是一个错误,将受到欢迎。目前为止,我最终使其在异步方面处于异步状态。

另一个问题是Spring-Kafka文档在开始时说,它提供了同步和异步发送方法。我找不到任何不会返回期货的方法,也许文档需要更新。

如果需要,我很乐意提供任何进一步的细节。谢谢。

只是为了确定。您是否应用了@EnableAsync注释?我想说这可能是指定未来行为的关键<>

+0

谢谢您的答复。不,我没有使用这个注释,文档中没有任何关于它的信息。我会尝试一下并让你知道它是否能解决问题。 –

+0

使用@EnableAsync不幸的是没有改变任何东西=/ –

除了配置类上的@EnableAsync注释之外,在您调用此代码时需要在方法上使用@Async注释。

http://www.baeldung.com/spring-async

这里是一些代码fragements。卡夫卡制片配置:

@EnableAsync 
@Configuration 
public class KafkaProducerConfig { 

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class); 

    @Value("${kafka.brokers}") 
    private String servers; 

    @Bean 
    public Map<String, Object> producerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     return props; 
    } 

    @Bean 
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) { 
     return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper)); 
    } 

    @Bean 
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) { 
     return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper)); 
    } 

    @Bean 
    public Producer producer() { 
     return new Producer(); 
    } 
} 

而且生产者本身:

public class Producer { 

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); 

    @Autowired 
    private KafkaTemplate<String, GenericMessage> kafkaTemplate; 

    @Async 
    public void send(String topic, GenericMessage message) { 
     ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message); 
     future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() { 

      @Override 
      public void onSuccess(final SendResult<String, GenericMessage> message) { 
       LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset()); 
      } 

      @Override 
      public void onFailure(final Throwable throwable) { 
       LOGGER.error("unable to send message= " + message, throwable); 
      } 
     }); 
    } 
} 
+0

谢谢你的回应。不,我没有使用这些注释,文档中没有提及这些注释。我会尝试并告诉你是否解决问题。 –

+0

使用EnableAsync不幸的是没有改变任何东西。另外,从链接中我明白,它是spring-kafka库应该使用Async注释,因为它为我提供了未来的对象。 –

+0

我同意你的观点,对我而言,你提供期货并不合理,但我必须放置注释。在我们的案例中,放置这两个注释使它像魅力一样工作。我将编辑添加一些代码片段的响应。 –