Orkes Operators: Parallelism and Reusability

Workflows don’t just move forward—they branch, scale, and reuse logic. Whether you’re processing tasks in parallel, delegating work to subflows, or triggering asynchronous side-jobs, these patterns let you build smarter, more modular orchestration.

In this post, we’ll explore operators that boost concurrency and composability in Orkes Conductor:

  • Fork/Join for running multiple branches in parallel
  • Dynamic Fork for scalable, input-driven concurrency
  • Sub Workflow for reusing logic across workflows
  • Start Workflow for async, fire-and-forget execution

From concurrent notifications to reusable payment flows, these operators help you scale orchestration cleanly—without duplicating logic or overcomplicating control flow. Let’s dive in.

Fork/Join

The Fork/Join task in Orkes Conductor enables parallel execution of multiple task branches within a workflow. When a Fork task is reached, it splits the workflow into separate paths that run concurrently, allowing different sequences of tasks to execute in parallel. Once all branches are complete, a Join task is used to synchronize them before continuing with the rest of the workflow. This pattern is ideal for optimizing performance and handling tasks that can be executed independently, such as running multiple validation checks or data fetch operations simultaneously.
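For orientation before the full example below, here is a minimal sketch of the pattern using the Conductor Python SDK. The validation task names are hypothetical placeholders, not part of the example that follows; the fork runs both branches in parallel and joins on the last task of each branch, i.e., full synchronization.

from conductor.client.workflow.task.fork_task import ForkTask
from conductor.client.workflow.task.simple_task import SimpleTask

# Hypothetical sketch: two validation checks run in parallel, and the fork
# joins on the final task of each branch before the workflow continues.
validation_fork = ForkTask(
    task_ref_name="validation_fork_ref",
    forked_tasks=[
        [SimpleTask(task_def_name="validate_address",
                    task_reference_name="validate_address_ref")],
        [SimpleTask(task_def_name="validate_payment",
                    task_reference_name="validate_payment_ref")],
    ],
    # Wait for both branches (full synchronization)
    join_on=["validate_address_ref", "validate_payment_ref"]
)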

Example implementation

A common use case for the Fork/Join task is in a multi-channel notification workflow. Suppose a Fork task dispatches three parallel notifications—email, SMS, and HTTP. Each branch sends its respective message independently to maximize concurrency. Since the HTTP call can be retried and doesn't need to block the overall flow, the Join task's joinOn parameter is set to only the email_notification and sms_notification tasks. This allows the workflow to proceed as soon as the critical email and SMS tasks are complete, while the HTTP notification continues in the background. This setup enables partial synchronization—balancing reliability with improved efficiency.

Here’s the workflow visualized:

Workflow using a Fork/Join operator

Here’s the code snippet for creating the workflow programmatically:


# Imports below follow the Conductor Python SDK (conductor-python) module layout.
from conductor.client.worker.worker_task import worker_task
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from conductor.client.workflow.task.fork_task import ForkTask
from conductor.client.workflow.task.http_task import HttpTask
from conductor.client.workflow.task.simple_task import SimpleTask


def register_notification_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    # 1) Define each notification path
    email_branch = [
        SimpleTask(
            task_def_name="process_notification_payload",
            task_reference_name="process_notification_payload_email"
        ),
        SimpleTask(
            task_def_name="email_notification",
            task_reference_name="email_notification_ref"
        )
    ]

    sms_branch = [
        SimpleTask(
            task_def_name="process_notification_payload",
            task_reference_name="process_notification_payload_sms"
        ),
        SimpleTask(
            task_def_name="sms_notification",
            task_reference_name="sms_notification_ref"
        )
    ]

    http_branch = [
        SimpleTask(
            task_def_name="process_notification_payload",
            task_reference_name="process_notification_payload_http"
        ),
        HttpTask(
            task_ref_name="http_notification_ref",
            http_input={
                "uri": "${workflow.input.http_target_url}",
                "method": "POST",
                "headers": {
                    "Content-Type": "application/json"
                },
                "body": {
                    "message": "Notification triggered"
                }
            }
        )
    ]

    # 2) Fork-Join setup (only join on email + sms)
    fork_join = ForkTask(
        task_ref_name="my_fork_join_ref",
        forked_tasks=[email_branch, sms_branch, http_branch],
        join_on=["email_notification_ref", "sms_notification_ref"]
    )

    workflow = ConductorWorkflow(
        name="notification_workflow_with_fork_join",
        executor=workflow_executor
    )
    workflow.version = 1
    workflow.add(fork_join)
    workflow.register(overwrite=True)

    return workflow


@worker_task(task_definition_name="process_notification_payload")
def process_notification_payload() -> dict:
    # Minimal worker stub: in a real workflow this would build the
    # channel-specific payload from the workflow input.
    print("Processing notification payload")
    return {"payload_processed": True}
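To round out the example, here is a rough sketch of how the workflow and worker might be wired up and started. The server URL, credentials handling, and input values are illustrative assumptions rather than part of the original example, and the import paths follow the Conductor Python SDK layout.

from conductor.client.configuration.configuration import Configuration
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor

# Assumed local/dev server URL; point this at your Conductor endpoint.
config = Configuration(server_api_url="http://localhost:8080/api")

# Start the polling loop for @worker_task workers discovered in this process.
task_handler = TaskHandler(configuration=config)
task_handler.start_processes()

# Register the workflow and kick off an execution.
executor = WorkflowExecutor(configuration=config)
register_notification_workflow(executor)
workflow_id = executor.start_workflow(
    StartWorkflowRequest(
        name="notification_workflow_with_fork_join",
        version=1,
        input={"http_target_url": "https://example.com/notify"},
    )
)
print(f"Started workflow: {workflow_id}")

In practice, workers for the email_notification and sms_notification tasks would be registered the same way as process_notification_payload so that all three branches can complete.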