spring-batch ThrottleLimitResultQueue 源码

  • 2022-08-16
  • 浏览 (398)

spring-batch ThrottleLimitResultQueue 代码

文件路径:/spring-batch-infrastructure/src/main/java/org/springframework/batch/repeat/support/ThrottleLimitResultQueue.java

/*
 * Copyright 2002-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.repeat.support;

import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

/**
 * An implementation of the {@link ResultQueue} that throttles the number of expected
 * results, limiting it to a maximum at any given time.
 *
 * @author Dave Syer
 */
public class ThrottleLimitResultQueue<T> implements ResultQueue<T> {

	// Accumulation of result objects as they finish.
	private final BlockingQueue<T> results;

	// Accumulation of dummy objects flagging expected results in the future.
	private final Semaphore waits;

	private final Object lock = new Object();

	private volatile int count = 0;

	/**
	 * @param throttleLimit the maximum number of results that can be expected at any
	 * given time.
	 */
	public ThrottleLimitResultQueue(int throttleLimit) {
		results = new LinkedBlockingQueue<>();
		waits = new Semaphore(throttleLimit);
	}

	@Override
	public boolean isEmpty() {
		return results.isEmpty();
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting()
	 */
	@Override
	public boolean isExpecting() {
		// Base the decision about whether we expect more results on a
		// counter of the number of expected results actually collected.
		// Do not synchronize! Otherwise put and expect can deadlock.
		return count > 0;
	}

	/**
	 * Tell the queue to expect one more result. Blocks until a new result is available if
	 * already expecting too many (as determined by the throttle limit).
	 *
	 * @see ResultQueue#expect()
	 */
	@Override
	public void expect() throws InterruptedException {
		synchronized (lock) {
			waits.acquire();
			count++;
		}
	}

	@Override
	public void put(T holder) throws IllegalArgumentException {
		if (!isExpecting()) {
			throw new IllegalArgumentException("Not expecting a result.  Call expect() before put().");
		}
		// There should be no need to block here, or to use offer()
		results.add(holder);
		// Take from the waits queue now to allow another result to
		// accumulate. But don't decrement the counter.
		waits.release();
	}

	@Override
	public T take() throws NoSuchElementException, InterruptedException {
		if (!isExpecting()) {
			throw new NoSuchElementException("Not expecting a result.  Call expect() before take().");
		}
		T value;
		synchronized (lock) {
			value = results.take();
			// Decrement the counter only when the result is collected.
			count--;
		}
		return value;
	}

}

相关信息

spring-batch 源码目录

相关文章

spring-batch RepeatInternalState 源码

spring-batch RepeatInternalStateSupport 源码

spring-batch RepeatSynchronizationManager 源码

spring-batch RepeatTemplate 源码

spring-batch ResultHolder 源码

spring-batch ResultHolderResultQueue 源码

spring-batch ResultQueue 源码

spring-batch TaskExecutorRepeatTemplate 源码

spring-batch package-info 源码

0  赞