Apr 5, 2025 - 11:24
Optimizing UPDATE of millions of rows easily

The other day, we received a file containing buildings, each with its number of dwellings and a matching identifier that matters to us, weighted by a score indicating the quality of the match. The idea is to be able to tell which building a given dwelling belongs to.

So I was asked to import these buildings and link them to the dwellings.

Since the buildings and fast_housing tables already exist, nothing impossible. Easy peasy.

I had to add two fields, rnb_id and rnb_id_score (the score representing the quality of the match), to the buildings table, then update the fast_housing table, which already contained a building_id field.

A 1-n relationship. Simple, basic.
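
Adding the two columns is a small migration; a sketch of it could look like this (column types guessed from the input file, not the actual migration):

import { Knex } from 'knex';

// Sketch of the migration: add the RNB identifier and its matching score to
// the buildings table. Column types are guessed from the input file.
export async function up(knex: Knex): Promise<void> {
  await knex.schema.alterTable('buildings', (table) => {
    table.string('rnb_id').nullable();
    table.double('rnb_id_score').nullable();
  });
}

export async function down(knex: Knex): Promise<void> {
  await knex.schema.alterTable('buildings', (table) => {
    table.dropColumns('rnb_id', 'rnb_id_score');
  });
}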

fast_housing is a nationwide set of housing units: vacant, rented, and so on. We probably don't have everything, but it's still just over 10 million rows and 42 columns, partitioned by department, with several indexes, in a PostgreSQL database.

Importing buildings

Importing buildings isn't really the problem. There are some 26 million rows in the file, and even done the naive way, it goes pretty well.

We stream a buildings.jsonl file (a JSON file where each line is an element), map it into the format we want, assemble it into batches of 1,000, save them, and we're done.

Basically, we'll do an INSERT ... ON CONFLICT DO UPDATE and it'll be fine.
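
Under the hood, buildingRepository.saveMany presumably boils down to a knex upsert along these lines (a sketch, not the repository's actual code; db is the knex instance used later in this post, and rows stands for a batch already mapped to snake_case columns):

// Sketch of the batched upsert saveMany is expected to issue: insert the
// whole batch, and on a conflicting id, update only the merged columns.
await db('buildings')
  .insert(rows)
  .onConflict('id')
  .merge(['housing_count', 'rnb_id', 'rnb_id_score']);

The full import script: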

import { chunkify, map, tap } from '@zerologementvacant/utils/node';
import { Bar } from 'cli-progress';
import { parse as parseJSONL } from 'jsonlines';
import fs from 'node:fs';
import path from 'node:path';
import { Readable } from 'node:stream';
import { ReadableStream, WritableStream } from 'node:stream/web';

import { BuildingApi } from '~/models/BuildingApi';
import buildingRepository from '~/repositories/buildingRepository';

const CHUNK_SIZE = 1_000;
const TOTAL = 25_958_378;

async function run(): Promise<void> {
  const progress = new Bar({
    fps: 4,
    etaAsynchronousUpdate: true,
    etaBuffer: CHUNK_SIZE,
    stopOnComplete: true
  });
  progress.start(TOTAL, 0);

  const file = path.join(__dirname, 'buildings.jsonl');
  await stream(file)
    .pipeThrough(mapper())
    .pipeThrough(chunkify({ size: CHUNK_SIZE }))
    .pipeThrough(saver())
    .pipeThrough(
      tap((buildings) => {
        progress.increment(buildings.length);
      })
    )
    .pipeTo(reporter());
}

interface Input {
  idbat: string;
  nlogh: number;
  rnb_id: string | null;
  rnb_id_score: number;
}

function stream(file: string): ReadableStream<Input> {
  const fileStream = fs.createReadStream(file);
  return Readable.toWeb(
    fileStream.pipe(
      parseJSONL({
        emitInvalidLines: true
      })
    )
  );
}

function mapper() {
  return map<Input, BuildingApi>((building) => ({
    id: building.idbat,
    housingCount: building.nlogh,
    rnbId: building.rnb_id,
    rnbIdScore: building.rnb_id_score,
    rentHousingCount: 0,
    vacantHousingCount: 0
  }));
}

function saver() {
  return tap<ReadonlyArray<BuildingApi>>(async (buildings) => {
    await buildingRepository.saveMany(buildings, {
      onConflict: ['id'],
      merge: ['housing_count', 'rnb_id', 'rnb_id_score']
    });
  });
}

function reporter() {
  let total = 0;

  return new WritableStream<ReadonlyArray<BuildingApi>>({
    write(chunk) {
      total += chunk.length;
    },
    close() {
      console.log(`Total saved: ${total}`);
    }
  });
}

run();

Estimated execution time: about 1 h 02 min, i.e. roughly 7,000 rows per second.

So far, so good.

No memory problems thanks to streams. Correct performance. Almost too easy.
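
For the record, the chunkify helper is essentially a buffering TransformStream. The real @zerologementvacant/utils implementation may differ, but a minimal sketch looks like this:

import { TransformStream } from 'node:stream/web';

// Minimal sketch of a chunkify helper: buffer items until the chunk is full,
// then pass the whole array downstream. Backpressure is handled by the web
// streams machinery itself.
function chunkify<A>(options: { size: number }): TransformStream<A, A[]> {
  let buffer: A[] = [];

  return new TransformStream<A, A[]>({
    transform(item, controller) {
      buffer.push(item);
      if (buffer.length >= options.size) {
        controller.enqueue(buffer);
        buffer = [];
      }
    },
    flush(controller) {
      // Emit the last, possibly smaller, chunk.
      if (buffer.length > 0) {
        controller.enqueue(buffer);
      }
    }
  });
}

Because everything is web streams, the file is only read as fast as the database accepts the batches.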

Linking buildings to homes

The other file is buildings_housing.jsonl. Behind this clunky name lie the links between dwellings and buildings, where they exist.

39 million lines this time. No more messing around.

For each dwelling, we need to add or modify the building identifier, the building_id field.

I'll try the naive approach again; after all, it worked the first time... We'll stream the file and save the result into the fast_housing table.

And that's where it gets complicated.

You can't really do a single mass UPDATE, since every row gets a different value. So we test a row-by-row UPDATE, the naive way. Spoiler alert: it's bad.

import { tap } from '@zerologementvacant/utils/node';
import { Bar } from 'cli-progress';
import { Readable } from 'node:stream';
import { ReadableStream, WritableStream } from 'node:stream/web';

import db from '~/infra/database';
import { Housing } from '~/repositories/housingRepository';

const TOTAL = 39_048_568;
const TEMPORARY_TABLE = 'buildings_housing_updates';

async function run(): Promise<void> {
  const progress = new Bar({
    fps: 4,
    etaAsynchronousUpdate: true,
    etaBuffer: 1000,
    stopOnComplete: true
  });
  progress.start(TOTAL, 0);

  await createTemporaryTableStream()
    .pipeThrough(saver())
    .pipeThrough(
      tap(() => {
        progress.increment();
      })
    )
    .pipeTo(reporter());
}

interface Input {
  idbat: string;
  idlocal: string;
  geocode: string;
}

function createTemporaryTableStream(): ReadableStream<Input> {
  return Readable.toWeb(db(TEMPORARY_TABLE).select().stream());
}

function saver() {
  return tap<Input>(async (input) => {
    await Housing()
      .where({
        geo_code: input.geocode,
        local_id: input.idlocal
      })
      .update({
        building_id: input.idbat
      });
  });
}

function reporter<A>() {
  let total = 0;

  return new WritableStream<A>({
    write() {
      total++;
    },
    close() {
      console.log(`Total saved: ${total}`);
    }
  });
}

run();  

Estimated execution time: between 16 and 25 hours...

Here I tested reading from a temporary table I had filled beforehand, but streaming straight from the file is just as slow.
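
By the way, I haven't shown the script that fills buildings_housing_updates: it's the same kind of streaming pipeline. A rough sketch, reusing the Input interface and TEMPORARY_TABLE constant above plus knex's batchInsert (column names assumed to mirror the file's fields):

import { chunkify } from '@zerologementvacant/utils/node';
import { parse as parseJSONL } from 'jsonlines';
import fs from 'node:fs';
import path from 'node:path';
import { Readable } from 'node:stream';
import { WritableStream } from 'node:stream/web';

import db from '~/infra/database';

// Sketch only: stream buildings_housing.jsonl into the staging table in
// batches, assuming the table and its idbat, idlocal and geocode columns
// already exist. batchInsert splits each array into sub-batches for us.
async function fillTemporaryTable(): Promise<void> {
  const file = path.join(__dirname, 'buildings_housing.jsonl');

  await Readable.toWeb(
    fs.createReadStream(file).pipe(parseJSONL({ emitInvalidLines: true }))
  )
    .pipeThrough(chunkify({ size: 10_000 }))
    .pipeTo(
      new WritableStream<ReadonlyArray<Input>>({
        async write(chunk) {
          await db.batchInsert(TEMPORARY_TABLE, [...chunk]);
        }
      })
    );
}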

The culprit is the saver function: a row-by-row UPDATE on such a volume is far too slow. At 39 million rows, even ~1.5 ms per statement adds up to more than 16 hours.

The solution: mass UPDATE FROM the temporary table

So I did a little research: postgres seems to optimize an UPDATE that pulls its values from another table (UPDATE ... FROM). Much better!

Execution time: about 3 h! That's roughly 3,600 rows per second, versus a few hundred before.

import { chunkify, tap } from '@zerologementvacant/utils/node';
import { Bar } from 'cli-progress';
import { Readable } from 'node:stream';
import { ReadableStream, WritableStream } from 'node:stream/web';

import db from '~/infra/database';
import { housingTable } from '~/repositories/housingRepository';

const CHUNK_SIZE = 10_000;
const TOTAL = 39_048_568;
const TEMPORARY_TABLE = 'buildings_housing_updates';

async function run(): Promise<void> {
  const progress = new Bar({
    fps: 4,
    etaAsynchronousUpdate: true,
    etaBuffer: 1000,
    stopOnComplete: true
  });
  progress.start(TOTAL, 0);

  await createTemporaryTableStream()
    .pipeThrough(
      chunkify({
        size: CHUNK_SIZE
      })
    )
    .pipeThrough(saver())
    .pipeThrough(
      tap((chunk) => {
        progress.increment(chunk.length);
      })
    )
    .pipeTo(reporter());
}

interface Input {
  idbat: string;
  idlocal: string;
  geocode: string;
}

function createTemporaryTableStream(): ReadableStream<Input> {
  return Readable.toWeb(db(TEMPORARY_TABLE).select().stream());
}

function saver() {
  return tap<ReadonlyArray<Input>>(async (chunk) => {
    await db(housingTable)
      .update({
        building_id: db.ref(`${TEMPORARY_TABLE}.idbat`)
      })
      .updateFrom(TEMPORARY_TABLE)
      .where(`${housingTable}.local_id`, db.ref(`${TEMPORARY_TABLE}.idlocal`))
      .where(`${housingTable}.geo_code`, db.ref(`${TEMPORARY_TABLE}.geocode`))
      .whereIn(
        [`${TEMPORARY_TABLE}.geocode`, `${TEMPORARY_TABLE}.idlocal`],
        chunk.map((building) => [building.geocode, building.idlocal])
      );
  });
}

function reporter<A>() {
  let total = 0;

  return new WritableStream<ReadonlyArray<A>>({
    write(chunk) {
      total += chunk.length;
    },
    close() {
      console.log(`Total saved: ${total}`);
    }
  });
}

run();

Rewriting the saver function so that it bulk updates from a temporary table has drastically reduced execution time.

function saver() {
  return tap<ReadonlyArray<Input>>(async (chunk) => {
    await db(housingTable)
      .update({
        building_id: db.ref(`${TEMPORARY_TABLE}.idbat`)
      })
      .updateFrom(TEMPORARY_TABLE)
      .where(`${housingTable}.local_id`, db.ref(`${TEMPORARY_TABLE}.idlocal`))
      .where(`${housingTable}.geo_code`, db.ref(`${TEMPORARY_TABLE}.geocode`))
      .whereIn(
        [`${TEMPORARY_TABLE}.geocode`, `${TEMPORARY_TABLE}.idlocal`],
        chunk.map((building) => [building.geocode, building.idlocal])
      );
  });
}

Updating from another table allows rows to be processed in chunks, in this case 10,000 at a time (CHUNK_SIZE). This reduces the number of queries and lets postgres choose a better execution plan, reduce disk I/O, make better use of its indexes, and so on.
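
If you want to check what postgres actually receives, knex can compile the builder without executing it: toQuery() returns the interpolated SQL. A small helper like this one (a sketch reusing the constants above) shows that each chunk becomes a single UPDATE ... FROM with a composite IN list of (geocode, idlocal) pairs, which you can then feed to EXPLAIN:

// Sketch: compile (but don't run) the statement for one chunk, e.g. to check
// its execution plan with EXPLAIN before launching the full job.
function debugChunkSql(chunk: ReadonlyArray<Input>): string {
  return db(housingTable)
    .update({
      building_id: db.ref(`${TEMPORARY_TABLE}.idbat`)
    })
    .updateFrom(TEMPORARY_TABLE)
    .where(`${housingTable}.local_id`, db.ref(`${TEMPORARY_TABLE}.idlocal`))
    .where(`${housingTable}.geo_code`, db.ref(`${TEMPORARY_TABLE}.geocode`))
    .whereIn(
      [`${TEMPORARY_TABLE}.geocode`, `${TEMPORARY_TABLE}.idlocal`],
      chunk.map((row) => [row.geocode, row.idlocal])
    )
    .toQuery();
}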

If you know of any other solutions for optimizing UPDATEs on large volumes of data, let's talk!

Thanks