Java 网络编程实战——从 Socket 到 Netty,一步步理解为什么需要框架
Java 网络编程实战——从 Socket 到 Netty,一步步理解为什么需要框架
适读人群:想理解网络编程原理和 Netty 必要性的 Java 开发者 | 阅读时长:约 18 分钟 | 核心价值:从第一性原理出发,搞清楚 Netty 解决了哪些原始 Socket 编程的痛点
有人问我,Netty 到底解决了什么问题?如果不搞清楚这个,学 Netty 永远觉得"这些抽象很奇怪"。
最好的方式是从最底层的 Socket 开始,一步步暴露问题,然后看 Netty 怎么解决它们。
这篇文章我会写三个版本的服务器:BIO、NIO 手写、Netty。每个版本都会出现问题,然后下一个版本解决它。
版本一:BIO 服务器——简单但撑不住并发
最朴素的 TCP 服务器是这样的:
package com.example.network;
import java.io.*;
import java.net.*;
/**
* BIO 服务器——每个连接一个线程
* 简单直接,但并发上来之后线程数爆炸
*/
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8080);
System.out.println("BIO 服务器启动");
while (true) {
// accept() 阻塞直到有新连接
Socket clientSocket = serverSocket.accept();
System.out.println("新连接: " + clientSocket.getRemoteSocketAddress());
// 每个连接开一个线程——这就是问题所在
new Thread(() -> handleConnection(clientSocket)).start();
}
}
private static void handleConnection(Socket socket) {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {
String line;
while ((line = reader.readLine()) != null) {
// 每次读到一行就 echo 回去
writer.println("Echo: " + line);
}
} catch (IOException e) {
System.err.println("连接处理出错: " + e.getMessage());
} finally {
try { socket.close(); } catch (IOException ignored) {}
}
}
}这个版本的问题很明显:每个连接一个线程。Java 线程默认栈大小是 512KB-1MB,1 万个并发连接就要 5-10GB 内存只用来存线程栈。而且线程切换的 CPU 成本也不小。
用线程池可以缓解,但如果连接都在等 IO,线程还是全部被占用。
版本二:NIO 手写服务器——能处理高并发,但代码很难维护
NIO 解决了线程和连接 1:1 的问题,但代价是代码复杂度大幅上升。
package com.example.network;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
/**
* NIO 服务器——单线程处理多连接,但代码已经很难读了
*/
public class NioServer {
private final Selector selector;
private final ByteBuffer buffer = ByteBuffer.allocate(4096);
// 用来存储不完整的请求数据(处理粘包/半包)
private final Map<SocketChannel, ByteArrayOutputStream> pendingData =
new ConcurrentHashMap<>();
public NioServer() throws IOException {
selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(8080));
server.register(selector, SelectionKey.OP_ACCEPT);
}
public void run() throws IOException {
System.out.println("NIO 服务器启动");
while (true) {
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove(); // 必须移除,否则重复处理
if (key.isAcceptable()) {
handleAccept(key);
} else if (key.isReadable()) {
handleRead(key);
} else if (key.isWritable()) {
handleWrite(key);
}
}
}
}
private void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
pendingData.put(client, new ByteArrayOutputStream());
System.out.println("接受连接: " + client.getRemoteAddress());
}
private void handleRead(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
buffer.clear();
int read = client.read(buffer);
if (read == -1) {
pendingData.remove(client);
key.cancel();
client.close();
return;
}
buffer.flip();
ByteArrayOutputStream buf = pendingData.get(client);
buf.write(buffer.array(), 0, buffer.limit());
// 这里还要处理粘包问题!一次 read 可能包含多个完整请求,
// 也可能是一个请求的一部分——这就是手写 NIO 最烦的地方
String received = buf.toString();
if (received.contains("\n")) {
System.out.println("收到完整消息: " + received.trim());
buf.reset(); // 清空,准备接收下一个
// 注册 WRITE 事件,准备回写响应
String response = "Echo: " + received;
key.attach(response.getBytes());
key.interestOps(SelectionKey.OP_WRITE);
}
}
private void handleWrite(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
byte[] response = (byte[]) key.attachment();
client.write(ByteBuffer.wrap(response));
key.interestOps(SelectionKey.OP_READ); // 回写完成,重新监听读事件
}
public static void main(String[] args) throws IOException {
new NioServer().run();
}
}这个版本已经 80+ 行了,而且还有很多没处理好的地方:
- 粘包/半包处理不完整
- 单线程,CPU 密集型操作会阻塞所有连接
- 异常处理不健壮
- 没有优雅关闭
- 没有心跳保活
如果要把这些都做好,代码量要翻 3-4 倍,而且非常难测试和维护。
踩坑实录一:粘包和半包,最让人崩溃的网络编程问题
TCP 是流协议,不是消息协议。发送方调用了三次 send(),接收方不一定收到三个独立的数据块,可能是:
- 三次 send 的数据合并成一次收到(粘包)
- 第一次 send 的数据分两次收到(半包)
- 以上组合
我第一次写网络代码的时候完全不知道这回事。测试的时候,每次只发一条消息,消息也不大,测试通过了。上了压测,并发一高,数据就开始出错,消息体被截断、两条消息合并。
查了很久才明白粘包/半包的概念。
解决方案是应用层协议定义消息边界,常见方案:
- 固定长度(简单但浪费)
- 分隔符(如
\n,Telnet 协议用这个) - 长度前缀(最常用,消息头里带消息体长度)
package com.example.network;
import java.nio.ByteBuffer;
/**
* 简单的长度前缀协议编解码
* 消息格式:[4字节长度][消息内容]
*/
public class LengthPrefixCodec {
/**
* 编码:在消息前面加上 4 字节的长度前缀
*/
public static byte[] encode(byte[] message) {
ByteBuffer buf = ByteBuffer.allocate(4 + message.length);
buf.putInt(message.length); // 4 字节长度
buf.put(message); // 消息内容
return buf.array();
}
/**
* 解码:从 buffer 里尝试读取一个完整消息
* 返回 null 表示数据还不够,等待更多数据
*/
public static byte[] decode(ByteArrayBuffer accumulator) {
// 至少要有 4 字节才能读长度
if (accumulator.size() < 4) return null;
// 偷看前 4 字节,不消耗
int messageLength = accumulator.peekInt();
// 检查完整消息是否到了
if (accumulator.size() < 4 + messageLength) return null;
// 消耗掉 4 字节长度头
accumulator.skip(4);
// 读取消息内容
return accumulator.read(messageLength);
}
}踩坑实录二:写操作注册时机错误,导致 CPU 100%
在 NIO 里,如果你一直让 OP_WRITE 保持注册状态,而 channel 实际上一直可写(大多数时候都是可写的),Selector 会一直触发 isWritable(),CPU 占用 100%。
正确做法是:只有在真的有数据要写的时候才注册 OP_WRITE,写完之后立刻取消注册。
这个坑我在手写 NIO 版本里见过很多次,甚至在一些开源代码里也见过。Netty 帮你把这个处理好了——你写数据调 ctx.writeAndFlush(),Netty 内部自动管理 OP_WRITE 的注册和取消。
版本三:Netty——框架帮你处理了所有脏活
package com.example.network;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
/**
* Netty 服务器——同样的功能,代码更清晰,功能更完善
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// Boss 线程组:只负责接受连接
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// Worker 线程组:负责处理 IO
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 自动处理粘包/半包!
// LengthFieldBasedFrameDecoder: 读取长度前缀,自动拼凑完整帧
pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
// LengthFieldPrepender: 发送时自动加上长度前缀
pipeline.addLast(new LengthFieldPrepender(4));
// 业务处理器
pipeline.addLast(new EchoHandler());
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
System.out.println("Netty 服务器启动");
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
static class EchoHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 此时 msg 已经是一个完整的帧(ByteBuf),粘包/半包已经被 Decoder 处理好了
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
System.out.println("收到: " + new String(bytes));
// 直接写回去,Netty 帮我们加上长度前缀
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(("Echo: " + new String(bytes)).getBytes()));
buf.release(); // 释放引用计数
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}比较一下 Netty 版本和 NIO 手写版本:
| 功能 | NIO 手写 | Netty |
|---|---|---|
| 粘包/半包 | 手动实现,容易出错 | 内置 Decoder/Encoder |
| 线程模型 | 自己管理 | Reactor 模型内置 |
| 连接管理 | 自己写 | ChannelPipeline |
| 优雅关闭 | 需要大量代码 | shutdownGracefully() |
| 内存管理 | 手动管理 DirectBuffer | 池化 ByteBuf,自动引用计数 |
| 心跳保活 | 自己实现 | IdleStateHandler 内置 |
踩坑实录三:Netty 的 ByteBuf 引用计数不释放,内存泄漏
从 BIO 切到 Netty 之后,有一个新问题:Netty 的 ByteBuf 用引用计数管理内存,不释放就泄漏。
// 容易忘记 release 的场景
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
// 做了一些处理...
// 如果走了某个分支,忘记 release,内存泄漏
if (someCondition) {
ctx.writeAndFlush(response);
return; // BUG:没有 buf.release()!
}
buf.release();
}正确做法是继承 SimpleChannelInboundHandler 而不是 ChannelInboundHandlerAdapter,前者会在 channelRead 方法结束后自动调用 release():
// 更安全的写法
static class SafeEchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
// 不需要手动 release,SimpleChannelInboundHandler 会自动处理
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
ctx.writeAndFlush(/* response */);
}
}这三个版本的演进说明了 Netty 的价值:它把网络编程中那些容易出错的细节(粘包/半包、线程模型、内存管理)封装成了可靠的框架组件,让你专注于业务逻辑。
