dubbo ForkingClusterInvoker 源码
dubbo ForkingClusterInvoker 代码
文件路径:/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.support;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.FORKS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_FORKS;
/**
* NOTICE! This implementation does not work well with async call.
* <p>
* Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
*
* <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
*/
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
/**
* Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
* which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
*/
private final ExecutorService executor;
public ForkingClusterInvoker(Directory<T> directory) {
super(directory);
executor = directory.getUrl().getOrDefaultFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getSharedExecutor();
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<>(forks);
while (selected.size() < forks) {
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getServiceContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<>(1);
for (final Invoker<T> invoker : selected) {
URL consumerUrl = RpcContext.getServiceContext().getConsumerUrl();
executor.execute(() -> {
try {
if (ref.size() > 0) {
return;
}
Result result = invokeWithContextAsync(invoker, invocation, consumerUrl);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : RpcException.UNKNOWN_EXCEPTION,
"Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. " +
"Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", " +
"but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
} finally {
// clear attachments which is binding to current thread.
RpcContext.getClientAttachment().clearAttachments();
}
}
}
相关信息
相关文章
dubbo AbstractClusterInvoker 源码
dubbo AvailableClusterInvoker 源码
dubbo BroadcastClusterInvoker 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦