Clojure学习笔记(三):并发与引用

很多人是为了更好地进行并发编程而选择了Clojure,但Clojure所有的数据都是只读的,除非你使用引用类型(Vars、Ref、Atom、Agent)来标明它们是可以修改的。Clojure处理并发的思路与众不同,采用的是Software Transactional Memory (STM)来实现的,即软事务内存。你可以将STM想象成数据库,只不过是内存型的,它只支持事务的ACI,也就是原子性、一致性、隔离性,但是不包括持久性,因为状态的保存都在内存里。引用类型是一种可变引用指向不可变数据的一种机制。Clojure的并发API分为四种模型:

  1. 管理Thread local变量的Var。
  2. 管理协作式、同步修改可变状态的Ref
  3. 管理非协作式、同步修改可变状态的Atom
  4. 管理异步修改可变状态的Agent

Vars

Vars 是一种引用类型,它可以有一个被所有线程共享的root binding,并且每个线程还能拥有自己(thread-local)的值。

  • def
    def 定义, 将会影响全局定义。

    (def v 2) ; -> 2
  • let
    let 定义, 将会影响自身生命周期内,以及自己作用域内,如果超出自己的作用域,则无效。

    (def name "jark")
    (let [name "wuchong"]
    (println name)) ; -> 输出 wuchong
    (println name) ; -> 输出 jark
  • binding
    binding, 将会影响自身生命周期以及自己作用域内,即使超出自身作用域,也都有效。

    这个例子演示了binding和set!一起使用,用set!来修改一个由binding bind的Var的线程本地的值。

    (def ^:dynamic v 1) ; 需要声明成"dynamic",v才能用binding来改变值。

    (defn change-it []
    (println "2) v =" v) ; -> 1

    (def v 2) ; changes root value
    (println "3) v =" v) ; -> 2

    (binding [v 3] ; binds a thread-local value
    (println "4) v =" v) ; -> 3

    (set! v 4) ; changes thread-local value
    (println "5) v =" v)) ; -> 4

    (println "6) v =" v)) ; thread-local value is gone now -> 2

    (println "1) v =" v) ; -> 1

    (let [thread (Thread. #(change-it))] ; 启动一个本地线程
    (.start thread)
    (.join thread)) ; 等待线程结束

    (println "7) v =" v) ; -> 2

好吧,说了这么多,其实Clojure不鼓励我们使用Vars,因为Vars在线程间不能很好地协作。

Refs

Refs是用来协调对于一个或者多个binding的并发修改的。

  • ref

    ; 用ref函数创建一个可变的引用(reference),一个空的歌曲集合
    (def song (ref #{}))
    (println @song) ; -> #{} 用@来读取ref值
  • validator
    类似数据库,可以为ref添加“约束”,在数据更新的时候需要通过validator函数的验证,如果验证不通过,整个事务将回滚。

    (def validate-song
    (partial every? #(not (nil? %)))) ; 定义了歌名不能为空的validator
    (def song (ref #{} :validator validate-song))
  • dosync & ref-set

    ; 改变引用指向的内容,使用ref-set函数
    (ref-set song #{"Dangerous"})
    ; -> IllegalStateException: No transaction running
    ; 会报错,因为引用是可变的,对状态的更新需要用事务进行保护,使用dosync
    (dosync (ref-set song #{"Dangerous"}))

    ; 因为我们加了不能为空的validator,加入空会报错
    (dosync (ref-set song #{})) ; -> IllegalStateException Invalid reference state

    ; 可以对多个ref的更新放在一个事务里
    (dosync (ref-set song #{"Dangerous"})
    (ref-set singer #{"MJ"}) )
  • alter & commute
    更改引用有点暴力也比较少见,更常见的更新是根据当前状态更新,比如加一首歌进去。

    ; 先查询集合内容,然后往集合里添加歌曲,然后更新整个集合
    (dosync (ref-set song (conj @song "heal the world")))

    ; 查询并更新的操作可以合成一步,这是通过alter函数实现
    (dosync (alter song conj "heal the world"))

    (println @song) ; -> #{heal the world}

注意alter后跟的函数会把ref值当做第一个参数,所以这里使用cons就不行了,因为cons要求第一个参数是加入的元素。

commute函数是是对alter的优化,commute可以同时进行修改(并不影响ref最终的值)。通常情况下,一般优先使用alter,除非在遇到明显的性能瓶颈并且对顺序不是那么关心的时候,可以考虑用commute替换。

Atoms

Atoms 提供了比使用Refs&STM更简单的更新当个值的方法。它不受事务的影响。有点像Java的原子类(Atomic)。

有三个函数可以修改一个Atom的值:reset!,compare-and-set!swap!

(def counter (atom 1))  ; 指定 counter 为Atom类型
(reset! counter 2) ; 更新原子的值
(println @counter) ; -> 2 deref,用`@`读取atom的值
(compare-and-set! counter 2 3) ; -> true 执行成功
(swap! counter inc) ; 第二个参数是计算Atom新值的函数,可带参数。会一直执行直到成功为止。

Agents

Agents 是用来把一些事情放到另外一个线程来做(一般不需要事务控制的),用来控制状态的异步更新。

(def counter (agent 0)) ; 使用agent函数定义一个初始值为0的agent
(println @counter) ; -> 0 同样的使用@读取值

; 更新agent,通过send函数给agent发送任务去更新agent
(send counter inc) ; -> #<Agent@9444d1: 0> 此处一般是0,因为更新是异步的
(println @counter) ; -> 1 这里获取的肯定是1了,已经更新了

; 还有个方法,send-off,它的作用于send类似:
(send-off counter inc)
(println @counter) ; -> 2

send和send-off的区别在于,send是将任务交给一个固定大小的线程池执行(默认大小是CPU核数+2)。因此send执行的任务最好不要有阻塞的操作。而send-off则使用没有大小限制(取决于内存)的线程池。因此,send-off比较适合任务有阻塞的操作,如IO读写之类。注意,所有的agent是共用这些线程池。

扩展阅读

这篇笔记原先是想放在一篇文章里的,谁知太长了,只好分成了三篇。以下是我觉得学习Clojure不错的网上资源,需者自取。