今天继续完成 Spark 实验，这已经是最后一个了。本次使用的数据集是 adult.data 和 adult.test 两个文件，二者都需要先做预处理：首先删除文件末尾的空行；然后 adult.test 的第一行是格式说明而非数据，需要删掉；另外 adult.test 每行标签末尾多一个句点（如 ">50K."），可以把所有的 "K." 替换成 "K"。处理完成后上传到 HDFS 再进行后续操作。
// Spark MLlib imports for feature extraction, pipelines, classification and evaluation.
import org.apache.spark.ml.feature.{PCA, IndexToString, StringIndexer, VectorIndexer, HashingTF, Tokenizer}
import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression, LogisticRegressionModel}
import org.apache.spark.sql.functions
import spark.implicits._

// One census record: six continuous attributes plus the income label
// (">50K" / "<=50K" in column 14 of the raw file).
case class Adult(features: org.apache.spark.ml.linalg.Vector, label: String)

// Load the preprocessed training split. Only the continuous columns are kept:
// age (0), fnlwgt (2), education-num (4), capital-gain (10),
// capital-loss (11), hours-per-week (12); column 14 is the label.
val df = sc.textFile("adult.data.txt")
  .map(_.split(","))
  .map(p => Adult(
    Vectors.dense(p(0).toDouble, p(2).toDouble, p(4).toDouble,
                  p(10).toDouble, p(11).toDouble, p(12).toDouble),
    p(14)))
  .toDF()

// Load the test split the same way (adult.test after removing its header
// line and the trailing '.' on each label, as described above).
val test = sc.textFile("adult.test.txt")
  .map(_.split(","))
  .map(p => Adult(
    Vectors.dense(p(0).toDouble, p(2).toDouble, p(4).toDouble,
                  p(10).toDouble, p(11).toDouble, p(12).toDouble),
    p(14)))
  .toDF()
// Fit a PCA model on the training features, reducing the six raw
// attributes to three principal components.
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)

// Project both splits into the PCA space and inspect the untruncated rows.
val result = pca.transform(df)
val testdata = pca.transform(test)
result.show(false)
testdata.show(false)
// Index the string label into numeric form for the classifier.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(result)
labelIndexer.labels.foreach(println)

// Index the PCA feature vector (marks categorical dimensions, if any).
val featureIndexer = new VectorIndexer()
  .setInputCol("pcaFeatures")
  .setOutputCol("indexedFeatures")
  .fit(result)
println(featureIndexer.numFeatures)

// Map numeric predictions back to the original label strings.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

val lr = new LogisticRegression()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(100)

// Assemble and train the full pipeline on the PCA-transformed training data.
val lrPipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, lr, labelConverter))
val lrPipelineModel = lrPipeline.fit(result)

// Stage 2 is the fitted logistic-regression model.
val lrModel = lrPipelineModel.stages(2).asInstanceOf[LogisticRegressionModel]
println(s"Coefficients: ${lrModel.coefficientMatrix} Intercept: ${lrModel.interceptVector} " +
  s"numClasses: ${lrModel.numClasses} numFeatures: ${lrModel.numFeatures}")

// Evaluate on the held-out test split.
// FIX: MulticlassClassificationEvaluator's default metric is F1, not
// accuracy — set it explicitly so "1 - lrAccuracy" is the real error rate.
val lrPredictions = lrPipelineModel.transform(testdata)
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val lrAccuracy = evaluator.evaluate(lrPredictions)
println("Test Error = " + (1.0 - lrAccuracy))
// Cross-validated version: tune PCA dimensionality together with the
// logistic-regression regularization via 3-fold cross-validation.
// These classes were never imported above — bring them into scope locally.
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.feature.PCAModel

// Unfitted stages: the PCA k is left open so the grid search can set it.
val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures")
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
val featureIndexer = new VectorIndexer().setInputCol("pcaFeatures").setOutputCol("indexedFeatures")
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)
val lr = new LogisticRegression()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(100)
val lrPipeline = new Pipeline().setStages(Array(pca, labelIndexer, featureIndexer, lr, labelConverter))

// Search grid: PCA k in 1..6, elastic-net mixing ratio, regularization strength.
val paramGrid = new ParamGridBuilder()
  .addGrid(pca.k, Array(1, 2, 3, 4, 5, 6))
  .addGrid(lr.elasticNetParam, Array(0.2, 0.8))
  .addGrid(lr.regParam, Array(0.01, 0.1, 0.5))
  .build()

val cv = new CrossValidator()
  .setEstimator(lrPipeline)
  .setEvaluator(new MulticlassClassificationEvaluator()
    .setLabelCol("indexedLabel")
    .setPredictionCol("prediction"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
val cvModel = cv.fit(df)

// Score the best model on the test split.
// FIX: request accuracy explicitly — the evaluator's default metric is F1.
val lrPredictions = cvModel.transform(test)
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val lrAccuracy = evaluator.evaluate(lrPredictions)
println("准确率为" + lrAccuracy)

// Inspect the winning pipeline: stage 3 is the logistic model, stage 0 the PCA.
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel = bestModel.stages(3).asInstanceOf[LogisticRegressionModel]
println(s"Coefficients: ${lrModel.coefficientMatrix} Intercept: ${lrModel.interceptVector} " +
  s"numClasses: ${lrModel.numClasses} numFeatures: ${lrModel.numFeatures}")
val pcaModel = bestModel.stages(0).asInstanceOf[PCAModel]
println("Primary Component: " + pcaModel.pc)
标签:2024.1,val,scala,日报,23,org,apache,new,spark From: https://www.cnblogs.com/Arkiya/p/17982019