May 13, 2025 - 06:27
Reactive Programming in Node.js

Reactive programming is about shifting how we think about flow. Instead of writing code that constantly asks for updates, you write code that responds the moment something changes.

It means your app reacts to data as it arrives, like listening to live announcements rather than checking a board repeatedly. You work with push-based streams instead of polling, and structure your logic around events, not rigid control flow.

Instead of manually triggering logic or polling for new data, reactive code hooks into a data stream and reacts as values arrive. Your app becomes event-aware, responsive, and resilient under pressure.

In this post, we’ll explore what this looks like in practice using two real-world examples:

  1. Streaming data from a PostgreSQL database
  2. Subscribing to smart contract events on-chain

Both use RxJS and Node.js to bring structure and composability to asynchronous systems.

Node thrives on async. That’s its thing. But as systems scale, async logic becomes harder to manage: things get messy when you’re handling thousands of concurrent DB reads, or when your app has to respond to multiple on-chain events from different sources at once.

Reactive programming gives you structure. It turns async chaos into a clean pipeline of data you can filter, map, and compose.

The Core Concepts

Here are the building blocks you need to know:

  • Streams: Continuous flows of data over time (like incoming flight updates or smart contract events)

  • Observables: Data sources you can subscribe to and cancel

  • Operators: Functions that transform, filter, or combine streams declaratively

Example 1: Postgres + Node Streams

[Image: table of flight boarding information]

Let’s say you’re querying a boarding_list table in Postgres. Instead of loading all the rows into memory, you stream them individually and process them as they arrive.

Here’s how you wrap a Node stream in an RxJS Observable:

import pg from 'pg';
import QueryStream from 'pg-query-stream';
import { Observable } from 'rxjs';

const client = new pg.Client({ connectionString: process.env.DATABASE_URL });
await client.connect();

const query = new QueryStream('SELECT * FROM boarding_list');
const stream = client.query(query);

const flightRows$ = new Observable(subscriber => {
  // Listeners attach only when someone subscribes
  stream.on('data', row => subscriber.next(row));
  stream.on('end', () => {
    subscriber.complete();
    client.end();
  });
  stream.on('error', err => {
    subscriber.error(err);
    client.end();
  });
  // Teardown: stop reading if the subscriber unsubscribes early
  return () => stream.destroy();
});

flightRows$.subscribe({
  next: row => console.log('Boarding:', row),
  error: err => console.error('Error:', err),
  complete: () => console.log('All rows processed.')
});

This setup:

  • Streams rows with backpressure handled natively by Node
  • Turns the stream into an Observable for composability
  • Starts the flow only when subscribed

Let’s filter for only delayed flights and format a user-friendly alert message:

import { filter, map } from 'rxjs';

flightRows$.pipe(
  filter(row => row.status === 'delayed'),
  map(row => `Flight ${row.flight_number} is delayed to ${row.destination}`)
).subscribe(console.log);

You’ve now taken a live DB stream and turned it into an alert system with two lines of transformation logic. There is no buffering, no polling, just clean flow.

Example 2: Smart Contract Events + Event Streams

Now let’s move on-chain.

[Image: phone showing air rewards membership information]

You’re building an Air Rewards program on-chain. Every time a traveler checks in, your smart contract emits a MilesEarned event. You want to detect that and show a notification in real time.

import { ethers } from 'ethers';
import { filter, map, fromEventPattern, Observable } from 'rxjs';

const provider = new ethers.JsonRpcProvider('https://arb1.arbitrum.io/rpc');
const contract = new ethers.Contract(
  '0xAirMilesDAO', // placeholder address
  ['event MilesEarned(address indexed user, uint256 flightId, uint256 miles)'],
  provider
);

type MilesEarnedEvent = [string, bigint, bigint];

const milesEarned$: Observable<MilesEarnedEvent> = fromEventPattern(
  handler => contract.on('MilesEarned', handler),
  handler => contract.off('MilesEarned', handler),
  // ethers passes event arguments individually; gather them into a tuple
  (user: string, flightId: bigint, miles: bigint) => [user, flightId, miles]
);

milesEarned$.pipe(
  filter(([user]) => ethers.getAddress(user) === ethers.getAddress('0xYourWallet')),
  map(([user, flightId, miles]) =>
    `${user} earned ${miles} miles on flight ${flightId}`
  )
).subscribe(console.log);

That’s a smart contract turned into a reactive data stream. There is no polling, no retries, just a live flow of events your app can listen to and respond to immediately.

When Should You Reach for Reactive?

Reactive programming shines when:

  • You’re dealing with high-throughput data
  • You need to combine multiple event sources
  • You want composable logic that reads like a story

It might be overkill for a simple CRUD app, but it’s incredibly helpful for systems with live data.

Whether your data comes from a database, a blockchain, or any other high-throughput source, reactive patterns help you build better Node.js applications.