Uber has adopted Go as its primary language for developing microservices, and its blog has a post called Data Race Patterns in Go that describes data races found in Uber's Go codebase.

I was reading it and thought to myself that many of the problems presented in the post would not even compile in Rust. Can Rust help us avoid writing code with common data races?

The Rust examples are not meant to be idiomatic Rust. For simplicity, they also do not wait for the tasks they spawn to finish, because the Go examples do not wait either.

What is a data race

A data race happens when two tasks access the same memory location at the same time, at least one of the accesses is a write, and there is no synchronization between them. The task that is reading may observe the value before, during or after the write and end up processing an unexpected value.
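Here is a minimal sketch of the opposite case. It uses std threads instead of tokio so it runs without dependencies. Several threads write to the same counter at the same time, but every write goes through an AtomicUsize, so the accesses are synchronized and no increment is lost. Sharing a plain mutable usize between the threads this way would be rejected by the borrow checker. The function name count_concurrently is made up for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Hypothetical helper: `threads` threads each add 1 to a shared counter `increments` times.
fn count_concurrently(threads: usize, increments: usize) -> usize {
    let counter = AtomicUsize::new(0);
    // Scoped threads may borrow `counter`; they are all joined when the scope ends.
    thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                for _ in 0..increments {
                    // fetch_add is an atomic read-modify-write, so concurrent
                    // writes from other threads cannot be lost.
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });
    // Every thread has been joined, so we can take the final value by ownership.
    counter.into_inner()
}
```

For example, count_concurrently(4, 1000) always returns 4000.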

Loop index variable capture

Go has the range loop to iterate over a collection.

for index, value := range list {
  // ...
}

index is the index of the element in the list and value is the value at that index. The problem is that value is not a new variable at each iteration; the same variable is simply updated with the current element. Combined with the fact that closures capture variables by reference rather than by value, this can easily lead to concurrency bugs.

In the example below, the closure started as a goroutine captures job by reference. The range loop keeps updating job while the goroutines wait to be scheduled. By the time a goroutine calls ProcessJob, job may already refer to a later element of jobs, often the last one.

for _, job := range jobs {
  go func(){
    ProcessJob(job)
  }()
}

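There was a proposal to change the semantics of for range loops in Go, and it was adopted in Go 1.22. For modules that declare go 1.22 or later, each iteration gets its own copy of the loop variables, so the example above is only racy with older versions.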

Rust provides lightweight concurrency through stackless coroutines, also known as async and await. The standard library does not ship an async runtime, so we need to choose one. I chose tokio, which is the most popular runtime in the Rust ecosystem at the time of writing.

We can iterate over any value that implements the IntoIterator trait, which includes Vec and every Iterator, by using the for in construct.

for job in jobs {
  tokio::spawn(process_job(job));
}

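for job in jobs is syntax sugar for for job in jobs.into_iter(). The for loop itself is also sugar for repeatedly calling next on the iterator, but we don't need that detail here. into_iter consumes the collection and yields each value by ownership. As a result, process_job(job) moves a different job into each task, and the loop has nothing left to overwrite.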
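Here is a sketch of the same ownership-based loop that runs without tokio. It uses std::thread::spawn, which is an assumption and not the post's code. process_job here is a hypothetical stand-in that doubles its input. Unlike the post's examples, this version joins the threads so the results can be checked.

```rust
use std::thread;

// Hypothetical stand-in for the post's process_job.
fn process_job(job: u32) -> u32 {
    job * 2
}

fn process_jobs_owned(jobs: Vec<u32>) -> Vec<u32> {
    let mut handles = Vec::new();
    // `for job in jobs` consumes `jobs`; each iteration yields a fresh owned `job`.
    for job in jobs {
        // `move` transfers this iteration's `job` into the thread, so later
        // iterations cannot change the value the thread sees.
        handles.push(thread::spawn(move || process_job(job)));
    }
    // Joining in spawn order keeps the results in the same order as `jobs`.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}
```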

If needed, we could iterate over references to the values instead by using for job in jobs.iter(), which yields a reference to each element of the collection. In this case, though, the code does not compile. tokio::spawn requires a 'static future, because a task (think goroutine) may keep running after jobs, the value it borrows from, has been dropped.

error[E0597]: `jobs` does not live long enough
 --> src/for_range_and_closures.rs:6:16
  |
6 |     for job in jobs.iter() {
  |                ^^^^^^^^^^^ borrowed value does not live long enough
7 |         tokio::spawn(process_job(job));
  |                      ---------------- argument requires that `jobs` is borrowed for `'static`
8 |     }
9 | }
  | - `jobs` dropped here while still borrowed

There's no need to use a closure, but for completeness' sake here is a version that spawns the future returned by a closure. It does not compile either.

error[E0597]: `jobs` does not live long enough
  --> src/for_range_and_closures.rs:14:16
   |
14 |     for job in jobs.iter() {
   |                ^^^^^^^^^^^ borrowed value does not live long enough
15 |         tokio::spawn((|| async { process_job(job) })());
   |                          -------------------------- returning this value requires that `jobs` is borrowed for `'static`
16 |     }
17 | }
   | - `jobs` dropped here while still borrowed
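If the tasks really need to borrow jobs, the compiler accepts it once it can prove the borrowed data outlives them. Here is a minimal sketch using std::thread::scope (stable since Rust 1.63) instead of tokio. Every thread spawned inside the scope is joined before scope returns, so borrowing from jobs is allowed. process_job_ref is a hypothetical stand-in.

```rust
use std::thread;

// Hypothetical stand-in for the post's process_job, taking a reference.
fn process_job_ref(job: &u32) -> u32 {
    job * 2
}

fn process_jobs_borrowed(jobs: &[u32]) -> Vec<u32> {
    // Threads spawned on `s` are joined before `thread::scope` returns,
    // so they are allowed to borrow from `jobs`.
    thread::scope(|s| {
        let handles: Vec<_> = jobs
            .iter()
            .map(|job| s.spawn(move || process_job_ref(job)))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("worker thread panicked"))
            .collect()
    })
}
```

The caller keeps ownership of jobs and can still use it after process_jobs_borrowed returns.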

Data race due to idiomatic err variable capture

Errors are values in Go, and fallible functions usually return two values: the result if the function succeeded and an error if it failed.

In this example, processing happens concurrently in the current function and in a goroutine. We want to return an error if either of them fails, so both assign to the same err variable, which is then returned to the caller of the current function.

x, err := Foo()
if err != nil {
  // ...
}

go func() {
  var y int
  // Oops, err was captured by reference.
  y, err = Bar()
  if err != nil {
    // ...
  }
}()

var z int
// err is being written to by the goroutine as well.
z, err = Baz()
if err != nil {
  // ...
}

This is an example of a data race because the err variable is being modified by the current function and by the goroutine without synchronization.

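As expected, the Rust translation of this example, which stores the error in a result variable, does not compile. The compiler reports three errors. First, the spawned async block may outlive the function that owns result. Second and third, while the task holds a borrow of result, the current function can neither reassign it nor read it.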

error[E0373]: async block may outlive the current function, but it borrows `result`, which is owned by the current function
  --> src/goroutine_var_reference_data_race.rs:41:24
   |
41 |       tokio::spawn(async {
   |  ________________________^
42 | |         // Trying to assign to the `result` captured .
43 | |         result = bar();
   | |         ------ `result` is borrowed here
44 | |         if result.is_err() {
45 | |             // ...
46 | |         }
47 | |     });
   | |_____^ may outlive borrowed value `result`
   |
   = note: async blocks are not executed immediately and must either take a reference or ownership of outside variables they use
help: to force the async block to take ownership of `result` (and any other referenced variables), use the `move` keyword
   |
41 |     tokio::spawn(async move {
   |                        ++++

error[E0506]: cannot assign to `result` because it is borrowed
  --> src/goroutine_var_reference_data_race.rs:51:5
   |
41 |        tokio::spawn(async {
   |   _____-__________________-
   |  |_____|
   | ||
42 | ||         // Trying to assign to the `result` captured .
43 | ||         result = bar();
   | ||         ------ borrow occurs due to use in generator
44 | ||         if result.is_err() {
45 | ||             // ...
46 | ||         }
47 | ||     });
   | ||_____-- argument requires that `result` is borrowed for `'static`
   |  |_____|
   |        borrow of `result` occurs here
...
51 |        result = baz();
   |        ^^^^^^ assignment to borrowed `result` occurs here

error[E0502]: cannot borrow `result` as immutable because it is also borrowed as mutable
  --> src/goroutine_var_reference_data_race.rs:52:8
   |
41 |        tokio::spawn(async {
   |   _____-__________________-
   |  |_____|
   | ||
42 | ||         // Trying to assign to the `result` captured .
43 | ||         result = bar();
   | ||         ------ first borrow occurs due to use of `result` in generator
44 | ||         if result.is_err() {
45 | ||             // ...
46 | ||         }
47 | ||     });
   | ||_____-- argument requires that `result` is borrowed for `'static`
   |  |_____|
   |        mutable borrow occurs here
...
52 |        if result.is_err() {
   |           ^^^^^^^^^^^^^^^ immutable borrow occurs here

We could try to translate the Go version to Rust and make it work by using atomic reference counting with Arc and a Mutex.

    let result = foo();
    if result.is_err() {
        // ...
    }

    let result = Arc::new(Mutex::new(result));

    let result_clone = Arc::clone(&result);
    let task = tokio::spawn(async move {
        let mut result = result_clone.lock().await;
        *result = bar();
        if result.is_err() {
            // ...
        }
    });

    {
        let mut result = result.lock().await;
        *result = baz();
        if result.is_err() {
            // ...
        }
    }

    // JoinHandle resolves to a Result; it is an Err if the task panicked.
    task.await.expect("task panicked");

    Arc::try_unwrap(result).unwrap().into_inner()
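Sharing a single result slot is a direct translation of the Go code, but it is not the only option. Below is a sketch of an alternative that does not come from Uber's post, using std threads instead of tokio. Each side keeps its own Result, and the spawned thread hands its result back through join, so there is no shared mutable variable at all. run_both is a hypothetical name, and the closures passed to it stand in for bar and baz.

```rust
use std::thread;

// `first` runs on a new thread (like the goroutine calling bar), `second` runs
// on the current thread (like the call to baz). Neither writes to shared state.
fn run_both<F, G>(first: F, second: G) -> Result<(i32, i32), String>
where
    F: FnOnce() -> Result<i32, String> + Send + 'static,
    G: FnOnce() -> Result<i32, String>,
{
    let handle = thread::spawn(first);
    let second_result = second();
    // The worker's Result is moved back to us through `join`.
    let first_result = handle.join().expect("worker thread panicked");
    // Report the first error found, checking the spawned work first.
    Ok((first_result?, second_result?))
}
```

Like the Go version, only one error is reported when both sides fail. The difference is that the choice is made explicitly after both have finished, instead of depending on which write happened last.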

Bugs due to copies of a slice’s internal state

A Go slice is a small header made of a pointer to a backing array, a length and a capacity. We add elements with append, which returns an updated slice. If the backing array is full, append allocates a larger one and copies the existing elements into it.

The problem here is that passing myResults as an argument to the goroutine copies the slice header (the pointer, the length and the capacity), because Go passes function arguments by value.

The loop makes that copy without holding the mutex, while goroutines that have already started may be assigning a new header to myResults inside safeAppend, so the read and the writes race. Even if results were used, it could hold a stale length, capacity and pointer, because append may have reallocated the backing array in the meantime.

func ProcessAll(uuids []string) {
  var myResults []string
  var mutex sync.Mutex
  safeAppend := func(res string) {
    mutex.Lock()
    myResults = append(myResults, res)
    mutex.Unlock()
  }

  for _, uuid := range uuids {
    go func(id string, results []string) {
      res := Foo(id)
      safeAppend(res)
    }(uuid, myResults)
  }
}

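Like Go, Rust passes function arguments by value. Unlike Go, a value that does not implement Copy, such as Vec (Rust's growable array), is moved rather than copied. After the call, the caller can no longer use it, so two owners never end up sharing the same buffer implicitly.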
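Here is a small sketch of what that means in practice; the function names are hypothetical. append_value takes the Vec by value, so the caller gives up its variable. Keeping the original around requires an explicit clone, which copies the elements into a separate buffer instead of sharing one.

```rust
// Takes ownership of `values`; the caller can no longer use the Vec it passed in.
fn append_value(mut values: Vec<i32>, value: i32) -> Vec<i32> {
    values.push(value);
    // Ownership is handed back to the caller through the return value.
    values
}

fn clone_then_append() -> (Vec<i32>, Vec<i32>) {
    let original = vec![1, 2];
    // `original.clone()` allocates a new buffer, so the push inside
    // `append_value` cannot affect `original`.
    let appended = append_value(original.clone(), 3);
    (original, appended)
}
```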

If we try to translate the Go code to Rust, it does not even compile. The closure safe_append is meant to ensure the mutex is locked before the list of results is modified. However, the closure is moved into the first task, so the next loop iteration tries to use a value that has already been moved.

fn process_all(uuids: Vec<String>) {
    let mut my_results = Vec::new();
    let mutex = Mutex::new(());

    let mut safe_append = |res: String| async move {
        mutex.lock().await;
        my_results.push(res);
    };

    for uuid in uuids {
        tokio::spawn(async {
            let res = foo(uuid);
            safe_append(res);
        });
    }
}

error[E0382]: use of moved value: `safe_append`
  --> src/mutex_slice_append.rs:34:28
   |
34 |           tokio::spawn(async {
   |  ____________________________^
35 | |             let res = foo(uuid);
36 | |             safe_append(res);
   | |             ----------- use occurs due to use in generator
37 | |         });
   | |_________^ value moved here, in previous iteration of loop

The fix is simple as well. Rust's Mutex holds the data it protects instead of acting as a separate flag next to it. Wrapping it in an Arc (atomic reference counting) lets every task access the same mutex and modify the data it holds.

use std::sync::Arc;
use tokio::sync::Mutex;

fn process_all_2(uuids: Vec<String>) {
    let my_results = Arc::new(Mutex::new(Vec::new()));

    for uuid in uuids {
        let my_results_clone = Arc::clone(&my_results);
        tokio::spawn(async move {
            let res = foo(uuid);
            let mut my_results = my_results_clone.lock().await;
            my_results.push(res);
        });
    }
}

The mutex is unlocked automatically when the guard returned by lock() is dropped at the end of the task (RAII), so there is no unlock call to forget.
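Here is a sketch of the same pattern that can be run and checked without tokio. It uses std::thread and std::sync::Mutex, whose lock() returns a Result because of lock poisoning. Unlike the post's example, it joins the threads. It then uses Arc::try_unwrap to take the Vec back out once every clone of the Arc is gone. foo is a hypothetical stand-in for the post's function.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the post's `foo`.
fn foo(id: &str) -> String {
    format!("processed-{id}")
}

fn process_all_threads(uuids: Vec<String>) -> Vec<String> {
    // The Vec lives inside the Mutex, so it can only be reached through a lock.
    let results = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::new();

    for uuid in uuids {
        let results = Arc::clone(&results);
        handles.push(thread::spawn(move || {
            let res = foo(&uuid);
            // The temporary guard unlocks the mutex at the end of this statement.
            results.lock().unwrap().push(res);
        }));
    }

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    // Every thread has finished and dropped its clone, so this is the last owner.
    Arc::try_unwrap(results)
        .expect("other owners still alive")
        .into_inner()
        .expect("mutex poisoned")
}
```

The order of the results depends on scheduling, so callers that care about order should sort them or store them by index.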

Data races because maps are not thread-safe

In this example, orders are processed concurrently, and any errors are stored in a map keyed by order id so we can tell which orders were not processed.

The problem is that Go maps are not safe for concurrent use. Several goroutines write to errMap without synchronization, which can leave the map in an inconsistent state. The Go runtime may even abort the program with a "concurrent map writes" fatal error.

func processOrders(uuids []string) error {
  var errMap = make(map[string]error)
  for _, uuid := range uuids {
    go func(uuid string) {
      orderHandle, err := GetOrder(uuid)
      if err != nil {
        // Data race
        errMap[uuid] = err
        return
      }
      // ...
    }(uuid)
  }
  return combineErrors(errMap)
}

As expected, the same code does not compile in Rust, because multiple tasks (think goroutines) cannot have mutable access to the same value at the same time without synchronization.

fn process_orders(uuids: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
    let mut err_map = HashMap::new();

    for uuid in uuids {
        tokio::spawn(async {
            match get_order(&uuid).await {
                Err(err) => {
                    err_map.insert(uuid, err);
                }
                Ok(value) => {
                    // ...
                }
            }
        });
    }

    combine_errors(err_map)
}

error[E0499]: cannot borrow `err_map` as mutable more than once at a time
  --> src/thread_unsafe_hashmap.rs:23:28
   |
23 |            tokio::spawn(async {
   |   _________-__________________^
   |  |_________|
   | ||
24 | ||             match get_order(&uuid).await {
25 | ||                 Err(err) => {
26 | ||                     err_map.insert(uuid, err);
   | ||                     ------- borrows occur due to use of `err_map` in generator
...  ||
31 | ||             }
32 | ||         });
   | ||_________^- argument requires that `err_map` is borrowed for `'static`
   |  |_________|
   |            `err_map` was mutably borrowed here in the previous iteration of the loop

The correct version is also pretty simple in this case. Use a mutex to protect the map so several tasks can mutate it, and use reference counting to ensure every task operates on the data guarded by the same mutex.

fn process_orders_2(uuids: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
    let err_map = Arc::new(Mutex::new(HashMap::new()));

    for uuid in uuids {
        let err_map_clone = Arc::clone(&err_map);
        tokio::spawn(async move {
            match get_order(&uuid).await {
                Err(err) => {
                    let mut err_map = err_map_clone.lock().await;
                    err_map.insert(uuid, err);
                }
                Ok(value) => {
                    // ...
                }
            }
        });
    }

    combine_errors(&err_map)
}
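Below is a sketch of the same idea with std threads instead of tokio, which waits for every worker before reading the map. get_order is a hypothetical stand-in that fails for ids starting with "bad", and errors are plain Strings for simplicity.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the post's `get_order`: ids starting with "bad" fail.
fn get_order(uuid: &str) -> Result<String, String> {
    if uuid.starts_with("bad") {
        Err(format!("order {uuid} not found"))
    } else {
        Ok(format!("order {uuid}"))
    }
}

fn collect_order_errors(uuids: Vec<String>) -> HashMap<String, String> {
    let err_map = Arc::new(Mutex::new(HashMap::new()));
    let handles: Vec<_> = uuids
        .into_iter()
        .map(|uuid| {
            let err_map = Arc::clone(&err_map);
            thread::spawn(move || {
                if let Err(err) = get_order(&uuid) {
                    // Only the thread holding the guard can insert into the map.
                    err_map.lock().unwrap().insert(uuid, err);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    // All writers are done, so the map can be taken out of the Arc and Mutex.
    Arc::try_unwrap(err_map)
        .expect("other owners still alive")
        .into_inner()
        .expect("mutex poisoned")
}
```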

Breaking a mutex by copying it around

Function arguments are passed by value in Go. It's not uncommon to make the mistake of passing a mutex by value to multiple goroutines in order to synchronize access to a piece of data. The problem is that the mutex is copied, including its internal state, when it is passed to a goroutine. Each goroutine therefore gets its own mutex, and all of them can lock their copy at the same time. go vet's copylocks check reports this mistake.

var a int

func CriticalSection(m sync.Mutex) {
  m.Lock()
  a += 1
  m.Unlock()
}

func main() {
  mutex := sync.Mutex{}
  go CriticalSection(mutex) // mutex is copied
  go CriticalSection(mutex) // mutex is copied
}

Rust mutexes hold the data they protect instead of acting as a separate flag. Passing a value that does not implement Copy, such as a Mutex, to a function moves it, and the old variable can no longer be used. Passing the same mutex to two tasks is therefore a use after move, and the code does not compile.

use tokio::sync::Mutex;

fn main() {
    let mutex = Mutex::new(0);
    tokio::spawn(critical_section(mutex));
    tokio::spawn(critical_section(mutex));
}

async fn critical_section(mutex: Mutex<i32>) {
    let mut value = mutex.lock().await;
    *value += 1;
}

error[E0382]: use of moved value: `mutex`
  --> src/mutex.rs:21:35
   |
19 |     let mutex = Mutex::new(0);
   |         ----- move occurs because `mutex` has type `tokio::sync::Mutex<i32>`, which does not implement the `Copy` trait
20 |     tokio::spawn(critical_section(mutex));
   |                                   ----- value moved here
21 |     tokio::spawn(critical_section(mutex));
   |                                   ^^^^^ value used here after move

The incorrect version does not compile, and the correct version is easy to write: use reference counting so every task shares the same mutex.

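use std::sync::Arc;
use tokio::sync::Mutex;

// tokio::spawn must be called from inside a Tokio runtime.
#[tokio::main]
async fn main() {
    let mutex = Arc::new(Mutex::new(0));
    // Each task receives its own Arc handle to the same mutex.
    tokio::spawn(critical_section(Arc::clone(&mutex)));
    tokio::spawn(critical_section(Arc::clone(&mutex)));
}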

async fn critical_section(mutex: Arc<Mutex<i32>>) {
    let mut value = mutex.lock().await;
    *value += 1;
}
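Here is a sketch of the same fix with std threads and std::sync::Mutex, so it runs without tokio. Unlike the example above, it joins the workers and returns the final count. increment_concurrently is a hypothetical name. Because every thread locks the same mutex, the result always equals the number of threads.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Every caller passes a clone of the Arc, i.e. a handle to the same mutex.
fn critical_section(mutex: Arc<Mutex<i32>>) {
    let mut value = mutex.lock().unwrap();
    *value += 1;
}

fn increment_concurrently(tasks: usize) -> i32 {
    let mutex = Arc::new(Mutex::new(0));
    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let mutex = Arc::clone(&mutex);
            thread::spawn(move || critical_section(mutex))
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    // The guard is a temporary, released at the end of this statement.
    let total = *mutex.lock().unwrap();
    total
}
```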

Conclusion

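Rust cannot stop us from making every kind of mistake: safe Rust rules out data races, but not race conditions or logic bugs in general. It does, however, reject the Rust translations of the racy examples above at compile time, so it can help us avoid at least some concurrency bugs.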