val conf = new NeuralNetConfiguration.Builder()
  .activation(Activation.LEAKYRELU)
  .weightInit(WeightInit.XAVIER)
  .updater(new Sgd(0.01))
  .graphBuilder()
  .addInputs("ip", "other")
  .addLayer("L1", new EmbeddingLayer.Builder().nIn(ipfeaturesize).nOut(512).activation(Activation.IDENTITY).build(), "ip")
  .addVertex("merge", new MergeVertex(), "L1", "other")
  .addLayer("L2", new DenseLayer.Builder().nIn(512 + otherfeaturesize).nOut(10).build(), "merge")
  .addLayer("out", new OutputLayer.Builder(LossFunctions.LossFunction.XENT)
    .activation(Activation.SIGMOID)
    .nIn(10).nOut(1).build(), "L2")
  .setOutputs("out")
  .build()
val tm = new ParameterAveragingTrainingMaster.Builder(1)
  .averagingFrequency(5)
  .workerPrefetchNumBatches(2)
  .rddTrainingApproach(RDDTrainingApproach.Direct)
  .storageLevel(StorageLevel.DISK_ONLY)
  .batchSizePerWorker(batchSizePerWorker)
  .build()
val sparkNet = new SparkComputationGraph(sc, conf, tm)
sparkNet.setListeners(new ScoreIterationListener(1))
sparkNet.fitMultiDataSet(TrainData)
But an error was encountered while the Spark program was running. The error information is as follows:
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 21 (treeAggregate at ParameterAveragingTrainingMaster.java:667) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Too large frame: 2326895492
Is it necessary to configure any parameters?
As you've opened a new thread for this, I'll continue the conversation here instead of in the old thread, where this is semi-unrelated.
Can you please share the full exception? And also how you defined TrainData?
This is something that usually needs to be configured on Spark, but maybe you are trying to push too much data through at the same time, so I'll need a bit more information to be able to tell you what is going on.
2. TrainData contains two input features (ip, other). Its type is MultiDataSet, built as follows:
val ipfeature = Nd4j.zeros(1, 1)
val otherfeature = Nd4j.zeros(1, otherfeaturesize)
val labels = Nd4j.zeros(1, 1)
val features = new Array[INDArray](2)
features(0) = ipfeature
features(1) = otherfeature
val data = new org.nd4j.linalg.dataset.MultiDataSet(features,labels)
PS: the IP one-hot dimension is approximately 20 million. I guess this might be what causes the Spark error?
The training data contains approximately 1 million examples.
3. The Spark error info is as follows:
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 21 (treeAggregate at ParameterAveragingTrainingMaster.java:667) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Too large frame: 2326895492
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:519)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:450)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:84)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Too large frame: 2326895492
at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
... 1 more
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 21 (treeAggregate at ParameterAveragingTrainingMaster.java:667) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Too large frame: 2326895492
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1368)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1817)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
at org.apache.spark.api.java.JavaRDDLike$class.treeAggregate(JavaRDDLike.scala:439)
at org.apache.spark.api.java.AbstractJavaRDDLike.treeAggregate(JavaRDDLike.scala:45)
at org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster.processResults(ParameterAveragingTrainingMaster.java:667)
at org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster.doIteration(ParameterAveragingTrainingMaster.java:626)
at org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster.executeTrainingDirect(ParameterAveragingTrainingMaster.java:474)
at org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster.executeTrainingMDS(ParameterAveragingTrainingMaster.java:450)
at org.deeplearning4j.spark.impl.graph.SparkComputationGraph.fitMultiDataSet(SparkComputationGraph.java:291)
at org.deeplearning4j.spark.impl.graph.SparkComputationGraph.fitMultiDataSet(SparkComputationGraph.java:278)
at com.model.DnnTrainModel$$anonfun$main$1.apply$mcVI$sp(DnnTrainModel.scala:203)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at com.model.DnnTrainModel$.main(DnnTrainModel.scala:201)
at com.model.DnnTrainModel.main(DnnTrainModel.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:706)
I guess the IP one-hot dimension is very large, and the embedding layer's input is ipfeature. Is this the cause of the Spark error? Looking forward to your answer.
Yes, it is very probable that this is the cause. This means that you have 20*10^6 * 512 = 1024 * 10^7 parameters in your embedding layer, which on its own is already around 38GB (at 4 bytes per parameter), if I didn't make any conversion mistake there.
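To make the arithmetic above easy to check, here is a minimal sketch in plain Scala. The object and method names are made up for illustration, and it assumes 4 bytes per parameter (float32). The last line compares the frame size from the stack trace against Int.MaxValue, which as far as I know is the upper bound Spark's frame decoder accepts; treat that part as an assumption.

```scala
object EmbeddingSize {
  // Number of weights in an embedding table: one row of `dim` values per vocabulary entry.
  def paramCount(vocab: Long, dim: Long): Long = vocab * dim

  // Size in GiB, assuming `bytesPerParam` bytes per weight (4 for float32).
  def sizeGiB(params: Long, bytesPerParam: Int = 4): Double =
    params.toDouble * bytesPerParam / (1L << 30)

  def main(args: Array[String]): Unit = {
    // 20 million one-hot IP entries, embedding width 512 (numbers from this thread).
    val params = paramCount(20000000L, 512L)
    println(s"parameters: $params")
    println(s"size in GiB: ${sizeGiB(params)}")

    // Frame size reported in the exception, compared with the assumed 2 GiB limit.
    val reportedFrame = 2326895492L
    println(s"frame exceeds Int.MaxValue: ${reportedFrame > Int.MaxValue}")
  }
}
```

Even before any Spark setting comes into play, a single copy of these parameters is far larger than anything that can reasonably be shipped between executors on every averaging step.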
I think you will not get any reasonable results from that approach either. An IP address is more like a sentence than like a word. So if you want to go down the embedding route, it is more reasonable to split it into its octets and embed those instead of using the full IP address. In that case you can probably also make the embedding size smaller for each octet. I would probably go as far as to reduce it by more than a factor of 4, and go with an embedding size of 16 maybe.
This would be 256*16*4 = 16384 parameters for your embedding, which is just 64 KB worth of parameters.
Or if you don't want to go down that route, you can get the AS number for the IPs and create an embedding based on that.
Given the amount of over-parameterization, I think a model like the one you initially suggested here will overfit very quickly and never generalize well. Embeddings work best if they are used in context very often. Given that you have a dataset of 1 million examples but an IP embedding vocabulary of 20 million, this looks like it will not produce any kind of good embedding for the IPs. They will essentially be random, especially when the network has to deal with a new IP that it hasn't seen before.
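As a rough illustration of the octet idea, the sketch below splits a dotted IPv4 string into four indices in the range 0..255 and recomputes the parameter count. It is plain Scala with hypothetical names (OctetEmbedding, octets, paramCount), it assumes IPv4 addresses in dotted-quad form, and it is not tied to any DL4J API.

```scala
object OctetEmbedding {
  // Split a dotted-quad IPv4 string into four octet indices, each in 0..255.
  def octets(ip: String): Array[Int] = {
    val parts = ip.trim.split('.')
    require(parts.length == 4, s"expected 4 octets, got ${parts.length}")
    val values = parts.map(_.toInt)
    require(values.forall(v => v >= 0 && v <= 255), s"octet out of range in $ip")
    values
  }

  // Total weights for `tables` embedding tables with `vocab` rows and `dim` columns each.
  def paramCount(vocab: Int, dim: Int, tables: Int): Long =
    vocab.toLong * dim * tables

  def main(args: Array[String]): Unit = {
    println(octets("192.168.0.1").mkString(", "))
    // 256 possible values per octet, embedding size 16, four octets.
    val params = paramCount(256, 16, 4)
    println(s"parameters: $params, bytes at 4 bytes each: ${params * 4}")
  }
}
```

Each of the four indices would then feed its own small embedding input in place of the single 20-million-wide "ip" input, with the four embedding outputs merged together with the other features.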