spring-data-redis RedisStreamCommands 源码

  • 2022-08-16
  • 浏览 (315)

spring-data-redis RedisStreamCommands 代码

文件路径:/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

/*
 * Copyright 2018-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.redis.connection;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/**
 * Stream-specific Redis commands.
 *
 * @author Mark Paluch
 * @author Christoph Strobl
 * @author Tugdual Grall
 * @author Dengliming
 * @author Mark John Moreno
 * @see <a href="https://redis.io/topics/streams-intro">Redis Documentation - Streams</a>
 * @since 2.2
 */
public interface RedisStreamCommands {

	/**
	 * Acknowledge one or more records, identified via their id, as processed.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param group name of the consumer group.
	 * @param recordIds the String representation of the {@literal id's} of the records to acknowledge.
	 * @return length of acknowledged messages. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xack">Redis Documentation: XACK</a>
	 */
	@Nullable
	default Long xAck(byte[] key, String group, String... recordIds) {
		return xAck(key, group, Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new));
	}

	/**
	 * Acknowledge one or more records, identified via their id, as processed.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param group name of the consumer group.
	 * @param recordIds the {@literal id's} of the records to acknowledge.
	 * @return length of acknowledged messages. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xack">Redis Documentation: XACK</a>
	 */
	@Nullable
	Long xAck(byte[] key, String group, RecordId... recordIds);

	/**
	 * Append a new record with the given {@link Map field/value pairs} as content to the stream stored at {@code key}.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param content the records content modeled as {@link Map field/value pairs}.
	 * @return the server generated {@link RecordId id}. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
	 */
	@Nullable
	default RecordId xAdd(byte[] key, Map<byte[], byte[]> content) {
		return xAdd(StreamRecords.newRecord().in(key).ofMap(content));
	}

	/**
	 * Append the given {@link MapRecord record} to the stream stored at {@code Record#getStream}. If you prefer manual id
	 * assignment over server generated ones make sure to provide an id via {@code Record#withId}.
	 *
	 * @param record the {@link MapRecord record} to append.
	 * @return the {@link RecordId id} after save. {@literal null} when used in pipeline / transaction.
	 */
	@Nullable
	default RecordId xAdd(MapRecord<byte[], byte[], byte[]> record) {
		return xAdd(record, XAddOptions.none());
	}

	/**
	 * Append the given {@link MapRecord record} to the stream stored at {@code Record#getStream}. If you prefer manual id
	 * assignment over server generated ones make sure to provide an id via {@code Record#withId}.
	 *
	 * @param record the {@link MapRecord record} to append.
	 * @param options additional options (eg. {@literal MAXLEN}). Must not be {@literal null}, use
	 *          {@link XAddOptions#none()} instead.
	 * @return the {@link RecordId id} after save. {@literal null} when used in pipeline / transaction.
	 * @since 2.3
	 */
	@Nullable
	RecordId xAdd(MapRecord<byte[], byte[], byte[]> record, XAddOptions options);

	/**
	 * Additional options applicable for {@literal XADD} command.
	 *
	 * @author Christoph Strobl
	 * @author Mark John Moreno
	 * @author Liming Deng
	 * @since 2.3
	 */
	class XAddOptions {

		private static final XAddOptions NONE = new XAddOptions(null, false, false, null);

		private final @Nullable Long maxlen;
		private final boolean nomkstream;
		private final boolean approximateTrimming;
		private final @Nullable RecordId minId;

		private XAddOptions(@Nullable Long maxlen, boolean nomkstream, boolean approximateTrimming,
				@Nullable RecordId minId) {
			this.maxlen = maxlen;
			this.nomkstream = nomkstream;
			this.approximateTrimming = approximateTrimming;
			this.minId = minId;
		}

		/**
		 * @return
		 */
		public static XAddOptions none() {
			return NONE;
		}

		/**
		 * Disable creation of stream if it does not already exist.
		 *
		 * @return new instance of {@link XAddOptions}.
		 * @since 2.6
		 */
		public static XAddOptions makeNoStream() {
			return new XAddOptions(null, true, false, null);
		}

		/**
		 * Disable creation of stream if it does not already exist.
		 *
		 * @param makeNoStream {@code true} to not create a stream if it does not already exist.
		 * @return new instance of {@link XAddOptions}.
		 * @since 2.6
		 */
		public static XAddOptions makeNoStream(boolean makeNoStream) {
			return new XAddOptions(null, makeNoStream, false, null);
		}

		/**
		 * Limit the size of the stream to the given maximum number of elements.
		 *
		 * @return new instance of {@link XAddOptions}.
		 */
		public static XAddOptions maxlen(long maxlen) {
			return new XAddOptions(maxlen, false, false, null);
		}

		/**
		 * Apply {@code MINID} trimming strategy, that evicts entries with IDs lower than the one specified.
		 *
		 * @param minId the minimum record Id to retain.
		 * @return new instance of {@link XAddOptions}.
		 * @since 2.7
		 */
		public XAddOptions minId(RecordId minId) {
			return new XAddOptions(maxlen, nomkstream, approximateTrimming, minId);
		}

		/**
		 * Apply efficient trimming for capped streams using the {@code ~} flag.
		 *
		 * @return new instance of {@link XAddOptions}.
		 */
		public XAddOptions approximateTrimming(boolean approximateTrimming) {
			return new XAddOptions(maxlen, nomkstream, approximateTrimming, minId);
		}

		/**
		 * @return {@literal true} if {@literal NOMKSTREAM} is set.
		 * @since 2.6
		 */
		public boolean isNoMkStream() {
			return nomkstream;
		}

		/**
		 * Limit the size of the stream to the given maximum number of elements.
		 *
		 * @return can be {@literal null}.
		 */
		@Nullable
		public Long getMaxlen() {
			return maxlen;
		}

		/**
		 * @return {@literal true} if {@literal MAXLEN} is set.
		 */
		public boolean hasMaxlen() {
			return maxlen != null && maxlen > 0;
		}

		/**
		 * @return {@literal true} if {@literal approximateTrimming} is set.
		 */
		public boolean isApproximateTrimming() {
			return approximateTrimming;
		}

		/**
		 * @return the minimum record Id to retain during trimming.
		 * @since 2.7
		 */
		@Nullable
		public RecordId getMinId() {
			return minId;
		}

		/**
		 * @return {@literal true} if {@literal MINID} is set.
		 * @since 2.7
		 */
		public boolean hasMinId() {
			return minId != null;
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			if (!(o instanceof XAddOptions)) {
				return false;
			}
			XAddOptions that = (XAddOptions) o;
			if (nomkstream != that.nomkstream) {
				return false;
			}
			if (approximateTrimming != that.approximateTrimming) {
				return false;
			}
			if (!ObjectUtils.nullSafeEquals(maxlen, that.maxlen)) {
				return false;
			}
			return ObjectUtils.nullSafeEquals(minId, that.minId);
		}

		@Override
		public int hashCode() {
			int result = ObjectUtils.nullSafeHashCode(maxlen);
			result = 31 * result + (nomkstream ? 1 : 0);
			result = 31 * result + (approximateTrimming ? 1 : 0);
			result = 31 * result + ObjectUtils.nullSafeHashCode(minId);
			return result;
		}
	}

	/**
	 * Change the ownership of a pending message to the given new {@literal consumer} without increasing the delivered
	 * count.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param group the name of the {@literal consumer group}.
	 * @param newOwner the name of the new {@literal consumer}.
	 * @param options must not be {@literal null}.
	 * @return list of {@link RecordId ids} that changed user.
	 * @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
	 * @since 2.3
	 */
	@Nullable
	List<RecordId> xClaimJustId(byte[] key, String group, String newOwner, XClaimOptions options);

	/**
	 * Change the ownership of a pending message to the given new {@literal consumer}.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param group the name of the {@literal consumer group}.
	 * @param newOwner the name of the new {@literal consumer}.
	 * @param minIdleTime must not be {@literal null}.
	 * @param recordIds must not be {@literal null}.
	 * @return list of {@link ByteRecord} that changed user.
	 * @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
	 * @since 2.3
	 */
	@Nullable
	default List<ByteRecord> xClaim(byte[] key, String group, String newOwner, Duration minIdleTime,
			RecordId... recordIds) {
		return xClaim(key, group, newOwner, XClaimOptions.minIdle(minIdleTime).ids(recordIds));
	}

	/**
	 * Change the ownership of a pending message to the given new {@literal consumer}.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param group the name of the {@literal consumer group}.
	 * @param newOwner the name of the new {@literal consumer}.
	 * @param options must not be {@literal null}.
	 * @return list of {@link ByteRecord} that changed user.
	 * @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
	 * @since 2.3
	 */
	@Nullable
	List<ByteRecord> xClaim(byte[] key, String group, String newOwner, XClaimOptions options);

	/**
	 * @author Christoph Strobl
	 * @since 2.3
	 */
	class XClaimOptions {

		private final List<RecordId> ids;
		private final Duration minIdleTime;
		private final @Nullable Duration idleTime;
		private final @Nullable Instant unixTime;
		private final @Nullable Long retryCount;
		private final boolean force;

		private XClaimOptions(List<RecordId> ids, Duration minIdleTime, @Nullable Duration idleTime,
				@Nullable Instant unixTime, @Nullable Long retryCount, boolean force) {

			this.ids = new ArrayList<>(ids);
			this.minIdleTime = minIdleTime;
			this.idleTime = idleTime;
			this.unixTime = unixTime;
			this.retryCount = retryCount;
			this.force = force;
		}

		/**
		 * Set the {@literal min-idle-time} to limit the command to messages that have been idle for at at least the given
		 * {@link Duration}.
		 *
		 * @param minIdleTime must not be {@literal null}.
		 * @return new instance of {@link XClaimOptions}.
		 */
		public static XClaimOptionsBuilder minIdle(Duration minIdleTime) {
			return new XClaimOptionsBuilder(minIdleTime);
		}

		/**
		 * Set the {@literal min-idle-time} to limit the command to messages that have been idle for at at least the given
		 * {@literal milliseconds}.
		 *
		 * @param millis
		 * @return new instance of {@link XClaimOptions}.
		 */
		public static XClaimOptionsBuilder minIdleMs(long millis) {
			return minIdle(Duration.ofMillis(millis));
		}

		/**
		 * Set the idle time since last delivery of a message. To specify a specific point in time use
		 * {@link #time(Instant)}.
		 *
		 * @param idleTime idle time.
		 * @return {@code this}.
		 */
		public XClaimOptions idle(Duration idleTime) {
			return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
		}

		/**
		 * Sets the idle time to a specific unix time (in milliseconds). To define a relative idle time use
		 * {@link #idle(Duration)}.
		 *
		 * @param unixTime idle time.
		 * @return {@code this}.
		 */
		public XClaimOptions time(Instant unixTime) {
			return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
		}

		/**
		 * Set the retry counter to the specified value.
		 *
		 * @param retryCount can be {@literal null}. If {@literal null} no change to the retry counter will be made.
		 * @return new instance of {@link XClaimOptions}.
		 */
		public XClaimOptions retryCount(long retryCount) {
			return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
		}

		/**
		 * Forces creation of a pending message entry in the PEL even if it does not already exist as long a the given
		 * stream record id is valid.
		 *
		 * @return new instance of {@link XClaimOptions}.
		 */
		public XClaimOptions force() {
			return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, true);
		}

		/**
		 * Get the {@link List} of {@literal ID}.
		 *
		 * @return never {@literal null}.
		 */
		public List<RecordId> getIds() {
			return ids;
		}

		/**
		 * Get the {@literal ID} array as {@link String strings}.
		 *
		 * @return never {@literal null}.
		 */
		public String[] getIdsAsStringArray() {
			return getIds().stream().map(RecordId::getValue).toArray(String[]::new);
		}

		/**
		 * Get the {@literal min-idle-time}.
		 *
		 * @return never {@literal null}.
		 */
		public Duration getMinIdleTime() {
			return minIdleTime;
		}

		/**
		 * Get the {@literal IDLE ms} time.
		 *
		 * @return can be {@literal null}.
		 */
		@Nullable
		public Duration getIdleTime() {
			return idleTime;
		}

		/**
		 * Get the {@literal TIME ms-unix-time}
		 *
		 * @return
		 */
		@Nullable
		public Instant getUnixTime() {
			return unixTime;
		}

		/**
		 * Get the {@literal RETRYCOUNT count}.
		 *
		 * @return
		 */
		@Nullable
		public Long getRetryCount() {
			return retryCount;
		}

		/**
		 * Get the {@literal FORCE} flag.
		 *
		 * @return
		 */
		public boolean isForce() {
			return force;
		}

		public static class XClaimOptionsBuilder {

			private final Duration minIdleTime;

			XClaimOptionsBuilder(Duration minIdleTime) {

				Assert.notNull(minIdleTime, "Min idle time must not be null");

				this.minIdleTime = minIdleTime;
			}

			/**
			 * Set the {@literal ID}s to claim.
			 *
			 * @param ids must not be {@literal null}.
			 * @return
			 */
			public XClaimOptions ids(List<?> ids) {

				List<RecordId> idList = ids.stream()
						.map(it -> it instanceof RecordId ? (RecordId) it : RecordId.of(it.toString()))
						.collect(Collectors.toList());

				return new XClaimOptions(idList, minIdleTime, null, null, null, false);
			}

			/**
			 * Set the {@literal ID}s to claim.
			 *
			 * @param ids must not be {@literal null}.
			 * @return
			 */
			public XClaimOptions ids(RecordId... ids) {
				return ids(Arrays.asList(ids));
			}

			/**
			 * Set the {@literal ID}s to claim.
			 *
			 * @param ids must not be {@literal null}.
			 * @return
			 */
			public XClaimOptions ids(String... ids) {
				return ids(Arrays.asList(ids));
			}
		}
	}

	/**
	 * Removes the records with the given id's from the stream. Returns the number of items deleted, that may be different
	 * from the number of id's passed in case certain id's do not exist.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param recordIds the id's of the records to remove.
	 * @return number of removed entries. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xdel">Redis Documentation: XDEL</a>
	 */
	@Nullable
	default Long xDel(byte[] key, String... recordIds) {
		return xDel(key, Arrays.stream(recordIds).map(RecordId::of).toArray(RecordId[]::new));
	}

	/**
	 * Removes the records with the given id's from the stream. Returns the number of items deleted, that may be different
	 * from the number of id's passed in case certain id's do not exist.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param recordIds the id's of the records to remove.
	 * @return number of removed entries. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xdel">Redis Documentation: XDEL</a>
	 */
	@Nullable
	Long xDel(byte[] key, RecordId... recordIds);

	/**
	 * Create a consumer group.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param groupName name of the consumer group to create.
	 * @param readOffset the offset to start at.
	 * @return {@literal ok} if successful. {@literal null} when used in pipeline / transaction.
	 */
	@Nullable
	String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset);

	/**
	 * Create a consumer group.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param groupName name of the consumer group to create.
	 * @param readOffset the offset to start at.
	 * @param mkStream if true the group will create the stream if not already present (MKSTREAM)
	 * @return {@literal ok} if successful. {@literal null} when used in pipeline / transaction.
	 * @since 2.3
	 */
	@Nullable
	String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset, boolean mkStream);

	/**
	 * Delete a consumer from a consumer group.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param groupName the name of the group to remove the consumer from.
	 * @param consumerName the name of the consumer to remove from the group.
	 * @return {@literal true} if successful. {@literal null} when used in pipeline / transaction.
	 */
	@Nullable
	default Boolean xGroupDelConsumer(byte[] key, String groupName, String consumerName) {
		return xGroupDelConsumer(key, Consumer.from(groupName, consumerName));
	}

	/**
	 * Delete a consumer from a consumer group.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param consumer consumer identified by group name and consumer name.
	 * @return {@literal true} if successful. {@literal null} when used in pipeline / transaction.
	 */
	@Nullable
	Boolean xGroupDelConsumer(byte[] key, Consumer consumer);

	/**
	 * Destroy a consumer group.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param groupName name of the consumer group.
	 * @return {@literal true} if successful. {@literal null} when used in pipeline / transaction.
	 */
	@Nullable
	Boolean xGroupDestroy(byte[] key, String groupName);

	/**
	 * Obtain general information about the stream stored at the specified {@literal key}.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @return {@literal null} when used in pipeline / transaction.
	 * @since 2.3
	 */
	@Nullable
	XInfoStream xInfo(byte[] key);

	/**
	 * Obtain information about {@literal consumer groups} associated with the stream stored at the specified
	 * {@literal key}.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @return {@literal null} when used in pipeline / transaction.
	 * @since 2.3
	 */
	@Nullable
	XInfoGroups xInfoGroups(byte[] key);

	/**
	 * Obtain information about every consumer in a specific {@literal consumer group} for the stream stored at the
	 * specified {@literal key}.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param groupName name of the {@literal consumer group}.
	 * @return {@literal null} when used in pipeline / transaction.
	 * @since 2.3
	 */
	@Nullable
	XInfoConsumers xInfoConsumers(byte[] key, String groupName);

	/**
	 * Get the length of a stream.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @return length of the stream. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xlen">Redis Documentation: XLEN</a>
	 */
	@Nullable
	Long xLen(byte[] key);

	/**
	 * Obtain the {@link PendingMessagesSummary} for a given {@literal consumer group}.
	 *
	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
	 * @return a summary of pending messages within the given {@literal consumer group} or {@literal null} when used in
	 *         pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
	 * @since 2.3
	 */
	@Nullable
	PendingMessagesSummary xPending(byte[] key, String groupName);

	/**
	 * Obtained detailed information about all pending messages for a given {@link Consumer}.
	 *
	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
	 * @param consumer the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
	 * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
	 * @since 2.3
	 */
	@Nullable
	default PendingMessages xPending(byte[] key, Consumer consumer) {
		return xPending(key, consumer.getGroup(), consumer.getName());
	}

	/**
	 * Obtained detailed information about all pending messages for a given {@literal consumer}.
	 *
	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
	 * @param consumerName the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
	 * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
	 * @since 2.3
	 */
	@Nullable
	default PendingMessages xPending(byte[] key, String groupName, String consumerName) {
		return xPending(key, groupName, XPendingOptions.unbounded().consumer(consumerName));
	}

	/**
	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
	 * {@literal consumer group}.
	 *
	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
	 * @param range the range of messages ids to search within. Must not be {@literal null}.
	 * @param count limit the number of results. Must not be {@literal null}.
	 * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
	 *         transaction.
	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
	 * @since 2.3
	 */
	@Nullable
	default PendingMessages xPending(byte[] key, String groupName, Range<?> range, Long count) {
		return xPending(key, groupName, XPendingOptions.range(range, count));
	}

	/**
	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
	 * {@link Consumer} within a {@literal consumer group}.
	 *
	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
	 * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
	 * @param range the range of messages ids to search within. Must not be {@literal null}.
	 * @param count limit the number of results. Must not be {@literal null}.
	 * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
	 * @since 2.3
	 */
	@Nullable
	default PendingMessages xPending(byte[] key, Consumer consumer, Range<?> range, Long count) {
		return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
	}

	/**
	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
	 * {@literal consumer} within a {@literal consumer group}.
	 *
	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
	 * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
	 * @param range the range of messages ids to search within. Must not be {@literal null}.
	 * @param count limit the number of results. Must not be {@literal null}.
	 * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
	 *         when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
	 * @since 2.3
	 */
	@Nullable
	default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range<?> range, Long count) {
		return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName));
	}

	/**
	 * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
	 * options}.
	 *
	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
	 * @param options the options containing {@literal range}, {@literal consumer} and {@literal count}. Must not be
	 *          {@literal null}.
	 * @return pending messages matching given criteria or {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
	 * @since 2.3
	 */
	@Nullable
	PendingMessages xPending(byte[] key, String groupName, XPendingOptions options);

	/**
	 * Value Object holding parameters for obtaining pending messages.
	 *
	 * @author Christoph Strobl
	 * @since 2.3
	 */
	class XPendingOptions {

		private final @Nullable String consumerName;
		private final Range<?> range;
		private final @Nullable Long count;

		private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable Long count) {

			this.range = range;
			this.count = count;
			this.consumerName = consumerName;
		}

		/**
		 * Create new {@link XPendingOptions} with an unbounded {@link Range} ({@literal - +}).
		 *
		 * @return new instance of {@link XPendingOptions}.
		 */
		public static XPendingOptions unbounded() {
			return new XPendingOptions(null, Range.unbounded(), null);
		}

		/**
		 * Create new {@link XPendingOptions} with an unbounded {@link Range} ({@literal - +}).
		 *
		 * @param count the max number of messages to return. Must not be {@literal null}.
		 * @return new instance of {@link XPendingOptions}.
		 */
		public static XPendingOptions unbounded(Long count) {
			return new XPendingOptions(null, Range.unbounded(), count);
		}

		/**
		 * Create new {@link XPendingOptions} with given {@link Range} and limit.
		 *
		 * @return new instance of {@link XPendingOptions}.
		 */
		public static XPendingOptions range(Range<?> range, Long count) {
			return new XPendingOptions(null, range, count);
		}

		/**
		 * Append given consumer.
		 *
		 * @param consumerName must not be {@literal null}.
		 * @return new instance of {@link XPendingOptions}.
		 */
		public XPendingOptions consumer(String consumerName) {
			return new XPendingOptions(consumerName, range, count);
		}

		/**
		 * @return never {@literal null}.
		 */
		public Range<?> getRange() {
			return range;
		}

		/**
		 * @return can be {@literal null}.
		 */
		@Nullable
		public Long getCount() {
			return count;
		}

		/**
		 * @return can be {@literal null}.
		 */
		@Nullable
		public String getConsumerName() {
			return consumerName;
		}

		/**
		 * @return {@literal true} if a consumer name is present.
		 */
		public boolean hasConsumer() {
			return StringUtils.hasText(consumerName);
		}

		/**
		 * @return {@literal true} count is set.
		 */
		public boolean isLimited() {
			return count != null && count > -1;
		}
	}

	/**
	 * Retrieve all {@link ByteRecord records} within a specific {@link Range} from the stream stored at {@literal key}.
	 * <br />
	 * Use {@link Range#unbounded()} to read from the minimum and the maximum ID possible.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param range must not be {@literal null}.
	 * @return {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xrange">Redis Documentation: XRANGE</a>
	 */
	@Nullable
	default List<ByteRecord> xRange(byte[] key, Range<String> range) {
		return xRange(key, range, Limit.unlimited());
	}

	/**
	 * Retrieve a {@link Limit limited number} of {@link ByteRecord records} within a specific {@link Range} from the
	 * stream stored at {@literal key}. <br />
	 * Use {@link Range#unbounded()} to read from the minimum and the maximum ID possible. <br />
	 * Use {@link Limit#unlimited()} to read all records.
	 *
	 * @param key the {@literal key} the stream is stored at.
	 * @param range must not be {@literal null}.
	 * @param limit must not be {@literal null}.
	 * @return {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xrange">Redis Documentation: XRANGE</a>
	 */
	@Nullable
	List<ByteRecord> xRange(byte[] key, Range<String> range, Limit limit);

	/**
	 * Read records from one or more {@link StreamOffset}s.
	 *
	 * @param streams the streams to read from.
	 * @return {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xread">Redis Documentation: XREAD</a>
	 */
	@Nullable
	default List<ByteRecord> xRead(StreamOffset<byte[]>... streams) {
		return xRead(StreamReadOptions.empty(), streams);
	}

	/**
	 * Read records from one or more {@link StreamOffset}s.
	 *
	 * @param readOptions read arguments.
	 * @param streams the streams to read from.
	 * @return {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xread">Redis Documentation: XREAD</a>
	 */
	@Nullable
	List<ByteRecord> xRead(StreamReadOptions readOptions, StreamOffset<byte[]>... streams);

	/**
	 * Read records from one or more {@link StreamOffset}s using a consumer group.
	 *
	 * @param consumer consumer/group.
	 * @param streams the streams to read from.
	 * @return list with members of the resulting stream. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xreadgroup">Redis Documentation: XREADGROUP</a>
	 */
	@Nullable
	default List<ByteRecord> xReadGroup(Consumer consumer, StreamOffset<byte[]>... streams) {
		return xReadGroup(consumer, StreamReadOptions.empty(), streams);
	}

	/**
	 * Read records from one or more {@link StreamOffset}s using a consumer group.
	 *
	 * @param consumer consumer/group.
	 * @param readOptions read arguments.
	 * @param streams the streams to read from.
	 * @return list with members of the resulting stream. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xreadgroup">Redis Documentation: XREADGROUP</a>
	 */
	@Nullable
	List<ByteRecord> xReadGroup(Consumer consumer, StreamReadOptions readOptions, StreamOffset<byte[]>... streams);

	/**
	 * Read records from a stream within a specific {@link Range} in reverse order.
	 *
	 * @param key the stream key.
	 * @param range must not be {@literal null}.
	 * @return list with members of the resulting stream. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xrevrange">Redis Documentation: XREVRANGE</a>
	 */
	@Nullable
	default List<ByteRecord> xRevRange(byte[] key, Range<String> range) {
		return xRevRange(key, range, Limit.unlimited());
	}

	/**
	 * Read records from a stream within a specific {@link Range} applying a {@link Limit} in reverse order.
	 *
	 * @param key the stream key.
	 * @param range must not be {@literal null}.
	 * @param limit must not be {@literal null}.
	 * @return list with members of the resulting stream. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xrevrange">Redis Documentation: XREVRANGE</a>
	 */
	@Nullable
	List<ByteRecord> xRevRange(byte[] key, Range<String> range, Limit limit);

	/**
	 * Trims the stream to {@code count} elements.
	 *
	 * @param key the stream key.
	 * @param count length of the stream.
	 * @return number of removed entries. {@literal null} when used in pipeline / transaction.
	 * @see <a href="https://redis.io/commands/xtrim">Redis Documentation: XTRIM</a>
	 */
	@Nullable
	Long xTrim(byte[] key, long count);

	/**
	 * Trims the stream to {@code count} elements.
	 *
	 * @param key the stream key.
	 * @param count length of the stream.
	 * @param approximateTrimming the trimming must be performed in a approximated way in order to maximize performances.
	 * @return number of removed entries. {@literal null} when used in pipeline / transaction.
	 * @since 2.4
	 * @see <a href="https://redis.io/commands/xtrim">Redis Documentation: XTRIM</a>
	 */
	@Nullable
	Long xTrim(byte[] key, long count, boolean approximateTrimming);
}

相关信息

spring-data-redis 源码目录

相关文章

spring-data-redis AbstractRedisConnection 源码

spring-data-redis BitFieldSubCommands 源码

spring-data-redis ClusterCommandExecutionFailureException 源码

spring-data-redis ClusterCommandExecutor 源码

spring-data-redis ClusterInfo 源码

spring-data-redis ClusterNodeResourceProvider 源码

spring-data-redis ClusterSlotHashUtil 源码

spring-data-redis ClusterTopology 源码

spring-data-redis ClusterTopologyProvider 源码

spring-data-redis ConnectionUtils 源码

0  赞