spring-data-redis JedisConnection 源码
spring-data-redis JedisConnection 代码
文件路径:/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java
/*
* Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.jedis;
import redis.clients.jedis.BuilderFactory;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.commands.ServerCommands;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.util.Pool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.convert.converter.Converter;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
import org.springframework.data.redis.FallbackExceptionTranslationStrategy;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
import org.springframework.data.redis.connection.jedis.JedisInvoker.ResponseCommands;
import org.springframework.data.redis.connection.jedis.JedisResult.JedisResultBuilder;
import org.springframework.data.redis.connection.jedis.JedisResult.JedisStatusResult;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
* {@code RedisConnection} implementation on top of <a href="https://github.com/redis/jedis">Jedis</a> library.
*
* @author Costin Leau
* @author Jennifer Hickey
* @author Christoph Strobl
* @author Thomas Darimont
* @author Jungtaek Lim
* @author Konstantin Shchepanovskyi
* @author David Liu
* @author Milan Agatonovic
* @author Mark Paluch
* @author Ninad Divadkar
* @author Guy Korland
* @author Dengliming
*/
public class JedisConnection extends AbstractRedisConnection {
// Logger obtained per instance from the runtime class (getClass()).
private final Log LOGGER = LogFactory.getLog(getClass());
// Shared strategy used by convertJedisAccessException to translate exceptions via JedisExceptionConverter.
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new FallbackExceptionTranslationStrategy(
JedisExceptionConverter.INSTANCE);
// The underlying Jedis client all direct commands are executed on.
private final Jedis jedis;
// Invoker for regular commands: delegates to doInvoke with status = false.
private final JedisInvoker invoker = new JedisInvoker((directFunction, pipelineFunction, converter,
nullDefault) -> doInvoke(false, directFunction, pipelineFunction, converter, nullDefault));
// Invoker for status commands: delegates to doInvoke with status = true.
private final JedisInvoker statusInvoker = new JedisInvoker((directFunction, pipelineFunction, converter,
nullDefault) -> doInvoke(true, directFunction, pipelineFunction, converter, nullDefault));
// Command-group facades, each constructed once with a back-reference to this connection.
private final JedisGeoCommands geoCommands = new JedisGeoCommands(this);
private final JedisHashCommands hashCommands = new JedisHashCommands(this);
private final JedisHyperLogLogCommands hllCommands = new JedisHyperLogLogCommands(this);
private final JedisKeyCommands keyCommands = new JedisKeyCommands(this);
private final JedisListCommands listCommands = new JedisListCommands(this);
private final JedisScriptingCommands scriptingCommands = new JedisScriptingCommands(this);
private final JedisServerCommands serverCommands = new JedisServerCommands(this);
private final JedisSetCommands setCommands = new JedisSetCommands(this);
private final JedisStreamCommands streamCommands = new JedisStreamCommands(this);
private final JedisStringCommands stringCommands = new JedisStringCommands(this);
private final JedisZSetCommands zSetCommands = new JedisZSetCommands(this);
// Pool the Jedis instance came from; null when no pool is used (close() then quits and disconnects instead).
private final @Nullable Pool<Jedis> pool;
// Client configuration used by getJedis(RedisNode) when creating new Jedis instances.
private final JedisClientConfig sentinelConfig;
// Results queued while a pipeline is open; read and cleared by closePipeline().
private List<JedisResult> pipelinedResults = new ArrayList<>();
// Results queued while a transaction is active; read by exec() and cleared by exec()/discard().
private Queue<FutureResult<Response<?>>> txResults = new LinkedList<>();
// Current subscription, if any; reset to null by close().
private volatile @Nullable JedisSubscription subscription;
// Active transaction; non-null means isQueueing() is true.
private volatile @Nullable Transaction transaction;
// Active pipeline; non-null means isPipelined() is true.
private volatile @Nullable Pipeline pipeline;
// Passed to the JedisResult builder for converter-backed results; see setConvertPipelineAndTxResults.
private boolean convertPipelineAndTxResults = true;
/**
* Constructs a new <code>JedisConnection</code> instance without a pool, using database index {@code 0}.
*
* @param jedis Jedis entity
*/
public JedisConnection(Jedis jedis) {
this(jedis, null, 0);
}
/**
* Constructs a new <code>JedisConnection</code> instance backed by a jedis pool. Delegates to the four-argument
* constructor with a {@literal null} client name.
*
* @param jedis the Jedis client to wrap
* @param pool can be null, if no pool is used
* @param dbIndex index of the database to use
*/
public JedisConnection(Jedis jedis, Pool<Jedis> pool, int dbIndex) {
this(jedis, pool, dbIndex, null);
}
/**
* Constructs a new <code>JedisConnection</code> instance backed by a jedis pool. The node and sentinel
* configurations are both built from the same {@code dbIndex} and {@code clientName}.
*
* @param jedis the Jedis client to wrap
* @param pool can be null, if no pool is used
* @param dbIndex index of the database to use
* @param clientName the client name, can be {@literal null}.
* @since 1.8
*/
protected JedisConnection(Jedis jedis, @Nullable Pool<Jedis> pool, int dbIndex, @Nullable String clientName) {
this(jedis, pool, createConfig(dbIndex, clientName), createConfig(dbIndex, clientName));
}
/**
 * Builds a client configuration carrying the given database index and client name.
 *
 * @param dbIndex index of the database to use
 * @param clientName the client name, can be {@literal null}.
 * @return the built configuration.
 */
private static DefaultJedisClientConfig createConfig(int dbIndex, @Nullable String clientName) {

	DefaultJedisClientConfig.Builder configBuilder = DefaultJedisClientConfig.builder();
	configBuilder.database(dbIndex);
	configBuilder.clientName(clientName);

	return configBuilder.build();
}
/**
* Constructs a new <code>JedisConnection</code> instance backed by a jedis pool. If the database in
* {@code nodeConfig} differs from the one reported by {@code jedis}, the database is selected immediately.
*
* @param jedis the Jedis client to wrap
* @param pool can be null, if no pool is used
* @param nodeConfig node configuration
* @param sentinelConfig sentinel configuration
* @since 2.5
*/
protected JedisConnection(Jedis jedis, @Nullable Pool<Jedis> pool, JedisClientConfig nodeConfig,
JedisClientConfig sentinelConfig) {
this.jedis = jedis;
this.pool = pool;
this.sentinelConfig = sentinelConfig;
// select the db
// if this fail, do manual clean-up before propagating the exception
// as we're inside the constructor
// (the clean-up only runs when select(..) surfaces a DataAccessException)
if (nodeConfig.getDatabase() != jedis.getDB()) {
try {
select(nodeConfig.getDatabase());
} catch (DataAccessException ex) {
close();
throw ex;
}
}
}
/**
 * Central dispatch used by both invokers. Depending on the connection state, the command is queued on the
 * active transaction, queued on the open pipeline, or executed directly.
 *
 * @param status whether the queued result should be recorded as a status result.
 * @param directFunction function executed against {@link Jedis} when neither queueing nor pipelined.
 * @param pipelineFunction function executed against the transaction or pipeline commands.
 * @param converter converter applied to a non-null direct result (and attached to queued results).
 * @param nullDefault supplier consulted when the direct result is {@literal null}.
 * @return the converted direct result, or {@literal null} when the command was queued.
 */
@Nullable
private Object doInvoke(boolean status, Function<Jedis, Object> directFunction,
		Function<ResponseCommands, Response<Object>> pipelineFunction, Converter<Object, Object> converter,
		Supplier<Object> nullDefault) {

	return doWithJedis(it -> {

		// Transaction takes precedence over pipeline.
		if (isQueueing()) {

			Response<Object> queued = pipelineFunction.apply(JedisInvoker.createCommands(getRequiredTransaction()));
			if (status) {
				transaction(newStatusResult(queued));
			} else {
				transaction(newJedisResult(queued, converter, nullDefault));
			}
			return null;
		}

		if (isPipelined()) {

			Response<Object> queued = pipelineFunction.apply(JedisInvoker.createCommands(getRequiredPipeline()));
			if (status) {
				pipeline(newStatusResult(queued));
			} else {
				pipeline(newJedisResult(queued, converter, nullDefault));
			}
			return null;
		}

		// Direct execution.
		Object raw = directFunction.apply(getJedis());
		return raw != null ? converter.convert(raw) : nullDefault.get();
	});
}
/**
 * Translates the given exception into a {@link DataAccessException}.
 *
 * @param ex the exception to translate.
 * @return the translated exception, or a {@link RedisSystemException} wrapping {@code ex} when the translation
 *         strategy yields {@literal null}.
 */
protected DataAccessException convertJedisAccessException(Exception ex) {

	DataAccessException translated = EXCEPTION_TRANSLATION.translate(ex);
	if (translated == null) {
		return new RedisSystemException(ex.getMessage(), ex);
	}
	return translated;
}
/**
* Returns this connection itself as the {@link RedisCommands} view.
*/
@Override
public RedisCommands commands() {
return this;
}
/**
* Returns the {@link JedisGeoCommands} instance created for this connection.
*/
@Override
public RedisGeoCommands geoCommands() {
return geoCommands;
}
/**
* Returns the {@link JedisHashCommands} instance created for this connection.
*/
@Override
public RedisHashCommands hashCommands() {
return hashCommands;
}
/**
* Returns the {@link JedisHyperLogLogCommands} instance created for this connection.
*/
@Override
public RedisHyperLogLogCommands hyperLogLogCommands() {
return hllCommands;
}
/**
* Returns the {@link JedisKeyCommands} instance created for this connection.
*/
@Override
public RedisKeyCommands keyCommands() {
return keyCommands;
}
/**
* Returns the {@link JedisListCommands} instance created for this connection.
*/
@Override
public RedisListCommands listCommands() {
return listCommands;
}
/**
* Returns the {@link JedisSetCommands} instance created for this connection.
*/
@Override
public RedisSetCommands setCommands() {
return setCommands;
}
/**
* Returns the {@link JedisStreamCommands} instance created for this connection.
*/
@Override
public RedisStreamCommands streamCommands() {
return streamCommands;
}
/**
* Returns the {@link JedisStringCommands} instance created for this connection.
*/
@Override
public RedisStringCommands stringCommands() {
return stringCommands;
}
/**
* Returns the {@link JedisZSetCommands} instance created for this connection.
*/
@Override
public RedisZSetCommands zSetCommands() {
return zSetCommands;
}
/**
* Returns the {@link JedisScriptingCommands} instance created for this connection.
*/
@Override
public RedisScriptingCommands scriptingCommands() {
return scriptingCommands;
}
/**
* Returns the {@link JedisServerCommands} instance created for this connection.
*/
@Override
public RedisServerCommands serverCommands() {
return serverCommands;
}
/**
 * Executes an arbitrary command given by name with raw byte arguments.
 * <p>
 * When a transaction or pipeline is active the command is queued and {@literal null} is returned; otherwise the
 * command is sent directly and its reply returned.
 *
 * @param command the command name, must contain text.
 * @param args the command arguments, must not be {@literal null}.
 * @return the reply of a directly executed command, or {@literal null} when queued.
 */
@Override
public Object execute(String command, byte[]... args) {

	Assert.hasText(command, "A valid command needs to be specified");
	Assert.notNull(args, "Arguments must not be null");

	return doWithJedis(it -> {

		ProtocolCommand protocolCommand = () -> JedisConverters.toBytes(command);

		// Fast path: neither transaction nor pipeline is active.
		if (!isQueueing() && !isPipelined()) {
			return it.sendCommand(protocolCommand, args);
		}

		CommandObject<Object> queuedCommand = new CommandObject<>(
				new CommandArguments(protocolCommand).addObjects(args), BuilderFactory.RAW_OBJECT);

		if (isPipelined()) {
			pipeline(newJedisResult(getRequiredPipeline().executeCommand(queuedCommand)));
		} else {
			transaction(newJedisResult(getRequiredTransaction().executeCommand(queuedCommand)));
		}

		return null;
	});
}
/**
 * Closes this connection.
 * <p>
 * After invoking the superclass close, any current subscription is closed (failures are only logged at debug
 * level) and the subscription reference is reset. A pooled connection is then handed back through
 * {@code jedis.close()}; a non-pooled connection attempts QUIT followed by disconnect, logging failures of
 * either step at debug level.
 */
@Override
public void close() throws DataAccessException {

	super.close();

	// Work on a snapshot of the volatile field.
	JedisSubscription activeSubscription = this.subscription;
	try {
		if (activeSubscription != null) {
			activeSubscription.close();
		}
	} catch (Exception ex) {
		LOGGER.debug("Cannot terminate subscription", ex);
	} finally {
		this.subscription = null;
	}

	if (pool == null) {

		// no pool: close the connection normally, tolerating failures of each step
		try {
			jedis.quit();
		} catch (Exception ex) {
			LOGGER.debug("Failed to QUIT during close", ex);
		}

		try {
			jedis.disconnect();
		} catch (Exception ex) {
			LOGGER.debug("Failed to disconnect during close", ex);
		}
		return;
	}

	// return the connection to the pool
	jedis.close();
}
/**
 * Picks the exception to report: an already recorded exception wins, otherwise the given cause is used.
 *
 * @param exceptionToThrow previously recorded exception, can be {@literal null}.
 * @param cause the newly encountered exception.
 * @return {@code exceptionToThrow} if it is not {@literal null}, {@code cause} otherwise.
 */
private Exception handleCloseException(@Nullable Exception exceptionToThrow, Exception cause) {
	return exceptionToThrow != null ? exceptionToThrow : cause;
}
/**
* Returns the underlying {@link Jedis} instance.
*/
@Override
public Jedis getNativeConnection() {
return jedis;
}
/**
* Reports this connection as closed when the underlying {@link Jedis} instance is not connected. Exceptions raised
* by the check are translated through {@code doWithJedis}.
*/
@Override
public boolean isClosed() {
return doWithJedis(it -> !it.isConnected());
}
/**
* Returns {@literal true} while a transaction object is held, i.e. between {@link #multi()} and
* {@link #exec()} / {@link #discard()}.
*/
@Override
public boolean isQueueing() {
return transaction != null;
}
/**
* Returns {@literal true} while a pipeline object is held, i.e. between {@link #openPipeline()} and
* {@link #closePipeline()}.
*/
@Override
public boolean isPipelined() {
return pipeline != null;
}
/**
 * Opens a pipeline on the underlying {@link Jedis} instance unless one is already open.
 *
 * @throws InvalidDataAccessApiUsageException if a transaction is active.
 */
@Override
public void openPipeline() {

	if (isQueueing()) {
		throw new InvalidDataAccessApiUsageException("Cannot use Pipelining while a transaction is active");
	}

	if (pipeline != null) {
		// already pipelined, nothing to do
		return;
	}

	pipeline = jedis.pipelined();
}
/**
 * Closes the open pipeline and returns the collected results.
 * <p>
 * The pipeline reference and the queued results are reset even when result conversion fails.
 *
 * @return the pipeline results, or an empty list when no pipeline is open.
 */
@Override
public List<Object> closePipeline() {

	if (pipeline == null) {
		return Collections.emptyList();
	}

	try {
		return convertPipelineResults();
	} finally {
		pipeline = null;
		pipelinedResults.clear();
	}
}
/**
 * Synchronizes the pipeline and collects the queued results in order.
 * <p>
 * Status results are skipped. A failing result is recorded in the list as a {@link DataAccessException}; after
 * all results are processed the first failure is reported through a {@link RedisPipelineException} that carries
 * the full result list.
 *
 * @return the collected results when no result failed.
 */
private List<Object> convertPipelineResults() {

	List<Object> results = new ArrayList<>();

	getRequiredPipeline().sync();

	Exception firstFailure = null;

	for (JedisResult queued : pipelinedResults) {
		try {

			Object data = queued.get();
			if (queued.isStatus()) {
				continue;
			}

			if (queued.conversionRequired()) {
				results.add(queued.convert(data));
			} else {
				results.add(data);
			}
		} catch (JedisDataException e) {

			// driver-level error reply: translate before recording
			DataAccessException translated = convertJedisAccessException(e);
			firstFailure = firstFailure == null ? translated : firstFailure;
			results.add(translated);
		} catch (DataAccessException e) {

			firstFailure = firstFailure == null ? e : firstFailure;
			results.add(e);
		}
	}

	if (firstFailure == null) {
		return results;
	}

	throw new RedisPipelineException(firstFailure, results);
}
/**
 * Records a queued result. While a transaction is active the result is recorded as a transaction result,
 * otherwise it is appended to the pipelined results.
 *
 * @param result the result to record.
 */
void pipeline(JedisResult result) {

	if (!isQueueing()) {
		pipelinedResults.add(result);
		return;
	}

	transaction(result);
}
/**
* Appends the given result to the queue of transaction results consumed by {@link #exec()}.
*
* @param result the result to record.
*/
void transaction(FutureResult<Response<?>> result) {
txResults.add(result);
}
/**
* Issues ECHO with the given message through {@link #invoke()}.
*
* @param message must not be {@literal null}.
*/
@Override
public byte[] echo(byte[] message) {
Assert.notNull(message, "Message must not be null");
return invoke().just(j -> j.echo(message));
}
/**
* Issues PING through {@link #invoke()}.
*/
@Override
public String ping() {
return invoke().just(ServerCommands::ping);
}
/**
* Discards the active transaction. Any exception, including the {@link IllegalStateException} raised by
* {@link #getRequiredTransaction()} when no transaction is active, is passed through
* {@link #convertJedisAccessException(Exception)}.
*/
@Override
public void discard() {
try {
getRequiredTransaction().discard();
} catch (Exception ex) {
throw convertJedisAccessException(ex);
} finally {
// always leave transactional state, regardless of the outcome
txResults.clear();
transaction = null;
}
}
/**
 * Executes the active transaction and returns its results.
 * <p>
 * Non-empty results are run through a {@link TransactionResultConverter} fed with the queued transaction
 * results; empty (or {@literal null}) results are returned as received. All exceptions, including the one raised
 * when no transaction is active, are passed through {@link #convertJedisAccessException(Exception)}. The
 * transactional state is reset in every case.
 *
 * @return the transaction results.
 */
@Override
public List<Object> exec() {

	try {

		if (transaction == null) {
			throw new InvalidDataAccessApiUsageException("No ongoing transaction; Did you forget to call multi");
		}

		List<Object> results = transaction.exec();

		if (CollectionUtils.isEmpty(results)) {
			return results;
		}

		return new TransactionResultConverter<>(txResults, JedisExceptionConverter.INSTANCE).convert(results);
	} catch (Exception ex) {
		throw convertJedisAccessException(ex);
	} finally {
		txResults.clear();
		transaction = null;
	}
}
/**
* @return the currently open {@link Pipeline}, or {@literal null} if none is open.
*/
@Nullable
public Pipeline getPipeline() {
return pipeline;
}
/**
 * Returns the currently open {@link Pipeline}.
 *
 * @return the open pipeline, never {@literal null}.
 * @throws IllegalStateException if no pipeline is open.
 */
public Pipeline getRequiredPipeline() {

	Pipeline current = getPipeline();

	if (current != null) {
		return current;
	}

	throw new IllegalStateException("Connection has no active pipeline");
}
/**
* @return the currently active {@link Transaction}, or {@literal null} if none is active.
*/
@Nullable
public Transaction getTransaction() {
return transaction;
}
/**
 * Returns the currently active {@link Transaction}.
 *
 * @return the active transaction, never {@literal null}.
 * @throws IllegalStateException if no transaction is active.
 */
public Transaction getRequiredTransaction() {

	Transaction current = getTransaction();

	if (current != null) {
		return current;
	}

	throw new IllegalStateException("Connection has no active transaction");
}
/**
* @return the underlying {@link Jedis} instance this connection operates on.
*/
public Jedis getJedis() {
return jedis;
}
/**
* Obtain a {@link JedisInvoker} to call Jedis methods on the current {@link Jedis} instance. The returned invoker
* routes calls through {@code doInvoke} with the status flag unset.
*
* @return the {@link JedisInvoker}.
* @since 2.5
*/
JedisInvoker invoke() {
return invoker;
}
/**
* Obtain a {@link JedisInvoker} to call Jedis methods returning a status response on the current {@link Jedis}
* instance. Status responses are not included in transactional and pipeline results. The returned invoker routes
* calls through {@code doInvoke} with the status flag set.
*
* @return the {@link JedisInvoker}.
* @since 2.5
*/
JedisInvoker invokeStatus() {
return statusInvoker;
}
/**
* Creates a {@link JedisResult} for the given response without a converter or null default.
*
* @param response the queued response.
* @return the result wrapper.
*/
<T> JedisResult<T, T> newJedisResult(Response<T> response) {
return JedisResultBuilder.<T, T> forResponse(response).build();
}
/**
* Creates a {@link JedisResult} for the given response that is mapped with {@code converter}, maps
* {@literal null} to {@code defaultValue}, and carries the current {@code convertPipelineAndTxResults} setting.
*
* @param response the queued response.
* @param converter converter to map the response value with.
* @param defaultValue supplier used for {@literal null} values.
* @return the result wrapper.
*/
<T, R> JedisResult<T, R> newJedisResult(Response<T> response, Converter<T, R> converter, Supplier<R> defaultValue) {
return JedisResultBuilder.<T, R> forResponse(response).mappedWith(converter)
.convertPipelineAndTxResults(convertPipelineAndTxResults).mapNullTo(defaultValue).build();
}
/**
* Creates a {@link JedisStatusResult} for the given response.
*
* @param response the queued response.
* @return the status result wrapper.
*/
<T> JedisStatusResult<T, T> newStatusResult(Response<T> response) {
return JedisResultBuilder.<T, T> forResponse(response).buildStatusResult();
}
/**
* Starts a transaction on the underlying {@link Jedis} instance. Does nothing if a transaction is already
* active.
*
* @throws InvalidDataAccessApiUsageException if a pipeline is open.
*/
@Override
public void multi() {
if (isQueueing()) {
return;
}
if (isPipelined()) {
throw new InvalidDataAccessApiUsageException("Cannot use Transaction while a pipeline is open");
}
// block-bodied lambda: binds to the Consumer overload of doWithJedis
doWithJedis(it -> {
this.transaction = it.multi();
});
}
/**
 * Selects the database with the given index on the underlying {@link Jedis} instance.
 * <p>
 * The call is routed through {@code doWithJedis} so that driver failures are translated by
 * {@link #convertJedisAccessException(Exception)}, like every other command of this connection. The constructor
 * relies on this: it catches {@link DataAccessException} from {@code select} to clean up before rethrowing.
 *
 * @param dbIndex index of the database to select.
 * @throws DataAccessException if the driver call fails.
 */
@Override
public void select(int dbIndex) {

	// Block-bodied lambda: binds to the Consumer overload of doWithJedis (the reply of SELECT is not needed).
	doWithJedis(it -> {
		it.select(dbIndex);
	});
}
/**
* Issues UNWATCH on the underlying {@link Jedis} instance.
*/
@Override
public void unwatch() {
// the explicit cast picks the Consumer overload of doWithJedis for the method reference
doWithJedis((Consumer<Jedis>) Jedis::unwatch);
}
/**
* Watches the given keys by issuing one watch call per key on the underlying {@link Jedis} instance.
*
* @param keys the keys to watch.
* @throws InvalidDataAccessApiUsageException if a transaction is active.
*/
@Override
public void watch(byte[]... keys) {
if (isQueueing()) {
throw new InvalidDataAccessApiUsageException("WATCH is not supported when a transaction is active");
}
doWithJedis(it -> {
for (byte[] key : keys) {
it.watch(key);
}
});
}
//
// Pub/Sub functionality
//
/**
* Publishes the given message to the given channel through {@link #invoke()}.
*
* @param channel the channel to publish to.
* @param message the message to publish.
* @return the value returned by the Jedis publish call.
*/
@Override
public Long publish(byte[] channel, byte[] message) {
return invoke().just(j -> j.publish(channel, message));
}
/**
* @return the current subscription; {@literal null} if none has been created or after {@link #close()}.
*/
@Override
public Subscription getSubscription() {
return subscription;
}
/**
 * Indicates whether this connection currently holds a live subscription.
 * <p>
 * The volatile {@code subscription} field is read exactly once into a local. {@link #close()} resets the field
 * to {@literal null}, so reading it twice (null check, then {@code isAlive()}) could fail with a
 * {@link NullPointerException} if the reset happens between the two reads.
 *
 * @return {@literal true} if a subscription exists and reports itself as alive.
 */
@Override
public boolean isSubscribed() {

	JedisSubscription current = this.subscription;
	return current != null && current.isAlive();
}
/**
 * Subscribes the given listener to the given channel patterns.
 * <p>
 * The subscription is registered on this connection before the Jedis {@code psubscribe} call is issued.
 *
 * @param listener the listener receiving messages.
 * @param patterns the channel patterns to subscribe to.
 * @throws RedisSubscribedConnectionException if the connection is already subscribed.
 * @throws InvalidDataAccessApiUsageException if a transaction or pipeline is active.
 */
@Override
public void pSubscribe(MessageListener listener, byte[]... patterns) {

	if (isSubscribed()) {
		throw new RedisSubscribedConnectionException(
				"Connection already subscribed; use the connection Subscription to cancel or add new channels");
	}

	if (isQueueing() || isPipelined()) {
		throw new InvalidDataAccessApiUsageException("Cannot subscribe in pipeline / transaction mode");
	}

	doWithJedis(it -> {

		JedisMessageListener pubSubAdapter = new JedisMessageListener(listener);

		// pattern subscription: no channels, only patterns
		subscription = new JedisSubscription(listener, pubSubAdapter, null, patterns);
		it.psubscribe(pubSubAdapter, patterns);
	});
}
/**
 * Subscribes the given listener to the given channels.
 * <p>
 * The subscription is registered on this connection before the Jedis {@code subscribe} call is issued.
 *
 * @param listener the listener receiving messages.
 * @param channels the channels to subscribe to.
 * @throws RedisSubscribedConnectionException if the connection is already subscribed.
 * @throws InvalidDataAccessApiUsageException if a transaction or pipeline is active.
 */
@Override
public void subscribe(MessageListener listener, byte[]... channels) {

	if (isSubscribed()) {
		throw new RedisSubscribedConnectionException(
				"Connection already subscribed; use the connection Subscription to cancel or add new channels");
	}

	if (isQueueing() || isPipelined()) {
		throw new InvalidDataAccessApiUsageException("Cannot subscribe in pipeline / transaction mode");
	}

	doWithJedis(it -> {

		JedisMessageListener pubSubAdapter = new JedisMessageListener(listener);

		// channel subscription: only channels, no patterns
		subscription = new JedisSubscription(listener, pubSubAdapter, channels, null);
		it.subscribe(pubSubAdapter, channels);
	});
}
/**
* Specifies if pipelined results should be converted to the expected data type. If false, results of
* {@link #closePipeline()} and {@link #exec()} will be of the type returned by the Jedis driver. The flag is
* read when a converter-backed result is created, so it applies to commands queued after this call.
*
* @param convertPipelineAndTxResults Whether or not to convert pipeline and tx results
*/
public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults) {
this.convertPipelineAndTxResults = convertPipelineAndTxResults;
}
/**
 * Checks whether the given node answers a PING.
 * <p>
 * A new {@link Jedis} instance is created for the node, connected and pinged. Any exception along the way
 * yields {@literal false}. The probing instance is disconnected and closed afterwards.
 *
 * @param node the node to verify.
 * @return {@literal true} if the node replied with "pong" (ignoring case).
 */
@Override
protected boolean isActive(RedisNode node) {

	Jedis probe = null;

	try {

		probe = getJedis(node);
		probe.connect();

		String reply = probe.ping();
		return reply.equalsIgnoreCase("pong");
	} catch (Exception ignored) {
		// an unreachable or failing node counts as inactive
		return false;
	} finally {

		if (probe != null) {
			probe.disconnect();
			probe.close();
		}
	}
}
/**
* Creates a {@link JedisSentinelConnection} on top of a new {@link Jedis} instance obtained from
* {@link #getJedis(RedisNode)} for the given sentinel node.
*/
@Override
protected JedisSentinelConnection getSentinelConnection(RedisNode sentinel) {
return new JedisSentinelConnection(getJedis(sentinel));
}
/**
 * Creates a new {@link Jedis} instance for the host and port of the given node, using the sentinel client
 * configuration of this connection.
 *
 * @param node the node to create a client for.
 * @return a new {@link Jedis} instance.
 */
protected Jedis getJedis(RedisNode node) {

	HostAndPort endpoint = new HostAndPort(node.getHost(), node.getPort());

	return new Jedis(endpoint, this.sentinelConfig);
}
/**
* Applies the given callback to the underlying {@link Jedis} instance and returns its result. Any exception thrown
* by the callback is translated through {@link #convertJedisAccessException(Exception)}.
*
* @param callback the function to apply.
* @return the callback result, can be {@literal null}.
*/
@Nullable
private <T> T doWithJedis(Function<Jedis, T> callback) {
try {
return callback.apply(getJedis());
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}
/**
* Passes the underlying {@link Jedis} instance to the given callback. Any exception thrown by the callback is
* translated through {@link #convertJedisAccessException(Exception)}.
*
* @param callback the consumer to invoke.
*/
private void doWithJedis(Consumer<Jedis> callback) {
try {
callback.accept(getJedis());
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}
}
相关信息
相关文章
spring-data-redis DefaultJedisClientConfiguration 源码
spring-data-redis JedisClientConfiguration 源码
spring-data-redis JedisClientUtils 源码
spring-data-redis JedisClusterConnection 源码
spring-data-redis JedisClusterGeoCommands 源码
spring-data-redis JedisClusterHashCommands 源码
spring-data-redis JedisClusterHyperLogLogCommands 源码
spring-data-redis JedisClusterKeyCommands 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦