机器学习
构建机器学习的第一步:数据特征工程,将数据转化成机器学习的模型
//构建向量
基础部分
Spark中一共有两类向量:稠密向量,稀疏向量
1 稠密向量 2 val denseVec: linalg.Vector = Vectors.dense(Array(1..0,2.0,3.0,4.0,5.0)) 3 println(denseVec)
转化成稀疏向量
println(denseVec.toSparse)
1 稀疏向量 2 val sparseVec: linalg.Vector =Vectors.sparse(10,Array(0,4,7),Arrary(10.0,2.1,2.0)) 3 println(sparseVec) 4 5 转化成稠密向量 6 println(sparseVec。toDense)
////根据有无label划分成有监督和无监督 ,完整的一条数据一般分为两个部分:特征,标签
构建带标签的数据:有监督数据模型所需要的数据格式 val labelPoint: LabeledPoint =LabeledPoint(1.0,Vectors.dense(1.0,2.0,3.0,4.0,5.0))
构建SVM类型的数据
1 使用SparkMLLib自带的工具类 2 3 val spark: SparkSession = SparkSession 4 .builder() 5 .appName("Demo1Vector") 6 .master("local[*]") 7 .getOrCreate() 8 9 val personRDD: RDD[regression.LabeledPoint] = MLUtils.loadLibSVMFile(spark.sparkContext,"SparkProject/data/mllib/data/人体指标.txt",10) //可以通过最后的numFeatures参数来设置最大特征值
通过DF来构建SVM格式的数据 spark .read .option("numFeatures",10) .format("libsvm") .load("SparkProject/data/mllib/data/人体指标.txt")
机器学习算法--->LogisticRegression(逻辑回归)
1 第一步:加载数据,进行数据特征工程,将数据转化成机器学习能够识别的数据 2 构架Spark环境 3 val spark: SparkSession = SparkSession 4 .builder() 5 .master("local[*]") 6 .appName("***") 7 getOrCreate() 8 //加载已经完成数据特征工程的人体指标的数据 9 10 val personDF: DataFrame=spark 11 .read 12 .format("libsvm") 13 .load("SparkProject/data/mllib/data/人体指标.txt") 14 15 //将数据切分成训练集和测试集 比例为8:2 16 17 val splitDF: Array[Dataset[Row]] = personDF.RandomSplit(Array(0.8,0.2)) 18 19 val trainDF: Dataset[Row] = splitDF(0)//训练集 20 val testDF: Dataset[Row] = splitDF(1)//测试集 21 22 23 //选择合适的模型 将训练集带入模型进行训练 24 /** 25 * 数据有无label 26 * 有-->有监督学习 27 * 无-->无监督学习 28 * 29 * 查看label是离散的还是连续的 30 * 离散-->分类的算法 31 * 连续-->回归算法 32 */ 33 val logisticRegression: LogisticRegression = new LogisticRegression() 34 .setMaxIter(10) 35 .setRegParam(0.3) 36 .setElasticNetParam(0.8) 37 38 //带入训练集 39 val logisticRegressionModel: LogisticRegressionModel = logisticRegression.fit(trainDF) 40 //使用测试集评估模型 41 val transDF: DataFrame = logisticRegressionModel.transform(testDF) 42 43 //计算准确率 44 transDF .withColumn("flag",when($"label"===$"prediction",1).otherwise(0)) 45 .groupBy() 46 .agg(sum($"flag")/count("*") as "准确率") 47 .show() 48 49 //如果模型通过评估,可以将模型保存起来 50 logisticRegressionModel.write.save("SparkProject/data/mllib/person")
用保存好的模型训练数据
1 //使用保存好的模型进行预测 2 val spark: SparkSession = SparkSession 3 .builder() 4 .master("local[*]") 5 .appName("Demo03PersonPredict") 6 .getOrCreate() 7 //加载模型 8 val logisticRegressionModel: LogisticRegressionModel = LogisticRegressionModel.load("SparkProject/data/mllib/person") 9 //训练单条数据 10 val res: Double = logisticRegressionModel.predict(Vectors.dense(Array(5.5, 4.3, 3.1, 129, 78.9, 69.4, 62)))
1 //对一批数据进行预测transform 2 //前提条件 需要将每一条数据转换成模型能够识别的数据格式 向量 3 4 val newPersonDF: DataFrame = spark 5 .read 6 .format("csv") 7 .option("sep", "|") 8 .schema("person String") 9 .load("SparkProject/data/mllib/data/person.txt") 10 11 // newPersonDF.show() 12 13 //切分 14 val df: DataFrame = newPersonDF 15 .map(row => { 16 val line: String = row.getAs[String]("person") 17 Tuple2(Vectors.dense(line.split(" ").map(_.split(":")(1).toDouble)), 1.0) 18 }).toDF("features", "useless") 19 20 logisticRegressionModel.transform(df).show()
手写数字特征工程
图片的存储格式
1 root 2 |-- image: struct (nullable = true) 3 | |-- origin: string (nullable = true) 4 | |-- height: integer (nullable = true) 5 | |-- width: integer (nullable = true) 6 | |-- nChannels: integer (nullable = true) 7 | |-- mode: integer (nullable = true) 8 | |-- data: binary (nullable = true)
1 val spark: SparkSession = SparkSession 2 .builder() 3 .appName("Demo04Image") 4 .master("local[*]") 5 .config("spark.sql.shuffle.partition", "16") 6 .getOrCreate() 7 8 import spark.implicits._ 9 import org.apache.spark.sql.functions._ 10 11 // 1、加载手写数字图片数据,并进行特征工程处理 12 val imageDF: DataFrame = spark 13 .read 14 .format("image") // 指定格式为image即可加载图片数据 15 .load("F:\\data\\train") 16 17 imageDF.printSchema() 18 // imageDF.show(truncate = false) 19 20 // 从image中提取文件名以及数据 21 val originDataDF: DataFrame = imageDF.select($"image.origin", $"image.data") 22 23 // 加载结果数据 24 val labelDF: DataFrame = spark 25 .read 26 .format("csv") 27 .option("sep", " ") 28 .schema("fileName String,label Int") 29 .load("SparkProject/data/mllib/data/image_res.txt") 30 31 val preImageDF: DataFrame = originDataDF 32 .as[(String, Array[Byte])] 33 .map { 34 case (origin: String, data: Array[Byte]) => 35 // 提取文件名 用于关联获取图片对应的结果 36 val fileName: String = origin.split("/").last 37 val newArr: Array[Double] = data.map(_.toInt).map(i => { 38 // 去除噪音,将边缘上对结果不需要造成影响的值置为0,对结果评判起决定性作用的值置为1 39 if (i >= 0) { 40 0.0 41 } else { 42 1.0 43 } 44 }) 45 // 将去噪后的Arr转为向量,并以稀疏向量保存 46 (fileName, Vectors.dense(newArr).toSparse) 47 }.toDF("fileName", "features") 48 49 val df: DataFrame = preImageDF.join(labelDF, "fileName") 50 .select($"fileName", $"label", $"features") 51 52 // 2、将数据切分成训练集、测试集,一般比例为8:2 53 val splitDF: Array[Dataset[Row]] = df.randomSplit(Array(0.8, 0.2)) 54 val trainDF: Dataset[Row] = splitDF(0) // 训练集,用于训练模型 55 val testDF: Dataset[Row] = splitDF(1) // 测试集,用于评估模型 56 57 // 3、选择合适的模型,并用训练集训练模型 58 val logisticRegression: LogisticRegression = new LogisticRegression() 59 .setMaxIter(10) // 设置最大的迭代次数 60 .setFitIntercept(true) // 设置是否有截距 61 62 val logisticRegressionModel: LogisticRegressionModel = logisticRegression.fit(trainDF) 63 64 // 4、使用测试集评估模型 65 val transDF: DataFrame = logisticRegressionModel.transform(testDF) 66 67 transDF.cache() 68 69 // transDF.show() 70 71 // 计算准确率 72 transDF 73 .withColumn("flag", when($"label" === $"prediction", 1).otherwise(0)) 74 .groupBy() 75 .agg(sum($"flag") / count("*") as "准确率") 76 // .show() 77 78 79 //通过评估 保存模型 80 logisticRegressionModel.write.overwrite().save("SparkProject/data/mllib/image")
测试模型
1 /** 2 * 构建spark环境 3 */ 4 5 val spark: SparkSession = SparkSession 6 .builder() 7 .master("local[*]") 8 .appName("Demo06ImagePredict") 9 .config("spark.sql.shuffle.partition", 16) 10 .getOrCreate() 11 12 import spark.implicits._ 13 import org.apache.spark.sql.functions._ 14 15 16 //加载测试数据 17 val imageDF: DataFrame = spark 18 .read 19 .format("image") 20 .load("SparkProject/data/mllib/data/image/*") 21 22 //将数据转换成模型能够识别的数据 23 val featuresDF: DataFrame = imageDF 24 .select($"image.origin", $"image.data") 25 .as[(String, Array[Byte])] 26 .map { 27 case (origin: String, data: Array[Byte]) => 28 val fileName: String = origin.split("/").last 29 val newArr: Array[Double] = data.map(_.toInt).map(i => { 30 if (i >= 0) { 31 0.0 32 } else { 33 1.0 34 } 35 }) 36 (fileName, Vectors.dense(newArr).toSparse) 37 }.toDF("filename", "features") 38 39 val logisticRegressionModel: LogisticRegressionModel = LogisticRegressionModel.load("SparkProject/data/mllib/image") 40 41 42 logisticRegressionModel.transform(featuresDF).show()
标签:入门,val,image,MLLib,DataFrame,Array,spark,data,Spark From: https://www.cnblogs.com/lkd0910/p/16919660.html