首页 > 其他分享 >Scala并发

Scala并发

时间:2023-08-04 17:37:53浏览次数:34  
标签:String val Scala 并发 线程 new def name


 

Runnable/Callable

Runnable只有一个没有返回值的方法


trait Runnable {
   def run(): Unit
}

Callable的方法和run类似,只不过它有一个返回值



trait Callable[V] {
   def call(): V
}

线程

Scala的并发是建立在Java的并发模型上的。


在Sun的JVM上,对于一个IO密集型的任务,我们可以在单机上运行成千上万的线程。


Thread是通过Runnable构造的。要运行一个Runnable的run方法,你需要调用对应线程的start方法。


scala> val hello = new Thread( new Runnable {
   def run() {
     println( "hello world" )
   }
})
hello: java.lang.Thread = Thread[Thread- 3 , 5 ,main]
 
scala> hello.start
hello world

当你看见一个实现Runnable的类,你应该明白它会被放到一个线程里去执行的。



一段单线程的代码

下面是一段代码片段,它可以运行,但是会有问题。



import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date
 
class NetworkService(port: Int, poolSize: Int) extends Runnable {
   val serverSocket = new ServerSocket(port)
 
   def run() {
     while ( true ) {
       // 这里会阻塞直到有连接进来
       val socket = serverSocket.accept()
       ( new Handler(socket)).run()
     }
   }
}
 
class Handler(socket: Socket) extends Runnable {
   def message = (Thread.currentThread.getName() + "\n" ).getBytes
 
   def run() {
     socket.getOutputStream.write(message)
     socket.getOutputStream.close()
   }
}
 
( new NetworkService( 2020 , 2 )).run

每个请求都会把当前线程的名称main作为响应。


这段代码最大的问题在于一次只能够响应一个请求!


你可以对每个请求都单独用一个线程来响应。只需要把



( new Handler(socket)).run()

改成



( new Thread( new Handler(socket))).start()

但是如果你想要复用线程或者对于线程的行为要做一些其他的控制呢?



Executors

随着Java 5的发布,对于线程的管理需要一个更加抽象的接口。


你可以通过Executors对象的静态方法来取得一个ExecutorService对象。这些方法可以让你使用各种不同的策略来配置一个ExecutorService,例如线程池。


下面是我们之前的阻塞式网络服务器,现在改写成可以支持并发请求。



import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date
 
class NetworkService(port: Int, poolSize: Int) extends Runnable {
   val serverSocket = new ServerSocket(port)
   val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
 
   def run() {
     try {
       while ( true ) {
         // This will block until a connection comes in.
         val socket = serverSocket.accept()
         pool.execute( new Handler(socket))
       }
     } finally {
       pool.shutdown()
     }
   }
}
 
class Handler(socket: Socket) extends Runnable {
   def message = (Thread.currentThread.getName() + "\n" ).getBytes
 
   def run() {
     socket.getOutputStream.write(message)
     socket.getOutputStream.close()
   }
}
 
( new NetworkService( 2020 , 2 )).run

从下面的示例中,我们可以大致了解内部的线程是怎么进行复用的。


$ nc localhost 2020
pool- 1 -thread- 1
 
$ nc localhost 2020
pool- 1 -thread- 2
 
$ nc localhost 2020
pool- 1 -thread- 1
 
$ nc localhost 2020
pool- 1 -thread- 2


Futures

一个Future代表一次异步计算的操作。你可以把你的操作包装在一个Future里,当你需要结果的时候,你只需要简单调用一个阻塞的get()方法就好了。一个Executor返回一个Future。如果你使用Finagle RPC的话,你可以使用Future的实例来保存还没有到达的结果。


FutureTask是一个可运行的任务,并且被设计成由Executor进行运行。


val future = new FutureTask[String]( new Callable[String]() {
   def call(): String = {
     searcher.search(target);
}})

executor.execute(future)

现在我需要结果,那就只能阻塞到直到结果返回。



val blockingResult = future.get()

参考 Scala School中关于Finagle的章节有大量使用Future的示例,也有一些组合使用的例子。Effective Scala中也有关于Futures的内容。



线程安全问题


class Person(var name: String) {
   def set(changedName: String) {
     name = changedName
   }
}

这个程序在多线程的环境下是不安全的。如果两个线程都有同一个Person示例的引用,并且都调用set方法,你没法预料在两个调用都结束的时候name会是什么。


在Java的内存模型里,每个处理器都允许在它的L1或者L2 cache里缓存变量,所以两个在不同处理器上运行的线程对于相同的数据有种不同的视图。


下面我们来讨论一下可以强制线程的数据视图保持一致的工具。



三个工具

同步

互斥量(Mutex)提供了锁定资源的语法。当你进入一个互斥量的时候,你会获得它。在JVM里使用互斥量最常用的方式就是在一个对象上进行同步访问。在这里,我们会在Person上进行同步访问。


在JVM里,你可以对任何非null的对象进行同步访问。


s Person(var name: String) {
   def set(changedName: String) {
     this . synchronized {
       name = changedName
     }
   }
}

volatile

随着Java 5对于内存模型的改变,volatile和synchronized的作用基本相同,除了一点,volatile也可以用在null上。


synchronized提供了更加细粒度的加锁控制。而volatile直接是对每次访问进行控制。


class Person( @volatile var name: String) {
   def set(changedName: String) {
     name = changedName
   }

}

AtomaticReference

同样的,在Java 5中新增了一系列底层的并发原语。AtomicReference类就是其中一个。


import java.util.concurrent.atomic.AtomicReference
 
class Person(val name: AtomicReference[String]) {
   def set(changedName: String) {
     name.set(changedName)
   }
}


它们都有额外的消耗吗?

AutomicReference是这两种方式中最耗性能的,因为如果你要取得对应的值,则需要经过方法分派(method dispatch)的过程。


volatile和synchronized都是通过Java内置的monitor来实现的。在没有竞争的情况下,monitor对性能的影响非常小。由于synchronized允许你对代码进行更加细粒度的加锁控制,这样就可以减小加锁区,进而减小竞争,因此synchronized应该是最佳的选择。


当你进入同步块,访问volatile引用,或者引用AtomicReference,Java会强制要求处理器刷新它们的缓存流水线,从而保证数据的一致性。


如果我这里说错了,请指正出来。这是一个很复杂的主题,对于这个主题肯定需要花费大量的时间来进行讨论。



其他来自Java 5的优秀工具

之前提到了AtomicReference,除了它之外,Java 5还提供了很多其他有用的工具。


CountDownLatch

CountDownLatch是供多个进程进行通信的一个简单机制。



val doneSignal = new CountDownLatch( 2 )
doAsyncWork( 1 )
doAsyncWork( 2 )
 
doneSignal.await()
println( "both workers finished!" )

除此之外,它对于单元测试也是很有用的。假设你在做一些异步的工作,并且你想要保证所有的功能都完成了。你只需要让你的函数都对latch进行countDown操作,然后在你的测试代码里进行await。


AtomicInteger/Long

由于对于Int和Long的自增操作比较常见,所以就增加了AtomicInteger和AtomicLong。


AtomicBoolean

我想我没有必要来解释这个的作用了。


读写锁(ReadWriteLock)

ReadWriteLock可以实现读写锁,读操作只会在写者加锁的时候进行阻塞。



我们来构建一个非线程安全的搜索引擎

这是一个简单的非线程安全的倒排索引。我们这个反向排索引把名字的一部分映射到指定的用户。


下面是原生的假设只有单线程访问的写法。


注意这里的使用mutable.HashMap的另一个构造函数this()。

import scala.collection.mutable
 
case class User(name: String, id: Int)
 
class InvertedIndex(val userMap: mutable.Map[String, User]) {
 
   def this () = this ( new mutable.HashMap[String, User])
 
   def tokenizeName(name: String): Seq[String] = {
     name.split( " " ).map(_.toLowerCase)
   }
 
   def add(term: String, user: User) {
     userMap += term -> user
   }
 
   def add(user: User) {
     tokenizeName(user.name).foreach { term =>
       add(term, user)
     }
   }
}

我把具体怎么根据索引获取用户的方法暂时省略掉了,我们后面会来进行补充。



我们来让它变得安全

在上面的倒排索引的示例里,userMap是没法保证线程安全的。多个客户端可以同时尝试去添加元素,这样会产生和之前Person示例里相似的问题。


因为userMap本身不是线程安全的,那么我们怎么能够保证每次只有一个线程对它进行修改呢?


你需要在添加元素的时候给userMap加锁。


def add(user: User) {
   userMap. synchronized {
     tokenizeName(user.name).foreach { term =>
       add(term, user)
     }
   }


不幸的是,上面的做法有点太粗糙了。能在互斥量(mutex)外面做的工作尽量都放在外面做。记住我之前说过,如果没有竞争的话,加锁的代价是非常小的。如果你在临界区尽量少做操作,那么竞争就会非常少。



def add(user: User) {
   // tokenizeName was measured to be the most expensive operation.
   // tokenizeName 这个操作是最耗时的。
   val tokens = tokenizeName(user.name)
 
   tokens.foreach { term =>
     userMap. synchronized {
       add(term, user)
     }
   }
}

SynchronizedMap

我们可以通过使用SynchronizedMap trait来使得一个可变的(mutable)HashMap具有同步机制。


我们可以扩展之前的InvertedIndex,给用户提供一种构建同步索引的简单方法。


import scala.collection.mutable.SynchronizedMap
 
class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
   def this () = this ( new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}

如果你去看具体的实现的话,你会发现SynchronizedMap只是在每个方法上都加上了同步访问,因此它的安全是以牺牲性能为代价的。



Java ConcurrentHashMap

Java里有一个很不错的线程安全的ConcurrentHashMap。幸运的是,JavaConverter可以使得我们通过Scala的语法来使用它。


实际上,我们可以无缝地把我们新的,线程安全的InvertedIndex作为老的非线程安全的一个扩展。



import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
 
class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
     extends InvertedIndex(userMap) {
 
   def this () = this ( new ConcurrentHashMap[String, User] asScala)
}

现在来加载我们的InvertedIndex

最原始的方法

trait UserMaker {
   def makeUser(line: String) = line.split( "," ) match {
     case Array(name, userid) => User(name, userid.trim().toInt)
   }
}
 
class FileRecordProducer(path: String) extends UserMaker {
   def run() {
     Source.fromFile(path, "utf-8" ).getLines.foreach { line =>
       index.add(makeUser(line))
     }
   }
}

对于文件里的每一行字符串,我们通过调用makeUser来生成一个User,然后通过add添加到InvertedIndex里。如果我们并发访问一个InvertedIndex,我们可以并行调用add方法,因为makeUser方法没有副作用,它本身就是线程安全的。


我们不能并行读取一个文件,但是我们可以并行构造User,并且并行将它添加到索引里。



解决方案:生产者/消费者

实现非同步计算的,通常采用的方法就是将生产者同消费者分开,并让它们通过队列(queue)来进行通信。让我们用下面的例子来说明我们是怎么实现搜索引擎的索引的。


import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
 
// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {
   def run() {
     Source.fromFile(path, "utf-8" ).getLines.foreach { line =>
       queue.put(line)
     }
   }
}
 
// 抽象的消费者
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
   def run() {
     while ( true ) {
       val item = queue.take()
       consume(item)
     }
   }
 
   def consume(x: T)
}
 
val queue = new LinkedBlockingQueue[String]()
 
//一个生产者线程
 
val producer = new Producer[String]( "users.txt" , q)
new Thread(producer).start()
 
trait UserMaker {
   def makeUser(line: String) = line.split( "," ) match {
     case Array(name, userid) => User(name, userid.trim().toInt)
   }
}
 
class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
   def consume(t: String) = index.add(makeUser(t))
}
 
// 假设我们的机器有8个核
 
val cores = 8
val pool = Executors.newFixedThreadPool(cores)
 
// 每个核设置一个消费者
 
for (i <- i to cores) {
   pool.submit( new IndexerConsumer[String](index, q))
}

http://www.importnew.com/4750.html


相关资源:Scala基础(11)运行基本流程及RDD_怎么启动scala资源-CSDN文库

————————————————


标签:String,val,Scala,并发,线程,new,def,name
From: https://blog.51cto.com/u_2650279/6964979

相关文章

  • Scala——文件和正则表达式
    读取文件importscala.io.Sourcevalsource=Source.fromFile(fileName,"UTF-8")//第一个参数可以是文件名或java.io.File//如果没有第二个参数将会使用当前平台缺省的字符编码vallineIterator=source.getLines//结果是一个迭代器//迭代器可以......
  • 常见的并发陷阱
    常见的并发陷阱volatilevolatile只能强调数据的可见性,并不能保证原子操作和线程安全,因此volatile不是万能的。参考指令重排序volatile最常见于下面两种场景。a.循环检测机制volatilebooleandone=false;while(!done){dosomething();......
  • Scala的基本使用
    @目录Scala的基本使用一、基础语法1.1变量1.1.1var和val1.1.2自动类型推断1.2数据类型1.2.1基础数据类型1.2.2增强数据类型1.3操作符二、流程控制2.1if表达式2.2语句终结符2.3循环2.3.1for循环2.3.2这里面的to可以换成until2.3.3for循环针对字符串还可以用2.3.4注意......
  • 【现网事故】记一次多系统调用,并发冲突、请求放大导致的生产问题
    事故现象生产环境,转账相关请求失败量暴增。直接原因现网多个重试请求同时到达svr,导致内存数据库大量返回时间戳冲突。业务方收到时间戳冲突,自动进行业务重试,服务内部也存在重试,导致流量放大。转账首先我们一起了解一下转账。转账请求在支付场景中的应用频率非常高,它是现代金......
  • 【知识点】JAVA之并发集合
    当涉及到多线程编程时,使用并发集合是一种常见的方式来处理多个线程同时访问和操作共享数据的问题。并发集合是一组线程安全的数据结构,可以同时被多个线程访问和修改,而不会导致数据不一致或竞争条件。以下是一些常见的并发集合及其特点:ConcurrentHashMap(并发哈希表):它是一个线程......
  • Wi-Fi STA/STA 并发
    Android12引入了Wi-FiSTA/STA并发功能,使设备可同时连接到两个Wi-Fi网络。此可选功能支持以下功能。Make-before-break:设备会在断开现有连接之前连接到新的Wi-Fi网络。这使得Wi-Fi网络之间的切换更加顺畅并发仅本地和互联网连接:设备会连接到仅限本地的网络,而不中断设......
  • 高并发性能指标:QPS、TPS、RT、吞吐量
    QPS,每秒查询QPS:QueriesPerSecond意思是“每秒查询率”,是一台服务器每秒能够相应的查询次数,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。互联网中,作为域名系统服务器的机器的性能经常用每秒查询率来衡量。TPS,每秒事务TPS:是TransactionsPerSecond的缩写......
  • Linux Shell实现模拟多进程并发执行
        在bash中,使用后台任务来实现任务的“多进程化”。在不加控制的模式下,不管有多少任务,全部都后台执行。也就是说,在这种情况下,有多少任务就有多少“进程”在同时执行。我们就先实现第一种情况:实例一:正常情况脚本———————————————————————————–#......
  • golang并发编程
    23协程(Goroutine)定义:协程(goroutine)是Go语言中的一种轻量级线程,可以在单个线程中同时执行多个任务。使用方法:在调用函数时gofunction()在函数中使用协程时,需要注意以下几点:协程的执行是异步的,因此需要使用通道等方式进行同步。协程共享内存空间,因此需要使用互斥......
  • Java面试题 P35:数据库篇:MySql篇-事务-并发事务带来哪些问题?怎么解决这些问题呢?MySQL
         ......