# The Third Step Into the World of RxJS: Combining Streams in RxJS

RxJS is a powerful tool, but it can also be tricky. Many beginners who have mastered basic operators like map, filter, and perhaps even take, start feeling confident. Then they encounter tasks where they need to combine multiple streams simultaneously... and that's where it all begins to fall apart. Panic sets in. Should you use combineLatest, forkJoin, merge, zip? What do you do when data arrives at different speeds? This material is for those feeling lost at this stage. Let’s calmly and methodically figure it out together.

Why Is This Important?

Your first challenge with combination operators is that they seem to speak their own language. If you think of an Observable as a stream of events, then combination operators are the conductors orchestrating multiple streams, aligning them in sync, and producing a "new melody."

Imagine a classic scenario: you're building a form in Angular, and you need to show a "Submit" button only when all fields are validated. Or you need to merge data from two APIs. These are typical use cases where combination operators become your main instrument.

When to Use Which Operator?

RxJS provides several tools to merge data from different sources. However, despite the abundance of articles on this topic, developers in real projects often misunderstand how to apply them correctly. Let’s examine the most popular combination operators with real-world examples.

Imagine this scenario:

  • Factory A produces components (e.g., every hour). This is one stream of data.
  • Factory B sends requests for components as they are ready for assembly. This is a second stream of data.
  • As operators, we need to organize the supply of components for assembly based on different conditions. Each RxJS operator dictates its own method for managing this process.

1. zip — "One to One"

The zip operator enforces strict synchronization. We wait for Factory A to produce a component and simultaneously for Factory B to request that specific component. Only then is the component passed for assembly. No component will be sent until a pair is matched: "ready component + request for the component."

When to Use:

  • Multiple streams logically depend on one another.
  • You need to ensure strict synchronization of streams in order.

Production Example:

  1. Factory A produces components every 3 hours.
  2. Factory B sends requests for components every 2 hours.

Outcome: Each "component + request" pair is sent to assembly. If one side arrives before its counterpart — a component before the matching request, or vice versa — it is held (buffered) until the pair is complete.

Code Example in TypeScript:

import { interval, zip } from 'rxjs';
import { map, take } from 'rxjs/operators';

// Factory A (produces components every 3 seconds)
const factoryA$ = interval(3000).pipe(map(i => `Component ${i + 1}`));

// Factory B (requests for components every 2 seconds)
const factoryB$ = interval(2000).pipe(map(i => `Request for Component ${i + 1}`));

// Combine one component from A with one request from B
zip(factoryA$, factoryB$).pipe(take(3)).subscribe(([component, request]) => {
  console.log(`${request} paired with ${component}`);
});

// Output:
// Request for Component 1 paired with Component 1
// Request for Component 2 paired with Component 2
// Request for Component 3 paired with Component 3

Performance of zip

  1. The Number of Streams Matters:

    zip can combine not just two but multiple streams. However, as the number of streams grows, performance can degrade because the operator has to monitor each stream and wait for values to align before producing a new combination. This is particularly noticeable when streams emit data at different rates or have delays.

  2. Data Buffering:

    If one stream produces values faster than another, zip will need to buffer these values (temporarily store them) until data from all streams is ready. This may require more memory when handling streams with uneven data flow. For instance, if one stream emits millions of items quickly, zip will buffer them until the corresponding data from the other streams arrives.
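    To see this buffering in action, here is a minimal sketch (the stream names and timings are purely illustrative): the fast stream emits ten times per second, yet pairs leave zip only at the pace of the slow stream, while the fast values wait in zip's internal buffer.

import { interval, zip } from 'rxjs';
import { map, take } from 'rxjs/operators';

// A hypothetical fast producer (every 100 ms) and a slow one (every 1000 ms)
const fast$ = interval(100).pipe(map(i => `fast ${i}`), take(20));
const slow$ = interval(1000).pipe(map(i => `slow ${i}`), take(3));

// Pairs appear roughly once per second — the pace of the slow stream;
// meanwhile the fast stream's values accumulate in zip's internal buffer.
zip(fast$, slow$).subscribe(pair => console.log(pair));

// Output (about one line per second):
// ['fast 0', 'slow 0']
// ['fast 1', 'slow 1']
// ['fast 2', 'slow 2']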

Pitfalls When Using zip

  1. Sensitivity to Value Counts:

    If streams emit a significantly different number of items, the operator will only work up to the point where one of the streams runs out of values. Any additional items from other streams will not be processed after one stream completes.

  2. Compatibility with Infinite Streams:

    If all input streams are infinite (e.g., interval or UI events) and there’s no mechanism to terminate them, the subscriber will never complete. This can result in memory leaks or hanging processes.
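    To make pitfall 1 concrete, here is a minimal sketch with illustrative of(...) streams — zip completes as soon as the shortest source completes, and the remaining values are simply never paired:

import { of, zip } from 'rxjs';

const letters$ = of('a', 'b', 'c', 'd', 'e'); // five values
const numbers$ = of(1, 2, 3);                 // only three values

zip(letters$, numbers$).subscribe({
  next: pair => console.log(pair),
  complete: () => console.log('done'),
});

// Output:
// ['a', 1]
// ['b', 2]
// ['c', 3]
// done   <- 'd' and 'e' are never emitted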

More Advanced Features of zip

  1. Using a Transformation Function: By default, zip emits an array of synchronized values, but you can pass a projection function to shape the result immediately (note that recent RxJS versions deprecate this overload in favor of piping the emitted array through map). Example:
import { zip, of } from 'rxjs';

const stream1$ = of(10, 20, 30);
const stream2$ = of(1, 2, 3);

zip(stream1$, stream2$, (x, y) => x * y).subscribe(result => {
  console.log(result); // 10, 40, 90
});

// Output:
// 10
// 40
// 90

Recommendations for Use

  1. Add Protection Against Infinite Streams: Use limiting operators (take, takeUntil) to prevent application hangs (see the sketch after this list).
  2. Monitor Stream Frequencies: If one stream is significantly "slower" than another, remember that zip can only emit at the pace of the slowest source, and faster values accumulate while they wait. In some cases it is better to replace zip with an operator like combineLatest.
  3. Consider Completion Behavior: Explicitly manage completion, or handle cases where streams emit different numbers of items.
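
Here is the sketch referenced in recommendation 1 — a hypothetical stop$ signal (for example, a component's destroy notifier or a "stop production" command) bounds two otherwise infinite factory streams:

import { interval, zip, Subject } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';

const stop$ = new Subject<void>(); // hypothetical shutdown signal

const factoryA$ = interval(3000).pipe(map(i => `Component ${i + 1}`));
const factoryB$ = interval(2000).pipe(map(i => `Request ${i + 1}`));

// Both sources are infinite, so the combined stream is explicitly bounded:
zip(factoryA$, factoryB$)
  .pipe(takeUntil(stop$))
  .subscribe(([component, request]) => console.log(component, request));

// Simulate a shutdown after 10 seconds (in Angular this would live in ngOnDestroy)
setTimeout(() => {
  stop$.next();
  stop$.complete();
}, 10000);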

zip isn't just an operator that "stitches" streams together. It's a synchronization tool tailored for strictly ordered scenarios where the structure of the data between streams matters. However, keep in mind its sensitivity to completion, buffering, and potential delays. To effectively utilize zip, it’s crucial to understand each stream's behavior and account for their limitations.

2. combineLatest — "Working with What's Available"

combineLatest focuses on the current state of multiple streams. It emits a value only after all its sources have produced at least one value, and then updates anytime any of the streams emits new data.

combineLatest waits for the first value from all sources before starting to work. Then, it emits a new combination of data whenever any of the streams produces updated data. This means that for every new request from Factory B, the current latest component from Factory A is used immediately.

When to Use:

  • You need to track the state of multiple sources simultaneously.
  • Tasks related to user input or application states.

Output: A new combination is emitted whenever either factory produces something new — each fresh request from B is paired with the latest available component from A, and each newly produced component from A is re-paired with the latest request from B.

TypeScript Code Example:

import { interval, combineLatest } from 'rxjs';
import { map, take } from 'rxjs/operators';

// Factory A (components every 3 seconds)
const factoryA$ = interval(3000).pipe(map(i => `Component ${i + 1}`));

// Factory B (requests every 2 seconds)
const factoryB$ = interval(2000).pipe(map(i => `Request for Component ${i + 1}`));

// Each new request uses the latest state of components
combineLatest([factoryA$, factoryB$]).pipe(take(4)).subscribe(([component, request]) => {
  console.log(`${request} paired with ${component}`);
});

// Output (approximate — combineLatest emits on every new value from either stream,
// so two emissions occur around the 6-second mark):
// Request for Component 1 paired with Component 1   (A at 3 s, B at 2 s)
// Request for Component 2 paired with Component 1   (B emits at 4 s)
// Request for Component 2 paired with Component 2   (A emits at 6 s)
// Request for Component 3 paired with Component 2   (B emits at 6 s)

Performance of combineLatest

  1. Buffering and Memory:

    combineLatest keeps the latest value from each source in memory. In most cases, this doesn’t impose serious constraints, but for streams emitting large objects (e.g., arrays or JSON data), it might impact performance.

  2. Handling "Noisy" Streams:

    If one stream emits data too frequently (e.g., a mouse movement stream), combineLatest will frequently recalculate the result, even if the other stream remains unchanged.

    Recommendation: For noisy streams, use operators like throttleTime or debounceTime to suppress excess events and avoid processing overload.
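
    A minimal sketch of that recommendation, with the noisy source simulated by interval (the names are illustrative). throttleTime is used rather than debounceTime here, because a source that never pauses would never make it through a debounce:

import { interval, combineLatest } from 'rxjs';
import { throttleTime, map, take } from 'rxjs/operators';

// A simulated "noisy" stream (every 50 ms) and a slower one (every 2 s)
const noisy$ = interval(50).pipe(map(i => `position ${i}`));
const settings$ = interval(2000).pipe(map(i => `settings v${i + 1}`));

combineLatest([
  noisy$.pipe(throttleTime(300)), // at most one value per 300 ms reaches combineLatest
  settings$,
]).pipe(take(3)).subscribe(([position, settings]) => {
  console.log(position, settings);
});

// Without throttleTime the combined callback would fire ~20 times per second;
// with it, downstream recalculation is capped at a few times per second.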

Pitfalls of combineLatest

  1. Unpredictable Behavior with Missing Values:

    If at least one of the streams hasn’t emitted a value, combineLatest will not publish anything. This means that a stream that always "stays silent" can block the entire operation. This can be unexpected for beginners.

  2. Performance with a High Number of Streams:

    combineLatest can work with many streams, but every new value from any source causes the combined array to be rebuilt and re-emitted, so with dozens of chatty sources the downstream work adds up quickly.

  3. Unexpected Failures in Real Applications:

    In applications where data comes from external APIs, combineLatest can stop working unexpectedly if one source fails. For example, if a service returns an HTTP error, that source stream errors, which tears down the entire combined stream.
    Solution: Add error handling using operators like catchError.
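
A minimal sketch addressing pitfalls 1 and 3 together, using placeholder sources rather than real APIs: startWith gives a silent source an initial value so the combination can start, and catchError keeps one failing source from tearing down the whole stream.

import { combineLatest, of, throwError, NEVER } from 'rxjs';
import { catchError, startWith } from 'rxjs/operators';

// Placeholder sources standing in for real API calls
const user$ = of({ name: 'Ivan' });

// This source fails; catchError replaces the error with a fallback value
const prefs$ = throwError(() => new Error('prefs service is down')).pipe(
  catchError(() => of({ theme: 'light' }))
);

// This source stays silent; startWith provides an initial value
// so combineLatest is not blocked waiting for it
const notifications$ = NEVER.pipe(startWith([] as string[]));

combineLatest([user$, prefs$, notifications$]).subscribe(([user, prefs, notifications]) => {
  console.log(user, prefs, notifications);
});

// Output:
// { name: 'Ivan' } { theme: 'light' } []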

Hidden Features of combineLatest

Using Custom Transform Functions:

Instead of receiving an array of values, you can pass a projection function and shape the result directly. (In recent RxJS versions this overload is deprecated; piping the emitted array through map is the recommended equivalent.)

Example:

import { combineLatest, of } from 'rxjs';

const width$ = of(100);
const height$ = of(200);

combineLatest([width$, height$], (width, height) => width * height).subscribe(area =>
  console.log(`Area: ${area}`)
);

// Output:
// Area: 20000

combineLatest is an incredibly useful operator for working with data streams where the current state of all sources is essential. However, its use requires careful attention: pitfalls such as delays, inability to emit when a stream remains silent, or performance limitations can unexpectedly lead to bugs. To avoid such issues, always assess the emission frequencies of streams, their completion, and start with initial values (startWith). This will make your applications reliable and predictable.

3. forkJoin — "All at Once"

forkJoin takes multiple streams, waits for all of them to complete, and then returns a single final value. If even one stream doesn’t emit or doesn’t complete, no result will be emitted.

forkJoin waits for ALL streams to complete and only then provides their latest results as a final list. In production terms, this means we’ll wait for Factory A to finish producing ALL components and for Factory B to send all the requests before processing everything at once.

When to Use:

  • You need a single result from multiple sources.
  • Ideal for API requests, when you need to wait for all responses.

Production Example:

  1. Factory A finishes producing all components.
  2. Factory B sends all requests.

Output: Processing begins only after both factories complete their operations.

TypeScript Code Example:

import { forkJoin, interval } from 'rxjs';
import { take, map } from 'rxjs/operators';

// Factory A (component production)
const factoryA$ = interval(3000).pipe(map(i => i + 1), take(3));

// Factory B (requests for components)
const factoryB$ = interval(2000).pipe(map(i => i + 1), take(2));

// Start processing only after both streams complete
forkJoin([factoryA$, factoryB$]).subscribe(([details, requests]) => {
  console.log(`Processed data: ${details} components ready for ${requests} requests`);
});

// Output:
// Processed data: 3 components ready for 2 requests

Conclusion:

forkJoin is perfectly suited for tasks where only completed data matters. For instance, it’s great for batch processing after preparation.
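
As a sketch of the "wait for all API responses" use case, here is forkJoin combined with rxjs/ajax; the endpoint URLs are placeholders, not part of any real API described in this article:

import { forkJoin } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// Placeholder endpoints — substitute your application's real routes
const user$ = ajax.getJSON('/api/user');
const orders$ = ajax.getJSON('/api/orders');

forkJoin([user$, orders$]).subscribe(([user, orders]) => {
  // Runs exactly once, after BOTH requests have completed
  console.log('User:', user);
  console.log('Orders:', orders);
});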

Performance and Optimization

  1. Issue with Large or Slow Streams: forkJoin keeps only the most recent value from each source, so memory is rarely the problem; the real cost is that every subscription stays open, and the combined result is held back, until the slowest stream completes. A single heavy or slow source therefore delays everything.

Pitfalls of forkJoin

  1. Not Suitable for Infinite Streams:

    If any of the streams in the set is infinite (e.g., fromEvent, interval), forkJoin will never emit anything. This is not an error, but it often comes as a surprise.

  2. Challenges with Error Handling:

    One major challenge is that if any stream throws an error (error), the results of all other streams are lost. This might be undesirable in scenarios where partial success is significant.
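
    A minimal sketch of the usual workaround, with placeholder sources: the failing stream catches its own error and substitutes null, so forkJoin still delivers the results that did succeed.

import { forkJoin, of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

// Placeholder sources; the second one fails
const profile$ = of({ name: 'Ivan' });
const stats$ = throwError(() => new Error('stats service unavailable')).pipe(
  catchError(() => of(null)) // swallow the error, keep the other results
);

forkJoin([profile$, stats$]).subscribe(([profile, stats]) => {
  console.log(profile); // { name: 'Ivan' }
  console.log(stats);   // null — the failed source was replaced by a fallback
});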

Hidden Features of forkJoin

  1. Using an Object Instead of an Array:

    forkJoin supports object syntax, which can be convenient for working with REST APIs or associative structures.

import { forkJoin, of } from 'rxjs';
import { delay } from 'rxjs/operators';

// Simulating three data sources as objects:
const user$ = of({ id: 1, name: 'Ivan' }).pipe(delay(1000));
const orders$ = of([{ id: 101, total: 300 }, { id: 102, total: 450 }]).pipe(delay(1500));
const notifications$ = of(['Notification 1', 'Notification 2']).pipe(delay(2000));

// Using an object instead of an array:
forkJoin({
  user: user$,
  orders: orders$,
  notifications: notifications$,
}).subscribe(result => {
  console.log('Merged data:', result);
});

// Output:
// Merged data:
// {
//   user: { id: 1, name: 'Ivan' },
//   orders: [{ id: 101, total: 300 }, { id: 102, total: 450 }],
//   notifications: ['Notification 1', 'Notification 2']
// }

Working with objects instead of arrays makes the code more readable, especially in scenarios involving different types of data (e.g., responses from various APIs). Instead of referencing values by array indexes (result[0], result[1]), you can refer to the object keys (result.user, result.orders).

forkJoin is an excellent tool for managing parallel processing and combining results. However, its strict requirement for stream completion imposes some limitations. To avoid unexpected errors, always consider stream completion, use error handling (catchError), and add constraints (take, timeout). This approach will allow you to use forkJoin reliably and effectively in most scenarios.

4. merge — "Send Everything Immediately"

merge combines streams but emits items as soon as they arrive. Unlike combineLatest, it doesn’t wait for all sources.

merge simply merges all events from the streams into a single flow and emits them in the order they are received. In production, this means that as soon as a component is ready or a request arrives, it is immediately processed, without concern for synchronization.

When to Use:

  • You need to combine events from multiple sources, but the order of their appearance isn’t important.

Production Example:

  1. Factory A produces components every 3 hours.
  2. Factory B sends requests every 2 hours.

Output: Any event from A or B is processed immediately.

TypeScript Example:

import { interval, merge } from 'rxjs';
import { map, take } from 'rxjs/operators';

// Factory A (producing components)
const factoryA$ = interval(3000).pipe(map(i => `Component ${i + 1} is ready`), take(3));

// Factory B (requests for components)
const factoryB$ = interval(2000).pipe(map(i => `Request for Component ${i + 1}`), take(2));

// All events are processed in order of arrival
merge(factoryA$, factoryB$).subscribe(event => {
  console.log(event);
});

// Output (example):
// Request for Component 1
// Component 1 is ready
// Request for Component 2
// Component 2 is ready
// Component 3 is ready

Working Features

  1. Simultaneous Merging of Multiple Streams:

    Unlike zip or forkJoin, merge does not depend on the sequence or frequency of data emissions from different streams. It simply publishes values as soon as they arrive.

  2. Does Not Wait for All Streams to Complete:

    merge continues to operate as long as at least one source stream remains active. The resulting stream completes only when all input Observables have completed.

  3. Stream Competition:

    If values from multiple streams are emitted at the same time, they will be processed in the order they actually arrive (by timestamp).

  4. Operator Parameters:

    • concurrent (default is Number.POSITIVE_INFINITY): This parameter determines the maximum number of streams that can be processed simultaneously. If the specified number is exceeded, the remaining Observables will wait for the currently active ones to complete.

Pitfalls

  1. Overload When Working with Infinite Streams:

    If merge is merging infinite streams (like interval, fromEvent) that generate data too frequently, it can cause processing overload. This is especially critical when multiple such "noisy" streams are merged.

    Solution: Use event frequency limiting operators like throttleTime, debounceTime, or manually set the concurrent parameter to reduce the load.

  2. Premature Stream Closure:

    If any of the input streams throws an error (error), merge terminates with that error, and values from the other streams stop being delivered — even if those streams would have completed successfully.

    Solution: Use catchError to handle errors in each stream independently so the merged stream keeps running (see the sketch after this list).

  3. Complexity in Handling Concurrent Streams:

    With a large number of streams operating simultaneously, the order in which merge outputs the data may become unpredictable. While logical, this behavior can cause confusion if you expected the values to appear in the same order as with concat.
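
Here is the sketch referenced in pitfall 2, with purely illustrative sources: the failing stream catches its own error, so the healthy stream keeps delivering values instead of the whole merged stream dying.

import { interval, merge, of, timer } from 'rxjs';
import { catchError, map, take } from 'rxjs/operators';

// A healthy source
const healthy$ = interval(1000).pipe(map(i => `tick ${i}`), take(3));

// A source that errors after 1.5 seconds; catchError confines the damage
const failing$ = timer(1500).pipe(
  map(() => { throw new Error('sensor offline'); }),
  catchError(err => of(`recovered: ${err.message}`))
);

merge(healthy$, failing$).subscribe(value => console.log(value));

// Output:
// tick 0                        (1 s)
// recovered: sensor offline     (1.5 s)
// tick 1                        (2 s)
// tick 2                        (3 s)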

Interesting Aspects of merge

  1. Handling the Processing Order Using concurrent: The concurrent option allows you to limit the number of streams processed simultaneously. This is useful when combining resource-heavy processes like HTTP requests. Example:
import { merge, of } from 'rxjs';
import { delay } from 'rxjs/operators';

const stream1$ = of('A').pipe(delay(1000));
const stream2$ = of('B').pipe(delay(1500));
const stream3$ = of('C').pipe(delay(500));

// The trailing number limits merge to two concurrently subscribed streams
merge(stream1$, stream2$, stream3$, 2).subscribe(console.log);

// Output (approximate timing):
// A arrives first (~1 s)
// B and C both arrive around 1.5 s — C's own delay is only 0.5 s,
// but it cannot even start until A completes and frees a slot

Explanation: Only two streams are subscribed at a time. stream3$ has the shortest delay, yet it runs last: it waits in the queue until one of the active streams (here stream1$, after one second) completes.

Performance

  1. Parallel Performance:

    merge excels at performance because it processes data from all sources in parallel. This makes it an ideal choice when there are no constraints on the order of event processing.

  2. Resource Intensity of Streams:

    Streams generating a large quantity of data can strain memory and CPU because each stream operates independently. It’s important to consider the concurrent parameter to manage simultaneous execution.

  3. Sensitivity to Event Frequency:

    If streams are "noisy" (e.g., a stream generating mouse movement events), the resulting merged stream may become overloaded. Therefore, it is important to use limiting operators to optimize the load.

The merge operator is simple and versatile yet packed with powerful functionality. It is perfectly suited for tasks requiring parallel data processing, but it demands careful resource management when dealing with frequent or infinite streams. Use it when event order doesn’t matter, and always optimize load by limiting event frequency and the number of simultaneously processed streams (concurrent).

5. concat — "First Come, First Served"

The concat operator in RxJS offers a simple and intuitive way to process multiple streams sequentially, one after another. Unlike merge, which works in parallel, concat processes the next stream only after the previous stream has completed. It can be thought of as a "queue" of Observables.

Factory B will only start sending requests after the production of components in Factory A is fully completed.

When to Use:

  • When you need to ensure the sequential execution of streams without risking data overlap.
  • For tasks where data logically depends on one another (e.g., step-by-step processing, sequential API requests).
  • When simultaneous data processing (like in merge) is unnecessary or unacceptable.

Production Example:

  1. Factory A produces 3 components (each taking 3 hours to complete).
  2. Factory B starts sending requests only after Factory A has finished production.

TypeScript Code Example:

import { interval, concat } from 'rxjs';
import { map, take } from 'rxjs/operators';

// Factory A (production of components)
const factoryA$ = interval(3000).pipe(
    map(i => `Component ${i + 1} is ready`),
    take(3) // Produces 3 components
);

// Factory B (processing requests for components)
const factoryB$ = interval(2000).pipe(
    map(i => `Request for component ${i + 1} processed`),
    take(2) // Processes 2 requests
);

// Run production and requests sequentially
concat(factoryA$, factoryB$).subscribe(event => {
    console.log(event);
});

// Expected Output (approximate, considering timing):
// After 3 seconds: Component 1 is ready
// After 6 seconds: Component 2 is ready
// After 9 seconds: Component 3 is ready
// After 11 seconds: Request for component 1 processed
// After 13 seconds: Request for component 2 processed

Working Features

  1. Strict Sequence:

    The main feature of concat is the strict sequential order of processing streams. First, data from the first Observable is processed, then, after it completes, the next stream is activated, and so on.

  2. Waits for Completion:

    Each stream must complete (complete) before the operator moves to the next one. This makes concat particularly useful for creating chains of sequential operations.

  3. Blocking by Infinite Streams:

    If there’s an infinite stream (like interval or fromEvent), it will block the execution of the rest of the chain since it will never complete.

  4. No Limit on Number of Streams:

    concat can combine any number of Observables, passed either as individual arguments or spread from an array.

Pitfalls

  1. Using Infinite Streams:

    If one of the Observables in concat generates infinite data (interval, fromEvent, etc.), the next stream will never be triggered because the current one won’t complete.

    Solution: Use limiting operators like take or takeUntil to ensure the infinite stream eventually completes (see the sketch after this list).

  2. Error Handling:

    If one of the Observables throws an error (error), the entire concat chain will terminate with an error, and no subsequent streams will be executed.

    Solution: Handle errors locally using catchError.

  3. No Concurrency:

    concat's strict ordering also means that a slow or long-running source stalls everything queued behind it. If sources genuinely need to run side by side, an operator like merge (or additional custom handling) is a better fit.
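
Here is the sketch referenced in pitfall 1, with illustrative streams: without the take, the polling stream below would never complete and the report stream would never get its turn.

import { concat, interval, of } from 'rxjs';
import { map, take } from 'rxjs/operators';

// An otherwise infinite polling stream, explicitly bounded to 3 emissions
const status$ = interval(1000).pipe(
  map(i => `status check ${i + 1}`),
  take(3) // without this, concat would never move on
);

const report$ = of('final report generated');

concat(status$, report$).subscribe(console.log);

// Output:
// status check 1
// status check 2
// status check 3
// final report generated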

The concat operator is an excellent choice when you need to process streams sequentially and ensure reliable order of execution. Despite its simplicity, it is particularly powerful in scenarios where order and completion are critical. To avoid pitfalls like infinite streams or early termination on errors, always control stream completion and implement proper error handling for a smooth and predictable sequence.

Interesting Aspects

Optimizing Sequential Data Loading:

concat is often used to manage a chain of dependent requests. For example, first obtaining an authorization token and then using it to make API requests.

Example:

import { of, concat } from 'rxjs';
import { delay } from 'rxjs/operators';

const authToken$ = of('Token received').pipe(delay(500));
const fetchData$ = of('Server data loaded').pipe(delay(1000));

concat(authToken$, fetchData$).subscribe(console.log);

// Output:
// Token received
// Server data loaded
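
One caveat about this pattern: concat only sequences the two streams; it does not hand the token to the second request. When the follow-up call actually needs the emitted value, the usual tool is concatMap (or switchMap). A minimal sketch with placeholder functions (getToken and fetchData are hypothetical stand-ins for real HTTP calls):

import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

// Placeholder implementations standing in for real HTTP calls
const getToken = () => of('secret-token').pipe(delay(500));
const fetchData = (token: string) => of(`data loaded with ${token}`).pipe(delay(1000));

getToken().pipe(
  concatMap(token => fetchData(token)) // the second call starts only after the token arrives
).subscribe(console.log);

// Output (after ~1.5 s):
// data loaded with secret-token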

Performance

  1. Clear Sequential Efficiency:

    concat places no extra load on the scheduler as it only manages values from one active stream at any given time.

  2. Delays with Large Streams:

    Performance can decrease if the initial streams contain a large amount of data or take too long to execute, as subsequent Observables must wait for their completion. This is especially important to consider when working with streams in UI components.

The concat operator is a powerful tool for sequentially working with streams. It is especially useful if the order of emissions is critically important or if one task must be fully completed before proceeding to the next. However, when using it, it is essential to consider stream completion constraints, proper error handling, and potential delays caused by large or slow streams. Proper use of concat makes working with sequential data intuitive and manageable.

| Operator | Features |
| --- | --- |
| combineLatest | Combines the latest values from all streams. Waits for all streams to emit at least one value. |
| zip | Groups values from streams together (one value from each). |
| concat | Processes streams sequentially (one starts only after the previous one completes). |
| forkJoin | Waits for all streams to complete, then returns their final values. |
| merge | Processes data in parallel without waiting for stream completion. |

Conclusions

We’ve covered the foundational operators and succeeded in our first experiments with RxJS. Ahead lie more complex but fascinating challenges where your knowledge of combination operators will become an essential skill. The foundation is already set: we’ve explored how to use zip and other operators for data synchronization. But knowledge is only half the battle. True understanding comes when you start applying these tools in real-world projects.

Think about all the complex scenarios where data streams may depend on each other: forms with multiple validations, interactions between API requests, or even time management in games. By understanding how to "orchestrate" streams in RxJS, you won’t just solve problems—you’ll discover entirely new, elegant ways to approach them. And this is just the beginning.

Every step you take in learning RxJS brings you closer to mastering asynchronous programming. Don’t hesitate to experiment and ask questions. It’s much easier to learn from small examples than to tackle large projects where every stream feels like a mini-puzzle.

Now it’s time to set new goals, explore different operators, and find what works best for your scenarios. RxJS may seem complex at first, but with every new challenge, it will reveal itself as a powerful tool. Let your journey through stream combination lead you confidently toward true expertise.

The next step awaits—keep going and continue to unlock the world of reactive programming. Good luck!