spring-data-elasticsearch AbstractReactiveElasticsearchTemplate 源码
spring-data-elasticsearch AbstractReactiveElasticsearchTemplate 代码
文件路径:/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java
/*
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.convert.EntityReader;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterConvertCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.core.routing.DefaultRoutingResolver;
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
import org.springframework.data.elasticsearch.support.VersionInfo;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Base class keeping common code for implementations of the {@link ReactiveElasticsearchOperations} interface
* independent of the used client.
*
* @author Peter-Josef Meisch
* @since 4.4
*/
abstract public class AbstractReactiveElasticsearchTemplate
implements ReactiveElasticsearchOperations, ApplicationContextAware {
protected static final Logger QUERY_LOGGER = LoggerFactory
.getLogger("org.springframework.data.elasticsearch.core.QUERY");
protected final ElasticsearchConverter converter;
protected final SimpleElasticsearchMappingContext mappingContext;
protected final EntityOperations entityOperations;
protected @Nullable RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
protected RoutingResolver routingResolver;
protected @Nullable ReactiveEntityCallbacks entityCallbacks;
// region Initialization
protected AbstractReactiveElasticsearchTemplate(@Nullable ElasticsearchConverter converter) {
this.converter = converter != null ? converter : createElasticsearchConverter();
this.mappingContext = (SimpleElasticsearchMappingContext) this.converter.getMappingContext();
this.entityOperations = new EntityOperations(this.mappingContext);
this.routingResolver = new DefaultRoutingResolver(this.mappingContext);
// initialize the VersionInfo class in the initialization phase
// noinspection ResultOfMethodCallIgnored
VersionInfo.versionProperties();
}
@Override
public ElasticsearchConverter getElasticsearchConverter() {
return converter;
}
/**
* @return copy of this instance.
*/
private AbstractReactiveElasticsearchTemplate copy() {
AbstractReactiveElasticsearchTemplate copy = doCopy();
copy.setRefreshPolicy(refreshPolicy);
if (entityCallbacks != null) {
copy.setEntityCallbacks(entityCallbacks);
}
copy.setRoutingResolver(routingResolver);
return copy;
}
abstract protected AbstractReactiveElasticsearchTemplate doCopy();
private ElasticsearchConverter createElasticsearchConverter() {
MappingElasticsearchConverter mappingElasticsearchConverter = new MappingElasticsearchConverter(
new SimpleElasticsearchMappingContext());
mappingElasticsearchConverter.afterPropertiesSet();
return mappingElasticsearchConverter;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (entityCallbacks == null) {
setEntityCallbacks(ReactiveEntityCallbacks.create(applicationContext));
}
}
/**
* Set the default {@link RefreshPolicy} to apply when writing to Elasticsearch.
*
* @param refreshPolicy can be {@literal null}.
*/
public void setRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
}
/**
* @return the current {@link RefreshPolicy}.
*/
@Nullable
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}
/**
* Set the {@link ReactiveEntityCallbacks} instance to use when invoking {@link ReactiveEntityCallbacks callbacks}
* like the {@link ReactiveBeforeConvertCallback}. Overrides potentially existing {@link ReactiveEntityCallbacks}.
*
* @param entityCallbacks must not be {@literal null}.
* @throws IllegalArgumentException if the given instance is {@literal null}.
* @since 4.0
*/
public void setEntityCallbacks(ReactiveEntityCallbacks entityCallbacks) {
Assert.notNull(entityCallbacks, "EntityCallbacks must not be null!");
this.entityCallbacks = entityCallbacks;
}
/**
* logs the versions of the different Elasticsearch components.
*
* @return a Mono signalling finished execution
* @since 4.3
*/
public Mono<Void> logVersions() {
return getVendor() //
.zipWith(getRuntimeLibraryVersion()) //
.zipWith(getClusterVersion()) //
.doOnNext(objects -> VersionInfo.logVersions(objects.getT1().getT1(), objects.getT1().getT2(), objects.getT2()))
.then();
}
// endregion
// region routing
private void setRoutingResolver(RoutingResolver routingResolver) {
Assert.notNull(routingResolver, "routingResolver must not be null");
this.routingResolver = routingResolver;
}
@Override
public ReactiveElasticsearchOperations withRouting(RoutingResolver routingResolver) {
Assert.notNull(routingResolver, "routingResolver must not be null");
AbstractReactiveElasticsearchTemplate copy = copy();
copy.setRoutingResolver(routingResolver);
return copy;
}
// endregion
// region DocumentOperations
@Override
public <T> Mono<T> save(T entity) {
return save(entity, getIndexCoordinatesFor(entity.getClass()));
}
@Override
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entities, Class<T> clazz) {
return saveAll(entities, getIndexCoordinatesFor(clazz));
}
protected IndexQuery getIndexQuery(Object value) {
EntityOperations.AdaptableEntity<?> entity = entityOperations.forEntity(value, converter.getConversionService(),
routingResolver);
Object id = entity.getId();
IndexQuery query = new IndexQuery();
if (id != null) {
query.setId(id.toString());
}
query.setObject(value);
boolean usingSeqNo = false;
if (entity.hasSeqNoPrimaryTerm()) {
SeqNoPrimaryTerm seqNoPrimaryTerm = entity.getSeqNoPrimaryTerm();
if (seqNoPrimaryTerm != null) {
query.setSeqNo(seqNoPrimaryTerm.sequenceNumber());
query.setPrimaryTerm(seqNoPrimaryTerm.primaryTerm());
usingSeqNo = true;
}
}
// seq_no and version are incompatible in the same request
if (!usingSeqNo && entity.isVersionedEntity()) {
Number version = entity.getVersion();
if (version != null) {
query.setVersion(version.longValue());
}
}
query.setRouting(entity.getRouting());
return query;
}
protected <T> T updateIndexedObject(T entity, IndexedObjectInformation indexedObjectInformation) {
ElasticsearchPersistentEntity<?> persistentEntity = converter.getMappingContext()
.getPersistentEntity(entity.getClass());
if (persistentEntity != null) {
PersistentPropertyAccessor<Object> propertyAccessor = persistentEntity.getPropertyAccessor(entity);
ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty();
// Only deal with text because ES generated Ids are strings!
if (indexedObjectInformation.getId() != null && idProperty != null && idProperty.isWritable()
&& idProperty.getType().isAssignableFrom(String.class)) {
propertyAccessor.setProperty(idProperty, indexedObjectInformation.getId());
}
if (indexedObjectInformation.getSeqNo() != null && indexedObjectInformation.getPrimaryTerm() != null
&& persistentEntity.hasSeqNoPrimaryTermProperty()) {
ElasticsearchPersistentProperty seqNoPrimaryTermProperty = persistentEntity.getSeqNoPrimaryTermProperty();
// noinspection ConstantConditions
propertyAccessor.setProperty(seqNoPrimaryTermProperty,
new SeqNoPrimaryTerm(indexedObjectInformation.getSeqNo(), indexedObjectInformation.getPrimaryTerm()));
}
if (indexedObjectInformation.getVersion() != null && persistentEntity.hasVersionProperty()) {
ElasticsearchPersistentProperty versionProperty = persistentEntity.getVersionProperty();
// noinspection ConstantConditions
propertyAccessor.setProperty(versionProperty, indexedObjectInformation.getVersion());
}
// noinspection unchecked
T updatedEntity = (T) propertyAccessor.getBean();
return updatedEntity;
} else {
EntityOperations.AdaptableEntity<T> adaptableEntity = entityOperations.forEntity(entity,
converter.getConversionService(), routingResolver);
adaptableEntity.populateIdIfNecessary(indexedObjectInformation.getId());
}
return entity;
}
@Override
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz) {
return multiGet(query, clazz, getIndexCoordinatesFor(clazz));
}
@Override
public Mono<Boolean> exists(String id, Class<?> entityType) {
return doExists(id, getIndexCoordinatesFor(entityType));
}
@Override
public Mono<Boolean> exists(String id, IndexCoordinates index) {
return doExists(id, index);
}
@Override
public <T> Mono<T> save(T entity, IndexCoordinates index) {
Assert.notNull(entity, "Entity must not be null!");
Assert.notNull(index, "index must not be null");
return maybeCallBeforeConvert(entity, index)
.flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index)) //
.map(it -> {
T savedEntity = it.getT1();
IndexResponseMetaData indexResponseMetaData = it.getT2();
return updateIndexedObject(savedEntity, IndexedObjectInformation.of( //
indexResponseMetaData.id(), //
indexResponseMetaData.seqNo(), //
indexResponseMetaData.primaryTerm(), //
indexResponseMetaData.version()));
}).flatMap(saved -> maybeCallAfterSave(saved, index));
}
abstract protected <T> Mono<Tuple2<T, IndexResponseMetaData>> doIndex(T entity, IndexCoordinates index);
abstract protected Mono<Boolean> doExists(String id, IndexCoordinates index);
@Override
public <T> Mono<T> get(String id, Class<T> entityType) {
return get(id, entityType, getIndexCoordinatesFor(entityType));
}
@Override
public Mono<String> delete(Object entity, IndexCoordinates index) {
EntityOperations.AdaptableEntity<?> elasticsearchEntity = entityOperations.forEntity(entity,
converter.getConversionService(), routingResolver);
if (elasticsearchEntity.getId() == null) {
return Mono.error(new IllegalArgumentException("entity must have an id"));
}
return Mono.defer(() -> {
String id = converter.convertId(elasticsearchEntity.getId());
String routing = elasticsearchEntity.getRouting();
return doDeleteById(id, routing, index);
});
}
@Override
public Mono<String> delete(Object entity) {
return delete(entity, getIndexCoordinatesFor(entity.getClass()));
}
@Override
public Mono<String> delete(String id, Class<?> entityType) {
Assert.notNull(id, "id must not be null");
Assert.notNull(entityType, "entityType must not be null");
return delete(id, getIndexCoordinatesFor(entityType));
}
@Override
public Mono<String> delete(String id, IndexCoordinates index) {
Assert.notNull(id, "id must not be null");
Assert.notNull(index, "index must not be null");
return doDeleteById(id, routingResolver.getRouting(), index);
}
abstract protected Mono<String> doDeleteById(String id, @Nullable String routing, IndexCoordinates index);
@Override
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType) {
return delete(query, entityType, getIndexCoordinatesFor(entityType));
}
// endregion
// region SearchDocument
@Override
public <T> Flux<SearchHit<T>> search(Query query, Class<?> entityType, Class<T> resultType, IndexCoordinates index) {
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);
return doFind(query, entityType, index).concatMap(callback::toSearchHit);
}
@Override
public <T> Flux<SearchHit<T>> search(Query query, Class<?> entityType, Class<T> returnType) {
return search(query, entityType, returnType, getIndexCoordinatesFor(entityType));
}
@Override
public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType) {
return searchForPage(query, entityType, resultType, getIndexCoordinatesFor(entityType));
}
@Override
public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType,
IndexCoordinates index) {
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);
return doFindForResponse(query, entityType, index) //
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
.flatMap(callback::toEntity) //
.collectList() //
.map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
.mapHits(searchDocumentResponse, entities))) //
.map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable()));
}
@Override
public <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entityType, Class<T> resultType) {
return searchForHits(query, entityType, resultType, getIndexCoordinatesFor(entityType));
}
@Override
public <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entityType, Class<T> resultType,
IndexCoordinates index) {
Assert.notNull(query, "query must not be null");
Assert.notNull(entityType, "entityType must not be null");
Assert.notNull(resultType, "resultType must not be null");
Assert.notNull(index, "index must not be null");
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);
return doFindForResponse(query, entityType, index) //
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
.flatMap(callback::toEntity) //
.collectList() //
.map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
.mapHits(searchDocumentResponse, entities))) //
.map(ReactiveSearchHitSupport::searchHitsFor);
}
abstract protected Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index);
abstract protected <T> Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> clazz,
IndexCoordinates index);
@Override
public Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> entityType) {
return aggregate(query, entityType, getIndexCoordinatesFor(entityType));
}
@Override
public Mono<Suggest> suggest(Query query, Class<?> entityType) {
return suggest(query, entityType, getIndexCoordinatesFor(entityType));
}
@Override
public Mono<Suggest> suggest(Query query, Class<?> entityType, IndexCoordinates index) {
Assert.notNull(query, "query must not be null");
Assert.notNull(entityType, "entityType must not be null");
Assert.notNull(index, "index must not be null");
return doFindForResponse(query, entityType, index).mapNotNull(searchDocumentResponse -> {
Suggest suggest = searchDocumentResponse.getSuggest();
SearchHitMapping.mappingFor(entityType, converter).mapHitsInCompletionSuggestion(suggest);
return suggest;
});
}
@Override
public Mono<Long> count(Query query, Class<?> entityType) {
return count(query, entityType, getIndexCoordinatesFor(entityType));
}
@Override
public Mono<Long> count(Query query, Class<?> entityType, IndexCoordinates index) {
return doCount(query, entityType, index);
}
abstract protected Mono<Long> doCount(Query query, Class<?> entityType, IndexCoordinates index);
// endregion
// region callbacks
protected <T> Mono<T> maybeCallBeforeConvert(T entity, IndexCoordinates index) {
if (null != entityCallbacks) {
return entityCallbacks.callback(ReactiveBeforeConvertCallback.class, entity, index);
}
return Mono.just(entity);
}
protected <T> Mono<T> maybeCallAfterSave(T entity, IndexCoordinates index) {
if (null != entityCallbacks) {
return entityCallbacks.callback(ReactiveAfterSaveCallback.class, entity, index);
}
return Mono.just(entity);
}
protected <T> Mono<T> maybeCallAfterConvert(T entity, Document document, IndexCoordinates index) {
if (null != entityCallbacks) {
return entityCallbacks.callback(ReactiveAfterConvertCallback.class, entity, document, index);
}
return Mono.just(entity);
}
protected <T> Mono<Document> maybeCallbackAfterLoad(Document document, Class<T> type, IndexCoordinates index) {
if (entityCallbacks != null) {
return entityCallbacks.callback(ReactiveAfterLoadCallback.class, document, type, index);
}
return Mono.just(document);
}
protected interface DocumentCallback<T> {
@NonNull
Mono<T> toEntity(@Nullable Document document);
}
protected class ReadDocumentCallback<T> implements DocumentCallback<T> {
private final EntityReader<? super T, Document> reader;
private final Class<T> type;
private final IndexCoordinates index;
public ReadDocumentCallback(EntityReader<? super T, Document> reader, Class<T> type, IndexCoordinates index) {
Assert.notNull(reader, "reader is null");
Assert.notNull(type, "type is null");
this.reader = reader;
this.type = type;
this.index = index;
}
@NonNull
public Mono<T> toEntity(@Nullable Document document) {
if (document == null) {
return Mono.empty();
}
return maybeCallbackAfterLoad(document, type, index) //
.flatMap(documentAfterLoad -> {
T entity = reader.read(type, documentAfterLoad);
IndexedObjectInformation indexedObjectInformation = IndexedObjectInformation.of( //
documentAfterLoad.hasId() ? documentAfterLoad.getId() : null, //
documentAfterLoad.hasSeqNo() ? documentAfterLoad.getSeqNo() : null, //
documentAfterLoad.hasPrimaryTerm() ? documentAfterLoad.getPrimaryTerm() : null, //
documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null); //
entity = updateIndexedObject(entity, indexedObjectInformation);
return maybeCallAfterConvert(entity, documentAfterLoad, index);
});
}
}
protected interface SearchDocumentCallback<T> {
Mono<T> toEntity(SearchDocument response);
Mono<SearchHit<T>> toSearchHit(SearchDocument response);
}
protected class ReadSearchDocumentCallback<T> implements SearchDocumentCallback<T> {
private final DocumentCallback<T> delegate;
private final Class<T> type;
public ReadSearchDocumentCallback(Class<T> type, IndexCoordinates index) {
Assert.notNull(type, "type is null");
this.delegate = new ReadDocumentCallback<>(converter, type, index);
this.type = type;
}
@Override
public Mono<T> toEntity(SearchDocument response) {
return delegate.toEntity(response);
}
@Override
public Mono<SearchHit<T>> toSearchHit(SearchDocument response) {
return toEntity(response).map(entity -> SearchHitMapping.mappingFor(type, converter).mapHit(response, entity));
}
}
// endregion
// region Helper methods
@Override
public IndexCoordinates getIndexCoordinatesFor(Class<?> clazz) {
ElasticsearchPersistentEntity<?> persistentEntity = getPersistentEntityFor(clazz);
Assert.notNull(persistentEntity, "could not get indexCoordinates for class " + clazz.getName());
return persistentEntity.getIndexCoordinates();
}
@Override
@Nullable
public ElasticsearchPersistentEntity<?> getPersistentEntityFor(@Nullable Class<?> type) {
return type != null ? mappingContext.getPersistentEntity(type) : null;
}
/**
* @return the vendor name of the used cluster and client library
* @since 4.3
*/
public abstract Mono<String> getVendor();
/**
* @return the version of the used client runtime library.
* @since 4.3
*/
public abstract Mono<String> getRuntimeLibraryVersion();
public abstract Mono<String> getClusterVersion();
/**
* Value class to capture client independent information from a response to an index request.
*/
public record IndexResponseMetaData(String id, long seqNo, long primaryTerm, long version) {
}
// endregion
protected class Entities<T> {
private final List<T> entities;
public Entities(List<T> entities) {
Assert.notNull(entities, "entities cannot be null");
this.entities = entities;
}
public boolean isEmpty() {
return entities.isEmpty();
}
public List<IndexQuery> indexQueries() {
return entities.stream().map(AbstractReactiveElasticsearchTemplate.this::getIndexQuery)
.collect(Collectors.toList());
}
public T entityAt(long index) {
// it's safe to cast to int because the original indexed collection was fitting in memory
int intIndex = (int) index;
return entities.get(intIndex);
}
}
}
相关信息
spring-data-elasticsearch 源码目录
相关文章
spring-data-elasticsearch AbstractElasticsearchTemplate 源码
spring-data-elasticsearch AbstractIndexTemplate 源码
spring-data-elasticsearch ActiveShardCount 源码
spring-data-elasticsearch AggregationContainer 源码
spring-data-elasticsearch AggregationsContainer 源码
spring-data-elasticsearch DocumentOperations 源码
spring-data-elasticsearch ElasticsearchOperations 源码
spring-data-elasticsearch EntityOperations 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦