实验内容与完成情况:
1.数据导入
从文件中导入数据,并转化为 DataFrame。
代码:
from pyspark.ml.feature import PCA
from pyspark.shell import spark
from pyspark.sql import Row
from pyspark.ml.linalg import Vector, Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.classification import BinaryLogisticRegressionSummary, LogisticRegression
from pyspark.sql import functions
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
def f(x):
    """Convert one split record of the Adult dataset into a Row-ready dict.

    Args:
        x: list of string fields produced by ``line.split(',')``; must have
           at least 15 entries because index 14 is read.

    Returns:
        dict with 'features' -> DenseVector of the six numeric columns at
        indices 0, 2, 4, 10, 11 and 12, and 'label' -> the income label string.
    """
    rel = {'features': Vectors.dense(float(x[0]), float(x[2]), float(x[4]),
                                     float(x[10]), float(x[11]), float(x[12])),
           # Fields may carry the space that follows each comma, and the test
           # file's labels presumably end with '.' ("<=50K." vs "<=50K").
           # Normalize both so train and test share one label vocabulary;
           # otherwise the StringIndexer fitted on the training labels
           # rejects the unseen test labels.
           'label': x[14].strip().rstrip('.')}
    return rel
# Load the training and test files as DataFrames of (features, label) rows.
# f() reads fields up to index 14, and an Adult record has 15 comma-separated
# fields, so keep only full records. This drops lines such as an empty
# trailing line or a non-data header line (the UCI adult.test file presumably
# starts with one -- confirm against your copy); f() would otherwise raise on
# them (IndexError on short rows / ValueError from float()).
df = spark.sparkContext.textFile("./data/adult.data")\
    .map(lambda line: line.split(','))\
    .filter(lambda p: len(p) == 15)\
    .map(lambda p: Row(**f(p))).toDF()
test = spark.sparkContext.textFile("./data/adult.test")\
    .map(lambda line: line.split(','))\
    .filter(lambda p: len(p) == 15)\
    .map(lambda p: Row(**f(p))).toDF()
df.show(5)  # show the first 5 rows of df
test.show(5)  # show the first 5 rows of test
运行结果:
2.进行主成分分析(PCA)
代码:
# Project the 6 numeric input features onto 3 principal components. The PCA
# model is fit on the training set only and the same model is applied to both
# the training and the test set.
# NOTE(review): the features are not standardized first and PCA is
# scale-sensitive; if the columns differ greatly in magnitude the largest ones
# will dominate the components -- consider a StandardScaler stage.
pcaEstimator = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
pca = pcaEstimator.fit(df)
result, testdata = pca.transform(df), pca.transform(test)
result.show()
运行结果:
3.训练分类模型并预测居民收入
在主成分分析的基础上,采用逻辑斯蒂回归或决策树模型预测居民收入是否超过 50K,并在 Test 数据集上进行验证。
逻辑回归代码:
# Index the string label column into a numeric "indexedLabel" column.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(result)
# Print the learned labels; position i in this list is numeric label i.
for label in labelIndexer.labels:
    print(label)
# Let VectorIndexer decide which PCA components (if any) to treat as
# categorical and index them.
featureIndexer = VectorIndexer(inputCol="pcaFeatures", outputCol="indexedFeatures").fit(result)
print("索引后的特征数量为: ", featureIndexer.numFeatures)
# Map numeric predictions back to the original label strings.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)
# Logistic regression on the indexed PCA features.
lr = LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(100)
# Pipeline: label indexing -> feature indexing -> logistic regression -> label restoration.
lrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, lr, labelConverter])
lrPipelineModel = lrPipeline.fit(result)
lrModel = lrPipelineModel.stages[2]  # the fitted logistic-regression model
# Coefficient matrix, intercept vector and number of features of the model.
print(f"系数: {lrModel.coefficientMatrix}\n截距: {lrModel.interceptVector}\n特征数量: {lrModel.numFeatures}")
# Predict on the test set, which was projected with the same fitted PCA model.
lrPredictions = lrPipelineModel.transform(testdata)
# The evaluator's default metric is F1, so request accuracy explicitly;
# otherwise "1 - metric" below would be 1 - F1 rather than the error rate.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy") \
    .setLabelCol("indexedLabel").setPredictionCol("prediction")
lrAccuracy = evaluator.evaluate(lrPredictions)
print("测试错误率 = %g " % (1.0 - lrAccuracy))
逻辑回归运行结果:
决策树代码:
# DecisionTreeClassifier is used below but is not among this script's imports;
# import it here so this section runs.
from pyspark.ml.classification import DecisionTreeClassifier

# Index the string label column into a numeric "indexedLabel" column.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(result)
# Let VectorIndexer decide which PCA components (if any) to treat as
# categorical and index them.
featureIndexer = VectorIndexer(inputCol="pcaFeatures", outputCol="indexedFeatures").fit(result)
# Map numeric predictions back to the original label strings.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)
# Decision tree on the indexed PCA features (default hyper-parameters).
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# Pipeline: label indexing -> feature indexing -> decision tree -> label restoration.
dtPipeline = Pipeline().setStages([labelIndexer, featureIndexer, dt, labelConverter])
dtPipelineModel = dtPipeline.fit(result)
dtModel = dtPipelineModel.stages[2]  # the fitted decision-tree model
# Predict on the test set, which was projected with the same fitted PCA model.
dtPredictions = dtPipelineModel.transform(testdata)
# The evaluator's default metric is F1, so request accuracy explicitly;
# otherwise "1 - metric" below would be 1 - F1 rather than the error rate.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy") \
    .setLabelCol("indexedLabel").setPredictionCol("prediction")
dtAccuracy = evaluator.evaluate(dtPredictions)
print("决策树模型在测试集上的错误率为: %g " % (1.0 - dtAccuracy))
运行结果:
4.超参数调优
利用 CrossValidator 确定最优的参数,包括最优主成分 PCA 的维数、分类器自身的参数等。
运行结果:
代码:
# 导入所需的PySpark库和模块
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.shell import spark
from pyspark.sql import Row
def f(x):
    """Convert one split record of the Adult dataset into a Row-ready dict.

    Args:
        x: list of string fields produced by ``line.split(',')``; must have
           at least 15 entries because index 14 is read.

    Returns:
        dict with 'features' -> DenseVector of the six numeric columns at
        indices 0, 2, 4, 10, 11 and 12, and 'label' -> the income label string.
    """
    rel = {'features': Vectors.dense(float(x[0]), float(x[2]), float(x[4]),
                                     float(x[10]), float(x[11]), float(x[12])),
           # Fields may carry the space that follows each comma, and the test
           # file's labels presumably end with '.' ("<=50K." vs "<=50K").
           # Normalize both so train and test share one label vocabulary;
           # otherwise the StringIndexer fitted on the training labels
           # rejects the unseen test labels.
           'label': x[14].strip().rstrip('.')}
    return rel
# Read the raw CSV files and convert them to DataFrames of (features, label).
# f() reads fields up to index 14, and an Adult record has 15 comma-separated
# fields, so keep only full records. This drops lines such as an empty
# trailing line or a non-data header line (the UCI adult.test file presumably
# starts with one -- confirm against your copy); f() would otherwise raise on
# them (IndexError on short rows / ValueError from float()).
df = spark.sparkContext.textFile("./data/adult.data")\
    .map(lambda line: line.split(','))\
    .filter(lambda p: len(p) == 15)\
    .map(lambda p: Row(**f(p)))\
    .toDF()
test = spark.sparkContext.textFile("./data/adult.test")\
    .map(lambda line: line.split(','))\
    .filter(lambda p: len(p) == 15)\
    .map(lambda p: Row(**f(p)))\
    .toDF()
# Show a few raw rows (uncomment to inspect):
# df.show(5)
# test.show(5)
# Reduce the 6 numeric input features to 3 principal components. The PCA
# model is fit on the training set only, then applied to both sets.
# NOTE(review): the features are not standardized first and PCA is
# scale-sensitive; if the columns differ greatly in magnitude the largest ones
# will dominate the components -- consider a StandardScaler stage.
pcaEstimator = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
pca = pcaEstimator.fit(df)
result, testdata = pca.transform(df), pca.transform(test)
# Uncomment to inspect the PCA-transformed training data:
# result.show()
# Index the string label column into a numeric "indexedLabel" column.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(result)
# Print the learned labels; position i in this list is numeric label i.
for label in labelIndexer.labels:
    print(label)
# Let VectorIndexer decide which PCA components (if any) to treat as
# categorical and index them.
featureIndexer = VectorIndexer(inputCol="pcaFeatures", outputCol="indexedFeatures").fit(result)
print("索引后的特征数量为: ", featureIndexer.numFeatures)
# Map numeric predictions back to the original label strings.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)
# Logistic regression on the indexed PCA features.
lr = LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(100)
# Pipeline: label indexing -> feature indexing -> logistic regression -> label restoration.
lrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, lr, labelConverter])
lrPipelineModel = lrPipeline.fit(result)
lrModel = lrPipelineModel.stages[2]  # the fitted logistic-regression model
# Coefficient matrix, intercept vector and number of features of the model.
print(f"系数: {lrModel.coefficientMatrix}\n截距: {lrModel.interceptVector}\n特征数量: {lrModel.numFeatures}")
# Predict on the test set, which was projected with the same fitted PCA model.
lrPredictions = lrPipelineModel.transform(testdata)
# The evaluator's default metric is F1, so request accuracy explicitly;
# otherwise "1 - metric" below would be 1 - F1 rather than the error rate.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy") \
    .setLabelCol("indexedLabel").setPredictionCol("prediction")
lrAccuracy = evaluator.evaluate(lrPredictions)
print("测试错误率 = %g " % (1.0 - lrAccuracy))
# Hyper-parameter search: PCA is the first pipeline stage (unfitted), so the
# CrossValidator re-fits it per fold and its dimension k can be tuned together
# with the logistic-regression regularization parameters.
pca = PCA().setInputCol("features").setOutputCol("pcaFeatures")
# Label indexer fitted once on the full training set.
labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
# Vector indexer over the PCA output (fitted inside the pipeline).
featureIndexer = VectorIndexer().setInputCol("pcaFeatures").setOutputCol("indexedFeatures")
# Map numeric predictions back to the original label strings.
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
# Logistic regression on the indexed PCA features.
lr = LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(100)
lrPipeline = Pipeline().setStages([pca, labelIndexer, featureIndexer, lr, labelConverter])
# Grid: PCA dimension x elastic-net mixing x regularization strength.
paramGrid = ParamGridBuilder() \
    .addGrid(pca.k, [1, 2, 3, 4, 5, 6]) \
    .addGrid(lr.elasticNetParam, [0.2, 0.8]) \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .build()
# The evaluator's default metric is F1; use accuracy both to select the model
# and to report it, so the "Model Accuracy" printed below really is accuracy.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy") \
    .setLabelCol("indexedLabel").setPredictionCol("prediction")
cv = CrossValidator().setEstimator(lrPipeline) \
    .setEvaluator(evaluator) \
    .setEstimatorParamMaps(paramGrid) \
    .setNumFolds(3)
cvModel = cv.fit(df)
# Evaluate the best pipeline on the test set.
lrPredictions = cvModel.transform(test)
lrAccuracy = evaluator.evaluate(lrPredictions)
print("Model Accuracy: " + str(lrAccuracy))
# Inspect the best pipeline; stage 3 is the logistic-regression model.
bestModel = cvModel.bestModel
lrModel = bestModel.stages[3]
print("Coefficients: \n" + str(lrModel.coefficientMatrix) + "\nIntercept: " + str(lrModel.interceptVector)
      + "\nNum Classes: " + str(lrModel.numClasses) + "\nNum Features: " + str(lrModel.numFeatures))
# Stage 0 is the fitted PCA model; its principal-component matrix has one
# column per retained component, i.e. the selected k.
pcaModel = bestModel.stages[0]
print("Best PCA k: " + str(pcaModel.pc.numCols))
# avgMetrics[i] is the mean cross-validation accuracy of paramGrid[i]; report
# the winning parameter combination (accuracy: larger is better).
bestScore, bestParams = max(zip(cvModel.avgMetrics, paramGrid), key=lambda t: t[0])
print("Best CV accuracy: " + str(bestScore))
for param, value in bestParams.items():
    print(param.name + " = " + str(value))
|