当前位置: 首页 > wzjs >正文

ftp可以发布网站吗一键生成原创文案

ftp可以发布网站吗,一键生成原创文案,如何说课网站建设,收废品做网站怎么做文章目录ES客户端线程量分析Transport Client的主要线程情况ES工作线程池Netty网络通信线程Rest Client主要线程情况ES客户端线程量分析 Transport Client的主要线程情况 Version 5.6.1 ES工作线程池 每个Transport Client都会创建一系列的ES线程池,来处理任务。…

文章目录

  • ES客户端线程量分析
    • Transport Client的主要线程情况
      • ES工作线程池
      • Netty网络通信线程
    • Rest Client主要线程情况

ES客户端线程量分析

Transport Client的主要线程情况

Version 5.6.1

ES工作线程池

每个Transport Client都会创建一系列的ES线程池,来处理任务。管理平台为了兼容安全特性,使用PreBuiltXPackTransportClient

在这里插入图片描述

PreBuiltXPackTransportClient到TransportClient的继承关系

继承
继承
TransportClient
PreBuiltTransportClient
PreBuiltXPackTransportClient

PreBuiltXPackTransportClient进行实例化时,最终会调用下方TransportClient的构造方法

    /*** Creates a new TransportClient with the given settings, defaults and plugins.* @param settings the client settings* @param defaultSettings default settings that are merged after the plugins have added it's additional settings.* @param plugins the client plugins*/protected TransportClient(Settings settings, Settings defaultSettings, Collection<Class<? extends Plugin>> plugins,HostFailureListener hostFailureListener) {this(buildTemplate(settings, defaultSettings, plugins, hostFailureListener));}

buildTemplate方法内会创建ThreadPool实例

final ThreadPool threadPool = new ThreadPool(settings);

初始化ThreadPool对象时会创建线程池

    public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {super(settings);assert Node.NODE_NAME_SETTING.exists(settings);final Map<String, ExecutorBuilder> builders = new HashMap<>();// 根据服务器CPU数量,确认可用处理器数量,最大不超过32final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);// 处理器数量的一半,最多5个,最少1个final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);// 处理器数量的一半,最多10个,最少1个final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);// 处理器数量的4倍,最多512个,最少128个final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete opsbuilders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded// the assumption here is that the listeners should be very lightweight on the listeners sidebuilders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));for (final ExecutorBuilder<?> builder : customBuilders) {if (builders.containsKey(builder.name())) {throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");}builders.put(builder.name(), builder);}this.builders = Collections.unmodifiableMap(builders);threadContext = new ThreadContext(settings);final Map<String, ExecutorHolder> executors = new HashMap<>();for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);if (executors.containsKey(executorHolder.info.getName())) {throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");}logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));executors.put(entry.getKey(), executorHolder);}// DIRECT_EXECUTOR是个AbstractExecutorService实例executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));this.executors = unmodifiableMap(executors);// ScheduledThreadPoolExecutor 线程固定1个this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);this.scheduler.setRemoveOnCancelPolicy(true);TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);// CachedTimeThread线程对象1个this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());this.cachedTimeThread.start();}

线程池类型

  • fixed:固定数量的线程池
  • scaling:线程数量介于core与max参数之间变化的线程池
  • direct:直接执行线程池,直接执行Runnable

availableProcessors的取值:

 Math.min(32, Runtime.getRuntime().availableProcessors())
线程池名线程池类型core(size)maxJVM线程栈中名称前缀
genericscaling44*availableProcessors
[128,512]
elasticsearch[_client_][generic]
listenerfixedavailableProcessorsavailableProcessorselasticsearch[_client_][listener]
getfixedavailableProcessorsavailableProcessorselasticsearch[_client_][get]
indexfixedavailableProcessorsavailableProcessorselasticsearch[_client_][index]
bulkfixedavailableProcessorsavailableProcessorselasticsearch[_client_][bulk]
searchfixed((availableProcessors * 3) / 2) + 1((availableProcessors * 3) / 2) + 1elasticsearch[_client_][search]
managementscaling15elasticsearch[_client_][management]
flushscaling1availableProcessors/2
[1,5]
elasticsearch[_client_][flush]
refreshscaling1availableProcessors/2
[1,10]
elasticsearch[_client_][refresh]
warmerscaling1availableProcessors/2
[1,5]
elasticsearch[_client_][warmer]
snapshotscaling1availableProcessors/2
[1,5]
elasticsearch[_client_][snapshot]
force_mergefixed11elasticsearch[_client_][force_merge]
fetch_shard_startedscaling12 * availableProcessorselasticsearch[_client_][fetch_shard_started]
fetch_shard_storescaling12 * availableProcessorselasticsearch[_client_][fetch_shard_store]
samedirect--elasticsearch[_client_][same]

其他线程

  • CachedTimeThread:个数固定1个,JVM线程栈中的名称前缀elasticsearch[_client_][[timer]]
  • scheduler:使用ScheduledThreadPoolExecutor, 线程固定1个,JVM线程栈中的名称前缀elasticsearch[_client_][scheduler]

Netty网络通信线程

在JVM堆栈中发现大量名称以elasticsearch[_client_][transport_client_boss]开头的线程,示例:

"elasticsearch[_client_][transport_client_boss][T#1]" #387106 daemon prio=5 os_prio=0 tid=0x00007f79a422e000 nid=0x3bc55d runnable [0x00007f78c98d7000]
java.lang.Thread.State: RUNNABLEat sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)- locked <0x0000000622f76b90> (a sun.nio.ch.Util$3)- locked <0x0000000622f76b78> (a java.util.Collections$UnmodifiableSet)- locked <0x0000000613be11a0> (a sun.nio.ch.EPollSelectorImpl)at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:752)at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:408)at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)at java.lang.Thread.run(Thread.java:750)

buildTemplate方法内会创建NetworkModule实例

  NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);

NetworkModule实例中会创建Netty的客户端进行网络通信

    /*** Creates a network module that custom networking classes can be plugged into.* @param settings The settings for the node* @param transportClient True if only transport classes should be allowed to be registered, false otherwise.*/public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,BigArrays bigArrays,CircuitBreakerService circuitBreakerService,NamedWriteableRegistry namedWriteableRegistry,NamedXContentRegistry xContentRegistry,NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {this.settings = settings;this.transportClient = transportClient;registerTransport(LOCAL_TRANSPORT, () -> new LocalTransport(settings, threadPool, namedWriteableRegistry, circuitBreakerService));for (NetworkPlugin plugin : plugins) {if (transportClient == false && HTTP_ENABLED.get(settings)) {Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {registerHttpTransport(entry.getKey(), entry.getValue());}}// getTransports会调用Netty4Plugin创建Netty4TransportMap<String, Supplier<Transport>> httpTransportFactory = plugin.getTransports(settings, threadPool, bigArrays,circuitBreakerService, namedWriteableRegistry, networkService);for (Map.Entry<String, Supplier<Transport>> entry : httpTransportFactory.entrySet()) {registerTransport(entry.getKey(), entry.getValue());}List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(threadPool.getThreadContext());for (TransportInterceptor interceptor : transportInterceptors) {registerTransportInterceptor(interceptor);}}}

Netty4Plugin创建Netty4Transport

@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,CircuitBreakerService circuitBreakerService,NamedWriteableRegistry namedWriteableRegistry,NetworkService networkService) {return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, threadPool, networkService, bigArrays,namedWriteableRegistry, circuitBreakerService));
}

Netty4Transport实例化的时候会设置workCount,默认是2*availableProcessors

public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));this.workerCount = WORKER_COUNT.get(settings);.........}

启动Netty客户端启动时,会执行createBootstrap,来创建固定数量的线程作为Netty客户端的NIO工作线程进行工作。

 private Bootstrap createBootstrap() {final Bootstrap bootstrap = new Bootstrap();// 默认并非阻塞客户端if (TCP_BLOCKING_CLIENT.get(settings)) {bootstrap.group(new OioEventLoopGroup(1, daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)));bootstrap.channel(OioSocketChannel.class);} else {// 线程标识transport_client_boss,线程数量workCountbootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)));bootstrap.channel(NioSocketChannel.class);}.........}

Rest Client主要线程情况

Version 6.7.2

在JVM堆栈中发现大量名称以I/O dispatcher开头的线程,示例:

"I/O dispatcher 85700" #387097 prio=5 os_prio=0 tid=0x00007f7970080000 nid=0x3bc554 runnable [0x00007f78ca7e6000]
java.lang.Thread.State: RUNNABLEat sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)- locked <0x0000000623151970> (a sun.nio.ch.Util$3)- locked <0x0000000623151958> (a java.util.Collections$UnmodifiableSet)- locked <0x0000000613bec330> (a sun.nio.ch.EPollSelectorImpl)at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:255)at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)at java.lang.Thread.run(Thread.java:750)

在这里插入图片描述

在类RestClientBuilder中,实现基于CloseableHttpAsyncClient实例构建RestClient实例对象。

    /*** Creates a new {@link RestClient} based on the provided configuration.*/public RestClient build() {if (failureListener == null) {failureListener = new RestClient.FailureListener();}// CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {@Overridepublic CloseableHttpAsyncClient run() {return createHttpClient();}});RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,pathPrefix, failureListener, nodeSelector, strictDeprecationMode);// httpClient.start();return restClient;}

通过下面的简化类图,说明Rest客户端使用的 IO线程数量在IOReactorConfig类中设置,其内部类BuilderBuilder()方法中对赋值this.ioThreadCount = IOReactorConfig.AVAIL_PROCS;,默认数量也是服务器的CPU内核数。

«abstract»
CloseableHttpAsyncClient
Builder
- int ioThreadCount
Builder()
IOReactorConfig
-int AVAIL_PROCS = Runtime.getRuntime()
+IOReactorConfig DEFAULT =(new Builder()
HttpAsyncClientBuilder
RestClient
RestClientBuilder

核心管理使用的对象们在HttpAsyncClientBuilder类中进行创建。

public CloseableHttpAsyncClient build() {.........// 创建DefaultConnectingIOReactor对象,会去执行AbstractMultiworkerIOReactor的构造方法ConnectingIOReactor ioreactor = IOReactorUtils.create(this.defaultIOReactorConfig != null ? this.defaultIOReactorConfig : IOReactorConfig.DEFAULT, this.threadFactory);//  poolingmgr中有ConnectingIOReactor处理IOEventDispatch        PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(ioreactor, RegistryBuilder.create().register("http", NoopIOSessionStrategy.INSTANCE).register("https", reuseStrategy).build());.........return new InternalHttpAsyncClient((NHttpClientConnectionManager)connManager, (ConnectionReuseStrategy)reuseStrategy, (ConnectionKeepAliveStrategy)keepAliveStrategy, threadFactory, (NHttpClientEventHandler)eventHandler, exec, (Lookup)cookieSpecRegistry, (Lookup)authSchemeRegistry, (CookieStore)defaultCookieStore, (CredentialsProvider)defaultCredentialsProvider, defaultRequestConfig);
}
继承
实现
«Abstract»
AbstractMultiworkerIOReactor
«interface»
ConnectingIOReactor
DefaultConnectingIOReactor
PoolingNHttpClientConnectionManager
HttpAsyncClientBuilder
    public AbstractMultiworkerIOReactor(final IOReactorConfig config,final ThreadFactory threadFactory) throws IOReactorException {super();this.config = config != null ? config : IOReactorConfig.DEFAULT;this.params = new BasicHttpParams();try {this.selector = Selector.open();} catch (final IOException ex) {throw new IOReactorException("Failure opening selector", ex);}this.selectTimeout = this.config.getSelectInterval();this.interestOpsQueueing = this.config.isInterestOpQueued();this.statusLock = new Object();if (threadFactory != null) {this.threadFactory = threadFactory;} else {this.threadFactory = new DefaultThreadFactory();}this.auditLog = new ArrayList<ExceptionEvent>();// this.workerCount的值就是this.ioThreadCount = IOReactorConfig.AVAIL_PROCS;this.workerCount = this.config.getIoThreadCount();this.dispatchers = new BaseIOReactor[workerCount];this.workers = new Worker[workerCount];this.threads = new Thread[workerCount];this.status = IOReactorStatus.INACTIVE;}

具体创建线程创建执行在AbstractMultiworkerIOReactorexecute(final IOEventDispatch eventDispatch)方法中。

   public void execute(final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {Args.notNull(eventDispatch, "Event dispatcher");synchronized (this.statusLock) {if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {this.status = IOReactorStatus.SHUT_DOWN;this.statusLock.notifyAll();return;}Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,"Illegal state %s", this.status);this.status = IOReactorStatus.ACTIVE;// Start I/O dispatchersfor (int i = 0; i < this.dispatchers.length; i++) {final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);dispatcher.setExceptionHandler(exceptionHandler);this.dispatchers[i] = dispatcher;}for (int i = 0; i < this.workerCount; i++) {final BaseIOReactor dispatcher = this.dispatchers[i];this.workers[i] = new Worker(dispatcher, eventDispatch);this.threads[i] = this.threadFactory.newThread(this.workers[i]);}}try {for (int i = 0; i < this.workerCount; i++) {if (this.status != IOReactorStatus.ACTIVE) {return;}this.threads[i].start();}for (;;) {final int readyCount;try {readyCount = this.selector.select(this.selectTimeout);} catch (final InterruptedIOException ex) {throw ex;} catch (final IOException ex) {throw new IOReactorException("Unexpected selector failure", ex);}if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {processEvents(readyCount);}// Verify I/O dispatchersfor (int i = 0; i < this.workerCount; i++) {final Worker worker = this.workers[i];final Exception ex = worker.getException();if (ex != null) {throw new IOReactorException("I/O dispatch worker terminated abnormally", ex);}}if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {break;}}} catch (final ClosedSelectorException ex) {addExceptionEvent(ex);} catch (final IOReactorException ex) {if (ex.getCause() != null) {addExceptionEvent(ex.getCause());}throw ex;} finally {doShutdown();synchronized (this.statusLock) {this.status = IOReactorStatus.SHUT_DOWN;this.statusLock.notifyAll();}}}
http://www.dtcms.com/wzjs/199516.html

相关文章:

  • 网站服务器基本要素有哪些最新旅游热点
  • 临翔区城乡建设局网站百度推广个人能开户吗
  • wordpress怎么设置404页面跳转优化设计答案大全
  • 聊城哪里网站做的好如何在百度上发广告
  • 网站蜘蛛爬行记录客源软件哪个最好
  • 做网站兰州seo联盟
  • 简单网站制作网页制作的软件
  • 北海做网站网站建设seo网站排名优化公司哪家好
  • 8080端口wordpress文明seo
  • 咨询行业网站建设公司产品软文范例大全
  • 360网站 备案关键词什么意思
  • 俄罗斯乌克兰战争结束了吗seo优化快排
  • 做网站要怎么找单网站排名工具
  • 网站增值业务天津网站排名提升多少钱
  • 桂林市建设局网站病毒式营销
  • jsp源码做网站优质的seo网站排名优化软件
  • 做外贸的社交网站百度重庆营销中心
  • 网站开发语言在线检测个人网站模板建站
  • 西安北郊网站开发百度快速排名案例
  • 山东疫情最新通报河南网站seo推广
  • 网站收录提交入口大全天津提升专业关键词排名
  • 电网站建设用地赔偿网上引流推广怎么做
  • 给个网站能看的百度如何优化排名靠前
  • 秦皇岛哪家做网站好seo网站推广建站服务商
  • 青岛网站建设套餐报价腾讯企点app下载安装
  • 网站设计策划书模板seo网站优化培训公司
  • 做淘宝客需要建网站吗网站推广郑州
  • 做效果图挣钱的网站营销软文广告
  • 深圳独立设计工作室seo网站的优化流程
  • 常用seo站长工具长沙做引流推广的公司