# The Third Step Into the World of RxJS: Combining Streams in RxJS
RxJS is a powerful tool, but it can also be tricky. Many beginners who have mastered basic operators like map, filter, and perhaps even take, start feeling confident. Then they encounter tasks where they need to combine multiple streams simultaneously... and that's where it all begins to fall apart. Panic sets in. Should you use combineLatest, forkJoin, merge, zip? What do you do when data arrives at different speeds? This material is for those feeling lost at this stage. Let’s calmly and methodically figure it out together. Why Is This Important? Your first challenge with combination operators is that they seem to speak their own language. If you think of an Observable as a stream of events, then combination operators are the conductors orchestrating multiple streams, aligning them in sync, and producing a "new melody." Imagine a classic scenario: you're building a form in Angular, and you need to show a "Submit" button only when all fields are validated. Or you need to merge data from two APIs. These are typical use cases where combination operators become your main instrument. When to Use Which Operator? RxJS provides several tools to merge data from different sources. However, despite the abundance of articles on this topic, developers in real projects often misunderstand how to apply them correctly. Let’s examine the most popular combination operators with real-world examples. Imagine this scenario: Factory A produces components (e.g., every hour). This is one stream of data. Factory B sends requests for components as they are ready for assembly. This is a second stream of data. As operators, we need to organize the supply of components for assembly based on different conditions. Each RxJS operator dictates its own method for managing this process. 1. zip — "One to One" The zip operator enforces strict synchronization. We wait for Factory A to produce a component and simultaneously for Factory B to request that specific component. Only then is the component passed for assembly. No component will be sent until a pair is matched: "ready component + request for the component." When to Use: Multiple streams logically depend on one another. You need to ensure strict synchronization of streams in order. Production Example: Factory A produces components every 3 hours. Factory B sends requests for components every 2 hours. Outcome: Each pair of "component + request" is sent to assembly. If a component is produced before the request, it waits until a signal to dispatch it arrives. Code Example in TypeScript: import { interval, zip } from 'rxjs'; import { map, take } from 'rxjs/operators'; // Factory A (produces components every 3 seconds) const factoryA$ = interval(3000).pipe(map(i => `Component ${i + 1}`)); // Factory B (requests for components every 2 seconds) const factoryB$ = interval(2000).pipe(map(i => `Request for Component ${i + 1}`)); // Combine one component from A with one request from B zip(factoryA$, factoryB$).pipe(take(3)).subscribe(([component, request]) => { console.log(`${request} paired with ${component}`); }); // Output: // Request for Component 1 paired with Component 1 // Request for Component 2 paired with Component 2 // Request for Component 3 paired with Component 3 Performance of zip The Number of Streams Matters: zip can combine not just two but multiple streams. However, as the number of streams grows, performance can degrade because the operator has to monitor each stream and wait for values to align before producing a new combination. This is particularly noticeable when streams emit data at different rates or have delays. Data Buffering: If one stream produces values faster than another, zip will need to buffer these values (temporarily store them) until data from all streams is ready. This may require more memory when handling streams with uneven data flow. For instance, if one stream emits millions of items quickly, zip will buffer them until the corresponding data from the other streams arrives. Pitfalls When Using zip Sensitivity to Value Counts: If streams emit a significantly different number of items, the operator will only work up to the point where one of the streams runs out of values. Any additional items from other streams will not be processed after one stream completes. Compatibility with Infinite Streams: If all input streams are infinite (e.g., interval or UI events) and there’s no mechanism to terminate them, the subscriber will never complete. This can result in memory leaks or hanging processes. More Advanced Features of zip Using a Transformation Function: By default, zip returns an array of synchronized values, but you can specify a function to immediately shape the result into the desired format. Example: import { zip, of } from 'rxjs'; const stream1$ = of(10, 20, 30); const stream2$ = of(1, 2, 3); zip(stream1$, stream

RxJS is a powerful tool, but it can also be tricky. Many beginners who have mastered basic operators like map
, filter
, and perhaps even take
, start feeling confident. Then they encounter tasks where they need to combine multiple streams simultaneously... and that's where it all begins to fall apart. Panic sets in. Should you use combineLatest
, forkJoin
, merge
, zip
? What do you do when data arrives at different speeds? This material is for those feeling lost at this stage. Let’s calmly and methodically figure it out together.
Why Is This Important?
Your first challenge with combination operators is that they seem to speak their own language. If you think of an Observable as a stream of events, then combination operators are the conductors orchestrating multiple streams, aligning them in sync, and producing a "new melody."
Imagine a classic scenario: you're building a form in Angular, and you need to show a "Submit" button only when all fields are validated. Or you need to merge data from two APIs. These are typical use cases where combination operators become your main instrument.
When to Use Which Operator?
RxJS provides several tools to merge data from different sources. However, despite the abundance of articles on this topic, developers in real projects often misunderstand how to apply them correctly. Let’s examine the most popular combination operators with real-world examples.
Imagine this scenario:
- Factory A produces components (e.g., every hour). This is one stream of data.
- Factory B sends requests for components as they are ready for assembly. This is a second stream of data.
- As operators, we need to organize the supply of components for assembly based on different conditions. Each RxJS operator dictates its own method for managing this process.
1. zip — "One to One"
The zip
operator enforces strict synchronization. We wait for Factory A to produce a component and simultaneously for Factory B to request that specific component. Only then is the component passed for assembly. No component will be sent until a pair is matched: "ready component + request for the component."
When to Use:
- Multiple streams logically depend on one another.
- You need to ensure strict synchronization of streams in order.
Production Example:
- Factory A produces components every 3 hours.
- Factory B sends requests for components every 2 hours.
Outcome: Each pair of "component + request" is sent to assembly. If a component is produced before the request, it waits until a signal to dispatch it arrives.
Code Example in TypeScript:
import { interval, zip } from 'rxjs';
import { map, take } from 'rxjs/operators';
// Factory A (produces components every 3 seconds)
const factoryA$ = interval(3000).pipe(map(i => `Component ${i + 1}`));
// Factory B (requests for components every 2 seconds)
const factoryB$ = interval(2000).pipe(map(i => `Request for Component ${i + 1}`));
// Combine one component from A with one request from B
zip(factoryA$, factoryB$).pipe(take(3)).subscribe(([component, request]) => {
console.log(`${request} paired with ${component}`);
});
// Output:
// Request for Component 1 paired with Component 1
// Request for Component 2 paired with Component 2
// Request for Component 3 paired with Component 3
Performance of zip
The Number of Streams Matters:
zip
can combine not just two but multiple streams. However, as the number of streams grows, performance can degrade because the operator has to monitor each stream and wait for values to align before producing a new combination. This is particularly noticeable when streams emit data at different rates or have delays.Data Buffering:
If one stream produces values faster than another,zip
will need to buffer these values (temporarily store them) until data from all streams is ready. This may require more memory when handling streams with uneven data flow. For instance, if one stream emits millions of items quickly,zip
will buffer them until the corresponding data from the other streams arrives.
Pitfalls When Using zip
Sensitivity to Value Counts:
If streams emit a significantly different number of items, the operator will only work up to the point where one of the streams runs out of values. Any additional items from other streams will not be processed after one stream completes.Compatibility with Infinite Streams:
If all input streams are infinite (e.g.,interval
or UI events) and there’s no mechanism to terminate them, the subscriber will never complete. This can result in memory leaks or hanging processes.
More Advanced Features of zip
-
Using a Transformation Function:
By default,
zip
returns an array of synchronized values, but you can specify a function to immediately shape the result into the desired format. Example:
import { zip, of } from 'rxjs';
const stream1$ = of(10, 20, 30);
const stream2$ = of(1, 2, 3);
zip(stream1$, stream2$, (x, y) => x * y).subscribe(result => {
console.log(result); // 10, 40, 90
});
// Output:
// 10
// 40
// 90
Recommendations for Use
-
Add Protection Against Infinite Streams:
Use limiting operators (
take
,takeUntil
) to prevent application hangs. -
Monitor Stream Frequencies:
If one stream is significantly "slower" than another, calculate delays properly to avoid inefficient waiting. In some cases, it might be better to replace
zip
with other operators likecombineLatest
. - Consider Completion Behavior: Explicitly manage completion or handle cases where streams emit different amounts of items.
zip
isn't just an operator that "stitches" streams together. It's a synchronization tool tailored for strictly ordered scenarios where the structure of the data between streams matters. However, keep in mind its sensitivity to completion, buffering, and potential delays. To effectively utilizezip
, it’s crucial to understand each stream's behavior and account for their limitations.
2. combineLatest — "Working with What's Available"
combineLatest
focuses on the current state of multiple streams. It emits a value only after all its sources have produced at least one value, and then updates anytime any of the streams emits new data.
combineLatest
waits for the first value from all sources before starting to work. Then, it emits a new combination of data whenever any of the streams produces updated data. This means that for every new request from Factory B, the current latest component from Factory A is used immediately.
When to Use:
- You need to track the state of multiple sources simultaneously.
- Tasks related to user input or application states.
Output: As soon as there’s a new update from B (a request), the current state of a component from A is taken and dispatched immediately.
TypeScript Code Example:
import { interval, combineLatest } from 'rxjs';
import { map, take } from 'rxjs/operators';
// Factory A (components every 3 seconds)
const factoryA$ = interval(3000).pipe(map(i => `Component ${i + 1}`));
// Factory B (requests every 2 seconds)
const factoryB$ = interval(2000).pipe(map(i => `Request for Component ${i + 1}`));
// Each new request uses the latest state of components
combineLatest([factoryA$, factoryB$]).pipe(take(4)).subscribe(([component, request]) => {
console.log(`${request} paired with ${component}`);
});
// Output:
// Request for Component 1 paired with Component 1
// Request for Component 2 paired with Component 1
// Request for Component 3 paired with Component 2
// Request for Component 4 paired with Component 2
Performance of combineLatest
Buffering and Memory:
combineLatest
keeps the latest value from each source in memory. In most cases, this doesn’t impose serious constraints, but for streams emitting large objects (e.g., arrays or JSON data), it might impact performance.Handling "Noisy" Streams:
If one stream emits data too frequently (e.g., a mouse movement stream),combineLatest
will frequently recalculate the result, even if the other stream remains unchanged.
Recommendation: For noisy streams, use operators likethrottleTime
ordebounceTime
to suppress excess events and avoid processing overload.
Pitfalls of combineLatest
Unpredictable Behavior with Missing Values:
If at least one of the streams hasn’t emitted a value,combineLatest
will not publish anything. This means that a stream that always "stays silent" can block the entire operation. This can be unexpected for beginners.Performance with a High Number of Streams:
combineLatest
can work with multiple streams, but performance drops significantly when trying to combine dozens of them. The reason is that, with each change, all combinations must be recalculated and reassembled.Unexpected Failures in Real Applications:
In applications where data comes from external APIs,combineLatest
can stop working unexpectedly if one API source suddenly goes silent. For example, if a service returns an HTTP error, that stream completes, making the combined stream useless.
Solution: Add error handling using operators likecatchError
.
Hidden Features of combineLatest
Using Custom Transform Functions:
Instead of receiving an array of values, you can transform the result directly using a custom function.
Example:
import { combineLatest, of } from 'rxjs';
const width$ = of(100);
const height$ = of(200);
combineLatest([width$, height$], (width, height) => width * height).subscribe(area =>
console.log(`Area: ${area}`)
);
// Output:
// Area: 20000
combineLatest
is an incredibly useful operator for working with data streams where the current state of all sources is essential. However, its use requires careful attention: pitfalls such as delays, inability to emit when a stream remains silent, or performance limitations can unexpectedly lead to bugs. To avoid such issues, always assess the emission frequencies of streams, their completion, and start with initial values (startWith
). This will make your applications reliable and predictable.
3. forkJoin — "All at Once"
forkJoin
takes multiple streams, waits for all of them to complete, and then returns a single final value. If even one stream doesn’t emit or doesn’t complete, no result will be emitted.
forkJoin
waits for ALL streams to complete and only then provides their latest results as a final list. In production terms, this means we’ll wait for Factory A to finish producing ALL components and for Factory B to send all the requests before processing everything at once.
When to Use:
- You need a single result from multiple sources.
- Ideal for API requests, when you need to wait for all responses.
Production Example:
- Factory A finishes producing all components.
- Factory B sends all requests.
Output: Processing begins only after both factories complete their operations.
TypeScript Code Example:
import { forkJoin, interval } from 'rxjs';
import { delay, take, map } from 'rxjs/operators';
// Factory A (component production)
const factoryA$ = interval(3000).pipe(map(i => i + 1), take(3));
// Factory B (requests for components)
const factoryB$ = interval(2000).pipe(map(i => i + 1), take(2));
// Start processing only after both streams complete
forkJoin([factoryA$, factoryB$]).subscribe(([details, requests]) => {
console.log(`Processed data: ${details} components ready for ${requests} requests`);
});
// Output:
// Processed data: 3 components ready for 2 requests
Conclusion:
forkJoin
is perfectly suited for tasks where only completed data matters. For instance, it’s great for batch processing after preparation.
Performance and Optimization
- Issue with Large Streams: If one of the attached streams generates a large amount of data before completing, it can place significant load on memory since each stream allocation requires resources until completion.
Pitfalls of forkJoin
Not Suitable for Infinite Streams:
If any of the streams in the set are infinite (e.g.,fromEvent
,interval
),forkJoin
will never emit anything. This is not an error but can be an unexpected behavior.-
Challenges with Error Handling:
One major challenge is that if any stream throws an error (error
), the results of all other streams are lost. This might be undesirable in scenarios where partial success is significant.Hidden Features of
forkJoin
Using an Object Instead of an Array:
forkJoin
supports object syntax, which can be convenient for working with REST APIs or associative structures.
import { forkJoin, of } from 'rxjs';
import { delay } from 'rxjs/operators';
// Simulating three data sources as objects:
const user$ = of({ id: 1, name: 'Ivan' }).pipe(delay(1000));
const orders$ = of([{ id: 101, total: 300 }, { id: 102, total: 450 }]).pipe(delay(1500));
const notifications$ = of(['Notification 1', 'Notification 2']).pipe(delay(2000));
// Using an object instead of an array:
forkJoin({
user: user$,
orders: orders$,
notifications: notifications$,
}).subscribe(result => {
console.log('Merged data:', result);
});
// Output:
// Merged data:
// {
// user: { id: 1, name: 'Ivan' },
// orders: [{ id: 101, total: 300 }, { id: 102, total: 450 }],
// notifications: ['Notification 1', 'Notification 2']
// }
Working with objects instead of arrays makes the code more readable, especially in scenarios involving different types of data (e.g., responses from various APIs). Instead of referencing values by array indexes (result[0]
, result[1]
), you can refer to the object keys (result.user
, result.orders
).
forkJoin
is an excellent tool for managing parallel processing and combining results. However, its strict requirement for stream completion imposes some limitations. To avoid unexpected errors, always consider stream completion, use error handling (catchError
), and add constraints (take
,timeout
). This approach will allow you to useforkJoin
reliably and effectively in most scenarios.
4. merge — "Send Everything Immediately"
merge
combines streams but emits items as soon as they arrive. Unlike combineLatest
, it doesn’t wait for all sources.
merge
simply merges all events from the streams into a single flow and emits them in the order they are received. In production, this means that as soon as a component is ready or a request arrives, it is immediately processed, without concern for synchronization.
When to Use:
- You need to combine events from multiple sources, but the order of their appearance isn’t important.
Production Example:
- Factory A produces components every 3 hours.
- Factory B sends requests every 2 hours.
Output: Any event from A or B is processed immediately.
TypeScript Example:
import { interval, merge } from 'rxjs';
import { map, take } from 'rxjs/operators';
// Factory A (producing components)
const factoryA$ = interval(3000).pipe(map(i => `Component ${i + 1} is ready`), take(3));
// Factory B (requests for components)
const factoryB$ = interval(2000).pipe(map(i => `Request for Component ${i + 1}`), take(2));
// All events are processed in order of arrival
merge(factoryA$, factoryB$).subscribe(event => {
console.log(event);
});
// Output (example):
// Request for Component 1
// Component 1 is ready
// Request for Component 2
// Component 2 is ready
// Component 3 is ready
Working Features
Simultaneous Merging of Multiple Streams:
Unlikezip
orforkJoin
,merge
does not depend on the sequence or frequency of data emissions from different streams. It simply publishes values as soon as they arrive.Does Not Wait for All Streams to Complete:
merge
continues to operate as long as at least one source stream remains active. The resulting stream completes only when all input Observables have completed.Stream Competition:
If values from multiple streams are emitted at the same time, they will be processed in the order they actually arrive (by timestamp).-
Operator Parameters:
-
concurrent (default is
Number.POSITIVE_INFINITY
): This parameter determines the maximum number of streams that can be processed simultaneously. If the specified number is exceeded, the remaining Observables will wait for the currently active ones to complete.
-
concurrent (default is
Pitfalls
Overload When Working with Infinite Streams:
Ifmerge
is merging infinite streams (likeinterval
,fromEvent
) that generate data too frequently, it can cause processing overload. This is especially critical when multiple such "noisy" streams are merged.
Solution: Use event frequency limiting operators likethrottleTime
,debounceTime
, or manually set theconcurrent
parameter to reduce the load.Premature Stream Closure:
If all input streams complete but one of them throws an error (error
),merge
will terminate with an error, and the data from other completed streams will be lost.
Solution: UsecatchError
to handle errors in each stream independently without terminating the main stream.Complexity in Handling Concurrent Streams:
With a large number of streams operating simultaneously, the order in whichmerge
outputs the data may become unpredictable. While logical, this behavior can cause confusion if you expected the values to appear in the same order as withconcat
.
Interesting Aspects of merge
-
Handling the Processing Order Using
concurrent
: Theconcurrent
option allows you to limit the number of streams processed simultaneously. This is useful when combining resource-heavy processes like HTTP requests. Example:
import { merge, of } from 'rxjs';
import { delay } from 'rxjs/operators';
const stream1$ = of('A').pipe(delay(1000));
const stream2$ = of('B').pipe(delay(1500));
const stream3$ = of('C').pipe(delay(500));
merge(stream1$, stream2$, stream3$, 2).subscribe(console.log);
// Output:
// A
// C
// B
Explanation: Here, only two streams will be processed simultaneously. As soon as one finishes, the next stream starts, maintaining the queue.
Performance
Parallel Performance:
merge
excels at performance because it processes data from all sources in parallel. This makes it an ideal choice when there are no constraints on the order of event processing.Resource Intensity of Streams:
Streams generating a large quantity of data can strain memory and CPU because each stream operates independently. It’s important to consider theconcurrent
parameter to manage simultaneous execution.Sensitivity to Event Frequency:
If streams are "noisy" (e.g., a stream generating mouse movement events), the resulting merged stream may become overloaded. Therefore, it is important to use limiting operators to optimize the load.
The
merge
operator is simple and versatile yet packed with powerful functionality. It is perfectly suited for tasks requiring parallel data processing, but it demands careful resource management when dealing with frequent or infinite streams. Use it when event order doesn’t matter, and always optimize load by limiting event frequency and the number of simultaneously processed streams (concurrent
).
4. concat — "First Come, First Served"
The concat
operator in RxJS offers a simple and intuitive way to process multiple streams sequentially, one after another. Unlike merge
, which works in parallel, concat
processes the next stream only after the previous stream has completed. It can be thought of as a "queue" of Observables.
Factory B will only start sending requests after the production of components in Factory A is fully completed.
When to Use:
- When you need to ensure the sequential execution of streams without risking data overlap.
- For tasks where data logically depends on one another (e.g., step-by-step processing, sequential API requests).
- When simultaneous data processing (like in
merge
) is unnecessary or unacceptable.
Production Example:
- Factory A produces 3 components (each taking 3 hours to complete).
- Factory B starts sending requests only after Factory A has finished production.
TypeScript Code Example:
import { interval, concat } from 'rxjs';
import { map, take } from 'rxjs/operators';
// Factory A (production of components)
const factoryA$ = interval(3000).pipe(
map(i => `Component ${i + 1} is ready`),
take(3) // Produces 3 components
);
// Factory B (processing requests for components)
const factoryB$ = interval(2000).pipe(
map(i => `Request for component ${i + 1} processed`),
take(2) // Processes 2 requests
);
// Run production and requests sequentially
concat(factoryA$, factoryB$).subscribe(event => {
console.log(event);
});
// Expected Output (approximate, considering timing):
// After 3 seconds: Component 1 is ready
// After 6 seconds: Component 2 is ready
// After 9 seconds: Component 3 is ready
// After 11 seconds: Request for component 1 processed
// After 13 seconds: Request for component 2 processed
Working Features
Strict Sequence:
The main feature ofconcat
is the strict sequential order of processing streams. First, data from the first Observable is processed, then, after it completes, the next stream is activated, and so on.Waits for Completion:
Each stream must complete (complete
) before the operator moves to the next one. This makesconcat
particularly useful for creating chains of sequential operations.Blocking by Infinite Streams:
If there’s an infinite stream (likeinterval
orfromEvent
), it will block the execution of the rest of the chain since it will never complete.No Limit on Number of Streams:
concat
can combine any number of Observables, provided either through an array, manually, or by using thearguments
operator.
Pitfalls
Using Infinite Streams:
If one of the Observables inconcat
generates infinite data (interval
,fromEvent
, etc.), the next stream will never be triggered because the current one won’t complete.
Solution: Use limiting operators liketake
ortakeUntil
to ensure the infinite stream eventually completes.Error Handling:
If one of the Observables throws an error (error
), the entireconcat
chain will terminate with an error, and no subsequent streams will be executed.
Solution: Handle errors locally usingcatchError
.Complexity in Handling Competition:
Whileconcat
ensures strict order of execution, if you mismanage resource usage or processing delays, infinite streams and unexpected competition between sources may occur. For such cases, other operators likemerge
or additional custom handling might better suit your needs.
The
concat
operator is an excellent choice when you need to process streams sequentially and ensure reliable order of execution. Despite its simplicity, it is particularly powerful in scenarios where order and completion are critical. To avoid pitfalls like infinite streams or early termination on errors, always control stream completion and implement proper error handling for a smooth and predictable sequence.
Interesting Aspects
Optimizing Sequential Data Loading:
concat
is often used to manage a chain of dependent requests. For example, first obtaining an authorization token and then using it to make API requests.
Example:
import { of, concat } from 'rxjs';
import { delay } from 'rxjs/operators';
const authToken$ = of('Token received').pipe(delay(500));
const fetchData$ = of('Server data loaded').pipe(delay(1000));
concat(authToken$, fetchData$).subscribe(console.log);
// Output:
// Token received
// Server data loaded
Performance
Clear Sequential Efficiency:
concat
places no extra load on the scheduler as it only manages values from one active stream at any given time.Delays with Large Streams:
Performance can decrease if the initial streams contain a large amount of data or take too long to execute, as subsequent Observables must wait for their completion. This is especially important to consider when working with streams in UI components.
The
concat
operator is a powerful tool for sequentially working with streams. It is especially useful if the order of emissions is critically important or if one task must be fully completed before proceeding to the next. However, when using it, it is essential to consider stream completion constraints, proper error handling, and potential delays caused by large or slow streams. Proper use ofconcat
makes working with sequential data intuitive and manageable.
Operator | Features |
---|---|
combineLatest |
Combines the latest values from all streams. Waits for all streams to emit at least one value. |
zip |
Groups values from streams together (one value from each). |
concat |
Processes streams sequentially (one starts only after the previous one completes). |
forkJoin |
Waits for all streams to complete, then returns the results. |
merge |
Processes data in parallel without waiting for stream completion. |
Conclusions
We’ve covered the foundational operators and succeeded in our first experiments with RxJS. Ahead lie more complex but fascinating challenges where your knowledge of combination operators will become an essential skill. The foundation is already set: we’ve explored how to use zip
and other operators for data synchronization. But knowledge is only half the battle. True understanding comes when you start applying these tools in real-world projects.
Think about all the complex scenarios where data streams may depend on each other: forms with multiple validations, interactions between API requests, or even time management in games. By understanding how to "orchestrate" streams in RxJS, you won’t just solve problems—you’ll discover entirely new, elegant ways to approach them. And this is just the beginning.
Every step you take in learning RxJS brings you closer to mastering asynchronous programming. Don’t hesitate to experiment and ask questions. It’s much easier to learn from small examples than to tackle large projects where every stream feels like a mini-puzzle.
Now it’s time to set new goals, explore different operators, and find what works best for your scenarios. RxJS may seem complex at first, but with every new challenge, it will reveal itself as a powerful tool. Let your journey through stream combination lead you confidently toward true expertise.
The next step awaits—keep going and continue to unlock the world of reactive programming. Good luck!