博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊hystrix的execution.isolation.semaphore.maxConcurrentRequests属性
阅读量:6035 次
发布时间:2019-06-20

本文共 6869 字,大约阅读时间需要 22 分钟。

本文主要研究一下hystrix的execution.isolation.semaphore.maxConcurrentRequests属性

AbstractCommand.applyHystrixSemantics

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

private Observable
applyHystrixSemantics(final AbstractCommand
_cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ if (circuitBreaker.attemptExecution()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1
markExceptionThrown = new Action1
() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }复制代码

这个方法调用了getExecutionSemaphore来获取TryableSemaphore,执行之前进行tryAcquire,执行结束之后进行release

AbstractCommand.getExecutionSemaphore

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

/**     * Get the TryableSemaphore this HystrixCommand should use for execution if not running in a separate thread.     *      * @return TryableSemaphore     */    protected TryableSemaphore getExecutionSemaphore() {        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {            if (executionSemaphoreOverride == null) {                TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());                if (_s == null) {                    // we didn't find one cache so setup                    executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));                    // assign whatever got set (this or another thread)                    return executionSemaphorePerCircuit.get(commandKey.name());                } else {                    return _s;                }            } else {                return executionSemaphoreOverride;            }        } else {            // return NoOp implementation since we're not using SEMAPHORE isolation            return TryableSemaphoreNoOp.DEFAULT;        }    }复制代码
  • 这里针对ExecutionIsolationStrategy进行判断,如果是SEMAPHORE,则根据commandKey获取或新建对应的TryableSemaphore
  • 创建的话,使用的是TryableSemaphoreActual,其numberOfPermits参数就是execution.isolation.semaphore.maxConcurrentRequests的值
  • 如果ExecutionIsolationStrategy是THREAD的话,这里TryableSemaphore返回的是TryableSemaphoreNoOp.DEFAULT,也就是不做任何操作,都放行

TryableSemaphore

/* package */static interface TryableSemaphore {        /**         * Use like this:         * 

* *

         * if (s.tryAcquire()) {         * try {         * // do work that is protected by 's'         * } finally {         * s.release();         * }         * }         * 
* * @return boolean */ public abstract boolean tryAcquire(); /** * ONLY call release if tryAcquire returned true. *

* *

         * if (s.tryAcquire()) {         * try {         * // do work that is protected by 's'         * } finally {         * s.release();         * }         * }         * 
*/ public abstract void release(); public abstract int getNumberOfPermitsUsed(); }复制代码

定义了三个方法,tryAcquire,release、getNumberOfPermitsUsed

TryableSemaphoreActual

/**     * Semaphore that only supports tryAcquire and never blocks and that supports a dynamic permit count.     * 

* Using AtomicInteger increment/decrement instead of java.util.concurrent.Semaphore since we don't need blocking and need a custom implementation to get the dynamic permit count and since * AtomicInteger achieves the same behavior and performance without the more complex implementation of the actual Semaphore class using AbstractQueueSynchronizer. */ /* package */static class TryableSemaphoreActual implements TryableSemaphore { protected final HystrixProperty

numberOfPermits; private final AtomicInteger count = new AtomicInteger(0); public TryableSemaphoreActual(HystrixProperty
numberOfPermits) { this.numberOfPermits = numberOfPermits; } @Override public boolean tryAcquire() { int currentCount = count.incrementAndGet(); if (currentCount > numberOfPermits.get()) { count.decrementAndGet(); return false; } else { return true; } } @Override public void release() { count.decrementAndGet(); } @Override public int getNumberOfPermitsUsed() { return count.get(); } }复制代码

内部是使用AtomicInteger来进行计数,tryAcquire方法,是先增,如果超过限制,则再减

TryableSemaphoreNoOp.DEFAULT

/* package */static class TryableSemaphoreNoOp implements TryableSemaphore {        public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();        @Override        public boolean tryAcquire() {            return true;        }        @Override        public void release() {        }        @Override        public int getNumberOfPermitsUsed() {            return 0;        }    }复制代码

TryableSemaphoreNoOp.DEFAULT是TryableSemaphoreNoOp的单例,默认放行

小结

hystrix的ExecutionIsolationStrategy分为SEMAPHORE及THREAD模式,在command的执行前后逻辑,内置了对TryableSemaphore的tryAcquire及release操作。只是在获取TryableSemaphore实现类的时候,针对SEMAPHORE模式才真正根据execution.isolation.semaphore.maxConcurrentRequests属性进行限制,而如果是THREAD模式,则返回一个都放行的TryableSemaphoreNoOp实例。

doc

转载地址:http://tjlhx.baihongyu.com/

你可能感兴趣的文章
外网用户通过citrix打印慢的解决方法
查看>>
STL容器的使用
查看>>
关于std::map
查看>>
JXL导出Excel文件兼容性问题
查看>>
VBoot1.0发布,Vue & SpringBoot 综合开发入门
查看>>
centos7 安装wps 后 演示无法启动
查看>>
git简单命令
查看>>
LAMP编译部署
查看>>
XenDesktop7.6安装部署入门教程
查看>>
HashMap的工作原理及HashMap和Hashtable的区别
查看>>
GregorianCalendar日历程序
查看>>
Sublime 中运行 Shell 、Python、Lua、Groovy...等各种脚本
查看>>
【Java集合源码剖析】ArrayList源码剖析
查看>>
linux的目录结构
查看>>
这次逻辑通了,
查看>>
HTMLHelper
查看>>
快速构建Windows 8风格应用29-捕获图片与视频
查看>>
java程序:set改造成map
查看>>
C++ 排序函数 sort(),qsort()的使用方法
查看>>
OC语言Block和协议
查看>>