Stream processing?


Instead of using these batch processing techniques with DataSetIterators and so on, I’d like to use stream processing. I am building an LSTM using the ComputationGraph; we receive data constantly and need to validate it as soon as n messages arrive. We don’t have pictures or CSVs or anything like that.

Can someone help me find some resources on how best to go about this? I can’t seem to find anything in the documentation.

Kind regards

@BeckerBoy what sort of infrastructure do you have? I would recommend either Spark Streaming or Kafka for that.

Depending on what you have, I can make a more specific recommendation.

The software runs on an embedded system. We receive something I would call “messages” for this thread (I can’t go into much detail about what kind of messages).
I manually collect them into an INDArray (is there a better data structure?)

I’m sorry if these questions are stupid but sadly time is not on my side, so I can’t read through all the tutorials out there and figure it out myself.

Thank you very much.

Also, if my sliding window is of size 10, should I collect the messages into an array that is 10x4 or should I collect them into 10 arrays of size 4?

@BeckerBoy let’s just pretend your stuff is MQTT with IoT. It would work the same as Kafka. Under a publish/subscribe pattern, if you run on CPU, you can just do the inference all at once.

Otherwise, you could also batch it and just save incoming arrays to disk then process elsewhere offline.

It really comes down to the trade-off between memory, disk, and the latency required to run your software.

NDArrays are fine. One trick with NDArrays and saving them is that you can usually concatenate results.

Concatenate would work with something like:

INDArray arr = ...;  // load from disk
INDArray arr2 = ...; // from incoming stream
INDArray concat = Nd4j.concat(0, arr, arr2); // concatenate along dimension 0, then write concat to disk

It really depends on what your use case is but hopefully this gives you a starting mental model.

@agibsonccc Thank you. I won’t save to disk though but I get what you mean. The problem I am facing however is how I am supposed to feed the data into the network?

So I collect the data into the INDArray, feed it to my NormalizerStandardize and then pass it into the model? Can you tell me which of the model’s methods I am supposed to use? In Keras we used the “predict” method, but that doesn’t exist in DL4J’s ComputationGraph. The documentation is not really helpful as it just states “Evaluate the network” for evaluate, “Conduct feed forward pass” for feedForward, and so on. I’d need an INDArray as output which I can compare to an INDArray that stores the thresholds.

Essentially what I am trying to get is a minimal working example without the use of RecordReaders, DataSetIterators and so on. Just feed in an array containing the data (10x4; 10 being the message count, 4 being the features) and get out a single array (1x4, giving me four values that I can use to compute thresholds for production and later use in production for comparing against said thresholds).

@BeckerBoy could you be clearer then? If you have ndarrays, then just call .output().

That’s in a lot of our examples and in this case has nothing to do with streaming at all.
Try browsing through some of these: GitHub - eclipse/deeplearning4j-examples: Deeplearning4j Examples (DL4J, DL4J Spark, DataVec)

Do you mean for creating your data and vectorization?
It depends what record reader you used.

So we actually have a few questions then. Please try to keep the topics separated from here on out so it’s easier to answer you.

  1. What do you use in place of DataSetIterators/record readers during training? That will usually depend on your pipeline. How you go from raw data to array depends on a lot of factors. For example, if you have images, you’d want to use the native image loader; if you have CSVs, that’d be different. Try to be more specific about what you’re looking for there. Generally your goal would be to replicate in inference what you did in training.

  2. For NormalizerStandardize and other normalizers, you can save and load those as well. Find an example of that in our tests: deeplearning4j/ at master · eclipse/deeplearning4j · GitHub

Basically fit your normalizer for calculating things like mean/variance. Then add it to the model.
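To make the "fit once on training data, reuse the same statistics at inference" idea concrete, here is a minimal sketch in plain Java. It is not the DL4J API: the class `Standardizer` and its methods are hypothetical stand-ins for what a standardizing normalizer does conceptually, and using the population standard deviation is an assumption.

```java
/** Hypothetical stand-in for a standardizing normalizer (not a DL4J class). */
final class Standardizer {
    private double[] mean;
    private double[] std;

    /** Computes per-column mean and population standard deviation from training rows. */
    void fit(double[][] rows) {
        int cols = rows[0].length;
        mean = new double[cols];
        std = new double[cols];
        for (double[] row : rows) {
            for (int c = 0; c < cols; c++) {
                mean[c] += row[c] / rows.length;
            }
        }
        for (double[] row : rows) {
            for (int c = 0; c < cols; c++) {
                double d = row[c] - mean[c];
                std[c] += d * d / rows.length;
            }
        }
        for (int c = 0; c < cols; c++) {
            std[c] = Math.sqrt(std[c]);
        }
    }

    /** Returns a standardized copy of one row using the statistics from fit(). */
    double[] transform(double[] row) {
        double[] out = new double[row.length];
        for (int c = 0; c < row.length; c++) {
            double s = std[c] == 0.0 ? 1.0 : std[c]; // avoid dividing by zero for constant columns
            out[c] = (row[c] - mean[c]) / s;
        }
        return out;
    }
}
```

The point is only that the statistics come from the training data and are applied unchanged to every incoming window; with DL4J you would save the fitted normalizer and load it at inference time instead of recomputing anything.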

Yes, I think using the word “streaming” might have been unfitting. What I meant was that we don’t have data in files; we get them via the network (which is a stream) and have to verify them immediately.

That’s strange, I’ve browsed through some of the examples and as far as I have seen all of them use DataSets, RecordReaders and DataSetIterators for evaluation. Could you maybe point out one that doesn’t?

  1. I don’t have the training implemented yet, but I want to use CSVSequenceRecordReader and I think the SequenceRecordReaderDataSetIterator is the most fitting iterator. The raw data is just a message containing x fields, of which 4 are relevant (and therefore extracted; The CSV file for training contains these extracted fields from “valid” messages).

  2. Thank you. I think using the normalizer is not so much a problem, this part is relatively easy to understand for newcomers.

I am sorry my questions are not so clear and might be a bit nooby, but I unfortunately don’t really have a choice. I got ordered to implement this and the time I got for it is not very much.

@BeckerBoy yes that’s mainly in the context of batch processing with a known test set. Imagine having a training folder and a test folder. You have 1 iterator per train/test set.
Sorry if that’s confusing but I understand where the issue is.

For your sequence iterator use case, do you want to use csvs? In that case, how might the data be coming in? Usually we don’t recommend iterators or record readers for real time inference.

The main reason is the wide variety of formats you would expect the data to be in. If you can post the preprocessing or data conversion to ndarray you want to do, I can help you convert it for your use case.

Oh ok, so the “test set” would in my case be real time data, right?

The data I would read from a CSV would be data from our sensor that I will log this weekend. The data used at run time will be received via said sensor. I sadly can’t go into much detail how exactly this data looks but I can try an abstract version.

We receive messages from different devices, multiple per second. These messages can be of different types; let’s make up some that do not represent what data we really receive: X, Y, and Z.
We start at index i = 0; The sliding window is of size 10.

The array I am filling is created like this:

INDArray array = Nd4j.create(10, 4); // 10 samples, 4 fields per sample

When I receive X or Y I set

temp_x_value = message.getData();


temp_y_value = message.getData(); 

When I receive Z messages I set

array.put(i, 0, temp_x_value);
array.put(i, 1, temp_y_value);
array.put(i, 2, message.getData());
i++; // Increase counter when receiving Z messages

Now, when the sliding window is full, I’d like to pass the data into the network and afterwards compare the output with the thresholds:

normalizer.transform(array); // Normalizer is at this stage loaded from disk
INDArray output = model.outputSingle(array); // Do I want outputSingle() ? I need 1x4 array output
boolean valuesAreValid = true;
for (int n = 0; n < FEATURE_COUNT; n++) {
    // I bet there is a better way?
    valuesAreValid &= output.getDouble(n) <= thresholds.getDouble(n);
}

After all this I set the index i to 0 again and put a new INDArray in the array’s place. In reality we have multiple devices, which can dynamically register and unregister, so the INDArrays are attached to the “instances” of these connections. Something along the lines of:

Message message = receiveMessage();
int id = message.getDeviceId();
INDArray slidingWindow = devices[id].pushMessage(message); // The counting and buffer filling part
if (null != slidingWindow) {
    boolean deviceValid = detector.detect(slidingWindow); // The preprocessing and evaluation part
    if (!deviceValid) {
        // ...
    }
}

Now I have to admit, I’ve never worked with pandas before and as far as I understand ND4J is designed to be a Java version of said library. I am not sure if what I am doing with the INDArrays is correct.
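For the "counting and buffer filling part", a minimal plain-Java sketch of one buffer per device might look like the following. The class names `SlidingWindowBuffer` and `DeviceWindows` are hypothetical, plain `double[][]` stands in for an INDArray, and the window is reset after each full batch of 10 rows, as described above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical fixed-size buffer: collects rows and hands back a copy once WINDOW rows arrived. */
final class SlidingWindowBuffer {
    static final int WINDOW = 10;  // messages per window (from the thread)
    static final int FEATURES = 4; // fields per message (from the thread)

    private final double[][] rows = new double[WINDOW][FEATURES];
    private int count = 0;

    /** Adds one feature row; returns the filled window, or null while it is still filling. */
    double[][] push(double[] features) {
        if (features.length != FEATURES) {
            throw new IllegalArgumentException("expected " + FEATURES + " features");
        }
        rows[count++] = features.clone();
        if (count < WINDOW) {
            return null;
        }
        double[][] full = new double[WINDOW][];
        for (int r = 0; r < WINDOW; r++) {
            full[r] = rows[r].clone();
        }
        count = 0; // start the next window from scratch
        return full;
    }
}

/** Hypothetical registry: one buffer per device id, so devices can come and go. */
final class DeviceWindows {
    private final Map<Integer, SlidingWindowBuffer> buffers = new HashMap<>();

    double[][] push(int deviceId, double[] features) {
        return buffers.computeIfAbsent(deviceId, id -> new SlidingWindowBuffer()).push(features);
    }

    void unregister(int deviceId) {
        buffers.remove(deviceId);
    }
}
```

A single 10x4 array per device is the simpler layout here; the returned `double[][]` could then be wrapped with `Nd4j.create(double[][])` right before normalizing and calling the model.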

@BeckerBoy I would give up on using iterators with streaming. They’re fundamentally not compatible. Let’s be clear that training and inference are vastly different use cases and shouldn’t be mixed.

Please try to acknowledge my question about how you’re preprocessing the data. You keep mentioning you can’t tell me, but your problem can’t be that unique. CSVs are something everybody has. If it’s not that but some other format, you have to find a way to get from raw data → ndarray.

How you do that should not happen inside an iterator. All of our iterators are meant to be used with a record reader. Record readers are for batch processing. I mentioned batch processing for a reason: that’s what you have for training data. If you want to do any sort of neural network updates like incremental training then save the data to disk and use it like that.

For just inference you do not in any scenario use the iterators. You just implement the steps to convert from raw input to ndarray and then pass that to output. Nothing more.

Using your abstraction now (again I’m not interested in your secret sauce just helping you here):

Message message = receiveMessage();
int id = message.getDeviceId();
Normalizer normalizer = ...;
INDArray slidingWindow = devices[id].pushMessage(message); // The counting and buffer filling part
INDArray[] output = model.output(array);

if (null != slidingWindow) {
    boolean deviceValid = detector.detect(slidingWindow); // The preprocessing and evaluation part
    if (!deviceValid) {
        // ...
    }
}

For this part:

normalizer.transform(array); // Normalizer is at this stage loaded from disk
INDArray output = model.outputSingle(array); // Do I want outputSingle() ? I need 1x4 array output
boolean valuesAreValid = true;
for (int n = 0; n < FEATURE_COUNT; n++) {
    // I bet there is a better way?
    valuesAreValid &= output.getDouble(n) <= thresholds.getDouble(n);
}

Consider using bitmask operations instead: convert everything to 0/1 and then sum over that for your counts. You can also apply and/or etc. to the bitmasks as well.
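The 0/1 idea can be sketched in plain Java as below. This only illustrates the logic: `ThresholdCheck` and its methods are hypothetical names, and `double[]` stands in for the 1x4 output and threshold arrays. With INDArrays the same thing would be done with element-wise comparison ops rather than a hand-written loop.

```java
/** Hypothetical helper: compares outputs to thresholds via a 0/1 mask. */
final class ThresholdCheck {
    /** Returns a mask with 1 where output[n] <= thresholds[n], and 0 otherwise. */
    static int[] withinMask(double[] output, double[] thresholds) {
        if (output.length != thresholds.length) {
            throw new IllegalArgumentException("length mismatch");
        }
        int[] mask = new int[output.length];
        for (int n = 0; n < output.length; n++) {
            mask[n] = output[n] <= thresholds[n] ? 1 : 0;
        }
        return mask;
    }

    /** Sums the mask, i.e. counts how many values are within their threshold. */
    static int countWithin(int[] mask) {
        int sum = 0;
        for (int bit : mask) {
            sum += bit;
        }
        return sum;
    }

    /** True only if every value is within its threshold (sum of mask equals its length). */
    static boolean allWithin(double[] output, double[] thresholds) {
        return countWithin(withinMask(output, thresholds)) == output.length;
    }
}
```

Keeping the mask around (instead of a single boolean) also tells you which of the four features exceeded its threshold.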

Thank you, the transform and output have to be applied to a not full window? I thought I need to first fill the buffer with 10 messages, then call transform and output? If combining all pieces of code:

public boolean validateSlidingWindow(final INDArray slidingWindow) {
    INDArray output = model.outputSingle(slidingWindow);
    return; // New, faster check?
}

public void doTheThing() {
    Message message = receiveMessage();
    int id = message.getDeviceId();
    // Normalizer normalizer =...; // Is a member of this class, initialized in constructor

    // The counting and buffer filling part, returns null if window is not yet full
    INDArray slidingWindow = devices[id].pushMessage(message);

    if (null != slidingWindow) {
        boolean deviceValid = validateSlidingWindow(slidingWindow);
        if (!deviceValid) {
            // ...
        }
    }
}

For the training part I would have liked to use the CSVSequenceRecordReader and the SequenceRecordReaderDataSetIterator. I have yet to find out what the “labels” refer to. In the Keras prototype we didn’t need that.
There are three CSVs (one for training, 80 % of data; one for threshold computing, 10 % of data; one for testing, another 10 % of data). The CSVs are already processed to be in the same format as the sliding window.


Which means I was hoping for the Iterator to fetch 1 to 11, 2 to 12, 3 to 13, and so on (10 messages each pass, moving the sliding window by one message each time).

With the bitmasks thing you mean something like:

Or am I mistaken?

For messages, it depends on where the features come from. If your data is columnar (e.g. CSV) then you’ll need to ensure you first assemble whatever your feature vector is. You also need to ensure that it’s in the correct order (e.g. each message value gets inserted into the right column of your overall feature vector).

Regarding the label, it should be an index of a column that represents your targets. I’m not quite sure what you’re referencing with Keras, but for supervised learning you always need a label column. You also need to be aware of what the label is.

The way dl4j does the parsing with the record readers is we either do regression or classification based on the specified index of the column in the data.

The reason we represent it like that is because most CSV data usually has the label in the same row as the rest of the features. This would be a label column.

For inference you won’t have that since you’re predicting the label.

Beyond that yes you’re spot on with the bit masks.

Thank you a lot. I found out what we use as “label”. Suppose we have a sliding window of size 10: we use the 11th message as the label, as we are predicting whether or not a received message is spoofed. We use an early stopping function (tf.keras.callbacks.EarlyStopping), monitoring the val_loss to stop fitting early. I have two questions regarding this:

  • Is there an automatic way for training to read sliding windows of size N and use message N+1 (relative to the start of the sliding window) as the output? Or do we have to read the CSV “by hand” for such a use case?

  • I found EarlyStoppingConfiguration and EarlyStoppingTrainer, but I couldn’t find any parameter that seems to do the same as we did with Keras. Is there something similar somewhere or do I need to implement this myself?
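For the first question, the "by hand" variant is small. The sketch below is plain Java with hypothetical names (`WindowedPairs`, `Pair`, `build`); it assumes the CSV has already been parsed into one `double[]` per message, and it pairs rows i..i+N-1 with row i+N as the label, moving the window forward by one message each step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that turns a message series into (window, next message) pairs. */
final class WindowedPairs {
    /** One training example: N consecutive rows plus the row that follows them. */
    static final class Pair {
        final double[][] window;
        final double[] label;

        Pair(double[][] window, double[] label) {
            this.window = window;
            this.label = label;
        }
    }

    /** Builds all pairs with stride 1; a series of T rows yields T - n pairs. */
    static List<Pair> build(double[][] series, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        List<Pair> pairs = new ArrayList<>();
        for (int start = 0; start + n < series.length; start++) {
            // copyOfRange copies row references only; rows are shared with the input series
            double[][] window = Arrays.copyOfRange(series, start, start + n);
            pairs.add(new Pair(window, series[start + n]));
        }
        return pairs;
    }
}
```

Each pair would then be converted to feature and label arrays in whatever shape the network expects; that conversion is left out here because it depends on the model configuration.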

Kind regards,

@BeckerBoy example here: deeplearning4j-examples/ at bc1bac672faec222afd2b424aa00eaa008c59beb · eclipse/deeplearning4j-examples · GitHub

You’ll want to configure the termination conditions.

Thank you very much for your fast reply. Is it the DataSetLossCalculator I need then? I am unsure if it’s the same as val_loss.
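For comparison, what Keras does with `EarlyStopping(monitor="val_loss", patience=...)` boils down to the logic below, sketched in plain Java with a hypothetical class `PatienceStopper`. As far as I understand, DataSetLossCalculator computes the loss on a held-out iterator, which plays the role of val_loss, and ScoreImprovementEpochTerminationCondition is the patience-like termination condition; treat that mapping as an assumption to verify against the early stopping documentation.

```java
/** Hypothetical illustration of patience-based early stopping on a validation loss. */
final class PatienceStopper {
    private final int patience;
    private double best = Double.POSITIVE_INFINITY;
    private int epochsWithoutImprovement = 0;

    PatienceStopper(int patience) {
        if (patience < 1) {
            throw new IllegalArgumentException("patience must be at least 1");
        }
        this.patience = patience;
    }

    /** Records one epoch's validation loss; returns true when training should stop. */
    boolean shouldStop(double valLoss) {
        if (valLoss < best) {
            best = valLoss;               // new best score, reset the counter
            epochsWithoutImprovement = 0;
        } else {
            epochsWithoutImprovement++;   // no improvement this epoch
        }
        return epochsWithoutImprovement >= patience;
    }

    double bestLoss() {
        return best;
    }
}
```

In a training loop you would call `shouldStop` once per epoch with the loss computed on the validation split, and keep the model from the epoch that produced `bestLoss()`.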