成人在线亚洲_国产日韩视频一区二区三区_久久久国产精品_99国内精品久久久久久久

您的位置:首頁技術文章
文章詳情頁

C和Java沒那么香了,Serverless時代Rust即將稱王?

瀏覽:130日期:2022-08-10 09:21:47
目錄高并發模式初探C語言的高并發案例Java的高并發實現Go的高并發實現Rust的高并發實現總結高并發模式初探

在這個高并發時代最重要的設計模式無疑是生產者、消費者模式,比如著名的消息隊列kafka其實就是一個生產者消費者模式的典型實現。其實生產者消費者問題,也就是有限緩沖問題,可以用以下場景進行簡要描述,生產者生成一定量的產品放到庫房,并不斷重復此過程;與此同時,消費者也在緩沖區消耗這些數據,但由于庫房大小有限,所以生產者和消費者之間步調協調,生產者不會在庫房滿的情況放入端口,消費者也不會在庫房空時消耗數據。詳見下圖:

C和Java沒那么香了,Serverless時代Rust即將稱王?

而如果在生產者與消費者之間完美協調并保持高效,這就是高并發要解決的本質問題。

C語言的高并發案例

筆者在前文曾經介紹過TDEngine的相關代碼,其中Sheduler模塊的相關調度算法就使用了生產、消費者模式進行消息傳遞功能的實現,也就是有多個生產者(producer)生成并不斷向隊列中傳遞消息,也有多個消費者(consumer)不斷從隊列中取消息。

后面我們也會說明類型功能在Go、Java等高級語言中類似的功能已經被封裝好了,但是在C語言中你就必須要用好互斥體( mutex)和信號量(semaphore)并協調他們之間的關系。由于C語言的實現是最復雜的,先來看結構體設計和他的注釋:

typedef struct { char label[16];//消息內容 sem_t emptySem;//此信號量代表隊列的可寫狀態 sem_t fullSem;//此信號量代表隊列的可讀狀態 pthread_mutex_t queueMutex;//此互斥體為保證消息不會被誤修改,保證線程程安全 int fullSlot;//隊尾位置 int emptySlot;//隊頭位置 int queueSize;#隊列長度 int numOfThreads;//同時操作的線程數量 pthread_t * qthread;//線程指針 SSchedMsg * queue;//隊列指針} SSchedQueue;

再來看Shceduler初始化函數,這里需要特別說明的是,兩個信號量的創建,其中emptySem是隊列的可寫狀態,初始化時其值為queueSize,即初始時隊列可寫,可接受消息長度為隊列長度,fullSem是隊列的可讀狀態,初始化時其值為0,即初始時隊列不可讀。具體代碼及我的注釋如下:

void *taosInitScheduler(int queueSize, int numOfThreads, char *label) { pthread_attr_t attr; SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue)); memset(pSched, 0, sizeof(SSchedQueue)); pSched->queueSize = queueSize; pSched->numOfThreads = numOfThreads; strcpy(pSched->label, label); if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) { pError('init %s:queueMutex failed, reason:%s', pSched->label, strerror(errno)); goto _error; } //emptySem是隊列的可寫狀態,初始化時其值為queueSize,即初始時隊列可寫,可接受消息長度為隊列長度。 if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { pError('init %s:empty semaphore failed, reason:%s', pSched->label, strerror(errno)); goto _error; } //fullSem是隊列的可讀狀態,初始化時其值為0,即初始時隊列不可讀 if (sem_init(&pSched->fullSem, 0, 0) != 0) { pError('init %s:full semaphore failed, reason:%s', pSched->label, strerror(errno)); goto _error; } if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) { pError('%s: no enough memory for queue, reason:%s', pSched->label, strerror(errno)); goto _error; } memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg)); pSched->fullSlot = 0;//實始化時隊列為空,故隊頭和隊尾的位置都是0 pSched->emptySlot = 0;//實始化時隊列為空,故隊頭和隊尾的位置都是0 pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (int i = 0; i < pSched->numOfThreads; ++i) { if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) { pError('%s: failed to create rpc thread, reason:%s', pSched->label, strerror(errno)); goto _error; } } pTrace('%s scheduler is initialized, numOfThreads:%d', pSched->label, pSched->numOfThreads); return (void *)pSched;_error: taosCleanUpScheduler(pSched); return NULL;}

再來看讀消息的taosProcessSchedQueue函數這其實是消費者一方的實現,這個函數的主要邏輯是

1.使用無限循環,只要隊列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續向下處理

2.在操作msg前,加入互斥體防止msg被誤用。

3.讀操作完畢后修改fullSlot的值,注意這為避免fullSlot溢出,需要對于queueSize取余。同時退出互斥體。

4.對emptySem進行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個消息后,emptySem的值為6,即可寫狀態,且能接受的消息數量為6

具體代碼及注釋如下:

void *taosProcessSchedQueue(void *param) { SSchedMsg msg; SSchedQueue *pSched = (SSchedQueue *)param; //注意這里是個無限循環,只要隊列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續處理 while (1) { if (sem_wait(&pSched->fullSem) != 0) { pError('wait %s fullSem failed, errno:%d, reason:%s', pSched->label, errno, strerror(errno)); if (errno == EINTR) {/* sem_wait is interrupted by interrupt, ignore and continue */continue; } } //加入互斥體防止msg被誤用。 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError('lock %s queueMutex failed, reason:%s', pSched->label, strerror(errno)); msg = pSched->queue[pSched->fullSlot]; memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg)); //讀取完畢修改fullSlot的值,注意這為避免fullSlot溢出,需要對于queueSize取余。 pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize; //讀取完畢修改退出互斥體 if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError('unlock %s queueMutex failed, reason:%sn', pSched->label, strerror(errno)); //讀取完畢對emptySem進行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個消息后,emptySem的值為6,即可寫狀態,且能接受的消息數量為6 if (sem_post(&pSched->emptySem) != 0) pError('post %s emptySem failed, reason:%sn', pSched->label, strerror(errno)); if (msg.fp) (*(msg.fp))(&msg); else if (msg.tfp) (*(msg.tfp))(msg.ahandle, msg.thandle); }}

最后寫消息的taosScheduleTask函數也就是生產的實現,其基本邏輯是

1.寫隊列前先對emptySem進行減1操作,如emptySem原值為1,那么減1后為0,也就是隊列已滿,必須在讀取消息后,即emptySem進行post操作后,隊列才能進行可寫狀態。

2.加入互斥體防止msg被誤操作,寫入完成后退出互斥體

3.寫隊列完成后對fullSem進行加1操作,如fullSem原值為0,那么加1后為1,也就是隊列可讀,咱們上面介紹的讀取taosProcessSchedQueue中sem_wait(&pSched->fullSem)不再阻塞就繼續向下。

int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { SSchedQueue *pSched = (SSchedQueue *)qhandle; if (pSched == NULL) { pError('sched is not ready, msg:%p is dropped', pMsg); return 0; } //在寫隊列前先對emptySem進行減1操作,如emptySem原值為1,那么減1后為0,也就是隊列已滿,必須在讀取消息后,即emptySem進行post操作后,隊列才能進行可寫狀態。 if (sem_wait(&pSched->emptySem) != 0) pError('wait %s emptySem failed, reason:%s', pSched->label, strerror(errno));//加入互斥體防止msg被誤操作 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError('lock %s queueMutex failed, reason:%s', pSched->label, strerror(errno)); pSched->queue[pSched->emptySlot] = *pMsg; pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize; if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError('unlock %s queueMutex failed, reason:%s', pSched->label, strerror(errno)); //在寫隊列前先對fullSem進行加1操作,如fullSem原值為0,那么加1后為1,也就是隊列可讀,咱們上面介紹的讀取函數可以進行處理。 if (sem_post(&pSched->fullSem) != 0) pError('post %s fullSem failed, reason:%s', pSched->label, strerror(errno)); return 0;}Java的高并發實現

從并發模型來看,Go和Rust都有channel這個概念,也都是通過Channel來實現線(協)程間的同步,由于channel帶有讀寫狀態且保證數據順序,而且channel的封裝程度和效率明顯可以做的更高,因此Go和Rust官方都會建議使用channel(通信)來共享內存,而不是使用共享內存來通信。

為了讓幫助大家找到區別,我們先以Java為例來,看一下沒有channel的高級語言Java,生產者消費者該如何實現,代碼及注釋如下:

public class Storage { // 倉庫最大存儲量 private final int MAX_SIZE = 10; // 倉庫存儲的載體 private LinkedList<Object> list = new LinkedList<Object>(); // 鎖 private final Lock lock = new ReentrantLock(); // 倉庫滿的信號量 private final Condition full = lock.newCondition(); // 倉庫空的信號量 private final Condition empty = lock.newCondition(); public void produce() {// 獲得鎖lock.lock();while (list.size() + 1 > MAX_SIZE) { System.out.println('【生產者' + Thread.currentThread().getName() + '】倉庫已滿'); try {full.await(); } catch (InterruptedException e) {e.printStackTrace(); }}list.add(new Object());System.out.println('【生產者' + Thread.currentThread().getName() + '】生產一個產品,現庫存' + list.size()); empty.signalAll();lock.unlock(); } public void consume() {// 獲得鎖lock.lock();while (list.size() == 0) { System.out.println('【消費者' + Thread.currentThread().getName() + '】倉庫為空'); try {empty.await(); } catch (InterruptedException e) {e.printStackTrace(); }}list.remove();System.out.println('【消費者' + Thread.currentThread().getName() + '】消費一個產品,現庫存' + list.size()); full.signalAll();lock.unlock(); }}

在Java、C#這種面向對象,但是沒有channel語言中,生產者、消費者模式至少要借助一個lock和兩個信號量共同完成。其中鎖的作用是保證同是時間,倉庫中只有一個用戶進行數據的修改,而還需要表示倉庫滿的信號量,一旦達到倉庫滿的情況則將此信號量置為阻塞狀態,從而阻止其它生產者再向倉庫運商品了,反之倉庫空的信號量也是一樣,一旦倉庫空了,也要阻其它消費者再前來消費了。

Go的高并發實現

我們剛剛也介紹過了Go語言中官方推薦使用channel來實現協程間通信,所以不需要再添加lock和信號量就能實現模式了,以下代碼中我們通過子goroutine完成了生產者的功能,在在另一個子goroutine中實現了消費者的功能,注意要阻塞主goroutine以確保子goroutine能夠執行,從而輕而易舉的就這完成了生產者消費者模式。下面我們就通過具體實踐中來看一下生產者消費者模型的實現。

package mainimport ('fmt''time')func Product(ch chan<- int) { //生產者for i := 0; i < 3; i++ {fmt.Println('Product produceed', i)ch <- i //由于channel是goroutine安全的,所以此處沒有必要必須加鎖或者加lock操作.}}func Consumer(ch <-chan int) {for i := 0; i < 3; i++ {j := <-ch //由于channel是goroutine安全的,所以此處沒有必要必須加鎖或者加lock操作.fmt.Println('Consmuer consumed ', j)}}func main() {ch := make(chan int)go Product(ch)//注意生產者與消費者放在不同goroutine中go Consumer(ch)//注意生產者與消費者放在不同goroutine中time.Sleep(time.Second * 1)//防止主goroutine退出/*運行結果并不確定,可能為Product produceed 0Product produceed 1Consmuer consumed 0Consmuer consumed 1Product produceed 2Consmuer consumed 2*/}

可以看到和Java比起來使用GO來實現并發式的生產者消費者模式的確是更為清爽了。

Rust的高并發實現

不得不說Rust的難度實在太高了,雖然筆者之前在匯編、C、Java等方面的經驗可以幫助我快速掌握Go語言。但是假期看了兩天Rust真想大呼告辭,這尼瑪也太勸退了。在Rust官方提供的功能中,其實并不包括多生產者、多消費者的channel,std:sync空間下只有一個多生產者單消費者(mpsc)的channel。其樣例實現如下:

use std::sync::mpsc;use std::thread;use std::time::Duration;fn main() { let (tx, rx) = mpsc::channel(); let tx1 = mpsc::Sender::clone(&tx); let tx2 = mpsc::Sender::clone(&tx); thread::spawn(move || {let vals = vec![ String::from('1'), String::from('3'), String::from('5'), String::from('7'),];for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1));} }); thread::spawn(move || {let vals = vec![ String::from('11'), String::from('13'), String::from('15'), String::from('17'),];for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1));} }); thread::spawn(move || {let vals = vec![ String::from('21'), String::from('23'), String::from('25'), String::from('27'),];for val in vals { tx2.send(val).unwrap(); thread::sleep(Duration::from_secs(1));} }); for rec in rx {println!('Got: {}', rec); }}

可以看到在Rust下實現生產者消費者是不難的,但是生產者可以clone多個,不過消費者卻只能有一個,究其原因是因為Rust下沒有GC也就是垃圾回收功能,而想保證安全Rust就必須要對于變更使用權限進行嚴格管理。在Rust下使用move關鍵字進行變更的所有權轉移,但是按照Rust對于變更生產周期的管理規定,線程間權限轉移的所有權接收者在同一時間只能有一個,這也是Rust官方只提供MPSC的原因,

use std::thread;fn main() { let s = 'hello'; let handle = thread::spawn(move || {println!('{}', s); }); handle.join().unwrap();}

當然Rust下有一個API比較貼心就是join,他可以所有子線程都執行結束再退出主線程,這比Go中要手工阻塞還是要有一定的提高。而如果你想用多生產者、多消費者的功能,就要入手crossbeam模塊了,這個模塊掌握起來難度也真的不低。

總結

通過上面的比較我們可以用一張表格來說明幾種主流語言的情況對比:

語言 安全性 運行速度 進程啟動速度 學習難度 C 低 極快 極快 困難 Java 高 一般 一般 一般 Go 高 較快 較快 一般 Rust 高 極快(基本比肩C) 極快(基本比肩C) 極困難

可以看到Rust以其高安全性、基本比肩C的運行及啟動速度必將在Serverless的時代獨占鰲頭,Go基本也能緊隨其后,而C語言程序中難以避免的野指針,Java相對較低的運行及啟動速度,可能都不太適用于函數式運算的場景,Java在企業級開發的時代打敗各種C#之類的對手,但是在云時代好像還真沒有之前統治力那么強了,真可謂是打敗你的往往不是你的對手,而是其它空間的降維打擊。

這篇文章的內容就到這了,希望能給你帶來幫助,也希望您可以多多關注好吧啦網的更多內容!

標簽: Java
成人在线亚洲_国产日韩视频一区二区三区_久久久国产精品_99国内精品久久久久久久
国产免费成人| 成人av在线资源| 久久精品免费看| 久久亚洲精选| 亚洲成人午夜电影| 亚洲精品自在在线观看| 国产精品萝li| 97精品超碰一区二区三区| 在线播放中文字幕一区| 日本最新不卡在线| 久久久国产亚洲精品| 综合激情成人伊人| 国产综合网站| 中文字幕一区二区三区精华液| 99精品视频一区二区三区| 欧美日本在线播放| 老司机午夜精品| 欧美日韩不卡在线| 国产在线一区二区综合免费视频| 欧美午夜电影网| 韩日精品视频一区| 制服丝袜国产精品| 国产999精品久久久久久绿帽| 日韩欧美国产1| 成人免费毛片aaaaa**| 精品入口麻豆88视频| 蜜臀久久99精品久久久久宅男| 色999日韩国产欧美一区二区| 日本麻豆一区二区三区视频| 欧美日韩一区二区电影| 国产乱一区二区| 欧美一区二区福利在线| 成人免费看片app下载| 欧美不卡一区二区| 色综合咪咪久久| 国产精品进线69影院| 国内精品久久久久久久97牛牛 | 粉嫩高潮美女一区二区三区| 日韩精品自拍偷拍| 欧美激情第10页| 亚洲色图视频免费播放| 国产精品视频免费一区| 日韩中文字幕区一区有砖一区 | 久久香蕉国产线看观看99| 欧美另类综合| 亚洲日本乱码在线观看| 影音先锋在线一区| 亚洲一区二区精品久久av| 在线免费观看不卡av| 国产成人免费av在线| 久久久99精品久久| 一本色道久久综合| 青青草97国产精品免费观看| 欧美久久婷婷综合色| 成人午夜av电影| 欧美极品aⅴ影院| 激情亚洲成人| 亚洲一区视频在线| 在线免费视频一区二区| 国产成人av电影免费在线观看| 久久久噜噜噜久噜久久综合| 91久久久久| 免费高清成人在线| 欧美不卡一区二区三区四区| 国产一区二区中文字幕免费看| 亚洲一区视频在线观看视频| 色噜噜狠狠一区二区三区果冻| 久久精品国产一区二区| 日韩一区二区三区高清免费看看| 波波电影院一区二区三区| 中文字幕佐山爱一区二区免费| 国产亚洲欧美一区二区| 麻豆精品久久精品色综合| 欧美zozo另类异族| 极品日韩久久| 日本美女视频一区二区| 亚洲精品一区二区精华| 在线成人黄色| 免费久久99精品国产| 精品剧情在线观看| 91蝌蚪porny九色| 亚洲午夜免费视频| 欧美无人高清视频在线观看| 国产精品综合久久| 中文幕一区二区三区久久蜜桃| 中文高清一区| 久久99九九99精品| 国产欧美日韩另类视频免费观看| 久久九九99| 国产黑丝在线一区二区三区| 国产精品不卡在线| 欧美日韩一区二区欧美激情| 欧美精品91| 日精品一区二区| 国产亚洲一区字幕| 日本高清无吗v一区| 99热这里都是精品| 婷婷综合久久一区二区三区| 久久久久久97三级| 另类天堂av| youjizz国产精品| 亚洲成a人片在线不卡一二三区| 日韩欧美激情四射| 久久精品三级| 91麻豆免费观看| 美女视频一区在线观看| 国产精品天美传媒| 欧美影院一区二区三区| 欧美激情精品久久久六区热门| 日韩影视精彩在线| 国产欧美一区视频| 欧美福利视频导航| 亚洲一区日韩| 91免费在线视频观看| 日本大胆欧美人术艺术动态 | 亚洲国产精品成人综合 | 成人一区在线观看| 性做久久久久久| 国产日产欧美精品一区二区三区| 色婷婷精品大在线视频| 激情亚洲网站| 99re66热这里只有精品3直播| 麻豆精品久久精品色综合| 亚洲精品免费电影| 26uuu成人网一区二区三区| 在线免费观看一区| 在线一区欧美| 91丨九色丨尤物| 韩国视频一区二区| 婷婷成人激情在线网| 亚洲欧洲精品成人久久奇米网| 欧美一区二区三区四区视频| 久久在线精品| 国产精品日韩一区二区三区| 国产精品yjizz| va亚洲va日韩不卡在线观看| 国产一区二区不卡在线| 欧美aaa在线| 亚洲123区在线观看| 亚洲人成网站精品片在线观看| 久久日一线二线三线suv| 欧美电影影音先锋| 色婷婷av一区二区三区软件 | 日韩一级黄色片| 欧美亚洲丝袜传媒另类| 久久riav二区三区| 亚洲国产裸拍裸体视频在线观看乱了中文| aaa亚洲精品| 国产成人午夜精品影院观看视频| 麻豆精品视频在线| 日韩福利视频网| 亚洲情趣在线观看| 中文字幕在线不卡一区二区三区| 精品国产91久久久久久久妲己| 777奇米成人网| 欧美视频自拍偷拍| 久久精品官网| 亚洲一区二区精品在线| 亚洲欧洲一区二区在线观看| 欧美日韩一区综合| 99re热这里只有精品免费视频| 国产一区二区三区免费看| 久久99精品一区二区三区三区| 奇米精品一区二区三区在线观看 | 欧美日韩一区二区三区高清| 色悠悠久久综合| 久久一区欧美| 久久看片网站| 久久福利精品| 久久精品电影| 日本韩国视频一区二区| 91精品福利在线| 91搞黄在线观看| 欧美视频一区二区三区在线观看| 久久亚洲风情| 色综合久久久久| 在线观看国产精品网站| 在线观看免费亚洲| 日本黄色一区二区| 欧美视频中文一区二区三区在线观看| 免费毛片一区二区三区久久久| 在线日韩av永久免费观看| 国产精品九九| 欧美全黄视频| 激情久久久久久| 国产精品v欧美精品v日韩精品| 成人动漫视频在线| 成人午夜碰碰视频| 粉嫩蜜臀av国产精品网站| 丁香一区二区三区| 国产91丝袜在线播放| 国产一区二区三区免费看| 黄色资源网久久资源365| 天天综合网 天天综合色| 五月天亚洲婷婷| 亚洲成a人片在线观看中文| 亚洲成a人片综合在线| 亚洲国产精品影院| 午夜精品在线视频一区| 久久国产精品一区二区|