博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程的工作方式-流水线
阅读量:5366 次
发布时间:2019-06-15

本文共 5659 字,大约阅读时间需要 18 分钟。

#include 
#include
#include
#include "errors.h"using namespace std;typedef struct stage_tag{ pthread_mutex_t mutex; //protect data pthread_cond_t avail; //data availiable 当前流水节点处于空闲可用状态, 等待接收数据,进行加工 pthread_cond_t read; //ready for data 当前处于数据准备状态,写入当前节点 int data_ready; //data present 节点数据状态;1 表示准备好 等待发送 ; 0 表示没有数据 long data; //data to process pthread_t thread; //thread for stage struct stage_tag *next; //next satge}stage_t;typedef struct pipe_tag{ pthread_mutex_t mutex; //mutex to protect pipe stage_t *head; //fist stage stage_t *tail; //final stage int stages; //number for stages int active; //active data element}pipe_t;int pipe_send(stage_t *stage,long data){ int status=pthread_mutex_lock(&stage->mutex); if(status!=0) return status; while(stage->data_ready)// 如果有数据,则下面必须等待知道收到ready信号 { status=pthread_cond_wait(&stage->read,&stage->mutex);//wait ready if(status!=0) //在read上阻塞,main线程改变了条件变量的值并发信息号,解除阻塞 { pthread_mutex_unlock(&stage->mutex); return status; } } //send new data stage->data=data; stage->data_ready=1; //此时有了data,就可以发送avail信号了 status=pthread_cond_signal(&stage->avail); //signal avail if(status!=0) { pthread_mutex_unlock(&stage->mutex); return status; } status=pthread_mutex_unlock(&stage->mutex); return status;}void *pipe_stage(void *arg) { stage_t *stage=(stage_t*)arg; stage_t *next_stage=stage->next; int status=pthread_mutex_lock(&stage->mutex); if(status!=0) err_abort(status,"Lock pipe stage"); while(1) { //if there's data int the pipe stage,wait for it be consumed //一般一个条件表达式都是在一个互斥锁的保护下被检查。 //当条件表达式未被满足时,线程将仍然阻塞在这个条件变量上。 //当另一个线程改变了条件的值并向条件变量发出信号时, //等待在这个条件变量上的一个线程或所有线程被唤醒, //接着都试图再次占有相应的互斥锁。阻塞在条件变量上的线程被唤醒以后, //直到pthread_cond_wait()函数返回之前条件的值都有可能发生变化。 //所以函数返回以后,在锁定相应的互斥锁之前,必须重新测试条件值。 //最好的测试方法是循环调用pthread_cond_wait函数,并把满足条件的表达式置为循环的终止条件。 //如:pthread_mutex_lock(); // while (condition_is_false) // pthread_cond_wait(); // pthread_mutex_unlock(); while(stage->data_ready!=1) { status=pthread_cond_wait(&stage->avail,&stage->mutex);//wait avail if(status!=0) err_abort(status,"wait for previous stage"); } pipe_send(next_stage,stage->data+1); //pipe_send() stage->data_ready=0; status=pthread_cond_signal(&stage->read); //signal ready if(status!=0) err_abort(status,"wake next stage"); }}int pipe_create(pipe_t *pipe,int stages){ stage_t **link=&pipe->head,*new_stage,*stage; int status=pthread_mutex_init(&pipe->mutex,NULL); if(status) err_abort(status,"Init pipe status"); pipe->stages=stages; pipe->active=0; for(int i=0;i
mutex,NULL); if(status) err_abort(status,"init stage mutex"); status=pthread_cond_init(&new_stage->avail,NULL); if(status) err_abort(status,"init avail condition"); status=pthread_cond_init(&new_stage->read,NULL); if(status) err_abort(status,"init read condition"); new_stage->data_ready=0; *link=new_stage; link=&new_stage->next; } *link=(stage_t*)NULL; pipe->tail=new_stage; for(stage=pipe->head;stage->next!=NULL;stage=stage->next) { status=pthread_create(&stage->thread,NULL,pipe_stage,(void*)stage); if(status) err_abort(status,"Create pipe stage"); } return 0;}int pipe_start(pipe_t *pipe,long vaule) { int status=pthread_mutex_lock(&pipe->mutex); if(status) err_abort(status,"lock pipe mutex"); pipe->active++;//记录活动的次数 status=pthread_mutex_unlock(&pipe->mutex); if(status) err_abort(status,"unlock pipe mutex"); pipe_send(pipe->head,vaule); //pipe_send() return 0;}int pipe_result(pipe_t *pipe,long *result){ stage_t *tail=pipe->tail; int status=pthread_mutex_lock(&pipe->mutex); if(status) err_abort(status,"lock pipe mutex"); int empty=0; if(pipe->active<=0) empty=1; else pipe->active--; status = pthread_mutex_unlock(&pipe->mutex); if(status!=0) err_abort( status, "unlock pipe mutex" ); if(empty) return 0; pthread_mutex_lock(&tail->mutex); while(!tail->data_ready) pthread_cond_wait(&tail->avail,&tail->mutex); *result=tail->data; tail->data_ready=0; pthread_cond_signal(&tail->read); pthread_mutex_unlock(&tail->mutex); return 1;}int main(){ pipe_t my_pipe; pipe_create(&my_pipe,3); //pipe_create() cout<<"Enter ingter values ,or \"=\" for next result"<
"; char line[128]; if(fgets(line,sizeof(line),stdin)==NULL) exit(0); if(strlen(line)<=1) continue; long result,value; if(strlen(line)<=2&&line[0]=='=') { if(pipe_result(&my_pipe,&result)) //pipe_result() cout<<"Result is:"<
<

流水线工作方式的图示

流水线中线程,线程队列对每个输入进行处理,一个阶段的线程处理完成后,将会把数据交给下一阶段的线程。最后一阶段的线程产生输出结果

如果前一阶段处理的速度高于下一阶段的线程,可以使用缓冲区作为使其同步工作的手段。

 

转载于:https://www.cnblogs.com/tianzeng/p/9243862.html

你可能感兴趣的文章
DLL 导出函数
查看>>
windows超过最大连接数解决命令
查看>>
12个大调都是什么
查看>>
angular、jquery、vue 的区别与联系
查看>>
Intellij idea创建javaWeb以及Servlet简单实现
查看>>
代理网站
查看>>
Open multiple excel files in WebBrowser, only the last one gets activated
查看>>
FFmpeg进行视频帧提取&音频重采样-Process.waitFor()引发的阻塞超时
查看>>
最近邻与K近邻算法思想
查看>>
【VS开发】ATL辅助COM组件开发
查看>>
FlatBuffers In Android
查看>>
《演说之禅》I &amp; II 读书笔记
查看>>
thinkphp3.2接入支付宝支付接口(PC端)
查看>>
【转】在Eclipse中安装和使用TFS插件
查看>>
C#中Monitor和Lock以及区别
查看>>
【NOIP2017】奶酪
查看>>
5.6.3.7 localeCompare() 方法
查看>>
Linux下好用的简单实用命令
查看>>
描绘应用程序级的信息
查看>>
php环境搭建脚本
查看>>