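The Donghua (东华) vehicle-management data collection platform I built earlier kept losing data. It did not happen often, but the cause was still worth finding, so today I raised Netty's log level to track down where things were going wrong. The code that raises the level:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 2048)
 .handler(new LoggingHandler(LogLevel.DEBUG)) // log server-channel events at DEBUG
 .childHandler(new ChildChannelHandler());
Setting the LogLevel to DEBUG is all it takes.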
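Then I settled in to watch the log (in the output, 消息主体 means "message body" and 入缓存队列操作结果 means "result of pushing to the cache queue"):
2017-01-19 10:04:46 [ nioEventLoopGroup-1-0:1625429 ] - [ INFO ] 消息主体:60160308049620860021010707190117020453395443491162627407087d081f00002e37008801008c00f9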
2017-01-19 10:04:49 [ nioEventLoopGroup-1-0:1628830 ] - [ ERROR ] LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=advanced' or call ResourceLeakDetector.setLevel() See http://netty.io/wiki/reference-counted-objects.html for more information.
2017-01-19 10:04:49 [ nioEventLoopGroup-1-0:1628845 ] - [ INFO ] 入缓存队列操作结果:9
2017-01-19 10:04:49 [ nioEventLoopGroup-1-0:1628845 ] - [ INFO ] 消息主体:601603080496208600210107071901170204573954434611626262170f88091f00002e37008801008c00fa
2017-01-19 10:04:53 [ nioEventLoopGroup-1-0:1632839 ] - [ INFO ] 入缓存队列操作结果:9
2017-01-19 10:04:53 [ nioEventLoopGroup-1-0:1632839 ] - [ INFO ] 消息主体:60160308049620860021010707190117020501395443581162624817108a091f00002e37008801008c00fb
2017-01-19 10:04:55 [ nioEventLoopGroup-1-0:1634196 ] - [ INFO ] 入缓存队列操作结果:9
2017-01-19 10:04:55 [ nioEventLoopGroup-1-0:1634196 ] - [ INFO ] 消息主体:601603080496208600210107071901170205023954436011626244571288091f00002e37008801008c00fc
2017-01-19 10:04:56 [ nioEventLoopGroup-1-0:1635288 ] - [ INFO ] 入缓存队列操作结果:9
2017-01-19 10:04:56 [ nioEventLoopGroup-1-0:1635288 ] - [ INFO ] 消息主体:60160308049620860021010707190117020503395443651162624107118a091f00002e37008801008c00fd
2017-01-19 10:04:57 [ nioEventLoopGroup-1-0:1636443 ] - [ INFO ] 入缓存队列操作结果:9
2017-01-19 10:04:57 [ nioEventLoopGroup-1-0:1636443 ] - [ INFO ] 消息主体:601603080496208600210107071901170205053954437111626234671088091f00002e37008801008c00fe
Note this line:
LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=advanced' or call ResourceLeakDetector.setLevel() See http://netty.io/wiki/reference-counted-objects.html for more information.
The message itself says what to do next. Add the following line:
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
This sets the leak detection level to ADVANCED, which produces a much more detailed leak report. Checking the log again:
2017-01-19 10:35:59 [ nioEventLoopGroup-1-0:665092 ] - [ ERROR ] LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 5
#5:
io.netty.buffer.AdvancedLeakAwareByteBuf.readBytes(AdvancedLeakAwareByteBuf.java:435)
com.dhcc.ObdServer.ObdServerHandler.channelRead(ObdServerHandler.java:31)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:243)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
#4:
Hint: 'ObdServerHandler#0' will handle the message from this point.
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:387)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:243)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
#3:
io.netty.buffer.AdvancedLeakAwareByteBuf.release(AdvancedLeakAwareByteBuf.java:721)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:237)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
#2:
io.netty.buffer.AdvancedLeakAwareByteBuf.retain(AdvancedLeakAwareByteBuf.java:693)
io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:277)
io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:216)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:316)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.skipBytes(AdvancedLeakAwareByteBuf.java:465)
io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:272)
io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:216)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:316)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:250)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:113)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
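The most recent access record (#5) leads into my own handler, ObdServerHandler.channelRead, around this code:
ByteBuf buff = (ByteBuf) msg;
byte[] req = new byte[buff.readableBytes()];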
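So it was clear that a ByteBuf memory leak was behind the problem, and I started investigating from that angle. It turns out that Netty 5 allocates ByteBufs with PooledByteBufAllocator by default (the "Created at" section of the report confirms this), so the buffer that reaches my handler has to be released by hand. Otherwise it is never returned to the pool and the memory leaks.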
So the fix is simply to release the ByteBuf once the handler has finished reading from it:
ReferenceCountUtil.release(buff);
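Here is another user's explanation of that line of code:
ReferenceCountUtil.release() is really a wrapper around ByteBuf.release() (a method inherited from the ReferenceCounted interface). ByteBuf in Netty 4 uses reference counting (Netty 4 implements an optional ByteBuf pool). Every newly allocated ByteBuf starts with a reference count of 1. Each time a reference to the ByteBuf is added, ByteBuf.retain() must be called, and each time a reference is dropped, ByteBuf.release() must be called. When the reference count reaches 0, the object can be reclaimed. I only used ByteBuf as the example here; other objects also implement the ReferenceCounted interface, and the same rules apply to them.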
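To make the counting rules concrete, here is a toy model in plain Java. It is not Netty's ByteBuf: CountedBuffer and RefCountDemo are names I made up for illustration, and the only thing taken from the description above is the contract (a new buffer starts at 1, retain() adds one, release() subtracts one, and the buffer is reclaimed at 0). The consume method sketches the usual pattern of releasing in a finally block so the release still happens if reading throws.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy stand-in for a reference-counted buffer (hypothetical, NOT Netty's ByteBuf).
final class CountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1); // new buffers start at 1
    private volatile boolean deallocated = false;

    int refCnt() { return refCnt.get(); }

    boolean isDeallocated() { return deallocated; }

    // Adds one reference; fails if the buffer was already fully released.
    CountedBuffer retain() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) throw new IllegalStateException("already released");
            if (refCnt.compareAndSet(current, current + 1)) return this;
        }
    }

    // Drops one reference; returns true only when this call brought the count to 0.
    boolean release() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) throw new IllegalStateException("already released");
            if (refCnt.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    deallocated = true; // a real pool would take the memory back here
                    return true;
                }
                return false;
            }
        }
    }
}

final class RefCountDemo {
    // Plays the role of the last handler: whoever consumes the buffer last releases it.
    static void consume(CountedBuffer buf) {
        try {
            // ... copy the readable bytes out of the buffer here ...
        } finally {
            buf.release();
        }
    }

    public static void main(String[] args) {
        CountedBuffer buf = new CountedBuffer();
        System.out.println(buf.refCnt());        // 1
        buf.retain();
        System.out.println(buf.refCnt());        // 2
        buf.release();
        System.out.println(buf.refCnt());        // 1
        consume(buf);
        System.out.println(buf.isDeallocated()); // true
    }
}
```

If consume skipped the release, the count would stay at 1 forever, which is the situation the LEAK message in my log was reporting.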
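While checking the problem I also wondered whether the data loss came from my Netty server running over UDP, so here is how to tell whether Netty is using TCP or UDP:
About TCP and UDP
A socket can be based on TCP or on UDP. The difference is that UDP does not guarantee that every packet arrives correctly, so it has less overhead but is less reliable. TCP guarantees ordered, error-checked delivery, at some cost in performance.
UDP mostly suits things like live video streaming, where an occasional lost packet is tolerable; our requirements call for TCP. So how do the two differ in code? I found this explanation online: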
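When choosing a ChannelFactory, UDP uses NioDatagramChannelFactory, while TCP uses NioServerSocketChannelFactory.
When choosing a Bootstrap, UDP uses ConnectionlessBootstrap and TCP uses ServerBootstrap. Decoders, encoders, and the ChannelPipelineFactory are developed the same way for UDP as for TCP, so they are not covered in detail here.
The ChannelHandler is where UDP and TCP really differ. UDP is connectionless: you can obtain the current channel through getChannel() on the MessageEvent parameter, but its isConnected() always returns false.
In UDP development, once the message-received callback has the current channel, it can send data to the peer directly with channel.write(message, remoteAddress). The first argument is still the message object to send, and the second is the SocketAddress of the peer.
The most important point is the SocketAddress. In TCP it is available from channel.getRemoteAddress(), but in UDP it must be obtained by calling getRemoteAddress() on the MessageEvent.
Note that these class names come from the Netty 3.x API. In Netty 4 and later, UDP uses Bootstrap with NioDatagramChannel, while a TCP server uses ServerBootstrap with NioServerSocketChannel. The bootstrap code at the top of this post does the latter, so my server runs over TCP and UDP was not the cause of the data loss.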
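The same contrast exists below Netty, in the JDK's own java.net sockets. The following is a minimal sketch, assuming loopback networking is available; the class name UdpPeerAddressDemo and the method roundTrip are made up for illustration. It shows the two UDP points from the passage above: the receiving socket never reports a connection, and the peer address has to be taken from the received packet rather than from the socket.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: echoes one UDP datagram over loopback.
final class UdpPeerAddressDemo {
    static String roundTrip(String text) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port.
        try (DatagramSocket server = new DatagramSocket(0, loopback);
             DatagramSocket client = new DatagramSocket(0, loopback)) {
            server.setSoTimeout(2000); // fail instead of hanging if a packet is lost
            client.setSoTimeout(2000);

            byte[] out = text.getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(out, out.length, loopback, server.getLocalPort()));

            DatagramPacket request = new DatagramPacket(new byte[1024], 1024);
            server.receive(request);

            // UDP is connectionless: receiving a packet does not connect the socket.
            if (server.isConnected()) {
                throw new IllegalStateException("unexpected: UDP socket reports a connection");
            }

            // The peer address comes from the packet, not from the socket.
            SocketAddress peer = request.getSocketAddress();
            server.send(new DatagramPacket(request.getData(), request.getLength(), peer));

            DatagramPacket reply = new DatagramPacket(new byte[1024], 1024);
            client.receive(reply);
            return new String(reply.getData(), 0, reply.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping")); // echoes the text back
    }
}
```

With TCP, by contrast, the accepted java.net.Socket already knows its peer, so a reply needs no explicit address.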