spring-data-elasticsearch ReactiveElasticsearchTemplate 源码
spring-data-elasticsearch ReactiveElasticsearchTemplate 代码
文件路径:/src/main/java/org/springframework/data/elasticsearch/client/erhlc/ReactiveElasticsearchTemplate.java
/*
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.erhlc;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.BulkFailureException;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.core.AbstractReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.AggregationContainer;
import org.springframework.data.elasticsearch.core.IndexedObjectInformation;
import org.springframework.data.elasticsearch.core.MultiGetItem;
import org.springframework.data.elasticsearch.core.ReactiveIndexOperations;
import org.springframework.data.elasticsearch.core.RefreshPolicy;
import org.springframework.data.elasticsearch.core.SearchHitMapping;
import org.springframework.data.elasticsearch.core.SearchHitSupport;
import org.springframework.data.elasticsearch.core.SearchPage;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.http.HttpStatus;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Reactive template that executes Elasticsearch document, search and helper operations through a
* {@link ReactiveElasticsearchClient}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Farid Azaza
* @author Martin Choraine
* @author Peter-Josef Meisch
* @author Mathias Teier
* @author Aleksei Arsenev
* @author Roman Puchkovskiy
* @author Russell Parry
* @author Thomas Geese
* @author Farid Faoudi
* @author Sijia Liu
* @since 3.2
* @deprecated since 5.0
*/
@Deprecated
public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearchTemplate {
// Client all operations are sent through; exposed to callbacks via getClient().
private final ReactiveElasticsearchClient client;
// Used by translateException(Throwable) to convert exceptions raised by client callbacks.
private final ElasticsearchExceptionTranslator exceptionTranslator;
// Creates the Elasticsearch request objects (index, get, delete, search, ...) used by this template.
protected RequestFactory requestFactory;
// Indices options applied when preparing search and delete-by-query requests; null means "do not override".
private @Nullable IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled();
// region Initialization
/**
* Creates a template for the given client without an explicitly configured converter; delegates to
* {@link #ReactiveElasticsearchTemplate(ReactiveElasticsearchClient, ElasticsearchConverter)} with a
* {@literal null} converter.
*
* @param client must not be {@literal null}.
*/
public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client) {
this(client, null);
}
/**
* Creates a template for the given client and converter.
*
* @param client must not be {@literal null}.
* @param converter can be {@literal null}; it is handed to the superclass constructor as is.
*/
public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client, @Nullable ElasticsearchConverter converter) {
super(converter);
Assert.notNull(client, "client must not be null");
this.client = client;
this.exceptionTranslator = new ElasticsearchExceptionTranslator();
// built from this.converter (the superclass field), not from the constructor argument
this.requestFactory = new RequestFactory(this.converter);
}
/**
 * Creates a new template that shares this template's client and converter and carries over the configured
 * {@link IndicesOptions}.
 *
 * @return the newly created template.
 */
protected ReactiveElasticsearchTemplate doCopy() {
    ReactiveElasticsearchTemplate duplicate = new ReactiveElasticsearchTemplate(client, converter);
    duplicate.setIndicesOptions(indicesOptions);
    return duplicate;
}
/**
* Set the default {@link IndicesOptions} that this template applies when it prepares {@link SearchRequest search
* requests} and {@link DeleteByQueryRequest delete-by-query requests}.
*
* @param indicesOptions can be {@literal null}; in that case the prepared requests are not modified with respect to
* their indices options.
*/
public void setIndicesOptions(@Nullable IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
}
// endregion
// region DocumentOperations
/**
 * Saves all entities emitted by the given publisher using one bulk request.
 * <p>
 * Every entity first passes through {@code maybeCallBeforeConvert}; the collected entities are then sent as a bulk
 * operation with {@link BulkOptions#defaultOptions()}. For each bulk item the matching entity is updated with the
 * id, sequence number, primary term and version from the response and finally passed to
 * {@code maybeCallAfterSave}. An empty collection results in an empty {@link Flux} without any request.
 *
 * @param entitiesPublisher must not be {@literal null}.
 * @param index the index to save to.
 * @return a {@link Flux} emitting the saved entities.
 */
@Override
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPublisher, IndexCoordinates index) {
    Assert.notNull(entitiesPublisher, "entitiesPublisher must not be null!");

    return entitiesPublisher //
            .flatMapMany(collection -> Flux.fromIterable(collection) //
                    .concatMap(candidate -> maybeCallBeforeConvert(candidate, index)) //
            ).collectList() //
            .map(Entities::new) //
            .flatMapMany(prepared -> {
                if (!prepared.isEmpty()) {
                    return doBulkOperation(prepared.indexQueries(), BulkOptions.defaultOptions(), index) //
                            .index() //
                            .flatMap(positionAndItem -> {
                                // the position in the bulk response identifies the entity that was sent
                                T saved = prepared.entityAt(positionAndItem.getT1());
                                DocWriteResponse writeResponse = positionAndItem.getT2().getResponse();
                                // NOTE(review): a possible return value of updateIndexedObject is not used here
                                updateIndexedObject(saved, IndexedObjectInformation.of(writeResponse.getId(),
                                        writeResponse.getSeqNo(), writeResponse.getPrimaryTerm(),
                                        writeResponse.getVersion()));
                                return maybeCallAfterSave(saved, index);
                            });
                }
                return Flux.empty();
            });
}
/**
 * Executes a multi-get request built from the given query and converts every returned item.
 * <p>
 * Failed items are emitted as {@link MultiGetItem}s without an entity but with the failure; all other items are
 * converted with a {@code ReadDocumentCallback} and emitted together with their (possibly absent) failure.
 *
 * @param query must not be {@literal null}.
 * @param clazz the entity type, must not be {@literal null}.
 * @param index must not be {@literal null}.
 * @return a {@link Flux} emitting one {@link MultiGetItem} per converted response item.
 */
@Override
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
    Assert.notNull(index, "Index must not be null");
    Assert.notNull(clazz, "Class must not be null");
    Assert.notNull(query, "Query must not be null");

    DocumentCallback<T> entityReader = new ReadDocumentCallback<>(converter, clazz, index);
    MultiGetRequest multiGetRequest = requestFactory.multiGetRequest(query, clazz, index);

    return Flux.from(execute(esClient -> esClient.multiGet(multiGetRequest))) //
            .map(DocumentAdapters::from) //
            .flatMap(responseItem -> !responseItem.isFailed() //
                    ? entityReader.toEntity(responseItem.getItem())
                            .map((T entity) -> MultiGetItem.of(entity, responseItem.getFailure())) //
                    : Mono.just(MultiGetItem.of(null, responseItem.getFailure())) //
            );
}
/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 * Subclasses overriding this method take over responsibility for executing the request.
 *
 * @param request the already prepared {@link IndexRequest} ready to be executed.
 * @return a {@link Mono} emitting the result of the operation.
 */
protected Mono<IndexResponse> doIndex(IndexRequest request) {
    Mono<IndexResponse> indexed = Mono.from(execute(esClient -> esClient.index(request)));
    return indexed;
}
/**
 * Executes the given update queries as one bulk operation and discards the individual item responses.
 *
 * @param queries must not be {@literal null}.
 * @param bulkOptions must not be {@literal null}.
 * @param index must not be {@literal null}.
 * @return a {@link Mono} completing when the bulk operation has finished.
 */
@Override
public Mono<Void> bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
    Assert.notNull(queries, "List of UpdateQuery must not be null");
    Assert.notNull(bulkOptions, "BulkOptions must not be null");
    Assert.notNull(index, "Index must not be null");

    Flux<BulkItemResponse> itemResponses = doBulkOperation(queries, bulkOptions, index);
    return itemResponses.then();
}
/**
 * Builds a bulk request from the given queries, sends it and emits the individual item responses.
 * <p>
 * The request passes through {@link #prepareWriteRequest(WriteRequest)} before it is sent. Any error of the bulk
 * call is wrapped in an {@link UncategorizedElasticsearchException}; a response containing failed items is turned
 * into an error by {@link #checkForBulkOperationFailure(BulkResponse)}.
 *
 * @param queries the queries to execute in bulk.
 * @param bulkOptions options for the bulk request.
 * @param index the target index.
 * @return a {@link Flux} emitting the {@link BulkItemResponse}s of the bulk response.
 */
protected Flux<BulkItemResponse> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
    BulkRequest unprepared = requestFactory.bulkRequest(queries, bulkOptions, index);
    BulkRequest bulkRequest = prepareWriteRequest(unprepared);

    Mono<BulkResponse> verifiedResponse = client.bulk(bulkRequest) //
            .onErrorMap(e -> new UncategorizedElasticsearchException("Error while bulk for request: " + bulkRequest, e)) //
            .flatMap(this::checkForBulkOperationFailure);

    return verifiedResponse.flatMapMany(bulkResponse -> Flux.fromArray(bulkResponse.getItems()));
}
/**
 * Checks a bulk response for failed items.
 * <p>
 * When the response reports failures, the id and failure message of every failed item are collected and passed
 * to a {@link BulkFailureException}, which is emitted as error. The exception message names
 * {@code BulkFailureException} as the place to read the failed documents from, because that is the exception
 * type that actually carries them.
 *
 * @param bulkResponse the response to inspect.
 * @return a {@link Mono} emitting the unchanged response when it has no failures, otherwise a {@link Mono}
 *         terminating with a {@link BulkFailureException}.
 */
protected Mono<BulkResponse> checkForBulkOperationFailure(BulkResponse bulkResponse) {
    if (!bulkResponse.hasFailures()) {
        return Mono.just(bulkResponse);
    }

    Map<String, String> failedDocuments = new HashMap<>();
    for (BulkItemResponse item : bulkResponse.getItems()) {
        if (item.isFailed()) {
            failedDocuments.put(item.getId(), item.getFailureMessage());
        }
    }

    return Mono.error(new BulkFailureException(
            "Bulk operation has failures. Use BulkFailureException.getFailedDocuments() for detailed messages ["
                    + failedDocuments + ']',
            failedDocuments));
}
/**
 * Checks whether a document with the given id exists. The {@link GetRequest} is created lazily on subscription,
 * using the routing currently provided by the routing resolver.
 *
 * @param id the document id.
 * @param index the index to look in.
 * @return a {@link Mono} emitting the result of {@link #doExists(GetRequest)}.
 */
protected Mono<Boolean> doExists(String id, IndexCoordinates index) {
    return Mono.defer(() -> {
        GetRequest existsRequest = requestFactory.getRequest(id, routingResolver.getRouting(), index);
        return doExists(existsRequest);
    });
}
/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 * A {@link NoSuchIndexException} is converted into the value {@literal false}.
 *
 * @param request the already prepared {@link GetRequest} ready to be executed.
 * @return a {@link Mono} emitting the result of the operation.
 */
protected Mono<Boolean> doExists(GetRequest request) {
    Mono<Boolean> exists = Mono.from(execute(esClient -> esClient.exists(request)));
    return exists.onErrorReturn(NoSuchIndexException.class, false);
}
/**
 * Indexes the given entity and pairs it with the metadata taken from the index response.
 * <p>
 * The request is built from the entity's index query and passed through
 * {@link #prepareIndexRequest(Object, IndexRequest)} before being executed by {@link #doIndex(IndexRequest)}.
 *
 * @param entity the entity to index.
 * @param index the target index.
 * @return a {@link Mono} emitting the entity together with id, sequence number, primary term and version of the
 *         index response.
 */
protected <T> Mono<Tuple2<T, IndexResponseMetaData>> doIndex(T entity, IndexCoordinates index) {
    IndexRequest indexRequest = prepareIndexRequest(entity, requestFactory.indexRequest(getIndexQuery(entity), index));

    Mono<IndexResponseMetaData> metaData = doIndex(indexRequest) //
            .map(esResponse -> new IndexResponseMetaData( //
                    esResponse.getId(), //
                    esResponse.getSeqNo(), //
                    esResponse.getPrimaryTerm(), //
                    esResponse.getVersion()));

    return Mono.just(entity).zipWith(metaData);
}
/**
 * Loads the document with the given id and converts it to the requested entity type.
 *
 * @param id must not be {@literal null}.
 * @param entityType the type to convert the document to.
 * @param index the index to read from.
 * @return a {@link Mono} emitting whatever the {@code ReadDocumentCallback} produces for the retrieved document.
 */
@Override
public <T> Mono<T> get(String id, Class<T> entityType, IndexCoordinates index) {
    Assert.notNull(id, "Id must not be null!");

    Mono<GetResult> result = doGet(requestFactory.getRequest(id, routingResolver.getRouting(), index));
    DocumentCallback<T> entityReader = new ReadDocumentCallback<>(converter, entityType, index);

    return result.flatMap(getResult -> entityReader.toEntity(DocumentAdapters.from(getResult)));
}
/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 *
 * @param request the already prepared {@link GetRequest} ready to be executed.
 * @return a {@link Mono} emitting the result of the operation.
 */
protected Mono<GetResult> doGet(GetRequest request) {
    Mono<GetResult> fetched = Mono.from(execute(esClient -> esClient.get(request)));
    return fetched;
}
/**
 * Deletes the document with the given id. The {@link DeleteRequest} is created on subscription and passed
 * through {@link #prepareDeleteRequest(DeleteRequest)} before {@link #doDelete(DeleteRequest)} executes it.
 *
 * @param id the document id.
 * @param routing the routing to use, can be {@literal null}.
 * @param index the index to delete from.
 * @return the {@link Mono} returned by {@link #doDelete(DeleteRequest)}.
 */
protected Mono<String> doDeleteById(String id, @Nullable String routing, IndexCoordinates index) {
    return Mono.defer(
            () -> doDelete(prepareDeleteRequest(requestFactory.deleteRequest(id, routing, index))));
}
/**
 * Deletes the documents matching the given query and converts the Elasticsearch response into a
 * {@link ByQueryResponse}.
 *
 * @param query must not be {@literal null}.
 * @param entityType the entity type the query is built for.
 * @param index the index to delete from.
 * @return a {@link Mono} emitting the converted response.
 */
@Override
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index) {
    Assert.notNull(query, "Query must not be null!");

    Mono<BulkByScrollResponse> deleted = doDeleteBy(query, entityType, index);
    return deleted.map(ResponseConverter::byQueryResponseOf);
}
/**
 * Executes a single-document update.
 * <p>
 * The template's refresh policy and the routing from the routing resolver are only applied when the update query
 * does not define its own refresh policy or routing respectively.
 *
 * @param updateQuery must not be {@literal null}.
 * @param index must not be {@literal null}.
 * @return a {@link Mono} emitting an {@link UpdateResponse} whose result has the same name as the Elasticsearch
 *         response result.
 */
@Override
public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index) {
    Assert.notNull(updateQuery, "UpdateQuery must not be null");
    Assert.notNull(index, "Index must not be null");

    return Mono.defer(() -> {
        UpdateRequest updateRequest = requestFactory.updateRequest(updateQuery, index);

        boolean useTemplateRefreshPolicy = updateQuery.getRefreshPolicy() == null && refreshPolicy != null;
        if (useTemplateRefreshPolicy) {
            updateRequest.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(refreshPolicy));
        }

        boolean useResolvedRouting = updateQuery.getRouting() == null && routingResolver.getRouting() != null;
        if (useResolvedRouting) {
            updateRequest.routing(routingResolver.getRouting());
        }

        return Mono.from(execute(esClient -> esClient.update(updateRequest)))
                .map(esResponse -> new UpdateResponse(
                        UpdateResponse.Result.valueOf(esResponse.getResult().name())));
    });
}
/**
 * Executes an update-by-query request.
 * <p>
 * When the update query defines no refresh policy but the template does, the request is set to refresh only if
 * the template policy is {@link RefreshPolicy#IMMEDIATE}. The routing from the routing resolver is applied when
 * the update query defines none.
 *
 * @param updateQuery must not be {@literal null}.
 * @param index must not be {@literal null}.
 * @return a {@link Mono} emitting the result of the operation.
 */
@Override
public Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
    Assert.notNull(updateQuery, "updateQuery must not be null");
    Assert.notNull(index, "Index must not be null");

    return Mono.defer(() -> {
        final UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(updateQuery, index);

        boolean useTemplateRefreshPolicy = updateQuery.getRefreshPolicy() == null && refreshPolicy != null;
        if (useTemplateRefreshPolicy) {
            updateByQueryRequest.setRefresh(refreshPolicy == RefreshPolicy.IMMEDIATE);
        }

        boolean useResolvedRouting = updateQuery.getRouting() == null && routingResolver.getRouting() != null;
        if (useResolvedRouting) {
            updateByQueryRequest.setRouting(routingResolver.getRouting());
        }

        return Mono.from(execute(esClient -> esClient.updateBy(updateByQueryRequest)));
    });
}
/**
 * Executes a reindex operation and converts the Elasticsearch response into a {@link ReindexResponse}. The
 * Elasticsearch request is created on subscription.
 *
 * @param postReindexRequest must not be {@literal null}.
 * @return a {@link Mono} emitting the converted response.
 */
@Override
public Mono<ReindexResponse> reindex(ReindexRequest postReindexRequest) {
    Assert.notNull(postReindexRequest, "postReindexRequest must not be null");

    return Mono.defer(() -> {
        org.elasticsearch.index.reindex.ReindexRequest esRequest = requestFactory.reindexRequest(postReindexRequest);
        return Mono.from(execute(esClient -> esClient.reindex(esRequest))) //
                .map(ResponseConverter::reindexResponseOf);
    });
}
/**
 * Submits a reindex operation through the client's {@code submitReindex} call. The Elasticsearch request is
 * created on subscription.
 *
 * @param postReindexRequest must not be {@literal null}.
 * @return a {@link Mono} emitting the String returned by the client for the submitted request.
 */
@Override
public Mono<String> submitReindex(ReindexRequest postReindexRequest) {
    Assert.notNull(postReindexRequest, "postReindexRequest must not be null");

    return Mono.defer(() -> {
        org.elasticsearch.index.reindex.ReindexRequest esRequest = requestFactory.reindexRequest(postReindexRequest);
        return Mono.from(execute(esClient -> esClient.submitReindex(esRequest)));
    });
}
/**
 * Deletes the documents matching the given query. The {@link DeleteByQueryRequest} is created on subscription
 * and passed through {@link #prepareDeleteByRequest(DeleteByQueryRequest)} before it is executed.
 *
 * @param query the query selecting the documents to delete.
 * @param entityType the entity type the query is built for.
 * @param index the index to delete from.
 * @return the {@link Mono} returned by {@link #doDeleteBy(DeleteByQueryRequest)}.
 */
protected Mono<BulkByScrollResponse> doDeleteBy(Query query, Class<?> entityType, IndexCoordinates index) {
    return Mono.defer(() -> doDeleteBy(
            prepareDeleteByRequest(requestFactory.deleteByQueryRequest(query, entityType, index))));
}
/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 * A response with status {@link HttpStatus#NOT_FOUND} as well as a {@link NoSuchIndexException} lead to an empty
 * {@link Mono}; otherwise the id reported by the response is emitted.
 *
 * @param request the already prepared {@link DeleteRequest} ready to be executed.
 * @return a {@link Mono} emitting the result of the operation.
 */
protected Mono<String> doDelete(DeleteRequest request) {
    return Mono.from(execute(esClient -> esClient.delete(request))) //
            .flatMap(deleteResponse -> {
                HttpStatus responseStatus = HttpStatus.valueOf(deleteResponse.status().getStatus());
                if (!responseStatus.equals(HttpStatus.NOT_FOUND)) {
                    return Mono.just(deleteResponse.getId());
                }
                return Mono.empty();
            }) //
            .onErrorResume(NoSuchIndexException.class, ignored -> Mono.empty());
}
/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 * A {@link NoSuchIndexException} leads to an empty {@link Mono}.
 *
 * @param request the already prepared {@link DeleteByQueryRequest} ready to be executed.
 * @return a {@link Mono} emitting the result of the operation.
 */
protected Mono<BulkByScrollResponse> doDeleteBy(DeleteByQueryRequest request) {
    Mono<BulkByScrollResponse> deleted = Mono.from(execute(esClient -> esClient.deleteBy(request)));
    return deleted.onErrorResume(NoSuchIndexException.class, ignored -> Mono.empty());
}
/**
 * Customization hook to modify a generated {@link DeleteRequest} prior to its execution. This implementation
 * delegates to {@link #prepareWriteRequest(WriteRequest)}.
 *
 * @param request the generated {@link DeleteRequest}.
 * @return the request returned by {@link #prepareWriteRequest(WriteRequest)}.
 */
protected DeleteRequest prepareDeleteRequest(DeleteRequest request) {
    DeleteRequest prepared = prepareWriteRequest(request);
    return prepared;
}
/**
 * Customization hook to modify a generated {@link DeleteByQueryRequest} prior to its execution.
 * <p>
 * When a refresh policy is configured, the request's refresh flag is set to {@literal false} for
 * {@link RefreshPolicy#NONE} and to {@literal true} for every other policy. Configured {@link IndicesOptions} are
 * applied as well.
 *
 * @param request the generated {@link DeleteByQueryRequest}.
 * @return the prepared request.
 */
protected DeleteByQueryRequest prepareDeleteByRequest(DeleteByQueryRequest request) {
    DeleteByQueryRequest prepared = request;

    if (refreshPolicy != null) {
        prepared = prepared.setRefresh(!RefreshPolicy.NONE.equals(refreshPolicy));
    }
    if (indicesOptions != null) {
        prepared = prepared.setIndicesOptions(indicesOptions);
    }
    return prepared;
}
/**
 * Customization hook to modify a generated {@link IndexRequest} prior to its execution. This implementation
 * ignores the source object and delegates to {@link #prepareWriteRequest(WriteRequest)}.
 *
 * @param source the source object the {@link IndexRequest} was derived from.
 * @param request the generated {@link IndexRequest}.
 * @return the request returned by {@link #prepareWriteRequest(WriteRequest)}.
 */
protected IndexRequest prepareIndexRequest(Object source, IndexRequest request) {
    IndexRequest prepared = prepareWriteRequest(request);
    return prepared;
}
/**
 * Preprocess the write request before it is sent to the server: when a refresh policy is configured on this
 * template, it is converted with {@code RequestFactory.toElasticsearchRefreshPolicy} and set on the request;
 * otherwise the request is returned untouched.
 *
 * @param request must not be {@literal null}.
 * @param <R> the concrete {@link WriteRequest} type.
 * @return the processed {@link WriteRequest}.
 */
protected <R extends WriteRequest<R>> R prepareWriteRequest(R request) {
    return refreshPolicy == null //
            ? request //
            : request.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(refreshPolicy));
}
// endregion
// region SearchOperations
/**
 * Searches for a page of results in the index derived from the entity type.
 *
 * @param query the query to execute.
 * @param entityType the entity type used to build the request and to resolve the index.
 * @param resultType the type the found documents are converted to.
 * @return a {@link Mono} emitting the {@link SearchPage}.
 */
@Override
public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType) {
    IndexCoordinates entityIndex = getIndexCoordinatesFor(entityType);
    return searchForPage(query, entityType, resultType, entityIndex);
}
/**
 * Searches for a page of results in the given index.
 * <p>
 * The search documents of the response are converted to entities, the entities are mapped to search hits together
 * with the response, and the hits are wrapped into a {@link SearchPage} using the query's pageable.
 *
 * @param query the query to execute.
 * @param entityType the entity type used to build the request.
 * @param resultType the type the found documents are converted to.
 * @param index the index to search in.
 * @return a {@link Mono} emitting the {@link SearchPage}.
 */
@Override
public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType,
        IndexCoordinates index) {
    SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);

    return doFindForResponse(query, entityType, index) //
            .flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
                    // concatMap instead of flatMap: flatMap gives no ordering guarantee when a conversion
                    // completes asynchronously, but the entity list is handed to mapHits alongside the
                    // ordered search documents (presumably matched by position), so it must keep their order.
                    .concatMap(callback::toEntity) //
                    .collectList() //
                    .map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
                            .mapHits(searchDocumentResponse, entities))) //
            .map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable()));
}
/**
 * Executes the given query and emits the found documents.
 * <p>
 * A query that is neither paged nor limiting is executed as a scroll request via
 * {@link #doScroll(SearchRequest)}; every other query uses {@link #doFind(SearchRequest)}. The request is created
 * on subscription.
 *
 * @param query the query to execute.
 * @param clazz the entity type the request is built for.
 * @param index the index to search in.
 * @return a {@link Flux} emitting the found {@link SearchDocument}s.
 */
protected Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index) {
    return Flux.defer(() -> {
        SearchRequest unprepared = requestFactory.searchRequest(query, clazz, index);
        boolean scrolling = !query.getPageable().isPaged() && !query.isLimiting();
        SearchRequest searchRequest = prepareSearchRequest(unprepared, scrolling);

        return scrolling ? doScroll(searchRequest) : doFind(searchRequest);
    });
}
/**
 * Executes the given query as a non-scrolling search and returns the complete response.
 * <p>
 * The entity creator handed to {@link #doFindForResponse(SearchRequest, SearchDocumentResponse.EntityCreator)}
 * converts a search document with a {@code ReadSearchDocumentCallback} for the given class and exposes the result
 * as a future.
 *
 * @param query the query to execute.
 * @param clazz the entity type the request and the document callback are built for.
 * @param index the index to search in.
 * @return a {@link Mono} emitting the {@link SearchDocumentResponse}.
 */
protected <T> Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> clazz, IndexCoordinates index) {
    return Mono.defer(() -> {
        SearchRequest searchRequest = prepareSearchRequest(requestFactory.searchRequest(query, clazz, index), false);
        SearchDocumentCallback<?> entityReader = new ReadSearchDocumentCallback<>(clazz, index);
        // noinspection unchecked
        SearchDocumentResponse.EntityCreator<T> entityCreator = searchDocument -> ((Mono<T>) entityReader
                .toEntity(searchDocument)).toFuture();

        return doFindForResponse(searchRequest, entityCreator);
    });
}
/**
 * Executes the given query as a non-scrolling search and emits its aggregations. The request is created on
 * subscription.
 *
 * @param query must not be {@literal null}.
 * @param entityType must not be {@literal null}.
 * @param index must not be {@literal null}.
 * @return the {@link Flux} returned by {@link #doAggregate(SearchRequest)}.
 */
@Override
public Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index) {
    Assert.notNull(query, "query must not be null");
    Assert.notNull(entityType, "entityType must not be null");
    Assert.notNull(index, "index must not be null");

    return Flux.defer(
            () -> doAggregate(prepareSearchRequest(requestFactory.searchRequest(query, entityType, index), false)));
}
/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 * A {@link NoSuchIndexException} leads to an empty {@link Flux}; every emitted aggregation is wrapped into an
 * {@code ElasticsearchAggregation}.
 *
 * @param request the already prepared {@link SearchRequest} ready to be executed.
 * @return a {@link Flux} emitting the result of the operation.
 */
protected Flux<AggregationContainer<?>> doAggregate(SearchRequest request) {
    if (QUERY_LOGGER.isDebugEnabled()) {
        // log under this method's own name; the message previously claimed "doCount"
        QUERY_LOGGER.debug(String.format("Executing doAggregate: %s", request));
    }

    return Flux.from(execute(client -> client.aggregate(request))) //
            .onErrorResume(NoSuchIndexException.class, it -> Flux.empty()).map(ElasticsearchAggregation::new);
}
/**
 * Counts the documents matching the given query. The request is created on subscription and prepared as a
 * non-scrolling search request.
 *
 * @param query the query to count the matches of.
 * @param entityType the entity type the request is built for.
 * @param index the index to count in.
 * @return the {@link Mono} returned by {@link #doCount(SearchRequest)}.
 */
protected Mono<Long> doCount(Query query, Class<?> entityType, IndexCoordinates index) {
    return Mono.defer(
            () -> doCount(prepareSearchRequest(requestFactory.searchRequest(query, entityType, index), false)));
}
/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 * A {@link NoSuchIndexException} leads to an empty {@link Flux}.
 *
 * @param request the already prepared {@link SearchRequest} ready to be executed.
 * @return a {@link Flux} emitting the result of the operation converted to {@link SearchDocument}s.
 */
protected Flux<SearchDocument> doFind(SearchRequest request) {
    if (QUERY_LOGGER.isDebugEnabled()) {
        QUERY_LOGGER.debug(String.format("Executing doFind: %s", request));
    }

    Flux<SearchDocument> documents = Flux.from(execute(esClient -> esClient.search(request))) //
            .map(DocumentAdapters::from);
    return documents.onErrorResume(NoSuchIndexException.class, ignored -> Mono.empty());
}
/**
 * Customization hook on the actual execution result {@link Mono}. <br />
 *
 * @param request the already prepared {@link SearchRequest} ready to be executed.
 * @param entityCreator passed to {@code SearchDocumentResponseBuilder.from} together with the search response.
 * @return a {@link Mono} emitting the result of the operation converted to a {@link SearchDocumentResponse}.
 */
protected <T> Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request,
        SearchDocumentResponse.EntityCreator<T> entityCreator) {
    if (QUERY_LOGGER.isDebugEnabled()) {
        QUERY_LOGGER.debug(String.format("Executing doFindForResponse: %s", request));
    }

    return Mono.from(execute(esClient -> esClient.searchForResponse(request)))
            .map(esResponse -> SearchDocumentResponseBuilder.from(esResponse, entityCreator));
}
/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 *
 * @param request the already prepared {@link SearchRequest} ready to be executed.
 * @return a {@link Mono} emitting the result of the operation.
 */
protected Mono<Long> doCount(SearchRequest request) {
    if (QUERY_LOGGER.isDebugEnabled()) {
        QUERY_LOGGER.debug(String.format("Executing doCount: %s", request));
    }

    Mono<Long> counted = Mono.from(execute(esClient -> esClient.count(request)));
    return counted;
}
/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 * A {@link NoSuchIndexException} leads to an empty {@link Flux}.
 *
 * @param request the already prepared {@link SearchRequest} ready to be executed.
 * @return a {@link Flux} emitting the result of the operation converted to {@link SearchDocument}s.
 */
protected Flux<SearchDocument> doScroll(SearchRequest request) {
    if (QUERY_LOGGER.isDebugEnabled()) {
        QUERY_LOGGER.debug(String.format("Executing doScroll: %s", request));
    }

    Flux<SearchDocument> documents = Flux.from(execute(esClient -> esClient.scroll(request))) //
            .map(DocumentAdapters::from);
    return documents.onErrorResume(NoSuchIndexException.class, ignored -> Mono.empty());
}
/**
 * Customization hook to modify a generated {@link SearchRequest} prior to its execution: configured
 * {@link IndicesOptions} are applied, and for scroll requests the request cache setting is cleared.
 *
 * @param request the generated {@link SearchRequest}.
 * @param useScroll whether the request is going to be executed as a scroll request.
 * @return the prepared request.
 */
protected SearchRequest prepareSearchRequest(SearchRequest request, boolean useScroll) {
    SearchRequest prepared = request;

    if (indicesOptions != null) {
        prepared = prepared.indicesOptions(indicesOptions);
    }
    if (useScroll) {
        // request_cache is not allowed on scroll requests.
        prepared = prepared.requestCache(null);
    }
    return prepared;
}
// endregion
// region Helper methods
/**
* Requests the cluster info through the client and emits the version it reports as String.
*
* @return a {@link Mono} emitting the cluster version; an empty {@link Mono} when building the pipeline itself
* throws an exception.
*/
@Override
public Mono<String> getClusterVersion() {
// NOTE(review): this try/catch only sees exceptions thrown while the Mono is assembled. execute(...) as
// implemented in this class wraps the client call in Flux.defer, so a failing info request is emitted as an
// error signal on the returned Mono and is NOT converted into an empty result here - confirm this is intended.
try {
return Mono.from(execute(ReactiveElasticsearchClient::info))
.map(mainResponse -> mainResponse.getVersion().toString());
} catch (Exception ignored) {}
return Mono.empty();
}
/**
 * @return the vendor name of the used cluster and client library, always {@code "Elasticsearch"}
 * @since 4.3
 */
@Override
public Mono<String> getVendor() {
    String vendor = "Elasticsearch";
    return Mono.just(vendor);
}
/**
 * @return the version of the used client runtime library, taken from {@link Version#CURRENT}.
 * @since 4.3
 */
@Override
public Mono<String> getRuntimeLibraryVersion() {
    String libraryVersion = Version.CURRENT.toString();
    return Mono.just(libraryVersion);
}
/**
 * Creates a query wrapping the Elasticsearch match-all query builder.
 *
 * @return the new {@link Query}.
 */
@Override
public Query matchAllQuery() {
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    return queryBuilder.withQuery(QueryBuilders.matchAllQuery()).build();
}
/**
 * Creates a query wrapping an Elasticsearch ids query for the given ids.
 *
 * @param ids must not be {@literal null}.
 * @return the new {@link Query}.
 */
@Override
public Query idsQuery(List<String> ids) {
    Assert.notNull(ids, "ids must not be null");

    String[] idArray = ids.toArray(new String[0]);
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    return queryBuilder.withQuery(QueryBuilders.idsQuery().addIds(idArray)).build();
}
// endregion
/**
 * Runs the callback against the client returned by {@link #getClient()}. The callback is invoked lazily on
 * subscription and errors are passed through the exception translation of this template.
 *
 * @param callback the callback to run.
 * @return the {@link Publisher} produced by the callback, with translated errors.
 */
@Override
public <T> Publisher<T> execute(ClientCallback<Publisher<T>> callback) {
    Flux<T> deferred = Flux.defer(() -> callback.doWithClient(getClient()));
    return deferred.onErrorMap(this::translateException);
}
/**
 * Runs the callback against the client returned by {@link #getIndicesClient()}. The callback is invoked lazily on
 * subscription and errors are passed through the exception translation of this template.
 *
 * @param callback the callback to run.
 * @return the {@link Publisher} produced by the callback, with translated errors.
 */
@Override
public <T> Publisher<T> executeWithIndicesClient(IndicesClientCallback<Publisher<T>> callback) {
    Flux<T> deferred = Flux.defer(() -> callback.doWithClient(getIndicesClient()));
    return deferred.onErrorMap(this::translateException);
}
/**
 * Runs the callback against the client returned by {@link #getClusterClient()}. The callback is invoked lazily on
 * subscription and errors are passed through the exception translation of this template.
 *
 * @param callback the callback to run.
 * @return the {@link Publisher} produced by the callback, with translated errors.
 */
@Override
public <T> Publisher<T> executeWithClusterClient(ClusterClientCallback<Publisher<T>> callback) {
    Flux<T> deferred = Flux.defer(() -> callback.doWithClient(getClusterClient()));
    return deferred.onErrorMap(this::translateException);
}
/**
 * Creates index operations bound to the given index.
 *
 * @param index the index to operate on.
 * @return a new {@code ReactiveIndexTemplate} backed by this template.
 */
@Override
public ReactiveIndexOperations indexOps(IndexCoordinates index) {
    ReactiveIndexOperations indexOperations = new ReactiveIndexTemplate(this, index);
    return indexOperations;
}
/**
 * Creates index operations bound to the given entity class.
 *
 * @param clazz the entity class to operate on.
 * @return a new {@code ReactiveIndexTemplate} backed by this template.
 */
@Override
public ReactiveIndexOperations indexOps(Class<?> clazz) {
    ReactiveIndexOperations indexOperations = new ReactiveIndexTemplate(this, clazz);
    return indexOperations;
}
/**
 * Creates cluster operations backed by this template.
 *
 * @return a new {@code DefaultReactiveClusterOperations} instance.
 */
@Override
public ReactiveClusterOperations cluster() {
    ReactiveClusterOperations clusterOperations = new DefaultReactiveClusterOperations(this);
    return clusterOperations;
}
/**
 * Obtain the {@link ReactiveElasticsearchClient} to operate upon, i.e. the client this template was created with.
 *
 * @return never {@literal null}.
 */
protected ReactiveElasticsearchClient getClient() {
    return client;
}
/**
 * Obtain the {@link ReactiveElasticsearchClient.Indices} to operate upon. The template's client is used when it
 * implements that interface.
 *
 * @return never {@literal null}.
 * @throws UncategorizedElasticsearchException when the client does not implement
 *         {@link ReactiveElasticsearchClient.Indices}.
 */
protected ReactiveElasticsearchClient.Indices getIndicesClient() {
    if (!(client instanceof ReactiveElasticsearchClient.Indices)) {
        throw new UncategorizedElasticsearchException("No ReactiveElasticsearchClient.Indices implementation available");
    }
    return (ReactiveElasticsearchClient.Indices) client;
}
/**
 * Obtain the {@link ReactiveElasticsearchClient.Cluster} to operate upon. The template's client is used when it
 * implements that interface.
 *
 * @return never {@literal null}.
 * @throws UncategorizedElasticsearchException when the client does not implement
 *         {@link ReactiveElasticsearchClient.Cluster}.
 */
protected ReactiveElasticsearchClient.Cluster getClusterClient() {
    if (!(client instanceof ReactiveElasticsearchClient.Cluster)) {
        throw new UncategorizedElasticsearchException("No ReactiveElasticsearchClient.Cluster implementation available");
    }
    return (ReactiveElasticsearchClient.Cluster) client;
}
/**
 * Translates an exception if possible. A {@link Throwable} that is not a {@link RuntimeException} is first wrapped
 * in a RuntimeException carrying the same message and the original as cause.
 *
 * @param throwable the Throwable to map
 * @return the translated exception, or the (possibly wrapped) original when the translator returns
 *         {@literal null}.
 * @since 4.0
 */
private RuntimeException translateException(Throwable throwable) {
    RuntimeException candidate;
    if (throwable instanceof RuntimeException) {
        candidate = (RuntimeException) throwable;
    } else {
        candidate = new RuntimeException(throwable.getMessage(), throwable);
    }

    RuntimeException translated = exceptionTranslator.translateExceptionIfPossible(candidate);
    return translated == null ? candidate : translated;
}
}
相关信息
spring-data-elasticsearch 源码目录
相关文章
spring-data-elasticsearch AbstractElasticsearchConfiguration 源码
spring-data-elasticsearch AbstractReactiveElasticsearchConfiguration 源码
spring-data-elasticsearch CriteriaFilterProcessor 源码
spring-data-elasticsearch CriteriaQueryProcessor 源码
spring-data-elasticsearch DefaultClusterOperations 源码
spring-data-elasticsearch DefaultReactiveClusterOperations 源码
spring-data-elasticsearch DefaultReactiveElasticsearchClient 源码
spring-data-elasticsearch DefaultRequestCreator 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦