Use trained sklearn model with pyspark

from pyspark import SparkContext
import numpy as np
from sklearn import ensemble


def batch(xs):
    # Materialize a whole partition as a single list so the model
    # can predict on it in one vectorized call.
    yield list(xs)


N = 1000
train_x = np.random.randn(N, 10)
train_y = np.random.binomial(1, 0.5, N)

# Train the model on the driver before handing it to Spark.
model = ensemble.RandomForestClassifier(n_estimators=10).fit(train_x, train_y)

test_x = np.random.randn(N * 100, 10)

sc = SparkContext()

n_partitions = 10
# Pair every row with its index so predictions can be matched back later.
rdd = sc.parallelize(test_x, n_partitions).zipWithIndex()

# Broadcast the fitted model once instead of shipping it with every task.
b_model = sc.broadcast(model)

result = rdd.mapPartitions(batch) \
    .map(lambda xs: ([x[0] for x in xs], [x[1] for x in xs])) \
    .flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))

print(result.take(100))

output:

[(0, 0), (1, 1), (2, 1), (3, 1), (4, 1), (5, 0), (6, 1), (7, 0), (8, 1), (9, 1), (10, 0), (11, 1), (12, 0), (13, 0), (14, 1), (15, 0), (16, 0), (17, 1), (18, 0), (19, 0), (20, 1), (21, 0), (22, 1), (23, 1), (24, 1), (25, 1), (26, 0), (27, 0), (28, 1), (29, 0), (30, 0), (31, 0), (32, 0), (33, 1), (34, 1), (35, 1), (36, 1), (37, 1), (38, 1), (39, 0), (40, 1), (41, 1), (42, 1), (43, 0), (44, 0), (45, 0), (46, 1), (47, 1), (48, 0), (49, 0), (50, 0), (51, 0), (52, 0), (53, 0), (54, 1), (55, 0), (56, 0), (57, 0), (58, 1), (59, 0), (60, 0), (61, 0), (62, 0), (63, 0), (64, 0), (65, 1), (66, 1), (67, 1), (68, 0), (69, 0), (70, 1), (71, 1), (72, 1), (73, 0), (74, 0), (75, 1), (76, 1), (77, 0), (78, 1), (79, 0), (80, 0), (81, 0), (82, 0), (83, 0), (84, 0), (85, 1), (86, 1), (87, 0), (88, 0), (89, 0), (90, 1), (91, 0), (92, 0), (93, 0), (94, 0), (95, 0), (96, 1), (97, 1), (98, 0), (99, 1)]
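As a variant, the batching and prediction can be fused into a single mapPartitions step. This is a minimal sketch (not from the original gist), reusing rdd and b_model from above; predict_partition is an illustrative name:

import numpy as np

def predict_partition(rows):
    # Materialize the partition, stack the features, and run one
    # vectorized predict() call for the whole partition.
    rows = list(rows)
    if not rows:  # empty partitions are possible
        return
    xs = np.array([row[0] for row in rows])  # (n, 10) feature matrix
    idx = [row[1] for row in rows]           # original row indices
    yield from zip(idx, b_model.value.predict(xs))

result = rdd.mapPartitions(predict_partition)
print(result.take(100))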

>>> rdd.take(3)                                                                
18/05/15 09:37:18 WARN TaskSetManager: Stage 1 contains a task of very large size (723 KB). The maximum recommended task size is 100 KB.
[(array([-0.3142169 , -1.80738243, -1.29601447, -1.42500793, -0.49338668,
        0.32582428,  0.15244227, -2.41823997, -1.51832682, -0.32027413]), 0), (array([-0.00811787,  1.1534555 ,  0.92534192,  0.27246042,  1.06946727,
       -0.1420289 ,  0.3740049 , -1.84253399,  0.55459764, -0.96438845]), 1), (array([ 1.21547425,  0.87202465,  3.00628464, -1.0732967 , -1.79575235,
       -0.71943746,  0.83692206,  1.87272991,  0.31497977, -0.84061547]), 2)]
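The "task of very large size" warning appears because sc.parallelize ships the local numpy rows inside the tasks themselves: 100,000 rows of 10 float64 values is roughly 8 MB, or about 800 KB per task with 10 partitions. One simple mitigation (a sketch; 200 partitions is an arbitrary choice, not from the original post) is to split the data over more partitions so each task stays under the recommended 100 KB:

rdd = sc.parallelize(test_x, 200).zipWithIndex()  # ~40 KB per task instead of ~800 KB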

>>> rdd.mapPartitions(batch).take(3)

[...,

# one element of the batched RDD (a whole partition) ==>

[(array([ 0.95648585,  0.15749105, -1.2850535 ,  1.10495528, -1.98184263,
       -0.11160677, -0.11004717, -0.26977669,  0.93867963,  0.28810482]), 29691),

(array([ 2.67605744,  0.3678955 , -1.10677742,  1.3090983 ,  0.33327663,
       -0.29876755, -0.00869512, -0.53998984, -2.07484434, -0.83550041]), 29692),

(array([-0.23798771, -1.43967907,  0.05633439, -0.45039489, -1.47068918,
       -2.09854387, -0.70119312, -1.93214578,  0.44166082, -0.1442232 ]), 29693),

(array([-1.21476146, -0.7558832 , -0.53902146, -0.48273363, -0.24050023,
       -1.11263081, -0.02150105,  0.20790397,  0.78268026, -1.53404034]), 29694),

(array([ -9.63973837e-01,   3.51228982e-01,   3.51805780e-01,
        -5.06041907e-01,  -2.06905036e+00,  -8.66070627e-04,
        -1.11580654e+00,   4.94298203e-01,  -2.68946627e-01,
        -9.61166626e-01]), 29695)]

]
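Each element of rdd.mapPartitions(batch) is one whole partition materialized as a list of (features, index) pairs, so the batched RDD has exactly n_partitions elements. A quick check (a sketch, reusing rdd, batch, and n_partitions from above):

batched = rdd.mapPartitions(batch)
print(batched.count())                    # == n_partitions, i.e. 10
print([len(p) for p in batched.take(3)])  # rows per partition, ~10000 each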

 

ref:

https://gist.github.com/lucidfrontier45/591be3eb78557d1844ca

 

https://stackoverflow.com/questions/42887621/how-to-do-prediction-with-sklearn-model-inside-spark/42887751

Well,

I will show an example of linear regression in sklearn and how to use the trained model to predict elements of a Spark RDD.

First, train the model following the sklearn example:

from sklearn import linear_model

# Create linear regression object
regr = linear_model.LinearRegression()

# Train the model using the training sets
regr.fit(diabetes_X_train, diabetes_y_train)

Here we only have the fitted model; we still need to predict each element of an RDD.

Your RDD in this case should be an RDD of X values, like this:

rdd = sc.parallelize([1, 2, 3, 4])

So you first need to broadcast your sklearn model:

regr_bc = sc.broadcast(regr)

Then you can use it to predict your data; note that predict expects a 2-D array, so each scalar is wrapped before the call:

rdd.map(lambda x: (x, float(regr_bc.value.predict([[x]])[0]))).collect()

So the first element of each pair in the RDD is your X and the second is the predicted Y. The collect will return something like this:

[(1, 2), (2, 4), (3, 6), ...]
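The snippet above leaves diabetes_X_train and diabetes_y_train undefined. A self-contained version of the whole flow (a sketch following the classic sklearn diabetes example; the inputs 0.01..0.04 are illustrative stand-ins for the [1, 2, 3, 4] above):

import numpy as np
from pyspark import SparkContext
from sklearn import datasets, linear_model

# Train on the classic diabetes dataset, using a single feature.
diabetes = datasets.load_diabetes()
diabetes_X = diabetes.data[:, np.newaxis, 2]
diabetes_X_train = diabetes_X[:-20]
diabetes_y_train = diabetes.target[:-20]

regr = linear_model.LinearRegression()
regr.fit(diabetes_X_train, diabetes_y_train)

# Broadcast the fitted model and predict over an RDD of scalars.
sc = SparkContext()
regr_bc = sc.broadcast(regr)
rdd = sc.parallelize([0.01, 0.02, 0.03, 0.04])
print(rdd.map(lambda x: (x, float(regr_bc.value.predict([[x]])[0]))).collect())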


  • Flink流式数据缓冲后批量写入Clickhouse
    一、背景对于clickhouse有过使用经验的开发者应该知道,ck的写入,最优应该是批量的写入。但是对于流式场景来说,每批写入的数据量都是不可控制的,如kafka,每批拉取的消息数量是不定的,flink对于每条数据流的输出,写入ck的效率会十分缓慢,所以写了一个demo,去批量入库。生产环境使用还需要优......