Limit concurrent function executions using Go channels


This article explains how to limit concurrent function execution using Go channels. I worked on a Go service that executes another program inside its HTTP request handler. That program uses a lot of CPU and memory because it works with photos and videos, so I had to limit how many instances could run at the same time.

Luckily, Go channels come in very handy in situations like these. We will also add timeout support so that our waiting goroutines don’t block indefinitely. We will need buffered Go channels to build this, so let’s recap what they are.

Buffered Go channels

Buffered channels can hold a limited number of values (determined by the buffer size), and will only block the sending goroutine when the buffer is full. This can allow for some additional concurrency, but requires careful consideration to avoid deadlocks and other synchronization issues.

Buffered channels are an ideal solution for what we are going to build in this article.
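
As a quick illustration of the blocking rule described above, here is a minimal sketch. The trySend helper is a hypothetical name introduced only for this example; it uses a select with a default case so that a full buffer is reported instead of blocking the program.

```go
package main

import "fmt"

// trySend attempts a non-blocking send on ch and reports
// whether the value fit into the channel's buffer.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full; a plain send would block here.
		return false
	}
}

func main() {
	ch := make(chan int, 2) // the buffer holds at most two values

	fmt.Println(trySend(ch, 1)) // true: the buffer has room
	fmt.Println(trySend(ch, 2)) // true: the buffer is now full
	fmt.Println(trySend(ch, 3)) // false: no free slot

	<-ch                        // receiving frees one slot
	fmt.Println(trySend(ch, 3)) // true: there is room again
}
```

The limiter built in the rest of the article relies on exactly this behavior: a successful send means a slot was free, and a receive frees it again.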

Limiting concurrent function execution

We start by creating a new package called limiter and defining a data structure for the Limiter type. We provide a limit and a timeout when creating a Limiter, and the data structure holds that timeout and a buffered channel whose capacity is the provided limit.

Translated to code we can write something like this:

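package limiter

import (
	"errors"
	"time"
)

// Limiter caps how many functions can run at the same time.
type Limiter struct {
	timeout time.Duration // how long Execute waits for a free slot
	ch      chan struct{} // buffered channel; each value in it is one running function
}

// NewLimiter creates a Limiter that allows at most limit concurrent executions.
func NewLimiter(limit int, timeout time.Duration) *Limiter {
	return &Limiter{
		timeout: timeout,
		ch:      make(chan struct{}, limit),
	}
}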

We now need to define how the Limiter type will be used. I decided that the simplest approach is to pass the limiter a function that takes no arguments and returns no values. If the buffer of the channel is not full, the passed function is executed right away. If the buffer is full, the caller waits for up to the timeout that was defined when the Limiter was created. If a slot is freed in that time, the function is executed; if the timeout passes first, an error is returned.

The buffered channel is created with the type of an empty struct. Empty structs are useful for signaling where the data itself is not important, because their memory footprint is 0. You can find more on empty structs in this great article.

We can see the definition of the Execute function in the code below.
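
The zero-size claim can be checked with unsafe.Sizeof from the standard library. This is only a small sketch; the sizes helper is a hypothetical name used for the example, and a bool is included purely for comparison.

```go
package main

import (
	"fmt"
	"unsafe"
)

// sizes returns the size in bytes of an empty struct value
// and, for comparison, of a bool value.
func sizes() (empty, flag uintptr) {
	return unsafe.Sizeof(struct{}{}), unsafe.Sizeof(true)
}

func main() {
	empty, flag := sizes()
	fmt.Println(empty) // 0
	fmt.Println(flag)  // 1
}
```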

// Execute runs f if a slot is free or becomes free before the timeout.
// It returns an error, without running f, if the timeout elapses first.
func (l *Limiter) Execute(f func()) error {
	select {
	case l.ch <- struct{}{}:
		// Slot acquired, continue to run f
	case <-time.After(l.timeout):
		return errors.New("execution timed out")
	}

	// On exit, free the slot, even if f panics
	defer func() {
		<-l.ch
	}()

	f()

	return nil
}

The Execute function first tries to send an empty struct into the buffered channel. If the channel is not full, the send succeeds immediately, the select statement completes, and the provided function is executed. If the channel is full, the select statement waits, and one of two things can happen. Either another execution finishes and frees a slot, in which case the send succeeds and we execute the provided function, or the time defined in the timeout value of the limiter passes, in which case the select statement enters the case that returns the error and the function is not executed. Before invoking the provided function, we register a function with a defer statement that runs when Execute completes. This deferred function receives one value from the buffered channel, which frees the slot for the next caller.
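
To see the behavior described above in action, the following sketch pushes many goroutines through a limiter and records the highest number of functions that ran at once. The Limiter code is repeated so the program is self-contained; the maxConcurrent helper, the 10 ms of simulated work, and the one-second timeout are assumptions made for this example only.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Limiter struct {
	timeout time.Duration
	ch      chan struct{}
}

func NewLimiter(limit int, timeout time.Duration) *Limiter {
	return &Limiter{timeout: timeout, ch: make(chan struct{}, limit)}
}

func (l *Limiter) Execute(f func()) error {
	select {
	case l.ch <- struct{}{}:
	case <-time.After(l.timeout):
		return errors.New("execution timed out")
	}
	defer func() { <-l.ch }()
	f()
	return nil
}

// maxConcurrent starts `calls` goroutines that all go through one limiter
// and returns the peak number of functions that were running simultaneously.
func maxConcurrent(limit, calls int) int64 {
	l := NewLimiter(limit, time.Second)
	var running, peak int64
	var wg sync.WaitGroup
	for i := 0; i < calls; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = l.Execute(func() {
				n := atomic.AddInt64(&running, 1)
				// Raise peak to n if n is the highest value seen so far.
				for {
					p := atomic.LoadInt64(&peak)
					if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
						break
					}
				}
				time.Sleep(10 * time.Millisecond) // simulated work
				atomic.AddInt64(&running, -1)
			})
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(maxConcurrent(3, 12) <= 3) // true
}
```

Because the counter is incremented only after a slot is acquired and decremented before the slot is released, the peak can never exceed the limit.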

That is all for the limiter, a struct that can limit concurrent function calls. I would also like to point out that in a select statement you can wait on time-based channels, like time.After in the example code above. This is very useful for checking things periodically or for implementing timeouts like in this example.

Here is an example of how to use the structure that we’ve defined:
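
As a rough sketch of the periodic-check idea, the example below combines a time.Ticker with a time.After deadline in a single select loop. The countTicks function and the durations are hypothetical and chosen only for illustration; the exact number of ticks depends on scheduling, so no output is shown.

```go
package main

import (
	"fmt"
	"time"
)

// countTicks counts how many times a ticker with the given interval
// fires before the deadline passes.
func countTicks(interval, deadline time.Duration) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Create the timeout channel once, outside the loop, so that the
	// deadline is not restarted on every iteration.
	timeout := time.After(deadline)

	ticks := 0
	for {
		select {
		case <-ticker.C:
			ticks++ // periodic work would go here
		case <-timeout:
			return ticks
		}
	}
}

func main() {
	fmt.Println(countTicks(10*time.Millisecond, 105*time.Millisecond))
}
```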

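// 1. Define the limiter
l := limiter.NewLimiter(5, 20*time.Second)

// 2. Somewhere in your code you can add a function for execution
err := l.Execute(func() {
	// Processing...
})
if err != nil {
	// No slot became free within 20 seconds, so the function was not run
	log.Println(err)
}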
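
Since the original motivation was an HTTP request handler, here is one way the limiter might be wired into a handler. This is a sketch under assumptions, not the code of the actual service: processHandler, probe, the /process path, and the choice of a 503 response are all hypothetical, and the Limiter code is repeated so the program runs on its own. The httptest package is used so the handler can be exercised without starting a server.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

type Limiter struct {
	timeout time.Duration
	ch      chan struct{}
}

func NewLimiter(limit int, timeout time.Duration) *Limiter {
	return &Limiter{timeout: timeout, ch: make(chan struct{}, limit)}
}

func (l *Limiter) Execute(f func()) error {
	select {
	case l.ch <- struct{}{}:
	case <-time.After(l.timeout):
		return errors.New("execution timed out")
	}
	defer func() { <-l.ch }()
	f()
	return nil
}

// processHandler is a hypothetical handler that runs its work through the limiter.
func processHandler(l *Limiter) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		err := l.Execute(func() {
			// The expensive work (for example, running the external
			// program) would go here.
			fmt.Fprintln(w, "processed")
		})
		if err != nil {
			// No slot became free within the timeout: ask the client to retry later.
			http.Error(w, err.Error(), http.StatusServiceUnavailable)
		}
	})
}

// probe sends one in-memory GET request to h and returns the response status code.
func probe(h http.Handler) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/process", nil))
	return rec.Code
}

func main() {
	l := NewLimiter(1, 50*time.Millisecond)
	h := processHandler(l)

	fmt.Println(probe(h)) // 200: a slot was free

	l.ch <- struct{}{}    // occupy the only slot to simulate a busy service
	fmt.Println(probe(h)) // 503: timed out while waiting for a slot
}
```

Returning an error status on timeout lets clients back off and retry instead of piling up behind a saturated service.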

If you want to see the entire file I’ve created a gist on GitHub so you can visit this link or copy the code from the GitHub embed below.

Thank you for reading this article and I hope that it helped you to solve your problem or learn something new.
