Uber has adopted Go as its primary programming language for developing microservices, and its blog has a post called Data Race Patterns in Go where they talk about data races found in their Go codebase.
I was reading it and thought to myself that many of the problems presented in the post would not even compile in Rust. Can Rust help us avoid writing code with common data races?
The examples written in Rust are not meant to be idiomatic Rust and, for simplicity, do not wait for the outputs produced by tasks, because the examples written in Go do not wait either.
What is a data race?
A data race happens when one task accesses memory while another task is writing to it at the same time. The task that’s reading may see a value that was just modified, is being modified or is about to be modified, and end up processing an unexpected value.
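As a minimal sketch (my own, not from the Uber post), this is the shape of such a race: one thread writes to a counter while another reads it with no synchronization. Safe Rust refuses to compile it, because both closures would need to borrow counter at the same time and could outlive it.
// This function does not compile: both closures borrow `counter` and
// could outlive the stack frame that owns it.
fn data_race_shape() {
    let mut counter = 0;
    // Writer thread: needs a mutable borrow of `counter`.
    std::thread::spawn(|| counter += 1);
    // Reader thread: needs a second, simultaneous borrow of `counter`.
    std::thread::spawn(|| println!("{counter}"));
}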
Loop index variable capture
Go has the range loop to iterate over a collection.
for index, value := range list {
    ...
}
index is the index of the element in the list and value is the value at that index. The problem is that value is not a new variable at each iteration; it is just reassigned with the value of the current iteration. This behaviour, in combination with the fact that closures capture variables by reference instead of by copy, can easily lead to concurrency bugs.
While ProcessJob is running, the range loop keeps updating the job variable, which was captured by reference by the goroutine that calls ProcessJob. It is possible that by the time ProcessJob gets to run, job already refers to the last job in jobs because the variable has been updated.
for _, job := range jobs {
    go func() {
        ProcessJob(job)
    }()
}
There’s a proposal to change for range semantics in Go.
Rust provides concurrency through stackless coroutines, also known as async and await; the only thing we need to do is choose an async runtime. I chose tokio, which is the most popular runtime in the Rust ecosystem at the moment.
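All the Rust snippets in this post assume they run inside a tokio runtime. A minimal sketch of that setup (my own, not part of the original examples):
// Assumes a dependency like: tokio = { version = "1", features = ["full"] }
#[tokio::main]
async fn main() {
    // #[tokio::main] starts the runtime and runs this async fn on it;
    // tokio::spawn only works from inside a running runtime.
    let task = tokio::spawn(async { 1 + 1 });
    println!("{}", task.await.unwrap());
}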
We can iterate over any collection that implements the IntoIterator trait by using the for in construct.
for job in jobs {
    tokio::spawn(process_job(job));
}
for job in jobs is syntax sugar for for job in jobs.into_iter() (it is also syntax sugar for advancing the iterator, but we don’t care about that here), which lets us iterate over the values in a collection by consuming the collection.
If needed, we could iterate over references to the values inside the collection with for job in jobs.iter(), which creates an iterator that yields a reference to each value of the collection, but that code would not compile in this case because a task (think goroutine) may run for longer than the value it references lives for.
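For reference, a sketch of what that borrowing version would look like (Job, jobs and process_job are placeholder names, not the post’s exact code):
// Rejected: the spawned task could outlive the `jobs` vector it borrows from.
async fn process_all(jobs: Vec<Job>) {
    for job in jobs.iter() {
        tokio::spawn(process_job(job));
    }
}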
error[E0597]: `jobs` does not live long enough
--> src/for_range_and_closures.rs:6:16
|
6 | for job in jobs.iter() {
| ^^^^^^^^^^^ borrowed value does not live long enough
7 | tokio::spawn(process_job(job));
| ---------------- argument requires that `jobs` is borrowed for `'static`
8 | }
9 | }
| - `jobs` dropped here while still borrowed
There’s no need to use a closure, but for completeness’ sake it’s shown below; it does not compile either.
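A sketch of that closure version (again with placeholder names, not the post’s exact code):
async fn process_all_with_closure(jobs: Vec<Job>) {
    for job in jobs.iter() {
        // The closure returns an async block that borrows `job`, and the
        // spawned task could outlive `jobs`, so this is rejected too.
        tokio::spawn((|| async { process_job(job) })());
    }
}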
error[E0597]: `jobs` does not live long enough
--> src/for_range_and_closures.rs:14:16
|
14 | for job in jobs.iter() {
| ^^^^^^^^^^^ borrowed value does not live long enough
15 | tokio::spawn((|| async { process_job(job) })());
| -------------------------- returning this value requires that `jobs` is borrowed for `'static`
16 | }
17 | }
| - `jobs` dropped here while still borrowed
Data race due to idiomatic err variable capture
Errors are values in Go, and fallible functions usually return two values: the result if the function succeeded and an error if it failed.
In this example, processing happens concurrently between the current function and a goroutine. We want to return an error if either the goroutine or the current function fails, so both of them assign to the err variable so the error can be returned to the caller of the current function.
x, err := Foo()
if err != nil {
    ...
}

go func() {
    var y int
    // Oops, err was captured by reference.
    y, err = Bar()
    if err != nil {
        ...
    }
}()

var z int
// err is being written to by the goroutine as well.
z, err = Baz()
if err != nil {
    ...
}
This is an example of a data race because the err variable is being modified by the current function and by the goroutine without synchronization.
As expected, the same example does not compile in Rust for several reasons.
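A sketch of that direct translation (foo, bar and baz are placeholder functions returning a Result, not the post’s exact code):
let mut result = foo();
if result.is_err() {
    // ...
}
tokio::spawn(async {
    // Trying to assign to the `result` captured by reference.
    result = bar();
    if result.is_err() {
        // ...
    }
});
result = baz();
if result.is_err() {
    // ...
}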
error[E0373]: async block may outlive the current function, but it borrows `result`, which is owned by the current function
--> src/goroutine_var_reference_data_race.rs:41:24
|
41 | tokio::spawn(async {
| ________________________^
42 | | // Trying to assign to the `result` captured .
43 | | result = bar();
| | ------ `result` is borrowed here
44 | | if result.is_err() {
45 | | // ...
46 | | }
47 | | });
| |_____^ may outlive borrowed value `result`
|
= note: async blocks are not executed immediately and must either take a reference or ownership of outside variables they use
help: to force the async block to take ownership of `result` (and any other referenced variables), use the `move` keyword
|
41 | tokio::spawn(async move {
| ++++
error[E0506]: cannot assign to `result` because it is borrowed
--> src/goroutine_var_reference_data_race.rs:51:5
|
41 | tokio::spawn(async {
| _____-__________________-
| |_____|
| ||
42 | || // Trying to assign to the `result` captured .
43 | || result = bar();
| || ------ borrow occurs due to use in generator
44 | || if result.is_err() {
45 | || // ...
46 | || }
47 | || });
| ||_____-- argument requires that `result` is borrowed for `'static`
| |_____|
| borrow of `result` occurs here
...
51 | result = baz();
| ^^^^^^ assignment to borrowed `result` occurs here
error[E0502]: cannot borrow `result` as immutable because it is also borrowed as mutable
--> src/goroutine_var_reference_data_race.rs:52:8
|
41 | tokio::spawn(async {
| _____-__________________-
| |_____|
| ||
42 | || // Trying to assign to the `result` captured .
43 | || result = bar();
| || ------ first borrow occurs due to use of `result` in generator
44 | || if result.is_err() {
45 | || // ...
46 | || }
47 | || });
| ||_____-- argument requires that `result` is borrowed for `'static`
| |_____|
| mutable borrow occurs here
...
52 | if result.is_err() {
| ^^^^^^^^^^^^^^^ immutable borrow occurs here
We could try to translate the Go version to Rust and make it work by using atomic reference counting with Arc and a Mutex.
let result = foo();
if result.is_err() {
    // ...
}

let result = Arc::new(Mutex::new(result));
let result_clone = Arc::clone(&result);

let task = tokio::spawn(async move {
    let mut result = result_clone.lock().await;
    *result = bar();
    if result.is_err() {
        // ...
    }
});

{
    let mut result = result.lock().await;
    *result = baz();
    if result.is_err() {
        // ...
    }
}

task.await;
Arc::try_unwrap(result).unwrap().into_inner()
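As a side note (this is my own addition, not from the original post), a more idiomatic approach would avoid the shared err variable entirely and let the spawned task hand its Result back through the JoinHandle when we join it:
// Hypothetical foo, bar and baz returning Result<_, MyError>.
async fn process() -> Result<(), MyError> {
    let x = foo()?; // fail fast in the current task
    let task = tokio::spawn(async { bar() }); // the task owns its own Result
    let z = baz()?;
    let y = task.await.expect("bar task panicked")?; // propagate bar's error when joining
    // ... use x, y and z ...
    Ok(())
}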
Bugs due to copies of a slice’s internal state
Go slices are growable lists made of a pointer to a buffer, a capacity and a length. We can add elements to a slice by calling append; if the slice is full, it will grow to accommodate the new element.
The problem here is that by passing myResults as an argument to the closure, we are copying the slice’s length, capacity and buffer pointer, because function arguments are passed by copy in Go. When we try to append to myResults, the copy the goroutine holds may have a stale length and capacity because myResults may have needed to grow when another goroutine appended to it.
func ProcessAll(uuids []string) {
    var myResults []string
    var mutex sync.Mutex
    safeAppend := func(res string) {
        mutex.Lock()
        myResults = append(myResults, res)
        mutex.Unlock()
    }
    for _, uuid := range uuids {
        go func(id string, results []string) {
            res := Foo(id)
            safeAppend(res)
        }(uuid, myResults)
    }
}
Like Go, Rust passes function arguments by value, but unlike a Go slice, Rust’s Vec, the growable array, is moved when passed to a function instead of having its internal state copied.
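A minimal sketch of what that means in practice (my own example, not from the post): once a Vec has been passed to a function, the old binding cannot be used anymore, so there is never a second, possibly stale, view of its buffer.
fn consume(results: Vec<String>) {
    // `results` is owned by this function now.
    println!("{} results", results.len());
}

fn main() {
    let my_results = vec![String::from("a")];
    consume(my_results); // `my_results` is moved into `consume`
    // println!("{}", my_results.len()); // error[E0382]: borrow of moved value: `my_results`
}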
If we try to translate the Go code to Rust, it does not even compile. We create a closure safe_append to ensure the mutex is always locked before modifying the list of results, but the code is rejected because we try to use the closure inside several tasks and the closure gets moved into the first task, so it is no longer available in the following iterations.
fn process_all(uuids: Vec<String>) {
    let mut my_results = Vec::new();
    let mutex = Mutex::new(());
    let mut safe_append = |res: String| async move {
        mutex.lock().await;
        my_results.push(res);
    };
    for uuid in uuids {
        tokio::spawn(async {
            let res = foo(uuid);
            safe_append(res);
        });
    }
}
error[E0382]: use of moved value: `safe_append`
--> src/mutex_slice_append.rs:34:28
|
34 | tokio::spawn(async {
| ____________________________^
35 | | let res = foo(uuid);
36 | | safe_append(res);
| | ----------- use occurs due to use in generator
37 | | });
| |_________^ value moved here, in previous iteration of loop
The solution is simple as well. Rust’s Mutex is meant to hold the data it protects instead of just acting as a flag. Use reference counting so each task can access the same mutex and modify the data it holds.
fn process_all_2(uuids: Vec<String>) {
    let my_results = Arc::new(Mutex::new(Vec::new()));
    for uuid in uuids {
        let my_results_clone = Arc::clone(&my_results);
        tokio::spawn(async move {
            let res = foo(uuid);
            let mut my_results = my_results_clone.lock().await;
            my_results.push(res);
        });
    }
}
The mutex is unlocked automatically when the guard is dropped, thanks to RAII.
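A small aside (my own sketch, reusing the names from the task above): the guard releases the lock when it goes out of scope, and we can also drop it explicitly to release the lock early.
let mut my_results = my_results_clone.lock().await;
my_results.push(res);
drop(my_results); // the lock is released here instead of at the end of the block
// ... more work that does not need the lock ...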
Data races because maps are not thread-safe
In this example, orders are processed concurrently and any errors are added to a map where the key is the order id and the value is the error, so we know which orders were not processed.
The problem is that the map is not thread-safe, which means that with several goroutines modifying it without synchronization it may end up in an unexpected state.
func processOrders(uuids []string) error {
    var errMap = make(map[string]error)
    for _, uuid := range uuids {
        go func(uuid string) {
            orderHandle, err := GetOrder(uuid)
            if err != nil {
                // Data race
                errMap[uuid] = err
                return
            }
            ...
        }(uuid)
    }
    return combineErrors(errMap)
}
As expected, the same code does not compile in Rust because multiple tasks (think goroutines) may not have mutable access to a value at the same time without synchronization.
fn process_orders(uuids: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
    let mut err_map = HashMap::new();
    for uuid in uuids {
        tokio::spawn(async {
            match get_order(&uuid).await {
                Err(err) => {
                    err_map.insert(uuid, err);
                }
                Ok(value) => {
                    // ...
                }
            }
        });
    }
    combine_errors(err_map)
}
error[E0499]: cannot borrow `err_map` as mutable more than once at a time
--> src/thread_unsafe_hashmap.rs:23:28
|
23 | tokio::spawn(async {
| _________-__________________^
| |_________|
| ||
24 | || match get_order(&uuid).await {
25 | || Err(err) => {
26 | || err_map.insert(uuid, err);
| || ------- borrows occur due to use of `err_map` in generator
... ||
31 | || }
32 | || });
| ||_________^- argument requires that `err_map` is borrowed for `'static`
| |_________|
| `err_map` was mutably borrowed here in the previous iteration of the loop
The correct version is also pretty simple in this case. Use a mutex to protect the data so it can be mutated by several tasks concurrently, and reference counting to ensure that every task operates on the data guarded by the same mutex.
fn process_orders_2(uuids: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
    let err_map = Arc::new(Mutex::new(HashMap::new()));
    for uuid in uuids {
        let err_map_clone = Arc::clone(&err_map);
        tokio::spawn(async move {
            match get_order(&uuid).await {
                Err(err) => {
                    let mut err_map = err_map_clone.lock().await;
                    err_map.insert(uuid, err);
                }
                Ok(value) => {
                    // ...
                }
            }
        });
    }
    combine_errors(&err_map)
}
Breaking a mutex by copying it around
Function arguments are copied in Go. It’s not uncommon to make the mistake of passing a mutex by copy to multiple goroutines in order to synchronize access to a piece of data. The problem is that the mutex will be copied, including its internal state, when passed to a goroutine, which means each goroutine ends up with its own mutex and all of them can acquire it at the same time.
var a int

func CriticalSection(m sync.Mutex) {
    m.Lock()
    a += 1
    m.Unlock()
}

func main() {
    mutex := sync.Mutex{}
    go CriticalSection(mutex) // mutex is copied
    go CriticalSection(mutex) // mutex is copied
}
Rust mutexes are supposed to hold the data they protect instead of acting as a flag, and when a variable is passed as an argument to a function, we say the value has been moved and it cannot be accessed through the old variable anymore. For this reason, the code does not compile.
use tokio::sync::Mutex;

fn main() {
    let mutex = Mutex::new(0);
    tokio::spawn(critical_section(mutex));
    tokio::spawn(critical_section(mutex));
}

async fn critical_section(mutex: Mutex<i32>) {
    let mut value = mutex.lock().await;
    *value += 1;
}
error[E0382]: use of moved value: `mutex`
--> src/mutex.rs:21:35
|
19 | let mutex = Mutex::new(0);
| ----- move occurs because `mutex` has type `tokio::sync::Mutex<i32>`, which does not implement the `Copy` trait
20 | tokio::spawn(critical_section(mutex));
| ----- value moved here
21 | tokio::spawn(critical_section(mutex));
| ^^^^^ value used here after move
The incorrect version does not compile, and the correct version is pretty easy to write using reference counting to ensure every task uses the same mutex.
#[tokio::main]
async fn main() {
    let mutex = Arc::new(Mutex::new(0));
    tokio::spawn(critical_section(Arc::clone(&mutex)));
    tokio::spawn(critical_section(Arc::clone(&mutex)));
}

async fn critical_section(mutex: Arc<Mutex<i32>>) {
    let mut value = mutex.lock().await;
    *value += 1;
}
Conclusion
Rust cannot stop us from making every kind of mistake, but it does seem like it can help us avoid at least some concurrency bugs.