This article takes a look at Sentinel's StatisticSlot.

StatisticSlot

com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
        throws Throwable {

        try {
            fireEntry(context, resourceWrapper, node, count, args);
            node.increaseThreadNum();
            node.addPassRequest();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest();
            }

        } catch (BlockException e) {
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockedQps();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockedQps();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseBlockedQps();
            }

            throw e;
        } catch (Throwable e) {
            context.getCurEntry().setError(e);

            // Should not happen
            node.increaseExceptionQps();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps();
            }
            throw e;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        DefaultNode node = (DefaultNode)context.getCurNode();

        if (context.getCurEntry().getError() == null) {
            long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
            if (rt > Constants.TIME_DROP_VALVE) {
                rt = Constants.TIME_DROP_VALVE;
            }

            node.rt(rt);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().rt(rt);
            }

            node.decreaseThreadNum();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().decreaseThreadNum();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.rt(rt);
                Constants.ENTRY_NODE.decreaseThreadNum();
            }
        } else {
            // error may happen
            // node.rt(-2);
        }

        fireExit(context, resourceWrapper, count);
    }

}
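  • StatisticSlot overrides the entry and exit methods.
  • entry calls fireEntry first. If the request passes, it calls the node's increaseThreadNum and addPassRequest; if the request is blocked (BlockException), it calls increaseBlockedQps; on any other exception, it calls increaseExceptionQps. The same counters are also updated on the origin node when there is one, and on Constants.ENTRY_NODE when the resource type is EntryType.IN.
  • exit, when the current entry recorded no error, records the response time (capped at Constants.TIME_DROP_VALVE) and calls the node's decreaseThreadNum. It then calls fireExit to trigger the actual exit operation.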
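
The counting order described above can be sketched in plain Java. This is only an illustration under simplifying assumptions, not Sentinel's implementation: AfterStyleCounting, MiniStatisticSlot and Blocked are hypothetical names, Blocked is an unchecked exception (the real BlockException is thrown through a `throws Throwable` signature), and the counters are plain atomics rather than sliding-window metrics.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class AfterStyleCounting {

    // Hypothetical stand-in for BlockException (unchecked here to keep the sketch short).
    static class Blocked extends RuntimeException {
        Blocked(String message) { super(message); }
    }

    // Hypothetical stand-in for StatisticSlot together with the node it updates.
    static class MiniStatisticSlot {
        final AtomicInteger threadNum = new AtomicInteger();
        final AtomicLong pass = new AtomicLong();
        final AtomicLong block = new AtomicLong();
        final AtomicLong exception = new AtomicLong();

        // Run the downstream checks first, then update counters based on the outcome.
        void entry(Runnable downstream) {
            try {
                downstream.run();              // plays the role of fireEntry(...)
                threadNum.incrementAndGet();
                pass.incrementAndGet();
            } catch (Blocked e) {
                block.incrementAndGet();
                throw e;
            } catch (RuntimeException e) {
                exception.incrementAndGet();
                throw e;
            }
        }

        // Mirrors exit() for an entry that recorded no error.
        void exit() {
            threadNum.decrementAndGet();
        }
    }

    // One passing request and one blocked request.
    // Returns {threadNum, pass, block, exception}.
    static long[] demo() {
        MiniStatisticSlot slot = new MiniStatisticSlot();
        slot.entry(() -> { });
        try {
            slot.entry(() -> { throw new Blocked("rejected by a later slot"); });
        } catch (Blocked expected) {
            // The blocked request only moved the block counter.
        }
        return new long[] {
            slot.threadNum.get(), slot.pass.get(), slot.block.get(), slot.exception.get()
        };
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("threadNum=" + r[0] + " pass=" + r[1]
            + " block=" + r[2] + " exception=" + r[3]);
    }
}
```

In this sketch the blocked request never reaches the increment of threadNum, which is why exit only needs to decrement for entries that recorded no error.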

StatisticNode

com/alibaba/csp/sentinel/node/StatisticNode.java

public class StatisticNode implements Node {

    private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount,
        IntervalProperty.INTERVAL);

    private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);

    private AtomicInteger curThreadNum = new AtomicInteger(0);

    private long lastFetchTime = -1;

    //......

    @Override
    public int curThreadNum() {
        return curThreadNum.get();
    }

    @Override
    public void addPassRequest() {
        rollingCounterInSecond.addPass();
        rollingCounterInMinute.addPass();
    }

    @Override
    public void increaseBlockedQps() {
        rollingCounterInSecond.addBlock();
        rollingCounterInMinute.addBlock();
    }

    @Override
    public void increaseExceptionQps() {
        rollingCounterInSecond.addException();
        rollingCounterInMinute.addException();

    }

    @Override
    public void increaseThreadNum() {
        curThreadNum.incrementAndGet();
    }

    @Override
    public void decreaseThreadNum() {
        curThreadNum.decrementAndGet();
    }

}
  • StatisticNode keeps a curThreadNum counter that records the number of threads currently calling through each node; it is used later for flow control.
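
To make the role of curThreadNum concrete, here is a small sketch of an AtomicInteger used as a concurrency gauge. It assumes nothing about Sentinel beyond the increment-on-entry and decrement-on-exit pairing shown above; ThreadGauge and demo are hypothetical names, and the latches exist only to make the observation deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadGauge {

    // Starts `workers` threads that each hold the gauge until released.
    // Returns {value while all workers are inside, value after all have left}.
    static int[] demo(int workers) {
        AtomicInteger curThreadNum = new AtomicInteger(0);
        CountDownLatch allInside = new CountDownLatch(workers);
        CountDownLatch release = new CountDownLatch(1);
        Thread[] threads = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                curThreadNum.incrementAndGet();      // like increaseThreadNum() on entry
                allInside.countDown();
                try {
                    release.await();                 // simulated work inside the resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    curThreadNum.decrementAndGet();  // like decreaseThreadNum() on exit
                }
            });
            threads[i].start();
        }

        try {
            allInside.await();                       // every worker has incremented
            int during = curThreadNum.get();
            release.countDown();                     // let the workers finish
            for (Thread t : threads) {
                t.join();
            }
            return new int[] {during, curThreadNum.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo(4);
        System.out.println("during=" + r[0] + " after=" + r[1]);
    }
}
```

The gauge reads the number of in-flight calls at any moment, which is the quantity a thread-count flow rule compares against its threshold.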

DefaultController

com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java

public class DefaultController implements Controller {

    double count = 0;
    int grade = 0;

    public DefaultController(double count, int grade) {
        super();
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            return false;
        }

        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return -1;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps();
    }

}
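  • The canPass method makes the flow-control decision from the node's statistics: the request is rejected when the current count plus acquireCount exceeds the configured count.
  • If grade is FLOW_GRADE_THREAD, the current count is curThreadNum; otherwise it is the passQps statistic.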
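
A few concrete numbers may help with the comparison in canPass. The sketch below lifts only that comparison into a standalone method; CanPassSketch is a hypothetical name, and curCount stands for whatever avgUsedTokens returned (including -1 for a null node).

```java
public class CanPassSketch {

    // Same comparison as in canPass: reject when the current usage plus the
    // requested amount exceeds the threshold.
    static boolean canPass(int curCount, int acquireCount, double count) {
        return !(curCount + acquireCount > count);
    }

    public static void main(String[] args) {
        System.out.println(canPass(9, 1, 10.0));   // 9 + 1 is not greater than 10
        System.out.println(canPass(10, 1, 10.0));  // 10 + 1 exceeds 10
        System.out.println(canPass(-1, 1, 0.0));   // null node: avgUsedTokens gave -1
    }
}
```

The last call shows a consequence of the -1 return value: with a null node, a single-token request passes even when the threshold is 0.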

DefaultSlotsChainBuilder

com/alibaba/csp/sentinel/slots/DefaultSlotsChainBuilder.java

public class DefaultSlotsChainBuilder implements SlotsChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }

}

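Although StatisticSlot sits before SystemSlot, AuthoritySlot, FlowSlot, and DegradeSlot in the chain, it only increments its counters after the fireEntry call returns, which makes it behave like an "after" advice in AOP. The curThreadNum of a Node can therefore be read as the number of request threads that have already passed the flow-control checks.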

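Summary

  • StatisticSlot works like an "after" interceptor: it records statistics only once the request has passed the checks of all subsequent slots.
  • The statistics are kept in StatisticNode, which maintains curThreadNum, rollingCounterInSecond, rollingCounterInMinute, and related data.
  • DefaultController applies the configured flow-control threshold: if grade is FLOW_GRADE_THREAD it checks curThreadNum, otherwise it checks the passQps statistic.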
