Keras 快速入门
构建好模型后,通过调用 compile 方法配置该模型的学习流程
1
2
3
4
5
6
7model = tf.keras.Sequential()
model.add(layers.Dense(32, activation='relu'))
model.add(layers.Dense(32, activation='relu'))
model.add(layers.Dense(10, activation='softmax'))
model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
loss=tf.keras.losses.categorical_crossentropy,
metrics=[tf.keras.metrics.categorical_accuracy])函数式api
tf.keras.Sequential 模型是层的简单堆叠,无法表示任意模型。使用 Keras 函数式 API 可以构建复杂的模型拓扑,例如:
多输入模型,
多输出模型,
具有共享层的模型(同一层被调用多次),
具有非序列数据流的模型(例如,残差连接)。
使用函数式 API 构建的模型具有以下特征:
层实例可调用并返回张量。 输入张量和输出张量用于定义 tf.keras.Model 实例。 此模型的训练方式和 Sequential 模型一样。
1
2
3
4
5
6
7
8
9
10
11
12
13input_x = tf.keras.Input(shape=(72,))
# 层实例可调用并返回张量
hidden1 = layers.Dense(32, activation='relu')(input_x)
hidden2 = layers.Dense(16, activation='relu')(hidden1)
pred = layers.Dense(10, activation='softmax')(hidden2)
# 输入张量和输出张量用于定义 tf.keras.Model 实例
model = tf.keras.Model(inputs=input_x, outputs=pred)
model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
loss=tf.keras.losses.categorical_crossentropy,
metrics=['accuracy'])
model.fit(train_x, train_y, batch_size=32, epochs=5)模型子类化(类似Pytorch)
- Init 创建层并将它们设置为类实例的属性
- call 中定义前向传播
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22class MyModel(tf.keras.Model):
def __init__(self, num_classes=10):
super(MyModel, self).__init__(name='my_model')
self.num_classes = num_classes
self.layer1 = layers.Dense(32, activation='relu')
self.layer2 = layers.Dense(num_classes, activation='softmax')
def call(self, inputs):
h1 = self.layer1(inputs)
out = self.layer2(h1)
return out
def compute_output_shape(self, input_shape):
shape = tf.TensorShape(input_shape).as_list()
shape[-1] = self.num_classes
return tf.TensorShape(shape)
model = MyModel(num_classes=10)
model.compile(optimizer=tf.keras.optimizers.RMSprop(0.001),
loss=tf.keras.losses.categorical_crossentropy,
metrics=['accuracy'])
model.fit(train_x, train_y, batch_size=16, epochs=5)自定义层
通过对 tf.keras.layers.Layer 进行子类化并实现以下方法来创建自定义层:
- build:创建层的权重。使用 add_weight 方法添加权重。
- call:定义前向传播。
- compute_output_shape:指定在给定输入形状的情况下如何计算层的输出形状。 或者,可以通过实现 get_config 方法和 from_config 类方法序列化层。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40class MyLayer(layers.Layer):
def __init__(self, output_dim, **kwargs):
self.output_dim = output_dim
super(MyLayer, self).__init__(**kwargs)
def build(self, input_shape):
shape = tf.TensorShape((input_shape[1], self.output_dim))
self.kernel = self.add_weight(name='kernel1', shape=shape,
initializer='uniform', trainable=True)
super(MyLayer, self).build(input_shape)
def call(self, inputs):
return tf.matmul(inputs, self.kernel)
def compute_output_shape(self, input_shape):
shape = tf.TensorShape(input_shape).as_list()
shape[-1] = self.output_dim
return tf.TensorShape(shape)
def get_config(self):
base_config = super(MyLayer, self).get_config()
base_config['output_dim'] = self.output_dim
return base_config
def from_config(cls, config):
return cls(**config)
model = tf.keras.Sequential(
[
MyLayer(10),
layers.Activation('softmax')
])
model.compile(optimizer=tf.keras.optimizers.RMSprop(0.001),
loss=tf.keras.losses.categorical_crossentropy,
metrics=['accuracy'])
model.fit(train_x, train_y, batch_size=16, epochs=5)Estimator
Estimator API 用于针对分布式环境训练模型。它适用于一些行业使用场景,例如用大型数据集进行分布式训练并导出模型以用于生产
1
2
3
4
5
6
7
8model = tf.keras.Sequential([layers.Dense(10,activation='softmax'),
layers.Dense(10,activation='softmax')])
model.compile(optimizer=tf.keras.optimizers.RMSprop(0.001),
loss='categorical_crossentropy',
metrics=['accuracy'])
estimator = tf.keras.estimator.model_to_estimator(model)
keras 函数api
小型残差网络
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35inputs = keras.Input(shape=(32,32,3), name='img')
h1 = layers.Conv2D(32, 3, activation='relu')(inputs)
h1 = layers.Conv2D(64, 3, activation='relu')(h1)
block1_out = layers.MaxPooling2D(3)(h1)
h2 = layers.Conv2D(64, 3, activation='relu', padding='same')(block1_out)
h2 = layers.Conv2D(64, 3, activation='relu', padding='same')(h2)
block2_out = layers.add([h2, block1_out])
h3 = layers.Conv2D(64, 3, activation='relu', padding='same')(block2_out)
h3 = layers.Conv2D(64, 3, activation='relu', padding='same')(h3)
block3_out = layers.add([h3, block2_out])
h4 = layers.Conv2D(64, 3, activation='relu')(block3_out)
h4 = layers.GlobalMaxPool2D()(h4)
h4 = layers.Dense(256, activation='relu')(h4)
h4 = layers.Dropout(0.5)(h4)
outputs = layers.Dense(10, activation='softmax')(h4)
model = keras.Model(inputs, outputs, name='small resnet')
model.summary()
keras.utils.plot_model(model, 'small_resnet_model.png', show_shapes=True)
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
x_train = x_train.astype('float32') / 255
x_test = y_train.astype('float32') / 255
y_train = keras.utils.to_categorical(y_train, 10)
y_test = keras.utils.to_categorical(y_test, 10)
model.compile(optimizer=keras.optimizers.RMSprop(1e-3),
loss='categorical_crossentropy',
metrics=['acc'])
model.fit(x_train, y_train,
batch_size=64,
epochs=1,
validation_split=0.2)自定义网络层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66# import tensorflow as tf
# import tensorflow.keras as keras
class MyDense(layers.Layer):
def __init__(self, units=32):
super(MyDense, self).__init__()
self.units = units
def build(self, input_shape):
self.w = self.add_weight(shape=(input_shape[-1], self.units),
initializer='random_normal',
trainable=True)
self.b = self.add_weight(shape=(self.units,),
initializer='random_normal',
trainable=True)
def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
def get_config(self):
return {'units': self.units}
inputs = keras.Input((4,))
outputs = MyDense(10)(inputs)
model = keras.Model(inputs, outputs)
config = model.get_config()
new_model = keras.Model.from_config(
config, custom_objects={'MyDense':MyDense}
)
# 在自定义网络层调用其他网络层
# 超参
time_step = 10
batch_size = 32
hidden_dim = 32
inputs_dim = 5
# 网络
class MyRnn(layers.Layer):
def __init__(self):
super(MyRnn, self).__init__()
self.hidden_dim = hidden_dim
self.projection1 = layers.Dense(units=hidden_dim, activation='relu')
self.projection2 = layers.Dense(units=hidden_dim, activation='relu')
self.classifier = layers.Dense(1, activation='sigmoid')
def call(self, inputs):
outs = []
states = tf.zeros(shape=[inputs.shape[0], self.hidden_dim])
for t in range(inputs.shape[1]):
x = inputs[:,t,:]
h = self.projection1(x)
y = h + self.projection2(states)
states = y
outs.append(y)
# print(outs)
features = tf.stack(outs, axis=1)
print(features.shape)
return self.classifier(features)
# 构建网络
inputs = keras.Input(batch_shape=(batch_size, time_step, inputs_dim))
x = layers.Conv1D(32, 3)(inputs)
print(x.shape)
outputs = MyRnn()(x)
model = keras.Model(inputs, outputs)
rnn_model = MyRnn()
_ = rnn_model(tf.zeros((1, 10, 5)))
使用keras训练模型
模型构造、训练、测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35# 模型构造
inputs = keras.Input(shape=(784,), name='mnist_input')
h1 = layers.Dense(64, activation='relu')(inputs)
h1 = layers.Dense(64, activation='relu')(h1)
outputs = layers.Dense(10, activation='softmax')(h1)
model = keras.Model(inputs, outputs)
# keras.utils.plot_model(model, 'net001.png', show_shapes=True)
model.compile(optimizer=keras.optimizers.RMSprop(),
loss=keras.losses.SparseCategoricalCrossentropy(),
metrics=[keras.metrics.SparseCategoricalAccuracy()])
# 载入数据
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') /255
x_test = x_test.reshape(10000, 784).astype('float32') /255
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]
# 训练模型
history = model.fit(x_train, y_train, batch_size=64, epochs=3,
validation_data=(x_val, y_val))
print('history:')
print(history.history)
result = model.evaluate(x_test, y_test, batch_size=128)
print('evaluate:')
print(result)
pred = model.predict(x_test[:2])
print('predict:')
print(pred)多输入多输出模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43image_input = keras.Input(shape=(32, 32, 3), name='img_input')
timeseries_input = keras.Input(shape=(None, 10), name='ts_input')
x1 = layers.Conv2D(3, 3)(image_input)
x1 = layers.GlobalMaxPooling2D()(x1)
x2 = layers.Conv1D(3, 3)(timeseries_input)
x2 = layers.GlobalMaxPooling1D()(x2)
x = layers.concatenate([x1, x2])
score_output = layers.Dense(1, name='score_output')(x)
class_output = layers.Dense(5, activation='softmax', name='class_output')(x)
model = keras.Model(inputs=[image_input, timeseries_input],
outputs=[score_output, class_output])
keras.utils.plot_model(model, 'multi_input_output_model.png'
, show_shapes=True)
# 可以为模型指定不同的loss和metrics
model.compile(
optimizer=keras.optimizers.RMSprop(1e-3),
loss=[keras.losses.MeanSquaredError(),
keras.losses.CategoricalCrossentropy()])
# 还可以指定loss的权重
model.compile(
optimizer=keras.optimizers.RMSprop(1e-3),
loss={'score_output': keras.losses.MeanSquaredError(),
'class_output': keras.losses.CategoricalCrossentropy()},
metrics={'score_output': [keras.metrics.MeanAbsolutePercentageError(),
keras.metrics.MeanAbsoluteError()],
'class_output': [keras.metrics.CategoricalAccuracy()]},
loss_weight={'score_output': 2., 'class_output': 1.})
# 可以把不需要传播的loss置0
model.compile(
optimizer=keras.optimizers.RMSprop(1e-3),
loss=[None, keras.losses.CategoricalCrossentropy()])
# Or dict loss version
model.compile(
optimizer=keras.optimizers.RMSprop(1e-3),
loss={'class_output': keras.losses.CategoricalCrossentropy()})
动态调整学习率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16# 动态调整学习率
initial_learning_rate = 0.1
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
initial_learning_rate,
decay_steps=10000,
decay_rate=0.96,
staircase=True
)
optimizer = keras.optimizers.RMSprop(learning_rate=lr_schedule)
# 使用tensorboard
tensorboard_cbk = keras.callbacks.TensorBoard(log_dir='./full_path_to_your_logs')
model.fit(x_train, y_train,
epochs=5,
batch_size=64,
callbacks=[tensorboard_cbk],
validation_split=0.2)
自己构造训练和验证循环 (类Pytorch)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32# 构建一个全连接网络.
inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
# 优化器.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# 损失函数.
loss_fn = keras.losses.SparseCategoricalCrossentropy()
# 准备数据.
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)
# 自己构造循环
for epoch in range(3):
print('epoch: ', epoch)
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
# 开一个gradient tape, 计算梯度
with tf.GradientTape() as tape:
logits = model(x_batch_train)
loss_value = loss_fn(y_batch_train, logits)
grads = tape.gradient(loss_value, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
if step % 200 == 0:
print('Training loss (for one batch) at step %s: %s' % (step, float(loss_value)))
print('Seen so far: %s samples' % ((step + 1) * 64))训练并验证
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61# 训练并验证
# 获取模型
inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
# sgd优化器
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# 分类损失函数
loss_fn = keras.losses.SparseCategoricalCrossentropy()
# 设定统计参数
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()
# 准备训练数据
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)
# 准备验证数据
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(64)
# 迭代训练
for epoch in range(3):
print('Start of epoch %d' % (epoch,))
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
with tf.GradientTape() as tape:
logits = model(x_batch_train)
loss_value = loss_fn(y_batch_train, logits)
grads = tape.gradient(loss_value, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
# 更新统计传输
train_acc_metric(y_batch_train, logits)
# 输出
if step % 200 == 0:
print('Training loss (for one batch) at step %s: %s' % (step, float(loss_value)))
print('Seen so far: %s samples' % ((step + 1) * 64))
# 输出统计参数的值
train_acc = train_acc_metric.result()
print('Training acc over epoch: %s' % (float(train_acc),))
# 重置统计参数
train_acc_metric.reset_states()
# 用模型进行验证
for x_batch_val, y_batch_val in val_dataset:
val_logits = model(x_batch_val)
# 根据验证的统计参数
val_acc_metric(y_batch_val, val_logits)
val_acc = val_acc_metric.result()
val_acc_metric.reset_states()
print('Validation acc: %s' % (float(val_acc),))添加自己构造的loss
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42## 添加自己构造的loss, 每次只能看到最新一次训练增加的loss
class ActivityRegularizationLayer(layers.Layer):
def call(self, inputs):
self.add_loss(1e-2 * tf.reduce_sum(inputs))
return inputs
inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
logits = model(x_train[:64])
print(model.losses)
logits = model(x_train[:64])
logits = model(x_train[64: 128])
logits = model(x_train[128: 192])
print(model.losses)
# 将loss添加进求导中
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
for epoch in range(3):
print('Start of epoch %d' % (epoch,))
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
with tf.GradientTape() as tape:
logits = model(x_batch_train)
loss_value = loss_fn(y_batch_train, logits)
# 添加额外的loss
loss_value += sum(model.losses)
grads = tape.gradient(loss_value, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
# 每200个batch输出一次学习.
if step % 200 == 0:
print('Training loss (for one batch) at step %s: %s' % (step, float(loss_value)))
print('Seen so far: %s samples' % ((step + 1) * 64))
用keras构建自己的网络层
构建一个简单的网络层: 设置网络权重和输出到输入的计算过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31from __future__ import absolute_import, division, print_function
import tensorflow as tf
tf.keras.backend.clear_session()
import tensorflow.keras as keras
import tensorflow.keras.layers as layers
# 定义网络层就是:设置网络权重和输出到输入的计算过程
class MyLayer(layers.Layer):
def __init__(self, input_dim=32, unit=32):
super(MyLayer, self).__init__()
w_init = tf.random_normal_initializer()
self.weight = tf.Variable(initial_value=w_init(
shape=(input_dim, unit), dtype=tf.float32), trainable=True)
b_init = tf.zeros_initializer()
self.bias = tf.Variable(initial_value=b_init(
shape=(unit,), dtype=tf.float32), trainable=True)
def call(self, inputs):
return tf.matmul(inputs, self.weight) + self.bias
x = tf.ones((3,5))
my_layer = MyLayer(5, 4)
out = my_layer(x)
print(out)
tf.Tensor(
[[0.06709253 0.06818779 0.09926171 0.0179923 ]
[0.06709253 0.06818779 0.09926171 0.0179923 ]
[0.06709253 0.06818779 0.09926171 0.0179923 ]], shape=(3, 4), dtype=float32)按上面构建网络层,图层会自动跟踪权重w和b,当然我们也可以直接用add_weight的方法构建权重
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22class MyLayer(layers.Layer):
def __init__(self, input_dim=32, unit=32):
super(MyLayer, self).__init__()
self.weight = self.add_weight(shape=(input_dim, unit),
initializer=keras.initializers.RandomNormal(),
trainable=True)
self.bias = self.add_weight(shape=(unit,),
initializer=keras.initializers.Zeros(),
trainable=True)
def call(self, inputs):
return tf.matmul(inputs, self.weight) + self.bias
x = tf.ones((3,5))
my_layer = MyLayer(5, 4)
out = my_layer(x)
print(out)
tf.Tensor(
[[-0.10401802 -0.05459599 -0.08195674 0.13151655]
[-0.10401802 -0.05459599 -0.08195674 0.13151655]
[-0.10401802 -0.05459599 -0.08195674 0.13151655]], shape=(3, 4), dtype=float32)
keras模型保存和序列化
-