python spark kmeans demo

The official demo:

from numpy import array
from math import sqrt

from pyspark import SparkContext

from pyspark.mllib.clustering import KMeans, KMeansModel

sc = SparkContext(appName="clusteringExample")
# Load and parse the data
data = sc.textFile("/root/spark-2.1.1-bin-hadoop2.6/data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model
#clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
#sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
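
Once trained, the same model can assign a cluster to every point. Below is a minimal sketch that reuses parsedData and clusters from the demo above (the assignments name is illustrative, not part of the official example):

# Pair each parsed point with the index of its nearest cluster center.
# clusters.predict on a single vector runs locally in Python, so it can be
# called inside map() on the executors, just like error() above.
assignments = parsedData.map(lambda point: (point.tolist(), clusters.predict(point)))
for point, cluster_id in assignments.take(5):
    print(point, "->", cluster_id)

# The learned centers themselves are exposed on the model.
print(clusters.clusterCenters)

This mirrors what the Scala snippets below do when mapping predictions back onto a DataFrame.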

An example that starts from a DataFrame and maps the predictions back (Scala):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.functions.{col, udf}

case class DataRow(label: Double, x1: Double, x2: Double)
val data = sqlContext.createDataFrame(sc.parallelize(Seq(
    DataRow(3, 1, 2),
    DataRow(5, 3, 4),
    DataRow(7, 5, 6),
    DataRow(6, 0, 0)
)))

val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
val t = udf { (x1: Double, x2: Double) => clusters.predict(Vectors.dense(x1, x2)) }
val result = data.select(col("label"), t(col("x1"), col("x2")))

The important parts are the last two lines:

    Creates a UDF (user-defined function) which can be applied directly to DataFrame columns (in this case, the two columns x1 and x2).

    Selects the label column along with the UDF applied to the x1 and x2 columns. Since the UDF predicts the closest cluster, the result is a DataFrame consisting of (label, closestCluster). A PySpark sketch of the same two steps follows below.
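
The same two steps can be written in PySpark. Here is a minimal sketch, assuming a DataFrame df with columns label, x1 and x2, and a pyspark.mllib KMeansModel named model trained on two-dimensional vectors (df, model and predict_cluster are illustrative names, not from the original post):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.mllib.linalg import Vectors

# Wrap the model's single-vector predict in a column UDF. The pyspark.mllib
# KMeansModel only stores the cluster centers, so it can be shipped to the
# executors inside the UDF closure.
predict_cluster = udf(lambda x1, x2: int(model.predict(Vectors.dense(x1, x2))),
                      IntegerType())

# Select label plus the predicted cluster for every row.
result = df.select(col("label"),
                   predict_cluster(col("x1"), col("x2")).alias("closestCluster"))
result.show()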

Reference: https://stackoverflow.com/questions/31447141/spark-mllib-kmeans-from-dataframe-and-back-again

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering._
import sqlContext.implicits._  // brings .toDF for RDDs into scope (spark-shell already imports spark.implicits._)

val rows = data.rdd.map(r => (r.getDouble(1),r.getDouble(2))).cache()
val vectors = rows.map(r => Vectors.dense(r._1, r._2))
val kMeansModel = KMeans.train(vectors, 3, 20)
val predictions = rows.map{r => (r._1, kMeansModel.predict(Vectors.dense(r._1, r._2)))}
val df = predictions.toDF("id", "cluster")
df.show

Create column from RDD

It's very easy to obtain pairs of ids and clusters in the form of an RDD (note that this snippet assumes the DataFrame's first column is an integer id):

val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1),s.getDouble(2)))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)

Then you create a DataFrame from that:

val idCluster = idClusterRDD.toDF("id", "cluster")

It works because map doesn't change the order of the data within the RDD's partitions, which is why you can simply zip the ids with the prediction results.
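
A hedged PySpark version of the same zip-based pairing, assuming a DataFrame named data whose first three columns are an integer id and two Double features (all names below are illustrative):

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import KMeans

# Keep (id, vector) pairs so the ids can be re-attached after prediction.
id_point_rdd = data.rdd.map(lambda r: (r[0], Vectors.dense(r[1], r[2]))).cache()

model = KMeans.train(id_point_rdd.map(lambda p: p[1]), 3, maxIterations=20)

# predict() on an RDD returns the cluster indices in the same order, so
# zipping them back with the ids is safe.
clusters_rdd = model.predict(id_point_rdd.map(lambda p: p[1]))
id_cluster_rdd = id_point_rdd.map(lambda p: p[0]).zip(clusters_rdd)

# toDF on an RDD of tuples requires an active SparkSession.
id_cluster_df = id_cluster_rdd.toDF(["id", "cluster"])
id_cluster_df.show()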

Use UDF (User Defined Function)

The second method uses the clusters.predict method as a UDF:

val bcClusters = sc.broadcast(clusters)
def predict(x: Double, y: Double): Int = {
    bcClusters.value.predict(Vectors.dense(x, y))
}
sqlContext.udf.register("predict", predict _)

Now we can use it to add predictions to data:

val idCluster = data.selectExpr("id", "predict(x, y) as cluster")

Keep in mind that the Spark API doesn't allow UDF deregistration, which means the data captured in the UDF's closure will be kept in memory.
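
The equivalent in PySpark is to broadcast the model and register the prediction function as a SQL UDF. Here is a minimal sketch, assuming Spark 2.x with an active SparkSession named spark, a DataFrame data with columns id, x and y, and a trained pyspark.mllib KMeansModel named model (all names are illustrative):

from pyspark.sql.types import IntegerType
from pyspark.mllib.linalg import Vectors

# Broadcast the trained model so each executor deserializes it only once.
bc_model = sc.broadcast(model)

def predict(x, y):
    return int(bc_model.value.predict(Vectors.dense(x, y)))

# Register the function so it can be used inside selectExpr / SQL strings.
spark.udf.register("predict", predict, IntegerType())

id_cluster = data.selectExpr("id", "predict(x, y) as cluster")
id_cluster.show()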

From: https://blog.51cto.com/u_11908275/6393871
