[case7]Flux OOM实例

发布时间:2019-11-20 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了[case7]Flux OOM实例脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

本文主要研究下Flux的OOM产生的场景

FluxSink.OverflowStrategy

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java

    /**      * Enumeration for backpressure handling.      */     enum OverflowStrategy {         /**          * completely ignore downstream backpressure requests.          * <p>          * This may yield {@link IllegalstateException} when queues get full downstream.          */         IGNORE,         /**          * Signal an {@link IllegalStateException} when the downstream can't keep up          */         ERROR,         /**          * Drop the incoming signal if the downstream is not ready to receive it.          */         DROP,         /**          * Downstream will get only the latest signals from upstream.          */         LATEST,         /**          * Buffer all signals if the downstream can't keep up.          * <p>          * Warning! This does unbounded buffering and may lead to {@link OutOfMemoryError}.          */         BUFFER     }
可以看到BUFFER采用的是无界队列,可能产生OOM

实例

    @Test     public void testFluxOOM() throws InterruptedException {         final Flux<Integer> flux = Flux.<Integer> create(fluxSink -> {             //NOTE sink:class reactor.core.publisher.FluxCreate$SerializedSink             LOGGER.info("sink:{}",fluxSink.getClass());             while (true) {                 fluxSink.next(ThreadLocalRandom.current().nextInt());             }         }, FluxSink.OverflowStrategy.BUFFER)                 .publishOn(Schedulers.elastic(),Integer.MAX_VALUE); //NOTE 测试OOM          //NOTE flux:class reactor.core.publisher.FluxCreate,prefetch:-1         LOGGER.info("flux:{},prefetch:{}",flux.getClass(),flux.getPrefetch());          flux.subscribe(e -> {             LOGGER.info("subscribe:{}",e);             try {                 TimeUnit.SECONDS.sleep(10);             } catch (InterruptedException e1) {                 e1.printStackTrace();             }         });          TimeUnit.MINUTES.sleep(20);     }
jvm参数
-XMx2160K -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -XX:+PrintGCDetails -Xloggc:/tmp/gc.log
注意这里使用了publishOn,另外prefetch参数设置为Integer.MAX_VALUE(默认为256),就是为了复现无界队列造成的OOM

输出

java.lang.OutOfMemoryError: GC overhead limit exceeded Dumping heap to /tmp/java_pid5295.hprof ... Heap dump file created [6410067 bytes in 0.149 secs] Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded     at java.util.jar.Manifest$FastInputStream.<init>(Manifest.java:332)     at java.util.jar.Manifest$FastInputStream.<init>(Manifest.java:327)     at java.util.jar.Manifest.read(Manifest.java:195)     at java.util.jar.Manifest.<init>(Manifest.java:69)     at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)     at java.util.jar.JarFile.getManifest(JarFile.java:180)     at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)     at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)     at java.net.URLClassLoader.access$100(URLClassLoader.java:73)  Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"  Process finished with exit code 137

heap dump

[case7]Flux OOM实例

使用MAT分析可以看到reactor.util.concurrent.SpscLinkedArrayQueue持有了很多未释放的数据,该队列由FluxCreate$BufferAsyncSink持有
    static final class BufferAsyncSink<T> extends BaseSink<T> {          final Queue<T> queue;          Throwable error;         volatile boolean done;          volatile int wip;         @SuppressWarnings("rawtypes")         static final AtomicIntegerFieldUpdater<BufferAsyncSink> WIP =                 AtomicIntegerFieldUpdater.newUpdater(BufferAsyncSink.class, "wip");          BufferAsyncSink(CoreSubscriber<? super T> actual, int capacityHint) {             super(actual);             this.queue = Queues.<T>unbounded(capacityHint).get();         }         //......     }    

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/util/concurrent/Queues.java

    /**      * Returns an unbounded, linked-array-based Queue. Integer.max sized link will      * return the default {@link #SMALL_BUFFER_SIZE} size.      * @param linkSize the link size      * @param <T> the reified {@link Queue} generic type      * @return an unbounded {@link Queue} {@link Supplier}      */     @SuppressWarnings("unchecked")     public static <T> Supplier<Queue<T>> unbounded(int linkSize) {         if (linkSize == XS_BUFFER_SIZE) {             return XS_UNBOUNDED;         }         else if (linkSize == Integer.MAX_VALUE || linkSize == SMALL_BUFFER_SIZE) {             return unbounded();         }         return  () -> new SpscLinkedArrayQueue<>(linkSize);     }
可以看到Queues的unbounded方法创建了一个无界队列SpscLinkedArrayQueue来缓冲数据

小结

使用Flux要注意OOM的问题,不过reactor的类库已经尽可能小心地避免这个问题,普通场景的api调用貌似没问题,自己个性化参数的时候要额外注意,本实例就是使用publishOn时特意指定prefetch为Integer.MAX_VALUE,才造成OOM

脚本宝典总结

以上是脚本宝典为你收集整理的[case7]Flux OOM实例全部内容,希望文章能够帮你解决[case7]Flux OOM实例所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。