Netty 核心架构深度解析——EventLoop、Pipeline、Handler 的协作机制
Netty 核心架构深度解析——EventLoop、Pipeline、Handler 的协作机制
适读人群:使用过 Netty 但对内部机制理解不够深的 Java 开发者 | 阅读时长:约 17 分钟 | 核心价值:真正搞懂 Netty 三个核心组件的协作关系,不再靠复制粘贴写 Netty 代码
我见过太多人用 Netty 的姿势是:找一个 demo,复制粘贴,跑起来了就行。
这没什么问题,大多数项目够用了。但一旦出了问题——连接莫名其妙断了、消息乱序了、内存泄漏了——不理解 Netty 内部机制的人就完全不知道从哪下手。
这篇文章我想把 Netty 的三个核心组件讲清楚:EventLoop、Pipeline、Handler。不是翻译官方文档,是用我自己踩坑之后建立的理解。
EventLoop:不是线程池,是事件循环
很多人一看 NioEventLoopGroup,以为这就是个线程池。这个理解不够准确。
EventLoop 是一个单线程的事件循环,它做三件事:
- 检查是否有 IO 事件(通过 Selector)
- 处理 IO 事件(调用 Channel 的 read/write)
- 执行已提交的任务(
eventLoop.submit(Runnable)提交的)
这三件事在同一个线程里轮流做,没有任何并发。
NioEventLoopGroup 里包含多个 EventLoop,默认数量是 2 × CPU 核数。每个 Channel 绑定到一个 EventLoop,这个绑定关系终身不变(除非显式迁移)。
package com.example.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class EventLoopDemo {
public static void main(String[] args) throws InterruptedException {
// bossGroup:只负责 ServerSocketChannel 的 accept 事件
// 通常只需要 1 个 EventLoop
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// workerGroup:负责所有已建立连接的 IO 事件
// 默认 2 * CPU 核数个 EventLoop
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 这里展示一个关键点:可以为特定 Handler 指定单独的线程池
// 这样耗时操作不会阻塞 EventLoop
EventLoopGroup businessGroup = new DefaultEventLoopGroup(4);
ch.pipeline().addLast(new QuickHandler()); // 在 EventLoop 线程里执行
ch.pipeline().addLast(businessGroup, new SlowBusinessHandler()); // 在单独线程池里执行
}
});
bootstrap.bind(8080).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
static class QuickHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 快速处理:直接在 EventLoop 线程里完成,不能有阻塞操作!
ctx.fireChannelRead(msg); // 传给下一个 Handler
}
}
static class SlowBusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这个 Handler 跑在 businessGroup 的线程上
// 可以有数据库查询、HTTP 调用等耗时操作
// 不会阻塞 EventLoop,不影响其他连接的 IO
}
}
}核心原则:不要在 EventLoop 线程里做阻塞操作。
这是 Netty 使用中最重要的规则。如果你在 Handler 里调用了 Thread.sleep()、阻塞的数据库查询、同步 HTTP 调用,那这个 EventLoop 管理的所有连接都会被卡住。
踩坑实录一:在 Handler 里直接查数据库,所有连接卡死
我刚接触 Netty 的时候,写了这样的代码:
// 错误示范:在 EventLoop 线程里做数据库查询
public class WrongHandler extends ChannelInboundHandlerAdapter {
private final UserRepository userRepository;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这里会阻塞 EventLoop!!!
// 数据库查询可能要 5ms-50ms,这段时间 EventLoop 管理的所有连接都在等
User user = userRepository.findById(getUserId(msg));
ctx.writeAndFlush(buildResponse(user));
}
}压测的时候,并发一上来,响应时间急剧变长,但 CPU 和数据库都不忙。监控显示,EventLoop 的 pendingTasks 数量一直在增长——任务堆积了。
解法是把耗时操作放到单独的线程池:
package com.example.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CorrectHandler extends ChannelInboundHandlerAdapter {
// 业务线程池,专门处理耗时操作
private final ExecutorService businessExecutor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 立刻把耗时操作扔到业务线程池,不阻塞 EventLoop
businessExecutor.submit(() -> {
try {
Object result = doSlowBusiness(msg); // 数据库查询等耗时操作
// 写回结果时,通过 ctx 回到 EventLoop 线程
// ctx.writeAndFlush 是线程安全的,可以从任意线程调用
ctx.writeAndFlush(result);
} catch (Exception e) {
ctx.fireExceptionCaught(e);
}
});
}
private Object doSlowBusiness(Object msg) {
// 耗时业务逻辑...
return null;
}
}Pipeline:责任链模式的 IO 处理管道
每个 Channel 都有一个 ChannelPipeline,里面是一个双向链表,包含多个 ChannelHandler。
入站事件(网络收到数据)从头向尾传播,经过所有 ChannelInboundHandler。 出站事件(向网络发送数据)从尾向头传播,经过所有 ChannelOutboundHandler。
Pipeline:
[InboundA] -> [InboundB] -> [InboundC] (入站,从左到右)
[OutboundA] <- [OutboundB] <- [OutboundC] (出站,从右到左)
实际上 Inbound 和 Outbound Handler 可以混合排列在同一个 Pipeline 里一个重要但容易误解的点:ctx.write() 和 channel.write() 的行为不同。
ctx.write(msg):从当前 Handler 的位置向出站方向传播channel.write(msg)或ctx.channel().write(msg):从 Pipeline 的尾部重新开始传播
package com.example.netty;
import io.netty.channel.*;
import io.netty.buffer.ByteBuf;
/**
* 演示 Pipeline 传播方向
*/
public class PipelineDemo {
static class LoggingEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("[LoggingEncoder] 准备发送数据");
// 继续往出站方向传播,不传就断链了
ctx.write(msg, promise);
}
}
static class CompressionEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("[CompressionEncoder] 压缩数据");
ByteBuf compressed = compress((ByteBuf) msg);
ctx.write(compressed, promise); // 传压缩后的数据
}
private ByteBuf compress(ByteBuf buf) { return buf; /* 省略压缩逻辑 */ }
}
static class BusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// ctx.write:从 BusinessHandler 位置向出站方向传,经过 CompressionEncoder 和 LoggingEncoder
ctx.writeAndFlush("response");
// 如果用 ctx.channel().writeAndFlush:从 Pipeline 尾部开始,同样经过所有出站 Handler
// ctx.channel().writeAndFlush("response"); // 效果相同,但稍慢(从尾部开始)
}
}
}Handler:核心业务逻辑的载体
Handler 分两类:
ChannelInboundHandler:处理入站事件(数据读取、连接建立/断开、异常等)ChannelOutboundHandler:处理出站操作(写数据、关闭连接等)
实际使用中常用 SimpleChannelInboundHandler<T> 和 ChannelDuplexHandler。
重要:@Sharable 注解
如果 Handler 是无状态的(没有成员变量存储每个连接的状态),可以加 @Sharable 注解,让多个 Channel 共享同一个 Handler 实例,节省对象创建开销。
如果 Handler 有状态(如记录某个连接的消息计数),不能加 @Sharable,每个 Channel 必须有独立的 Handler 实例。
package com.example.netty;
import io.netty.channel.*;
import io.netty.util.AttributeKey;
/**
* 有状态的 Handler:不能用 @Sharable,每个 Channel 独立实例
*/
public class StatefulHandler extends ChannelInboundHandlerAdapter {
// 使用 Channel Attribute 存储每个连接的状态,比成员变量更安全
private static final AttributeKey<Integer> MSG_COUNT =
AttributeKey.valueOf("messageCount");
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 连接建立,初始化计数器
ctx.channel().attr(MSG_COUNT).set(0);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Integer count = ctx.channel().attr(MSG_COUNT).get();
ctx.channel().attr(MSG_COUNT).set(count + 1);
System.out.println("连接 " + ctx.channel().id() + " 收到第 " + (count + 1) + " 条消息");
ctx.fireChannelRead(msg);
}
}
/**
* 无状态的 Handler:可以用 @Sharable,所有 Channel 共用一个实例
*/
@ChannelHandler.Sharable
class LogHandler extends ChannelInboundHandlerAdapter {
// 没有任何成员变量,是线程安全的,可以 @Sharable
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("[LOG] 收到消息,channel=" + ctx.channel().id());
ctx.fireChannelRead(msg); // 别忘了传递下去!
}
}踩坑实录二:忘了调 ctx.fireChannelRead,数据在 Handler 里"消失"
这是 Netty 初学者最常见的 bug。
Pipeline 是责任链,每个 Handler 处理完之后,如果还想让后续 Handler 处理,必须调用 ctx.fireChannelRead(msg) 把消息传下去。如果不调,消息就"停"在这个 Handler 里了,后面的 Handler 什么都收不到。
// 有 BUG:忘了 fireChannelRead
public class BuggyDecoder extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 做了解码操作...
Object decoded = decode(msg);
// BUG!忘记把解码结果传给下一个 Handler
// ctx.fireChannelRead(decoded); // 这行被遗漏了
}
}如果你发现数据确实到了某个 Handler(加日志可以看到),但后面的 Handler 什么都没收到,第一反应就是检查 fireChannelRead 有没有调。
踩坑实录三:Handler 加到 Pipeline 的顺序搞错了
Pipeline 里 Handler 的顺序决定了处理顺序,这个顺序搞错了,业务逻辑会乱。
一个典型的错误:把认证 Handler 放在业务 Handler 后面了,导致未认证的请求直接被处理了。
// 错误的顺序
pipeline.addLast(new BusinessHandler()); // 业务处理
pipeline.addLast(new AuthHandler()); // 认证——已经太晚了!
// 正确的顺序
pipeline.addLast(new AuthHandler()); // 先认证
pipeline.addLast(new BusinessHandler()); // 认证通过后才处理业务还有一个容易出错的地方:Decoder 一定要在业务 Handler 之前,Encoder 一定要在发送方向的最终处理之前:
// 正确的 Pipeline 顺序(入站从上到下,出站从下到上)
pipeline.addLast(new LengthFieldBasedFrameDecoder(...)); // 入站:先解帧
pipeline.addLast(new MyMessageDecoder()); // 入站:再解码
pipeline.addLast(new LengthFieldPrepender(4)); // 出站:加长度前缀
pipeline.addLast(new MyMessageEncoder()); // 出站:编码
pipeline.addLast(new BusinessHandler()); // 业务处理三者的协作关系总结
- EventLoop:提供单线程执行环境,保证每个 Channel 的 IO 事件串行处理,没有并发
- Pipeline:定义 IO 事件的处理链,决定每个事件经过哪些 Handler,以什么顺序
- Handler:具体的事件处理逻辑,可以是解码、编码、认证、业务处理等任何功能
它们的关系是:EventLoop 驱动 IO 事件,IO 事件流过 Pipeline,Pipeline 里的 Handler 逐一处理。
理解了这三者,你看 Netty 的代码就不再是黑盒了。
