@Sharable注解深度解析:实现Netty ChannelHandler多管道共享的关键
一、@Sharable概述
表示可以将带注释的 ChannelHandler 的同一个实例多次添加到一个或多个 ChannelPipelines 中,而不会出现竞争条件。如果未指定此注解,则每次将其添加到管道时都必须创建一个新的处理程序实例,因为它具有成员变量等非共享状态。(这个是Netty的官方给的说明)
简单的理解:
- @Sharable是用来修饰ChannelHandler的
- ChannelHandler单例模式下需要添加多个ChannelPipelines 也就是要拦截多个Channel,就需要使用到@Sharable来修饰ChannelHandler
二、示例验证
在Netty中添加 ChannelHandler 的代码如下(代码来源Netty官网):
-
- public class DiscardServer {
- private int port;
- public DiscardServer(int port) {
- this.port = port;
- }
- public void run() throws Exception {
- EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap(); // (2)
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class) // (3)
- .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new DiscardServerHandler()); //(8)
- }
- })
- .option(ChannelOption.SO_BACKLOG, 128) // (5)
- .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
- ChannelFuture f = b.bind(port).sync(); // (7)
- f.channel().closeFuture().sync();
- } finally {
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- public static void main(String[] args) throws Exception {
- int port = 8080;
- if (args.length > 0) {
- port = Integer.parseInt(args[0]);
- }
- new DiscardServer(port).run();
- }
- }
复制代码
把上面代码稍微做一点修改,将(8)位置代码修改成如下:
-
- //在DiscardServer定义一个变量
- private DiscardServerHandler handler = new DiscardServerHandler();
- //(8位置改成如下)
- ch.pipeline().addLast(handler);
复制代码
当往多个 Channel 的 ChannelPipeline 中添加同一个 ChannelHandler 的时候,就会判断该实例是否增加了 @Sharable 注解。如果没有就会抛出错误:
-
- io.netty.channel.ChannelPipelineException: com.github.mxsm.netty.TimeServerHandler is not a @Sharable handler, so can't be added or removed multiple times.
复制代码
Tips: 上面的错误是代码演示抛出来的,下面会根据代码分析
原因分析:
- 以官网的例子进行运行,添加的不是单例,加不加@Sharable注解并没有什么关系。
- 如果你添加的是单例,并且会被添加到多个Channel的 ChannelPipelines中,就必须加上@Sharable。否则就会报错
Tips: 在 initChannel 方法中ChannelHandler是否单例和Netty没关系,也和@Sharable修饰ChannelHandler是否单例化没有关系。这个是否单例与使用者有关。如果是一上面的 new 的形式。那么 DiscardServerHandler 就不是单例。与有没有加@Sharable没关系。
三、使用示例
-
- import com.lilinchao.netty.config.Config;
- import com.lilinchao.netty.config.Serializer;
- import com.lilinchao.netty.message.Message;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToMessageCodec;
- import lombok.extern.slf4j.Slf4j;
- import java.util.List;
- /**
- * @author lilinchao
- * @date 2022/6/17
- * @description 消息注解
- * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
- **/
- @Slf4j
- @ChannelHandler.Sharable
- public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
- @Override
- protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
- ByteBuf out = ctx.alloc().buffer();
- // 1. 4 字节的魔数
- out.writeBytes(new byte[]{1, 2, 3, 4});
- // 2. 1 字节的版本,
- out.writeByte(1);
- // 3. 1 字节的序列化方式 jdk 0 , json 1
- out.writeByte(Config.getSerializerAlgorithm().ordinal());
- // 4. 1 字节的指令类型
- out.writeByte(msg.getMessageType());
- // 5. 4 个字节
- out.writeInt(msg.getSequenceId());
- // 无意义,对齐填充
- out.writeByte(0xff);
- // 6. 获取内容的字节数组
- byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
- // 7. 长度
- out.writeInt(bytes.length);
- // 8. 写入内容
- out.writeBytes(bytes);
- outList.add(out);
- }
- @Override
- protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
- int magicNum = in.readInt();
- byte version = in.readByte();
- byte serializerAlgorithm = in.readByte(); // 0 或 1
- byte messageType = in.readByte(); // 0,1,2...
- int sequenceId = in.readInt();
- in.readByte();
- int length = in.readInt();
- byte[] bytes = new byte[length];
- in.readBytes(bytes, 0, length);
- // 找到反序列化算法
- Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
- // 确定具体消息类型
- Class<? extends Message> messageClass = Message.getMessageClass(messageType);
- Message message = algorithm.deserialize(messageClass, bytes);
- // log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
- // log.debug("{}", message);
- out.add(message);
- }
- }
复制代码
MessageToMessageCodec类将已经被处理的完整数据再次被处理。传过来的Message如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用@Sharable注解。
总结
- 网上很多说这个@Sharable跟ChannelHandler是单例有关,其实没有什么关系。ChannelHandler是否为单例取决于使用者添加的是否为单例。和开发者的行为有关。但是如果你想使用单例的ChannelHandler添加到ChannelPipeline中那么就需要用@Sharable进行修饰。
- ChannelHandler可以作为一个全局的统计,例如用户连接数量的统计就可以注册一个单例ChannelHandler来实现。
|