Golang LLD: Design a Task Scheduler

In this blog post, I've shared my approach to solving this low-level design problem: Design a Task Scheduler.
Code for this can be found on my GitHub repo:
https://github.com/the-arcade-01/golang-low-level-design/tree/main/task-scheduler
Requirements
- Users should be able to create, schedule & stop tasks.
- Users can schedule tasks of the following types:
  - One Time: execute the task only once.
  - Fixed Rate: execute the task at every recurring delay.
  - Fixed Delay: execute the task at every recurring delay, but only after its previous run completes.
- Task execution should be based on priority and execution time.
- Multiple tasks should execute concurrently, while keeping the application's CPU and memory usage in check.
- Assume a task execution to be a simple function call, e.g. task.Run() just prints the task details.
Thought Process
Thinking about the entities that will be involved:
The Task entity will hold all the information about a task.
Tasks need to be executed based on priority & execution time, so we need some data structure that maintains this order. We choose a Heap. Why? A heap offers us O(log n) time complexity for insert & delete operations while maintaining the correct order, whereas sorting a list/slice is O(n log n), and we would need to re-sort after every insert & delete.
For concurrent execution of tasks while managing CPU & memory, we will use the Worker Pool pattern. This allows us to spawn n fixed goroutines which listen on a worker queue for incoming tasks.
For executing the tasks, we will run a separate goroutine whose job is to pick tasks from the task heap and put them into the worker pool.
Code structure
Code structure of our app.
.
├── cmd
│ └── main.go
├── go.mod
├── internal
│ ├── config
│ │ └── pool.go
│ ├── models
│ │ └── runnable.go
│ └── task
│ ├── heap.go
│ ├── scheduler.go
│ ├── service.go
│ └── task.go
- The worker pool is defined in the config folder. It takes in a Runnable interface instance and executes its Run() method (we will look at that later).
- All task-related functionality is present inside the task folder.
Code overview
First we define the worker pool. A worker pool has a number of workers & a worker queue (into which tasks are put). This worker pool will be a Singleton structure, because it is loaded as a configuration in our system. To make it a singleton, we use sync.Once.
var (
    once sync.Once
    pool *WorkerPool
)

type WorkerPool struct {
    workers     int
    workerQueue chan models.Runnable
    wg          sync.WaitGroup
}
The next step is to spawn workers number of goroutines, each listening on the worker queue.
func NewWorkerPool(workers int) *WorkerPool {
    once.Do(func() {
        pool = &WorkerPool{
            workers:     workers,
            workerQueue: make(chan models.Runnable),
        }
        // range over an int requires Go 1.22+
        for i := range pool.workers {
            pool.wg.Add(1)
            fmt.Printf("Worker routine initialized, ID: %v\n", i)
            go func(i int) {
                defer pool.wg.Done()
                // each worker blocks here until a task arrives on the queue
                for run := range pool.workerQueue {
                    run.Run()
                }
            }(i)
        }
    })
    return pool
}
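The scheduler later calls t.pool.Add(task), but that method isn't shown in the post. A minimal sketch of what Add (and a graceful shutdown) could look like, given the fields above; treat the exact shape as an assumption:

// Add submits a runnable to the worker queue; a free worker will pick it up.
// (Assumed shape; not shown in the original post.)
func (p *WorkerPool) Add(run models.Runnable) {
    p.workerQueue <- run
}

// Shutdown closes the queue and waits for in-flight tasks to finish.
func (p *WorkerPool) Shutdown() {
    close(p.workerQueue)
    p.wg.Wait()
}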
The Runnable interface looks like this; our Task will implement the Run() function.
type Runnable interface {
    Run()
}
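The post doesn't show task.go, but from the fields and methods used later (Name, Priority, ExecutionTime, Type, RecurringDelay, UpdateExecutionTime, execute), the Task entity roughly looks like the sketch below. Treat this as my assumption of the definition in the repo:

type TaskType int

const (
    OneTime TaskType = iota
    FixedRate
    FixedDelay
)

type Task struct {
    Name           string
    Priority       int       // lower value = higher priority (scheduled first by the heap)
    ExecutionTime  time.Time // when the task should next run
    Type           TaskType
    RecurringDelay int // delay in seconds for recurring tasks
}

// UpdateExecutionTime sets the next time the task should run.
func (t *Task) UpdateExecutionTime(next time.Time) {
    t.ExecutionTime = next
}

// execute is the actual "work": here, just printing the task details.
func (t *Task) execute() {
    fmt.Printf("Executing Task: %v at %v\n", t.Name, time.Now())
}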
Now moving on to the heap for the tasks. To make a heap we need to implement two sets of methods.
First are the sort methods, in which we write our ordering logic.
type TaskHeap []*Task

// sort methods
func (h TaskHeap) Len() int { return len(h) }

func (h TaskHeap) Less(i, j int) bool {
    // same execution time: the task with the lower Priority value goes first
    if h[i].ExecutionTime.Compare(h[j].ExecutionTime) == 0 {
        return h[i].Priority < h[j].Priority
    }
    // otherwise: the earlier execution time goes first
    return h[i].ExecutionTime.Before(h[j].ExecutionTime)
}

func (h TaskHeap) Swap(i, j int) {
    h[i], h[j] = h[j], h[i]
}
Second are the heap methods.
// heap methods
func (h *TaskHeap) Push(x any) {
    *h = append(*h, x.(*Task))
}

func (h *TaskHeap) Pop() any {
    old := *h
    n := len(old)
    item := old[n-1]
    old[n-1] = nil // clear the reference so the Task can be garbage collected
    *h = old[0 : n-1]
    return item
}
Now moving on to the TaskScheduler. It will also be a Singleton structure, as I've used it as a single entry point for scheduling & executing the tasks.
type TaskScheduler struct {
    mtx       *sync.Mutex
    cond      *sync.Cond
    pool      *config.WorkerPool
    taskQueue *TaskHeap
}
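The constructor for this singleton isn't shown in the post; here is a minimal sketch using sync.Once, mirroring the worker pool. The names schedulerOnce, NewTaskScheduler and the package-level taskScheduler variable are my assumptions:

var (
    schedulerOnce sync.Once
    taskScheduler *TaskScheduler
)

func NewTaskScheduler(pool *config.WorkerPool) *TaskScheduler {
    schedulerOnce.Do(func() {
        mtx := &sync.Mutex{}
        taskScheduler = &TaskScheduler{
            mtx:       mtx,
            cond:      sync.NewCond(mtx), // the condition variable shares the scheduler's mutex
            pool:      pool,
            taskQueue: &TaskHeap{},
        }
        heap.Init(taskScheduler.taskQueue)
    })
    return taskScheduler
}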
The Schedule function looks like this: it takes in a Task and performs a heap insertion on the task queue.
func (t *TaskScheduler) Schedule(task *Task) {
    t.mtx.Lock()
    defer t.mtx.Unlock()
    fmt.Printf("Task: %v Scheduled\n", task.Name)
    heap.Push(t.taskQueue, task)
    t.cond.Signal()
}
The Stop function looks like this.
func (t *TaskScheduler) Stop(task *Task) {
    t.mtx.Lock()
    defer t.mtx.Unlock()
    for i, item := range *t.taskQueue {
        // match by pointer identity: remove exactly the task instance that was scheduled
        if item == task {
            fmt.Printf("Task: %v Stopped\n", task.Name)
            heap.Remove(t.taskQueue, i)
            break
        }
    }
}
Now the Execute() function. Its job is to take the Task out of the task queue and put it into the worker pool.
To implement this behavior we have two approaches:
- Brute Force approach:
  - Run a ticker (a simple cron) every 1 second or so.
  - Check the size of the queue and its top element; if the task is due for execution, push it into the worker pool.
  - Pros: simple and straightforward to implement.
  - Cons: running a ticker is unnecessary when there are no tasks to perform. This increases CPU consumption, and tasks don't get executed exactly at their scheduled time.
- Optimizing the process:
  - We use a signal that notifies our Execute() goroutine: hey, a new task has been inserted! Wake up and check whether you need to put it in the pool; if not, go back to sleep until its execution time.
  - This ensures that our goroutine only works when it is required to.
  - To use such a signal in Golang, we use sync.Cond, which lets us define the condition on which we signal our goroutine.
  - If the task queue is empty, we wait with t.cond.Wait().
  - We signal the goroutine at the end of the Schedule(task *Task) function, using t.cond.Signal().
func (t *TaskScheduler) Execute() {
    for {
        t.mtx.Lock()
        // wait in a loop (as recommended for sync.Cond) until the queue has a task
        for t.taskQueue.Len() == 0 {
            t.cond.Wait()
        }
        task := heap.Pop(t.taskQueue).(*Task)
        // not due yet: put it back and sleep until its execution time
        if task.ExecutionTime.After(time.Now()) {
            heap.Push(t.taskQueue, task)
            t.mtx.Unlock()
            time.Sleep(time.Until(task.ExecutionTime))
            continue
        }
        t.mtx.Unlock()
        // due: hand the task over to the worker pool for execution
        t.pool.Add(task)
    }
}
Now the Run() function, which our Task structure will implement.
In this func, we perform the execution of the task and further scheduling as per the TaskType.
func (t *Task) Run() {
    // Fixed Rate: schedule the next run before executing, so the interval is
    // measured from the start of one run to the start of the next
    if t.Type == FixedRate {
        t.UpdateExecutionTime(time.Now().Add(time.Duration(t.RecurringDelay) * time.Second))
        taskScheduler.Schedule(t)
    }
    // performing execution of the task, simple print statement
    t.execute()
    // Fixed Delay: schedule the next run only after the current run completes
    if t.Type == FixedDelay {
        t.UpdateExecutionTime(time.Now().Add(time.Duration(t.RecurringDelay) * time.Second))
        taskScheduler.Schedule(t)
    }
}
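For completeness, here's a rough sketch of how main.go could wire everything together. The import paths, worker count, task values and the NewTaskScheduler constructor are assumptions based on the structure above; the actual repo code may differ:

package main

import (
    "time"

    // import paths assumed from the repo layout
    "github.com/the-arcade-01/golang-low-level-design/task-scheduler/internal/config"
    "github.com/the-arcade-01/golang-low-level-design/task-scheduler/internal/task"
)

func main() {
    pool := config.NewWorkerPool(4)          // 4 worker goroutines
    scheduler := task.NewTaskScheduler(pool) // assumed constructor, see sketch above

    // consumer goroutine: moves due tasks from the heap into the worker pool
    go scheduler.Execute()

    // a one-time task due in 2 seconds, and a fixed-rate task every 3 seconds
    scheduler.Schedule(&task.Task{Name: "one-time", Priority: 1, ExecutionTime: time.Now().Add(2 * time.Second), Type: task.OneTime})
    scheduler.Schedule(&task.Task{Name: "recurring", Priority: 2, ExecutionTime: time.Now().Add(3 * time.Second), Type: task.FixedRate, RecurringDelay: 3})

    time.Sleep(10 * time.Second) // let the scheduler run for a while
}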
Conclusion
Thanks for reading.
Please follow me on my socials.
And if you have any suggestions, find any bugs in my code, or want to share your own approach, feel free to open an Issue on my GitHub repo with the format below:
Title: "Approach: "
Example: "Approach: Design a Cache System"