tensorflow distributed training in tfx pipeline run by kubeflow


1. deploy workers and parameter servers on the kubernetes cluster

1.1 build the container image for workers and parameter servers

$ git clone https://github.com/tensorflow/ecosystem.git

$ cd ecosystem/distribution_strategy

$ sudo nerdctl build --no-cache -t tf_std_server:v1 -f Dockerfile.tf_std_server . --namespace k8s.io

# check the built container image
$ sudo nerdctl image list --namespace k8s.io | grep tf_std_server

# export the built container image
$ sudo nerdctl save -o tf_std_server.tar.gz tf_std_server:v1 --namespace k8s.io

# scp the built container image to the other nodes of the kubernetes cluster, where worker or parameter server pods may be scheduled.
$ scp tf_std_server.tar.gz maye@destination-node-ip:~

# import the built image on the other nodes; in this example the other node
# does not have nerdctl installed, so use ctr, the built-in CLI of containerd.
# The imported image is tagged docker.io/library/tf_std_server:v1 by default.
$ sudo ctr -n k8s.io image import tf_std_server.tar.gz

Attention:
The . in "sudo nerdctl build --no-cache -t tf_std_server:v1 -f Dockerfile.tf_std_server . --namespace k8s.io" specifies the current directory as the build context, i.e. nerdctl build looks for the files it needs in this directory. If no directory is specified, it raises the error: "FATA[0004] context needs to be specified".
If no namespace is specified, the built image is placed in the "default" namespace, while crictl (the container runtime interface CLI used by kubernetes) only sees images in the "k8s.io" namespace.

Note:
This directory contains the following files:
template.yaml.jinja: a jinja template to be rendered into a Kubernetes yaml file
Dockerfile.keras_model_to_estimator: a docker file to build the model image
Dockerfile.tf_std_server: a docker file to build the standard TensorFlow server image
keras_model_to_estimator.py: model code to run multi-worker training
tf_std_server.py: a standard TensorFlow binary
keras_model_to_estimator_client.py: model code to run in standalone client mode [1]

1.2 generate the worker and parameter server service definition yaml file

1.2.1 Modify the header of the jinja template: set image to tf_std_server:v1, script to /tf_std_server.py, and cmdline_args to empty, so that this standard TensorFlow server runs on each Kubernetes pod.

# file template.yaml.jinja
{%- set name = "dist-strat-example" -%}
{%- set image = "tf_std_server:v1" -%}
{%- set worker_replicas = 2 -%}
{%- set ps_replicas = 1 -%}
{%- set num_gpus_per_worker = 2 -%}
{%- set has_eval = False -%}
{%- set has_tensorboard = False -%}
{%- set train_dir = "gs://<your_gcs_bucket>" -%}
{%- set script = "/tf_std_server.py" -%}
{%- set cmdline_args = "" -%}
{%- set credential_secret_json = "key.json" -%}
{%- set credential_secret_key = "credential" -%}
{%- set port = 5000 -%}


{%- set replicas = {"worker": worker_replicas,
                    "ps": ps_replicas,
                    "evaluator": has_eval|int,
                    "tensorboard": has_tensorboard|int} -%}
{% set cmdline_arg_list = cmdline_args.split(" ") %}

{%- macro worker_hosts() -%}
  {%- for i in range(worker_replicas) -%}
    {%- if not loop.first -%},{%- endif -%}
    \"{{ name }}-worker-{{ i }}:{{ port }}\"
  {%- endfor -%}
{%- endmacro -%}

{%- macro ps_hosts() -%}
  {%- for i in range(ps_replicas) -%}
    {%- if not loop.first -%},{%- endif -%}
    \"{{ name }}-ps-{{ i }}:{{ port }}\"
  {%- endfor -%}
{%- endmacro -%}

{%- macro tf_config(task_type, task_id) -%}
{
  \"cluster\": {
    \"worker\": [{{ worker_hosts() }}]
    {%- if ps_replicas > 0 -%}, \"ps\": [{{ ps_hosts() }}]{%- endif -%}
    {%- if has_eval -%},
    \"evaluator\": [\"{{ name }}-evaluator-0:{{ port }}\"]{%- endif -%}
  },
  \"task\": {
    \"type\": \"{{ task_type }}\",
    \"index\": \"{{ task_id }}\"
  }
}
{%- endmacro -%}

{% for job in ["worker", "ps", "evaluator", "tensorboard"] -%}
{%- for i in range(replicas[job]) -%}
kind: Service
apiVersion: v1
metadata:
  name: {{ name }}-{{ job }}-{{ i }}
spec:
  type: LoadBalancer
  selector:
    name: {{ name }}
    job: {{ job }}
    task: "{{ i }}"
  ports:
  - port: {{ port }}
---
kind: ReplicationController
apiVersion: v1
metadata:
  name: {{ name }}-{{ job }}-{{ i }}
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: {{ name }}
        job: {{ job }}
        task: "{{ i }}"
    spec:
      containers:
{% if job == "tensorboard" %}
      - name: tensorflow
        image: tensorflow/tensorflow
{% else %}
      - name: tensorflow
        image: {{ image }}
        resources:
          limits:
            nvidia.com/gpu: {{ num_gpus_per_worker }}
{% endif %}
        env:
{% if job != "tensorboard" %}
        - name: TF_CONFIG
          value: "{{ tf_config(job, i) }}"
{% endif %}
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: "/var/secrets/google/{{ credential_secret_json }}"
        ports:
        - containerPort: {{ port }}
{% if job == "tensorboard" %}
        command:
        - "tensorboard"
        args:
        - "--logdir={{ train_dir }}"
        - "--port={{ port }}"
{% else %}
        command:
        - "/usr/bin/python"
        - "{{ script }}"
        {%- for cmdline_arg in cmdline_arg_list %}
        - "{{ cmdline_arg }}"
        {%- endfor -%}
{% endif %}
        volumeMounts:
        - name: credential
          mountPath: /var/secrets/google
      volumes:
      - name: credential
        secret:
          secretName: {{ credential_secret_key }}
---
{% endfor %}
{%- endfor -%}

1.2.2 modify tf_std_server.py so that it starts a standard tensorflow server (namely a worker or parameter server).

# file tf_std_server.py
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Run a standard tensorflow server."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
#import sys
import os

# This block is for tensorflow v1; this example uses
# tensorflow v2.
#def main(unused_argv):
  # Contrib ops are lazily loaded. So we touch one contrib module to load them
  # immediately.
  #to_import_contrib_ops = tf.contrib.resampler

  # Load you custom ops here before starting the standard TensorFlow server.

  # Start and join the standard TensorFlow server.
  #tf.contrib.distribute.run_standard_tensorflow_server().join()
  
# for tensorflow v2  
def main():
    cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
    
    if cluster_resolver.task_type in ("worker", "ps"):
        # start a tensorflow server and wait.
        os.environ["GRPC_FAIL_FAST"] = "use_caller"
        
        server = tf.distribute.Server(
            cluster_resolver.cluster_spec(),
            job_name=cluster_resolver.task_type,
            task_index=cluster_resolver.task_id,
            protocol="grpc",
            start=True)
        
        server.join()
        
        
    elif cluster_resolver.task_type == "evaluator":
        # run sidecar evaluation
        pass
    else:
        # run the coordinator  
        pass
  


if __name__ == "__main__":
  #tf.app.run()
  #main(sys.argv)
  main()
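
For reference, TFConfigClusterResolver reads the TF_CONFIG environment variable of the pod. The following minimal sketch (illustrative only, using the example service names and port from this post) shows what TF_CONFIG looks like and how the resolver parses it:

# illustrative sketch: how TFConfigClusterResolver reads TF_CONFIG
import json
import os

import tensorflow as tf

# the same cluster layout as the example in this post
tf_config = {
    "cluster": {
        "worker": ["dist-strat-example-worker-0:5000",
                   "dist-strat-example-worker-1:5000"],
        "ps": ["dist-strat-example-ps-0:5000"],
    },
    "task": {"type": "worker", "index": 0},
}
os.environ["TF_CONFIG"] = json.dumps(tf_config)

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
print(resolver.task_type, resolver.task_id)      # worker 0
print(resolver.cluster_spec().as_dict())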

1.2.3 generate the worker and parameter server service definition yaml file

$ ../render_template.py template.yaml.jinja > worker_template.yaml
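
render_template.py comes from the ecosystem repository. If you prefer to render the template yourself, the following is a minimal sketch of the same step (not part of the original repo instructions), assuming the jinja2 Python package is installed:

# render_jinja.py -- minimal stand-in for render_template.py (assumes jinja2 is installed)
import sys

from jinja2 import Template

if __name__ == "__main__":
    with open(sys.argv[1]) as f:
        # render the template with the values set in its own header block
        sys.stdout.write(Template(f.read()).render())

# usage:
# $ python render_jinja.py template.yaml.jinja > worker_template.yaml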

1.2.4 modify worker_template.yaml

# file worker_template.yaml
kind: Service
apiVersion: v1
metadata:
  name: dist-strat-example-worker-0
  
# The worker needs to mount the pipeline_root directory to access the
# artifacts of the tfx pipeline. In this example pipeline_root is backed by a
# persistentVolumeClaim in the namespace kubeflow, and a resource that wants
# to use this persistentVolumeClaim must be in the same namespace.
  namespace: kubeflow
  
spec:

  type: LoadBalancer

  selector:
    app: dist-strat-example-worker-0
       
  ports:
  - port: 5000
---
apiVersion: apps/v1
kind: Deployment
metadata:
# labels need to be at the start of metadata,
# otherwise an error is raised.
  labels:
    app: dist-strat-example-worker-0

  name: dist-strat-example-worker-0
  
  namespace: kubeflow

spec:
  replicas: 1
  
  selector:
    matchLabels:
      app: dist-strat-example-worker-0

      
  template:
    metadata:
      labels:
        app: dist-strat-example-worker-0
    
    spec:
# In this example the CPU of the other node does not support AVX, which
# tensorflow v2 requires, so nodeAffinity is used to specify the node this
# pod will be assigned to.
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: kubernetes.io/hostname
                operator: In
                values:
                - maye-inspiron-5547
      
      containers:

      - name: tensorflow
        image: tf_std_server:v1
        resources:
          limits:
# there is no gpu in this example, so this line is commented out;
# set it according to the actual case.
            #nvidia.com/gpu: 2

        env:
# The environment variable TF_CONFIG contains the information about the
# distributed training cluster. task is the role of this pod; tf_std_server.py
# starts tf.distribute.Server() based on TF_CONFIG.
        - name: TF_CONFIG
          value: "{
  \"cluster\": {
    \"worker\": [\"dist-strat-example-worker-0:5000\",\"dist-strat-example-worker-1:5000\"], 
    \"ps\": [\"dist-strat-example-ps-0:5000\"]},
  \"task\": {
    \"type\": \"worker\",
    \"index\": \"0\"
  }
}"

        #- name: GOOGLE_APPLICATION_CREDENTIALS
        #  value: "/var/secrets/google/key.json"
        ports:
        - containerPort: 5000

        command:
        - "/usr/bin/python"
        - "/tf_std_server.py"
        - ""
        
        
        volumeMounts:
        - mountPath: /tfx/tfx_pv
          name: tfx-pv  
        
        #- name: credential
        #  mountPath: /var/secrets/google
        
        
      volumes:
      - name: tfx-pv
        persistentVolumeClaim:
          claimName: tfx-pv-claim


      #- name: credential
      #  secret:
      #    secretName: credential
---
kind: Service
apiVersion: v1
metadata:
  name: dist-strat-example-worker-1
  
  namespace: kubeflow
  
spec:
  type: LoadBalancer
  
  selector:
    app: dist-strat-example-worker-1
    
  ports:
  - port: 5000
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: dist-strat-example-worker-1

  name: dist-strat-example-worker-1
  
  namespace: kubeflow

spec:

  replicas: 1
  
  selector:
    matchLabels:
      app: dist-strat-example-worker-1  
  
  
  template:
  
    metadata:
      labels:
        app: dist-strat-example-worker-1  
  
    spec:
         
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: kubernetes.io/hostname
                operator: In
                values:
                - maye-inspiron-5547   
                      
      containers:

      - name: tensorflow
        image: tf_std_server:v1
        resources:
          limits:
            #nvidia.com/gpu: 2

        env:

        - name: TF_CONFIG
          value: "{
  \"cluster\": {
    \"worker\": [\"dist-strat-example-worker-0:5000\",\"dist-strat-example-worker-1:5000\"], 
    \"ps\": [\"dist-strat-example-ps-0:5000\"]},
  \"task\": {
    \"type\": \"worker\",
    \"index\": \"1\"
  }
}"

        #- name: GOOGLE_APPLICATION_CREDENTIALS
        #  value: "/var/secrets/google/key.json"
        ports:
        - containerPort: 5000

        command:
        - "/usr/bin/python"
        - "/tf_std_server.py"
        - ""

        volumeMounts:
        - mountPath: /tfx/tfx_pv
          name: tfx-pv          
        
        #volumeMounts:
        #- name: credential
        #  mountPath: /var/secrets/google
 

      volumes:
      - name: tfx-pv
        persistentVolumeClaim:
          claimName: tfx-pv-claim        
        
      #volumes:
      #- name: credential
      #  secret:
      #    secretName: credential
---

kind: Service
apiVersion: v1
metadata:
  name: dist-strat-example-ps-0
  
  namespace: kubeflow
  
spec:
  type: LoadBalancer
  
  selector:
    app: dist-strat-example-ps-0  
  
  ports:
  - port: 5000
---

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: dist-strat-example-ps-0

  name: dist-strat-example-ps-0
  
  namespace: kubeflow

spec:
        
  replicas: 1
  
  selector:
    matchLabels:
      app: dist-strat-example-ps-0  
  
  
  template:
    metadata:
      labels:
        app: dist-strat-example-ps-0 
  
  
    spec:

      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: kubernetes.io/hostname
                operator: In
                values:
                - maye-inspiron-5547   

    
      containers:

      - name: tensorflow
        image: tf_std_server:v1
        resources:
          limits:
            #nvidia.com/gpu: 2

        env:

        - name: TF_CONFIG
          value: "{
  \"cluster\": {
    \"worker\": [\"dist-strat-example-worker-0:5000\",\"dist-strat-example-worker-1:5000\"],
    \"ps\": [\"dist-strat-example-ps-0:5000\"]},
  \"task\": {
    \"type\": \"ps\",
    \"index\": \"0\"
  }
}"

        #- name: GOOGLE_APPLICATION_CREDENTIALS
        #  value: "/var/secrets/google/key.json"
        ports:
        - containerPort: 5000

        command:
        - "/usr/bin/python"
        - "/tf_std_server.py"
        - ""
        
        volumeMounts:
        - mountPath: /tfx/tfx_pv
          name: tfx-pv         
        
        #volumeMounts:
        #- name: credential
        #  mountPath: /var/secrets/google
        
      volumes:
      - name: tfx-pv
        persistentVolumeClaim:
          claimName: tfx-pv-claim        
        
      #volumes:
      #- name: credential
      #  secret:
      #    secretName: credential
---

1.3 deploy the worker and parameter server services

$ kubectl apply -f worker_template.yaml

2. use tf.distribute.ParameterServerStrategy() in run_fn() in the module file of the tfx Trainer component.

def run_fn(fn_args: tfx.components.FnArgs):
    
    cluster_dict = {}
    #cluster_dict["worker"] = ["dist-strat-example-worker-0:5000", "dist-strat-example-worker-1:5000"]
    #cluster_dict["ps"] = ["dist-strat-example-ps-0:5000"]
### Use the ClusterIPs of the worker and ps services here, not the
### service names, otherwise the coordinator cannot connect to them.
    cluster_dict["worker"] = ["10.105.206.29:5000", "10.102.137.138:5000"]
    cluster_dict["ps"] = ["10.105.27.97:5000"]
    
    cluster_spec = tf.train.ClusterSpec(cluster_dict)
    
    cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")

### The distribution strategy needs to be instantiated at the start
### of run_fn(), before any tensorflow operation.
    strategy = tf.distribute.ParameterServerStrategy(cluster_resolver)
        
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
    
    #tf.print(f"fn_args.train_files: {fn_args.train_files}")
    
    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        tf_transform_output,
        batch_size=_TRAIN_BATCH_SIZE,
    )
    
    
    resampled_train_dataset = _resample_train_dataset(train_dataset, 
                                                      batch_size=_TRAIN_BATCH_SIZE)
    
    #tf.print(f"resampled_train_dataset {resampled_train_dataset.cardinality()}")
    
    val_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        tf_transform_output,
        batch_size=_EVAL_BATCH_SIZE,
    )

### val_dataset needs to be infinite, otherwise an OutOfRange
### error is raised.
    val_dataset = val_dataset.repeat()
    
    #tf.print(f"val_dataset cardinality: {val_dataset.cardinality()}")
    
### Instantiate the model, metrics, optimizer, and loss, and compile
### the model, inside the scope of the distribution strategy.
    with strategy.scope():
        model = _build_keras_model()      
        
    #tf.print(f"custom_config: {fn_args.custom_config}")
    
    #log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    #tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir,)

### callbacks.BackupAndRestore() handles the case where the training
### cluster is temporarily unavailable: training continues from the
### interrupted epoch.
    backup_dir = os.path.join("/home/maye/maye_temp", "backup")
    
    callbacks = [
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
    
    
    trainer_train_history = model.fit(
        resampled_train_dataset,
        epochs=fn_args.custom_config['epochs'],
        steps_per_epoch=fn_args.train_steps,
        validation_data=val_dataset,

### since val_dataset is infinite, validation_steps needs
### to be specified.       
        validation_steps=3,
        
        callbacks=callbacks,
    )
    
    #tf.print(f"train_history: \n {train_history.history}")
    
    
    with open('trainer_train_history.json', 'w') as f:
        json.dump(trainer_train_history.history, f)
    
    #signatures = {
    #    'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output),
    #}
    
    #model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Check the ClusterIPs of the services:

(base) maye@maye-Inspiron-5547:~/github_repository/tensorflow_ecosystem/distribution_strategy$ kubectl get service -n kubeflow
NAME                          TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE
dist-strat-example-ps-0       LoadBalancer   10.96.200.160    <pending>     5000:32409/TCP   53m
dist-strat-example-worker-0   LoadBalancer   10.102.74.8      <pending>     5000:30550/TCP   53m
dist-strat-example-worker-1   LoadBalancer   10.100.198.218   <pending>     5000:31080/TCP   53m
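
Instead of hard-coding ClusterIPs in run_fn(), they can optionally be looked up at run time. The following sketch is not part of the original setup; it assumes the kubernetes Python package is installed and that the caller has permission to read Services in the kubeflow namespace:

# optional sketch: look up the ClusterIPs of the worker/ps services at run time
from kubernetes import client, config

config.load_incluster_config()   # use config.load_kube_config() outside a pod
v1 = client.CoreV1Api()

def cluster_address(service_name, namespace="kubeflow", port=5000):
    svc = v1.read_namespaced_service(service_name, namespace)
    return f"{svc.spec.cluster_ip}:{port}"

cluster_dict = {
    "worker": [cluster_address("dist-strat-example-worker-0"),
               cluster_address("dist-strat-example-worker-1")],
    "ps": [cluster_address("dist-strat-example-ps-0")],
}
print(cluster_dict)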

3. run the tfx pipeline using kubeflow pipeline; refer to «Run a tfx pipeline using kubeflow pipeline» https://www.cnblogs.com/zhenxia-jiuyou/p/18003167

Log of the pod of tfx component trainer when training runs successfully:

Epoch 1/50
/usr/local/lib/python3.8/dist-packages/tensorflow/python/data/ops/dataset_ops.py:467: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
... (the "Reduce to /device:CPU:0" line above repeats about 20 times) ...
INFO:tensorflow:Waiting for all global closures to be finished.
INFO:tensorflow:Waiting for all global closures to be finished.
INFO:tensorflow:Waiting for all global closures to be finished.
INFO:tensorflow:Waiting for all global closures to be finished.
21/21 - 31s - loss: 0.7040 - cross entropy: 0.7036 - tp: 882.0000 - fp: 820.0000 - tn: 547.0000 - fn: 439.0000 - precision: 0.5182 - recall: 0.6677 - auc: 0.5415 - prc: 0.5361 - val_loss: 0.6753 - val_cross entropy: 0.6749 - val_tp: 30.0000 - val_fp: 181.0000 - val_tn: 166.0000 - val_fn: 7.0000 - val_precision: 0.1422 - val_recall: 0.8108 - val_auc: 0.7620 - val_prc: 0.3268 - 31s/epoch - 1s/step
Epoch 2/50
INFO:tensorflow:Waiting for all global closures to be finished.
INFO:tensorflow:Waiting for all global closures to be finished.
INFO:tensorflow:Waiting for all global closures to be finished.
INFO:tensorflow:Waiting for all global closures to be finished.
21/21 - 14s - loss: 0.5821 - cross entropy: 0.5817 - tp: 1036.0000 - fp: 439.0000 - tn: 926.0000 - fn: 287.0000 - precision: 0.7024 - recall: 0.7831 - auc: 0.8096 - prc: 0.8139 - val_loss: 0.5677 - val_cross entropy: 0.5673 - val_tp: 25.0000 - val_fp: 82.0000 - val_tn: 271.0000 - val_fn: 6.0000 - val_precision: 0.2336 - val_recall: 0.8065 - val_auc: 0.8646 - val_prc: 0.3799 - 14s/epoch - 657ms/step
Epoch 3/50
INFO:tensorflow:Waiting for all global closures to be finished.
INFO:tensorflow:Waiting for all global closures to be finished.
INFO:tensorflow:Waiting for all global closures to be finished.
INFO:tensorflow:Waiting for all global closures to be finished.

Note:
1.
Keras Model.fit with parameter server training assumes that each worker receives the same dataset, except when it is shuffled differently. Therefore, by calling Dataset.shuffle, you ensure more even iterations over the data.
Because workers do not synchronize, they may finish processing their datasets at different times. Therefore, the easiest way to define epochs with parameter server training is to use Dataset.repeat, which repeats a dataset indefinitely when called without an argument, and to specify the steps_per_epoch argument in the Model.fit call (see the sketch after these notes).
2. Parameter server training is a common data-parallel method to scale up model training on multiple machines. A parameter server training cluster consists of workers and parameter servers.
Variables are created on parameter servers and they are read and updated by workers in each step.
3. the worker and parameter server tasks run tf.distribute.Servers that listen for tasks from the coordinator. The coordinator creates resources, dispatches training tasks, writes checkpoints, and deals with task failures.
4. You will need to configure the 'TF_CONFIG' environment variable if you use TFConfigClusterResolver.
5. Even if you choose the Model.fit training path, you can optionally instantiate a tf.distribute.coordinator.ClusterCoordinator object to schedule other functions you would like to be executed on the workers (see the ClusterCoordinator sketch at the end of this notes section).
6. In TensorFlow 2, parameter server training is powered by the tf.distribute.ParameterServerStrategy class, which distributes the training steps to a cluster that scales up to thousands of workers (accompanied by parameter servers).
7. When using parameter server training, it is recommended to have:
One coordinator job (which has the job name chief)
Multiple worker jobs (job name worker)
Multiple parameter server jobs (job name ps)
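
As mentioned in note 1, the recommended dataset handling is an infinite, shuffled dataset bounded by steps_per_epoch. Below is a minimal, self-contained sketch of that pattern with toy in-memory data and no distribution strategy attached (in the real run_fn() the model is built inside strategy.scope()):

# minimal sketch of note 1: infinite, shuffled dataset + steps_per_epoch
import tensorflow as tf

# toy stand-in model; in run_fn() the real model is built inside strategy.scope()
model = tf.keras.Sequential([
    tf.keras.layers.Dense(16, activation="relu", input_shape=(10,)),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
model.compile(optimizer="adam", loss="binary_crossentropy")

features = tf.random.uniform((1000, 10))
labels = tf.cast(tf.random.uniform((1000, 1)) > 0.5, tf.float32)

dataset = (tf.data.Dataset.from_tensor_slices((features, labels))
           .shuffle(1000)   # each worker iterates the data in a different order
           .repeat()        # no argument: repeat indefinitely
           .batch(32))

# the dataset is infinite, so each epoch must be bounded by steps_per_epoch
model.fit(dataset, epochs=3, steps_per_epoch=30)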

In this example the trainer pod is the coordinator job: the process in which tf.distribute.ParameterServerStrategy() is called acts as the coordinator, so there is no need to list the coordinator separately in cluster_dict.

  1. The coordinator creates resources, dispatches training tasks, writes checkpoints, and deals with task failures. The workers and parameter servers run tf.distribute.Server instances that listen for requests from the coordinator.
  2. a parameter server training cluster requires a coordinator task that runs your training program, one or several workers and parameter server tasks that run TensorFlow servers—tf.distribute.Server.

The requirements to set them up are:
The coordinator task needs to know the addresses and ports of all other TensorFlow servers, except the evaluator.
The workers and parameter servers need to know which port they need to listen to. For the sake of simplicity, you can usually pass in the complete cluster information when creating TensorFlow servers on these tasks.

Workers and parameter servers should have task types as "worker" and "ps", respectively. The coordinator should use "chief" as the task type for legacy reasons.

You will start by creating several TensorFlow servers in advance and you will connect to them later.
[2]
When the ClusterIPs (or hostnames) of the workers and parameter servers are passed correctly to tf.distribute.ParameterServerStrategy(cluster_resolver) on the coordinator, and the workers and parameter servers are ready, the coordinator connects to them and starts distributed training; the training log appears in the coordinator's log.
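
As mentioned in note 5 above, functions can also be scheduled on the workers from the coordinator through tf.distribute.coordinator.ClusterCoordinator. The following is a hedged sketch reusing the example ClusterIPs from this post; it is optional and not part of the Model.fit path used in run_fn():

# optional sketch: schedule a function on the workers via ClusterCoordinator
import tensorflow as tf

cluster_dict = {
    "worker": ["10.105.206.29:5000", "10.102.137.138:5000"],
    "ps": ["10.105.27.97:5000"],
}
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
    tf.train.ClusterSpec(cluster_dict), rpc_layer="grpc")
strategy = tf.distribute.ParameterServerStrategy(cluster_resolver)
coordinator = tf.distribute.coordinator.ClusterCoordinator(strategy)

@tf.function
def worker_fn():
    # any function to be executed remotely on a free worker
    return tf.random.uniform(())

remote_value = coordinator.schedule(worker_fn)  # dispatched to a free worker
coordinator.join()                              # wait for all scheduled work
print(remote_value.fetch())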

4. Error & Solution

[ERROR: AttributeError: module 'tensorflow' has no attribute 'app']

(base) maye@maye-Inspiron-5547:~/github_repository/tensorflow_ecosystem/distribution_strategy$ kubectl logs dist-strat-example-worker-0-w6rsb
2024-02-03 07:38:33.104872: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
Traceback (most recent call last):
File "/tf_std_server.py", line 35, in
tf.app.run()
^^^^^^
AttributeError: module 'tensorflow' has no attribute 'app'

[SOLUTION]
This happens because the tensorflow in use is v2: tf.app.run() belongs to tensorflow v1, and the app module has been removed in tensorflow v2 (tf.app.run() is equivalent to calling main(sys.argv)).
Use tf.distribute.Server() to start a tensorflow server with tensorflow v2, as shown in tf_std_server.py above.

Note:
"exit code: 1" : something wrong in executing code of the process.
"exit code: 137": the process has received SIGNAL KILL, in the case of kubernetes, if kubelet needs to stop a container process, it will call containerd, and containerd will send SIGNAL KILL to the container process. Linux will send SIGNAL KILL to a process if cpu, or memory is not enough.
"exit code: 139': SEGMENT FAULT, the process tries to access memory, or file, or table in a database which is not accessible, such as, memory out of boundary, not existed file or database table.

References:


  1. https://github.com/tensorflow/ecosystem/tree/master/distribution_strategy

  2. https://tensorflow.google.cn/tutorials/distribute/parameter_server_training

