1. Save the trained model
# in module file of tfx component trainer
def _apply_preprocessing(raw_features, tft_layer):
    """Transform raw features and separate the label from the result.

    Args:
        raw_features: Container of raw features, keyed by feature name; it
            may or may not contain ``_LABEL_KEY``.
        tft_layer: Callable applied to ``raw_features`` to produce the
            transformed features.

    Returns:
        A ``(transformed_features, transformed_label)`` tuple. The label is
        popped out of the transformed features when ``_LABEL_KEY`` is present
        in ``raw_features``; otherwise the label is ``None``.
    """
    transformed = tft_layer(raw_features)
    # The decision is made on the *raw* input: at serving time the label is
    # not part of the request, so there is nothing to split off.
    label = transformed.pop(_LABEL_KEY) if _LABEL_KEY in raw_features else None
    return transformed, label
### return a function which makes inference on raw features,
### it will be specified as signature of model.save().
def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Build the serving function that predicts from serialized tf.Examples.

    The returned function parses serialized examples, applies the transform
    layer and calls the model. It is meant to be passed as a signature to
    ``model.save()``.

    Args:
        model: The Keras model to call on the transformed features.
        tf_transform_output: Object providing ``transform_features_layer()``
            and ``raw_feature_spec()``.

    Returns:
        A ``tf.function`` taking a 1-D string tensor of serialized examples.
    """
    # Keep the transform layer as an attribute of the model
    # (presumably so it is tracked and exported with it -- TODO confirm).
    model.tft_layer = tf_transform_output.transform_features_layer()

    # The input is named "instances", which is the inputs key of
    # tensorflow/serving/predict.
    serving_input_signature = [
        tf.TensorSpec(shape=[None], dtype=tf.string, name='instances'),
    ]

    @tf.function(input_signature=serving_input_signature)
    def serve_tf_examples_fn(serialized_tf_examples):
        raw_spec = tf_transform_output.raw_feature_spec()
        # Only the model's input features are parsed; anything else in the
        # raw spec (e.g. the label) is not expected in a serving request.
        serving_spec = {
            name: spec
            for name, spec in raw_spec.items()
            if name in _FEATURE_KEYS
        }
        parsed = tf.io.parse_example(serialized_tf_examples, serving_spec)
        model_inputs, _ = _apply_preprocessing(parsed, model.tft_layer)
        return model(model_inputs)

    return serve_tf_examples_fn
def _build_keras_model() -> tf.keras.Model:
    """Create and compile the binary classifier.

    The network takes one scalar input per key in ``_FEATURE_KEYS``,
    concatenates them, passes them through two 16-unit ``tanh`` dense layers
    (L2-regularized) and ends in a single sigmoid unit. It is compiled with
    Adam on an inverse-time-decay learning-rate schedule and a binary
    cross-entropy loss.

    Returns:
        The compiled Keras model; its summary is written via ``logging.info``.
    """
    training_metrics = [
        keras.metrics.BinaryCrossentropy(name='cross entropy'),
        keras.metrics.TruePositives(name='tp'),
        keras.metrics.FalsePositives(name='fp'),
        keras.metrics.TrueNegatives(name='tn'),
        keras.metrics.FalseNegatives(name='fn'),
        keras.metrics.Precision(name='precision'),
        keras.metrics.Recall(name='recall'),
        keras.metrics.AUC(name='auc'),
        keras.metrics.AUC(name='prc', curve='PR'),
    ]

    # One (batch, 1) input per feature, named after the feature key.
    feature_inputs = [
        keras.layers.Input(shape=(1,), name=key) for key in _FEATURE_KEYS
    ]

    hidden = keras.layers.concatenate(feature_inputs)
    for _ in range(2):
        hidden_layer = keras.layers.Dense(
            16,
            activation='tanh',
            kernel_regularizer=keras.regularizers.l2(1e-5),
        )
        hidden = hidden_layer(hidden)
    output_layer = keras.layers.Dense(1, activation='sigmoid')
    probability = output_layer(hidden)

    model = keras.Model(inputs=feature_inputs, outputs=probability)

    # Learning rate starts at 1e-3 and decays smoothly (no staircase) over
    # multiples of 1000 epochs' worth of steps.
    learning_rate = tf.keras.optimizers.schedules.InverseTimeDecay(
        1e-3,
        decay_steps=_STEPS_PER_EPOCH * 1000,
        decay_rate=1,
        staircase=False,
    )
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
    loss = tf.keras.losses.BinaryCrossentropy()
    model.compile(optimizer=optimizer, loss=loss, metrics=training_metrics)

    model.summary(print_fn=logging.info)
    return model
def run_fn(fn_args: tfx.components.FnArgs):
    """Train the Keras model and export it with a serving signature.

    Reads ``transform_output``, ``train_files``, ``eval_files``,
    ``data_accessor``, ``train_steps``, ``custom_config['epochs']`` and
    ``serving_model_dir`` from ``fn_args``. Besides saving the model, the
    training history is written to ``trainer_train_history.json`` in the
    current working directory.

    Args:
        fn_args: Arguments handed to the trainer module by TFX.
    """
    # Disabled parameter-server setup, kept for reference:
    # cluster_dict = {}
    # #cluster_dict["worker"] = ["dist-strat-example-worker-0:5000", "dist-strat-example-worker-1:5000"]
    # #cluster_dict["ps"] = ["dist-strat-example-ps-0:5000"]
    # cluster_dict["worker"] = ["10.105.206.29:5000", "10.102.137.138:5000"]
    # cluster_dict["ps"] = ["10.105.27.97:5000"]
    # cluster_spec = tf.train.ClusterSpec(cluster_dict)
    # cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
    #     cluster_spec, rpc_layer="grpc")
    # strategy = tf.distribute.ParameterServerStrategy(
    #     cluster_resolver,)
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        tf_transform_output,
        batch_size=_TRAIN_BATCH_SIZE,
    )
    resampled_train_dataset = _resample_train_dataset(
        train_dataset, batch_size=_TRAIN_BATCH_SIZE)
    #tf.print(f"resampled_train_dataset {resampled_train_dataset.cardinality()}")

    # The validation set is repeated so that the fixed number of validation
    # steps below can always be drawn from it.
    val_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        tf_transform_output,
        batch_size=_EVAL_BATCH_SIZE,
    ).repeat()
    #tf.print(f"val_dataset cardinality: {val_dataset.cardinality()}")

    # With the distribution strategy enabled this would instead be:
    #   with strategy.scope():
    #       model = _build_keras_model()
    model = _build_keras_model()

    #log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    #tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir,)
    backup_dir = os.path.join("/home/maye/maye_temp", "backup")
    callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir)]

    trainer_train_history = model.fit(
        resampled_train_dataset,
        epochs=fn_args.custom_config['epochs'],
        steps_per_epoch=fn_args.train_steps,
        validation_data=val_dataset,
        validation_steps=3,
        callbacks=callbacks,
    )
    #tf.print(f"train_history: \n {train_history.history}")
    with open('trainer_train_history.json', 'w') as history_file:
        json.dump(trainer_train_history.history, history_file)

    # The `signatures` argument of model.save() specifies the functions
    # that tensorflow/serving will use.
    serving_fn = _get_serve_tf_examples_fn(model, tf_transform_output)
    signatures = {'serving_default': serving_fn}

    # fn_args.serving_model_dir of the TFX Trainer component is
    # os.path.join(<uri-of-artifact-model-of-trainer>, "Format-Serving").
    # It cannot be changed, otherwise other TFX components cannot find the
    # model artifact.
    model.save(
        fn_args.serving_model_dir, save_format='tf', signatures=signatures)
This creates the following SignatureDef in the saved model:
SignatureDef: {
value: {
inputs {
key: "instances"
value {
name: "serving_default_examples:0"
dtype: DT_STRING
tensor_shape {
dim {
size: -1
}
}
}
}
method_name: "tensorflow/serving/predict"
}
}
Note:
- Signature specifies what type of model is being exported, and the input/output tensors to bind to when running inference.
The special signature key serving_default specifies the default serving signature. The default serving signature def key, along with other constants related to signatures, are defined as part of SavedModel signature constants.
[1]
2. The loaded model has a "signatures" attribute, and each signature in it can be called directly:
# Load the exported SavedModel and call its serving signature directly,
# without going through TensorFlow Serving.
wafer_serving_model = tf.keras.models.load_model("/home/maye/maye_temp/wafer/347")
csv_example_train_filepath = 'pipelines/detect_anomolies_on_wafer_tfdv_schema/train_eval_data/train_data'
raw_dataset = tf.data.TFRecordDataset(csv_example_train_filepath)

### Each record is a serialized tf.Example, namely bytes;
### tf.Example is a protobuf message.
for example in raw_dataset.take(1):
    # The keyword must match the signature's input name ("instances").
    # The closing parenthesis of this call was missing, which made the
    # whole snippet a syntax error.
    example_infer = wafer_serving_model.signatures['serving_default'](
        instances=[example.numpy()])

# Decode a few records back into tf.Example protos to inspect their content.
for raw_record in raw_dataset.take(3):
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())
    print(example)
### This is a printed tf.Example protobuf; each "features" message is one example.
features {
feature {
key: "Class"
value {
int64_list {
value: 0
}
}
}
feature {
key: "feature_1"
value {
int64_list {
value: 40
}
}
}
feature {
key: "feature_10"
value {
int64_list {
value: 0
}
}
}
...
}
2.u
Note:
- When the signature function has only one named input, set the value of the "instances" key directly to the list of values for that input (no per-input name is needed):
References:
import base64

# Send one serialized tf.Example to the TensorFlow Serving REST endpoint.
for example in raw_dataset.take(1):
    # The REST API carries binary values as {"b64": "<base64 text>"}.
    # b64encode() returns bytes, which json.dumps() cannot serialize, so the
    # result is decoded to str before it is placed in the payload.
    example_base64 = base64.b64encode(example.numpy()).decode("utf-8")
    pay_load = {"instances": [{"b64": example_base64}]}
    headers = {"Content-Type": "application/json"}
    print(f"type(pay_load): {type(pay_load)}")
    pay_load_jsons = json.dumps(pay_load)
    # A timeout keeps the script from hanging forever if the server is
    # unreachable.
    response = requests.post(
        'http://10.4.0.9:8501/v1/models/wafer:predict',
        headers=headers,
        data=pay_load_jsons,
        timeout=30,
    )
    print(response.json())
    # Equivalent local call, bypassing the server:
    #example_infer = wafer_serving_model.signatures['serving_default'](instances=[example])
    #print(example_infer)

# NOTE(review): if the loop above is meant to read this gzip-compressed
# split, these two lines have to run before it -- confirm the intended order.
train_examples_file_path = os.path.join('pipelines/detect_anomolies_on_wafer_tfdv_schema/CsvExampleGen/examples/19', 'Split-train/data_tfrecord-00000-of-00001.gz')
raw_dataset = tf.data.TFRecordDataset(train_examples_file_path, compression_type='GZIP')
(base) maye@maye-Inspiron-5547:~/github_repository/tensorflow_serving$ sudo nerdctl run -t --rm -p 8500:8500 -p 8501:8501 -v "/home/maye/maye_temp/wafer:/models/wafer" -e MODEL_NAME=wafer tensorflow/serving
[sudo] password for maye:
2024-02-16 07:16:23.531943: I tensorflow_serving/model_servers/server.cc:74] Building single TensorFlow model file config: model_name: half_plus_two model_base_path: /models/half_plus_two
2024-02-16 07:16:23.617629: I tensorflow_serving/model_servers/server_core.cc:467] Adding/updating models.
2024-02-16 07:16:23.617750: I tensorflow_serving/model_servers/server_core.cc:596] (Re-)adding model: half_plus_two
2024-02-16 07:16:23.824056: I tensorflow_serving/core/basic_manager.cc:739] Successfully reserved resources to load servable {name: half_plus_two version: 123}
2024-02-16 07:16:23.824111: I tensorflow_serving/core/loader_harness.cc:66] Approving load for servable version {name: half_plus_two version: 123}
2024-02-16 07:16:23.824140: I tensorflow_serving/core/loader_harness.cc:74] Loading servable version {name: half_plus_two version: 123}
2024-02-16 07:16:23.841445: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:83] Reading SavedModel from: /models/half_plus_two/00000123
2024-02-16 07:16:23.848473: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:51] Reading meta graph with tags { serve }
2024-02-16 07:16:23.848516: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:146] Reading SavedModel debug info (if present) from: /models/half_plus_two/00000123
2024-02-16 07:16:23.883025: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-02-16 07:16:24.032082: I external/org_tensorflow/tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:382] MLIR V1 optimization pass is not enabled
2024-02-16 07:16:24.090038: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:233] Restoring SavedModel bundle.
2024-02-16 07:16:25.306378: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:217] Running initialization op on SavedModel bundle at path: /models/half_plus_two/00000123
2024-02-16 07:16:25.328697: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:316] SavedModel load for tags { serve }; Status: success: OK. Took 1487618 microseconds.
2024-02-16 07:16:25.329046: I tensorflow_serving/servables/tensorflow/saved_model_warmup_util.cc:80] No warmup data file found at /models/half_plus_two/00000123/assets.extra/tf_serving_warmup_requests
2024-02-16 07:16:25.429497: I tensorflow_serving/core/loader_harness.cc:95] Successfully loaded servable version {name: half_plus_two version: 123}
2024-02-16 07:16:25.430402: I tensorflow_serving/model_servers/server_core.cc:488] Finished adding/updating models
2024-02-16 07:16:25.430447: I tensorflow_serving/model_servers/server.cc:118] Using InsecureServerCredentials
2024-02-16 07:16:25.430496: I tensorflow_serving/model_servers/server.cc:383] Profiler service is enabled
2024-02-16 07:16:25.491575: I tensorflow_serving/model_servers/server.cc:409] Running gRPC ModelServer at 0.0.0.0:8500 ...
2024-02-16 07:16:25.499873: I tensorflow_serving/model_servers/server.cc:430] Exporting HTTP/REST API at:localhost:8501 ...
[evhttp_server.cc : 245] NET_LOG: Entering the event loop ...