spring-data-redis RedisKeyValueAdapter Source Code
File path: /src/main/java/org/springframework/data/redis/core/RedisKeyValueAdapter.java
/*
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.core;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.ConverterNotFoundException;
import org.springframework.data.keyvalue.core.AbstractKeyValueAdapter;
import org.springframework.data.keyvalue.core.KeyValueAdapter;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.PartialUpdate.PropertyUpdate;
import org.springframework.data.redis.core.PartialUpdate.UpdateCommand;
import org.springframework.data.redis.core.RedisKeyValueAdapter.RedisUpdateObject.Index;
import org.springframework.data.redis.core.convert.GeoIndexedPropertyValue;
import org.springframework.data.redis.core.convert.KeyspaceConfiguration;
import org.springframework.data.redis.core.convert.MappingRedisConverter;
import org.springframework.data.redis.core.convert.MappingRedisConverter.BinaryKeyspaceIdentifier;
import org.springframework.data.redis.core.convert.MappingRedisConverter.KeyspaceIdentifier;
import org.springframework.data.redis.core.convert.PathIndexResolver;
import org.springframework.data.redis.core.convert.RedisConverter;
import org.springframework.data.redis.core.convert.RedisCustomConversions;
import org.springframework.data.redis.core.convert.RedisData;
import org.springframework.data.redis.core.convert.ReferenceResolverImpl;
import org.springframework.data.redis.core.mapping.RedisMappingContext;
import org.springframework.data.redis.core.mapping.RedisPersistentEntity;
import org.springframework.data.redis.core.mapping.RedisPersistentProperty;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.data.util.CloseableIterator;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
/**
* Redis specific {@link KeyValueAdapter} implementation. Uses binary codec to read/write data from/to Redis. Objects
* are stored in a Redis Hash using the value of {@link RedisHash}, the {@link KeyspaceConfiguration} or just
* {@link Class#getName()} as a prefix. <br />
* <strong>Example</strong>
*
* <pre>
* <code>
* @RedisHash("persons")
* class Person {
* @Id String id;
* String name;
* }
*
*
* prefix ID
* | |
* V V
* hgetall persons:5d67b7e1-8640-4475-beeb-c666fab4c0e5
* 1) id
* 2) 5d67b7e1-8640-4475-beeb-c666fab4c0e5
* 3) name
* 4) Rand al'Thor
* </code>
* </pre>
*
* <br />
* The {@link KeyValueAdapter} is <strong>not</strong> intended to store simple types such as {@link String} values.
* Please use {@link RedisTemplate} for this purpose.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Andrey Muchnik
* @since 1.7
*/
public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
implements InitializingBean, ApplicationContextAware, ApplicationListener<RedisKeyspaceEvent> {
/**
 * Additional time to live, in seconds, that phantom keys should live longer than the actual key.
 */
private static final int PHANTOM_KEY_TTL = 300;
private RedisOperations<?, ?> redisOps;
private RedisConverter converter;
private @Nullable RedisMessageListenerContainer messageListenerContainer;
private boolean managedListenerContainer = true;
private final AtomicReference<KeyExpirationEventMessageListener> expirationListener = new AtomicReference<>(null);
private @Nullable ApplicationEventPublisher eventPublisher;
private EnableKeyspaceEvents enableKeyspaceEvents = EnableKeyspaceEvents.OFF;
private @Nullable String keyspaceNotificationsConfigParameter = null;
private ShadowCopy shadowCopy = ShadowCopy.DEFAULT;
/**
* Creates new {@link RedisKeyValueAdapter} with default {@link RedisMappingContext} and default
* {@link RedisCustomConversions}.
*
* @param redisOps must not be {@literal null}.
*/
public RedisKeyValueAdapter(RedisOperations<?, ?> redisOps) {
this(redisOps, new RedisMappingContext());
}
/**
* Creates new {@link RedisKeyValueAdapter} with default {@link RedisCustomConversions}.
*
* @param redisOps must not be {@literal null}.
* @param mappingContext must not be {@literal null}.
*/
public RedisKeyValueAdapter(RedisOperations<?, ?> redisOps, RedisMappingContext mappingContext) {
this(redisOps, mappingContext, new RedisCustomConversions());
}
/**
* Creates new {@link RedisKeyValueAdapter}.
*
* @param redisOps must not be {@literal null}.
* @param mappingContext must not be {@literal null}.
* @param customConversions can be {@literal null}.
* @since 2.0
*/
public RedisKeyValueAdapter(RedisOperations<?, ?> redisOps, RedisMappingContext mappingContext,
@Nullable org.springframework.data.convert.CustomConversions customConversions) {
super(new RedisQueryEngine());
Assert.notNull(redisOps, "RedisOperations must not be null");
Assert.notNull(mappingContext, "RedisMappingContext must not be null");
MappingRedisConverter mappingConverter = new MappingRedisConverter(mappingContext,
new PathIndexResolver(mappingContext), new ReferenceResolverImpl(redisOps));
mappingConverter.setCustomConversions(customConversions == null ? new RedisCustomConversions() : customConversions);
mappingConverter.afterPropertiesSet();
this.converter = mappingConverter;
this.redisOps = redisOps;
initMessageListenerContainer();
}
/**
* Creates new {@link RedisKeyValueAdapter} with specific {@link RedisConverter}.
*
* @param redisOps must not be {@literal null}.
* @param redisConverter must not be {@literal null}.
*/
public RedisKeyValueAdapter(RedisOperations<?, ?> redisOps, RedisConverter redisConverter) {
super(new RedisQueryEngine());
Assert.notNull(redisOps, "RedisOperations must not be null");
this.converter = redisConverter;
this.redisOps = redisOps;
}
/**
* Default constructor.
*/
protected RedisKeyValueAdapter() {}
@Override
public Object put(Object id, Object item, String keyspace) {
RedisData rdo = item instanceof RedisData ? (RedisData) item : new RedisData();
if (!(item instanceof RedisData)) {
converter.write(item, rdo);
}
if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_DEMAND, enableKeyspaceEvents)
&& this.expirationListener.get() == null) {
if (rdo.getTimeToLive() != null && rdo.getTimeToLive() > 0) {
initKeyExpirationListener();
}
}
if (rdo.getId() == null) {
rdo.setId(converter.getConversionService().convert(id, String.class));
}
redisOps.execute((RedisCallback<Object>) connection -> {
byte[] key = toBytes(rdo.getId());
byte[] objectKey = createKey(rdo.getKeyspace(), rdo.getId());
boolean isNew = connection.del(objectKey) == 0;
connection.hMSet(objectKey, rdo.getBucket().rawMap());
if (isNew) {
connection.sAdd(toBytes(rdo.getKeyspace()), key);
}
if (expires(rdo)) {
connection.expire(objectKey, rdo.getTimeToLive());
}
if (keepShadowCopy()) { // add phantom key so values can be restored
byte[] phantomKey = ByteUtils.concat(objectKey, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX);
if (expires(rdo)) {
connection.del(phantomKey);
connection.hMSet(phantomKey, rdo.getBucket().rawMap());
connection.expire(phantomKey, rdo.getTimeToLive() + PHANTOM_KEY_TTL);
} else if (!isNew) {
connection.del(phantomKey);
}
}
IndexWriter indexWriter = new IndexWriter(connection, converter);
if (isNew) {
indexWriter.createIndexes(key, rdo.getIndexedData());
} else {
indexWriter.deleteAndUpdateIndexes(key, rdo.getIndexedData());
}
return null;
});
return item;
}
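// Illustration: for a hypothetical entity stored under the keyspace "persons" with id 4711 and a
// positive time to live (entity and id are assumed only for this sketch), the callback above
// roughly issues the following Redis commands:
//
//   DEL     persons:4711
//   HMSET   persons:4711 <flattened properties>
//   SADD    persons 4711                                   (only when the entry is new)
//   EXPIRE  persons:4711 <ttl>
//   HMSET   persons:4711:phantom <flattened properties>    (only while shadow copies are kept)
//   EXPIRE  persons:4711:phantom <ttl + 300>
//
// followed by index creation (new entries) or index maintenance via IndexWriter.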
@Override
public boolean contains(Object id, String keyspace) {
Boolean exists = redisOps
.execute((RedisCallback<Boolean>) connection -> connection.sIsMember(toBytes(keyspace), toBytes(id)));
return exists != null ? exists : false;
}
@Nullable
@Override
public Object get(Object id, String keyspace) {
return get(id, keyspace, Object.class);
}
@Nullable
@Override
public <T> T get(Object id, String keyspace, Class<T> type) {
String stringId = asString(id);
String stringKeyspace = asString(keyspace);
byte[] binId = createKey(stringKeyspace, stringId);
Map<byte[], byte[]> raw = redisOps
.execute((RedisCallback<Map<byte[], byte[]>>) connection -> connection.hGetAll(binId));
if (CollectionUtils.isEmpty(raw)) {
return null;
}
RedisData data = new RedisData(raw);
data.setId(stringId);
data.setKeyspace(stringKeyspace);
return readBackTimeToLiveIfSet(binId, converter.read(type, data));
}
@Override
public Object delete(Object id, String keyspace) {
return delete(id, keyspace, Object.class);
}
@Override
public <T> T delete(Object id, String keyspace, Class<T> type) {
byte[] binId = toBytes(id);
byte[] binKeyspace = toBytes(keyspace);
T o = get(id, keyspace, type);
if (o != null) {
byte[] keyToDelete = createKey(asString(keyspace), asString(id));
redisOps.execute((RedisCallback<Void>) connection -> {
connection.del(keyToDelete);
connection.sRem(binKeyspace, binId);
new IndexWriter(connection, converter).removeKeyFromIndexes(asString(keyspace), binId);
if (RedisKeyValueAdapter.this.keepShadowCopy()) {
RedisPersistentEntity<?> persistentEntity = converter.getMappingContext().getPersistentEntity(type);
if (persistentEntity != null && persistentEntity.isExpiring()) {
byte[] phantomKey = ByteUtils.concat(keyToDelete, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX);
connection.del(phantomKey);
}
}
return null;
});
}
return o;
}
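// Illustration: deleting that same hypothetical entity roughly issues DEL persons:4711 and
// SREM persons 4711, removes the id from all secondary indexes and, for expiring entities
// while shadow copies are kept, also deletes persons:4711:phantom.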
@Override
public List<?> getAllOf(String keyspace) {
return getAllOf(keyspace, Object.class, -1, -1);
}
@Override
public <T> Iterable<T> getAllOf(String keyspace, Class<T> type) {
return getAllOf(keyspace, type, -1, -1);
}
/**
* Get all elements for given keyspace.
*
* @param keyspace the keyspace to fetch entities from.
* @param type the desired target type.
* @param offset index value to start reading.
* @param rows maximum number of entities to return.
* @param <T>
* @return never {@literal null}.
* @since 2.5
*/
public <T> List<T> getAllOf(String keyspace, Class<T> type, long offset, int rows) {
byte[] binKeyspace = toBytes(keyspace);
Set<byte[]> ids = redisOps.execute((RedisCallback<Set<byte[]>>) connection -> connection.sMembers(binKeyspace));
List<T> result = new ArrayList<>();
List<byte[]> keys = new ArrayList<>(ids);
if (keys.isEmpty() || keys.size() < offset) {
return Collections.emptyList();
}
offset = Math.max(0, offset);
if (rows > 0) {
keys = keys.subList((int) offset, Math.min((int) offset + rows, keys.size()));
}
for (byte[] key : keys) {
result.add(get(key, keyspace, type));
}
return result;
}
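// Illustration: a call such as adapter.getAllOf("persons", Person.class, 0, 10) (Person being a
// hypothetical mapped type) loads at most the first 10 ids returned by SMEMBERS for the "persons"
// set, whereas an offset/rows combination of -1/-1 disables paging and returns every entry.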
@Override
public void deleteAllOf(String keyspace) {
redisOps.execute((RedisCallback<Void>) connection -> {
connection.del(toBytes(keyspace));
new IndexWriter(connection, converter).removeAllIndexes(asString(keyspace));
return null;
});
}
@Override
public CloseableIterator<Entry<Object, Object>> entries(String keyspace) {
throw new UnsupportedOperationException("Not yet implemented");
}
@Override
public long count(String keyspace) {
Long count = redisOps.execute((RedisCallback<Long>) connection -> connection.sCard(toBytes(keyspace)));
return count != null ? count : 0;
}
public void update(PartialUpdate<?> update) {
RedisPersistentEntity<?> entity = this.converter.getMappingContext()
.getRequiredPersistentEntity(update.getTarget());
String keyspace = entity.getKeySpace();
Object id = update.getId();
byte[] redisKey = createKey(keyspace, converter.getConversionService().convert(id, String.class));
RedisData rdo = new RedisData();
this.converter.write(update, rdo);
redisOps.execute((RedisCallback<Void>) connection -> {
RedisUpdateObject redisUpdateObject = new RedisUpdateObject(redisKey, keyspace, id);
for (PropertyUpdate pUpdate : update.getPropertyUpdates()) {
String propertyPath = pUpdate.getPropertyPath();
if (UpdateCommand.DEL.equals(pUpdate.getCmd()) || pUpdate.getValue() instanceof Collection
|| pUpdate.getValue() instanceof Map
|| (pUpdate.getValue() != null && pUpdate.getValue().getClass().isArray()) || (pUpdate.getValue() != null
&& !converter.getConversionService().canConvert(pUpdate.getValue().getClass(), byte[].class))) {
redisUpdateObject = fetchDeletePathsFromHashAndUpdateIndex(redisUpdateObject, propertyPath, connection);
}
}
if (!redisUpdateObject.fieldsToRemove.isEmpty()) {
connection.hDel(redisKey,
redisUpdateObject.fieldsToRemove.toArray(new byte[redisUpdateObject.fieldsToRemove.size()][]));
}
for (Index index : redisUpdateObject.indexesToUpdate) {
if (ObjectUtils.nullSafeEquals(DataType.ZSET, index.type)) {
connection.zRem(index.key, toBytes(redisUpdateObject.targetId));
} else {
connection.sRem(index.key, toBytes(redisUpdateObject.targetId));
}
}
if (!rdo.getBucket().isEmpty()) {
if (rdo.getBucket().size() > 1
|| (rdo.getBucket().size() == 1 && !rdo.getBucket().asMap().containsKey("_class"))) {
connection.hMSet(redisKey, rdo.getBucket().rawMap());
}
}
if (update.isRefreshTtl()) {
if (expires(rdo)) {
connection.expire(redisKey, rdo.getTimeToLive());
if (keepShadowCopy()) { // add phantom key so values can be restored
byte[] phantomKey = ByteUtils.concat(redisKey, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX);
connection.hMSet(phantomKey, rdo.getBucket().rawMap());
connection.expire(phantomKey, rdo.getTimeToLive() + PHANTOM_KEY_TTL);
}
} else {
connection.persist(redisKey);
if (keepShadowCopy()) {
connection.del(ByteUtils.concat(redisKey, BinaryKeyspaceIdentifier.PHANTOM_SUFFIX));
}
}
}
new IndexWriter(connection, converter).updateIndexes(toBytes(id), rdo.getIndexedData());
return null;
});
}
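// Illustration: a partial update against a hypothetical Person entity could be built as
//
//   PartialUpdate<Person> update = new PartialUpdate<>("4711", Person.class)
//       .set("firstname", "Rand")
//       .del("nickname")
//       .refreshTtl(true);
//   adapter.update(update);
//
// which rewrites only the touched hash fields, removes the deleted ones (including nested
// "nickname.*" entries and affected index entries) and refreshes the key's expiration.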
private RedisUpdateObject fetchDeletePathsFromHashAndUpdateIndex(RedisUpdateObject redisUpdateObject, String path,
RedisConnection connection) {
redisUpdateObject.addFieldToRemove(toBytes(path));
byte[] value = connection.hGet(redisUpdateObject.targetKey, toBytes(path));
if (value != null && value.length > 0) {
byte[] existingValueIndexKey = ByteUtils.concatAll(toBytes(redisUpdateObject.keyspace), toBytes(":" + path),
toBytes(":"), value);
if (connection.exists(existingValueIndexKey)) {
redisUpdateObject.addIndexToUpdate(new RedisUpdateObject.Index(existingValueIndexKey, DataType.SET));
}
return redisUpdateObject;
}
Set<byte[]> existingFields = connection.hKeys(redisUpdateObject.targetKey);
for (byte[] field : existingFields) {
if (asString(field).startsWith(path + ".")) {
redisUpdateObject.addFieldToRemove(field);
value = connection.hGet(redisUpdateObject.targetKey, toBytes(field));
if (value != null) {
byte[] existingValueIndexKey = ByteUtils.concatAll(toBytes(redisUpdateObject.keyspace), toBytes(":"), field,
toBytes(":"), value);
if (connection.exists(existingValueIndexKey)) {
redisUpdateObject.addIndexToUpdate(new RedisUpdateObject.Index(existingValueIndexKey, DataType.SET));
}
}
}
}
String pathToUse = GeoIndexedPropertyValue.geoIndexName(path);
byte[] existingGeoIndexKey = ByteUtils.concatAll(toBytes(redisUpdateObject.keyspace), toBytes(":"),
toBytes(pathToUse));
if (connection.zRank(existingGeoIndexKey, toBytes(redisUpdateObject.targetId)) != null) {
redisUpdateObject.addIndexToUpdate(new RedisUpdateObject.Index(existingGeoIndexKey, DataType.ZSET));
}
return redisUpdateObject;
}
/**
* Execute {@link RedisCallback} via underlying {@link RedisOperations}.
*
* @param callback must not be {@literal null}.
* @see RedisOperations#execute(RedisCallback)
* @return
*/
@Nullable
public <T> T execute(RedisCallback<T> callback) {
return redisOps.execute(callback);
}
/**
* Get the {@link RedisConverter} in use.
*
* @return never {@literal null}.
*/
public RedisConverter getConverter() {
return this.converter;
}
public void clear() {
// nothing to do
}
private String asString(Object value) {
return value instanceof String ? (String) value
: getConverter().getConversionService().convert(value, String.class);
}
public byte[] createKey(String keyspace, String id) {
return toBytes(keyspace + ":" + id);
}
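// Illustration: createKey("persons", "4711") yields the binary form of "persons:4711".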
/**
* Convert given source to binary representation using the underlying {@link ConversionService}.
*
* @param source
* @return
* @throws ConverterNotFoundException
*/
public byte[] toBytes(Object source) {
if (source instanceof byte[]) {
return (byte[]) source;
}
return converter.getConversionService().convert(source, byte[].class);
}
/**
* Read back and set {@link TimeToLive} for the property.
*
* @param key
* @param target
* @return
*/
@Nullable
@SuppressWarnings({ "unchecked", "rawtypes" })
private <T> T readBackTimeToLiveIfSet(@Nullable byte[] key, @Nullable T target) {
if (target == null || key == null) {
return target;
}
RedisPersistentEntity<?> entity = this.converter.getMappingContext().getRequiredPersistentEntity(target.getClass());
if (entity.hasExplictTimeToLiveProperty()) {
RedisPersistentProperty ttlProperty = entity.getExplicitTimeToLiveProperty();
if (ttlProperty == null) {
return target;
}
TimeToLive ttl = ttlProperty.findAnnotation(TimeToLive.class);
Long timeout = redisOps.execute((RedisCallback<Long>) connection -> {
if (ObjectUtils.nullSafeEquals(TimeUnit.SECONDS, ttl.unit())) {
return connection.ttl(key);
}
return connection.pTtl(key, ttl.unit());
});
if (timeout != null || !ttlProperty.getType().isPrimitive()) {
PersistentPropertyAccessor<T> propertyAccessor = entity.getPropertyAccessor(target);
propertyAccessor.setProperty(ttlProperty,
converter.getConversionService().convert(timeout, ttlProperty.getType()));
target = propertyAccessor.getBean();
}
}
return target;
}
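// Illustration: for a hypothetical entity declaring "@TimeToLive(unit = TimeUnit.SECONDS) Long ttl",
// the remaining expiration read via TTL (or PTTL for other units) is converted and written back
// into that property after the object has been loaded.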
/**
* @return {@literal true} if {@link RedisData#getTimeToLive()} has a positive value.
* @param data must not be {@literal null}.
* @since 2.3.7
*/
private boolean expires(RedisData data) {
return data.getTimeToLive() != null && data.getTimeToLive() > 0;
}
/**
* Configure usage of {@link KeyExpirationEventMessageListener}.
*
* @param enableKeyspaceEvents
* @since 1.8
*/
public void setEnableKeyspaceEvents(EnableKeyspaceEvents enableKeyspaceEvents) {
this.enableKeyspaceEvents = enableKeyspaceEvents;
}
/**
* Configure a {@link RedisMessageListenerContainer} to listen for keyspace expiry events. The container can only be
* set when this bean hasn't yet been {@link #afterPropertiesSet() initialized}.
*
* @param messageListenerContainer the container to use.
* @since 2.7.2
* @throws IllegalStateException when trying to set a {@link RedisMessageListenerContainer} after
* {@link #afterPropertiesSet()} has been called to initialize a managed container instance.
*/
public void setMessageListenerContainer(RedisMessageListenerContainer messageListenerContainer) {
Assert.notNull(messageListenerContainer, "RedisMessageListenerContainer must not be null");
if (this.managedListenerContainer && this.messageListenerContainer != null) {
throw new IllegalStateException(
"Cannot set RedisMessageListenerContainer after initializing a managed RedisMessageListenerContainer instance");
}
this.managedListenerContainer = false;
this.messageListenerContainer = messageListenerContainer;
}
/**
* Configure the {@literal notify-keyspace-events} property if not already set. Use an empty {@link String} or
* {@literal null} to retain existing server settings.
*
* @param keyspaceNotificationsConfigParameter can be {@literal null}.
* @since 1.8
*/
public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {
this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
}
/**
* Configure storage of phantom keys (shadow copies) of expiring entities.
*
* @param shadowCopy must not be {@literal null}.
* @since 2.3
*/
public void setShadowCopy(ShadowCopy shadowCopy) {
this.shadowCopy = shadowCopy;
}
/**
* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
* @since 1.8
*/
@Override
public void afterPropertiesSet() {
if (this.managedListenerContainer) {
initMessageListenerContainer();
}
if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_STARTUP, this.enableKeyspaceEvents)) {
initKeyExpirationListener();
}
}
public void destroy() throws Exception {
if (this.expirationListener.get() != null) {
this.expirationListener.get().destroy();
}
if (this.managedListenerContainer && this.messageListenerContainer != null) {
this.messageListenerContainer.destroy();
this.messageListenerContainer = null;
}
}
@Override
public void onApplicationEvent(RedisKeyspaceEvent event) {
// just a customization hook
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.eventPublisher = applicationContext;
}
private void initMessageListenerContainer() {
this.messageListenerContainer = new RedisMessageListenerContainer();
this.messageListenerContainer.setConnectionFactory(((RedisTemplate<?, ?>) redisOps).getConnectionFactory());
this.messageListenerContainer.afterPropertiesSet();
this.messageListenerContainer.start();
}
private void initKeyExpirationListener() {
if (this.expirationListener.get() == null) {
MappingExpirationListener listener = new MappingExpirationListener(this.messageListenerContainer, this.redisOps,
this.converter);
listener.setKeyspaceNotificationsConfigParameter(keyspaceNotificationsConfigParameter);
if (this.eventPublisher != null) {
listener.setApplicationEventPublisher(this.eventPublisher);
}
if (this.expirationListener.compareAndSet(null, listener)) {
listener.init();
}
}
}
/**
* {@link MessageListener} implementation used to capture Redis keyspace notifications. Tries to read a previously
* created phantom key {@code keyspace:id:phantom} to provide the expired object as part of the published
* {@link RedisKeyExpiredEvent}.
*
* @author Christoph Strobl
* @since 1.7
*/
static class MappingExpirationListener extends KeyExpirationEventMessageListener {
private final RedisOperations<?, ?> ops;
private final RedisConverter converter;
/**
* Creates new {@link MappingExpirationListener}.
*
* @param listenerContainer
* @param ops
* @param converter
*/
MappingExpirationListener(RedisMessageListenerContainer listenerContainer, RedisOperations<?, ?> ops,
RedisConverter converter) {
super(listenerContainer);
this.ops = ops;
this.converter = converter;
}
@Override
public void onMessage(Message message, @Nullable byte[] pattern) {
if (!isKeyExpirationMessage(message)) {
return;
}
byte[] key = message.getBody();
byte[] phantomKey = ByteUtils.concat(key,
converter.getConversionService().convert(KeyspaceIdentifier.PHANTOM_SUFFIX, byte[].class));
Map<byte[], byte[]> hash = ops.execute((RedisCallback<Map<byte[], byte[]>>) connection -> {
Map<byte[], byte[]> hash1 = connection.hGetAll(phantomKey);
if (!CollectionUtils.isEmpty(hash1)) {
connection.del(phantomKey);
}
return hash1;
});
Object value = CollectionUtils.isEmpty(hash) ? null : converter.read(Object.class, new RedisData(hash));
byte[] channelAsBytes = message.getChannel();
String channel = !ObjectUtils.isEmpty(channelAsBytes)
? converter.getConversionService().convert(channelAsBytes, String.class)
: null;
RedisKeyExpiredEvent event = new RedisKeyExpiredEvent(channel, key, value);
ops.execute((RedisCallback<Void>) connection -> {
connection.sRem(converter.getConversionService().convert(event.getKeyspace(), byte[].class), event.getId());
new IndexWriter(connection, converter).removeKeyFromIndexes(event.getKeyspace(), event.getId());
return null;
});
publishEvent(event);
}
private boolean isKeyExpirationMessage(Message message) {
return BinaryKeyspaceIdentifier.isValid(message.getBody());
}
}
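// Illustration: when Redis publishes an expired-key notification for the hypothetical key
// persons:4711, this listener reads and deletes persons:4711:phantom (if present), removes
// the id from the "persons" set and its indexes, and publishes a RedisKeyExpiredEvent
// carrying the restored value.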
private boolean keepShadowCopy() {
switch (shadowCopy) {
case OFF:
return false;
case ON:
return true;
default:
return this.expirationListener.get() != null;
}
}
/**
* @author Christoph Strobl
* @since 1.8
*/
public enum EnableKeyspaceEvents {
/**
* Initializes the {@link KeyExpirationEventMessageListener} on startup.
*/
ON_STARTUP,
/**
* Initializes the {@link KeyExpirationEventMessageListener} on first insert having expiration time set.
*/
ON_DEMAND,
/**
* Turn {@link KeyExpirationEventMessageListener} usage off. No expiration events will be received.
*/
OFF
}
/**
* Configuration flag controlling storage of phantom keys (shadow copies) of expiring entities to read them later when
* publishing {@link RedisKeyspaceEvent}.
*
* @author Christoph Strobl
* @since 2.4
*/
public enum ShadowCopy {
/**
* Store shadow copies of expiring entities depending on the {@link EnableKeyspaceEvents}.
*/
DEFAULT,
/**
* Store shadow copies of expiring entities.
*/
ON,
/**
* Do not store shadow copies.
*/
OFF
}
/**
* Container holding update information like fields to remove from the Redis Hash.
*
* @author Christoph Strobl
*/
static class RedisUpdateObject {
private final String keyspace;
private final Object targetId;
private final byte[] targetKey;
private final Set<byte[]> fieldsToRemove = new LinkedHashSet<>();
private final Set<Index> indexesToUpdate = new LinkedHashSet<>();
RedisUpdateObject(byte[] targetKey, String keyspace, Object targetId) {
this.targetKey = targetKey;
this.keyspace = keyspace;
this.targetId = targetId;
}
void addFieldToRemove(byte[] field) {
fieldsToRemove.add(field);
}
void addIndexToUpdate(Index index) {
indexesToUpdate.add(index);
}
static class Index {
final DataType type;
final byte[] key;
public Index(byte[] key, DataType type) {
this.key = key;
this.type = type;
}
}
}
}
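For reference, a standalone usage sketch of the adapter might look roughly as follows. The Person entity, the id value and the local Lettuce connection factory are assumptions made purely for illustration; in a typical application the adapter is created and managed by the repository infrastructure (@EnableRedisRepositories) rather than instantiated by hand.

import org.springframework.data.annotation.Id;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisHash;
import org.springframework.data.redis.core.RedisKeyValueAdapter;
import org.springframework.data.redis.core.RedisTemplate;

// hypothetical entity used only for this sketch
@RedisHash("persons")
class Person {
	@Id String id;
	String name;
	Person() {}
	Person(String id, String name) { this.id = id; this.name = name; }
}

public class RedisKeyValueAdapterSketch {
	public static void main(String[] args) throws Exception {
		// assumes a Redis instance on localhost:6379
		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory();
		connectionFactory.afterPropertiesSet();
		RedisTemplate<byte[], byte[]> template = new RedisTemplate<>();
		template.setConnectionFactory(connectionFactory);
		template.afterPropertiesSet();
		// the constructor wires a default RedisMappingContext, RedisCustomConversions
		// and a managed RedisMessageListenerContainer
		RedisKeyValueAdapter adapter = new RedisKeyValueAdapter(template);
		adapter.put("4711", new Person("4711", "Rand al'Thor"), "persons"); // stored as hash persons:4711
		Person loaded = adapter.get("4711", "persons", Person.class);
		System.out.println(loaded != null ? loaded.name : "not found");
		adapter.destroy();
		connectionFactory.destroy();
	}
}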
Related Articles
spring-data-redis AbstractOperations Source Code
spring-data-redis BoundGeoOperations Source Code
spring-data-redis BoundHashOperations Source Code
spring-data-redis BoundKeyOperations Source Code
spring-data-redis BoundListOperations Source Code
spring-data-redis BoundOperationsProxyFactory Source Code
spring-data-redis BoundSetOperations Source Code
spring-data-redis BoundStreamOperations Source Code