Chapter 6 - Coding with streams

1

Buffering vs Streaming - How buffering works with data

Almost all the asynchronous APIs that we've seen so far in this book work using buffer mode. For an input operation, buffer mode causes all the data coming from a resource to be collected into a buffer until the operation is completed; it is then passed back to the caller as one single blob of data.

2

Buffering vs Streaming - How Streaming works with data

  • streams allow us to process the data as soon as it arrives from the resource.

  • what are the differences between these two approaches? Purely from an efficiency perspective, streams can be more efficient in terms of both space (memory usage) and time (computation clock time).

  • Node.js streams have another important advantage: composability.

3

Spatial efficiency (Difference between streams and buffers)

First of all, streams allow us to do things that would not be possible by buffering data and processing it all at once. For example, consider the case in which we have to read a very big file, let's say, in the order of hundreds of megabytes or even gigabytes. Clearly, using an API that returns a big buffer when the file is completely read is not a good idea. Imagine reading a few of these big files concurrently; our application would easily run out of memory. Besides that, buffers in V8 are limited in size. You cannot allocate more than a few gigabytes of data, so we may hit a wall way before running out of physical memory.

4

Example - Gzipping using a buffered API

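A minimal sketch of the buffered approach, assuming the input filename is passed on the command line: the whole file is read into memory, compressed in one go, and written back out.

// Buffered gzip: every step works on the complete data at once.
import { promises as fs } from 'node:fs'
import { gzip } from 'node:zlib'
import { promisify } from 'node:util'

const gzipPromise = promisify(gzip)
const filename = process.argv[2]

async function main () {
  const data = await fs.readFile(filename)      // the whole file is loaded into memory
  const gzippedData = await gzipPromise(data)   // the whole buffer is compressed at once
  await fs.writeFile(`${filename}.gz`, gzippedData)
  console.log('File successfully compressed')
}

main()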
5

Example - Gzipping using streams

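A sketch of the streaming counterpart, under the same assumption of a filename argument: each chunk is compressed and written as soon as it is read, so memory usage stays flat regardless of the file size.

// Streaming gzip: data flows chunk by chunk through the pipeline.
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

const filename = process.argv[2]

createReadStream(filename)
  .pipe(createGzip())
  .pipe(createWriteStream(`${filename}.gz`))
  .on('finish', () => console.log('File successfully compressed'))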
6

Time efficiency (how buffering and streaming differ in time efficiency)

Let's now consider the case of an application that compresses a file and uploads it to a remote HTTP server, which, in turn, decompresses it and saves it on the filesystem:

  • If the client component of our application was implemented using a buffered API, the upload would start only when the entire file had been read and compressed. On the other hand, the decompression would start on the server only when all the data had been received.

  • A better solution to achieve the same result involves the use of streams. On the client machine, streams allow us to compress and send the data chunks as soon as they are read from the filesystem, whereas on the server, they allow us to decompress every chunk as soon as it is received from the remote peer. (check img)

(for a more detailed example: 193-196)

7

Composability with Pipes

Composability comes from the pipe() method, which allows us to connect different processing units, each responsible for one single functionality, in perfect Node.js style.

This is possible because streams have a uniform interface, and they can understand each other in terms of API.

The only prerequisite is that the next stream in the pipeline has to support the data type produced by the previous stream, which can be either binary, text, or even objects.

(for an in-depth example: 196-199)
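As a rough sketch (not the book's exact example), here is a tiny pipeline where each Transform does one job and the units are glued together with pipe():

// Two single-purpose Transform streams composed with pipe().
import { Transform } from 'node:stream'

const uppercasify = new Transform({
  transform (chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

const exclaim = new Transform({
  transform (chunk, encoding, callback) {
    callback(null, chunk.toString().replace(/\n/g, '!\n'))
  }
})

process.stdin
  .pipe(uppercasify)    // every chunk is uppercased...
  .pipe(exclaim)        // ...then every line gets an exclamation mark...
  .pipe(process.stdout) // ...and the result is printed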

8

Anatomy of streams

Every stream in Node.js is an implementation of one of the four base abstract classes available in the stream core module:

  • Readable

  • Writable

  • Duplex

  • Transform

Each stream class is also an instance of EventEmitter. Streams, in fact, can produce several types of event, such as end when a Readable stream has finished reading, finish when a Writable stream has completed writing, or error when something goes wrong.
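A small sketch of listening for those events; the file names here are made up for illustration.

import { createReadStream, createWriteStream } from 'node:fs'

const source = createReadStream('input.txt')        // hypothetical input file
const destination = createWriteStream('copy.txt')   // hypothetical output file

source.on('end', () => console.log('All data has been read'))
destination.on('finish', () => console.log('All data has been written'))
source.on('error', (err) => console.error('Reading failed', err))
destination.on('error', (err) => console.error('Writing failed', err))

source.pipe(destination)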

9

Operating modes

One reason why streams are so flexible is the fact that they can handle not just binary data, but almost any JavaScript value. In fact, they support two operating modes:

  • Binary mode: To stream data in the form of chunks, such as buffers or strings.

  • Object mode: To stream data as a sequence of discrete objects (allowing us to use almost any JavaScript value).

These two operating modes allow us to use streams not just for I/O, but also as a tool to elegantly compose processing units in a functional fashion.
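A minimal sketch of an object mode stream; the objects pushed here are just illustrative.

// Object mode: chunks are plain JavaScript values instead of buffers or strings.
import { Readable } from 'node:stream'

const objectStream = new Readable({
  objectMode: true,
  read () {
    this.push({ id: 1, label: 'first' })  // any JavaScript value (except null)
    this.push({ id: 2, label: 'second' })
    this.push(null)                       // null signals the end of the stream
  }
})

objectStream.on('data', (obj) => console.log(obj.id, obj.label))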

10

Readable streams

  • A Readable stream represents a source of data. In Node.js, it's implemented using the Readable abstract class, which is available in the stream module.

  • There are two approaches to receive the data from a Readable stream: non-flowing (or paused) and flowing.

11

Readable - The non-flowing mode

  • The non-flowing or paused mode is the default pattern for reading from a Readable stream. It involves attaching a listener to the stream for the readable event, which signals the availability of new data to read.

  • Then, in a loop, we read the data continuously until the internal buffer is emptied. This can be done using the read() method, which synchronously reads from the internal buffer and returns a Buffer object representing the chunk of data.
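A sketch of this pattern reading from standard input, close in spirit to the chapter's example:

// Non-flowing mode: wait for "readable", then drain the internal buffer with read().
process.stdin
  .on('readable', () => {
    console.log('New data available')
    let chunk
    // read() returns null when the internal buffer is empty
    while ((chunk = process.stdin.read()) !== null) {
      console.log(`Chunk read (${chunk.length} bytes): "${chunk.toString()}"`)
    }
  })
  .on('end', () => console.log('End of stream'))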

12

Readable - Flowing mode

  • Another way to read from a stream is by attaching a listener to the data event. This will switch the stream into using flowing mode, where the data is not pulled using read(), but instead is pushed to the data listener as soon as it arrives.

  • Flowing mode offers less flexibility to control the flow of data compared to non-flowing mode. The default operating mode for streams is non-flowing, so to enable flowing mode, it's necessary to attach a listener to the data event or explicitly invoke the resume() method. To temporarily stop the stream from emitting data events, we can invoke the pause() method, which causes any incoming data to be cached in the internal buffer; calling resume() switches the stream back into flowing mode.
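The same stdin reader in flowing mode might look like this sketch:

// Flowing mode: chunks are pushed to the "data" listener as soon as they arrive.
process.stdin
  .on('data', (chunk) => {
    console.log(`Chunk read (${chunk.length} bytes): "${chunk.toString()}"`)
  })
  .on('end', () => console.log('End of stream'))

// process.stdin.pause()  // would switch back to paused mode
// process.stdin.resume() // would switch to flowing mode again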

13

Implementing Readable streams

  • Now that we know how to read from a stream, the next step is to learn how to implement a new custom Readable stream. To do this, it's necessary to create a new class by inheriting the prototype Readable from the stream module. The concrete stream must provide an implementation of the _read() method.

  • Please note that read() is a method called by the stream consumers, while _read() is a method to be implemented by a stream subclass and should never be called directly. The underscore usually indicates that the method is not public and should not be called directly.
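A minimal sketch of a custom Readable; the chapter's example generates random strings, while this stand-in uses crypto.randomBytes to stay self-contained.

import { Readable } from 'node:stream'
import { randomBytes } from 'node:crypto'

class RandomStream extends Readable {
  constructor (options) {
    super(options)
    this.emittedBytes = 0
  }

  _read (size) {
    const chunk = randomBytes(size)
    this.push(chunk)                    // feed data into the internal buffer
    this.emittedBytes += chunk.length
    if (this.emittedBytes > 16 * 1024) {
      this.push(null)                   // signal the end of the stream
    }
  }
}

new RandomStream().on('data', (chunk) => console.log(`Received ${chunk.length} bytes`))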

14

Simplified construction (of creating a custom readable stream)

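A sketch of the same random stream using simplified construction: the read() logic is passed as an option to the Readable constructor instead of subclassing.

import { Readable } from 'node:stream'
import { randomBytes } from 'node:crypto'

let emittedBytes = 0
const randomStream = new Readable({
  read (size) {
    const chunk = randomBytes(size)
    this.push(chunk)
    emittedBytes += chunk.length
    if (emittedBytes > 16 * 1024) {
      this.push(null)   // end the stream after roughly 16 KB
    }
  }
})

randomStream.on('data', (chunk) => console.log(`Received ${chunk.length} bytes`))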
15

Readable streams from iterables

You can easily create Readable stream instances from arrays or other iterable objects (that is, generators, iterators, and async iterators) using the Readable.from() helper.

Try not to instantiate large arrays in memory. Imagine if, in the previous example, we wanted to list all the mountains in the world. There are about 1 million mountains, so if we were to load all of them in an array upfront, we would allocate a quite significant amount of memory. Even if we then consume the data in the array through a Readable stream, all the data has already been preloaded, so we are effectively voiding the memory efficiency of streams. It's always preferable to load and consume the data in chunks, and you could do so by using native streams such as fs.createReadStream, by building a custom stream, or by using Readable.from with lazy iterables such as generators, iterators, or async iterators.
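A quick sketch of both usages, with a tiny made-up dataset:

import { Readable } from 'node:stream'

// From an in-memory array (fine for small datasets only).
const mountains = [
  { name: 'Everest', height: 8848 },
  { name: 'K2', height: 8611 }
]
Readable.from(mountains).on('data', (m) => console.log(`${m.name}: ${m.height} m`))

// From a lazy generator: values are produced only as the stream is consumed.
function * lazyMountains () {
  yield { name: 'Kangchenjunga', height: 8586 }
  yield { name: 'Lhotse', height: 8516 }
}
Readable.from(lazyMountains()).on('data', (m) => console.log(`${m.name}: ${m.height} m`))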

16

Writable streams

A Writable stream represents a data destination. Imagine, for instance, a file on the filesystem, a database table, a socket, the standard output, or the standard error interface. In Node.js, these kinds of abstractions can be implemented using the Writable abstract class, which is available in the stream module

17

Backpressure

Node.js data streams, like liquids in a piping system, can suffer from bottlenecks when data is written faster than the stream can handle. To manage this, incoming data is buffered in memory.

However, without feedback from the stream to the writer, the buffer could keep growing, potentially leading to excessive memory usage.

Node.js streams are designed to maintain steady and predictable memory usage, even during large data transfers. Writable streams include a built-in signaling mechanism to alert the application when the internal buffer has accumulated too much data.

This signal indicates that it's better to pause and wait for the buffered data to be flushed to the stream's destination before sending more data. The writable.write() method returns false once the buffer size exceeds the highWaterMark limit.
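A sketch of honoring that signal: stop writing when write() returns false and resume on the drain event. The output file and chunk size are arbitrary.

import { createWriteStream } from 'node:fs'
import { randomBytes } from 'node:crypto'

const file = createWriteStream('out.dat')
let remaining = 1000   // number of 1 KB chunks to write

function writeSomeData () {
  let canContinue = true
  while (remaining > 0 && canContinue) {
    remaining--
    // write() returns false once the internal buffer exceeds highWaterMark
    canContinue = file.write(randomBytes(1024))
  }
  if (remaining > 0) {
    // pause and wait for the buffer to be flushed to the destination
    file.once('drain', writeSomeData)
  } else {
    file.end(() => console.log('All data written'))
  }
}

writeSomeData()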

18

Implementing Writable streams

We can implement a new Writable stream by inheriting the class Writable and providing an implementation for the _write() method. Let’s try to do it immediately while discussing the details along the way.
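A sketch along the lines of the chapter's ToFileStream: an object mode Writable that saves each { path, content } chunk to a file (fs.mkdir with recursive: true stands in for a mkdirp-style helper).

import { Writable } from 'node:stream'
import { promises as fs } from 'node:fs'
import { dirname } from 'node:path'

class ToFileStream extends Writable {
  constructor (options) {
    super({ ...options, objectMode: true })
  }

  _write (chunk, encoding, cb) {
    fs.mkdir(dirname(chunk.path), { recursive: true })
      .then(() => fs.writeFile(chunk.path, chunk.content))
      .then(() => cb(), cb)   // signal success, or pass the error to the stream
  }
}

const tfs = new ToFileStream()
tfs.write({ path: 'files/file1.txt', content: 'Hello' })
tfs.end(() => console.log('All files created'))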

19

Simplified construction (of creating a custom writable stream)

As we saw for Readable streams, Writable streams also offer a simplified construction mechanism. If we were to rewrite ToFileStream using the simplified construction for Writable streams, it would look like this:
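A sketch of what that simplified version might look like, under the same { path, content } assumption as the previous card:

import { Writable } from 'node:stream'
import { promises as fs } from 'node:fs'
import { dirname } from 'node:path'

const tfs = new Writable({
  objectMode: true,
  write (chunk, encoding, cb) {
    fs.mkdir(dirname(chunk.path), { recursive: true })
      .then(() => fs.writeFile(chunk.path, chunk.content))
      .then(() => cb(), cb)
  }
})

tfs.write({ path: 'files/file1.txt', content: 'Hello' })
tfs.end(() => console.log('All files created'))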

20

What are Duplex streams?

A Duplex stream is a stream that is both Readable and Writable. It is useful when we want to describe an entity that is both a data source and a data destination, such as network sockets, for example.

Duplex streams inherit the methods of both stream.Readable and stream.Writable, so this is nothing new to us. This means that we can read() or write() data, or listen for both readable and drain events.

To create a custom Duplex stream, we have to provide an implementation for both _read() and _write(). The options object passed to the Duplex() constructor is internally forwarded to both the Readable and Writable constructors.
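A minimal sketch using simplified construction; note that the two sides are independent of each other.

import { Duplex } from 'node:stream'

const duplex = new Duplex({
  read (size) {
    this.push('data produced by the Readable side\n')
    this.push(null)
  },
  write (chunk, encoding, cb) {
    console.log('Writable side received:', chunk.toString())
    cb()
  }
})

duplex.pipe(process.stdout)   // consume the Readable side
duplex.write('hello')         // feed the Writable side
duplex.end()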

21

What is a Transform stream?

Transform streams are a special kind of Duplex stream that are specifically designed to handle data transformations.

In a simple Duplex stream, there is no immediate relationship between the data read from the stream and the data written into it (at least, the stream is agnostic to such a relationship). Think about a TCP socket, which just sends and receives data to and from the remote peer; the socket is not aware of any relationship between the input and output. Figure 6.4 illustrates the data flow in a Duplex stream:

On the other hand, Transform streams apply some kind of transformation to each chunk of data that they receive from their Writable side, and then make the transformed data available on their Readable side. Figure 6.5

From a user perspective, the programmatic interface of a Transform stream is exactly like that of a Duplex stream. However, when we want to implement a new Duplex stream, we have to provide both the _read() and _write() methods, while for implementing a new Transform stream, we have to fill in another pair of methods: _transform() and _flush().
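A minimal sketch of a Transform stream built with simplified construction:

// Every chunk written to the Writable side comes out uppercased on the Readable side.
import { Transform } from 'node:stream'

const uppercasify = new Transform({
  transform (chunk, encoding, cb) {
    this.push(chunk.toString().toUpperCase())  // make the transformed data readable
    cb()                                       // signal that the chunk was processed
  }
})

process.stdin.pipe(uppercasify).pipe(process.stdout)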

22

Implementing Transform streams

Streams process data in chunks, and these chunks don’t always align with the boundaries of the target search string. For example, if the string we are trying to match is split across two chunks, the split() operation on a chunk alone won’t detect it, potentially leaving part of the match unnoticed. The tail variable ensures that the last portion of a chunk—potentially part of a match—is preserved and concatenated with the next chunk.

In Transform streams, it’s not uncommon for the logic to involve buffering data from multiple chunks before there’s enough information to perform the transformation.

(139-242)
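A sketch in the spirit of the chapter's ReplaceStream, showing how the tail preserves a possible partial match across chunk boundaries:

import { Transform } from 'node:stream'

class ReplaceStream extends Transform {
  constructor (searchStr, replaceStr, options) {
    super({ ...options })
    this.searchStr = searchStr
    this.replaceStr = replaceStr
    this.tail = ''
  }

  _transform (chunk, encoding, cb) {
    const pieces = (this.tail + chunk).split(this.searchStr)
    const lastPiece = pieces[pieces.length - 1]
    // keep the last searchStr.length - 1 characters: they might be a partial match
    const tailLen = this.searchStr.length - 1
    this.tail = lastPiece.slice(-tailLen)
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
    this.push(pieces.join(this.replaceStr))
    cb()
  }

  _flush (cb) {
    this.push(this.tail)   // emit whatever is left once the input has ended
    cb()
  }
}

const replaceStream = new ReplaceStream('World', 'Node.js')
replaceStream.pipe(process.stdout)
replaceStream.write('Hello W')   // the match is split across two chunks...
replaceStream.write('orld!\n')   // ...and still replaced correctly
replaceStream.end()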

23

Filtering and aggregating data with Transform streams

As we discussed earlier, Transform streams are a great tool for building data transformation pipelines. But Transform streams aren’t limited to those examples. They’re often used for tasks like filtering and aggregating data.


To make this more concrete, imagine a Fortune 500 company asks us to analyze a large file containing all their sales data for 2024. The file, data.csv, is a sales report in CSV format, and they want us to calculate the total profit for sales made in Italy.

Let’s use Node.js streams. Streams are well suited for this kind of task because they can process large datasets incrementally, without loading everything into memory.

24

Pattern: transform filter

Invoke this.push() in a conditional way to allow only some data to reach the next stage of the pipeline.


The Transformer Filter pattern (often called the Pipes and Filters pattern) is a software architectural pattern used to process streams of data. It breaks down complex processing tasks into a series of separate, independent steps.

In Node.js, this is most commonly and natively implemented using Streams.

The Core Concept

Imagine an assembly line. Raw material comes in, passes through several stations where it is modified or inspected, and a finished product comes out.

  1. Source: The origin of the data (e.g., reading a file, an HTTP request).

  2. Filter/Transformer: A component that receives data, performs a single operation (modifies it, filters it, or enriches it), and passes it to the next step.

  3. Sink: The final destination (e.g., writing to a file, sending a response, saving to a DB).

Why use it in Node.js?

  • Memory Efficiency: You don't load the entire dataset into memory. You process it chunk by chunk.

  • Decoupling: Each filter does one thing well. You can easily add, remove, or reorder filters without breaking the whole system.

  • Backpressure: Node.js streams automatically handle "backpressure"—if the writing step is slow, the reading step automatically slows down so memory doesn't overflow.
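A sketch of the filter idea applied to the sales scenario: only records for a given country are pushed downstream. The record shape is assumed for illustration, not taken from the actual data.csv.

import { Transform } from 'node:stream'

class FilterByCountry extends Transform {
  constructor (country, options) {
    super({ ...options, objectMode: true })
    this.country = country
  }

  _transform (record, encoding, cb) {
    if (record.country === this.country) {
      this.push(record)   // only matching records reach the next stage
    }
    cb()                  // always signal completion, even when nothing is pushed
  }
}

// hypothetical usage: csvRecordStream.pipe(new FilterByCountry('Italy')).pipe(nextStage)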

25

Pattern: Streaming aggregation

Use _transform() to process the data and accumulate the partial result, then call this.push() only in the _flush() method to emit the result when all the data has been processed.
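A sketch of the pattern, assuming object mode records with a numeric profit field:

import { Transform } from 'node:stream'

class SumProfit extends Transform {
  constructor (options) {
    super({ ...options, objectMode: true })
    this.total = 0
  }

  _transform (record, encoding, cb) {
    this.total += record.profit        // accumulate the partial result, push nothing yet
    cb()
  }

  _flush (cb) {
    this.push(this.total.toString())   // emit the aggregate once all input has been seen
    cb()
  }
}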

26

What is a PassThrough stream?

There is a fifth type of stream that is worth mentioning: PassThrough. This type of stream is a special type of Transform stream that outputs every data chunk without applying any transformation. PassThrough is possibly the most underrated type of stream, but there are several circumstances in which it can be a very valuable tool in our toolbelt. For instance, PassThrough streams can be useful for observability or to implement late piping and lazy stream patterns.

27

Observability - How to observe how much data is flowing through one or more streams?

Note that you could implement an alternative version of the monitor stream by using a custom transform stream instead. In such a case, you would have to make sure that the received chunks are pushed without any modification or delay, which is something that a PassThrough stream would do automatically for you. Both approaches are equally valid, so pick the approach that feels more natural to you.
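A sketch of such a monitor built on PassThrough, counting the bytes flowing through a simple copy pipeline (the file names are illustrative):

import { PassThrough } from 'node:stream'
import { createReadStream, createWriteStream } from 'node:fs'

let bytesWritten = 0
const monitor = new PassThrough()
monitor.on('data', (chunk) => { bytesWritten += chunk.length })
monitor.on('finish', () => console.log(`${bytesWritten} bytes written`))

createReadStream('input.txt')
  .pipe(monitor)                        // the monitor only observes the chunks
  .pipe(createWriteStream('copy.txt'))  // the data reaches the destination unchanged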

28

Pattern: Late piping

Definition: A pattern used when an API requires a stream input immediately, but the data source or necessary transformations (e.g., compression) are not yet ready or need to happen asynchronously.

(148-250)
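A sketch of the idea: hand the API a PassThrough placeholder right away and attach the real (compressed) source later. The upload() function here is a hypothetical stand-in for whatever API requires a stream immediately.

import { PassThrough } from 'node:stream'
import { createReadStream } from 'node:fs'
import { createGzip } from 'node:zlib'

// Hypothetical stand-in for an API that needs a Readable stream right now.
function upload (filename, contentStream) {
  contentStream.on('data', () => { /* send the chunk to a server */ })
  contentStream.on('end', () => console.log(`${filename} uploaded`))
}

const placeholder = new PassThrough()
upload('data.txt.gz', placeholder)   // the API receives a stream immediately

// ...later, once the source and the transformations are ready:
createReadStream('data.txt')
  .pipe(createGzip())
  .pipe(placeholder)                 // the data starts flowing only at this point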

29

Lazy streams

In more generic terms, creating a stream instance might initialize expensive operations straight away (for example, open a file or a socket, initialize a connection to a database, and so on), even before we start to use such a stream. This might not be desirable if you are creating a large number of stream instances for later consumption.

In these cases, you might want to delay the expensive initialization until you need to consume data from the stream.
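One way to get this behavior (packages such as lazystream take a similar approach) is to wrap the expensive creation behind a lazily evaluated source; a hand-rolled sketch using Readable.from() and an async generator:

import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'

function lazyFileStream (filename) {
  // The async generator body does not run until the stream is consumed,
  // so the file is not opened at creation time.
  return Readable.from((async function * () {
    yield * createReadStream(filename)   // the file is opened here, not earlier
  })())
}

const stream = lazyFileStream('input.txt')   // no file descriptor opened yet
// ...later, when we actually need the data:
stream.pipe(process.stdout)                  // the file is opened only now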

30

Connecting streams using pipes - What does pipe() do for us under the hood?

Very intuitively, the pipe() method takes the data that is emitted from the readable stream and pumps it into the provided writable stream.

Also, the writable stream is ended automatically when the readable stream emits an end event (unless we specify {end: false} as options).

The pipe() method returns the writable stream passed in the first argument, allowing us to create chained invocations if such a stream is also Readable (such as a Duplex or Transform stream).
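A short sketch of the {end: false} option, concatenating two files into one destination (the file names are made up):

import { createReadStream, createWriteStream } from 'node:fs'

const destination = createWriteStream('combined.txt')
const part1 = createReadStream('part1.txt')

part1.pipe(destination, { end: false })   // keep the destination open when part1 ends
part1.on('end', () => {
  createReadStream('part2.txt').pipe(destination)   // this pipe ends the destination normally
})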

31

Piping two streams together - Suction

Piping two streams together will create suction, which allows the data to flow automatically to the writable stream, so there is no need to call read() or write(), but most importantly, there is no need to control the backpressure anymore, because it's automatically taken care of.

32

Pipe and error handling with pipeline()

Handling errors manually in a pipeline is not just cumbersome, but also error-prone—something we should avoid if we can!

Luckily, the core node:stream package offers us an excellent utility function that can make building pipelines a much safer and more enjoyable practice, which is the pipeline() helper function.
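A sketch of pipeline() gluing a compression pipeline together: an error from any of the streams reaches the single callback, and all the streams are properly destroyed.

import { pipeline } from 'node:stream'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

pipeline(
  createReadStream('data.txt'),
  createGzip(),
  createWriteStream('data.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err)
      process.exit(1)
    }
    console.log('Pipeline completed')
  }
)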

33

Asynchronous control flow patterns with streams

Going through the examples that we have presented so far, it should be clear that streams can be useful not only to handle I/O, but also as an elegant programming pattern that can be used to process any kind of data. But the advantages do not end there; streams can also be leveraged to turn “asynchronous control flow” into “flow control,” as we will see in this section.

35

What is sequential execution?
