
hadoop components---spark in practice-----airflow----deploying the scheduling tool airflow on k8s



In a previous article we already covered airflow and how it works:

hadoop components---spark in practice-----airflow----an introduction to the scheduling tool airflow with usage examples

The Scheduler, WebServer and Worker processes are started separately. The Scheduler and WebServer can run on the same host or be split across hosts, but Workers are usually needed in large numbers; if you deploy a fixed number of Workers, you need that many machines.

The Airflow on Kubernetes approach removes this limitation: the Scheduler and WebServer run permanently, while Workers are only spun up when there are tasks to schedule. When nothing needs to run, the cluster consists of just the Scheduler and WebServer processes.

Choosing a design and architecture

Task interaction model -- supporting non-resident workers

There are two ways for airflow and k8s to integrate and interact.

The first is the KubernetesExecutor.

The KubernetesExecutor is the newer option; it was only introduced in airflow 1.10.

It is configured at the executor level (a peer of CeleryExecutor and LocalExecutor) rather than at the level of operators such as HiveOperator or PythonOperator.
It introduces some k8s dependencies and context at the worker layer, and the corresponding configuration parameters have to be set on the worker side.
It is the tighter of the two couplings between airflow and k8s.

The usage flow with the KubernetesExecutor is: define the tasks in the DAG file and declare that they run under the KubernetesExecutor; when configuring it, specify the image, the tag and the resources to request from the k8s cluster; each task is then executed by the kubernetes_executor directly inside the container it launches. A minimal sketch follows below.

The KubernetesExecutor works at task granularity: as a rule, one task creates one pod.
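
The sketch below assumes an airflow 1.10-era setup in which executor = KubernetesExecutor is already enabled in airflow.cfg; the image name and resource figures are placeholders of mine, not values from this article.

# Minimal sketch: one task, one dedicated worker pod under the KubernetesExecutor.
# "my-repo/etl:latest" and the resource values below are placeholder assumptions.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def extract():
    print("running inside a dedicated worker pod")


with DAG(dag_id="k8s_executor_demo",
         start_date=datetime(2020, 2, 27),
         schedule_interval="@daily") as dag:
    PythonOperator(
        task_id="extract",
        python_callable=extract,
        # Per-task pod settings read by the KubernetesExecutor:
        # the worker image and the cpu/memory requested from the cluster.
        executor_config={
            "KubernetesExecutor": {
                "image": "my-repo/etl:latest",
                "request_cpu": "500m",
                "request_memory": "512Mi",
            }
        },
    )

Because the executor owns pod creation, each such task instance maps to one short-lived pod, which is exactly the per-task granularity described above.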

The other, more commonly used option is the KubernetesPodOperator.

The KubernetesPodOperator works at pod granularity: it only manages pod creation and does not manage individual tasks. In other words, airflow uses k8s' pod-creation capability to spin up a pod as our worker, running whatever image we specify; inside it we can run multiple tasks, or any other kind of operator, such as python or hive.
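
For contrast, here is a minimal sketch of the same idea with a KubernetesPodOperator, assuming airflow 1.10 with the contrib kubernetes dependencies installed; the namespace, image and command are placeholders of mine.

# Minimal sketch: airflow only creates and watches the pod; what runs inside is up to us.
# Namespace, image and command below are placeholder assumptions.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

with DAG(dag_id="k8s_pod_operator_demo",
         start_date=datetime(2020, 2, 27),
         schedule_interval="@daily") as dag:
    KubernetesPodOperator(
        task_id="run_etl",
        name="run-etl",
        namespace="default",
        image="my-repo/etl:latest",        # any image we build, containing the business code
        cmds=["python", "-m", "etl.job"],  # existing business entrypoint, unchanged
        in_cluster=True,                   # reuse the service account of the airflow pods
        get_logs=True,
        is_delete_operator_pod=True,       # clean the pod up once the task finishes
    )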

Which one to use depends on the business scenario.

For a brand-new workload, it is worth trying the KubernetesExecutor.

For an existing workload that already runs on PythonOperator and similar operators, the KubernetesPodOperator is the better fit: the business code stays untouched and only the outer scheduling layer changes.

How to deploy the master

Using KubernetesPodOperator or KubernetesExecutor only covers the creation and execution of the workers.

We still need a master that keeps the Scheduler and WebServer running.

The master can live on a dedicated long-running server, or it can run as a pod.

For uniform management, we choose to deploy the master as a pod.

Deploying a resident master and workers on k8s

If the data volume and the number of tasks are small, both the master and the workers can run as resident pod services.

This article implements airflow along those lines.

We rely on the kube-airflow project:

kube-airflow on GitHub

Download the project:

git clone git://www.github.com/mumoshu/kube-airflow.git

The project contains the following files:

zhangxiaofans-MacBook-Pro:kube-airflow joe$ ls
Dockerfile.template	README.md		config			script
LICENSE			airflow			dags
Makefile		airflow.all.yaml	requirements
zhangxiaofans-MacBook-Pro:kube-airflow joe$

Dockerfile.template is the image definition for kube-airflow. The image is also published on Docker Hub; see docker-airflow for details. It is based on the official debian:stretch image.

airflow.all.yaml is used to manually create the Kubernetes services and deployments for airflow.

By default kube-airflow uses the CeleryExecutor + rabbitmq combination. See the repository's airflow.cfg for the configuration.

Deploy it with:

kubectl create -f airflow.all.yaml

This command creates the following deployments:

postgres
rabbitmq
airflow-webserver
airflow-scheduler
airflow-flower
airflow-worker

and the following services:

postgres
rabbitmq
airflow-webserver
airflow-flower

Once deployed, check the created pods with:

kubectl get pods

Find the name of the web pod, e.g. web-796b7857b7-dt7nk.

Exec into the pod:

kubectl exec -ti  web-796b7857b7-dt7nk  -- bash

Run a test command:

# test the print_date task
airflow test tutorial print_date 2020-02-27

A successful run looks like this:

airflow@web-796b7857b7-dt7nk:~$ airflow test tutorial print_date 2020-02-27
[2020-02-27 14:50:27,188] {__init__.py:57} INFO - Using executor CeleryExecutor
[2020-02-27 14:50:27,236] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2020-02-27 14:50:27,251] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2020-02-27 14:50:27,379] {models.py:167} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2020-02-27 14:50:27,555] {models.py:1126} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2020-02-27 00:00:00 [success]>
[2020-02-27 14:50:27,557] {models.py:1126} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2020-02-27 00:00:00 [success]>
[2020-02-27 14:50:27,557] {models.py:1318} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------

[2020-02-27 14:50:27,558] {models.py:1342} INFO - Executing <Task(BashOperator): print_date> on 2020-02-27 00:00:00
[2020-02-27 14:50:27,568] {bash_operator.py:71} INFO - tmp dir root location:
/tmp
[2020-02-27 14:50:27,568] {bash_operator.py:80} INFO - Temporary script location :/tmp/airflowtmpiBOkUx//tmp/airflowtmpiBOkUx/print_dateXQWqUY
[2020-02-27 14:50:27,569] {bash_operator.py:81} INFO - Running command: date
[2020-02-27 14:50:27,572] {bash_operator.py:90} INFO - Output:
[2020-02-27 14:50:27,574] {bash_operator.py:94} INFO - Thu Feb 27 14:50:27 UTC 2020
[2020-02-27 14:50:27,574] {bash_operator.py:97} INFO - Command exited with return code 0
airflow@web-796b7857b7-dt7nk:~$

More test commands:

# 1. List all active DAGs
    airflow list_dags

# 2. List the task ids of the tutorial DAG
    airflow list_tasks tutorial

# 3. List the task ids of the tutorial DAG as a tree
    airflow list_tasks tutorial --tree


# 4. backfill runs every task that should have run within a time range
airflow backfill  tutorial -s 2020-02-26  -e  2020-02-27

You can also use the mysql + LocalExecutor mode.
Add a yaml block like the following:

apiVersion: v1
kind: Pod
metadata:
  name: airflow-mysql
  labels:
    name: airflow-mysql
spec:
  containers:
  - name: airflow-mysql
    image: mysql:8.0.12
    imagePullPolicy: IfNotPresent
    ports:
    - containerPort: 3306
    env:
    - name: MYSQL_SERVICE_HOST
      value: "mysql"
    - name: MYSQL_SERVICE_PORT
      value: "3306"
    - name: MYSQL_ROOT_PASSWORD
      value: '123456'
    - name: MYSQL_DATABASE
      value: 'airflow'
    - name: TZ
      value: 'Asia/Shanghai'
    - name: LANG
      value: 'C.UTF-8'

---
apiVersion: v1
kind: Service
metadata:
  name: airflow-mysql
spec:
  selector:
    name: airflow-mysql
  ports:
    - protocol: TCP
      port: 3306
      targetPort: 3306
---

And add the following to the worker's yaml:

        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: "mysql+mysqldb://root:123456@mysql:3306/airflow"

In context:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: worker
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: worker
    spec:
      restartPolicy: Always
      containers:
      - name: worker
        image: mumoshu/kube-airflow:1.8.0.0-1.6.1
        # volumes:
        #     - /localpath/to/dags:/usr/local/airflow/dags
        env:
        - name: AIRFLOW_HOME
          value: "/usr/local/airflow"
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: "mysql+mysqldb://root:123456@mysql:3306/airflow"
        args: ["worker"]

In airflow.cfg the executor needs to be changed to:

executor = LocalExecutor

More details on the mysql + LocalExecutor mode can be found in the references.

Adding an ingress to reach the airflow web UI

In airflow.all.yaml, change this service definition:

apiVersion: v1
kind: Service
metadata:
  name: web
spec:
  type: NodePort
  selector:
    app: airflow
    tier: web
  ports:
    - name: web
      protocol: TCP
      port: 8080
      targetPort: web
      nodePort: 32080

to:

apiVersion: v1
kind: Service
metadata:
  name: web
spec:
  selector:
    app: airflow
    tier: web
  ports:
    - name: web
      protocol: TCP
      port: 8080
      targetPort: 8080

and add:

---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: web
  namespace: default
spec:
  rules:
  -
    host: api-beta.test.com
    http:
      paths:
      - path: /
        backend:
          serviceName: web
          servicePort: 8080

Delete the old web deployment and service, then recreate everything:

kubectl delete deployment web
kubectl delete service  web

kubectl create -f airflow.all.yaml

Open api-beta.test.com in a browser.

(screenshot: the airflow web UI served through the ingress)

Building a custom image from the Dockerfile

Sometimes we need automated ci/cd releases driven by our own business needs.

For example, packaging the etl .py files to execute into dags, or installing our own business project packages.

In both cases Dockerfile.template has to be adjusted before publishing a new image.

Steps:

cp Dockerfile.template  Dockerfile

vi Dockerfile

Note the two variables:

ENV     EMBEDDED_DAGS_LOCATION=%%EMBEDDED_DAGS_LOCATION%%
ENV     REQUIREMENTS_TXT_LOCATION=%%REQUIREMENTS_TXT_LOCATION%%

These point to local paths for the sample code; set the environment variables first, otherwise the build fails because the directories cannot be found.

You can also simply comment them out and keep this part out of the image:

# COPY    ${REQUIREMENTS_TXT_LOCATION} /requirements/dags.txt

# COPY        ${EMBEDDED_DAGS_LOCATION} ${AIRFLOW_HOME}/dags

and delete:

&&  pip3 install -r /requirements/dags.txt \

Build the image and push it:

docker build -t  spark-airflow:1.8.0  .
docker tag  spark-airflow:1.8.0   <repo>/spark-airflow:1.8.0
docker push <repo>/spark-airflow:1.8.0

Possible error -- ERROR: Package 'apache-airflow' requires a different Python: 3.5.3 not in '~=3.6'

This happens because the Dockerfile installs python through the python3-dev package by default, which may pull in python 3.5, a version airflow does not accept, so the Dockerfile needs to be modified:

RUN         set -ex \
        &&  buildDeps=' \
                build-essential \
                libblas-dev \
                libffi-dev \
                libkrb5-dev \
                liblapack-dev \
                libpq-dev \
                libsasl2-dev \
                libssl-dev \
                libxml2-dev \
                libxslt1-dev \
                python3-dev \
                python3-pip \
                zlib1g-dev \

Remove the python3 packages from the install list:

RUN         set -ex \
        &&  buildDeps=' \
                build-essential \
                libblas-dev \
                libffi-dev \
                libkrb5-dev \
                liblapack-dev \
                libpq-dev \
                libsasl2-dev \
                libssl-dev \
                libxml2-dev \
                libxslt1-dev \
                zlib1g-dev \

Then add commands to install Python 3.6:

&&  apt-get install wget  -yqq \
        &&  wget https://www.python.org/ftp/python/3.6.9/Python-3.6.9.tgz \
        &&  tar -xzvf Python-3.6.9.tgz \
        &&  cd Python-3.6.9/   \
        &&  ./configure --prefix=/usr/local/python36   \
        &&  make && make install    \
        &&  chmod 777 -R /usr/local/python36  \
        &&  export PATH=/usr/local/python36/bin:$PATH  \

Or add commands to install Python 3.7 instead:

&&  apt-get install wget  -yqq \
        &&  wget https://www.python.org/ftp/python/3.7.2/Python-3.7.2.tgz \
        &&  tar -zxvf Python-3.7.2.tgz \
        &&  cd Python-3.7.2/   \
        &&  ./configure --prefix=/usr/local/python37   \
        &&  make && make install    \
        &&  chmod 777 -R /usr/local/python37  \
        &&  export PATH=/usr/local/python37/bin:$PATH \

Possible error -- airflow command not found, python3 command not found

The freshly installed python and airflow are not found on the PATH.

Set the environment variables in entrypoint.sh:

vi script/entrypoint.sh

Add:

export PATH=/usr/local/python37/bin:$PATH

python3 -V

whereis  python

export PATH=$PATH:$AIRFLOW_HOME

echo $PATH

Also add a symlink in the Dockerfile:

vi Dockerfile

Add:

RUN    ln -sf  /usr/bin/python  /usr/bin/python3

Possible error -- relation "log" does not exist at character / relation "table_name" does not exist

airflow initdb has to be run before the pod is used.

Check whether entrypoint.sh actually runs airflow initdb:

vi script/entrypoint.sh

If it is already there, check whether it ran successfully or failed.

Possible error -- airflow command error: argument subcommand: invalid choice: 'initdb'

The cause is that airflow is not pinned in requirements/airflow.txt, so the latest version gets installed, i.e. airflow 2.0.0.

The 2.0.0 CLI differs from 1.x.x, so the commands have to be adjusted; for example

airflow initdb

becomes

airflow db init

See the github question for details.

That approach, however, requires changing quite a few commands; it is better to pin airflow to the version kube-airflow expects, here 1.8.0.

Run:

vi  requirements/airflow.txt

git+https://github.com/apache/incubator-airflow#egg=airflow

and change it to

git+https://github.com/apache/[email protected]#egg=airflow

Git URLs in requirements.txt can be used in the following ways.

In the examples below, package-two is installed from a GitHub repository; the text between @ and # pins exactly what to install.



Pin a commit hash (41b95ec is the commit; after switching to the branch on github you can see 'Latest commit 41b95ec on 20 Mar 2017' in the top right corner):

package-one==1.9.4

git+git://github.com/path/to/package-two@41b95ec#egg=package-two

package-three==1.0.1

Pin a branch name (master):

git+git://github.com/path/to/package-two@master#egg=package-two

Pin a tag (0.1):

git+git://github.com/path/to/[email protected]#egg=package-two

Pin a release (3.7.1):

git+git://github.com/path/to/package-two@releases/tag/v3.7.1#egg=package-two

Note that #egg=package-two is not a comment here; it explicitly names the package.

Possible error -- async = [^ SyntaxError: invalid syntax

The cause is that async is a keyword in python 3.7, while airflow 1.8.0 uses async as a variable name.

In other words, python 3.7 and airflow 1.8.0 are incompatible.

airflow initdb imports the tenacity package (from tenacity.async import AsyncRetrying); since async became a reserved keyword in python 3.7, airflow cannot run on it. See the github question for details, and the short demonstration below.
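
A tiny check (my own, not from the issue) makes the incompatibility visible: on python 3.7+ a statement that assigns to async does not even parse.

# "async" became a reserved keyword in python 3.7, so code that uses it as a name
# (as airflow 1.8 / old tenacity releases did) fails to compile at all.
try:
    compile("async = 1", "<demo>", "exec")
except SyntaxError as err:
    print("python 3.7+:", err)   # SyntaxError: invalid syntax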

Workaround: pin tenacity==4.10.0.

Run:

vi  requirements/airflow.txt

and add:

tenacity==4.10.0

At the same time, airflow's setup.py and related files also have to be changed so they no longer use async as a variable name; rename it to _async or anything else.

That approach touches a lot of code; alternatively use airflow 1.10.7 or later, which is compatible with python 3.7.

For details see [AIRFLOW-2716] Replace new Python 3.7 keywords:

https://github.com/apache/airflow/releases

Run:

vi  requirements/airflow.txt

git+https://github.com/apache/incubator-airflow#egg=airflow

and change it to

git+https://github.com/apache/[email protected]#egg=airflow

Possible error -- ModuleNotFoundError: No module named '_bz2'

The compiled python is missing the system bz2 library.

A fix that avoids recompiling python:

Find the file _bz2.cpython-36m-x86_64-linux-gnu.so; a python 3.6 installation usually ships it. If you cannot find one, you can download the copy I keep on Baidu Netdisk:

Link: https://pan.baidu.com/s/1AwAW028WOTlQTk8RuF9evg  password: f2tu

If your python version is 3.6, the file name stays 36m.
Run:

cp _bz2.cpython-36m-x86_64-linux-gnu.so /usr/local/python3/lib/python3.6/lib-dynload/

Mine is python 3.7, so rename the file to 37m and copy it into the python 3 installation directory.
Run:

mv _bz2.cpython-36m-x86_64-linux-gnu.so _bz2.cpython-37m-x86_64-linux-gnu.so
cp _bz2.cpython-37m-x86_64-linux-gnu.so /usr/local/python3/lib/python3.7/lib-dynload/

Changing the timezone to China time

First, the image's own timezone has to be set to China time.

Add one of the following to the Dockerfile, depending on the base system:

# CentOS

RUN echo "Asia/shanghai" > /etc/timezone;

# Ubuntu

RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime


# debian

ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

After the pods start, exec in and verify:

[dev@test-kops-jump kube-airflow]$ kubectl get pods |grep airflow
airflow-flower-5f49695d96-mrmld                1/1     Running     0          53s
airflow-ps-5d5bb5cb88-mggmp                    1/1     Running     0          53s
airflow-rabbitmq-7984cf97ff-c7pjt              1/1     Running     0          53s
airflow-scheduler-575ccf69d9-bchvq             1/1     Running     0          53s
airflow-web-68cffc7664-52xzm                   1/1     Running     0          53s
airflow-worker-9cf6cb6dd-ndlg8                 1/1     Running     0          53s
[dev@test-kops-jump kube-airflow]$
[dev@test-kops-jump kube-airflow]$ kubectl exec -ti airflow-web-68cffc7664-52xzm -- bash
airflow@airflow-web-68cffc7664-52xzm:~$ date -R
Sat, 07 Mar 2020 15:55:11 +0800

The airflow version used here is 1.10.9; other versions differ only in details and the same changes apply.

First locate the directory where airflow is installed; it depends on how airflow was installed. If it was installed by hand from the github sources, it sits wherever you chose to put it.

If it was installed with pip, it is usually under python's site-packages directory, e.g. mine:

airflow@airflow-web-77b59b7cc5-swhtq:~$ ls /usr/local/python37/lib/python3.7/site-packages/airflow/
alembic.ini         contrib/            executors/          logging_config.py   plugins_manager.py  serialization/      _vendor/            
api/                dag/                hooks/              macros/             __pycache__/        settings.py         version.py          
bin/                default_login.py    __init__.py         migrations/         security/           task/               www/                
config_templates/   example_dags/       jobs/               models/             sensors/            ti_deps/            www_rbac/           
configuration.py    exceptions.py       lineage/            operators/          sentry.py           utils/

airflow.cfg lives in the configured airflow home directory, e.g. mine:

airflow@airflow-web-77b59b7cc5-jllpj:~$ ls /usr/local/airflow/
airflow.cfg  airflow-webserver.pid  entrypoint.sh  git-sync  hail_oper_base.log  logs  unittests.cfg
airflow@airflow-web-77b59b7cc5-jllpj:~$

Edit airflow.cfg and set the timezone

Run:

cd /usr/local/airflow/
vi  airflow.cfg

and set:

default_timezone = Asia/Shanghai

Edit airflow/utils/timezone.py

Run:

cd /usr/local/python37/lib/python3.7/site-packages/airflow/
 vi utils/timezone.py

Below the line utc = pendulum.timezone('UTC') (line 27), add:

from airflow import configuration as conf
try:
	tz = conf.get("core", "default_timezone")
	if tz == "system":
		utc = pendulum.local_timezone()
	else:
		utc = pendulum.timezone(tz)
except Exception:
	pass

Change the utcnow() function (line 69):

original:  d = dt.datetime.utcnow()
change to: d = dt.datetime.now()

Edit airflow/utils/sqlalchemy.py

Run:

cd /usr/local/python37/lib/python3.7/site-packages/airflow/
 vi utils/sqlalchemy.py

Below the line utc = pendulum.timezone('UTC') (line 37), add:

from airflow import configuration as conf
try:
	tz = conf.get("core", "default_timezone")
	if tz == "system":
		utc = pendulum.local_timezone()
	else:
		utc = pendulum.timezone(tz)
except Exception:
	pass

Comment out cursor.execute("SET time_zone = '+00:00'") in airflow/utils/sqlalchemy.py (line 66).

Edit airflow/www/templates/admin/master.html (line 31)

Run:

cd /usr/local/python37/lib/python3.7/site-packages/airflow/
vi www/templates/admin/master.html

Changes:

change: var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000);
to:     var UTCseconds = x.getTime();

change: "timeFormat":"H:i:s %UTC%",
to:     "timeFormat":"H:i:s",

Edit airflow/models/dag.py -- note: only needed when the image's timezone is not China time

Run:

cd /usr/local/python37/lib/python3.7/site-packages/airflow/
vi models/dag.py

Changes: find class DagModel(Base): in the file and add a utc2local method to it:

class DagModel(Base):

    def utc2local(self,utc):
        import time
        epoch = time.mktime(utc.timetuple())
        print("epoch: " + str(epoch))
        result_time_8_hours_s = datetime.fromtimestamp(epoch) - datetime.utcfromtimestamp(epoch)
        time_result = utc + result_time_8_hours_s
        print("converted: " + str(time_result))
        return time_result
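
As a standalone sanity check of the arithmetic above (my own snippet, assuming the container timezone is already Asia/Shanghai as set in the Dockerfile), the helper simply adds the local UTC offset to the naive UTC datetime:

# Quick check of the utc2local arithmetic outside airflow (assumes TZ=Asia/Shanghai).
import time
from datetime import datetime

def utc2local(utc):
    epoch = time.mktime(utc.timetuple())
    # fromtimestamp() uses local time, utcfromtimestamp() uses UTC,
    # so their difference is the local UTC offset (+08:00 here).
    offset = datetime.fromtimestamp(epoch) - datetime.utcfromtimestamp(epoch)
    return utc + offset

print(utc2local(datetime(2020, 3, 8, 9, 9)))   # 2020-03-08 17:09:00 with TZ=Asia/Shanghai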

Edit airflow/www/templates/airflow/dags.html -- note: only needed when the image's timezone is not China time

Run:

cd /usr/local/python37/lib/python3.7/site-packages/airflow/
vi www/templates/airflow/dags.html

Changes:

last_run.execution_date.strftime("%Y-%m-%d %H:%M")
and
last_run.start_date.strftime("%Y-%m-%d %H:%M")

become:
dag.utc2local(last_run.execution_date).strftime("%Y-%m-%d %H:%M")
and
dag.utc2local(last_run.start_date).strftime("%Y-%m-%d %H:%M")

Restart the webserver for the changes to take effect:

su airflow
 ps -ef|egrep 'airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -9


rm -rf /home/airflow/airflow/airflow-scheduler.pid
 
 
airflow webserver -p 8080 -D

More restart commands for reference.

Restart the webserver and scheduler:

su airflow
 ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -9
 rm -rf /home/airflow/airflow/airflow-scheduler.pid 
 airflow webserver -p 8080 -D
  airflow scheduler -D
tail -f /home/airflow/airflow/airflow-scheduler.err 

Restart the worker:
su airflow
ps -ef|egrep 'serve_logs|celeryd'|grep -v grep
 rm -rf /home/airflow/airflow/airflow-worker.pid
 airflow worker -D
tail -f /home/airflow/airflow/airflow-worker.err   # no output means everything is fine

Hiding the official example DAGs

Change the airflow.cfg option (line 97), under airflow_home (airflow_home = /root/airflow):

load_examples = False

Delete airflow.db under airflow_home (airflow_home = /root/airflow), then re-initialize:

airflow initdb

Accessing the pg database

Exec into the pod:

kubectl exec -ti  airflow-ps-5d5bb5cb88-s8z7f  -- bash

Inside the pod, connect to the pg database:

root@airflow-ps-5d5bb5cb88-s8z7f:/# which psql
/usr/bin/psql
root@airflow-ps-5d5bb5cb88-s8z7f:/# /usr/bin/psql -h localhost -U airflow  -d airflow
psql (12.2 (Debian 12.2-2.pgdg100+1))
Type "help" for help.

airflow=#

Common pg commands

Connect to a database (the default user and database are postgres):
psql -U user -d dbname

Switch databases (like mysql's use dbname):
\c dbname
List databases (like mysql's show databases):
\l
List tables (like mysql's show tables):
\dt
Describe a table (like desc tblname / show columns from tbname):
\d tblname

\di  list indexes

Create a database:
create database [dbname];
Drop a database:
drop database [dbname];
Rename a table:
alter table [table_a] rename to [table_b];
Drop a table:
drop table [table];

Add a column to an existing table:
alter table [table] add column [column] [type];
Drop a column:
alter table [table] drop column [column];
Rename a column:
alter table [table] rename column [column_a] to [column_b];
Set a column default:
alter table [table] alter column [column] set default [new_default];
Remove a column default:
alter table [table] alter column [column] drop default;
Insert a row:
insert into table_name ([col_m],[col_n],...) values ([val_m],[val_n],...);
Update a row:
update [table] set [column]=[value] where [condition];
Delete rows:
delete from [table] where [condition];
delete from [table];  -- empty the whole table
Create a table:
create table ([col1] [type1] <references ref_table(ref_col)>, [col2] [type2], ... <, primary key (col_m, col_n, ...)>);
\copyright     show PostgreSQL usage and distribution terms
\encoding [encoding]
                 show or set the client character encoding
\h [name]      help on SQL command syntax; use * for all commands
\prompt [text] name
                 prompt the user to set an internal variable
\password [USERNAME]
                 securely change the password for a user
\q             quit psql

Possible error -- Failed to fetch log file from worker.

Clicking a task log in the web ui fails: the webserver cannot connect to the worker and cannot fetch the remote log.

Fix: add a service for the worker to the yaml that creates the pods, as follows:

apiVersion: v1
kind: Service
metadata:
  name: worker
spec:
  type: NodePort
  selector:
    app: airflow
    tier: worker
  ports:
    - name: worker
      protocol: TCP
      port: 8793
      targetPort: worker
      nodePort: 32082
---

The worker deployment also needs:

hostname: worker


        ports:
        - name: airflow-worker
          containerPort: 8793

Details:
https://github.com/mumoshu/kube-airflow/issues/23

For the exact format, see the final working yaml at the end of this article.

Possible error -- no logs for some tasks -- No host supplied

Viewing the log in the web ui shows the following error:

*** Log file does not exist: /usr/local/airflow/logs/user_quality/user_quality/2020-03-08T09:09:00+00:00/1.log
*** Fetching from: http://:8793/log/user_quality/user_quality/2020-03-08T09:09:00+00:00/1.log
*** Failed to fetch log file from worker. Invalid URL 'http://:8793/log/user_quality/user_quality/2020-03-08T09:09:00+00:00/1.log': No host supplied

Workaround

I have not found out why the host is missing from the log-fetch URL while the worker is running.

The only way around it is a different approach, such as storing the logs on s3.

This can be configured in config/airflow.cfg:

# The folder where airflow should store its log files. This location
base_log_folder = /usr/local/airflow/logs

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_logging = True
remote_base_log_folder = s3://beta-env/tmp/sparktest/
remote_log_conn_id = my_conn_S3
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False

my_conn_S3 is an arbitrary name and can even be left empty; the key point is injecting the three aws s3 variables into the environment.

When building the image I set them in the Dockerfile:

ENV     AWS_ACCESS_KEY_ID   AKI123   
ENV     AWS_DEFAULT_REGION   cn-northwest-1   
ENV     AWS_SECRET_ACCESS_KEY   FmPTa12343

Note that with s3, the logs are uploaded and become visible only after the task finishes, whether it succeeded or failed. A quick way to check the bucket is sketched below.
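
To confirm that the credentials injected above can actually reach the remote_base_log_folder bucket, a quick check like the following can be run inside the web or worker pod. This is my own sketch, not part of the original setup; it assumes boto3 is available in the image and reuses the example bucket and prefix from the airflow.cfg snippet above.

# List a few objects under the remote log prefix using the AWS_ACCESS_KEY_ID /
# AWS_SECRET_ACCESS_KEY / AWS_DEFAULT_REGION environment variables from the Dockerfile.
import boto3

s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket="beta-env", Prefix="tmp/sparktest/", MaxKeys=5)
for obj in resp.get("Contents", []):
    print(obj["Key"])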

Final working files for reference

Dockerfile

# VERSION 1.8.0.0
# AUTHOR: Yusuke KUOKA
# DESCRIPTION: Docker image to run Airflow on Kubernetes which is capable of creating Kubernetes jobs
# BUILD: docker build --rm -t mumoshu/kube-airflow
# SOURCE: https://github.com/mumoshu/kube-airflow

FROM    debian:stretch
MAINTAINER Yusuke KUOKA <[email protected]>

# Never prompts the user for choices on installation/configuration of packages
ENV     DEBIAN_FRONTEND noninteractive
ENV     TERM linux

# Airflow
ARG     AIRFLOW_VERSION=1.8.0
ENV     POSTGRES_HOST airflow-ps
ENV     RABBITMQ_HOST airflow-rabbitmq
ENV     AIRFLOW_HOME /usr/local/airflow
ENV     EMBEDDED_DAGS_LOCATION=%%EMBEDDED_DAGS_LOCATION%%
ENV     REQUIREMENTS_TXT_LOCATION=%%REQUIREMENTS_TXT_LOCATION%%


# Define en_US.
ENV     LANGUAGE en_US.UTF-8
ENV     LANG en_US.UTF-8
ENV     LC_ALL en_US.UTF-8
ENV     LC_CTYPE en_US.UTF-8
ENV     LC_MESSAGES en_US.UTF-8
ENV     LC_ALL en_US.UTF-8

WORKDIR /requirements
# Only copy needed files
COPY    requirements/airflow.txt /requirements/airflow.txt
# COPY    ${REQUIREMENTS_TXT_LOCATION} /requirements/dags.txt


RUN         set -ex \
        &&  buildDeps=' \
                build-essential \
                libblas-dev \
                libffi-dev \
                libkrb5-dev \
                liblapack-dev \
                libpq-dev \
                libsasl2-dev \
                libssl-dev \
                libxml2-dev \
                libxslt1-dev \
                zlib1g-dev \
            ' \
        &&  apt-get update -yqq \
        &&  apt-get upgrade -yqq \
        &&  apt-get install -yqq --no-install-recommends \
                $buildDeps \
                apt-utils \
                curl \
                git \
                locales \
                netcat \
                bzip2 \
        &&      sed -i 's/^# en_US.UTF-8 UTF-8$/en_US.UTF-8 UTF-8/g' /etc/locale.gen \
        &&  apt-get install wget  -yqq \
        &&  wget https://www.python.org/ftp/python/3.7.2/Python-3.7.2.tgz \
        &&  tar -zxvf Python-3.7.2.tgz \
        &&  cd Python-3.7.2/   \
        &&  ./configure --prefix=/usr/local/python37   \
        &&  make && make install    \
        &&  chmod 777 -R /usr/local/python37  \
        &&  export PATH=/usr/local/python37/bin:$PATH \
        &&  locale-gen \
        &&  update-locale LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 \
        &&  useradd -ms /bin/bash -d ${AIRFLOW_HOME} airflow \
        &&  pip3 install --upgrade pip 'setuptools!=36.0.0' \
        &&  if [ ! -e /usr/bin/pip ]; then ln -s /usr/bin/pip3 /usr/bin/pip ; fi \
        &&  if [ ! -e /usr/bin/python ]; then ln -sf /usr/bin/python3 /usr/bin/python; fi \
        &&  pip3 install wheel   \
        &&  pip3 install hail \
        &&  pip3 install -r /requirements/airflow.txt \
#        &&  pip3 install -r /requirements/dags.txt \
        &&  pip3 install  pydigree==0.0.3  --no-cache-dir \
        &&  apt-get remove --purge -yqq $buildDeps libpq-dev \
        &&  apt-get clean \
        &&  rm -rf \
                /var/lib/apt/lists/* \
                /tmp/* \
                /var/tmp/* \
                /usr/share/man \
                /usr/share/doc \
                /usr/share/doc-base

ENV         KUBECTL_VERSION %%KUBECTL_VERSION%%

RUN         curl -L -o /usr/local/bin/kubectl \
                https://storage.googleapis.com/kubernetes-release/release/v${KUBECTL_VERSION}/bin/linux/amd64/kubectl \
        &&  chmod +x /usr/local/bin/kubectl

COPY        script/entrypoint.sh ${AIRFLOW_HOME}/entrypoint.sh
COPY        config/airflow.cfg ${AIRFLOW_HOME}/airflow.cfg
COPY        script/git-sync ${AIRFLOW_HOME}/git-sync
COPY        _bz2.cpython-37m-x86_64-linux-gnu.so  /usr/local/python37/lib/python3.7/lib-dynload/

RUN         chown -R airflow: ${AIRFLOW_HOME} \
        &&  chmod +x ${AIRFLOW_HOME}/entrypoint.sh \
        &&  chmod +x ${AIRFLOW_HOME}/git-sync



RUN       mv /etc/apt/sources.list /etc/apt/sources.list.bak \
   &&  echo "deb http://mirrors.aliyun.com/debian/ stretch main non-free contrib" >> /etc/apt/sources.list  \
   && echo "deb-src http://mirrors.aliyun.com/debian/ stretch main non-free contrib" >>/etc/apt/sources.list  \
   && echo "deb http://mirrors.aliyun.com/debian-security stretch/updates main" >>/etc/apt/sources.list   \
   && echo "deb-src http://mirrors.aliyun.com/debian-security stretch/updates main" >>/etc/apt/sources.list  \
   &&  echo "deb http://mirrors.aliyun.com/debian/ stretch-updates main non-free contrib" >> /etc/apt/sources.list  \
   && echo "deb-src http://mirrors.aliyun.com/debian/ stretch-updates main non-free contrib" >>/etc/apt/sources.list  \
   && echo "deb http://mirrors.aliyun.com/debian/ stretch-backports main non-free contrib" >>/etc/apt/sources.list   \
   && echo "deb-src http://mirrors.aliyun.com/debian/ stretch-backports main non-free contrib" >>/etc/apt/sources.list  \
   &&  apt-get update  -y  \
   &&  apt-get install vim  -y



RUN     export PATH=/usr/local/python37/bin:$PATH \
        && pip install apache-airflow[s3]  \
        && pip install apache-airflow[log] \
        && pip install awscli --upgrade --user  -i https://mirrors.aliyun.com/pypi/simple/ \
        && pip install py4j  -i https://mirrors.aliyun.com/pypi/simple/ \
        && pip install s3fs==0.4.0  -i https://mirrors.aliyun.com/pypi/simple/




COPY   airflow.cfg ${AIRFLOW_HOME}/airflow.cfg
COPY   master.html /usr/local/python37/lib/python3.7/site-packages/airflow/www/templates/admin/master.html
COPY   sqlalchemy.py /usr/local/python37/lib/python3.7/site-packages/airflow/utils/sqlalchemy.py
COPY   timezone.py  /usr/local/python37/lib/python3.7/site-packages/airflow/utils/timezone.py
COPY   entrypoint.sh ${AIRFLOW_HOME}/entrypoint.sh



RUN  chmod 777 -R ${AIRFLOW_HOME}/entrypoint.sh

RUN    ln -sf  /usr/bin/python  /usr/bin/python3

EXPOSE  8080 5555 8793

ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone


USER        airflow
WORKDIR     ${AIRFLOW_HOME}
ENTRYPOINT  ["./entrypoint.sh"]

script/entrypoint.sh

#!/usr/bin/env bash


ls  /usr/local/python37/lib/python3.7/site-packages/etl/dags

export PATH=/usr/local/python37/bin:$PATH

python3 -V

whereis  python

export PATH=$PATH:$AIRFLOW_HOME

echo $PATH

echo 'export PYSPARK_SUBMIT_ARGS="--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT}   --deploy-mode client   pyspark-shell" '  >> ~/.bashrc

source ~/.bashrc

echo "hail env:"
echo $PYSPARK_SUBMIT_ARGS

CMD="airflow"
TRY_LOOP="${TRY_LOOP:-10}"
POSTGRES_HOST="${POSTGRES_HOST:-postgres}"
POSTGRES_PORT=5432
POSTGRES_CREDS="${POSTGRES_CREDS:-airflow:airflow}"
RABBITMQ_HOST="${RABBITMQ_HOST:-rabbitmq}"
RABBITMQ_CREDS="${RABBITMQ_CREDS:-airflow:airflow}"
RABBITMQ_MANAGEMENT_PORT=15672
FLOWER_URL_PREFIX="${FLOWER_URL_PREFIX:-}"
AIRFLOW_URL_PREFIX="${AIRFLOW_URL_PREFIX:-}"
LOAD_DAGS_EXAMPLES="${LOAD_DAGS_EXAMPLES:-true}"
GIT_SYNC_REPO="${GIT_SYNC_REPO:-}"

if [ -z $FERNET_KEY ]; then
    FERNET_KEY=$(python3 -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)")
fi

echo "Postgres host: $POSTGRES_HOST"
echo "RabbitMQ host: $RABBITMQ_HOST"
echo "Load DAG examples: $LOAD_DAGS_EXAMPLES"
echo "Git sync repository: $GIT_SYNC_REPO"
echo

# Generate Fernet key
sed -i "s/{{ FERNET_KEY }}/${FERNET_KEY}/" $AIRFLOW_HOME/airflow.cfg
sed -i "s/{{ POSTGRES_HOST }}/${POSTGRES_HOST}/" $AIRFLOW_HOME/airflow.cfg
sed -i "s/{{ POSTGRES_CREDS }}/${POSTGRES_CREDS}/" $AIRFLOW_HOME/airflow.cfg
sed -i "s/{{ RABBITMQ_HOST }}/${RABBITMQ_HOST}/" $AIRFLOW_HOME/airflow.cfg
sed -i "s/{{ RABBITMQ_CREDS }}/${RABBITMQ_CREDS}/" $AIRFLOW_HOME/airflow.cfg
sed -i "s/{{ LOAD_DAGS_EXAMPLES }}/${LOAD_DAGS_EXAMPLES}/" $AIRFLOW_HOME/airflow.cfg
sed -i "s#{{ FLOWER_URL_PREFIX }}#${FLOWER_URL_PREFIX}#" $AIRFLOW_HOME/airflow.cfg
sed -i "s#{{ AIRFLOW_URL_PREFIX }}#${AIRFLOW_URL_PREFIX}#" $AIRFLOW_HOME/airflow.cfg

# wait for rabbitmq
if [ "$1" = "webserver" ] || [ "$1" = "worker" ] || [ "$1" = "scheduler" ] || [ "$1" = "flower" ] ; then
  j=0
  while ! curl -sI -u $RABBITMQ_CREDS http://$RABBITMQ_HOST:$RABBITMQ_MANAGEMENT_PORT/api/whoami |grep '200 OK'; do
    j=`expr $j + 1`
    if [ $j -ge $TRY_LOOP ]; then
      echo "$(date) - $RABBITMQ_HOST still not reachable, giving up"
      exit 1
    fi
    echo "$(date) - waiting for RabbitMQ... $j/$TRY_LOOP"
    sleep 5
  done
fi

# wait for postgres
if [ "$1" = "webserver" ] || [ "$1" = "worker" ] || [ "$1" = "scheduler" ] ; then
  i=0
  while ! nc $POSTGRES_HOST $POSTGRES_PORT >/dev/null 2>&1 < /dev/null; do
    i=`expr $i + 1`
    if [ $i -ge $TRY_LOOP ]; then
      echo "$(date) - ${POSTGRES_HOST}:${POSTGRES_PORT} still not reachable, giving up"
      exit 1
    fi
    echo "$(date) - waiting for ${POSTGRES_HOST}:${POSTGRES_PORT}... $i/$TRY_LOOP"
    sleep 5
  done
  # TODO: move to a Helm hook
  #   https://github.com/kubernetes/helm/blob/master/docs/charts_hooks.md
  if [ "$1" = "webserver" ]; then
    echo "Initialize database..."
    $CMD initdb
  fi
fi

if [ ! -z $GIT_SYNC_REPO ]; then
    mkdir -p $AIRFLOW_HOME/dags
    # remove possible embedded dags to avoid conflicts
    rm -rf $AIRFLOW_HOME/dags/*
    echo "Executing background task git-sync on repo $GIT_SYNC_REPO"
    $AIRFLOW_HOME/git-sync --dest $AIRFLOW_HOME/dags --force &
fi

$CMD "$@"

config/airflow.cfg

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /usr/local/airflow

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# dags_folder = /usr/local/airflow/dags
dags_folder =  /usr/local/python37/lib/python3.7/site-packages/etl/dags


# The folder where airflow should store its log files. This location
base_log_folder = /usr/local/airflow/logs

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_logging = True
remote_base_log_folder = s3://etl/tmp/spark-log/
remote_log_conn_id = 
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# deprecated option for remote log storage, use remote_base_log_folder instead!
# s3_log_folder =

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = postgresql+psycopg2://{{ POSTGRES_CREDS }}@{{ POSTGRES_HOST }}/airflow

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool.
sql_alchemy_pool_size = 5

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite.
sql_alchemy_pool_recycle = 3600

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = False

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

# Whether to load the examples that ship with Airflow. It's good to
# get started, but you probably want to set this to False in a production
# environment
load_examples = false

# Where your Airflow plugins are stored
plugins_folder = /usr/local/airflow/plugins

# Secret key to save connection passwords in the db
fernet_key = {{ FERNET_KEY }}

# Whether to disable pickling dags
donot_pickle = False

# How long before timing out a python file import while filling the DagBag
dagbag_import_timeout = 30

[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is use in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080

# The ip specified when starting the web server
web_server_host = 0.0.0.0

# Root URL to use for the web server
web_server_url_prefix = {{ AIRFLOW_URL_PREFIX }}

# The port on which to run the web server
web_server_port = 8080

# The time the gunicorn webserver waits before timing out on a worker
web_server_worker_timeout = 120

# Secret key used to run your flask app
secret_key = temporary_key

# Number of workers to run the Gunicorn web server
workers = 1

# The worker class gunicorn should use. Choices include
# sync (default), eventlet, gevent
worker_class = sync

# Expose the configuration file in the web server
expose_config = true

# Set to true to turn on authentication : http://pythonhosted.org/airflow/installation.html#web-authentication
authenticate = False

# Filter the list of dags by owner name (requires authentication to be enabled)
filter_by_owner = False

[email]
email_backend = airflow.utils.send_email_smtp

[smtp]
# If you want airflow to send emails on retries, failure, and you want to
# the airflow.utils.send_email function, you have to configure an smtp
# server here
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow
smtp_port = 25
smtp_password = airflow
smtp_mail_from = [email protected]

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
# worker_log_server_port = 8793

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = amqp://{{ RABBITMQ_CREDS }}@{{ RABBITMQ_HOST }}:5672/airflow

# Another key Celery setting
celery_result_backend = amqp://{{ RABBITMQ_CREDS }}@{{ RABBITMQ_HOST }}:5672/airflow

# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it `airflow flower`. This defines the port that Celery Flower runs on
flower_port = 5555

# The root URL for Flower
flower_url_prefix = {{ FLOWER_URL_PREFIX }}

# Default queue that tasks get assigned to and that worker listen on.
default_queue = default

[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5

# Statsd (https://github.com/etsy/statsd) integration settings
# statsd_on =  False
# statsd_host =  localhost
# statsd_port =  8125
# statsd_prefix = airflow

# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run. However airflow will never
# use more threads than the amount of cpu cores available.
max_threads = 2

[mesos]
# Mesos master address which MesosExecutor will connect to.
master = localhost:5050

# The framework name which Airflow scheduler will register itself as on mesos
framework_name = Airflow

# Number of cpu cores required for running one task instance using
# 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
# command on a mesos slave
task_cpu = 1

# Memory in MB required for running one task instance using
# 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
# command on a mesos slave
task_memory = 256

# Enable framework checkpointing for mesos
# See http://mesos.apache.org/documentation/latest/slave-recovery/
checkpoint = False

# Failover timeout in milliseconds.
# When checkpointing is enabled and this option is set, Mesos waits
# until the configured timeout for
# the MesosExecutor framework to re-register after a failover. Mesos
# shuts down running tasks if the
# MesosExecutor framework fails to re-register within this timeframe.
# failover_timeout = 604800

# Enable framework authentication for mesos
# See http://mesos.apache.org/documentation/latest/configuration/
authenticate = False

# Mesos credentials, if authentication is enabled
# default_principal = admin
# default_secret = admin


default_timezone = Asia/Shanghai

yaml

apiVersion: v1
kind: Service
metadata:
  name: airflow-worker
  namespace: airflow
spec:
  clusterIP: None
  selector:
    app: airflow-worker
  ports:
    - name: airflow-worker
      protocol: TCP
      port: 8888
      targetPort: 8888
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-ps
  namespace: airflow
spec:
  type: ClusterIP
  selector:
    app: airflow
    tier: db
  ports:
    - name: airflow-ps
      protocol: TCP
      port: 5432
      targetPort: airflow-ps
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-rabbitmq
  namespace: airflow
spec:
  type: ClusterIP
  selector:
    app: airflow
    tier: airflow-rabbitmq
  ports:
    - name: node
      protocol: TCP
      port: 5672
      targetPort: node
    - name: management
      protocol: TCP
      port: 15672
      targetPort: management
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-web
  namespace: airflow
spec:
  selector:
    app: airflow
    tier: airflow-web
  ports:
    - name: airflow-web
      protocol: TCP
      port: 8080
      targetPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-flower
  namespace: airflow
spec:
  type: NodePort
  selector:
    app: airflow
    tier: airflow-flower
  ports:
    - name: airflow-flower
      protocol: TCP
      port: 5555
      targetPort: airflow-flower
      nodePort: 32081
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: airflow-ps
  namespace: airflow
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: db
    spec:
      containers:
      - name: airflow-ps
        image: postgres
        ports:
        - name: airflow-ps
          containerPort: 5432
        env:
         - name: POSTGRES_USER
           value: "airflow"
         - name: POSTGRES_PASSWORD
           value: "airflow"
         - name: POSTGRES_DB
           value: "airflow"
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: airflow-rabbitmq
  namespace: airflow
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: airflow-rabbitmq
    spec:
      restartPolicy: Always
      containers:
      - name: airflow-rabbitmq
        image: rabbitmq:3-management
        ports:
        - name: management
          containerPort: 15672
        - name: node
          containerPort: 5672
        env:
        - name: RABBITMQ_DEFAULT_USER
          value: airflow
        - name: RABBITMQ_DEFAULT_PASS
          value: airflow
        - name: RABBITMQ_DEFAULT_VHOST
          value: airflow
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: airflow-web
  namespace: airflow
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: airflow-web
    spec:
      restartPolicy: Always
      containers:
      - name: airflow-web
        image: 123.dkr.ecr.cn-northwest-1.amazonaws.com.cn/spark/etl-airflow:1.10.9-prod-_TAG_
        imagePullPolicy: Always
        env:
        - name: AIRFLOW_HOME
          value: "/usr/local/airflow"
        ports:
        - name: airflow-web
          containerPort: 8080
        args: ["webserver"]
        livenessProbe:
          httpGet:
            path: /
            port: 8080
          initialDelaySeconds: 240
          periodSeconds: 60
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: airflow-flower
  namespace: airflow
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: airflow-flower
    spec:
      restartPolicy: Always
      containers:
      - name: airflow-flower
        image: 123.dkr.ecr.cn-northwest-1.amazonaws.com.cn/spark/etl-airflow:1.10.9-prod-_TAG_
        imagePullPolicy: Always
        env:
        - name: AIRFLOW_HOME
          value: "/usr/local/airflow"
        - name: FLOWER_PORT
          value: "5555"
        ports:
        - name: airflow-flower
          containerPort: 5555
        args: ["flower"]
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: airflow-scheduler
  namespace: airflow
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: airflow-scheduler
    spec:
      restartPolicy: Always
      containers:
      - name: airflow-scheduler
        image: 123.dkr.ecr.cn-northwest-1.amazonaws.com.cn/spark/etl-airflow:1.10.9-prod-_TAG_
        imagePullPolicy: Always
        env:
        - name: AIRFLOW_HOME
          value: "/usr/local/airflow"
        args: ["scheduler", "-n", "5"]
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: airflow-worker
  namespace: airflow
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: airflow-worker
    spec:
      hostname: airflow-worker
      restartPolicy: Always
      containers:
      - name: airflow-worker
        image: 123.dkr.ecr.cn-northwest-1.amazonaws.com.cn/spark/etl-airflow:1.10.9-prod-_TAG_
        imagePullPolicy: Always
        lifecycle:
          postStart:
            exec:
              command:
              - sh
              - -c
              - |
                echo "spark.master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT}" >> $SPARK_HOME/conf/spark-defaults.conf ; echo "spark.driver.pod.name ${HOSTNAME}" >> $SPARK_HOME/conf/spark-defaults.conf ; echo "spark.driver.host `hostname -i`" >> $SPARK_HOME/conf/spark-defaults.conf ;
        env:
        - name: AIRFLOW_HOME
          value: "/usr/local/airflow"
        args: ["worker"]

---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: airflow-web
  namespace: airflow
spec:
  rules:
  -
    host: airflow.prod.com
    http:
      paths:
      - path: /
        backend:
          serviceName: airflow-web
          servicePort: 8080

References

kube-airflow github

Airflow on Kubernetes (Part 1): A Different Kind of Operator

Airflow operators on Kubernetes (Chinese translation of the above)

airflow official documentation on Kubernetes

