Python 网络编程实战——socket、asyncio TCP Server、WebSocket 服务
适读人群:想深入理解网络编程原理的 Python 开发者 | 阅读时长:约18分钟 | 核心价值:从 socket 底层到 WebSocket 服务的完整网络编程路径
那个凌晨三点的紧急求援
去年有个做物联网的朋友老周,他们的设备端通过 TCP 长连接往后端推送传感器数据,每秒大约有500个设备同时在线。有一天凌晨三点,他发消息给我:"张哥,服务宕了,我们客户那边的生产线停了,帮我看看。"
我远程进了他的服务器,发现一个问题:他用的是同步 socket + 多线程模型,每个连接开一个线程,500个设备就是500个线程。Linux 下每个线程默认预留约8MB栈空间,光线程栈就预留了约4GB虚拟内存(实际驻留内存取决于栈的真实使用量),再加上业务逻辑本身的内存占用,服务器直接 OOM。
那次紧急修复花了将近3个小时,把同步多线程改成了 asyncio,1000个并发连接的内存消耗降到了不到200MB。从那之后,老周专门抽了一周认真学了 asyncio 网络编程。
今天这篇,我就从 socket 底层讲起,一路到 asyncio TCP Server,再到 WebSocket 服务,把 Python 网络编程的完整图谱讲清楚。
一、socket 基础——理解底层才能用好上层
import socket
import threading
# 最简单的 TCP Server(同步,仅用于学习原理)
def simple_tcp_server(host: str = "127.0.0.1", port: int = 8888):
    """Run a blocking, thread-per-connection echo server (for learning only).

    Every accepted connection is served by its own daemon thread, which is
    exactly the scaling bottleneck that the asyncio version replaces.
    This function never returns; it loops on ``accept()`` forever.

    Args:
        host: Address to bind.
        port: Port to bind.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts without waiting for old sockets in TIME_WAIT.
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(5)
    print(f"服务器启动: {host}:{port}")

    def handle_client(conn: socket.socket, addr: tuple):
        """Echo everything received on *conn* back, prefixed with ``Echo: ``."""
        print(f"新连接: {addr}")
        with conn:
            try:
                while True:
                    data = conn.recv(1024)
                    if not data:
                        break  # peer closed its side of the connection
                    # recv() can cut a multi-byte UTF-8 character in half, so a
                    # strict decode may raise; decode leniently for display and
                    # echo the raw bytes instead of round-tripping through str.
                    print(f"收到数据: {data.decode('utf-8', errors='replace')}")
                    conn.sendall(b"Echo: " + data)
            except ConnectionError:
                # Abrupt disconnect (reset / broken pipe): treat it like EOF
                # instead of letting a traceback escape the worker thread.
                pass
        print(f"连接断开: {addr}")

    with server:
        while True:
            conn, addr = server.accept()
            # One thread per connection -- this is the performance bottleneck.
            worker = threading.Thread(
                target=handle_client, args=(conn, addr), daemon=True
            )
            worker.start()
# 最简单的 TCP Client
def simple_tcp_client(host: str = "127.0.0.1", port: int = 8888):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
s.sendall(b"Hello, Server!")
data = s.recv(1024)
print(f"收到回复: {data.decode()}")理解了同步 socket 的局限性(每连接一线程),才能理解为什么 asyncio 是正确答案。
二、asyncio TCP Server——高并发的正确姿势
import asyncio
import json
import time
import logging
from typing import Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ClientSession:
    """State and I/O helpers for one connected TCP client."""

    def __init__(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        client_id: str,
    ):
        """
        Args:
            reader: Stream the client's bytes are read from.
            writer: Stream that replies are written to.
            client_id: Server-assigned identifier for this connection.
        """
        self.reader = reader
        self.writer = writer
        self.client_id = client_id
        # Take one timestamp so both fields start from the same instant.
        now = time.time()
        self.connected_at = now
        self.last_heartbeat = now
        # Free-form data the client supplies (e.g. on "register").
        self.metadata: dict = {}

    @property
    def peer_addr(self) -> str:
        """Remote address as ``"host:port"``, or ``"unknown"`` if unavailable."""
        addr = self.writer.get_extra_info("peername")
        return f"{addr[0]}:{addr[1]}" if addr else "unknown"

    async def send(self, data: dict) -> bool:
        """Send *data* as one newline-terminated UTF-8 JSON line.

        Returns:
            True once the bytes were written and drained; False if the
            connection was already broken.

        Raises:
            TypeError: if *data* is not JSON-serializable.
        """
        message = json.dumps(data, ensure_ascii=False) + "\n"
        try:
            self.writer.write(message.encode("utf-8"))
            await self.writer.drain()
        except ConnectionError:
            # ConnectionError covers reset, broken pipe and aborted
            # connections (the latter is common on Windows).
            return False
        return True

    async def close(self):
        """Close the connection. This is best-effort and never raises."""
        try:
            self.writer.close()
            await self.writer.wait_closed()
        except Exception:
            # Deliberately best-effort: the peer may already be gone, and
            # cleanup must not mask the error that led to closing.
            pass
class AsyncTCPServer:
    """
    High-concurrency asyncio TCP server.

    Protocol: every message is one UTF-8 JSON object on a single line,
    terminated by a newline character.
    """

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 8888,
        idle_timeout: float = 60.0,
        max_connections: Optional[int] = None,
    ):
        """
        Args:
            host: Address to bind.
            port: Port to bind.
            idle_timeout: Seconds to wait for the next line before the
                connection is dropped.
            max_connections: Maximum number of concurrently registered
                sessions; ``None`` means unlimited. Connections beyond the
                limit receive an error message and are closed.
        """
        self.host = host
        self.port = port
        self.idle_timeout = idle_timeout
        self.max_connections = max_connections
        self._sessions: dict[str, ClientSession] = {}
        self._session_counter = 0
        self._server: Optional[asyncio.Server] = None

    async def _handle_message(self, session: "ClientSession", message: dict):
        """Dispatch one decoded JSON object by its ``type`` field.

        Subclasses may override this. Handled types are ``heartbeat``,
        ``register`` and ``data``; any other type is logged and ignored.
        """
        msg_type = message.get("type", "unknown")
        if msg_type == "heartbeat":
            session.last_heartbeat = time.time()
            await session.send({"type": "heartbeat_ack", "ts": time.time()})
        elif msg_type == "register":
            data = message.get("data")
            # Only merge a JSON object. A null, string or number payload would
            # make dict.update raise and tear down the whole connection.
            if isinstance(data, dict):
                session.metadata.update(data)
            logger.info(
                "客户端注册: %s 设备ID=%s",
                session.client_id,
                session.metadata.get("device_id"),
            )
            await session.send({"type": "register_ack", "client_id": session.client_id})
        elif msg_type == "data":
            payload = message.get("payload", {})
            logger.info("数据上报 [%s]: %s", session.client_id, payload)
            # Persist to a database / forward to a message queue here.
            await session.send({"type": "data_ack", "received": True})
        else:
            logger.warning("未知消息类型: %s", msg_type)

    async def _client_handler(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ):
        """Per-connection coroutine: register, read lines, dispatch, clean up."""
        self._session_counter += 1
        client_id = f"client-{self._session_counter}"
        session = ClientSession(reader, writer, client_id)

        if (
            self.max_connections is not None
            and len(self._sessions) >= self.max_connections
        ):
            logger.warning("连接数已达上限,拒绝: %s", session.peer_addr)
            await session.send({"type": "error", "msg": "Server busy"})
            await session.close()
            return

        self._sessions[client_id] = session
        logger.info("新连接: %s -> %s", session.peer_addr, client_id)
        # Everything after registration sits inside try/finally, so the
        # session is always removed -- even if the welcome send fails or the
        # task is cancelled.
        try:
            await session.send({"type": "welcome", "client_id": client_id})
            while True:
                try:
                    line = await asyncio.wait_for(
                        reader.readline(),
                        timeout=self.idle_timeout,
                    )
                except asyncio.TimeoutError:
                    logger.warning("连接超时,断开: %s", client_id)
                    break
                if not line:
                    break  # EOF: the client closed the connection

                # Keep the try body to parsing only, so a JSONDecodeError
                # raised by a handler is not misreported as bad client input.
                try:
                    message = json.loads(line.decode("utf-8").strip())
                except (UnicodeDecodeError, json.JSONDecodeError) as e:
                    logger.warning("无效 JSON [%s]: %s", client_id, e)
                    await session.send({"type": "error", "msg": "Invalid JSON"})
                    continue
                if not isinstance(message, dict):
                    # Valid JSON but not an object (e.g. a list or number).
                    await session.send(
                        {"type": "error", "msg": "Message must be a JSON object"}
                    )
                    continue
                await self._handle_message(session, message)
        except ConnectionError:
            logger.info("客户端主动断开: %s", client_id)
        except Exception as e:
            logger.error("处理连接出错 [%s]: %s", client_id, e, exc_info=True)
        finally:
            self._sessions.pop(client_id, None)
            await session.close()
            logger.info("连接已清理: %s,当前在线: %d", client_id, len(self._sessions))

    async def broadcast(self, message: dict, exclude: Optional[str] = None):
        """Send *message* to every online session except the one named *exclude*.

        Per-session failures are collected by ``gather`` and ignored.
        """
        # Snapshot the sessions: they may change while sends are awaited.
        tasks = [
            session.send(message)
            for cid, session in list(self._sessions.items())
            if cid != exclude
        ]
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    async def start(self):
        """Bind, log the listening address and serve until cancelled."""
        self._server = await asyncio.start_server(
            self._client_handler,
            self.host,
            self.port,
        )
        addr = self._server.sockets[0].getsockname()
        logger.info("TCP 服务器启动: %s:%s", addr[0], addr[1])
        async with self._server:
            await self._server.serve_forever()

    def get_stats(self) -> dict:
        """Return the current online count and the total number of connections accepted."""
        return {
            "online_count": len(self._sessions),
            "total_accepted": self._session_counter,
        }
# 启动示例
async def main():
    """Run an AsyncTCPServer on 0.0.0.0:8888 until cancelled."""
    await AsyncTCPServer(host="0.0.0.0", port=8888).start()
# asyncio.run(main())

三、WebSocket 服务——实时双向通信
pip install websockets

import asyncio
import json
import logging
from typing import Optional, Set
import websockets
from websockets.server import WebSocketServerProtocol
logger = logging.getLogger(__name__)
class WebSocketRoom:
    """Simple WebSocket chat room / push hub that fans every message out to all peers."""

    def __init__(self):
        # Connections currently in the room.
        self._clients: Set[WebSocketServerProtocol] = set()

    async def register(self, ws: WebSocketServerProtocol):
        """Add *ws* to the room and announce the new online count to everyone, including *ws*."""
        self._clients.add(ws)
        logger.info("新连接: %s, 在线: %d", ws.remote_address, len(self._clients))
        await self._broadcast(
            {"type": "system", "msg": f"有人加入,当前在线 {len(self._clients)} 人"}
        )

    async def unregister(self, ws: WebSocketServerProtocol):
        """Remove *ws* if present and announce the new online count to the rest."""
        self._clients.discard(ws)
        logger.info("断开连接: %s, 在线: %d", ws.remote_address, len(self._clients))
        await self._broadcast(
            {"type": "system", "msg": f"有人离开,当前在线 {len(self._clients)} 人"}
        )

    async def _broadcast(
        self,
        message: dict,
        exclude: Optional[WebSocketServerProtocol] = None,
    ):
        """Send *message* as JSON to every client except *exclude*.

        Per-client send failures are collected by ``gather`` and ignored, so
        one dead peer cannot break delivery to the others.
        """
        if not self._clients:
            return
        data = json.dumps(message, ensure_ascii=False)
        # No `ws.open` filter: that attribute only exists on the legacy
        # protocol class. The newer asyncio connection class that recent
        # `websockets` releases pass to handlers does not have it. Sending to
        # an already-closed peer raises ConnectionClosed, which
        # return_exceptions=True absorbs, so closed peers are skipped anyway.
        sends = [ws.send(data) for ws in self._clients if ws is not exclude]
        if sends:
            await asyncio.gather(*sends, return_exceptions=True)

    async def handle_client(self, ws: WebSocketServerProtocol):
        """Serve one connection until it closes: relay chat, answer ping.

        Registration happens inside the ``try`` so that ``unregister`` in the
        ``finally`` always runs, even if the join broadcast fails or the
        task is cancelled.
        """
        import time  # used for the chat message timestamp

        try:
            await self.register(ws)
            async for raw_message in ws:
                try:
                    data = json.loads(raw_message)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # Binary frames arrive as bytes; json.loads decodes them
                    # and raises UnicodeDecodeError on invalid UTF-8.
                    await ws.send(json.dumps({"type": "error", "msg": "Invalid JSON"}))
                    continue
                if not isinstance(data, dict):
                    # Valid JSON but not an object: .get() below would fail.
                    await ws.send(
                        json.dumps({"type": "error", "msg": "Message must be a JSON object"})
                    )
                    continue
                msg_type = data.get("type", "chat")
                if msg_type == "chat":
                    await self._broadcast(
                        {
                            "type": "chat",
                            "from": data.get("nickname", "匿名"),
                            "content": data.get("content", ""),
                            "ts": time.time(),
                        },
                        exclude=ws,
                    )
                elif msg_type == "ping":
                    await ws.send(json.dumps({"type": "pong"}))
        except websockets.exceptions.ConnectionClosed:
            # Closed with an error, or closed while we were sending; a normal
            # close simply ends the async-for loop.
            pass
        finally:
            await self.unregister(ws)
room = WebSocketRoom()  # one shared room used by every connection


async def ws_server_main():
    """Serve ``room`` on ws://0.0.0.0:8765 until the process is stopped."""
    serve_kwargs = dict(
        ping_interval=30,      # send a keepalive ping every 30 s
        ping_timeout=10,       # close the connection if no pong arrives within 10 s
        max_size=1024 * 1024,  # reject incoming messages larger than 1 MiB
    )
    async with websockets.serve(room.handle_client, "0.0.0.0", 8765, **serve_kwargs):
        logger.info("WebSocket 服务启动: ws://0.0.0.0:8765")
        # Park forever on a future that nobody ever resolves.
        await asyncio.get_running_loop().create_future()
# asyncio.run(ws_server_main())

四、踩坑实录
踩坑实录1:asyncio 中调用了阻塞 IO,整个事件循环卡死
现象:服务运行正常,但偶发地所有客户端同时无响应,持续约1-2秒。
原因:业务代码里直接调用了同步数据库操作,阻塞了事件循环。
解法:在协程里用 asyncio.get_running_loop().run_in_executor() 把阻塞操作放到线程池中执行。
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=10)
async def async_db_query(sql: str) -> list:
"""把同步数据库查询变为异步"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, sync_db_query, sql)踩坑实录2:大量并发连接下 file descriptor 耗尽
现象:运行一段时间后,新连接无法建立,报 [Errno 24] Too many open files。
原因:很多 Linux 发行版默认的 ulimit -n(单进程可打开的文件描述符数)是1024,而每个 socket 连接都要占用一个 fd,超过上限后就无法再创建新的 socket。
解法:修改系统限制 ulimit -n 65536,并在代码里限制最大连接数。
踩坑实录3:客户端断开后,服务端 session 没清理,内存泄漏
现象:服务跑了几天后内存持续增长,即使客户端数量稳定。
原因:异常断开时没有走到清理逻辑,session 一直保留在字典里。
解法:用 try/finally 确保 session 清理一定执行(见上面代码中的写法)。
五、选型建议
| 需求 | 推荐方案 |
|---|---|
| 简单 TCP 通信(学习/脚本) | socket 模块 |
| 高并发 TCP 服务 | asyncio.start_server |
| 实时双向通信 | websockets 库 |
| 高性能生产级 WebSocket | aiohttp.web 或 fastapi + starlette |
| gRPC 远程调用 | grpcio |
老周那次事故之后,他把我的 asyncio 代码加了监控、连接限流、优雅重启,稳定运行了快两年,再没有出过类似的问题。
