Deeplearning4j on Spark: "Too large frame" error during training

My ComputationGraph is built as follows:

val conf = new NeuralNetConfiguration.Builder()
.activation(Activation.LEAKYRELU)
.weightInit(WeightInit.XAVIER)
.updater(new Sgd(0.01))
.graphBuilder()
.addInputs("ip","other")
.addLayer("L1", new EmbeddingLayer.Builder().nIn(ipfeaturesize).nOut(512).activation(Activation.IDENTITY).build(),"ip")
.addVertex("merge",new MergeVertex(), "L1", "other")
.addLayer("L2", new DenseLayer.Builder().nIn(512+otherfeaturesize).nOut(10).build,"merge")
.addLayer("out", new OutputLayer.Builder(LossFunctions.LossFunction.XENT)
.activation(Activation.SIGMOID)
.nIn(10).nOut(1).build,"L2")
.setOutputs("out")
.build

val tm = new ParameterAveragingTrainingMaster.Builder(1)
.averagingFrequency(5)
.workerPrefetchNumBatches(2)
.rddTrainingApproach(RDDTrainingApproach.Direct)
.storageLevel(StorageLevel.DISK_ONLY)
.batchSizePerWorker(batchSizePerWorker) 
.build

val sparkNet = new SparkComputationGraph(sc, conf, tm)
sparkNet.setListeners(new ScoreIterationListener(1))
sparkNet.fitMultiDataSet(TrainData)

However, the Spark job fails while running, with the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 21 (treeAggregate at ParameterAveragingTrainingMaster.java:667) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Too large frame: 2326895492
Is it necessary to configure any parameters?

As you’ve opened a new thread for this, I’ll continue the conversation here instead of in the old thread, where this is semi-unrelated.

Can you please share the full exception? Also, how did you define TrainData?

This is something that usually needs to be configured on Spark, but maybe you are trying to push too much data through at the same time, so I’ll need a bit more information to be able to tell you what is going on.

OK, first of all, thank you for your answer. Regarding your questions, here are the details:
1. Spark configuration:

--conf spark.locality.wait=0 
--conf spark.yarn.executor.memoryOverhead=10G 
--conf spark.executor.extraJavaOptions=-Dorg.bytedeco.javacpp.maxbytes=12G 
--conf spark.driver.extraJavaOptions=-Dorg.bytedeco.javacpp.maxbytes=12G 
--master yarn-cluster 
--num-executors 10 
--executor-cores 1 
--executor-memory 15g 
--driver-memory 20g

2. TrainData contains two input features (ip, other); its type is MultiDataSet, built as follows:

    val ipfeature = Nd4j.zeros(1, 1)
    val otherfeature = Nd4j.zeros(1, otherfeaturesize)
    val labels = Nd4j.zeros(1, 1)
    val features = new Array[INDArray](2)
    features(0) = ipfeature
    features(1) = otherfeature
    val data = new org.nd4j.linalg.dataset.MultiDataSet(features,labels) 

PS: the IP one-hot dimension is approximately 20 million. I guess this might be what causes the Spark error?
The training data contains approximately 1 million examples.

3. The Spark error output is as follows:

	ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 21 (treeAggregate at ParameterAveragingTrainingMaster.java:667) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Too large frame: 2326895492 
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:519)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:450) 
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61) 
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153) 
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50) 
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:84) 
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:109) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.IllegalArgumentException: Too large frame: 2326895492 
at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119) 
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133) 
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) 
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) 
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935) 
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138) 
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) 
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) 
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) 
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) 
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) 
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) 
... 1 more 
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 21 (treeAggregate at ParameterAveragingTrainingMaster.java:667) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Too large frame: 2326895492
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1368)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1817)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
at org.apache.spark.api.java.JavaRDDLike$class.treeAggregate(JavaRDDLike.scala:439)
at org.apache.spark.api.java.AbstractJavaRDDLike.treeAggregate(JavaRDDLike.scala:45)
at org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster.processResults(ParameterAveragingTrainingMaster.java:667)
at org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster.doIteration(ParameterAveragingTrainingMaster.java:626)
at org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster.executeTrainingDirect(ParameterAveragingTrainingMaster.java:474)
at org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster.executeTrainingMDS(ParameterAveragingTrainingMaster.java:450)
at org.deeplearning4j.spark.impl.graph.SparkComputationGraph.fitMultiDataSet(SparkComputationGraph.java:291)
at org.deeplearning4j.spark.impl.graph.SparkComputationGraph.fitMultiDataSet(SparkComputationGraph.java:278)
at com.model.DnnTrainModel$$anonfun$main$1.apply$mcVI$sp(DnnTrainModel.scala:203)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at com.model.DnnTrainModel$.main(DnnTrainModel.scala:201)
at com.model.DnnTrainModel.main(DnnTrainModel.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:706)

I guess the IP one-hot dimension is very large, and the embedding layer's input is ipfeature. Is this what causes the Spark error? Looking forward to your answer.

Yes, it is very probable that this is the cause. It means that you have 20*10^6 * 512 = 1024 * 10^7 parameters in your embedding layer, which on its own is already around 38GB if I didn’t make any conversion mistake there.
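To make that estimate easy to check, here is a minimal sketch of the arithmetic. It assumes the parameters are stored as 4-byte floats; the object name `EmbeddingSize` and the value `bytesPerParam` are only illustrative. It also compares the frame size from the exception with `Int.MaxValue`, which, as far as I know, is the upper bound that Spark's `TransportFrameDecoder` enforces for a single frame.

```scala
object EmbeddingSize {
  // Values from the thread: ~20 million distinct IPs, embedding width 512.
  val vocabSize: Long = 20000000L
  val embeddingDim: Long = 512L
  // Assumption: each parameter is a 4-byte float.
  val bytesPerParam: Long = 4L

  // Number of weights in the embedding table.
  val paramCount: Long = vocabSize * embeddingDim
  // Raw size of those weights in bytes and in GiB.
  val totalBytes: Long = paramCount * bytesPerParam
  val gib: Double = totalBytes.toDouble / (1L << 30)

  // Frame size reported in the exception message.
  val reportedFrame: Long = 2326895492L
  // True if the frame cannot be described by a signed 32-bit length.
  val exceedsIntMax: Boolean = reportedFrame > Int.MaxValue

  def main(args: Array[String]): Unit = {
    println(f"parameters: $paramCount%,d")
    println(f"size: $gib%.1f GiB")
    println(s"frame exceeds Int.MaxValue: $exceedsIntMax")
  }
}
```

Under these assumptions the table alone is roughly 38 GiB, and the reported frame is already larger than a signed 32-bit length can express, so simply adding executor memory would probably not make the aggregation step succeed.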

I think you will not be getting any reasonable results from that approach either. An IP address is more like a sentence than like a word. So if you want to go down the embedding route, it is more reasonable to split it into its octets and embed those instead of using the full IP address. In that case you can probably also reduce the embedding size for each octet. I would probably go as far as to reduce it by more than a factor of 4, and go with an embedding size of 16 maybe.

This would be 256*16*4 = 16384 parameters for your embedding, which is just 64 KB worth of parameters.
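As a rough sketch of the octet idea, assuming IPv4 addresses in dotted-quad notation: the helper `octets` and the object name `OctetEmbedding` are hypothetical and not part of DL4J, and the byte count again assumes 4-byte floats.

```scala
object OctetEmbedding {
  // Split a dotted-quad IPv4 string into four octet indices in 0..255.
  def octets(ip: String): Array[Int] = {
    val parts = ip.trim.split('.')
    require(parts.length == 4, s"expected 4 octets, got ${parts.length}")
    val values = parts.map(_.toInt)
    require(values.forall(v => v >= 0 && v <= 255), s"octet out of range in $ip")
    values
  }

  // Sizes suggested in the thread: 256 possible values per octet, width 16.
  val octetVocab = 256
  val embeddingDim = 16
  val numOctets = 4

  // Total weights across the four octet embedding tables.
  val paramCount: Int = octetVocab * embeddingDim * numOctets
  // Assumption: 4-byte floats.
  val bytes: Int = paramCount * 4

  def main(args: Array[String]): Unit = {
    println(octets("192.168.1.10").mkString(", "))
    println(s"parameters: $paramCount, bytes: $bytes")
  }
}
```

Each of the four indices could then feed its own EmbeddingLayer input (nIn 256, nOut 16), merged with "other" through the existing MergeVertex, so that nIn of "L2" becomes 4*16 + otherfeaturesize. That wiring is my assumption of how it would look, not something tested here.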

Or if you don’t want to go down that route, you can look up the AS number for each IP and create an embedding based on that.
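A minimal sketch of such a lookup, assuming you have a prefix-to-AS table from some routing dataset. The table below is purely hypothetical: it uses address ranges and AS numbers that are reserved for documentation, and the names `Route`, `table`, and `asnFor` are illustrative.

```scala
object AsnLookup {
  // Convert a dotted-quad IPv4 string to its 32-bit value, held in a Long.
  def toLong(ip: String): Long =
    ip.split('.').foldLeft(0L)((acc, part) => (acc << 8) | part.toInt)

  // Hypothetical routing entry: network address, prefix length, AS number.
  final case class Route(network: String, prefixLen: Int, asn: Int)

  // Hypothetical table with documentation-only prefixes and AS numbers.
  val table: Seq[Route] = Seq(
    Route("192.0.2.0", 24, 64496),
    Route("198.51.100.0", 24, 64497)
  )

  // Longest-prefix match: keep routes whose network bits equal the
  // address bits, then pick the most specific one.
  def asnFor(ip: String): Option[Int] = {
    val addr = toLong(ip)
    table
      .filter { r =>
        val shift = 32 - r.prefixLen
        (addr >> shift) == (toLong(r.network) >> shift)
      }
      .sortBy(r => -r.prefixLen)
      .headOption
      .map(_.asn)
  }

  def main(args: Array[String]): Unit = {
    println(asnFor("192.0.2.17"))
    println(asnFor("10.0.0.1"))
  }
}
```

The AS numbers that actually occur in the data would then be mapped to a dense index range before being fed to an embedding layer, which should give a far smaller vocabulary than one entry per IP.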

Thank you for your answer and suggestion! In theory, it can be trained if enough resources are given.

Given the amount of over-parameterization, I think a model like the one you initially suggested here will overfit very quickly and never generalize well. Embeddings work best if they are used in context very often. Given that you have a dataset of 1 million examples but an IP vocabulary of 20 million, it does not look like you will get any kind of good embedding for the IPs. They will essentially be random, especially when the network has to deal with a new IP that it hasn’t seen before.
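To put a number on that, here is a small sketch of the best case, assuming each training example carries exactly one IP (as in the MultiDataSet shown earlier). The object and value names are illustrative.

```scala
object EmbeddingCoverage {
  // Figures from the thread.
  val vocabSize: Long = 20000000L // distinct IPs in the one-hot space
  val examples: Long = 1000000L   // training examples

  // One IP per example, so at most `examples` distinct rows ever get a gradient.
  val maxTouchedRows: Long = math.min(vocabSize, examples)
  // Upper bound on the fraction of embedding rows that are trained at all.
  val maxCoverage: Double = maxTouchedRows.toDouble / vocabSize
  // Lower bound on rows that keep their initial random values.
  val minUntrainedRows: Long = vocabSize - maxTouchedRows

  def main(args: Array[String]): Unit = {
    println(f"at most ${maxCoverage * 100}%.1f%% of rows are ever updated")
    println(f"at least $minUntrainedRows%,d rows stay at their initial values")
  }
}
```

Even in the best case, where every example has a different IP, no more than 5% of the rows are ever updated, and each of those is seen only once.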

OK, thank you for your answer!