信号量机制
信号量用于互斥
P(S) 临界区 V(S) -----P(S)临界区 V(S) 生产者消费者:typedef int semaphore //信号量值设置为1就是互斥量semaphore mutex = 1; //同一时刻只有一个进程可以读写缓冲区semaphore empty = N; //“空”的数目,缓冲区空消费者停下semaphore full = 0; //“满”的数目,缓冲区满生产者停下//full+empty == N //p(empty):生产一件,empty减1,full加1,如果empty为空被阻塞//p(full):消费一件,full减1,empty加1,如果full为空被阻塞生产者: while(true) p(empty) p(mutex) one>>buffer v(mutex) v(full) 消费者:while(true) p(full) p(mutex) one<
信号量用于同步
P(S) 代码A ------代码BV(S) //代码B执行后A再执行sem.wait() 代码A------代码Bsem.signal()信号量实现线程A和B的汇合,保证a1永远在b2前,b1永远在a2前a1 aArrive.signal bArrive.wait a2 ---------------b1bArrive.signalaArrive.waitb2
信号量用于多路复用
semaphore multiplex = n; //使得n个线程能同时在临界区multiplex.wait 临界区multiplex.signal
屏障Barriers
int n; //线程数int count = 0; //到达汇合点的线程数semaphore mutex = 1; //保护count的访问semaphore barrier = 0; //所有线程到达前都是0,到达后取正p(mutex)count = count + 1v(mutex)if(count == n) v(barrier) //第n个进程到来,随机唤醒一个等待进程p(barrier) //前n-1个进程在此排队v(barrier) //一旦线程被唤醒,有责任唤醒下一个线程
AND型信号量机制
将进程需要的所有共享资源一次全部分配给它;待该进程使用完后再一起释放//每次申请di个资源,当资源数少于ti时不予分配原语SP(S1,t1,d1;...;Sn,tn,dn); if(S1>=t1 and ... and Sn>=tn) for i := 1 to n do Si := Si - di endfor else wait in Si 原语SV(S1,d1;...;Sn,dn); for i := 1 to n do Si := Si + di; wake waited process endfor SP(S,1,1):互斥信号量SP(S,1,0):开关控制(S>=1时允许多进程进入临界区,S=0时禁止任何进程进入)
管程
public class ProducerConsumer{ static final int N = 100; //constant giving the buffer size static producer p = new producer(); static consumer c = new consumer(); static our_monitor mon = new our_monitor(); public static void main(String args[]){ p.start(); c.start(); } staic class producer extends Thread { public void run(){ int item; while(true){ item = produce_item(); mon.insert(item); } } private int produce_item(){...} } static class consumer extends Thread { public void run(){ int item; while(true){ item = mon.remove(); consume_item (item); } } private void consume_item(int item){...} } static class our_monitor{ private int buffer[] = new int[N]; private int count=0, lo=0, hi=0; public synchronized void insert(int val){ if(count == N){ try{wait();} catch(...){...} } buffer[hi] = val; hi = (hi + 1) % N; count ++; if(count == 1)notify(); } public synchronized void remove(){ int val; if(count == 0){ try{wait();} catch(...){...} } val = buffer[lo]; lo = (lo + 1) % N; count --; if(count == N-1)notify(); return val; } }}
管道通信
无名管道,杀死一个叫conky的进程:ps aux | grep conky | grep-v grep| awk '{print $2}' | xargs kill ps aux:显示所有进程grep conky:查找所有包含conky的进程grep -v grep:删除上述包含grep的进程(因为上面的grep指令也是一个进程)awk '{print $2}':取出进程信息条的第二个参数,也就是进程IDxargs kill: kill上述ID对应的进程
经典进程同步问题
生产者-消费者
生产者消费者:typedef int semaphore //信号量值设置为1就是互斥量semaphore mutex = 1; //同一时刻只有一个进程可以读写缓冲区semaphore empty = N; //“空”的数目,缓冲区空消费者停下semaphore full = 0; //“满”的数目,缓冲区满生产者停下//full+empty == N //p(empty):empty减1操作,如果empty为空被阻塞生产者: while(true) p(empty) p(mutex) one>>buffer v(mutex) v(full) ---------- 消费者:while(true)p(full)p(mutex)one<
生产者-消费者(拓展1)
描述:银行有n个服务柜台。每个顾客进店后先取一个号并等待叫号。当一个柜台人员空闲下来时,就叫下一个号
问题:设计一个使柜台人员和顾客同步的算法
思路:
- 生产者:顾客
- 消费者:n个服务柜台
通过信号量同步int next_cstmr = 0; //下一个要服务的客户编号Semaphore s_mutex = 1; //服务器进程互斥访问next_cstmrSemaphore cstmr_cnt = 0; //正在等待的客户数process servers i //(i = 1, 2, 3 ..., n)while(true){ p(s_mutex) p(cstmr_cnt) next_cstmr ++ v(s_mutex) ... //为持有next_cstmr的客户服务 ...}process customer i{ v(cstmr_cnt)}
通过变量取值同步int cstmr_id = 0; //当前客户编号semaphore mutex = 1; //对cstmr_id互斥访问int next_cstmr = 0; //下一个要服务客户编号semaphore s_mutex = 1; //服务器进程互斥访nexy_cstmrprocess customer i{ p(mutex) cstmr_id++ v(mutex)}process servers i{ while(true){ p(s_mutex) p(mutex) if(next_cstmr < cstmr_id) next_cstmr ++ v(mutex) v(s_mutex) ... //为next_cstmr号码持有者服务 ... }}
生产者-消费者(拓展2)
描述:一个可以装A、B两种物品的仓库,其容量无限大,但 要求仓库中A、B两种物品的数量满足下述不等式: -M ≤ A物品数量 - B物品数量 ≤N
semaphore mutex = 1;semaphore sa = N;semaphore sb = M;procedure A: while(true) p(sa) p(mutex) A产品入库 v(mutex) v(sb) ------------procedure B:while(true) p(sb) p(mutex) B产品入库 v(mutex) v(sa)
生产者-消费者(拓展3)
描述:系统中有多个生产者进程和消费者进程,共享用一个可以存1000个产品的缓冲区(初始为空),当缓冲区为未满时,生产者进程可以放入一件其生产的产品,否则等待;当缓冲区为未空时,消费者进程可以取走一件产品, 否则等待。要求一个消费者进程从缓冲区连续取出10件产品后,其他消费者进程才可以取产品。
buffer array [1000]; //存放产品的缓冲区 buffer nextp; //用于临时存放生产者生产的产品 buffer nextc [10]; //用于临时存放消费者取出的产品 semaphore empty = 1000; //空缓冲区的数目 semaphore full = 0; //满缓冲区的数目 semaphore mutex1 = 1; //用于生产者之间的互斥,以及生产者消费者互斥 semaphore mutex2 = 1; //用于消费者之间的互斥int in = 0; //指示生产者的存位置 int out = 0; //指示消费者的取位置 Producer() //生产者进程 { Produce an item put in nextp; //生产一个产品,存在临时缓冲区 P(empty); //申请一个空缓冲区 P(mutex1); //生产者申请使用缓冲区 array[in]=nextp; //将产品存入缓冲区 in = (in+1)%1000; //指针后移 V(mutex1); //生产者缓冲区使用完毕,释放互斥信号量 V(full); //增加一个满缓冲区 }Consumer() //消费者进程 { P(mutex2); //消费者申请使用缓冲区 for(int i = 0;i<10;i++) //一个消费者进程需从缓冲区连续取走 10 件 产品 { P(full); //申请一个满缓冲区 P(mutex1) //互斥生产者 nextc[i] = array[out]; //将产品取出,存于临时缓冲区 out = (out+1)%1000; //指针后移 V(mutex1) //解除生产者互斥 V(empty); //增加一个空缓冲区 } V(mutex2); //消费者缓冲区使用完毕,释放互斥信号量 Consume the items in nextc; //消费掉这10个产品 }
读写者问题(开关灯模式)
描述:对共享资源的读写操作,任一时刻“写者” 最多只允许一个,而“读者”则允许多个――“读-写”互斥,“写 - 写”互斥,“读-读”允许。第一个进屋的人开灯,最后一个离开屋的人关灯。
应用场景: 对共享数据结构、数据库、文件的多线程并发访问
问题:系统负载高的时候,写者几乎没有机会工作
int readers = 0 //临界区内读者的数目semaphore mutex = 1 //对readers的访问保护semaphore roomEmpty = 1 //1:临界区没有线程,0:临界区有线程读者: p(mutex) readers++; if(readers == 1) //第一个读者进房间,如果里面有写者,需等写者写完房间为空才能读 p(roomEmpty) //房间里已有读者时,就说明房间里已经没有写者,就无需等房间为空时再读v(mutex)read... //临界区p(mutex) reader--; if(readers == 0) v(roomEmpty)v(mutex)写者:p(roomEmpty) //房间不为空,要等待 write... //临界区v(roomEmpty) //写完后出房间了,告诉所有人房间空了 采用一般信号量机制: RN:同时读的读者最大数目 mx:允许写,初值为1 L:允许读者数目,初值RNwriter: SP(mx,1,1;L,RN,0) write SV(mx,1) -------Reader:SP(L,1,1;mx,1,0) readSV(L,1)
闸机版的读写问题:当一个写者到达, 已进入的读者可以结束,但是新的读者无法进入
int readers = 0 //临界区内读者的数目semaphore mutex = 1 //对readers的访问保护semaphore roomEmpty = 1 //1:临界区没有线程,0:临界区有线程semaphore turnstile = 1 //闸机 读者: p(turnstile) v(turnstile) p(mutex) readers++; if(readers == 1) //第一个读者 p(roomEmpty)v(mutex)read... //临界区p(mutex) reader--; if(readers == 0) v(roomEmpty)v(mutex)----------写者:p(turnstile) p(roomEmpty) write... //临界区 v(roomEmpty)v(turnstile)
理发师问题
描述:理发店里有一位理发师、一把理发椅和n把供等候理发的顾客坐的椅子;如果没有顾客,理发师便在理发椅上睡觉,当一个顾 客到来时,叫醒理发师;如果理发师正在理发时,又有顾客来到,则如果有空椅子可坐,就坐下来等待,否则就离开。
semaphore customers = 0; //等待理发的顾客 semaphore barbers = 0; //等待顾客的理发师 int waiting = 0; //等待的顾客数(不包含正在理发的顾客)semaphore mutex = 1; //互斥访问waiting//CHIRS = 10理发师线程: while(true){ p(customers)//顾客为0,睡觉 p(mutex) waiting --; v(mutex) v(barbers) //准备好剪发 Cut hair(); } --------------- 顾客线程:p(mutex) if(waiting
构建水分子问题
描述:一个线程提供氧原子,一个提供氢原子。为构建水分子,需要使用barrier让线程同步从而构建水分子
信号量定义:oxygen = 0; //counterhydrogen = 0; //countermutex = 1; //protect the counterBarrier barrier(3) //3表示需要调用3次wait后barrier才开放oxyQueue = 0; //氧气线程等待的信号量hydroQueue = 0; //氢气线程等待的信号量//信号量上睡眠来模拟队列P(oxyQueue) //加入队列V(oxyQueue) //离开队列//氧气线程P(mutex)oxygen += 1if hydrogen >= 2 //构建水分子成功 V(hydroQueue) V(hydroQueue) hydrogen -= 2 V(oxyQueue) oxygen -= 1else: V(mutex)P(oxyQueue)bond() //原子组合操作barrier.wait()V(mutex)//当三个线程离开barrier时候,最后那个线程拿着mutex, 虽然我们不知道那个线程hold mutext,但是我们一定要释放一次。 因为氧气只有一个线程,就放在氧气线程中做了。 //氢气线程P(mutex)hydrogen += 1if hydrogen >= 2 and oxygen >=1 //构建水分子成功 V(hydroQueue) V(hydroQueue) hydrogen -= 2 V(oxyQueue) oxygen -= 1else: V(mutex)P(oxyQueue)bond() //原子组合操作barrier.wait()
吸烟者问题
描述:三个吸烟者在一间房间内,还有一个香烟供应者。为了 制造并抽掉香烟,每个吸烟者需要三样东西:烟草、纸 和火柴。供应者有丰富的货物提供。三个吸烟者中,第一个有自己的烟草,第二个有自己的纸,第三个有自己的火柴。供应者随机选择两样东西放在桌子上,允许欠缺相应两样材料一个吸烟者吸烟。当吸烟者完成吸烟后唤醒供应者,供应者再放两样东西(随机地)在桌面上,然后唤醒另一个吸烟者。如此往复。 问题的目标是当资源足够让一个或者多个应用继续,这些应用应该被唤醒。 反之,让应用继续睡眠。
isTobacco = isPaper = isMatch = False//表示材料是否在桌子上 agentSem = Semaphore (1) //表示唤醒/睡眠提供者的信号量 tobacco = Semaphore (0) //表示烟草可用 paper = Semaphore (0) //表示纸可用 match = Semaphore (0) //表示火柴可用tobaccoSem = Semaphore (0) //用于通知拥有tobacco的吸烟者 paperSem = Semaphore (0) //用于通知拥有paper的吸烟者 matchSem = Semaphore (0) //用于通知拥有match的吸烟//PusherA/B/C是三个帮助线程,他们响应agent的信号,记录可用材料,通知相应吸烟者pusherA:tobacco.wait()mutex.wait() if isPaper: //如果发现已经有paper(加上有tobacco) isPaper = False //唤醒smoker with match matchSem.signal() else isMatch: isMatch = False paperSem.signal() else isTobacco = True //如果它是第一个醒来,就会设置isTobacco为真mutex.signal()---------------Agent A://同时放了to和pa agentSem.wait()//睡眠提供者 tobacco.signal() paper.signal() --------------- Smoker with match:matchSem.wait()makeCigarette()agentSem.signal()//唤醒提供者smoke()