spring-kafka AbstractKafkaHeaderMapper 源码
spring-kafka AbstractKafkaHeaderMapper 代码
文件路径:/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java
/*
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.support;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.header.Header;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.PatternMatchUtils;
/**
* Base for Kafka header mappers.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.1.3
*
*/
public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
private final List<HeaderMatcher> matchers = new ArrayList<>();
private final Map<String, Boolean> rawMappedHeaders = new HashMap<>();
{
this.rawMappedHeaders.put(KafkaHeaders.LISTENER_INFO, true);
}
private final boolean outbound;
private boolean mapAllStringsOut;
private Charset charset = StandardCharsets.UTF_8;
/**
* Construct a mapper that will match the supplied patterns (outbound) and all headers
* (inbound). For outbound mapping, certain internal framework headers are never
* mapped.
* @param patterns the patterns.
*/
public AbstractKafkaHeaderMapper(String... patterns) {
this(true, patterns);
}
/**
* Construct a mapper that will match the supplied patterns (outbound) and all headers
* (inbound). For outbound mapping, certain internal framework headers are never
* mapped.
* @param outbound true for an outbound mapper.
* @param patterns the patterns.
*/
protected AbstractKafkaHeaderMapper(boolean outbound, String... patterns) {
Assert.notNull(patterns, "'patterns' must not be null");
this.outbound = outbound;
if (outbound) {
this.matchers.add(new NeverMatchHeaderMatcher(
KafkaHeaders.ACKNOWLEDGMENT,
KafkaHeaders.CONSUMER,
KafkaHeaders.KEY,
KafkaHeaders.OFFSET,
KafkaHeaders.PARTITION,
KafkaHeaders.RAW_DATA,
KafkaHeaders.RECEIVED_KEY,
KafkaHeaders.RECEIVED_PARTITION,
KafkaHeaders.RECEIVED_TIMESTAMP,
KafkaHeaders.RECEIVED_TOPIC,
KafkaHeaders.TIMESTAMP,
KafkaHeaders.TIMESTAMP_TYPE,
KafkaHeaders.BATCH_CONVERTED_HEADERS,
KafkaHeaders.NATIVE_HEADERS,
KafkaHeaders.TOPIC,
KafkaHeaders.DELIVERY_ATTEMPT,
KafkaHeaders.LISTENER_INFO,
KafkaHeaders.GROUP_ID));
}
for (String pattern : patterns) {
this.matchers.add(new SimplePatternBasedHeaderMatcher(pattern));
}
}
/**
* Subclasses can invoke this to add custom {@link HeaderMatcher}s.
* @param matchersToAdd the matchers to add.
* @since 2.3
*/
protected final void addMatchers(HeaderMatcher... matchersToAdd) {
Assert.notNull(matchersToAdd, "'matchersToAdd' cannot be null");
Assert.noNullElements(matchersToAdd, "'matchersToAdd' cannot have null elements");
Collections.addAll(this.matchers, matchersToAdd);
}
/**
* Set to true to map all {@code String} valued outbound headers to {@code byte[]}.
* To map to a {@code String} for inbound, there must be an entry in the rawMappedHeaders map.
* @param mapAllStringsOut true to map all strings.
* @since 2.2.5
* @see #setRawMappedHeaders(Map)
*/
public void setMapAllStringsOut(boolean mapAllStringsOut) {
this.mapAllStringsOut = mapAllStringsOut;
}
protected Charset getCharset() {
return this.charset;
}
/**
* Set the charset to use when mapping String-valued headers to/from byte[]. Default UTF-8.
* @param charset the charset.
* @since 2.2.5
* @see #setRawMappedHeaders(Map)
*/
public void setCharset(Charset charset) {
Assert.notNull(charset, "'charset' cannot be null");
this.charset = charset;
}
/**
* Set the headers to not perform any conversion on (except {@code String} to
* {@code byte[]} for outbound). Inbound headers that match will be mapped as
* {@code byte[]} unless the corresponding boolean in the map value is true,
* in which case it will be mapped as a String.
* @param rawMappedHeaders the header names to not convert and
* @since 2.2.5
* @see #setCharset(Charset)
* @see #setMapAllStringsOut(boolean)
*/
public void setRawMappedHeaders(Map<String, Boolean> rawMappedHeaders) {
if (!ObjectUtils.isEmpty(rawMappedHeaders)) {
this.rawMappedHeaders.clear();
this.rawMappedHeaders.putAll(rawMappedHeaders);
}
}
/**
* Add a raw mapped header.
* @param name the header name.
* @param toString convert to string on inbound when true.
* @since 2.7.1
* @see #setRawMappedHeaders(Map)
*/
public void addRawMappedHeader(String name, boolean toString) {
this.rawMappedHeaders.put(name, toString);
}
protected boolean matches(String header, Object value) {
if (matches(header)) {
if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL))
&& !(value instanceof String)) {
this.logger.debug(() -> "Cannot map " + header + " when type is [" + value.getClass()
+ "]; it must be a String");
return false;
}
return true;
}
return false;
}
protected boolean matches(String header) {
Assert.state(this.outbound, "This mapper cannot be used for outbound mapping");
return doesMatch(header);
}
/**
* Matches header names for inbound mapping when configured as an inbound mapper.
* @param header the header name.
* @return true if it can be mapped.
* @since 2.8.8
*/
protected boolean matchesForInbound(String header) {
if (this.outbound) {
return true;
}
if (this.matchers.size() == 0) {
return true;
}
return doesMatch(header);
}
private boolean doesMatch(String header) {
for (HeaderMatcher matcher : this.matchers) {
if (matcher.matchHeader(header)) {
return !matcher.isNegated();
}
}
this.logger.debug(() -> MessageFormat.format("headerName=[{0}] WILL NOT be mapped; matched no patterns",
header));
return false;
}
/**
* Check if the value is a String and convert to byte[], if so configured.
* @param key the header name.
* @param value the header value.
* @return the value to add.
* @since 2.2.5
*/
protected Object headerValueToAddOut(String key, Object value) {
Object valueToAdd = mapRawOut(key, value);
if (valueToAdd == null) {
valueToAdd = value;
}
return valueToAdd;
}
@Nullable
private byte[] mapRawOut(String header, Object value) {
if (this.mapAllStringsOut || this.rawMappedHeaders.containsKey(header)) {
if (value instanceof byte[]) {
return (byte[]) value;
}
else if (value instanceof String) {
return ((String) value).getBytes(this.charset);
}
}
return null;
}
/**
* Check if the header value should be mapped to a String, if so configured.
* @param header the header.
* @return the value to add.
*/
protected Object headerValueToAddIn(Header header) {
Object mapped = mapRawIn(header.key(), header.value());
if (mapped == null) {
mapped = header.value();
}
return mapped;
}
@Nullable
private String mapRawIn(String header, byte[] value) {
Boolean asString = this.rawMappedHeaders.get(header);
if (Boolean.TRUE.equals(asString)) {
return new String(value, this.charset);
}
return null;
}
/**
* A matcher for headers.
* @since 2.3
*/
protected interface HeaderMatcher {
/**
* Return true if the header matches.
* @param headerName the header name.
* @return true for a match.
*/
boolean matchHeader(String headerName);
/**
* Return true if this matcher is a negative matcher.
* @return true for a negative matcher.
*/
boolean isNegated();
}
/**
* A matcher that never matches a set of headers.
* @since 2.3
*/
protected static class NeverMatchHeaderMatcher implements HeaderMatcher {
private final Set<String> neverMatchHeaders;
protected NeverMatchHeaderMatcher(String... headers) {
this.neverMatchHeaders = Arrays.stream(headers)
.collect(Collectors.toSet());
}
@Override
public boolean matchHeader(String headerName) {
return this.neverMatchHeaders.contains(headerName);
}
@Override
public boolean isNegated() {
return true;
}
}
/**
* A pattern-based header matcher that matches if the specified
* header matches the specified simple pattern.
* <p> The {@code negate == true} state indicates if the matching should be treated as "not matched".
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
protected static class SimplePatternBasedHeaderMatcher implements HeaderMatcher {
private static final LogAccessor LOGGER =
new LogAccessor(LogFactory.getLog(SimplePatternBasedHeaderMatcher.class));
private final String pattern;
private final boolean negate;
protected SimplePatternBasedHeaderMatcher(String pattern) {
this(pattern.startsWith("!") ? pattern.substring(1) : pattern, pattern.startsWith("!"));
}
SimplePatternBasedHeaderMatcher(String pattern, boolean negate) {
Assert.notNull(pattern, "Pattern must no be null");
this.pattern = pattern.toLowerCase();
this.negate = negate;
}
@Override
public boolean matchHeader(String headerName) {
String header = headerName.toLowerCase();
if (PatternMatchUtils.simpleMatch(this.pattern, header)) {
LOGGER.debug(() ->
MessageFormat.format(
"headerName=[{0}] WILL " + (this.negate ? "NOT " : "")
+ "be mapped, matched pattern=" + (this.negate ? "!" : "") + "{1}",
headerName, this.pattern));
return true;
}
return false;
}
@Override
public boolean isNegated() {
return this.negate;
}
}
}
相关信息
相关文章
spring-kafka Acknowledgment 源码
spring-kafka AllowDenyCollectionManager 源码
spring-kafka CompositeProducerInterceptor 源码
spring-kafka CompositeProducerListener 源码
spring-kafka DefaultKafkaHeaderMapper 源码
spring-kafka EndpointHandlerMethod 源码
spring-kafka ExponentialBackOffWithMaxRetries 源码
spring-kafka JacksonMimeTypeModule 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦