Runnable/Callable
Runnable只有一个没有返回值的方法
trait Runnable {
def run(): Unit
}
Callable的方法和run类似,只不过它有一个返回值
trait Callable[V] {
def call(): V
}
线程
Scala的并发是建立在Java的并发模型上的。
在Sun的JVM上,对于一个IO密集型的任务,我们可以在单机上运行成千上万的线程。
Thread是通过Runnable构造的。要运行一个Runnable的run方法,你需要调用对应线程的start方法。
scala> val hello = new Thread( new Runnable {
def run() {
println( "hello world" )
}
})
hello: java.lang.Thread = Thread[Thread- 3 , 5 ,main]
scala> hello.start
hello world
当你看见一个实现Runnable的类,你应该明白它会被放到一个线程里去执行的。
一段单线程的代码
下面是一段代码片段,它可以运行,但是会有问题。
import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date
class NetworkService(port: Int, poolSize: Int) extends Runnable {
val serverSocket = new ServerSocket(port)
def run() {
while ( true ) {
// 这里会阻塞直到有连接进来
val socket = serverSocket.accept()
( new Handler(socket)).run()
}
}
}
class Handler(socket: Socket) extends Runnable {
def message = (Thread.currentThread.getName() + "\n" ).getBytes
def run() {
socket.getOutputStream.write(message)
socket.getOutputStream.close()
}
}
( new NetworkService( 2020 , 2 )).run
每个请求都会把当前线程的名称main作为响应。
这段代码最大的问题在于一次只能够响应一个请求!
你可以对每个请求都单独用一个线程来响应。只需要把
( new Handler(socket)).run()
改成
( new Thread( new Handler(socket))).start()
但是如果你想要复用线程或者对于线程的行为要做一些其他的控制呢?
Executors
随着Java 5的发布,对于线程的管理需要一个更加抽象的接口。
你可以通过Executors对象的静态方法来取得一个ExecutorService对象。这些方法可以让你使用各种不同的策略来配置一个ExecutorService,例如线程池。
下面是我们之前的阻塞式网络服务器,现在改写成可以支持并发请求。
import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date
class NetworkService(port: Int, poolSize: Int) extends Runnable {
val serverSocket = new ServerSocket(port)
val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
def run() {
try {
while ( true ) {
// This will block until a connection comes in.
val socket = serverSocket.accept()
pool.execute( new Handler(socket))
}
} finally {
pool.shutdown()
}
}
}
class Handler(socket: Socket) extends Runnable {
def message = (Thread.currentThread.getName() + "\n" ).getBytes
def run() {
socket.getOutputStream.write(message)
socket.getOutputStream.close()
}
}
( new NetworkService( 2020 , 2 )).run
从下面的示例中,我们可以大致了解内部的线程是怎么进行复用的。
$ nc localhost 2020
pool- 1 -thread- 1
$ nc localhost 2020
pool- 1 -thread- 2
$ nc localhost 2020
pool- 1 -thread- 1
$ nc localhost 2020
pool- 1 -thread- 2
Futures
一个Future代表一次异步计算的操作。你可以把你的操作包装在一个Future里,当你需要结果的时候,你只需要简单调用一个阻塞的get()方法就好了。一个Executor返回一个Future。如果你使用Finagle RPC的话,你可以使用Future的实例来保存还没有到达的结果。
FutureTask是一个可运行的任务,并且被设计成由Executor进行运行。
val future = new FutureTask[String]( new Callable[String]() {
def call(): String = {
searcher.search(target);
}})
executor.execute(future)
现在我需要结果,那就只能阻塞到直到结果返回。
val blockingResult = future.get()
参考 Scala School中关于Finagle的章节有大量使用Future的示例,也有一些组合使用的例子。Effective Scala中也有关于Futures的内容。
线程安全问题
class Person(var name: String) {
def set(changedName: String) {
name = changedName
}
}
这个程序在多线程的环境下是不安全的。如果两个线程都有同一个Person示例的引用,并且都调用set方法,你没法预料在两个调用都结束的时候name会是什么。
在Java的内存模型里,每个处理器都允许在它的L1或者L2 cache里缓存变量,所以两个在不同处理器上运行的线程对于相同的数据有种不同的视图。
下面我们来讨论一下可以强制线程的数据视图保持一致的工具。
三个工具
同步
互斥量(Mutex)提供了锁定资源的语法。当你进入一个互斥量的时候,你会获得它。在JVM里使用互斥量最常用的方式就是在一个对象上进行同步访问。在这里,我们会在Person上进行同步访问。
在JVM里,你可以对任何非null的对象进行同步访问。
s Person(var name: String) {
def set(changedName: String) {
this . synchronized {
name = changedName
}
}
}
volatile
随着Java 5对于内存模型的改变,volatile和synchronized的作用基本相同,除了一点,volatile也可以用在null上。
synchronized提供了更加细粒度的加锁控制。而volatile直接是对每次访问进行控制。
class Person( @volatile var name: String) {
def set(changedName: String) {
name = changedName
}
}
AtomaticReference
同样的,在Java 5中新增了一系列底层的并发原语。AtomicReference类就是其中一个。
import java.util.concurrent.atomic.AtomicReference
class Person(val name: AtomicReference[String]) {
def set(changedName: String) {
name.set(changedName)
}
}
它们都有额外的消耗吗?
AutomicReference是这两种方式中最耗性能的,因为如果你要取得对应的值,则需要经过方法分派(method dispatch)的过程。
volatile和synchronized都是通过Java内置的monitor来实现的。在没有竞争的情况下,monitor对性能的影响非常小。由于synchronized允许你对代码进行更加细粒度的加锁控制,这样就可以减小加锁区,进而减小竞争,因此synchronized应该是最佳的选择。
当你进入同步块,访问volatile引用,或者引用AtomicReference,Java会强制要求处理器刷新它们的缓存流水线,从而保证数据的一致性。
如果我这里说错了,请指正出来。这是一个很复杂的主题,对于这个主题肯定需要花费大量的时间来进行讨论。
其他来自Java 5的优秀工具
之前提到了AtomicReference,除了它之外,Java 5还提供了很多其他有用的工具。
CountDownLatch
CountDownLatch是供多个进程进行通信的一个简单机制。
val doneSignal = new CountDownLatch( 2 )
doAsyncWork( 1 )
doAsyncWork( 2 )
doneSignal.await()
println( "both workers finished!" )
除此之外,它对于单元测试也是很有用的。假设你在做一些异步的工作,并且你想要保证所有的功能都完成了。你只需要让你的函数都对latch进行countDown操作,然后在你的测试代码里进行await。
AtomicInteger/Long
由于对于Int和Long的自增操作比较常见,所以就增加了AtomicInteger和AtomicLong。
AtomicBoolean
我想我没有必要来解释这个的作用了。
读写锁(ReadWriteLock)
ReadWriteLock可以实现读写锁,读操作只会在写者加锁的时候进行阻塞。
我们来构建一个非线程安全的搜索引擎
这是一个简单的非线程安全的倒排索引。我们这个反向排索引把名字的一部分映射到指定的用户。
下面是原生的假设只有单线程访问的写法。
注意这里的使用mutable.HashMap的另一个构造函数this()。
import scala.collection.mutable
case class User(name: String, id: Int)
class InvertedIndex(val userMap: mutable.Map[String, User]) {
def this () = this ( new mutable.HashMap[String, User])
def tokenizeName(name: String): Seq[String] = {
name.split( " " ).map(_.toLowerCase)
}
def add(term: String, user: User) {
userMap += term -> user
}
def add(user: User) {
tokenizeName(user.name).foreach { term =>
add(term, user)
}
}
}
我把具体怎么根据索引获取用户的方法暂时省略掉了,我们后面会来进行补充。
我们来让它变得安全
在上面的倒排索引的示例里,userMap是没法保证线程安全的。多个客户端可以同时尝试去添加元素,这样会产生和之前Person示例里相似的问题。
因为userMap本身不是线程安全的,那么我们怎么能够保证每次只有一个线程对它进行修改呢?
你需要在添加元素的时候给userMap加锁。
def add(user: User) {
userMap. synchronized {
tokenizeName(user.name).foreach { term =>
add(term, user)
}
}
不幸的是,上面的做法有点太粗糙了。能在互斥量(mutex)外面做的工作尽量都放在外面做。记住我之前说过,如果没有竞争的话,加锁的代价是非常小的。如果你在临界区尽量少做操作,那么竞争就会非常少。
def add(user: User) {
// tokenizeName was measured to be the most expensive operation.
// tokenizeName 这个操作是最耗时的。
val tokens = tokenizeName(user.name)
tokens.foreach { term =>
userMap. synchronized {
add(term, user)
}
}
}
SynchronizedMap
我们可以通过使用SynchronizedMap trait来使得一个可变的(mutable)HashMap具有同步机制。
我们可以扩展之前的InvertedIndex,给用户提供一种构建同步索引的简单方法。
import scala.collection.mutable.SynchronizedMap
class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
def this () = this ( new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}
如果你去看具体的实现的话,你会发现SynchronizedMap只是在每个方法上都加上了同步访问,因此它的安全是以牺牲性能为代价的。
Java ConcurrentHashMap
Java里有一个很不错的线程安全的ConcurrentHashMap。幸运的是,JavaConverter可以使得我们通过Scala的语法来使用它。
实际上,我们可以无缝地把我们新的,线程安全的InvertedIndex作为老的非线程安全的一个扩展。
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
extends InvertedIndex(userMap) {
def this () = this ( new ConcurrentHashMap[String, User] asScala)
}
现在来加载我们的InvertedIndex
最原始的方法
trait UserMaker {
def makeUser(line: String) = line.split( "," ) match {
case Array(name, userid) => User(name, userid.trim().toInt)
}
}
class FileRecordProducer(path: String) extends UserMaker {
def run() {
Source.fromFile(path, "utf-8" ).getLines.foreach { line =>
index.add(makeUser(line))
}
}
}
对于文件里的每一行字符串,我们通过调用makeUser来生成一个User,然后通过add添加到InvertedIndex里。如果我们并发访问一个InvertedIndex,我们可以并行调用add方法,因为makeUser方法没有副作用,它本身就是线程安全的。
我们不能并行读取一个文件,但是我们可以并行构造User,并且并行将它添加到索引里。
解决方案:生产者/消费者
实现非同步计算的,通常采用的方法就是将生产者同消费者分开,并让它们通过队列(queue)来进行通信。让我们用下面的例子来说明我们是怎么实现搜索引擎的索引的。
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {
def run() {
Source.fromFile(path, "utf-8" ).getLines.foreach { line =>
queue.put(line)
}
}
}
// 抽象的消费者
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
def run() {
while ( true ) {
val item = queue.take()
consume(item)
}
}
def consume(x: T)
}
val queue = new LinkedBlockingQueue[String]()
//一个生产者线程
val producer = new Producer[String]( "users.txt" , q)
new Thread(producer).start()
trait UserMaker {
def makeUser(line: String) = line.split( "," ) match {
case Array(name, userid) => User(name, userid.trim().toInt)
}
}
class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
def consume(t: String) = index.add(makeUser(t))
}
// 假设我们的机器有8个核
val cores = 8
val pool = Executors.newFixedThreadPool(cores)
// 每个核设置一个消费者
for (i <- i to cores) {
pool.submit( new IndexerConsumer[String](index, q))
}
http://www.importnew.com/4750.html
相关资源:Scala基础(11)运行基本流程及RDD_怎么启动scala资源-CSDN文库
————————————————