当前位置: 首页 > news >正文

从xxl-job源码中学习Netty的使用

1. 启动与Spring实例化

com.xxl.job.core.executor.impl.XxlJobSpringExecutor.java类 

继承SmartInitializingSingleton 类,在afterSingletonsInstantiated 实例化后方法中

调用initJobHandlerMethodRepository 把所有的xxljob任务管理起来;

  private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }
        // init job handler from method
        //扫描所有的bean,并获取XxlJob的注解,registJobHandler加入到 一个 map  中
        // 见jobHandlerRepository
        String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = applicationContext.getBean(beanDefinitionName);

            Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
            try {
                annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        new MethodIntrospector.MetadataLookup<XxlJob>() {
                            @Override
                            public XxlJob inspect(Method method) {
                                return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                            }
                        });
            } catch (Throwable ex) {
                logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
            }
            if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                continue;
            }

            for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                Method executeMethod = methodXxlJobEntry.getKey();
                XxlJob xxlJob = methodXxlJobEntry.getValue();
                if (xxlJob == null) {
                    continue;
                }

                String name = xxlJob.value();
                if (name.trim().length() == 0) {
                    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                }
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                }

                // execute method
                /*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                    throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }
                if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                    throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }*/

                executeMethod.setAccessible(true);

                // init and destory
                Method initMethod = null;
                Method destroyMethod = null;

                if (xxlJob.init().trim().length() > 0) {
                    try {
                        initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                        initMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }
                if (xxlJob.destroy().trim().length() > 0) {
                    try {
                        destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                        destroyMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }

                // registry jobhandler
                //扫描所有的jobhandler
                registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
            }
        }

    }

2. 调用父类的start() 方法 启动netty服务端

com.xxl.job.core.executor.XxlJobExecutor#initEmbedServer

核心代码见com.xxl.job.core.server.EmbedServer#start

 // start server 服务端
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            //心跳机制
                                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                            // 使用 http 协议
                                            .addLast(new HttpServerCodec())
                                            // 拆包粘包
                                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                            // 业务处理EmbedHttpServerHandler
                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                }
                            })
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    // bind
                    ChannelFuture future = bootstrap.bind(port).sync();

                    logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                    // start registry
                    startRegistry(appname, address);

                    // wait util stop
                    future.channel().closeFuture().sync();

3. 定时任务发起一个任务执行 调用netty server段的接口

此处以手动调用为一个例子; 定时任务&时间轮见下回分析

com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger

@startuml tomcat
JobInfoController -> JobTriggerPoolHelper: trigger()
JobTriggerPoolHelper -> XxlJobTrigger:trigger()
XxlJobTrigger ->  XxlJobTrigger: processTrigger()
XxlJobTrigger -> XxlJobTrigger : runExecutor()
XxlJobTrigger -> ExecutorBizImpl :run()
@enduml

最终调用

 @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }

4. 其他netty 详细内容见下回分析;

相关文章:

  • Talk|新加坡国立大学贾鑫宇:适用于高自由度机器人的运动控制器
  • 案例分享:高科技企业产品管理部门设置和产品经理岗位设置
  • Redis实现消息队列
  • 机械臂 CoppeliaSim Simulink联合仿真
  • 北航数据结构与程序设计查找与排序编程题
  • 显卡nvidia的CUDA和cuDNN的安装
  • 07-appium常用操作
  • Python 包 dhg 中超图的数据格式
  • 仿真模拟--telnet服务两种认证模式(自作)
  • 百度安全X盈科全球数据合规服务中心:推进数据安全及合规智能化创新领域深化合作
  • High-variance latent spaces
  • go sync包(二) 互斥锁(二)
  • Bureau of Contacts延迟高、卡顿、无法联机怎么办?
  • 线程间通信(生产者和消费者案例)
  • Python 获取class_name win32gui
  • 单片机课设-基于单片机的电子时钟设计(仿真+代码+报告)
  • Vim基础操作:常用命令、安装插件、在VS Code中使用Vim及解决Vim编辑键盘错乱
  • 八股文之JVM
  • Apple Phone Memory
  • 【转载】使用 .NET Upgrade Assistant(升级助手)升级 .NET 老旧版本项目
  • https://app.hackthebox.com/machines/Inject
  • Spring —— Spring简单的读取和存储对象 Ⅱ
  • 渗透测试之冰蝎实战
  • Mybatis、TKMybatis对比
  • Microsoft Office 2019(2022年10月批量许可版)图文教程
  • 《谷粒商城基础篇》分布式基础环境搭建
  • 哈希表题目:砖墙
  • Vue 3.0 选项 生命周期钩子
  • 【车载嵌入式开发】AutoSar架构入门介绍篇
  • 【计算机视觉 | 目标检测】DETR风格的目标检测框架解读