当前位置: 首页 > news >正文

netty中Future和ChannelHandler

netty中的Future,继承自 jdk中的Future,, jdk中的Future,很垃圾,只能同步阻塞获取结果,,,

netty中的Future进行了升级,,可以addListener()异步获取结果,,可以isSuccess()判断任务成功还是失败,,

  • jdk的Future
    • get()
    • isDone()
    • cancel() : 取消当前任务
  public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(2);
        Future<Integer> future = service.submit(() -> {
            log.debug("running...");
            Thread.sleep(2000);
            return 2;
        });

        Integer i = future.get();
        log.debug("i = " + i);
    }
  • netty中的Future
    • isSuccess() : 判断任务是否成功
    • sync() : 同步等待,,任务不成功会抛错
    • getNow() : 获取结果,没有就返回null
    • await() : 同步等待,,任务不成功不会报错,,后面通过isSuccess()判断是否成功
    • addListener() : 任务结束回调
  public static void main(String[] args) {
        // netty中的线程池  eventLoop,, eventloop中就一个线程
        NioEventLoopGroup group = new NioEventLoopGroup(2);

        EventLoop eventLoop = group.next();

        Future<String> future = eventLoop.submit(() -> {
            Thread.sleep(2000);
            return "hehe";
        });


        String now = future.getNow();
        System.out.println("now = " + now);
        boolean success = future.isSuccess();
        System.out.println("success = " + success);


        future.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(Future<? super String> future) throws Exception {
                Object now1 = future.getNow();
                System.out.println("now1 = " + now1);
                boolean success = future.isSuccess();
                System.out.println("success = " + success);
            }
        });

    }
  • netty中的Promise
    继承自netty的Future,
    Promise可以设置成功和失败,,不用等到任务结束
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup(2);
        EventLoop eventLoop = group.next();
        // 主动创建promise  ===> 结果的容器,
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        new Thread(()->{
            System.out.println("开始计算");
            try {
                int i = 1/0;
                Thread.sleep(1000);
                promise.setSuccess(1000);
            } catch (InterruptedException e) {
               e.printStackTrace();
                promise.setFailure(e);
            }

//

        }).start();


        Integer i = promise.get();
        System.out.println("i = " + i);
    }
ChannelHandler

netty中handler分为两类:

  • ChannelInboundHandler : 入站,, 读取数据,,,channel按照添加顺序依次执行
  • ChannelOutboundHandler :出站 : 发送数据,,channel 逆序执行

channel.wirte() : 从末尾逆序执行
ctx.wirte() : 是从当前的handler,往前面找ChannelOutboundHandler执行

 public static void main(String[] args) {

        new ServerBootstrap()
                .group(new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();

                        // 添加处理器  ,,, netty会自动添加两个handler,,一个叫head,,一个叫tail,,,
                        // 底层是 双向链表
                        pipeline.addLast("handle01",new ChannelInboundHandlerAdapter(){
                            // 入站的handler,,一般关心的 read

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("msg:{}",msg);


                                ByteBuf byteBuf = (ByteBuf) msg;
                                String s = byteBuf.toString(Charset.defaultCharset());

                                // 调用下一个handle       ctx.fireChannelRead(msg);,,并且将处理完成的结果,传递给下一个handler
                                super.channelRead(ctx, s);
                            }
                        });

                        pipeline.addLast("handle02",new ChannelInboundHandlerAdapter(){
                            // 入站的handler,,一般关心的 read

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("msg222:{}",msg);

                                User user = new User();
                                user.setName(((String) msg));
                                super.channelRead(ctx, user);
                            }
                        });

                        pipeline.addLast("handle03",new ChannelInboundHandlerAdapter(){
                            // 入站的handler,,一般关心的 read

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("msg333:{}",msg);
                                super.channelRead(ctx, msg);

                                ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes()));
                            }
                        });



                        // 出站是,,从后面往前走  ,,只有有写出的时候,才会触发出站方法,,,,
                        pipeline.addLast("handle04",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("msg444:{}",msg);
                                super.write(ctx, msg, promise);
                            }
                        });

                        pipeline.addLast("handle05",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("msg555:{}",msg);
                                super.write(ctx, msg, promise);
                            }
                        });

                    }
                }).bind(new InetSocketAddress(8080));
    }
EmbeddedChannel 模拟channel执行
    public static void main(String[] args) {

        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("msg = " + msg);
                super.channelRead(ctx, msg);
            }
        };

        ChannelOutboundHandlerAdapter h2 = new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext channelHandlerContext, Object o, ChannelPromise channelPromise) throws Exception {
                System.out.println(4444);
            }
        };

        EmbeddedChannel channel = new EmbeddedChannel(h1,h2);
//        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hehe".getBytes()));

        channel.writeOutbound(channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hehe".getBytes())));
    }

相关文章:

  • Best practice-生产环境中加锁的最佳实践
  • Anaconda 部署 DeepSeek
  • Java 大视界 -- Java 大数据在智能政务公共服务资源优化配置中的应用(118)
  • Linux | Vim 鼠标不能右键粘贴、跨系统复制粘贴
  • 深入解析“Elaborate”——从详细阐述到精心制作的多重含义
  • 绝美焦糖暖色调复古风景画面Lr调色教程,手机滤镜PS+Lightroom预设下载!
  • LLM-初识AI
  • 自律linux 第 35 天
  • 【C++】数据结构 双链表的实现(企业存储用户数据的实现)
  • Windows逆向工程入门之MASM 数据寻址
  • GTID模块初始化简介和参数binlog_gtid_simple_recovery
  • C#数据类型及相互转换
  • GitHub获取token
  • 计算光学成像与光学计算概论
  • typedef关键字、using关键字
  • RoboBrain:从抽象到具体的机器人操作统一大脑模型
  • 初阶数据结构习题【11】(3顺序表和链表)——141. 环形链表I
  • vue面试宝典之二
  • Linux14-io多路复用
  • Impacket工具中的横向渗透利器及其使用场景对比详解
  • 前四月国家铁路发送货物12.99亿吨,同比增长3.6%
  • 全总联合六部门印发工作指引,共保劳动者合法权益
  • 《缶翁的世界》首发:看吴昌硕王一亭等湖州籍书画家的影响
  • 台湾关闭最后的核电,岛内担忧“非核家园”缺电、涨电价困局难解
  • 国内规模最大女子赛艇官方赛事在沪启航,中外41支队伍逐浪
  • 河南一女子被医院强制带走治疗,官方通报:当值医生停职