首页 > 其他分享 >pyspark建模(类似于dwd层),flask直接对接前端请求进行召回(类似于ads层,但是不保存)

pyspark建模(类似于dwd层),flask直接对接前端请求进行召回(类似于ads层,但是不保存)

时间:2024-11-05 11:57:59浏览次数:1  
标签:pyspark flask jar dwd position HOME spark SPARK jars

2. Spark MLib

2.1 Spark MLib 开发环境准备

2.1.1 配置python和spark环境

安装Python环境

安装Anaconda3-5.2.0-Windows-x86_64.exe

配置环境变量

Anaconda_HOME  
E:\20241014_Soft\Anaconda3

PATH
%Anaconda_HOME%Scripts;%Anaconda_HOME%Library\mingw-w64\bin;%Anaconda_HOME%Library\usr\bin;%Anaconda_HOME%Library\bin
打开AnacondaPromt

conda --version

安装spark环境

Windows下配置Spark运行环境及环境变量
spark-2.4.5-bin-hadoop2.7.tgz

解压spark的安装包到磁盘目录
D:\Code\Soft\spark\spark-2.4.5-bin-hadoop2.7

在环境变量中配置SPARK_HOME指定解压的路径

SPARK_HOME  

D:\Code\Soft\spark\spark-2.4.5-bin-hadoop2.7
将解压的spark安装包中的
D:\Code\Soft\spark\spark-2.4.5-bin-hadoop2.7\python\lib
复制到anaconda对应的目录下

E:\20241014_Soft\Anaconda3\Lib\site-packages

image-20241019231052064

Step3:验证py4j是否安装成功,进入python环境,输入import py4j

image-20241019231750287

Step5:使用import导入pyspark模块,如果没错即安装成功。import pyspark

image-20241019231829943

2.1.2 idea安装python插件

image-20241019232018768

image-20241019232045876

image-20241019232123784

新建一个python项目,pyspark_test

统计每个职位投递总次数 & 投递总人数


统计指定地区的投递的总人数 & 总次数

统计每个地区投递次数最多职位topN
见pyspark_test代码

新建一个python项目,sparkmlib

线性回归模型(连续变量)
逻辑回归模型(分类变量)
决策树模型(分类变量)
随机森林模型(分类变量)
见sparkmlib代码

2.1.3 安装spark 集群

下载并安装
spark-2.4.5-bin-hadoop2.7.tgz

cd /opt/lagou/software/

tar zxvf spark-2.4.5-bin-hadoop2.7.tgz
sudo chown -R root:root spark-2.4.5-bin-hadoop2.7
sudo chmod -R 755 spark-2.4.5-bin-hadoop2.7

mv spark-2.4.5-bin-hadoop2.7 ../servers/spark-2.4.5

配置
vi /etc/profile
export SPARK_HOME=/opt/lagou/servers/spark-2.4.5
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
source /etc/profile


文件位置:
cd $SPARK_HOME/conf
修改文件:slaves、spark-defaults.conf、spark-env.sh、log4j.properties

cp log4j.properties.template log4j.properties
cp slaves.template slaves
cp spark-defaults.conf.template spark-defaults.conf
cp spark-env.sh.template spark-env.sh



vi slaves

linux121
linux122
linux123


vi spark-defaults.conf

spark.master spark://linux121:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://linux121:9000/spark-eventlog
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 512m
spark.yarn.jars hdfs:///spark-yarn/jars/*.jar

修改spark-env.sh

vi spark-env.sh

export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_421
export HADOOP_HOME=/opt/lagou/servers/hadoop-2.7.3
export HADOOP_CONF_DIR=/opt/lagou/servers/hadoop-2.7.3/etc/hadoop
export SPARK_DIST_CLASSPATH=$(/opt/lagou/servers/hadoop-2.7.3/bin/hadoop classpath)
export SPARK_MASTER_HOST=linux121
export SPARK_MASTER_PORT=7077





vi $HADOOP_HOME/etc/hadoop/yarn-site.xml

新增

<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>



# 配置spark-sql读取hive的元数据

##将hive-site.xml 软连接到spark的conf配置目录中:
cd $SPARK_HOME/conf
ln -s $HIVE_HOME/conf/hive-site.xml hive-site.xml

vi hive-site.xml

修改

    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://linux122:9083</value>
        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.
        </description>
    </property>


scp -r hive-site.xml linux121:/opt/lagou/servers/spark-2.4.5/conf

scp -r hive-site.xml linux123:/opt/lagou/servers/spark-2.4.5/conf


##将连接 mysql-connector-java-5.1.35-bin.jar拷贝到spark的jars目录下
cp $HIVE_HOME/lib/mysql-connector-java-5.1.46.jar  $SPARK_HOME/jars






将Spark软件分发到集群;修改其他节点上的环境变量

cd /opt/lagou/servers/
scp -r spark-2.4.5/ linux122:$PWD
scp -r spark-2.4.5/ linux121:$PWD
scp -r spark-2.4.5/ linux123:$PWD

source /etc/profile







注意:使用pyspark读取Hive 外部表(Hive 映射Hbase),需要额外准备Hbase,Hive相关Jar包到Spark。
cp $HBASE_HOME/lib/hbase-*.jar $SPARK_HOME/jars/
cp $HIVE_HOME/lib/hive-*.jar $SPARK_HOME/jars/


# 将 $SPARK_HOME/jars 下的jar包上传到hdfs


创建 HDFS 目录:
hdfs dfs -rm -r /spark-eventlog
hdfs dfs -rm -r /spark-yarn
hdfs dfs -mkdir /spark-eventlog
hdfs dfs -mkdir -p /spark-yarn/jars/



cd $SPARK_HOME/jars
hdfs dfs -put * /spark-yarn/jars/



额外补充:(linux122),重要(ps:因为一些版本匹配问题,所以有可能跑不通,需要先尝试跑通hbase,再尝试跑通spark_sql,唯一的办法是提前确定版本,找jar包解决不了问题)

rm -rf $SPARK_HOME/jars/metrics-core-4.1.1.jar
scp $HBASE_HOME/lib/metrics-core-4.1.1.jar linux122:$SPARK_HOME/jars
scp $HBASE_HOME/lib/metrics-core-2.1.3.jar linux122:$HBASE_HOME/lib/
hdfs dfs -rm -r /spark-yarn/jars/metrics-core-2.1.3.jar
rm -rf $SPARK_HOME/jars/lz4-java-1.4.0.jar
scp lz4-java-1.8.1.jar linux122:$SPARK_HOME/jars/
rm -rf $SPARK_HOME/jars/metrics-core-2.2.0.jar
hdfs dfs -rm -r /spark-yarn/jars/lz4-java-1.8.1.jar

cd $SPARK_HOME/jars/

rm -rf parquet-hadoop-bundle-1.6.0.jar



hdfs dfs -rm -r /spark-yarn/jars/lz4-java-1.4.0.jar
hdfs dfs -put lz4-java-1.4.1.jar /spark-yarn/jars/

hdfs dfs -put $HBASE_HOME/lib/metrics-core-4.1.1.jar  /spark-yarn/jars/


scp $SPARK_HOME/jars/*.jar linux121:$SPARK_HOME/jars/
scp $SPARK_HOME/jars/*.jar linux123:$SPARK_HOME/jars/

rm -rf $SPARK_HOME/jars/lz4-java-1.4.1.jar

hdfs dfs -rm -r /spark-yarn/jars/lz4-java-1.4.1.jar

hdfs dfs -get /spark-yarn/jars/* 

启动(前提:Hadoop的 HDFS、Yarn、HistoryServer 正常;Spark historyserver服务正常;)

scp lz4-java-1.8.0.jar linux122:$SPARK_HOME/jars/
hdfs dfs -put lz4-java-1.8.0.jar /spark-yarn/jars/

cd $SPARK_HOME/sbin

./stop-all.sh
./start-all.sh

#linux122
ps aux | grep metastore

hive --service metastore &
#linux121
spark-sql --master yarn
spark-shell

测试


http://192.168.49.121:8080/
http://192.168.49.121:18080/

2.1.4 激活python3环境,并且启动jupyter notebook


# 创建一个名为 'spark-env' 的新环境,使用 Python 3.7
conda create -n spark-env python=3.7

# 激活新环境
conda activate spark-env


安装
conda install py4j
conda install jieba
conda install pyspark==2.4.5
conda install pyhive
conda install happybase==1.2.0
conda uninstall jupyter
# 或者安装 Jupyter Notebook 6.0.3
conda install notebook=6.0.3

cd /opt/soft
mkdir -p /opt/soft/conda
chmod 777 /opt/soft/conda
scp -r conda/ linux121:/opt/soft
scp -r conda/ linux123:/opt/soft

/opt/soft/conda
修改其它节点上的环境变量
新增
vim /etc/profile
export CONDA_HOME=/opt/soft/conda
export PATH=$PATH:$CONDA_HOME/bin

source /etc/profile

新建项目目录

mkdir -p /root/data/code/job_recommended/
chmod 777 /root/data/code/job_recommended/
cd /root/data/code/job_recommended/

启动

在项目目录开启
# export TZ=Asia/Shanghai
# export LANG=en_US.UTF-8

jupyter notebook --port=8889 --ip=0.0.0.0 --no-browser --allow-root




use ods;
show tables;
desc ods_position;

导入sql文件

hive -f /root/data/user_action.sql

进入jupyter notebook,新建一个concat_fields.ipynb

代码见/root/data/code/job_recommended/concat_fields.ipynb

python3位置:/opt/soft/conda/envs/superset/bin

在hive中新建相应的表,将结果插入hive中的表

drop table `ods.ods_position_content`;
CREATE TABLE `ods.ods_position_content`(
`id` string,
`region` string,
`position_category` string,
`content` string)
row format delimited fields terminated by ',';

2.1.4 TFIDF

新建目录,将文件放入

sudo mkdir -p /data/words
ITKeywords.txt   stopwords.txt

新建文件夹

hdfs dfs -rm -r /lgns/lg_models/
hdfs dfs -mkdir -p /lgns/lg_models/


在hive中新建相应的表

drop table `idf_keywords_values`;
CREATE TABLE idf_keywords_values(
keyword STRING comment "keyword",
idf DOUBLE comment "idf",
index INT comment "index");
-- 职位tfidf保存
CREATE TABLE tfidf_keywords_values(
position_id INT comment "position_id",
region string comment "region",
keyword STRING comment "keyword",
tfidf DOUBLE comment "tfidf");

新建文件compute_tfidf.ipynb

见compute_tfidf.ipynb

2.1.5 TextRank

创建textrank_keywords_values表

drop table if exists textrank_keywords_values ;
CREATE TABLE textrank_keywords_values(
position_id INT comment "position_id",
region String comment "region",
industry String comment "industry",
keyword STRING comment "keyword",
textrank DOUBLE comment "textrank");

新建文件compute_textrank.ipynb

见compute_textrank.ipynb代码

新建表

create table position_profile(
position_id String,
region String,
keywords MAP<String,String>,
topics ARRAY<String>
);

drop table if exists position_vector;
CREATE TABLE position_vector(
position_id String comment "position_id",
region String comment "region",
position_vector ARRAY<double> comment "keyword")
row format delimited fields terminated by "/t" collection items terminated by
',';

新建文件word2vec.ipynb文件,计算职位画像结果和职位相似度

见word2vec.ipynb代码

hbase新建表

disable 'position_similar'
drop 'position_similar'
create 'position_similar', 'similar'
# 存储格式如下:key:为position_id, 'similar:position_id', 结果为相似度
put 'position_similar', '1', 'similar:2', 0.34
put 'position_similar', '1', 'similar:3', 0.267
put 'position_similar', '1', 'similar:4', 0.56
put 'position_similar', '1', 'similar:5', 0.7
put 'position_similar', '1', 'similar:6', 0.819
put 'position_similar', '1', 'similar:8', 0.28

hbase thrift start -p 9090

,之后才能连接hbase

2.1.8 用户画像构建

hbase新建表

create 'user_profile', 'basic','user_reference','env'

新建user_profile

见user_profile代码

代码可能会报hive连不到hbase的错误,导包到

cd $HIVE_HOME/lib/

上传 hive-hbase-handler-2.3.7.jar

新建hive表

drop table if exists user_profile_hbase;
create external table user_profile_hbase(
user_id STRING comment "userID",
basic map<string, String> comment "user basic information",
user_reference map<string, String> comment "user_reference",
env map<string, String> comment "user env")
COMMENT "user profile table"
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,basic:,user_reference:,env:")
TBLPROPERTIES ("hbase.table.name" = "user_profile");

这里因为版本不匹配,所以无法推进,为了节省时间,直接先略过

停止全部spark的命令

yarn application -list | grep -i spark | awk '{print $1}' | xargs -I {} yarn application -kill {}

2.1.9 召回与排序

新建hbase表

create 'lg_recall', {NAME=>'als', TTL=>1296000, VERSIONS=>999999}
alter 'lg_recall', {NAME=>'content', TTL=>1296000, VERSIONS=>999999}
alter 'lg_recall', {NAME=>'online', TTL=>1296000, VERSIONS=>999999}
# 例子:
put 'lg_recall', 'recall:user:5', 'als:1',[45,3,5,10]
put 'lg_recall', 'recall:user:5', 'als:1',[289,11,65,52,109,8]
put 'lg_recall', 'recall:user:5', 'als:2',[1,2,3,4,5,6,7,8,9,10]
put 'lg_recall', 'recall:user:2', 'content:1',[45,3,5,10,289,11,65,52,109,8]
put 'lg_recall', 'recall:user:2', 'content:2',[1,2,3,4,5,6,7,8,9,10]
create 'history_recall', {NAME=>'recall', TTL=>3888000, VERSIONS=>999999}
put 'history_recall', 'userid1', 'recall:history',[1,2,3]
put 'history_recall', 'userid1', 'recall:history',[4,5,6,7]
put 'history_recall', 'userid1', 'recall:history',[8,9,10]

新建AlsRecall文件,按用户召回

见AlsRecall代码

新建LRRank文件,按内容召回

见LRRank代码

2.1.10 推荐流程

windows环境

# 创建一个名为 'spark-env' 的新环境,使用 Python 3.7
conda create -n spark-env python=3.7

# 激活新环境
conda activate spark-env

conda install grpcio-tools
conda install grpcio
conda install pyspark==2.4.5
conda install happybase==1.2.0
conda install redis

# 首先切换到 E: 驱动器
E:

# 然后进入目标目录
cd mysource\pyspark_test\com\abtest

编译生成代码
python -m grpc_tools.protoc -I. --python_out=.. --grpc_python_out=reco.proto

新建hbase表

create 'ctr_user_feature', 'user_weigths'
create 'ctr_position_feature', 'position_weigths'

新建feature_process

见feature_process代码

标签:pyspark,flask,jar,dwd,position,HOME,spark,SPARK,jars
From: https://www.cnblogs.com/zwnfdswww/p/18527567

相关文章

  • 毕业设计:python高校舆情分析系统+可视化+情感分析 舆情分析+Flask框架(源码)✅
    毕业设计:python高校舆情分析系统+可视化+情感分析舆情分析+Flask框架(源码)✅1、项目介绍技术栈:Python语言、Flask框架、requests爬虫、snownlp情感分析、Echarts可视化、HTML2、项目界面(1)系统首页数据概况(2)敏感词统计分析(3)词云图分析(4)话题趋势分析(5)新闻词云图......
  • python+flask框架的智慧停车平台 小程序28(开题+程序+论文) 计算机毕业设计
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容选题背景随着城市化进程的加速,车辆数量急剧增加,停车难问题已成为各大城市面临的普遍难题。智慧停车平台作为解决停车难问题的有效手段,近年来在国内......
  • python+flask框架的智慧停车平台 小程序18(开题+程序+论文) 计算机毕业设计
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容选题背景随着城市化进程的加速,车辆数量急剧增加,停车难问题已成为各大城市面临的普遍难题。智慧停车平台作为解决停车难问题的有效手段,近年来在国内......
  • python+flask框架的智慧工会微信小程序 小程序端28(开题+程序+论文) 计算机毕业设计
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容选题背景随着信息技术的飞速发展和移动互联网的普及,微信小程序作为一种轻量级的应用形态,已成为各类服务接入互联网的重要平台。智慧工会作为新时代......
  • 【Python期末/课程设计】高校成绩管理系统(PyCharm项目/flask框架/MySQL数据库)
    代写C语言、C++、Java、Python、HTML、JavaScript、vue、MySQL相关编程作业,长期接单,信誉有保证,如有需要请加推广QQ。本文资源:【Python期末/课程设计】高校成绩管理系统(PyCharm项目/flask框架/MySQL数据库)1.题目要求题目描述:无编程软件:2.视频演示【Python期......
  • python+flask计算机毕业设计个性化推荐图书借阅系统开发(程序+开题+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于图书借阅系统的研究,现有研究主要以传统借阅管理为主,如[1]中的图书借阅管理系统侧重于管理员操作和基本借阅功能的实现。专门针对......
  • python+flask计算机毕业设计光爱之家孤儿院管理系统设计与实现(程序+开题+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于孤儿院管理的研究,现有研究主要以孤儿院的运营模式、儿童心理关怀等为主。专门针对孤儿院管理系统,尤其是结合光爱之家这种特定模式......
  • python+flask计算机毕业设计高校学生饮食推荐系统(程序+开题+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于饮食推荐系统的研究,现有研究多以大众群体为主,专门针对高校学生这一特定群体的饮食推荐系统研究较少。在国内外,饮食推荐相关研究主......
  • python+flask计算机毕业设计国风彩妆网站(程序+开题+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于国风彩妆网站的研究,现有研究主要以彩妆产品本身或一般的商业网站为主,专门针对国风彩妆网站特色功能、用户体验以及文化融合等方面......
  • 使用Flask做langchain的API
    文章目录概述安装依赖定义翻译方法定义请求的json数据格式定义接口和路由启动API验证API下载源代码之前使用langserve可以特别轻松的封装langchain服务为API,这些API开放了链的各种能力。有时候我们实际上只是需要更加简单的接口,并且希望能够更加灵活的对接口进行......