NettyExample 1 构建简单的http服务
NettyExample 1 构建简单的http服务
netty 最新版本是netty-4.1.15.Final. 本来还有Netty5,但官网已经放弃5.0了,maven仓库倒是还有5.0的版本。下载netty-4.1.15.Final源码包,里面包含一个 netty-example-4.1.15.Final-sources.jar文件,提供了比较丰富的example例子。
Netty Http服务端代码实现
import com.mdq.HttpServerTest.netty.handler.HttpServerInboundHandler;
import com.mdq.HttpServerTest.netty.handler.TestInBoundHandler1;
import com.mdq.HttpServerTest.netty.handler.TestOutBoundHandler1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.cors.CorsConfig;
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
import io.netty.handler.codec.http.cors.CorsHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.handler.stream.ChunkedWriteHandler;
public class NettyHttpServer {
private static Logger log = LoggerFactory.getLogger(NettyHttpServer.class);
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8443" : "8080"));
public void start(int port) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new HttpServerChannelInitializer(sslCtx))
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public class HttpServerChannelInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
public HttpServerChannelInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) {
CorsConfig corsConfig = CorsConfigBuilder.forAnyOrigin().allowNullOrigin().allowCredentials().build();
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast("encoder", new HttpResponseEncoder());// server端发送httpResponse,需要进行编码
pipeline.addLast("outHandler1", new TestOutBoundHandler1("OutHandler1"));
pipeline.addLast("devoder", new HttpRequestDecoder()); // server端接收httpRequest,需要进行解码
// 聚合器,把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast(new ChunkedWriteHandler()); // 块写入处理器
pipeline.addLast(new CorsHandler(corsConfig));
pipeline.addLast("InHandler1", new TestInBoundHandler1("InHandler1"));
pipeline.addLast("InHandler2", new TestInBoundHandler1("InHandler2"));
pipeline.addLast(new HttpServerInboundHandler());
}
}
public static void main(String[] args) throws Exception {
NettyHttpServer server = new NettyHttpServer();
log.info("Http Server listening on 8844 ...");
server.start(8844);
}
}
【1】、NioEventLoopGroup 是用来处理I/O操作的线程池,Netty对 EventLoopGroup 接口针对不同的传输协议提供了不同的实现。在本例子中,需要实例化两个NioEventLoopGroup,通常第一个称为“boss”,用来accept客户端连接,另一个称为“worker”,处理客户端数据的读写操作。
【2】、ServerBootstrap 是启动服务的辅助类,有关socket的参数可以通过ServerBootstrap进行设置。
【3】、这里指定NioServerSocketChannel类初始化channel用来接受客户端请求。
【4】、通常会为新SocketChannel通过添加一些handler,来设置ChannelPipeline。ChannelInitializer 是一个特殊的handler,其中initChannel方法可以为SocketChannel 的pipeline添加指定handler。
【5】、通过绑定端口8080,就可以对外提供服务了
服务端Http处理类Handler。Netty的FullHttpRequest、包含了读取uri,获取头部信息,解析Post/Get请求参数
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
public class HttpServerInboundHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = LoggerFactory.getLogger(HttpServerInboundHandler.class);
private Gson gson = new Gson();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
HashMap<String, String> params = new HashMap<String, String>();
String taskId = UUID.randomUUID().toString().replaceAll("-", "");
System.out.println(msg.getClass().toString());
if (msg instanceof FullHttpRequest) {
FullHttpRequest fullrequest = (FullHttpRequest) msg;
String uri = fullrequest.uri(); // 获取请求uri
logger.info("【url :{} taskid;{}】", uri, taskId);
System.out.println(fullrequest.headers().get("messageType")); // 获取头部信息
ByteBuf buf = fullrequest.content();
System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));// 打印请求的数据包
buf.release();
params = RequestParse.parseHttp(fullrequest, ""); // 解析get/post请求参数
String res = "I am OK";
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.wrappedBuffer(res.getBytes("UTF-8")));
response.headers().set(CONTENT_TYPE, "text/plain");
response.headers().set(CONTENT_TYPE, response.content().readableBytes());
boolean keepAlive = HttpUtil.isKeepAlive(fullrequest);
if (keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.write(response);
} else {
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
ctx.flush();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error(cause.getMessage());
ctx.close();
}
}
自定义的ChannelInboundHandlerAdapter
public class TestInBoundHandler1 extends ChannelInboundHandlerAdapter{
private static Logger logger = LoggerFactory.getLogger(TestInBoundHandler1.class);
private String message;
public TestInBoundHandler1(String message) {
this.message = message;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("this is TestInBoundHandler, message is {}",message);
super.channelRead(ctx, msg);
}
}
自定义的ChannelOutboundHandlerAdapter
public class TestOutBoundHandler1 extends ChannelOutboundHandlerAdapter{
private static Logger logger = LoggerFactory.getLogger(TestOutBoundHandler1.class);
private String message;
public TestOutBoundHandler1(String message) {
this.message = message;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
logger.info("this is TestOutBoundHandler, message : {}",message);
super.write(ctx, msg, promise);
}
}
Http请求参数解析类
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
import io.netty.handler.codec.http.multipart.MemoryAttribute;
public class RequestParse {
private static Logger logger = LoggerFactory.getLogger(RequestParse.class);
private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE);
private static Gson gson = new Gson();
/**
* 解析Netty HttpRequest的请求参数,GET/POST的解析结果都为一个Map
* @param request
* @param taskId 本次请求taskId
* @return HashMap<String, String>
*/
public static HashMap<String, String> parseHttp(HttpRequest request, String taskId) throws Exception {
HashMap<String, String> params = new HashMap<String, String>();
HttpMethod method = request.method();
String uri = request.uri();
if (HttpMethod.GET.equals(method)) {
QueryStringDecoder paramdecoder = new QueryStringDecoder(uri);
for (Entry<String, List<String>> e : paramdecoder.parameters().entrySet()) {
params.put(e.getKey(), e.getValue().get(0));
}
logger.info("【GET 接受参数】:{}", gson.toJson(params));
} else if (HttpMethod.POST.equals(method)) {
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(
new DefaultHttpDataFactory(false), request);
List<InterfaceHttpData> postData = decoder.getBodyHttpDatas(); //
for(InterfaceHttpData data:postData){
if (data.getHttpDataType() == HttpDataType.Attribute) {
MemoryAttribute attribute = (MemoryAttribute) data;
params.put(attribute.getName(), attribute.getValue());
}
}
logger.info("【POST 接受参数】:{}", gson.toJson(params));
} else {
throw new MethodNotSupportedException("not sopport such method. please use CET or POST");
}
return params;
}
}
Netty Http客户端
public class NettyHttpClient {
public void connect(String host, int port) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
ch.pipeline().addLast(new HttpResponseDecoder());
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpClientInboundHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
String uriSting = new URI("http://127.0.0.1:8844?name=mm&age=12").toASCIIString();
String msg = "Are you ok?";
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uriSting,
Unpooled.wrappedBuffer(msg.getBytes()));
// 构建http请求
request.headers().set(HttpHeaderNames.HOST, host);
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
request.headers().set("messageType", "normal");
request.headers().set("businessType", "testServerState");
// 发送http请求
f.channel().write(request);
f.channel().flush();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
NettyHttpClient client = new NettyHttpClient();
client.connect("127.0.0.1", 8844);
}
}
Netty 客户端Handler处理
public class HttpClientInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaderNames.CONTENT_TYPE));
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
ByteBuf buf = content.content();
System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
buf.release();
}
}
}
HttpMethod:主要是对method的封装,可以获取请求方式
HttpVersion: 对version的封装,netty包含1.0和1.1的版本
HttpHeaders:包含对header的内容进行封装及操作,可以通过headers()获取头信息
HttpContent:是对body进行封装,本质上就是一个ByteBuf。如果ByteBuf的长度是固定的,则请求的body过大,可能包含多个HttpContent,其中最后一个为LastHttpContent(空的HttpContent),用来说明body的结束。
HttpRequest:主要包含对Request Line和Header的组合
FullHttpRequest: 主要包含对HttpRequest和httpContent的组合。继承了HttpRequest和httpContent
QueryStringDecoder: 主要是对url进行封装,解析path和url上面的参数。(Tips:在tomcat中如果提交的post请求是application/x-www-form-urlencoded,则getParameter获取的是包含url后面和body里面所有的参数,而在netty中,获取的仅仅是url上面的参数)
HttpPostRequestDecoder:通过request请求创建一个HttpPostRequestDecoder。对HttpContent进行解析,获取Post请求的数据报并且解析参数。注意的是要求request是FullHttpRequest,因为单纯的HttpRequest不包含HttpContent数据。
注意事项:
1、可以通过在Netty的Chanel中发送HttpRequest对象,完成发送http请求的要求,同时可以对HttpHeader进行设置。
2、上面涉及到的http对象都是Netty自己封装的,不是标准的。无法使用一般的http请求,需要使用Netty创建对应的HttpClient。
(Netty服务端可以接受一般Http请求,但是请求方无法解析Netty服务端返回的HttpResponse.用Fiddler抓包工具显示是502错误)。
需要加上
ChannelFuture 少了对 CLOSE 事件的监听器
HTTP1.1 版本默认是长连接方式(也就是 Connection=Keep-Alive),而我在 netty HTTP 服务器中并没有对长、短连接方式做区别处理,并且在 HttpResponse 响应中并没有显式加上 Content-Length 头部信息,恰巧 netty Http 协议栈并没有在框架上做这件工作,导致服务端虽然把响应消息发出去了,但是客户端并不知道你是否发送完成了(即没办法判断数据是否已经发送完)。
if (keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
ctx.write(response);
// If keep-alive is off, close the connection once the content is fully written.
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
} else {
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
netty处理http请求,为什么HttpRequest和HttpContent是分两次过来的?
因为HttpRequestDecoder对Netty请求进行了解码,会产生HttpRequest和HttpContent。可通过HttpObjectAggregator进行聚合成一个FullHttpRequest。
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));// 聚合器,把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse