并发编程 代码练习
在《编程练习》一书中,本章称为“演员”。 在软件中, 参与者模型是一种非常特定的代码设计方式:
计算机科学中的参与者模型是并行计算的数学模型,该模型将“参与者”视为并行计算的通用原语。 响应收到的消息,参与者可以:做出本地决策,创建更多参与者,发送更多消息以及确定如何响应收到的下一条消息。 参与者可以修改自己的私有状态,但只能通过消息传递间接相互影响(避免基于锁的同步)。
这是《编程风格练习》重点系列的第 16 个帖子。其他帖子包括:
- 以编程风格介绍练习
- 以编程风格进行练习,将内容堆叠起来
- 编程风格的练习,Kwisatz Haderach风格
- 编程风格的练习,递归
- 具有高阶功能的编程风格的练习
- 以编程风格进行练习
- 以编程风格进行练习,回到面向对象的编程
- 编程风格的练习:地图也是对象
- 编程风格的练习:事件驱动的编程
- 编程风格的练习和事件总线
- 反思编程风格的练习
- 面向方面的编程风格的练习
- 编程风格的练习:FP&I / O
- 关系数据库风格的练习
- 编程风格的练习:电子表格
- 并发编程风格的练习 (本文)
- 编程风格的练习:在线程之间共享数据
- 使用Hazelcast以编程风格进行练习
- MapReduce风格的练习
- 编程风格的练习总结
原始的Python代码
如上所述,原始的Python代码基于actor模型。 每个类都是一个参与者,它从Thread
继承并通过消息与其他参与者进行通信。 这是类图:
参与者之间通过消息进行交流,如上一则有关事件驱动编程的文章所述 。 事件处理在_dispatch()
函数中实现。
ActiveWFObject
的代码如下:
defrun(self): whilenotself._stopMe: message=self.queue.get() self._dispatch(message) ifmessage[0]=='die': self._stopMe=True
本质上, run()
函数是一个循环,该循环:
- 从队列中读取消息
- 使用接收到的消息作为参数调用
_dispatch()
函数 -
die
消息停止循环
移植到Kotlin
将代码移植到Kotlin的第一步只是将Python类与Kotlin类一对一映射的问题:
Python是一种动态类型的语言。 因此,在原始的Python代码中,消息是一个数组,其中的第一个元素按照约定是定义消息的“类型”的String
。 受益于Kotlin的静态键入性质的一种改进是创建消息类型层次结构。
请注意,就像在事件驱动的编程文章中一样 ,消息类型在产生它们的类中定义。
走向更灵活的设计
已移植的Actor
类的代码如下所示:
abstractclassActor:Runnable{
privatevalqueue=ArrayDeque<Message>() privatevarstop=false internalvarthread=Thread(this,this::class.simpleName).apply{
(1) start() } finaloverridefunrun(){
while(!stop){
valmessage:Message?=queue.poll() if(message!=null){
dispatch(message) if(message==Die)stop=true } } } abstractfundispatch(message:Message) funsend(message:Message)=queue.add(message) }
- 实例化对象时线程开始
恕我直言,将对象的生命周期与线程的生命周期绑定在一起是一个问题。 它使线程成为实现细节,但绝对不是。 此外,它阻止了轻松迁移到其他并发模型。
因此,我们应该从Actor
类中删除thread
属性,并将线程逻辑移至调用代码:
valwordFrequencyController=WordFrequencyController() createThread(wordFrequencyController){
start() join() } fun<T:Actor>createThread(actor:T)=Thread(actor,actor::class.simpleName)
进一步完善
此时,可以使用更高级的API,例如最新Java版本提供的API。
介绍执行器服务
要迁移到最新的API,第一步是停止直接使用线程,而使用executor服务-从Java 5开始可用。executor服务允许返回Future
,从而可以跟踪任务的进度。
此外,Java 5还提供了Executors
类,这是不同执行程序服务类型的工厂。 例如, Executors.newFixedThreadPool()
创建了一个执行程序服务,该服务显然使固定数量的线程可用:
valdataStorageManager=DataStorageManager() valstopWordManager=StopWordManager() valwordFrequencyManager=WordFrequencyManager() valwordFrequencyController=WordFrequencyController() valexecutorService=Executors.newFixedThreadPool(4) listOf(dataStorageManager,stopWordManager, wordFrequencyManager,wordFrequencyController) .map{
executorService.submit(it)}[3] .get() returnwordFrequencyController.getResult()
介绍Callable
下一步是认识到要从当前状态的代码中获取结果,需要读取一个属性:
classWordFrequencyController:Actor(){
privatelateinitvarresult:Map<String,Int> // Somehow fill in the result property fungetResult():Map<String,Int>{
returnresult } }
但是,这种设计不是最佳的,因为JDK中有一个接口可以支持此合同: java.util.concurrent.Callable
在任务完成时返回结果。 因此,我们可以重新设计当前的类层次结构,以从Callable
受益:
等效代码如下:
classWordFrequencyController:Actor(),Callable<Map<String,Int>>{
privatelateinitvarresult:Map<String,Int> overridefuncall():Map<String,Int>{
loop() (1) returnresult } }
- 依次调用
dispatch()
,它填充result
介绍可赎回期货
最后一步需要使用CompletableFuture
,这是专门的Future
“可以显式完成”。 它提供了可以接受其他Executor
参数的静态函数。 从Java 1.8开始可用。
要利用CompletableFuture
,需要从Supplier
迁移到Callable
。 然后,可以如下更新代码:
with(Executors.newFixedThreadPool(4)){
CompletableFuture.runAsync(dataStorageManager,this) CompletableFuture.runAsync(stopWordManager,this) CompletableFuture.runAsync(wordFrequencyManager,this) returnCompletableFuture.supplyAsync(wordFrequencyController,this).get() }
结论
参与者模型是基于消息并提供弹性的重要设计。 在某些技术堆栈中,它作为库提供,例如用于Scala的Akka 。 在Elixir中,它似乎是语言API本身的核心部分 -我当然需要更深入地研究它。
在其他情况下,参与者需要手动实现。 在这种情况下,需要了解其一些重要功能。 例如,参与者模型的主要好处之一是让单个参与者崩溃并自动产生新的参与者,从而通过自我修复提供弹性。 上面的代码中认为没有必要这样做。
当该语言不能为演员提供现成的方法时,有几种方法可以实现它们。 根据技术堆栈的不同,有些方法比其他方法更容易:检查每种语言版本带来的新的并发API很有好处。
更进一步:
- 原始Python代码
- 并发和并行之间有什么区别?
翻译自: https://blog.frankel.ch/exercises-programming-style/16/
并发编程 代码练习
版权声明:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权、违法违规、事实不符,请将相关资料发送至xkadmin@xkablog.com进行投诉反馈,一经查实,立即处理!
转载请注明出处,原文链接:https://www.xkablog.com/elixirbfbc/2405.html