Concurrency

目标

Mutual exclusion (e.g., A and B don’t run at same time),进程的互斥

  • solved with locks

Ordering (e.g., B runs after A does something),进程的同步

  • solved with condition variables OR semaphores

CS 537 Videos 小林coding

可以理解为 Mutex. 它只是一个变量,只有 available/acquired 两种状态,每个线程可以获取/释放它。 Reference: OSTEP-Locks

锁设计的原则

  1. Mutual exclusion: Only one thread (at most) in critical section at a time
  2. Progress (deadlock-free): If several simultaneous requests, must allow one to proceed
  3. Bounded (starvation-free): Must eventually allow each waiting thread to enter
  4. Fairness: Each thread waits for (roughly) same amount of time
  5. Performance: CPU is not used unnecessarily (e.g., spinning)

Failed Attempt 1: Turn off interrupts

这是一个硬件方法。在 critical section, 禁用 interrupts,这样防止dispatcher做context switch. 缺点:不适合多处理器;不适用于用户进程,只适用于内核态,因为用户无法直接禁用开启interrupts

Failed Attempt 2: Test And Set

Alt text 也就是用一个标志来当作锁,错误的根本原因是,test和set这两个操作是分开的,不是atomic的

可行但过时: Peterson’s Algorithm

Alt text 在进入critical section之前,双方都可表明意愿,并且可通过 set turn为对方的id来表达谦让。 最后表达谦让的那个线程,谦让成功。 比起这个软件方法,有更好的硬件方法实现锁,所以它过时了。

硬件方法: test-and-set

1
2
3
void lock(lock_t *lock) {
    while (TestAndSet(&lock->flag, 1) == 1); // spin-wait (do nothing)
}

TestAndSet原子性地完成两件事:返回lock->flag的旧值0(test发现这把锁available),并把它的新值设为1,代表获取这把锁。

硬件方法: Compare-And-Swap

1
2
3
4
void acquire(lock_t *lock) { 
	while(CompareAndSwap(&lock->flag, 0, 1) == 1) ; 
	// spin-wait (do nothing) 
} 

如果旧lock-flag的值是我们希望的0,则把它设成 1;无论如何,返回旧值。

Spinning占CPU?yield()

1
2
3
4
5
void lock(lock_t *lock) {
    while (TestAndSet(&lock->flag, 1) == 1){
        yield();
    }
}

获取锁失败,就不等了,借助 OS 的 system call, yield()来放弃一直 spinning,让 OS 做一下 context switch给别的线程运行。

但是需要权衡 spin的开销和context switch的开销。 而且如果有大量线程都想获取锁,很有可能某个线程每次都获取失败,yield给别人,造成自己的 starvtion.(spin同理)

spin->sleep

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void lock(lock_t *m) {
    while (TestAndSet(&m->guard, 1) == 1)
        ; // acquire guard lock by spinning
    if (m->flag == 0) {
        m->flag = 1; // lock is acquired
        m->guard = 0;
    } else {
        queue_add(m->q, gettid());
        m->guard = 0;
        park();
    }
}

void unlock(lock_t *m) { 
    while (TestAndSet(&m->guard, 1) == 1)
        ; // acquire guard lock by spinning
    if (queue_empty(m->q))
        m->flag = 0; // let go of lock; no one wants it
    else
        unpark(queue_remove(m->q)); // hold lock (for next thread!)
        // m->flag = 0; WE CAN'T DO THIS!!!
    m->guard = 0;
}

有两个锁,guard和flag。guard 是加在flag之外的,通过它的spinning来保证对于flag的操作是互斥的。

guard也不会spin太长时间,因为操作完flag,就会立即把guard的值设回0.

Solaris操作系统提供了两个atomic function: park(), unpark(threadId), 一旦有线程获取flag失败,它就会把自己加入到队列中,状态是 blocked。等待其它线程执行完 critical section,unlock flag锁时,直接把它从队列中唤醒,让它开始执行。(不去把flag设成 0,而是保持 1,表示刚被唤醒的线程正在执行。)

为什么线程A唤醒一个线程B时,不把flag设成0? 如果设成0,表明这把锁暂时没人用,但是线程B此时没有guard, guard在A手上。因此线程B也没有办法再去获取锁(set flag to 1).

缺陷:线程A获取锁失败,但就在它把guard设成0之后,执行park()之前,OS做了context switch, 让给线程B去做unlock(). 线程B尝试从queue中唤醒线程A,A从队列里出来了,但此时因为A没有执行park(),所以B的unpark()是没用的。线程A继续执行park(),它也没有进queue,将永远睡眠。

改进:setpark()

1
2
3
4
queue_add(m->q, gettid());
setpark();
m->guard = 0;
park();

setpark()的作用是,如果执行完park()之前就被切换到另一个线程,那么A之后再执行park()的时候,会立即返回,不会把它自己变成blcoked状态

条件变量

Basic

condition variable用来解决ordering问题,也就是进程的同步。

A condition variable is a queue of waiting threads.

pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m);

  1. 原子性地把自己设成blocked & 释放锁
  2. call wait()之前,该线程必须有锁
  3. 返回之前,重新获取锁

pthread_cond_signal(pthread_cond_t *c);

  1. 从queue中唤醒一个线程
  2. 如果queue中没有线程,直接返回

条件变量需要和一个state variable一起使用。

1
2
3
4
5
6
7
// parent code
void thread_join() {
		Mutex_lock(&m);		 // w
		if (done == 0)		 	 // x
			Cond_wait(&c, &m); // y
		Mutex_unlock(&m);	 // z
}
1
2
3
4
5
6
7
//child code
void thread_exit() {
		Mutex_lock(&m);		// a
		done = 1;				// b
		Cond_signal(&c);		// c
		Mutex_unlock(&m);     // d
}

这个锁是用来保护state variable的。

The Producer/Consumer (Bounded Buffer) Problem

有一个shared buffer,producer往里面放data, consumer从里面拿data

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
void *producer(void *arg) { 
	for (int i = 0; i < loops; i++) { 
		Mutex_lock(&m); // p1 
		while (numfull == max) // p2 
			Cond_wait(&empty, &m); // p3 
		do_fill(i);  // p4
		Cond_signal(&fill); // 从消费者阻塞队列中唤醒一个,但不确定唤醒谁
		Mutex_unlock(&m); //p6
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
void *consumer(void *arg) { 
	while (1) { 
		Mutex_lock(&m); 
		while (numfull == 0)
			Cond_wait(&fill, &m); //返回前,尝试获取锁,但有可能获取不到
		int tmp = do_get();
		Cond_signal(&empty);
		Mutex_unlock(&m); 
	}
 } 

这里用到两个CV:fill和 empty,还用到 while循环。

当生产者生产了一条数据,它的 signal只能唤醒fill队列中的线程(消费者线程);同理,消费者消费完数据,只能 signal到 empty queue,也就是唤醒消费者线程。

为什么用 while?当从wait中醒来时,必须再次检查 state variavle。 如果用 If会如何?比如一个生产者生产完了,执行 signal()通知一个消费者线程A醒过来,但A没获取到锁,消费者B获取到了。 B把numfull设成了0,然后消费者 A 又获取锁,从 wait中醒来,没有去检查numfull,直接做do_get(),这就读到了脏数据。

rules of thumb for CVs

  1. Keep state in addition to CV’s
  2. Always do wait/signal with lock held
  3. Whenever thread wakes from waiting, recheck state

信号量

Semaphore的作用相当于Lock+CV, 主要有P(), V()操作,分别是将信号量-1, +1

Sem_wait(): Waits until value > 0, then decrement Sem_post(): Increment value, then wake a single waiter

Binary Semaphores (Locks)

我们可以用信号量去实现线程的互斥。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
typedef struct __lock_t { 
	sem_t sem; //二进制信号量
} lock_t;

void init(lock_t *lock) {
	sem_init(&lock->sem, 1);
}
void acquire(lock_t *lock) {
	sem_wait(&lock->sem);// P操作,获取锁则把semaphore从1设成0
} 
void release(lock_t *lock) {
	sem_post(&lock->sem); 
}

初始化semaphore为1,代表可以有一个thread拿到锁。

线程A调用wait(),semaphore被设为0,线程A执行critical section;此时切换到线程B运行,它调用wait(), 把semaphore设为-1,然后进入睡眠状态,切换到线程A执行。

A执行完critical section之后,执行post(),semaphore设成0,并把阻塞的线程B唤醒。

线程B执行完逻辑后,执行post(),semaphore设为1.

互斥的实现:P——critical section——V

实现进程同步

semaphore初值为 0,同步顺序:代码 A——V——P——代码 B。

semaphore 的值可以理解为资源数量,刚开始资源数量为 0,需要代码 A 生产资源(semaphore 被设为 1),才有可能执行 P 及其之后的代码 B。

不然,若 P 操作最先开始执行,会把 semaphore 设为-1,该线程进入睡眠状态。

Producer/Consumer

这个问题既有互斥,又有同步。

特点

semaphore与普通Lock的不同:semaphore其实就是包装了“由硬件支持与队列”的锁,使得 wait(), post()操作完全 atomic,我们不用去担心这两个操作的安全问题。

semaphore 与 CV 的不同:semaphore 持有 state,CV 需要在wait()出来后反复检查 state。