当前位置: 首页 > news >正文

【ES实战】ES客户端线程量分析

文章目录

  • ES客户端线程量分析
    • Transport Client的主要线程情况
      • ES工作线程池
      • Netty网络通信线程
    • Rest Client主要线程情况

ES客户端线程量分析

Transport Client的主要线程情况

Version 5.6.1

ES工作线程池

每个Transport Client都会创建一系列的ES线程池,来处理任务。管理平台为了兼容安全特性,使用PreBuiltXPackTransportClient

在这里插入图片描述

PreBuiltXPackTransportClient到TransportClient的继承关系

继承
继承
TransportClient
PreBuiltTransportClient
PreBuiltXPackTransportClient

PreBuiltXPackTransportClient进行实例化时,最终会调用下方TransportClient的构造方法

    /*** Creates a new TransportClient with the given settings, defaults and plugins.* @param settings the client settings* @param defaultSettings default settings that are merged after the plugins have added it's additional settings.* @param plugins the client plugins*/protected TransportClient(Settings settings, Settings defaultSettings, Collection<Class<? extends Plugin>> plugins,HostFailureListener hostFailureListener) {this(buildTemplate(settings, defaultSettings, plugins, hostFailureListener));}

buildTemplate方法内会创建ThreadPool实例

final ThreadPool threadPool = new ThreadPool(settings);

初始化ThreadPool对象时会创建线程池

    public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {super(settings);assert Node.NODE_NAME_SETTING.exists(settings);final Map<String, ExecutorBuilder> builders = new HashMap<>();// 根据服务器CPU数量,确认可用处理器数量,最大不超过32final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);// 处理器数量的一半,最多5个,最少1个final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);// 处理器数量的一半,最多10个,最少1个final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);// 处理器数量的4倍,最多512个,最少128个final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete opsbuilders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded// the assumption here is that the listeners should be very lightweight on the listeners sidebuilders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));for (final ExecutorBuilder<?> builder : customBuilders) {if (builders.containsKey(builder.name())) {throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");}builders.put(builder.name(), builder);}this.builders = Collections.unmodifiableMap(builders);threadContext = new ThreadContext(settings);final Map<String, ExecutorHolder> executors = new HashMap<>();for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);if (executors.containsKey(executorHolder.info.getName())) {throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");}logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));executors.put(entry.getKey(), executorHolder);}// DIRECT_EXECUTOR是个AbstractExecutorService实例executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));this.executors = unmodifiableMap(executors);// ScheduledThreadPoolExecutor 线程固定1个this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);this.scheduler.setRemoveOnCancelPolicy(true);TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);// CachedTimeThread线程对象1个this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());this.cachedTimeThread.start();}

线程池类型

  • fixed:固定数量的线程池
  • scaling:线程数量介于core与max参数之间变化的线程池
  • direct:直接执行线程池,直接执行Runnable

availableProcessors的取值:

 Math.min(32, Runtime.getRuntime().availableProcessors())
线程池名线程池类型core(size)maxJVM线程栈中名称前缀
genericscaling44*availableProcessors
[128,512]
elasticsearch[_client_][generic]
listenerfixedavailableProcessorsavailableProcessorselasticsearch[_client_][listener]
getfixedavailableProcessorsavailableProcessorselasticsearch[_client_][get]
indexfixedavailableProcessorsavailableProcessorselasticsearch[_client_][index]
bulkfixedavailableProcessorsavailableProcessorselasticsearch[_client_][bulk]
searchfixed((availableProcessors * 3) / 2) + 1((availableProcessors * 3) / 2) + 1elasticsearch[_client_][search]
managementscaling15elasticsearch[_client_][management]
flushscaling1availableProcessors/2
[1,5]
elasticsearch[_client_][flush]
refreshscaling1availableProcessors/2
[1,10]
elasticsearch[_client_][refresh]
warmerscaling1availableProcessors/2
[1,5]
elasticsearch[_client_][warmer]
snapshotscaling1availableProcessors/2
[1,5]
elasticsearch[_client_][snapshot]
force_mergefixed11elasticsearch[_client_][force_merge]
fetch_shard_startedscaling12 * availableProcessorselasticsearch[_client_][fetch_shard_started]
fetch_shard_storescaling12 * availableProcessorselasticsearch[_client_][fetch_shard_store]
samedirect--elasticsearch[_client_][same]

其他线程

  • CachedTimeThread:个数固定1个,JVM线程栈中的名称前缀elasticsearch[_client_][[timer]]
  • scheduler:使用ScheduledThreadPoolExecutor, 线程固定1个,JVM线程栈中的名称前缀elasticsearch[_client_][scheduler]

Netty网络通信线程

在JVM堆栈中发现大量名称以elasticsearch[_client_][transport_client_boss]开头的线程,示例:

"elasticsearch[_client_][transport_client_boss][T#1]" #387106 daemon prio=5 os_prio=0 tid=0x00007f79a422e000 nid=0x3bc55d runnable [0x00007f78c98d7000]
java.lang.Thread.State: RUNNABLEat sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)- locked <0x0000000622f76b90> (a sun.nio.ch.Util$3)- locked <0x0000000622f76b78> (a java.util.Collections$UnmodifiableSet)- locked <0x0000000613be11a0> (a sun.nio.ch.EPollSelectorImpl)at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:752)at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:408)at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)at java.lang.Thread.run(Thread.java:750)

buildTemplate方法内会创建NetworkModule实例

  NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);

NetworkModule实例中会创建Netty的客户端进行网络通信

    /*** Creates a network module that custom networking classes can be plugged into.* @param settings The settings for the node* @param transportClient True if only transport classes should be allowed to be registered, false otherwise.*/public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,BigArrays bigArrays,CircuitBreakerService circuitBreakerService,NamedWriteableRegistry namedWriteableRegistry,NamedXContentRegistry xContentRegistry,NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {this.settings = settings;this.transportClient = transportClient;registerTransport(LOCAL_TRANSPORT, () -> new LocalTransport(settings, threadPool, namedWriteableRegistry, circuitBreakerService));for (NetworkPlugin plugin : plugins) {if (transportClient == false && HTTP_ENABLED.get(settings)) {Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {registerHttpTransport(entry.getKey(), entry.getValue());}}// getTransports会调用Netty4Plugin创建Netty4TransportMap<String, Supplier<Transport>> httpTransportFactory = plugin.getTransports(settings, threadPool, bigArrays,circuitBreakerService, namedWriteableRegistry, networkService);for (Map.Entry<String, Supplier<Transport>> entry : httpTransportFactory.entrySet()) {registerTransport(entry.getKey(), entry.getValue());}List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(threadPool.getThreadContext());for (TransportInterceptor interceptor : transportInterceptors) {registerTransportInterceptor(interceptor);}}}

Netty4Plugin创建Netty4Transport

@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,CircuitBreakerService circuitBreakerService,NamedWriteableRegistry namedWriteableRegistry,NetworkService networkService) {return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, threadPool, networkService, bigArrays,namedWriteableRegistry, circuitBreakerService));
}

Netty4Transport实例化的时候会设置workCount,默认是2*availableProcessors

public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));this.workerCount = WORKER_COUNT.get(settings);.........}

启动Netty客户端启动时,会执行createBootstrap,来创建固定数量的线程作为Netty客户端的NIO工作线程进行工作。

 private Bootstrap createBootstrap() {final Bootstrap bootstrap = new Bootstrap();// 默认并非阻塞客户端if (TCP_BLOCKING_CLIENT.get(settings)) {bootstrap.group(new OioEventLoopGroup(1, daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)));bootstrap.channel(OioSocketChannel.class);} else {// 线程标识transport_client_boss,线程数量workCountbootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)));bootstrap.channel(NioSocketChannel.class);}.........}

Rest Client主要线程情况

Version 6.7.2

在JVM堆栈中发现大量名称以I/O dispatcher开头的线程,示例:

"I/O dispatcher 85700" #387097 prio=5 os_prio=0 tid=0x00007f7970080000 nid=0x3bc554 runnable [0x00007f78ca7e6000]
java.lang.Thread.State: RUNNABLEat sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)- locked <0x0000000623151970> (a sun.nio.ch.Util$3)- locked <0x0000000623151958> (a java.util.Collections$UnmodifiableSet)- locked <0x0000000613bec330> (a sun.nio.ch.EPollSelectorImpl)at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:255)at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)at java.lang.Thread.run(Thread.java:750)

在这里插入图片描述

在类RestClientBuilder中,实现基于CloseableHttpAsyncClient实例构建RestClient实例对象。

    /*** Creates a new {@link RestClient} based on the provided configuration.*/public RestClient build() {if (failureListener == null) {failureListener = new RestClient.FailureListener();}// CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {@Overridepublic CloseableHttpAsyncClient run() {return createHttpClient();}});RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,pathPrefix, failureListener, nodeSelector, strictDeprecationMode);// httpClient.start();return restClient;}

通过下面的简化类图,说明Rest客户端使用的 IO线程数量在IOReactorConfig类中设置,其内部类BuilderBuilder()方法中对赋值this.ioThreadCount = IOReactorConfig.AVAIL_PROCS;,默认数量也是服务器的CPU内核数。

«abstract»
CloseableHttpAsyncClient
Builder
- int ioThreadCount
Builder()
IOReactorConfig
-int AVAIL_PROCS = Runtime.getRuntime()
+IOReactorConfig DEFAULT =(new Builder()
HttpAsyncClientBuilder
RestClient
RestClientBuilder

核心管理使用的对象们在HttpAsyncClientBuilder类中进行创建。

public CloseableHttpAsyncClient build() {.........// 创建DefaultConnectingIOReactor对象,会去执行AbstractMultiworkerIOReactor的构造方法ConnectingIOReactor ioreactor = IOReactorUtils.create(this.defaultIOReactorConfig != null ? this.defaultIOReactorConfig : IOReactorConfig.DEFAULT, this.threadFactory);//  poolingmgr中有ConnectingIOReactor处理IOEventDispatch        PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(ioreactor, RegistryBuilder.create().register("http", NoopIOSessionStrategy.INSTANCE).register("https", reuseStrategy).build());.........return new InternalHttpAsyncClient((NHttpClientConnectionManager)connManager, (ConnectionReuseStrategy)reuseStrategy, (ConnectionKeepAliveStrategy)keepAliveStrategy, threadFactory, (NHttpClientEventHandler)eventHandler, exec, (Lookup)cookieSpecRegistry, (Lookup)authSchemeRegistry, (CookieStore)defaultCookieStore, (CredentialsProvider)defaultCredentialsProvider, defaultRequestConfig);
}
继承
实现
«Abstract»
AbstractMultiworkerIOReactor
«interface»
ConnectingIOReactor
DefaultConnectingIOReactor
PoolingNHttpClientConnectionManager
HttpAsyncClientBuilder
    public AbstractMultiworkerIOReactor(final IOReactorConfig config,final ThreadFactory threadFactory) throws IOReactorException {super();this.config = config != null ? config : IOReactorConfig.DEFAULT;this.params = new BasicHttpParams();try {this.selector = Selector.open();} catch (final IOException ex) {throw new IOReactorException("Failure opening selector", ex);}this.selectTimeout = this.config.getSelectInterval();this.interestOpsQueueing = this.config.isInterestOpQueued();this.statusLock = new Object();if (threadFactory != null) {this.threadFactory = threadFactory;} else {this.threadFactory = new DefaultThreadFactory();}this.auditLog = new ArrayList<ExceptionEvent>();// this.workerCount的值就是this.ioThreadCount = IOReactorConfig.AVAIL_PROCS;this.workerCount = this.config.getIoThreadCount();this.dispatchers = new BaseIOReactor[workerCount];this.workers = new Worker[workerCount];this.threads = new Thread[workerCount];this.status = IOReactorStatus.INACTIVE;}

具体创建线程创建执行在AbstractMultiworkerIOReactorexecute(final IOEventDispatch eventDispatch)方法中。

   public void execute(final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {Args.notNull(eventDispatch, "Event dispatcher");synchronized (this.statusLock) {if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {this.status = IOReactorStatus.SHUT_DOWN;this.statusLock.notifyAll();return;}Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,"Illegal state %s", this.status);this.status = IOReactorStatus.ACTIVE;// Start I/O dispatchersfor (int i = 0; i < this.dispatchers.length; i++) {final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);dispatcher.setExceptionHandler(exceptionHandler);this.dispatchers[i] = dispatcher;}for (int i = 0; i < this.workerCount; i++) {final BaseIOReactor dispatcher = this.dispatchers[i];this.workers[i] = new Worker(dispatcher, eventDispatch);this.threads[i] = this.threadFactory.newThread(this.workers[i]);}}try {for (int i = 0; i < this.workerCount; i++) {if (this.status != IOReactorStatus.ACTIVE) {return;}this.threads[i].start();}for (;;) {final int readyCount;try {readyCount = this.selector.select(this.selectTimeout);} catch (final InterruptedIOException ex) {throw ex;} catch (final IOException ex) {throw new IOReactorException("Unexpected selector failure", ex);}if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {processEvents(readyCount);}// Verify I/O dispatchersfor (int i = 0; i < this.workerCount; i++) {final Worker worker = this.workers[i];final Exception ex = worker.getException();if (ex != null) {throw new IOReactorException("I/O dispatch worker terminated abnormally", ex);}}if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {break;}}} catch (final ClosedSelectorException ex) {addExceptionEvent(ex);} catch (final IOReactorException ex) {if (ex.getCause() != null) {addExceptionEvent(ex.getCause());}throw ex;} finally {doShutdown();synchronized (this.statusLock) {this.status = IOReactorStatus.SHUT_DOWN;this.statusLock.notifyAll();}}}
http://www.dtcms.com/a/270453.html

相关文章:

  • java-网络编程
  • Java中数组与链表的性能对比:查询与增删效率分析
  • RabbitMQ第二章(RocketMQ的五大工作模式)
  • 【Linux服务器】-安装ftp与sftp服务
  • 数据结构:数组:合并数组(Merging Arrays)
  • 20 道 Node.js 高频面试题
  • Codeforces Round 868 (Div. 2) D. Unique Palindromes(1900,构造)
  • 深入企业内部的MCP知识(四):FastMCP装饰器与类方法:正确结合面向对象与MCP组件的实践指南
  • 4.权重衰减(weight decay)
  • MySQL-索引
  • SQL135 每个6/7级用户活跃情况
  • ${project.basedir}延申出来的Maven内置的一些常用属性
  • Python入门Day5
  • 嵌入式面试八股文100题(二)
  • 分库分表之实战-sharding-JDBC水平分库+水平分表配置实战
  • 【深度学习入门 鱼书学习笔记(1)感知机】
  • 7月8日学习笔记——统计决策方法
  • 基于springboot的物流配货系统
  • Nuxt.js 静态生成中的跨域问题解决方案
  • C++学习笔记之数组、指针和字符串
  • 【PyTorch】PyTorch中torch.nn模块的激活函数
  • 项目Win系统下可正常获取Header字段,但是到了linux、docker部署后无法获取
  • python基础day08
  • linux wsl2 docker 镜像复用快速方法
  • 【读代码】GLM-4.1V-Thinking:开源多模态推理模型的创新实践
  • 基于模板设计模式开发优惠券推送功能以及对过期优惠卷进行定时清理
  • C++ 遍历可变参数的几种方法
  • 数据库表设计:图片存储与自定义数据类型的实战指南
  • C语言宏替换比较练习
  • 暑假算法日记第四天