一次netty"引发的"诡异old gc问题排查过程

应用:新美大push服务-长连通道sailfish
日推送消息:180亿
QPS峰值: 35W
最大实时在线用户:2200W

push服务简单结构为

客户端sdk<=>长连通道<=>pushServer

1.客户端sdk: 负责提供客户客户端收发push的api
2.长连通道:负责维持海量客户端连接
3.pushServer:负责给业务方提供收发push的rpc服务,与长连通道通过tcp连接,自定义协议,具体的push服务设计另起文章

首先依据这篇文章把push长连通道应用的jvm参数调到最优,见海量连接服务端jvm参数调优杂记, 剩下的都是这篇文章之后所发生

2016年9月2号6:00 左右陆续收到两台机器的报警,上去看一下cat监控

改造之前gc情况

发现 在凌晨4:11分左右,这台机器cms old区域到达old gc阀值1525M(old区域设置为2048M, -XX:CMSInitiatingOccupancyFraction=70,所以阀值为1433M,前一分钟为1428.7M),于是进行old gc,结果进行一次old gc之后,啥也没回收掉,接下来一次次old gc,old区不减反增,甚是诡异!

gc日志

在4:10:29开始频繁old gc(其实这是第二次old gc了,之前已经有过一次,不过可以忽略,我就拿这次来分析),发现old gc过后,old区域大小基本没变,所以这个时候可以断定old区里面肯定有一直被引用的对象,猜测为缓存之类的对象

使用 jmap -dump:live,format=b,file=xxx [pid] 先触发一次gc再dump
重点关注这台10.32.145.237

dump 的时候,花了long long的时间,为了不影响线上引用,遂放弃。。。

9月3号早上又发现old gc,于是连忙起床去dump内存,总内存为1.8G,MAT载入分析

堆内存

光这两个家伙就占据了71.24%,其他的可以忽略不计
然后看到NioSocketChannel这个家伙,对应着某条TCP连接,于是追根溯源,找到这条连接对应的机器

NioSocketChannel 堆内存

然后去cmdb里面一查

cmdb

发现是pushServer的机器。长连通道服务器是用netty实现,自带缓冲区,对外连接着海量的客户端,将海量用户的请求转发给pushServer,而pushServer是BIO实现,无IO缓冲区,当pushServer的TCP缓冲区满了之后,TCP滑动窗口为0,那么长连服务器发送给这台机器的消息netty就一直会保存在自带的缓冲区ChannelOutBoundBuffer里,撑大old区。接下来需要进一步验证

9月5号早上,来公司验证,跑到10.12.22.253这台机器看一下tcp底层缓冲区的情况

tcp -antp | grep 9000

发现tcp发送队列积压了这么多数据没发出去,这种情况发生的原因是接收方来不及处理,接收方的接收队列里面数据积压,于是导致发送方发送不出去,接下来就跑到接收方机器上看下tcp的接收队列

10.32.177.127$ tcp -antp | grep 9000
10.4.210.192$ tcp -antp | grep 9000
10.4.210.193$ tcp -antp | grep 9000

果不其然,三台机器接受队列都撑得很大,到这里,问题基本排查出来了,结论是接收方处理速度过慢导致发送方积压消息过多,netty会把要发送的消息保存在ChannelOutboundBuffer里面,随着积压的消息越来越多,导致old区域逐渐扩大,最终old gc,然而这些消息对象都是live的,因此回收不掉,所以频繁old gc

9月5号下午
考虑到pushServer改造nio需要一段时间,长连通道这边又无法忍受频繁old gc而不得不重启应用,于是在通道端做了一点更改,在选择pushServer写的时候,只选择可写的Channel

 public ChannelGroupFuture writeAndFlushRandom(Object message) {
        final int size = super.size();
        if (size <= 0) {
            return super.writeAndFlush(message);
        }

        return super.writeAndFlush(message, new ChannelMatcher() {
            private int index = 0;
            private int matchedIndex = random.nextInt(size());

            @Override
            public boolean matches(Channel channel) {
                return matchedIndex == index++ && channel.isWritable();
            }
        });
    }

以上&& channel.isWritable()为新添加代码,追踪一下isWritable方法的实现,最终是调用到ChannelOutboundBuffer的isWritable方法

AbstractChannel.java

@Override
    public boolean isWritable() {
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        return buf != null && buf.isWritable();
    }

ChannelOutboundBuffer.java

  public boolean isWritable() {
        return unwritable == 0;
    }

而unwritable这个field是在这里被确定

ChannelOutboundBuffer.java

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

其中channel.config().getWriteBufferHighWaterMark()返回的field是ChannelConfig里面对应的writeBufferHighWaterMark,可以看到,默认值为64K, 表示如果你在写之前调用调用isWriteable方法,netty最多给你缓存64K的数据, 否则,缓存就一直膨胀

DefaultChannelConfig.java

private volatile int writeBufferHighWaterMark = 64 * 1024;

由此可见,Channel可写至少是TCP缓冲区+netty缓冲区(默认64K)都没有写满, 我这边的做法就是当某个Channel写满之后,就放弃这条Channel,随机选择其他的Channel。

改造完之后,观察了一个多礼拜,old区域已缓慢稳定增长,达到预期效果

改造之后gc情况

可以发现,每次old区域都是1M左右的增长

另外一个问题:每次olg gc的时候重启机器,瞬间异常井喷

TransferToPushServerException 异常井喷

重启三次,三次异常,结合前面的ChannelOutboundBuffer,不难分析,这些写失败的都是之前被堵塞的buffer,重启之后,关闭与pushServer的连接,进入到如下方法

AbstractChannel.java

  public final void close(final ChannelPromise promise) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (outboundBuffer == null) {
                // Only needed if no VoidChannelPromise.
                if (!(promise instanceof VoidChannelPromise)) {
                    // This means close() was called before so we just register a listener and return
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                return;
            }

            if (closeFuture.isDone()) {
                // Closed already.
                safeSetSuccess(promise);
                return;
            }

            final boolean wasActive = isActive();
            final ChannelOutboundBuffer buffer = outboundBuffer;
            outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
            Executor closeExecutor = closeExecutor();
            if (closeExecutor != null) {
                closeExecutor.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        try {
                            // Execute the close.
                            doClose0(promise);
                        } finally {
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new OneTimeTask() {
                                @Override
                                public void run() {
                                    // Fail all the queued messages
                                    buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
                                    buffer.close(CLOSED_CHANNEL_EXCEPTION);
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    // Fail all the queued messages.
                    buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
                    buffer.close(CLOSED_CHANNEL_EXCEPTION);
                }
                if (inFlush0) {
                    invokeLater(new OneTimeTask() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }

程序进入到outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
然后进入到ChannelOutboundBufferfailFlushed方法

void failFlushed(Throwable cause, boolean notify) {
        // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
        // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
        // indirectly (usually by closing the channel.)
        //
        // See https://github.com/netty/netty/issues/1501
        if (inFail) {
            return;
        }

        try {
            inFail = true;
            for (;;) {
                if (!remove0(cause, notify)) {
                    break;
                }
            }
        } finally {
            inFail = false;
        }
    }

这里的for循环导致remove0 会遍历Entry缓存对象链表

private boolean remove0(Throwable cause, boolean notifyWritability) {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        removeEntry(e);

        if (!e.cancelled) {
            // only release message, fail and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);

            safeFail(promise, cause);
            decrementPendingOutboundBytes(size, false, notifyWritability);
        }

        // recycle the entry
        e.recycle();

        return true;
    }
private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            // processed everything
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            // 指针指向下一个待删除的缓存
            flushedEntry = e.next;
        }
    }

直到所有的缓存对象都被remove掉,remove0 每调用一次都会调用一次safeFail(promise, cause)方法,

 private static void safeFail(ChannelPromise promise, Throwable cause) {
        if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
            logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
        }
    }

然后进入到

DefaultPromise.java

 @Override
    public boolean tryFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return true;
        }
        return false;
    }

DefaultPromise.java

static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }

最终一个future回调,调回到用户方法

@Override
    protected void channelRead0(ChannelHandlerContext ctx, TransferFromSdkDataPacket msg) throws Exception {
        TransferToPushServerDataPacket dataPacket = new TransferToPushServerDataPacket();

        dataPacket.setVersion(Constants.PUSH_SERVER_VERSION);
        dataPacket.setData(msg.getData());
        dataPacket.setConnectionId(ctx.channel().attr(AttributeKeys.CONNECTION_ID).get());

        final long startTime = System.nanoTime();

        try {
            ChannelGroupFuture channelFutures = pushServerChannels.writeAndFlushRandom(dataPacket);
            channelFutures.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        CatUtil.logTransaction(startTime, null, CatTransactions.TransferToPushServer, CatTransactions.TransferToPushServer);
                    } else {
                        Channel channel = (Channel) ((ChannelGroupFuture) future).group().toArray()[0];
                        final String pushServer = channel.remoteAddress().toString();
                        TransferToPushServerException e = new TransferToPushServerException(String.format("pushServer: %s", pushServer), future.cause());

                        CatUtil.logTransaction(new CatUtil.CatTransactionCallBack() {
                            @Override
                            protected void beforeComplete() {
                                Cat.logEvent(CatEvents.WriteToPushServerError, pushServer);
                            }
                        }, startTime, e, CatTransactions.TransferToPushServer, CatTransactions.TransferToPushServer);
                    }
                }

            });
        } catch (Exception e) {
            CatUtil.logTransaction(startTime, e, CatTransactions.TransferToPushServer, CatTransactions.TransferToPushServer);
        }
    }

而在用户方法里面,我们包装了一下自定义异常,喷到cat,导致瞬间TransferToPushServerException飙高

如果你觉得看的不过瘾,想系统学习Netty原理,那么你一定不要错过我的Netty源码分析系列视频:https://coding.imooc.com/class/230.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 158,736评论 4 362
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 67,167评论 1 291
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 108,442评论 0 243
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 43,902评论 0 204
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 52,302评论 3 287
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 40,573评论 1 216
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 31,847评论 2 312
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 30,562评论 0 197
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 34,260评论 1 241
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 30,531评论 2 245
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 32,021评论 1 258
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 28,367评论 2 253
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 33,016评论 3 235
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 26,068评论 0 8
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 26,827评论 0 194
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 35,610评论 2 274
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 35,514评论 2 269

推荐阅读更多精彩内容

  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,014评论 11 349
  • 一次CMS GC问题排查过程(理解原理+读懂GC日志) - iamzhongyong - ITeye技术网站 h...
    葡萄喃喃呓语阅读 1,867评论 0 11
  • 原文阅读 前言 这段时间懈怠了,罪过! 最近看到有同事也开始用上了微信公众号写博客了,挺好的~给他们点赞,这博客我...
    码农戏码阅读 5,878评论 2 31
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,099评论 18 139
  • 每个孩子都是一颗花的种子, 只不过每个人的花期不同。 有的花,一开始就会很灿烂地绽放, 有的花,需要漫长的等待。 ...
    彭晓芬阅读 208评论 0 0