發(fā)布于:2021-01-30 10:55:20
0
211
0
GPars是Groovy(和Java)中用于高級并發(fā)的開源庫。如果您聽過actors、dataflow或parallel collections之類的術(shù)語,并且希望用Java友好的語言嘗試這些概念,那么現(xiàn)在您有機會了。在本文中,我計劃對gpar中可用的抽象進(jìn)行快速概述。然后我們將更詳細(xì)地研究基于流的并發(fā)數(shù)據(jù)處理、并行化收集操作以及異步運行組合函數(shù)。
隨著多核芯片逐漸成為主流電腦、平板電腦和手機的標(biāo)準(zhǔn),并發(fā)編程越來越重要。不幸的是,我們從Java中了解到的廣泛使用的基于線程的并發(fā)模型與人腦的工作方式并不匹配。線程和鎖在代碼中引入了太多的不確定性,這常常導(dǎo)致難以追蹤和修復(fù)的細(xì)微錯誤。這樣的代碼不能被可靠地測試或分析。不可避免地,為了使并發(fā)編程有效,我們需要使用更自然的心智模型。
并發(fā)可以是Groovy
將直觀的并發(fā)模型引入主流是gpar雄心勃勃的挑戰(zhàn)。我們采用了眾所周知的并發(fā)概念,如actors、CSP、dataflow等,并用Java實現(xiàn)了它們,使用了一種美味的Groovy DSL,使庫具有流暢和易于使用的風(fēng)格。盡管主要針對Groovy,但一些gpar抽象可以直接從Java使用。由于Groovy社區(qū)對并發(fā)的興趣以及他們對項目的支持,gpar目前是Groovy發(fā)行版的標(biāo)準(zhǔn)部分。在Groovy中啟動并運行并發(fā)不需要額外的設(shè)置。
被認(rèn)為是致命的循環(huán)
我想讓你停下來想一想計算機科學(xué)專業(yè)的學(xué)生在學(xué)習(xí)編程時通常被分配的一些瑣碎的練習(xí)。例如,這樣的任務(wù)之一是查找集合的最大值。可能會應(yīng)用一些復(fù)雜的度量來增加解決方案的計算量。你首先想到的是什么算法?
你很有可能會提出一個迭代來遍歷這個集合并記住迄今為止發(fā)現(xiàn)的最大元素。一旦我們到達(dá)集合的末尾,我們記住的迄今為止最大的元素必須是整個集合的全局最大值。清楚,簡單-好吧,錯了!如果你想知道原因,就繼續(xù)讀下去。
做你的選擇
在并發(fā)空間中似乎沒有一個一刀切的解決方案。多種范式已經(jīng)逐漸出現(xiàn),盡管有一些重疊,但它們都適用于不同類型的問題。GPars本身在2008年作為一個用于并行收集處理的小型Groovy庫起步。不久之后,它增加了對actor模型的支持。隨著時間的推移,其他并發(fā)抽象已經(jīng)被集成。以下是GPars版本0.12中當(dāng)前可用內(nèi)容的快速列表:
并行集合提供了直觀的方式來并行處理Java / Groovy集合,地圖和一般所有幾何可分解問題的處理
異步功能使Groovy閉包能夠異步運行,同時又可以毫不費力地協(xié)調(diào)它們之間的相互通信。
Fork / Join使您能夠同時處理遞歸的分治式算法
數(shù)據(jù)流變量(又名Promises)為線程間通信提供了一種輕量級的機制。
數(shù)據(jù)流通道和運算符使您可以將活動的數(shù)據(jù)轉(zhuǎn)換元素組織到高度并發(fā)的數(shù)據(jù)處理管道和網(wǎng)絡(luò)中
CSP是基于理論數(shù)學(xué)的著名并發(fā)模型,它使用通過同步通道進(jìn)行通信的獨立并發(fā)運行流程的抽象
Actor / Active對象為您提供了低禮儀事件驅(qū)動的主動組件的抽象,這些組件異步交換數(shù)據(jù)
顧名思義,代理可以保護(hù)多個線程需要同時訪問的數(shù)據(jù)
您可以在GPars用戶指南中查看每個模型的詳細(xì)信息,并將它們與它們的典型適用范圍進(jìn)行并排比較。另外,Dierk?nig即將出版的第二版書“Groovy in Action”對gpar進(jìn)行了詳細(xì)的介紹。
在本文中,我選擇了三種最有可能向您展示直觀并發(fā)好處的抽象:并行集合、異步函數(shù)和數(shù)據(jù)流操作符。我們就潛進(jìn)去吧!
幾何分解
現(xiàn)在,這是一個很好的地方來解釋為什么我們前面描述的求最大值的順序算法是一個錯誤的選擇。并不是說這個解決方案是錯誤的。很明顯,它給了你正確的答案,不是嗎?它失敗的地方在于它的有效性。它禁止隨著工人人數(shù)的增加而擴大規(guī)模。它完全忽略了這樣一種可能性,即系統(tǒng)可能會在這個問題上放置多個處理器。
超市幾十年前就解決了這個難題。當(dāng)收銀臺的隊伍排得太長時,他們會另外叫一個出納來為顧客服務(wù),這樣工作量就會分散,吞吐量也會增加。
回到尋找最大值的問題:利用Groovy函數(shù)集合API,GPars添加了每個流行迭代方法的并行版本,如eachParallel()、collectpallel()、findAllParallel()、maxpallel()和其他方法。這些方法對用戶隱藏了實際的實現(xiàn)。在幕后,集合被劃分成更小的塊,可能是按層次組織的,每個塊都將由不同的線程處理(圖1)。
實際工作由用戶必須提供的線程池中的線程執(zhí)行。GPAR提供了兩種類型的線程池:
GParsExecutorsPool 使用直接的Java 5 Executor
GParsPool 使用Fork / Join線程池
Parallel collections in use:GParsPool.withPool 16, { def myFavorite = programmingLanguages.collectParallel {it.grabSpec()} .maxParallel {it.spec.count(/parallel/)}}
在withPool代碼塊中,并行收集方法自動在周圍線程池的線程之間分配工作。向池中添加的線程越多,獲得的并行度就越高。如果沒有明確的池大小要求,池將為運行時檢測到的每個可用處理器創(chuàng)建一個線程,從而提供最大的計算能力。這樣就不會有人為的上限限制算法內(nèi)部的并行性。無論是在舊的單處理器機器上還是在未來的百核芯片上,代碼都將全速運行。
GPars并行集合API提供了經(jīng)常被稱為循環(huán)并行問題或更一般的幾何分解問題的解決方案。還有其他類型的挑戰(zhàn)需要更具創(chuàng)造性的并發(fā)方法。我們將在文章的下一部分討論其中的兩個。
異步函數(shù)
在看到集合被并發(fā)處理之后,我們現(xiàn)在將重點討論函數(shù)。Groovy對函數(shù)編程有很好的支持;畢竟,能夠并行化方法和函數(shù)調(diào)用肯定會派上用場。為了接近Groovy所在的領(lǐng)域,我選擇了軟件項目構(gòu)建編排問題作為下一步的主題。
注意:當(dāng)并行化構(gòu)建進(jìn)程時,通常I/O限制比CPU限制更多,我們明顯地增加了磁盤和網(wǎng)絡(luò)帶寬的利用率,而不是處理器的利用率。它不僅是CPU,而且是其他資源,通過并發(fā)代碼可以提高它們的利用率。顯然,所演示的原理可以完全以同樣的方式應(yīng)用于CPU受限的問題
讓我們假設(shè)我們有一組函數(shù),可能實現(xiàn)為shell腳本、gradle任務(wù)或GAnt方法,它們可以執(zhí)行構(gòu)建的不同部分。傳統(tǒng)的構(gòu)建腳本可以如下所示:
清單1:構(gòu)建腳本的順序版本
println "Starting the build process."def projectRoot = checkout('git@github.com:vaclav/GPars.git')def classes = compileSources(projectRoot)def api = generateAPIDoc(projectRoot)def guide = generateUserDocumentation(projectRoot)def result = deploy(packageProject(classes, api, guide))
我們現(xiàn)在的任務(wù)是盡可能地安全地并行化構(gòu)建,而不需要過度的努力。您可能會看到compileSources()、generateAPIDoc()和generateUserGuide()可以安全地并行運行,因為它們沒有相互依賴關(guān)系。他們只需要等待checkout()完成就可以開始工作。但是,腳本會連續(xù)運行它們。
我相信你能想象出比這個更復(fù)雜的構(gòu)建場景。然而,如果沒有一個好的抽象,如果我們的任務(wù)是并發(fā)地運行構(gòu)建任務(wù),即使使用這樣一個人工簡化的構(gòu)建腳本,我們也要做相當(dāng)多的工作(清單2)。你覺得這個怎么樣?
清單2:使用異步函數(shù)的構(gòu)建腳本的并發(fā)版本
withPool { /* We need asynchronous variants of all the individual build steps */ def aCheckout = checkout.asyncFun() def aCompileSources = compileSources.asyncFun() def aGenerateAPIDoc = generateAPIDoc.asyncFun() def aGenerateUserDocumentation = generateUserDocumentation.asyncFun() def aPackageProject = packageProject.asyncFun() def aDeploy = deploy.asyncFun() /* Here's the composition of asynchronous build steps to form a process */ Promise projectRoot = aCheckout('git@github.com:vaclav/GPars.git') Promise classes = aCompileSources(projectRoot) Promise api = aGenerateAPIDoc(projectRoot) Promise guide = aGenerateUserDocumentation(projectRoot) Promise result = aDeploy(aPackageProject(classes, api, guide)) /* Now we're setup and can wait for the build to finish */ println "Starting the build process. This line is quite likely to be printed first ..." println result.get()}
線路保持完全相同。我們只通過asyncFun()方法將原始函數(shù)轉(zhuǎn)換為異步函數(shù)。另外,整個代碼塊現(xiàn)在被包裝在GParsPool.with池()塊,以便函數(shù)有一些線程用于它們的艱苦工作。
顯然,在asyncFun()方法中發(fā)生了一些神奇的事情,允許函數(shù)異步運行,并且在它們需要彼此的數(shù)據(jù)時進(jìn)行協(xié)作。
細(xì)節(jié)之美
本質(zhì)上,asyncFun()將原始函數(shù)包裝到新函數(shù)中。新函數(shù)的簽名與原始函數(shù)的簽名略有不同。例如,原始的compileSources()函數(shù)將字符串作為參數(shù)并返回字符串作為結(jié)果:
String compileSources = {String projectRoot -> ...}
新構(gòu)造的aCompileSources()函數(shù)返回字符串的承諾,而不是字符串本身。此外,String和Promise
PromiseaCompileSources = {String | PromiseprojectRoot -> ...}
信守諾言
Promise接口是幾個GPars api的基礎(chǔ)。這有點類似于java.util.concurrent文件.Future,因為它表示一個正在進(jìn)行的異步活動,可以通過其blocking get()方法等待和獲取結(jié)果。兩者之間最重要的區(qū)別是,與Java的未來不同,承諾允許非阻塞讀取。
promise.then {println "Now we have a result: $it"}
這允許我們的異步函數(shù)僅在計算所需的所有值都可用時使用系統(tǒng)線程。因此,例如,只有在類、api和guide局部變量都綁定到結(jié)果值之后,打包才會開始。在此之前,aPackage()函數(shù)將在后臺靜默地等待,沒有計劃也沒有活動(圖2)。
現(xiàn)在,您應(yīng)該能夠看到構(gòu)建塊是多么完美地結(jié)合在一起。由于異步函數(shù)返回并接受承諾,因此它們可以按照與同步原始函數(shù)相同的方式進(jìn)行組合。函數(shù)組合的第二個也是可能更突出的好處是,我們不必明確指定哪些任務(wù)可以并行運行。我敢肯定,如果我們繼續(xù)將任務(wù)添加到構(gòu)建過程中,我們很快就會失去關(guān)于哪些活動可以安全地與哪些其他活動并行運行的全局視圖。幸運的是,我們的異步函數(shù)將在運行時自行發(fā)現(xiàn)并行性。在任何時候,所有準(zhǔn)備好參數(shù)的任務(wù)都將從分配的線程池中獲取一個線程并開始運行。通過限制池中的線程數(shù),可以設(shè)置并行運行的任務(wù)數(shù)的上限。
數(shù)據(jù)流在哪里
現(xiàn)在,我們已經(jīng)準(zhǔn)備好進(jìn)行今天的集合中最有趣的抽象—數(shù)據(jù)流并發(fā)。在上一個示例的基礎(chǔ)上,我們現(xiàn)在繼續(xù)。為了構(gòu)建多個項目,我們將反復(fù)運行構(gòu)建腳本。如果您愿意,可以將其視為未來構(gòu)建服務(wù)器的初始階段。各種項目的構(gòu)建請求將通過管道傳入,我們的構(gòu)建服務(wù)器將在系統(tǒng)資源允許的情況下依次處理它們。
您可能會嘗試最簡單的解決方案—為每個傳入請求運行上一個練習(xí)中基于異步函數(shù)的代碼。然而,這在很大程度上是次優(yōu)的可能性很高。由于請求隊列中堆積了多個請求,我們有機會獲得更大的并行性。不僅同一項目的獨立部分可以并行構(gòu)建,而且不同項目的不同部分也可以同時處理。簡單地說,不同項目的處理可以在時間上重疊(圖3)。
順其自然
這種模型自然適合于此類問題,稱為數(shù)據(jù)流網(wǎng)絡(luò)。它通常用于并行數(shù)據(jù)處理,如加密和壓縮、數(shù)據(jù)挖掘、圖像處理等。基本上,數(shù)據(jù)流網(wǎng)絡(luò)由活動的數(shù)據(jù)轉(zhuǎn)換元素(稱為操作符)組成,通過異步通道連接。每個操作符使用來自其輸入通道的數(shù)據(jù),并通過多個輸出通道發(fā)布結(jié)果。它還有一個相關(guān)的轉(zhuǎn)換功能,它將通過輸入通道接收的數(shù)據(jù)轉(zhuǎn)換為向下發(fā)送到輸出通道的數(shù)據(jù)。在這種情況下,操作符共享一個線程池,因此沒有數(shù)據(jù)可處理的非活動操作符不會占用系統(tǒng)線程。對于我們的簡單構(gòu)建服務(wù)器,網(wǎng)絡(luò)可以如圖4所示。
我們每一步都有一個操作員。這些通道表示構(gòu)建任務(wù)之間的依賴關(guān)系,每個操作符只需要一個系統(tǒng)線程,并在其所有輸入通道都有值可讀取之后啟動計算。
清單3:使用數(shù)據(jù)流操作符的并發(fā)構(gòu)建服務(wù)器
/* We need channels to wire active elements together */def urls = new DataflowQueue()def checkedOutProjects = new DataflowBroadcast()def compiledProjects = new DataflowQueue()def apiDocs = new DataflowQueue()def userDocs = new DataflowQueue()def packages = new DataflowQueue()def done = new DataflowQueue()/* Here's the composition of individual build steps into a process */operator(inputs: [urls], outputs: [checkedOutProjects], maxForks: 3) {url ->bindAllOutputs checkout(url)}operator([checkedOutProjects.createReadChannel()], [compiledProjects]) {projectRoot ->bindOutput compileSources(projectRoot)}operator(checkedOutProjects.createReadChannel(), apiDocs) {projectRoot ->bindOutput generateAPIDoc(projectRoot)}operator(checkedOutProjects.createReadChannel(), userDocs) {projectRoot ->bindOutput generateUserDocumentation(projectRoot)}operator([compiledProjects, apiDocs, userDocs], [packages]) {classes, api, guide ->bindOutput packageProject(classes, api, guide)}def deployer = operator(packages, done) {packagedProject ->if (deploy(packagedProject) == 'success') bindOutput true else bindOutput false}/* Now we're setup and can wait for the build to finish */println "Starting the build process. This line is quite likely to be printed first ..."deployer.join() //Wait for the last operator in the network to finish
這個模型應(yīng)用于我們的問題的好處是,例如,當(dāng)?shù)谝粋€操作符執(zhí)行項目簽出并獲取項目的源代碼時,它可以立即獲取隊列中的下一個請求,并在編譯、打包和部署前很久就開始獲取其源代碼。
通過更改分配給網(wǎng)絡(luò)中特定任務(wù)的操作員數(shù)量,可以調(diào)整系統(tǒng)。例如,如果我們意識到獲取源是一個瓶頸,并且假設(shè)硬件(網(wǎng)絡(luò)帶寬)仍然沒有得到充分利用,那么我們可以通過增加源獲取操作符的數(shù)量來提高服務(wù)器的吞吐量(圖5)。
顯然,您的調(diào)優(yōu)選項比分叉大量使用的操作符要遠(yuǎn)得多。為了簡單地說出一些其他的可能性,您可以考慮在網(wǎng)絡(luò)的重復(fù)部分之間采用負(fù)載平衡方案,實施生產(chǎn)節(jié)流——可以通過同步通信通道,也可以使用類似看板的工作進(jìn)度節(jié)流方案。對于某些問題,可以考慮數(shù)據(jù)緩存和推測性計算。
摘要
在深入了解Groovy并發(fā)之后,您可能處于一個很好的位置來進(jìn)行更深入的研究。為了快速入門,請考慮遵循gpar的一個快速方法,我絕對建議您查看用戶指南以了解更多詳細(xì)信息。
如果你帶著GPAR去兜風(fēng)我會很高興的。去享受并發(fā)性吧,因為并發(fā)性太棒了!