使用Netty HTTP客户端重试请求

使用Netty HTTP客户端重试请求

问题描述:

如何在基于Netty的HTTP客户端中重试HTTP请求?使用Netty HTTP客户端重试请求

考虑以下的处理程序,它试图如果HTTP响应代码503被接收到1秒后重试HTTP请求:

public class RetryChannelHandler extends ChannelDuplexHandler { 
    List<HttpObject> requestParts; 

    @Override 
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 
     if (msg instanceof HttpRequest) { 
      requestParts = new ArrayList<>(); 
      requestParts.add((HttpRequest)msg); 
     } else if (msg instanceof HttpObject) { 
      requestParts.add((HttpObject)msg); 
     } 

     super.write(ctx, msg, promise); 
    } 

    @Override 
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { 
     if (msg instanceof HttpResponse) { 
      HttpResponse res = (HttpResponse)msg; 
      if (res.status().code() == 503) { 
       ctx.executor().schedule(new Runnable() { 
        @Override 
        public void run() { 
         for (HttpObject obj : requestParts) { 
          ctx.channel().write(obj); 
         } 
        } 
       }, 1000, TimeUnit.MILLISECONDS); 
      } else { 
       super.channelRead(ctx, msg); 
      } 
     } else { 
      super.channelRead(ctx, msg); 
     } 
    } 
} 

当我写入信道在本例中,在管道中的其他处理程序请参阅HttpObjects,但HttpRequest实际上并未再次执行 - 只收到一个HttpResponse。

我想我只是在这种情况下滥用通道,我需要创建一个新的通道(代表到服务器的新连接)来执行重试。我不清楚如何从Handler的上下文创建新的频道,以及我是否真的在Netty的正确层面上做这种逻辑。

任何有关如何实现我所描述的行为的指导将不胜感激。

您还需要在write(...)之后致电flush(),否则将不会刷新频道。此外,您需要确保您可能保留()和复制()HttpContent,否则您最终可能会尝试编写已发布的HttpContent对象。

像这样(未测试):

public class RetryChannelHandler extends ChannelDuplexHandler { 
    Queue<HttpObject> requestParts; 

    @Override 
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 
     if (msg instanceof HttpRequest) { 
      requestParts = new ArrayDeque<>(); 
      requestParts.add((HttpRequest)msg); 
     } else if (msg instanceof HttpContent) { 
      requestParts.add(((HttpContent)msg).duplicate().retain()); 
     } 

     super.write(ctx, msg, promise); 
    } 

    @Override 
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { 
     if (msg instanceof HttpResponse) { 
      HttpResponse res = (HttpResponse)msg; 
      if (res.status().code() == 503) { 
       ctx.executor().schedule(new Runnable() { 
        @Override 
        public void run() { 
         HttpObject obj; 
         while ((obj = requestParts.poll()) != null) { 
          ctx.write(obj); 
         } 
         ctx.flush(); 
        } 
       }, 1000, TimeUnit.MILLISECONDS); 
      } else { 
       HttpObject obj; 
       while ((obj = requestParts.poll()) != null) { 
        ReferenceCountUtil.release(obj); 
       } 
       super.channelRead(ctx, msg); 
      } 
     } else { 
      super.channelRead(ctx, msg); 
     } 
    } 
} 
+0

谢谢!经过对实现的小调整后,这解决了我的问题。 –