spring-data-elasticsearch MultiNodeHostProvider 源码
spring-data-elasticsearch MultiNodeHostProvider 代码
文件路径:/src/main/java/org/springframework/data/elasticsearch/client/erhlc/MultiNodeHostProvider.java
/*
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.erhlc;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
/**
 * {@link HostProvider} for a cluster of nodes.
 * <p>
 * The provider caches the last observed {@link State} of every configured endpoint and probes nodes with a
 * {@code HEAD} request to {@code /} to refresh that cache.
 *
 * @author Christoph Strobl
 * @author Mark Paluch
 * @author Peter-Josef Meisch
 * @since 3.2
 * @deprecated since 5.0
 */
@Deprecated
class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {

	private static final Log LOGGER = LogFactory.getLog(MultiNodeHostProvider.class);

	/** Maximum time to wait for a node to answer the probe request. */
	private static final Duration HOST_CHECK_TIMEOUT = Duration.ofSeconds(1);

	private final WebClientProvider clientProvider;

	/** Last observed state per endpoint; replaced entry-by-entry whenever a node has been probed. */
	private final Map<InetSocketAddress, ElasticsearchHost> hosts;

	/**
	 * Creates a provider for the given endpoints. Every endpoint starts out in {@link State#UNKNOWN}.
	 *
	 * @param clientProvider used to obtain the {@link WebClient} for an endpoint and the error listener.
	 * @param endpoints the cluster nodes to manage.
	 */
	MultiNodeHostProvider(WebClientProvider clientProvider, InetSocketAddress... endpoints) {

		this.clientProvider = clientProvider;
		this.hosts = new ConcurrentHashMap<>();

		for (InetSocketAddress endpoint : endpoints) {
			this.hosts.put(endpoint, new ElasticsearchHost(endpoint, State.UNKNOWN));
		}

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug("initialized with " + hosts);
		}
	}

	/**
	 * Probes every known node, updates the cached host states and emits the resulting {@link ClusterInformation}.
	 *
	 * @return a {@link Mono} emitting the cluster information built from the host states <em>after</em> all checks
	 *         have finished.
	 */
	@Override
	public Mono<ClusterInformation> clusterInfo() {

		// The snapshot must be created lazily: building it eagerly (Mono.just) would capture the host states
		// from before the checks ran. No buffering is needed as then(...) discards the elements anyway, and
		// buffer(0) would fail when no endpoints are configured.
		return checkNodes(null) //
				.map(this::updateNodeState) //
				.then(Mono.fromSupplier(() -> new ClusterInformation(new LinkedHashSet<>(this.hosts.values()))));
	}

	/**
	 * Obtains the {@link WebClient} for the given endpoint from the configured {@link WebClientProvider}.
	 *
	 * @param endpoint the node to talk to.
	 * @return the client returned by the provider for that endpoint.
	 */
	@Override
	public WebClient createWebClient(InetSocketAddress endpoint) {
		return this.clientProvider.get(endpoint);
	}

	/**
	 * Looks up an active host.
	 * <p>
	 * With {@link Verification#LAZY} the first cached host reporting itself online is returned without probing.
	 * Otherwise, or if no cached host is online, nodes are probed grouped by their cached state in the order
	 * {@link State#ONLINE}, {@link State#UNKNOWN}, {@link State#OFFLINE}.
	 *
	 * @param verification the verification mode.
	 * @return a {@link Mono} emitting the endpoint of an active host, or terminating with
	 *         {@link NoReachableHostException} if no probe found an online node.
	 */
	@Override
	public Mono<InetSocketAddress> lookupActiveHost(Verification verification) {

		if (LOGGER.isTraceEnabled()) {
			LOGGER.trace("lookupActiveHost " + verification + " from " + hosts());
		}

		if (Verification.LAZY.equals(verification)) {

			for (ElasticsearchHost entry : hosts()) {
				if (entry.isOnline()) {
					if (LOGGER.isTraceEnabled()) {
						LOGGER.trace("lookupActiveHost returning " + entry);
					}
					return Mono.just(entry.getEndpoint());
				}
			}

			if (LOGGER.isTraceEnabled()) {
				LOGGER.trace("no online host found with LAZY");
			}
		}

		if (LOGGER.isTraceEnabled()) {
			LOGGER.trace("searching for active host");
		}

		// NOTE(review): the fallback publishers are created right here, so each of them works on the host
		// list as it is at the time of this call - confirm this is intended before making them lazy.
		return findActiveHostInKnownActives() //
				.switchIfEmpty(findActiveHostInUnresolved()) //
				.switchIfEmpty(findActiveHostInDead()) //
				.switchIfEmpty(Mono.error(() -> new NoReachableHostException(new LinkedHashSet<>(getCachedHostState()))));
	}

	/**
	 * @return a live view of the currently cached host states.
	 */
	Collection<ElasticsearchHost> getCachedHostState() {
		return hosts.values();
	}

	private Mono<InetSocketAddress> findActiveHostInKnownActives() {
		return findActiveForState(State.ONLINE);
	}

	private Mono<InetSocketAddress> findActiveHostInUnresolved() {
		return findActiveForState(State.UNKNOWN);
	}

	private Mono<InetSocketAddress> findActiveHostInDead() {
		return findActiveForState(State.OFFLINE);
	}

	/**
	 * Probes all hosts whose cached state equals {@code state} and updates the cache with the results.
	 *
	 * @param state the cached state selecting the hosts to probe.
	 * @return a {@link Mono} emitting the endpoint of the last probed host that turned out to be online, or an empty
	 *         {@link Mono} if none did.
	 */
	private Mono<InetSocketAddress> findActiveForState(State state) {

		if (LOGGER.isTraceEnabled()) {
			LOGGER.trace("findActiveForState state " + state + ", current hosts: " + hosts);
		}

		return checkNodes(state) //
				.map(this::updateNodeState) //
				.filter(ElasticsearchHost::isOnline) //
				.doOnNext(elasticsearchHost -> {
					if (LOGGER.isTraceEnabled()) {
						LOGGER.trace("findActiveForState returning host " + elasticsearchHost);
					}
				}) //
				.map(ElasticsearchHost::getEndpoint) //
				.takeLast(1) // every selected host is probed before a result is emitted
				.next();
	}

	/**
	 * Stores the probe result in the host cache.
	 *
	 * @param tuple2 the probed endpoint and the state determined for it.
	 * @return the newly cached {@link ElasticsearchHost}.
	 */
	private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, State> tuple2) {

		InetSocketAddress endpoint = tuple2.getT1();
		ElasticsearchHost elasticsearchHost = new ElasticsearchHost(endpoint, tuple2.getT2());
		hosts.put(endpoint, elasticsearchHost);
		return elasticsearchHost;
	}

	/**
	 * Probes the known hosts one after another, in random order.
	 *
	 * @param state if not {@literal null}, only hosts whose cached state equals it are probed.
	 * @return a {@link Flux} of endpoint / determined state pairs; failed probes are reported to the error listener of
	 *         the {@link WebClientProvider} and do not emit a pair.
	 */
	private Flux<Tuple2<InetSocketAddress, State>> checkNodes(@Nullable State state) {

		if (LOGGER.isTraceEnabled()) {
			LOGGER.trace("checkNodes() with state " + state);
		}

		return Flux.fromIterable(hosts()) //
				.filter(entry -> state == null || entry.getState().equals(state)) //
				.map(ElasticsearchHost::getEndpoint) //
				.concatMap(this::checkHost) //
				.doOnNext(tuple -> {
					if (LOGGER.isTraceEnabled()) {
						LOGGER.trace("check result " + tuple);
					}
				}) //
				.onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
	}

	/**
	 * Probes a single host with a {@code HEAD} request to {@code /}.
	 * <p>
	 * An error status maps to {@link State#OFFLINE}, any other status to {@link State#ONLINE}. If the request fails
	 * or does not complete within {@link #HOST_CHECK_TIMEOUT}, the host is cached as {@link State#OFFLINE} and the
	 * error listener is notified.
	 *
	 * @param host the endpoint to probe.
	 * @return a {@link Mono} emitting the endpoint together with the determined state.
	 */
	private Mono<Tuple2<InetSocketAddress, State>> checkHost(InetSocketAddress host) {

		if (LOGGER.isTraceEnabled()) {
			LOGGER.trace("checking host " + host);
		}

		Mono<ClientResponse> clientResponseMono = createWebClient(host) //
				.head().uri("/") //
				.exchangeToMono(Mono::just) //
				.timeout(HOST_CHECK_TIMEOUT) //
				.doOnError(throwable -> {
					if (LOGGER.isTraceEnabled()) {
						LOGGER.trace("error checking host " + host + ", " + throwable.getMessage());
					}
					hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
					// NOTE(review): checkNodes() notifies the error listener in its onErrorContinue handler as
					// well - confirm whether the listener is meant to be called twice per failed probe.
					clientProvider.getErrorListener().accept(throwable);
				});

		// release the (empty) response body before deriving the state from the status code
		return Mono.just(host) //
				.zipWith(clientResponseMono.flatMap(it -> it.releaseBody() //
						.thenReturn(it.statusCode().isError() ? State.OFFLINE : State.ONLINE)));
	}

	/**
	 * @return a shuffled copy of the cached hosts, so that lookups do not always start with the same node.
	 */
	private List<ElasticsearchHost> hosts() {

		List<ElasticsearchHost> shuffled = new ArrayList<>(this.hosts.values());
		Collections.shuffle(shuffled);
		return shuffled;
	}
}
相关信息
spring-data-elasticsearch 源码目录
相关文章
spring-data-elasticsearch AbstractElasticsearchConfiguration 源码
spring-data-elasticsearch AbstractReactiveElasticsearchConfiguration 源码
spring-data-elasticsearch CriteriaFilterProcessor 源码
spring-data-elasticsearch CriteriaQueryProcessor 源码
spring-data-elasticsearch DefaultClusterOperations 源码
spring-data-elasticsearch DefaultReactiveClusterOperations 源码
spring-data-elasticsearch DefaultReactiveElasticsearchClient 源码
spring-data-elasticsearch DefaultRequestCreator 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦