使用Netty HTTP客户端重试请求
问题描述:
如何在基于Netty的HTTP客户端中重试HTTP请求?使用Netty HTTP客户端重试请求
考虑以下的处理程序,它试图如果HTTP响应代码503被接收到1秒后重试HTTP请求:
public class RetryChannelHandler extends ChannelDuplexHandler {
List<HttpObject> requestParts;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof HttpRequest) {
requestParts = new ArrayList<>();
requestParts.add((HttpRequest)msg);
} else if (msg instanceof HttpObject) {
requestParts.add((HttpObject)msg);
}
super.write(ctx, msg, promise);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse)msg;
if (res.status().code() == 503) {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
for (HttpObject obj : requestParts) {
ctx.channel().write(obj);
}
}
}, 1000, TimeUnit.MILLISECONDS);
} else {
super.channelRead(ctx, msg);
}
} else {
super.channelRead(ctx, msg);
}
}
}
当我写入信道在本例中,在管道中的其他处理程序请参阅HttpObjects,但HttpRequest实际上并未再次执行 - 只收到一个HttpResponse。
我想我只是在这种情况下滥用通道,我需要创建一个新的通道(代表到服务器的新连接)来执行重试。我不清楚如何从Handler的上下文创建新的频道,以及我是否真的在Netty的正确层面上做这种逻辑。
任何有关如何实现我所描述的行为的指导将不胜感激。
答
您还需要在write(...)
之后致电flush()
,否则将不会刷新频道。此外,您需要确保您可能保留()和复制()HttpContent
,否则您最终可能会尝试编写已发布的HttpContent
对象。
像这样(未测试):
public class RetryChannelHandler extends ChannelDuplexHandler {
Queue<HttpObject> requestParts;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof HttpRequest) {
requestParts = new ArrayDeque<>();
requestParts.add((HttpRequest)msg);
} else if (msg instanceof HttpContent) {
requestParts.add(((HttpContent)msg).duplicate().retain());
}
super.write(ctx, msg, promise);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse)msg;
if (res.status().code() == 503) {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
HttpObject obj;
while ((obj = requestParts.poll()) != null) {
ctx.write(obj);
}
ctx.flush();
}
}, 1000, TimeUnit.MILLISECONDS);
} else {
HttpObject obj;
while ((obj = requestParts.poll()) != null) {
ReferenceCountUtil.release(obj);
}
super.channelRead(ctx, msg);
}
} else {
super.channelRead(ctx, msg);
}
}
}
谢谢!经过对实现的小调整后,这解决了我的问题。 –