REBEL SOURCE

Mario Škrlec

Goroutine confinement

In this blog post, we will examine ad hoc and lexical confinement of data used by goroutines and explore both through a producer/consumer pattern.


Cover image by Nicolas Houdayer on Unsplash.

I learned these terms from the excellent book Concurrency in Go by Katherine Cox-Buday. Be sure to check it out.

Ad hoc confinement

Ad hoc confinement is usually our first attempt at handling data with goroutines. We just want to “do something”, and we are not thinking about who can access the data or whether a piece of code should be allowed to use a given channel; confinement is kept only by convention. You can find a basic example below:
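A minimal sketch of such ad hoc code, assuming `data` is an integer slice and `receiver` an unbuffered channel; the `collect` wrapper is hypothetical and exists only so the snippet can be tested. Nothing but convention stops other code in the same scope from touching `data` or `receiver`:

```go
package main

import "fmt"

// collect is a hypothetical wrapper around the ad hoc example.
func collect() []int {
	// data is visible to everything in collect, not only to loopData.
	data := []int{1, 2, 3, 4, 5}

	// receiver is bidirectional: any code in this scope could send on it,
	// receive from it, or close it.
	receiver := make(chan int)

	// By convention only, loopData is the one place that reads data
	// and sends on receiver.
	loopData := func() {
		defer close(receiver)
		for _, v := range data {
			receiver <- v
		}
	}
	go loopData()

	var got []int
	for v := range receiver {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect()) // [1 2 3 4 5]
}
```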

This is a fine first try, but it has some problems:

  1. The data variable is accessible not only within the loopData anonymous function but also to any other code in the same scope, even though only loopData is meant to use it.
  2. Anyone can access the receiver channel and consume data from it (and, since it is an ordinary bidirectional channel, send on it or close it too).

Lexical confinement

What we actually have here is a producer and a consumer, but both of them can leak data to other parts of the program. It is also not immediately clear from reading the code which part is the producer and which is the consumer. What we need to do is wrap the two goroutines in regular functions and hide the implementation from the client code.
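A sketch of that refactoring, assuming integer data and hypothetical function names `producer` and `consumer`. The producer owns both the data and the channel and returns only a receive-only view (`<-chan int`), so the compiler rejects any attempt by a caller to send on the channel or close it:

```go
package main

import "fmt"

// producer owns data and the channel; neither is visible to callers.
// The return type <-chan int lets callers receive, but not send or close.
func producer() <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		data := []int{1, 2, 3, 4, 5}
		for _, v := range data {
			out <- v
		}
	}()
	return out
}

// consumer accepts only a receive-only channel, so it cannot write to it.
func consumer(in <-chan int) int {
	sum := 0
	for v := range in {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(consumer(producer())) // 15
}
```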

From the code above, it is very clear what the different parts of this program are doing. Also, the producer returns a receive-only channel; therefore, any part of the program that uses that channel can only read from it. This is lexical confinement, since it forces us to use the interface our functions define.

Example: Worker pool

A worker pool is probably one of the first design patterns you build when working with Go. It is a very useful pattern that lets you process many items concurrently, up to one per worker at a time, so throughput depends on how many workers you create. In our example, we will wrap the worker pool in lexical confinement and ensure that our data is safe from corruption by other parts of the program.

Before we dive into the code, it's worth noting that the implementation could use generics, but for the sake of simplicity and readability I made it work with integers only.

What do we want to do?

We want to define a function (the producer) that sends data to our consumers. The consumers then process the data and/or pass the processed data to our Wait function, which decides what to do with it. The Wait function can be empty, since its job is to block until the producer has finished. It is also important to be able to stop producing data and stop all workers while still letting them finish work they have already started. Stopping the workers is the responsibility of the producer.

Let's start with the type that we will be working with.
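A sketch of such a type, assuming the integer-only design described above. The field names follow the post; `NewWorker` is a hypothetical constructor. Because the fields are unexported, code outside the package can reach the channels only through the type's methods:

```go
package main

import "fmt"

// Worker holds everything the pool needs. The fields are unexported, so
// code outside the package cannot touch the channels directly.
type Worker struct {
	workerNum      int      // number of consumer goroutines to start
	producerStream chan int // producer -> workers
	consumerStream chan int // closed by stopFn when production ends
}

// NewWorker is a hypothetical constructor that allocates the channels.
func NewWorker(workerNum int) *Worker {
	return &Worker{
		workerNum:      workerNum,
		producerStream: make(chan int),
		consumerStream: make(chan int),
	}
}

func main() {
	w := NewWorker(3)
	fmt.Println(w.workerNum) // 3
}
```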

Let's examine our Worker struct. workerNum is the number of workers we will create to consume our data. We use producerStream to send data to the workers, which then pass it on to our consumers. So let's start with our producer.
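A sketch of `Produce` under the same assumptions. The `producer` callback gets a send-only view of `producerStream` and a `stopFn`; in this first version `stopFn` closes both `producerStream` and `consumerStream`. Guarding the closes with `sync.Once` is an addition of this sketch, because closing a channel twice panics. The `main` function reads `producerStream` directly only because no workers exist yet:

```go
package main

import (
	"fmt"
	"sync"
)

type Worker struct {
	workerNum      int
	producerStream chan int
	consumerStream chan int
}

func NewWorker(workerNum int) *Worker {
	return &Worker{
		workerNum:      workerNum,
		producerStream: make(chan int),
		consumerStream: make(chan int),
	}
}

// Produce runs the caller's producer in its own goroutine. The producer
// sees only a send-only channel and a stop function.
func (w *Worker) Produce(producer func(stream chan<- int, stopFn func())) {
	var once sync.Once
	stopFn := func() {
		// Closing a channel twice panics, so make stopFn idempotent.
		once.Do(func() {
			close(w.producerStream)
			close(w.consumerStream)
		})
	}
	go producer(w.producerStream, stopFn)
}

func main() {
	w := NewWorker(1)
	w.Produce(func(stream chan<- int, stopFn func()) {
		for i := 0; i < 3; i++ {
			stream <- i
		}
		stopFn()
	})
	// No workers yet: read producerStream directly (same package).
	for v := range w.producerStream {
		fmt.Println(v)
	}
}
```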

Our Produce method spawns a goroutine that calls our producer function. The producer function is defined by the calling code and contains the logic that produces data. Its second argument is the stop function: as we said earlier, it is the producer's responsibility to stop all workers, and you do this from the code you write. If this is unclear, don't worry. Let's look at a possible implementation of our producer.
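A possible producer, written as a named function (`produceNumbers` and the ten values it sends are illustrative). It matches the callback signature assumed for `Produce` in these sketches: send the values, then call `stopFn`. The `main` function stands in for the Worker with a plain channel and a `stopFn` that closes it:

```go
package main

import "fmt"

// produceNumbers is a hypothetical producer: it sends 0..9 and then
// calls stopFn to signal that no more values are coming.
func produceNumbers(stream chan<- int, stopFn func()) {
	for i := 0; i < 10; i++ {
		stream <- i
	}
	stopFn()
}

func main() {
	// Stand-in for the Worker: a channel plus a stopFn that closes it.
	stream := make(chan int)
	go produceNumbers(stream, func() { close(stream) })

	sum := 0
	for v := range stream {
		sum += v
	}
	fmt.Println(sum) // 45
}
```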

As soon as you call stopFn, all workers will be stopped and all channels closed.

Our Consume method is where we spawn the workers. Each worker receives the values the producer creates and passes them to the provided consumer function.

On the client side, this would look something like this:

Implementation

If you run this code, it will most likely process all the values, but only because time.Sleep gives the consumers enough time to finish; we don't have any blocking utility yet. At this point, you can either write your own wait utilities or make waiting part of the Worker implementation.

Wait ranges over consumerStream, so it blocks until that channel is closed, which stopFn does once the producer has finished. After that happens, execution continues. Let's replace time.Sleep with Wait and see what happens.
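A sketch of `Wait` for this version. In these sketches nothing is ever sent on `consumerStream`; it only serves as a close signal, so ranging over it blocks until `stopFn` closes it. Because that happens right after `producerStream` is closed, `Wait` waits for the producer but not for the workers:

```go
package main

import (
	"fmt"
	"sync"
)

type Worker struct {
	workerNum      int
	producerStream chan int
	consumerStream chan int
}

func NewWorker(workerNum int) *Worker {
	return &Worker{
		workerNum:      workerNum,
		producerStream: make(chan int),
		consumerStream: make(chan int),
	}
}

func (w *Worker) Produce(producer func(stream chan<- int, stopFn func())) {
	var once sync.Once
	stopFn := func() {
		once.Do(func() {
			close(w.producerStream)
			close(w.consumerStream)
		})
	}
	go producer(w.producerStream, stopFn)
}

func (w *Worker) Consume(consumer func(value int)) {
	for i := 0; i < w.workerNum; i++ {
		go func() {
			for v := range w.producerStream {
				consumer(v)
			}
		}()
	}
}

// Wait drains consumerStream until it is closed. Nothing is sent on it
// here, so Wait simply blocks until stopFn closes the channel.
func (w *Worker) Wait() {
	for range w.consumerStream {
	}
}

func main() {
	w := NewWorker(3)
	w.Produce(func(stream chan<- int, stopFn func()) {
		for i := 0; i < 10; i++ {
			stream <- i
		}
		stopFn()
	})
	w.Consume(func(v int) {
		fmt.Println("consumed", v)
	})
	// Returns once stopFn has run, but workers may still be busy,
	// so output for the last values can be missing.
	w.Wait()
}
```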

If you run this program, you will see that not all values are consumed successfully. That is because our producer calls stopFn, which closes producerStream and consumerStream at the same time. Closing consumerStream releases Wait immediately, even though some workers may still be processing values they have already received, so the program can exit before they finish. What we need is a mechanism that waits for all consumers to finish processing and only then stops both the producer and the consumers.

Waiting for consumers to finish

We will do this by creating a done channel for every worker. The producer will wait until every done channel has signalled that its consumer has finished processing. Each consumer sends this signal after stopFn is called and producerStream is closed. Let's start by modifying our Worker struct.
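A sketch of the extended struct. The `doneStreams` field name and the use of `chan struct{}` (a channel that carries only a signal) are assumptions of this sketch, as is the `NewWorker` constructor. Note that `make` only allocates the slice, so each channel needs its own `make`:

```go
package main

import "fmt"

type Worker struct {
	workerNum      int
	producerStream chan int
	consumerStream chan int
	doneStreams    []chan struct{} // one per worker: "I have finished"
}

// NewWorker is a hypothetical constructor: one done channel per worker.
func NewWorker(workerNum int) *Worker {
	w := &Worker{
		workerNum:      workerNum,
		producerStream: make(chan int),
		consumerStream: make(chan int),
		doneStreams:    make([]chan struct{}, workerNum),
	}
	// The slice starts out full of nil channels; allocate each one.
	for i := range w.doneStreams {
		w.doneStreams[i] = make(chan struct{})
	}
	return w
}

func main() {
	w := NewWorker(3)
	fmt.Println(len(w.doneStreams)) // 3
}
```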

As we said earlier, we create as many done channels as there are workers. Next, we modify our Consume method so that every worker sends its signal.
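A sketch of the modified `Consume`, assuming a `doneStreams` slice with one channel per worker (a hypothetical field name). The hypothetical `sumWithPool` helper feeds values by hand instead of through `Produce`, to keep the block short, and then receives every done signal; once it has, all consumer calls have returned:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type Worker struct {
	workerNum      int
	producerStream chan int
	consumerStream chan int
	doneStreams    []chan struct{}
}

func NewWorker(workerNum int) *Worker {
	w := &Worker{
		workerNum:      workerNum,
		producerStream: make(chan int),
		consumerStream: make(chan int),
		doneStreams:    make([]chan struct{}, workerNum),
	}
	for i := range w.doneStreams {
		w.doneStreams[i] = make(chan struct{})
	}
	return w
}

// Consume starts workerNum goroutines. When producerStream is closed,
// each range loop ends and the worker reports on its own doneStream.
func (w *Worker) Consume(consumer func(value int)) {
	for i := 0; i < w.workerNum; i++ {
		doneStream := w.doneStreams[i]
		go func() {
			for v := range w.producerStream {
				consumer(v)
			}
			doneStream <- struct{}{} // this worker has finished
		}()
	}
}

// sumWithPool (hypothetical) sends 1..n, closes the input and waits for
// every worker's done signal before reading the total.
func sumWithPool(workers, n int) int64 {
	var total int64
	w := NewWorker(workers)
	w.Consume(func(v int) { atomic.AddInt64(&total, int64(v)) })
	for i := 1; i <= n; i++ {
		w.producerStream <- i
	}
	close(w.producerStream)
	for _, doneStream := range w.doneStreams {
		<-doneStream
	}
	return atomic.LoadInt64(&total)
}

func main() {
	fmt.Println(sumWithPool(3, 10)) // 55
}
```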

It is important to remember that when stopFn is called, producerStream is closed, so each worker's range loop stops and its doneStream sends the signal that this particular worker has finished processing. Next, let's modify our Produce method to wait for all consumers to finish.
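A sketch of `Produce` that waits for the workers. `stopFn` now shuts the pool down in three steps: close `producerStream`, receive one signal from each worker's done channel, and only then close `consumerStream`. `doneStreams`, `NewWorker`, and the `sumWithPool` helper are hypothetical names, and `sync.Once` again guards against a second call:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Worker struct {
	workerNum      int
	producerStream chan int
	consumerStream chan int
	doneStreams    []chan struct{}
}

func NewWorker(workerNum int) *Worker {
	w := &Worker{workerNum: workerNum, producerStream: make(chan int),
		consumerStream: make(chan int), doneStreams: make([]chan struct{}, workerNum)}
	for i := range w.doneStreams {
		w.doneStreams[i] = make(chan struct{})
	}
	return w
}

// Produce runs producer in its own goroutine; stopFn now shuts down in order.
func (w *Worker) Produce(producer func(stream chan<- int, stopFn func())) {
	var once sync.Once
	stopFn := func() {
		once.Do(func() {
			close(w.producerStream) // 1. workers' range loops will end
			for _, doneStream := range w.doneStreams {
				<-doneStream // 2. block until this worker has finished
			}
			close(w.consumerStream) // 3. only now release anyone waiting
		})
	}
	go producer(w.producerStream, stopFn)
}

func (w *Worker) Consume(consumer func(value int)) {
	for i := 0; i < w.workerNum; i++ {
		doneStream := w.doneStreams[i]
		go func() {
			for v := range w.producerStream {
				consumer(v)
			}
			doneStream <- struct{}{}
		}()
	}
}

// sumWithPool (hypothetical) produces 1..n and blocks on consumerStream,
// which is closed only after every worker has reported done.
func sumWithPool(workers, n int) int64 {
	var total int64
	w := NewWorker(workers)
	w.Produce(func(stream chan<- int, stopFn func()) {
		for i := 1; i <= n; i++ {
			stream <- i
		}
		stopFn()
	})
	w.Consume(func(v int) { atomic.AddInt64(&total, int64(v)) })
	<-w.consumerStream
	return atomic.LoadInt64(&total)
}

func main() {
	fmt.Println(sumWithPool(3, 10)) // 55
}
```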

Full implementation

Now we are ready to see the full implementation of our worker. 
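A sketch of one way to assemble the pieces, reconstructed from the description in this post rather than copied from an original listing. It assumes integer values, uses the hypothetical names `NewWorker`, `doneStreams`, and `countProcessed`, and uses `consumerStream` only as a close signal. Each consumer call sleeps for 10 ms to simulate work, so the result shows that `Wait` returns only after every value has been processed:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Worker is a small worker pool over int values. In its own package,
// clients could only use NewWorker, Produce, Consume and Wait.
type Worker struct {
	workerNum      int
	producerStream chan int
	consumerStream chan int
	doneStreams    []chan struct{}
}

// NewWorker is a hypothetical constructor with one done channel per worker.
func NewWorker(workerNum int) *Worker {
	w := &Worker{
		workerNum:      workerNum,
		producerStream: make(chan int),
		consumerStream: make(chan int),
		doneStreams:    make([]chan struct{}, workerNum),
	}
	for i := range w.doneStreams {
		w.doneStreams[i] = make(chan struct{})
	}
	return w
}

// Produce runs producer in its own goroutine. stopFn stops the input,
// waits for every worker to finish, then closes consumerStream.
func (w *Worker) Produce(producer func(stream chan<- int, stopFn func())) {
	var once sync.Once
	stopFn := func() {
		once.Do(func() {
			close(w.producerStream)
			for _, doneStream := range w.doneStreams {
				<-doneStream
			}
			close(w.consumerStream)
		})
	}
	go producer(w.producerStream, stopFn)
}

// Consume starts workerNum goroutines that pass every value to consumer
// and signal on their own doneStream once producerStream is closed.
func (w *Worker) Consume(consumer func(value int)) {
	for i := 0; i < w.workerNum; i++ {
		doneStream := w.doneStreams[i]
		go func() {
			for v := range w.producerStream {
				consumer(v)
			}
			doneStream <- struct{}{}
		}()
	}
}

// Wait blocks until consumerStream is closed, i.e. until every worker is done.
func (w *Worker) Wait() {
	for range w.consumerStream {
	}
}

// countProcessed (hypothetical) runs n values through the pool and
// reports how many were processed by the time Wait returned.
func countProcessed(workers, n int) int64 {
	var processed int64
	w := NewWorker(workers)
	w.Produce(func(stream chan<- int, stopFn func()) {
		for i := 0; i < n; i++ {
			stream <- i
		}
		stopFn()
	})
	w.Consume(func(v int) {
		time.Sleep(10 * time.Millisecond) // pretend to do some work
		atomic.AddInt64(&processed, 1)
	})
	w.Wait()
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println(countProcessed(3, 10)) // 10
}
```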

Every time you run this code, it will process all the values you produce and wait for all currently working consumers to finish what they are doing. If you put this worker into a separate package, your client code will consist of just a few lines, while the implementation details stay hidden within your Worker struct.

Conclusion

I found out about lexical confinement while reading the book Concurrency in Go, but I wrote this worker for another project a couple of days ago. Building it this way made sense because I wanted to hide the details of sending, consuming, and blocking from my business logic. This implementation uses integers as worker data, but it could be generic.

I hope you enjoyed this blog post. If you think this is a bad implementation of the worker pattern, please say so in the comments. I am always open to criticism and new ideas.

RebelSource 2022. All rights reserved.