[ERROR: TFX Trainer component stuck at "INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['dist-strat-example-ps-0:5000'], 'worker': ['dist-strat-example-worker-0:5000', 'dist-strat-example-worker-1:5000']})"]
# service dist-strat-example-ps-0 definition yaml file
---
kind: Service
apiVersion: v1
metadata:
  name: dist-strat-example-ps-0
spec:
  type: LoadBalancer
  selector:
    app: dist-strat-example-ps-0
  ports:
  - port: 5000
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: dist-strat-example-ps-0
  name: dist-strat-example-ps-0
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dist-strat-example-ps-0
  template:
    metadata:
      labels:
        app: dist-strat-example-ps-0
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: kubernetes.io/hostname
                operator: In
                values:
                - maye-inspiron-5547
      containers:
      - name: tensorflow
        image: tf_std_server:v1
        resources:
          limits:
            #nvidia.com/gpu: 2
        env:
        - name: TF_CONFIG
          value: "{
            \"cluster\": {
              \"worker\": [\"dist-strat-example-worker-0:5000\",\"dist-strat-example-worker-1:5000\"],
              \"ps\": [\"dist-strat-example-ps-0:5000\"]},
            \"task\": {
              \"type\": \"ps\",
              \"index\": \"0\"
            }
          }"
        #- name: GOOGLE_APPLICATION_CREDENTIALS
        #  value: "/var/secrets/google/key.json"
        ports:
        - containerPort: 5000
        command:
        - "/usr/bin/python"
        - "/tf_std_server.py"
        - ""
        #volumeMounts:
        #- name: credential
        #  mountPath: /var/secrets/google
      #volumes:
      #- name: credential
      #  secret:
      #    secretName: credential
---
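The Deployment above starts /tf_std_server.py on the ps pod (the worker Deployments are analogous). The script itself is not shown in this post; a minimal sketch of such a standard TF gRPC server, assuming it reads the cluster layout from the TF_CONFIG variable set in the manifest (the actual script in the tensorflow_ecosystem repository may differ), looks like this:

# tf_std_server.py -- minimal sketch, assumes TF_CONFIG is set as in the manifest above
import tensorflow as tf

# TFConfigClusterResolver parses TF_CONFIG: the cluster spec plus this pod's
# task type ("ps" or "worker") and task index.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

# Start the gRPC server for this task and block forever.
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol="grpc")
server.join()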
# run_fn in the module file of the tfx Trainer component
# module-level imports assumed by this snippet; helpers such as _input_fn,
# _resample_train_dataset, _build_keras_model and _get_serve_tf_examples_fn
# are defined elsewhere in the same module
import json

import tensorflow as tf
import tensorflow_transform as tft
from tfx import v1 as tfx


def run_fn(fn_args: tfx.components.FnArgs):
    cluster_dict = {}
    ### ClusterIP should be used here, not the service name, or
    ### the error above will be raised.
    cluster_dict["worker"] = ["dist-strat-example-worker-0:5000", "dist-strat-example-worker-1:5000"]
    cluster_dict["ps"] = ["dist-strat-example-ps-0:5000"]
    #cluster_dict["worker"] = ["10.102.74.8:5000", "10.100.198.218:5000"]
    #cluster_dict["ps"] = ["10.96.200.160:5000"]
    cluster_spec = tf.train.ClusterSpec(cluster_dict)
    cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
        cluster_spec, rpc_layer="grpc")
    strategy = tf.distribute.ParameterServerStrategy(cluster_resolver)

    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        tf_transform_output,
        batch_size=_TRAIN_BATCH_SIZE,
    )
    resampled_train_dataset = _resample_train_dataset(train_dataset,
                                                      batch_size=_TRAIN_BATCH_SIZE)
    #tf.print(f"resampled_train_dataset {resampled_train_dataset.cardinality()}")
    val_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        tf_transform_output,
        batch_size=_EVAL_BATCH_SIZE,
    )

    with strategy.scope():
        model = _build_keras_model()

    trainer_train_history = model.fit(
        resampled_train_dataset,
        epochs=fn_args.custom_config['epochs'],
        steps_per_epoch=fn_args.train_steps,
        validation_data=val_dataset,
        #callbacks=[tensorboard_callback],
    )
    with open('trainer_train_history.json', 'w') as f:
        json.dump(trainer_train_history.history, f)

    signatures = {
        'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output),
    }
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
$ kubectl logs pod-tfx-trainer-component -n kubeflow
...
INFO:absl:Successfully installed '/tfx/pipelines/tfx_user_code_Trainer-0.0+a0a99f38e703a50fc266bc1da356164d31c1f23c893900324e04c03582c72555-py3-none-any.whl'.
INFO:absl:Training model.
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['dist-strat-example-ps-0:5000'], 'worker': ['dist-strat-example-worker-0:5000', 'dist-strat-example-worker-1:5000']})
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['dist-strat-example-ps-0:5000'], 'worker': ['dist-strat-example-worker-0:5000', 'dist-strat-example-worker-1:5000']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['dist-strat-example-ps-0:5000'], 'worker': ['dist-strat-example-worker-0:5000', 'dist-strat-example-worker-1:5000']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['dist-strat-example-ps-0:5000'], 'worker': ['dist-strat-example-worker-0:5000', 'dist-strat-example-worker-1:5000']})
(base) maye@maye-Inspiron-5547:~/github_repository/tensorflow_ecosystem/distribution_strategy$ kubectl logs dist-strat-example-ps-0-85fdfdddcb-9x6mt
2024-02-14 05:51:36.101034: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-02-14 05:51:38.566981: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:282] failed call to cuInit: UNKNOWN ERROR (34)
2024-02-14 05:51:38.570913: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:457] Started server with target: grpc://dist-strat-example-ps-0:5000
(base) maye@maye-Inspiron-5547:~/github_repository/tensorflow_ecosystem/distribution_strategy$
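The ps pod log shows its gRPC server did come up at grpc://dist-strat-example-ps-0:5000, so the server side looks healthy. To narrow the problem down, it can help to check from inside the Trainer pod whether the addresses that will go into the ClusterSpec are reachable at all. A small sketch (a hypothetical debugging helper, not part of the original module):

import socket

def _check_reachable(addresses, timeout=3.0):
    # Try a plain TCP connection to every worker/ps endpoint.
    for addr in addresses:
        host, port = addr.split(":")
        try:
            with socket.create_connection((host, int(port)), timeout=timeout):
                print(f"{addr}: reachable")
        except OSError as e:
            print(f"{addr}: NOT reachable ({e})")

_check_reachable(["dist-strat-example-worker-0:5000",
                  "dist-strat-example-worker-1:5000",
                  "dist-strat-example-ps-0:5000"])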
[SOLUTION]
This error occurs because the service name, e.g. "dist-strat-example-worker-0", is what gets passed to tf.distribute.ParameterServerStrategy(), and "dist-strat-example-worker-0" is the Kubernetes service name of worker-0, not its host name, so tf.distribute.ParameterServerStrategy() concludes that the worker-0 it needs is not ready and keeps waiting.
The ClusterIP of the service "dist-strat-example-worker-0" (and of the other worker/ps services) should be used instead, so that tf.distribute.ParameterServerStrategy() can connect to it.
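The ClusterIP of each service can be read with kubectl, for example (namespace flag omitted; add -n <namespace> if needed):

$ kubectl get service dist-strat-example-ps-0 -o jsonpath='{.spec.clusterIP}'
10.96.200.160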
# corrected run_fn: cluster addresses replaced with the services' ClusterIPs
def run_fn(fn_args: tfx.components.FnArgs):
    cluster_dict = {}
    #cluster_dict["worker"] = ["dist-strat-example-worker-0:5000", "dist-strat-example-worker-1:5000"]
    #cluster_dict["ps"] = ["dist-strat-example-ps-0:5000"]
    cluster_dict["worker"] = ["10.102.74.8:5000", "10.100.198.218:5000"]
    cluster_dict["ps"] = ["10.96.200.160:5000"]
    cluster_spec = tf.train.ClusterSpec(cluster_dict)
    cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
        cluster_spec, rpc_layer="grpc")
    strategy = tf.distribute.ParameterServerStrategy(cluster_resolver)
    # the rest of run_fn is unchanged
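Note that a ClusterIP changes whenever the service is recreated, so hard-coding the IPs into the Trainer module is brittle. If cluster DNS can resolve the service names from the Trainer pod, the same addresses can be resolved to ClusterIPs at run time instead; a sketch with a hypothetical helper:

import socket

def _service_to_cluster_ip(addr):
    # Hypothetical helper: resolve a Kubernetes service name to its ClusterIP
    # via cluster DNS, so the IPs need not be hard-coded in run_fn.
    host, port = addr.split(":")
    return f"{socket.gethostbyname(host)}:{port}"

cluster_dict["worker"] = [_service_to_cluster_ip(a) for a in
                          ("dist-strat-example-worker-0:5000",
                           "dist-strat-example-worker-1:5000")]
cluster_dict["ps"] = [_service_to_cluster_ip("dist-strat-example-ps-0:5000")]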
From: https://www.cnblogs.com/zhenxia-jiuyou/p/18015258