Selectors - Go SDK
This page shows how to do the following:
In Go, the select
statement lets a goroutine wait on multiple communication operations.
A select
blocks until one of its cases can run, then it executes that case.
It chooses one at random if multiple are ready.
However, a normal Go select statement can not be used inside of Workflows directly because of the random nature.
Temporal's Go SDK Selector
s are similar and act as a replacement.
They can block on sending and receiving from Channels but as a bonus can listen on Future deferred work.
Usage of Selectors to defer and process work (in place of Go's select
) are necessary in order to ensure deterministic Workflow code execution (though using select
in Activity code is fine).
Full API example
The API is sufficiently different from select
that it bears documenting:
func SampleWorkflow(ctx workflow.Context) error {
// standard Workflow setup code omitted...
// API Example: declare a new selector
selector := workflow.NewSelector(ctx)
// API Example: defer code execution until the Future that represents Activity result is ready
work := workflow.ExecuteActivity(ctx, ExampleActivity)
selector.AddFuture(work, func(f workflow.Future) {
// deferred code omitted...
})
// more parallel timers and activities initiated...
// API Example: receive information from a Channel
var signalVal string
channel := workflow.GetSignalChannel(ctx, channelName)
selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {
// matching on the channel doesn't consume the message.
// So it has to be explicitly consumed here
c.Receive(ctx, &signalVal)
// do something with received information
})
// API Example: block until the next Future is ready to run
// important! none of the deferred code runs until you call selector.Select
selector.Select(ctx)
// Todo: document selector.HasPending
}
Use Selectors with Futures
You usually add Future
s after Activities
:
// API Example: defer code execution until after an activity is done
work := workflow.ExecuteActivity(ctx, ExampleActivity)
selector.AddFuture(work, func(f workflow.Future) {
// deferred code omitted...
})
selector.Select(ctx)
is the primary mechanism which blocks on and executes Future
work.
It is intentionally flexible; you may call it conditionally or multiple times:
// API Example: blocking conditionally
if somecondition != nil {
selector.Select(ctx)
}
// API Example: popping off all remaining Futures
for i := 0; i < len(someArray); i++ {
selector.Select(ctx) // this will wait for one branch
// you can interrupt execution here
}
A Future matches only once per Selector instance even if Select is called multiple times. If multiple items are available, the order of matching is not defined.
Use Selectors with Timers
An important use case of futures is setting up a race between a timer and a pending activity, effectively adding a "soft" timeout that doesn't result in any errors or retries of that activity.
For example, the Timer sample shows how you can write a long running order processing operation where:
- if processing takes too long, we send out a notification email to user about the delay, but we won't cancel the operation
- if the operation finishes before the timer fires, then we want to cancel the timer.
var processingDone bool
f := workflow.ExecuteActivity(ctx, OrderProcessingActivity)
selector.AddFuture(f, func(f workflow.Future) {
processingDone = true
// cancel timerFuture
cancelHandler()
})
// use timer future to send notification email if processing takes too long
timerFuture := workflow.NewTimer(childCtx, processingTimeThreshold)
selector.AddFuture(timerFuture, func(f workflow.Future) {
if !processingDone {
// processing is not done yet when timer fires, send notification email
_ = workflow.ExecuteActivity(ctx, SendEmailActivity).Get(ctx, nil)
}
})
// wait the timer or the order processing to finish
selector.Select(ctx)
We create timers with the workflow.NewTimer
API.
Use Selectors with Channels
selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {})
is the primary mechanism which receives messages from Channels
.
// API Example: receive information from a Channel
var signalVal string
channel := workflow.GetSignalChannel(ctx, channelName)
selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &signalVal)
// do something with received information
})
Merely matching on the channel doesn't consume the message; it has to be explicitly consumed with a c.Receive(ctx, &signalVal)
call.
Query Selector state
You can use the selector.HasPending
API to ensure that signals are not lost when a Workflow is closed (e.g. by ContinueAsNew
).
Learn more
Usage of Selectors is best learned by example:
- Setting up a race condition between an Activity and a Timer, and conditionally execute (Timer example)
- Receiving information in a Channel (Mutex example)
- Looping through a list of work and scheduling them all in parallel (DSL example)
- Executing activities in parallel, pick the first result, cancel remainder (Pick First example)