Long-running jobs in Temporal.io

Mar 17, 2025 - 18:33

TL;DR

A long-running job should be implemented as one long-running activity that
uses heartbeat for resumption.

What I needed to do

We had a data-sync backfill to run: copying data from one service's database
into another's. About 5.5M DB entities needed to be copied, so we expected the
total running time to be a matter of hours.

We wanted the job to keep some kind of progress record, i.e. a "cursor", so
that if it failed at any point, it could, on retry, continue where it left off.

First instinct: Parameterized activity

After reading the Temporal docs, my first instinct was to implement one workflow
with one parameterized activity which the workflow would call in a loop.

My colleagues talked me out of this idea, because each activity invocation adds
events to the workflow's Event History. Temporal works under the assumption
that there will be few events per workflow, not tens of thousands. We didn't
want to risk exhausting Temporal's disk space.
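
To put rough numbers on that concern, here is a back-of-the-envelope estimate. It assumes a batch size of 100 and that each successful activity invocation records at least three history events (ActivityTaskScheduled, ActivityTaskStarted, ActivityTaskCompleted). Workflow task events and retries are ignored, so treat the result as a lower bound. The function name is made up for this sketch.

```go
package main

import "fmt"

// eventsPerActivity is an assumption: one scheduled, one started and one
// completed event per successful activity invocation.
const eventsPerActivity = 3

// estimateActivityEvents returns a lower bound on the number of history
// events produced when the workflow calls one activity per batch.
func estimateActivityEvents(entities, batchSize int) int {
	batches := (entities + batchSize - 1) / batchSize // ceiling division
	return batches * eventsPerActivity
}

func main() {
	// 5.5M entities in batches of 100 -> 55,000 activity invocations.
	fmt.Println(estimateActivityEvents(5_500_000, 100)) // prints 165000
}
```

Even as a lower bound, that is well over a hundred thousand events in a single workflow's history, which is exactly the "tens of thousands" territory we wanted to stay out of.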

The good solution: One long-running Activity with Heartbeats

I'll just show you some code here.

At the beginning of the Activity we use GetHeartbeatDetails to get the cursor
that was recorded by a previous, unsuccessful activity execution. After each
batch we record the cursor via RecordHeartbeat.

You'll want to do some info logging throughout the activity execution. Errors
don't need to be logged; just return them to the runtime, which will surface them.

As you can see, the batch size is configurable. We ran it with a size of 100 and
that worked well.

package workflows

import (
    "context"
    "fmt"

    "go.temporal.io/sdk/activity"
)

type BackfillingActivities struct {
    BatchSize int
}

type Heartbeat struct {
    Cursor string
}

func (a *BackfillingActivities) CommentsBackfillingActivity(ctx context.Context) error {
    cursor := ""
    if activity.HasHeartbeatDetails(ctx) {
        var heartbeat Heartbeat
        if err := activity.GetHeartbeatDetails(ctx, &heartbeat); err != nil {
            return fmt.Errorf("error getting heartbeat: %w", err)
        }
        cursor = heartbeat.Cursor
    }

    var err error
    for {
        cursor, err = a.processBatch(ctx, cursor, a.BatchSize)
        if err != nil {
            return err
        }
        if cursor == "" {
            break
        }
        // Record progress so that a retry can resume from this cursor.
        activity.RecordHeartbeat(ctx, Heartbeat{Cursor: cursor})
    }
    return nil
}

// processBatch processes one batch starting from cursor and returns the next cursor or an error.
// An empty cursor in the result means there is nothing left to process.
func (a *BackfillingActivities) processBatch(ctx context.Context, cursor string, batchSize int) (string, error) {
    logger := activity.GetLogger(ctx)
    logger.Info(fmt.Sprintf("Starting processing batch with cursor \"%s\"", cursor))

    //-> batch := SELECT FROM WHERE primary_key > {cursor} ORDER BY primary_key ASC LIMIT {batchSize}

    logger.Info(fmt.Sprintf("Found %d items", len(batch)))

    //-> Transform into target data structure

    //-> Load into target DB

    if len(batch) < batchSize {
        return "", nil
    }
    lastItem := batch[len(batch)-1]
    return lastItem.Id, nil
}
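
The resume-from-cursor behaviour of the activity above can be tried out without a Temporal server. What follows is a stand-alone simulation, not Temporal code: `heartbeatStore` is a hypothetical stand-in for the heartbeat details Temporal keeps between attempts, the source "table" is just a sorted slice of integer IDs, and `failAfter` is a made-up knob for forcing a crash.

```go
package main

import (
	"errors"
	"fmt"
)

// heartbeatStore is a hypothetical stand-in for the heartbeat details that
// survive between activity attempts.
type heartbeatStore struct {
	cursor   int
	recorded bool
}

var errSimulatedCrash = errors.New("simulated crash")

// nextBatch returns up to batchSize IDs greater than cursor. It assumes
// source is sorted ascending, mirroring the ORDER BY ... LIMIT query.
func nextBatch(source []int, cursor, batchSize int) []int {
	var batch []int
	for _, id := range source {
		if id > cursor {
			batch = append(batch, id)
			if len(batch) == batchSize {
				break
			}
		}
	}
	return batch
}

// runAttempt simulates one activity attempt: it resumes from the stored
// cursor, appends processed IDs to target and records the cursor after each
// full batch. If failAfter > 0, it fails once that many batches are done.
func runAttempt(source []int, store *heartbeatStore, target *[]int, batchSize, failAfter int) error {
	cursor := 0
	if store.recorded {
		cursor = store.cursor
	}
	for batches := 1; ; batches++ {
		batch := nextBatch(source, cursor, batchSize)
		*target = append(*target, batch...)
		if len(batch) < batchSize {
			return nil // short (possibly empty) batch: nothing left
		}
		cursor = batch[len(batch)-1]
		store.cursor, store.recorded = cursor, true
		if failAfter > 0 && batches == failAfter {
			return errSimulatedCrash
		}
	}
}

func main() {
	source := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	store := &heartbeatStore{}
	var target []int

	err := runAttempt(source, store, &target, 3, 2) // crashes after two batches
	fmt.Println(err, store.cursor, target)          // simulated crash 6 [1 2 3 4 5 6]

	err = runAttempt(source, store, &target, 3, 0) // retry resumes after ID 6
	fmt.Println(err, target)                       // <nil> [1 2 3 4 5 6 7 8 9 10]
}
```

In this simulation the crash happens right after the cursor is recorded, so nothing is copied twice. In a real run, a failure can land between loading a batch and recording its heartbeat, in which case the retry processes that batch again. It is therefore worth making the write to the target DB idempotent, for example with an upsert.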