spring-data-redis ReactiveSubscription 源码
spring-data-redis ReactiveSubscription 代码
文件路径:/src/main/java/org/springframework/data/redis/connection/ReactiveSubscription.java
/*
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Set;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Subscription for Redis channels using reactive infrastructure. A {@link ReactiveSubscription} allows subscribing to
* {@link #subscribe(ByteBuffer...) channels} and {@link #pSubscribe(ByteBuffer...) patterns}. It provides access to the
* {@link ChannelMessage} {@link #receive() stream} that emits only messages for channels and patterns registered in
* this {@link ReactiveSubscription}.
* <p>
* A reactive Redis connection can have multiple subscriptions. If two or more subscriptions subscribe to the same
* target (channel/pattern) and one unsubscribes, then the others will no longer receive messages for the target due
* to how Redis handles Pub/Sub subscriptions.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.1
*/
public interface ReactiveSubscription {

	/**
	 * Subscribe to the given {@code channels} and add them to this subscription.
	 *
	 * @param channels names of the channels to subscribe to. Must not be empty.
	 * @return a {@link Mono} without a value, completing once the channel subscription is registered.
	 */
	Mono<Void> subscribe(ByteBuffer... channels);

	/**
	 * Subscribe to the given channel {@code patterns} and add them to this subscription.
	 *
	 * @param patterns channel patterns to subscribe to. Must not be empty.
	 * @return a {@link Mono} without a value, completing once the pattern subscription is registered.
	 */
	Mono<Void> pSubscribe(ByteBuffer... patterns);

	/**
	 * Cancel the subscription for every {@link #getChannels() channel} of this subscription.
	 *
	 * @return a {@link Mono} without a value, completing once the channel subscriptions are unregistered.
	 */
	Mono<Void> unsubscribe();

	/**
	 * Cancel the subscription for each of the given {@code channels}.
	 *
	 * @param channels names of the channels to unsubscribe from. Must not be empty.
	 * @return a {@link Mono} without a value, completing once the channel subscriptions are unregistered.
	 */
	Mono<Void> unsubscribe(ByteBuffer... channels);

	/**
	 * Cancel the subscription for all channels matched by the {@link #getPatterns() patterns} of this subscription.
	 *
	 * @return a {@link Mono} without a value, completing once the pattern subscriptions are unregistered.
	 */
	Mono<Void> pUnsubscribe();

	/**
	 * Cancel the subscription for all channels matching the given {@code patterns}.
	 *
	 * @param patterns channel patterns to unsubscribe from. Must not be empty.
	 * @return a {@link Mono} without a value, completing once the pattern subscriptions are unregistered.
	 */
	Mono<Void> pUnsubscribe(ByteBuffer... patterns);

	/**
	 * Obtain the (named) channels of this subscription.
	 *
	 * @return {@link Set} of named channels.
	 */
	Set<ByteBuffer> getChannels();

	/**
	 * Obtain the channel patterns of this subscription.
	 *
	 * @return {@link Set} of channel patterns.
	 */
	Set<ByteBuffer> getPatterns();

	/**
	 * Obtain the stream that emits the {@link Message messages} of this subscription. Only messages for subscribed and
	 * registered {@link #getChannels() channels} and {@link #getPatterns() patterns} are part of the stream.
	 * <p>
	 * The stream is published through a {@link reactor.core.publisher.ConnectableFlux}, which turns it into a hot
	 * sequence. While there is no demand, emission is paused and messages arriving in the meantime are buffered. The
	 * stream terminates when all subscribers have unsubscribed or when this {@link ReactiveSubscription} is
	 * {@link #cancel() terminated}.
	 *
	 * @return {@link Flux} emitting the {@link Message} stream.
	 */
	Flux<Message<ByteBuffer, ByteBuffer>> receive();

	/**
	 * Unsubscribe from all {@link #getChannels() channels} and {@link #getPatterns() patterns} and request termination
	 * of every active {@link #receive() message stream}. Active streams terminate with a
	 * {@link java.util.concurrent.CancellationException}.
	 *
	 * @return a {@link Mono} that completes once termination is finished.
	 */
	Mono<Void> cancel();

	/**
	 * A Redis channel message as exchanged via Redis pub/sub.
	 *
	 * @param <C> channel representation type.
	 * @param <M> message representation type.
	 * @author Christoph Strobl
	 * @since 2.1
	 */
	interface Message<C, M> {

		/**
		 * Get the channel the message was published to.
		 *
		 * @return never {@literal null}.
		 */
		C getChannel();

		/**
		 * Get the body of the message.
		 *
		 * @return never {@literal null}.
		 */
		M getMessage();
	}

	/**
	 * Immutable value object holding a Redis channel message: the channel and the message body.
	 *
	 * @param <C> type used to represent the channel name.
	 * @param <M> type used to represent the message.
	 * @author Mark Paluch
	 * @author Christoph Strobl
	 * @since 2.1
	 */
	class ChannelMessage<C, M> implements Message<C, M> {

		private final C channel;
		private final M message;

		/**
		 * Create a new {@link ChannelMessage} from a channel and a message body.
		 *
		 * @param channel must not be {@literal null}.
		 * @param message must not be {@literal null}.
		 */
		public ChannelMessage(C channel, M message) {

			// Both arguments are validated (channel first) before any field is assigned.
			Assert.notNull(channel, "Channel must not be null");
			Assert.notNull(message, "Message must not be null");

			this.channel = channel;
			this.message = message;
		}

		@Override
		public C getChannel() {
			return this.channel;
		}

		@Override
		public M getMessage() {
			return this.message;
		}

		/**
		 * Two messages are equal when they are of exactly the same class and carry an equal channel and an equal message
		 * body.
		 */
		@Override
		public boolean equals(Object o) {

			if (o == this) {
				return true;
			}

			if (o == null || o.getClass() != getClass()) {
				return false;
			}

			ChannelMessage<?, ?> other = (ChannelMessage<?, ?>) o;

			return ObjectUtils.nullSafeEquals(this.channel, other.channel)
					&& ObjectUtils.nullSafeEquals(this.message, other.message);
		}

		@Override
		public int hashCode() {
			return 31 * ObjectUtils.nullSafeHashCode(this.channel) + ObjectUtils.nullSafeHashCode(this.message);
		}

		@Override
		public String toString() {
			return "ChannelMessage {" + "channel=" + this.channel + ", message=" + this.message + '}';
		}
	}

	/**
	 * Immutable value object holding a Redis channel message that was received through a pattern subscription. Adds the
	 * matching pattern to the channel and message body kept by {@link ChannelMessage}.
	 *
	 * @param <P> type used to represent the pattern.
	 * @param <C> type used to represent the channel name.
	 * @param <M> type used to represent the message.
	 * @author Mark Paluch
	 * @author Christoph Strobl
	 * @since 2.1
	 */
	class PatternMessage<P, C, M> extends ChannelMessage<C, M> {

		private final P pattern;

		/**
		 * Create a new {@link PatternMessage} from a pattern, a channel and a message body.
		 *
		 * @param pattern must not be {@literal null}.
		 * @param channel must not be {@literal null}.
		 * @param message must not be {@literal null}.
		 */
		public PatternMessage(P pattern, C channel, M message) {

			// Channel and message are checked by the superclass constructor; the pattern is checked afterwards.
			super(channel, message);

			Assert.notNull(pattern, "Pattern must not be null");
			this.pattern = pattern;
		}

		/**
		 * Get the pattern that matched the channel.
		 *
		 * @return never {@literal null}.
		 */
		public P getPattern() {
			return this.pattern;
		}

		/**
		 * Two pattern messages are equal when they are of exactly the same class, are equal according to
		 * {@link ChannelMessage#equals(Object)} and carry an equal pattern.
		 */
		@Override
		public boolean equals(Object o) {

			if (o == this) {
				return true;
			}

			if (o == null || o.getClass() != getClass() || !super.equals(o)) {
				return false;
			}

			return ObjectUtils.nullSafeEquals(this.pattern, ((PatternMessage<?, ?, ?>) o).pattern);
		}

		@Override
		public int hashCode() {
			return 31 * super.hashCode() + ObjectUtils.nullSafeHashCode(this.pattern);
		}

		@Override
		public String toString() {
			return "PatternMessage{" + "channel=" + getChannel() + ", pattern=" + this.pattern + ", message="
					+ getMessage() + '}';
		}
	}
}
相关信息
相关文章
spring-data-redis AbstractRedisConnection 源码
spring-data-redis BitFieldSubCommands 源码
spring-data-redis ClusterCommandExecutionFailureException 源码
spring-data-redis ClusterCommandExecutor 源码
spring-data-redis ClusterInfo 源码
spring-data-redis ClusterNodeResourceProvider 源码
spring-data-redis ClusterSlotHashUtil 源码
spring-data-redis ClusterTopology 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦