Preface: forget TensorFlow 1; it is in the past
pip install tensorflow==2.0.0-alpha
Ecosystem
TensorFlow 2.0
@tf.function: converts a Python function into a computation graph
TensorFlow Lite
TensorFlow.JS
TensorFlow Extended
TensorFlow Probability
TPU Cloud
1. Data types
Data containers
list: supports mixed element types, but is slow
np.array: holds elements of a single type and is fast, but supports neither GPUs nor automatic differentiation
tf.Tensor: TensorFlow's container for large blocks of contiguous data
Basic data types
tf.int32: tf.constant(1)
tf.float32: tf.constant(1.)
tf.float64: tf.constant(1., dtype=tf.double)
tf.bool: tf.constant([True, False])
tf.string: tf.constant('hello')
Basic tensor attributes
with tf.device("cpu"):
    a=tf.range(4)
a.device # '/job:localhost/replica:0/task:0/device:CPU:0'
aa=a.gpu()
a.numpy() # array([0, 1, 2, 3], dtype=int32)
a.ndim # 1 (0 would mean a scalar)
a.shape # TensorShape([4])
a.name # AttributeError: Tensor.name is meaningless when eager execution is enabled.
tf.rank(tf.ones([3,4,2])) # <tf.Tensor: id=466672, shape=(), dtype=int32, numpy=3>
tf.is_tensor(a) # True
a.dtype # tf.int32
- rank and ndim differ only in their return type: tf.rank returns a tensor, ndim returns a Python int
- The name attribute is meaningless in TensorFlow 2 (eager mode), because the Python variable name already serves as the name
Type conversion
a=np.arange(5)
a.dtype # dtype('int64')
aa=tf.convert_to_tensor(a) # <tf.Tensor: id=466678, shape=(5,), dtype=int64, numpy=array([0, 1, 2, 3, 4])>
aa=tf.convert_to_tensor(a, dtype=tf.int32) # <tf.Tensor: id=466683, shape=(5,), dtype=int32, numpy=array([0, 1, 2, 3, 4], dtype=int32)>
tf.cast(aa, tf.float32)
b=tf.constant([0,1])
tf.cast(b, tf.bool) # <tf.Tensor: id=466697, shape=(2,), dtype=bool, numpy=array([False, True])>
a=tf.ones([])
a.numpy()
int(a) # a scalar tensor can be converted directly like this
float(a)
Trainable data type
a=tf.range(5)
b=tf.Variable(a)
b.dtype # tf.int32
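b.name # 'Variable:0', not really useful
b.trainable # True
2. Creating tensors
tf.convert_to_tensor(data)
tf.zeros(shape)
tf.ones(1): creates a 1-D tensor containing a single 1
tf.ones([]): creates the scalar 1
tf.ones([2]): creates a 1-D tensor containing two 1s
tf.ones_like(a): equivalent to tf.ones(a.shape)
tf.fill([3,4], 9): fills every element with 9
tf.random.normal([3,4], mean=1, stddev=1)
tf.random.truncated_normal([3,4], mean=0, stddev=1): truncated normal distribution; values more than two standard deviations from the mean are discarded and re-drawn. For example, with sigmoid activations, weights drawn from an untruncated normal can be large enough to saturate the sigmoid, which easily leads to vanishing gradients.
tf.random.uniform([3,4], minval=0, maxval=100, dtype=tf.int32): uniform distribution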
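The re-sampling behind a truncated normal can be sketched in plain Python. This is only an illustration of the idea, not TensorFlow's implementation; the function name truncated_normal and its seed argument are made up for this sketch.

```python
import random

def truncated_normal(n, mean=0.0, stddev=1.0, seed=None):
    """Draw n samples, re-drawing any value farther than 2*stddev from mean."""
    rng = random.Random(seed)
    samples = []
    while len(samples) < n:
        value = rng.gauss(mean, stddev)
        if abs(value - mean) <= 2.0 * stddev:  # keep only the central region
            samples.append(value)
    return samples

samples = truncated_normal(1000, mean=0.0, stddev=1.0, seed=0)
print(min(samples), max(samples))  # both stay inside [-2, 2]
```

Because the tails are rejected, no initial weight starts far out in the flat region of a sigmoid.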
idx=tf.range(5)
idx=tf.random.shuffle(idx)
a=tf.random.normal([10,784])
b=tf.random.uniform([10])
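a=tf.gather(a, idx) # rows 0-4 of a, in shuffled order
b=tf.gather(b, idx) # elements 0-4 of b, in the same shuffled order
Typical tensor ranks
- 3-D tensor example: natural language processing; b sentences, each with 5 words, each word represented by a 5-dimensional vector
- 4-D tensor: images, [b, h, w, c]
- 5-D tensor: meta-learning, [task_b, b, h, w, c] (multi-task)
Free practice starts here
out=tf.random.uniform([4,10]) # simulate the outputs for 4 images, 10 class scores each
y=tf.range(4)
y=tf.one_hot(y, depth=10) # simulate the true classes of the 4 images
loss=tf.keras.losses.mse(y, out)
loss=tf.reduce_mean(loss) # compute the loss
A simple x@w+b
from tensorflow.keras import layers
net=layers.Dense(10)
net.build((4,8)) # 4 is the batch size; the previous layer has 8 units
net.kernel # w, shape=(8, 10)
net.bias # b, shape=(10,)
Remember: W has shape [input_dim, output_dim] and b has shape [output_dim]
End of free practice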
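3. Tensor operations
3.1 Indexing
Basic: a[idx][idx][idx]
NumPy style: a[idx, idx, idx], which is more readable
3.2 Slicing
Largely the same as NumPy
a[start:end:positive_step]
a[end:start:negative_step]
a[0, 1, ..., 0]: the ... stands for any number of ':' and is valid as long as that number can be inferred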
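Selective indexing
tf.gather
Scenario: sample rows [3, 27, 9, 13] of a [4, 28, 28, 3] tensor (along the first 28 axis), in that order
Usage: tf.gather(a, axis=1, indices=[3,27,9,13])
tf.gather_nd
Scenario: sample a [4, 28, 28, 3] tensor at [3, 27] on the second dimension and [20, 8] on the third
Usage (pseudocode, not valid Python): tf.gather_nd(a, indices=[[:,3,20,:],[:,3,8,:],[:,27,20,:],[:,27,8,:]])
tf.gather_nd indexes from the leading dimensions and does not accept ':' inside indices, so in practice the sampled dimensions are moved to the front first, e.g. tf.gather_nd(tf.transpose(a, [1,2,0,3]), [[3,20],[3,8],[27,20],[27,8]])
tf.boolean_mask
tf.boolean_mask(a, mask=[True, True, False], axis=3): keeps only the R and G channels; a has shape [4, 28, 28, 3]. The mask can be a list, and the effect is somewhat like tf.gather_nd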
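How tf.gather_nd reads its indices is easier to see on nested lists. The following is a plain-Python sketch of that indexing rule, under the assumption that each index row addresses the leading dimensions; the data is made up and the helper is not TensorFlow code.

```python
def gather_nd(params, indices):
    """Each row of `indices` walks down the leading dimensions of `params`."""
    results = []
    for index in indices:
        item = params
        for i in index:
            item = item[i]  # descend one dimension per index component
        results.append(item)
    return results

# a has shape [2, 3, 2]: 2 classes, 3 students, 2 scores each (made-up data).
a = [[[1, 2], [3, 4], [5, 6]],
     [[7, 8], [9, 10], [11, 12]]]

rows = gather_nd(a, [[0, 1], [1, 2]])  # two index components: whole score rows
single = gather_nd(a, [[1, 0, 1]])     # three index components: one element
print(rows)    # [[3, 4], [11, 12]]
print(single)  # [8]
```

An index row shorter than the rank returns a slice, while a full-length row returns a single element.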
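3.3 Reshaping and dimension changes
a.shape, a.ndim
tf.transpose: for example, swapping the rows and columns of an image (this mirrors the image across its diagonal; it is not a pure 90° rotation)
a=tf.random.normal([4, 3, 2, 1])
tf.transpose(a, perm=[0, 1, 3, 2]): swaps the last two dimensions
tf.reshape
a=tf.random.normal([4, 28, 28, 3])
tf.reshape(a, [4, 784, 3])
tf.reshape(a, [4, -1, 3]) # same result as the line above
tf.reshape(a, [4, -1])
tf.expand_dims: adds a dimension (dim and axis mean roughly the same thing)
a=tf.random.normal([4, 35, 8])
tf.expand_dims(a, axis=3) # the new dimension is the 4th one (index 3); the shape becomes [4, 35, 8, 1]
tf.squeeze: removes dimensions; by default it drops every dimension of length 1, and axis can select one specific dimension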
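The -1 in tf.reshape is filled in by simple arithmetic on the element count. A small sketch of that inference, with a hypothetical helper name infer_shape:

```python
from math import prod

def infer_shape(old_shape, new_shape):
    """Resolve a single -1 in new_shape so the element count is preserved."""
    if list(new_shape).count(-1) > 1:
        raise ValueError("at most one dimension may be -1")
    total = prod(old_shape)
    known = prod(d for d in new_shape if d != -1)
    if -1 in new_shape:
        if total % known != 0:
            raise ValueError("element count is not divisible by the known dims")
        return [total // known if d == -1 else d for d in new_shape]
    if known != total:
        raise ValueError("element count changed")
    return list(new_shape)

print(infer_shape([4, 28, 28, 3], [4, -1, 3]))  # [4, 784, 3]
print(infer_shape([4, 28, 28, 3], [4, -1]))     # [4, 2352]
```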
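3.4 Broadcasting
- When tensors are combined, their shapes are first right-aligned, missing dimensions are inserted, and dimensions of length 1 are expanded to the matching length
- Scenario: leading dimensions usually represent higher-level concepts, e.g. [class, student, score]; broadcasting extends a tensor with fewer dimensions across the larger one
- Benefits: concise code and lower memory use
tf.broadcast_to(a, [2,3,4])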
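The right-align, insert and expand rule can be written out as a short shape calculation. This is a sketch of the rule only, with a hypothetical function name; it works on shapes and moves no data.

```python
def broadcast_shape(shape_a, shape_b):
    """Return the broadcast result shape, or raise if the shapes are incompatible."""
    a, b = list(shape_a), list(shape_b)
    rank = max(len(a), len(b))
    a = [1] * (rank - len(a)) + a  # right-align by padding with leading 1s
    b = [1] * (rank - len(b)) + b
    result = []
    for dim_a, dim_b in zip(a, b):
        if dim_a == dim_b or dim_b == 1:
            result.append(dim_a)
        elif dim_a == 1:
            result.append(dim_b)   # a length-1 dimension is expanded
        else:
            raise ValueError(f"cannot broadcast {shape_a} with {shape_b}")
    return result

print(broadcast_shape([4, 32, 32, 3], [3]))  # [4, 32, 32, 3]
print(broadcast_shape([4, 1], [1, 3]))       # [4, 3]
```

A bias of shape [3] therefore adds cleanly to a [4, 32, 32, 3] image batch, while a bias of shape [2] is rejected.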
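3.5 Concatenating and splitting
- tf.concat([a, b], axis=0): joins along an existing dimension; all other dimensions must have equal length. For example, [4,35,8] concat [2,35,8] => [6,35,8]
- tf.stack([a, b], axis=0): creates a new dimension at axis 0 with length 2 (only a and b are stacked here); all dimensions must have equal length
- res=tf.unstack(c, axis=3): splits c along axis 3 into separate tensors, as many as the length of that dimension
- tf.split(c, axis=3, num_or_size_splits=[2,3,2]): more flexible than unstack; the sizes must sum to the length of that axis (7 here)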
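The shape rules for concat and stack listed above can be checked with a short calculation. The helper names are hypothetical and only shapes are computed:

```python
def concat_shape(shapes, axis):
    """Result shape of concatenating tensors of the given shapes along axis."""
    result = list(shapes[0])
    for shape in shapes[1:]:
        if len(shape) != len(result):
            raise ValueError("all inputs must have the same rank")
        for i, dim in enumerate(shape):
            if i == axis:
                result[i] += dim       # lengths add up along the concat axis
            elif dim != result[i]:
                raise ValueError("all other dimensions must match")
    return result

def stack_shape(shapes, axis):
    """Result shape of stacking identically shaped tensors on a new axis."""
    first = list(shapes[0])
    if any(list(s) != first for s in shapes):
        raise ValueError("all inputs must have identical shapes")
    return first[:axis] + [len(shapes)] + first[axis:]

print(concat_shape([[4, 35, 8], [2, 35, 8]], axis=0))  # [6, 35, 8]
print(stack_shape([[4, 35, 8], [4, 35, 8]], axis=0))   # [2, 4, 35, 8]
```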
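3.6 Statistics
tf.norm(a): norm of a; the default is the 2-norm
tf.norm(a, ord=1, axis=1): treats axis 1 as a unit and computes the 1-norm along it
tf.reduce_min: the "reduce" prefix is a reminder that these operations reduce dimensionality
tf.reduce_max
tf.reduce_mean
tf.argmax(a): by default returns the indices of the maximum along axis=0
tf.argmin(a)
tf.equal(a,b): element-wise comparison
tf.reduce_sum(tf.cast(tf.equal(a,b), dtype=tf.int32)): counts the number of equal elements
tf.unique(a): returns the unique values plus an index array (which can be used to reconstruct a)
3.7 Sorting
tf.sort(a, direction='DESCENDING'): sorts along the last dimension
tf.argsort(a): indices, into the original tensor, of the elements in ascending order
tf.gather(a, tf.argsort(a))
res=tf.math.top_k(a,2): exposes res.indices and res.values; used for top-k accuracy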
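Top-k accuracy, mentioned above as the use case for top_k, counts a prediction as correct when the true label is among the k highest scores. A plain-Python sketch with made-up scores and labels:

```python
def top_k_accuracy(scores, labels, k):
    """Fraction of rows whose true label is among the k largest scores."""
    correct = 0
    for row, label in zip(scores, labels):
        # indices sorted by descending score, like the indices returned by top_k
        ranked = sorted(range(len(row)), key=lambda i: row[i], reverse=True)
        if label in ranked[:k]:
            correct += 1
    return correct / len(labels)

scores = [[0.1, 0.7, 0.2],
          [0.5, 0.3, 0.2],
          [0.2, 0.3, 0.5]]
labels = [2, 0, 1]

top1 = top_k_accuracy(scores, labels, k=1)
top2 = top_k_accuracy(scores, labels, k=2)
print(top1, top2)
```

Only the second row is right at k=1, while every true label falls within the top two scores.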
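3.8 Padding and tiling
- tf.pad(a, [[1,1],[1,1], ...]): the amount of padding before and after each dimension
a=tf.random.normal([4,28,28,3])
b=tf.pad(a, [[0, 0], [2, 2], [2, 2], [0, 0]]) # pad two pixels on every side of each image
- tf.tile(a, [ ]): the second argument gives the repeat count per dimension; 1 leaves the dimension unchanged, 2 duplicates it once
3.9 Clipping
tf.maximum(a, 2): every element becomes at least 2; a simple ReLU is tf.maximum(a, 0)
tf.minimum(a, 8)
tf.clip_by_value(a, 2, 8)
new_grads, total_norm = tf.clip_by_global_norm(grads, 15): rescales all gradients by the same factor, so the gradient direction is unchanged; useful against exploding gradients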
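Global-norm clipping can be sketched in plain Python to show why the direction is preserved. It follows the scaling rule t * clip_norm / max(global_norm, clip_norm); gradients are represented as flat lists here, which is a simplification.

```python
import math

def clip_by_global_norm(grads, clip_norm):
    """Scale every gradient by the same factor so the joint norm is at most clip_norm."""
    global_norm = math.sqrt(sum(g * g for grad in grads for g in grad))
    scale = clip_norm / max(global_norm, clip_norm)  # equals 1.0 when already small enough
    clipped = [[g * scale for g in grad] for grad in grads]
    return clipped, global_norm

grads = [[3.0, 0.0], [0.0, 4.0]]          # joint norm is 5
clipped, norm = clip_by_global_norm(grads, 2.5)
print(norm)     # 5.0
print(clipped)  # [[1.5, 0.0], [0.0, 2.0]]
```

Since one factor is shared by all tensors, the ratios between gradient components stay the same; clipping each tensor separately with tf.clip_by_norm does not guarantee that.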
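3.10 Other advanced operations
indices=tf.where(a>0): returns the coordinates of all True elements; use together with tf.gather_nd(a, indices)
tf.where(cond, A, B): picks elements from A or B according to cond
- tf.scatter_nd(indices, updates, shape): writes the elements of updates into an all-zero tensor of the given shape, at the positions given by indices
points_x, points_y = tf.meshgrid(x, y)
points=tf.stack([points_x, points_y], axis=2)
Hands-on: MNIST with low-level APIs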
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'

def preprocess(x, y):
    # [b, 28, 28], [b]
    x = tf.cast(x, dtype=tf.float32) / 255.
    x = tf.reshape(x, [-1, 28*28])
    y = tf.cast(y, dtype=tf.int32)
    y = tf.one_hot(y, depth=10)
    return x,y

(x, y), (x_test, y_test) = datasets.mnist.load_data()
print('x:', x.shape, 'y:', y.shape, 'x test:', x_test.shape, 'y test:', y_test.shape)
train_db = tf.data.Dataset.from_tensor_slices((x, y))
train_db = train_db.shuffle(60000).batch(128).map(preprocess).repeat(30)
test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_db = test_db.shuffle(10000).batch(128).map(preprocess)
x,y = next(iter(train_db))
print('train sample:', x.shape, y.shape)
# print(x[0], y[0])

def main():
    # learning rate
    lr = 1e-3
    # 784 => 512
    # gradients are tracked automatically only for tf.Variable objects
    w1, b1 = tf.Variable(tf.random.truncated_normal([784, 512], stddev=0.1)), tf.Variable(tf.zeros([512]))
    # without tf.Variable, tape.watch(w) must be called inside
    # `with tf.GradientTape() as tape:`, otherwise no gradient is computed
    # 512 => 256
    w2, b2 = tf.Variable(tf.random.truncated_normal([512, 256], stddev=0.1)), tf.Variable(tf.zeros([256]))
    # 256 => 10
    # the small stddev is what keeps the gradients from exploding here
    w3, b3 = tf.Variable(tf.random.truncated_normal([256, 10], stddev=0.1)), tf.Variable(tf.zeros([10]))
    for step, (x,y) in enumerate(train_db):
        # [b, 28, 28] => [b, 784]
        x = tf.reshape(x, (-1, 784))
        with tf.GradientTape() as tape:
            # layer1
            h1 = x @ w1 + b1
            h1 = tf.nn.relu(h1)
            # layer2
            h2 = h1 @ w2 + b2
            h2 = tf.nn.relu(h2)
            # output
            out = h2 @ w3 + b3
            # out = tf.nn.relu(out)
            # compute loss
            # [b, 10] - [b, 10]
            loss = tf.square(y-out)
            # [b, 10] => [b]
            loss = tf.reduce_mean(loss, axis=1)
            # [b] => scalar
            loss = tf.reduce_mean(loss)
        # compute gradient
        grads = tape.gradient(loss, [w1, b1, w2, b2, w3, b3])
        # for g in grads:
        #     print(tf.norm(g))
        # update w' = w - lr*grad
        for p, g in zip([w1, b1, w2, b2, w3, b3], grads):
            p.assign_sub(lr * g) # assign_sub updates in place and keeps the tf.Variable type
        if step % 100 == 0:
            print(step, 'loss:', float(loss))
        # evaluate
        if step % 500 == 0:
            total, total_correct = 0., 0
            # no enumerate here, so the training step counter is not overwritten
            for x, y in test_db:
                # layer1
                h1 = x @ w1 + b1
                h1 = tf.nn.relu(h1)
                # layer2
                h2 = h1 @ w2 + b2
                h2 = tf.nn.relu(h2)
                # output
                out = h2 @ w3 + b3
                # [b, 10] => [b]
                pred = tf.argmax(out, axis=1)
                # convert one_hot y to number y
                y = tf.argmax(y, axis=1)
                # bool type
                correct = tf.equal(pred, y)
                # bool tensor => int tensor => numpy
                total_correct += tf.reduce_sum(tf.cast(correct, dtype=tf.int32)).numpy()
                total += x.shape[0]
            print(step, 'Evaluate Acc:', total_correct/total)

if __name__ == '__main__':
    main()
4. Neural networks and fully connected layers
4.1 Data loading
keras.datasets
(x, y), (x_test, y_test) = keras.datasets.mnist.load_data() # => NumPy arrays
y_onehot = tf.one_hot(y, depth=10)
(x, y), (x_test, y_test) = keras.datasets.cifar10.load_data()
tf.data.Dataset.from_tensor_slices
db=tf.data.Dataset.from_tensor_slices((x, y)).batch(16).repeat(2) # repeat(2) iterates over the data twice
itr = iter(db)
for i in range(10):
    print(next(itr)[0][15][16,16,0]) # one pixel of the last image in the batch
db=db.shuffle(10000)
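4.2 Fully connected layers
net=tf.keras.layers.Dense(units)
net.build(input_shape=(None, 784)): creates all of the layer's variables (w, b) from the input shape
net(x) # x is the actual input
model = keras.Sequential([keras.layers.Dense(2, activation='relu'), keras.layers.Dense(4, activation='relu')])
model.summary(): prints the network structure
4.3 Output activations
- tf.sigmoid: keeps the output in [0, 1]
- prob=tf.nn.softmax(logits): makes all outputs sum to 1; logits usually means the output of the last layer before any activation
- tf.tanh: the output lies in [-1, 1]
4.4 Loss computation
- MSE: tf.reduce_mean(tf.losses.MSE(y, out))
- Cross entropy: -log(q_i) for a one-hot label whose true class is i, where q_i is the predicted probability of that class
tf.losses.categorical_crossentropy(y, logits, from_logits=True): in most cases pass from_logits=True so that softmax does not have to be applied by hand
tf.losses.binary_crossentropy(y_true, y_pred)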
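Why from_logits=True is the safer choice can be seen from the arithmetic. The sketch below is plain Python, not TensorFlow's implementation: it computes the cross entropy directly from logits with the log-sum-exp trick and checks that it equals -log(q_i) of the softmax output.

```python
import math

def softmax(logits):
    """Numerically stable softmax: shift by the max before exponentiating."""
    m = max(logits)
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

def cross_entropy_from_logits(one_hot, logits):
    """-sum(y * log_softmax(logits)), without ever forming a tiny probability."""
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return -sum(y * (z - log_sum_exp) for y, z in zip(one_hot, logits))

logits = [2.0, 1.0, 0.1]
one_hot = [1.0, 0.0, 0.0]
probs = softmax(logits)
loss = cross_entropy_from_logits(one_hot, logits)
print(sum(probs), loss, -math.log(probs[0]))
```

Working on logits keeps the loss finite even for extreme values such as [1000, 0], where taking the log of a rounded softmax output could hit log(0).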
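5. Gradient descent and loss functions
Derivative => partial derivative (the derivative along one coordinate axis) => gradient (the collection of the partial derivatives along all coordinate axes)
5.1 Automatic differentiation
with tf.GradientTape() as tape:
    loss= ...
[w_grad] = tape.gradient(loss, [w]) # w is the parameter to differentiate with respect to
with tf.GradientTape(persistent=True) as tape: allows tape.gradient to be called more than once
Second-order derivatives
with tf.GradientTape() as t1:
    with tf.GradientTape() as t2:
        y = x * w + b
    dy_dw, dy_db = t2.gradient(y, [w, b]) # runs inside t1 so that t1 records it
d2y_dw2 = t1.gradient(dy_dw, w) # None for this y, which is linear in w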
5.2 Backpropagation
Single-output perceptron

x=tf.random.normal([1,3])
w=tf.ones([3,1])
b=tf.ones([1])
y = tf.constant([1])
with tf.GradientTape() as tape:
    tape.watch([w, b])
    logits = tf.sigmoid(x@w+b)
    loss = tf.reduce_mean(tf.losses.MSE(y, logits))
grads = tape.gradient(loss, [w, b])
print('w grad:', grads[0])
print('b grad:', grads[1])
Multi-output perceptron

x=tf.random.normal([1,3])
w=tf.ones([3,2])
b=tf.ones([2])
y = tf.constant([0, 1])
with tf.GradientTape() as tape:
    tape.watch([w, b])
    logits = tf.sigmoid(x@w+b)
    loss = tf.reduce_mean(tf.losses.MSE(y, logits))
grads = tape.gradient(loss, [w, b])
print('w grad:', grads[0])
print('b grad:', grads[1])
5.3 Chain rule
Multi-layer perceptron

Hands-on: optimizing a loss function
Assume the loss is Himmelblau's function, f(x, y) = (x^2 + y - 11)^2 + (x + y^2 - 7)^2:
import numpy as np
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

def loss(x):
    return (x[0] ** 2 + x[1] - 11) ** 2 + (x[0] + x[1] ** 2 - 7) ** 2

x = np.arange(-6, 6, 0.1)
y = np.arange(-6, 6, 0.1)
X, Y = np.meshgrid(x, y)
Z = loss([X, Y])
fig = plt.figure('loss')
ax = fig.add_subplot(projection='3d') # fig.gca(projection='3d') no longer works in recent matplotlib
ax.plot_surface(X, Y, Z)
ax.view_init(45, -60)
ax.set_xlabel('x')
ax.set_ylabel('y')
plt.show()

import tensorflow as tf
x = tf.constant([0., 0.])
for step in range(200):
    with tf.GradientTape() as tape:
        tape.watch([x])
        y = loss(x)
    grads = tape.gradient(y, [x])[0] # derivative of y with respect to x
    x -= 0.01*grads
    if step % 20 == 0:
        print ('step {}: x = {}, f(x) = {}'
               .format(step, x.numpy(), y.numpy()))
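The same descent can be reproduced without TensorFlow by writing the gradient out by hand, which makes it clear what tape.gradient returns at each step. This sketch keeps the start point (0, 0), the 0.01 learning rate and the 200 steps used above; the analytic gradient is derived from the loss definition.

```python
def himmelblau(x, y):
    return (x ** 2 + y - 11) ** 2 + (x + y ** 2 - 7) ** 2

def himmelblau_grad(x, y):
    """Partial derivatives of the loss, obtained with the chain rule."""
    a = x ** 2 + y - 11
    b = x + y ** 2 - 7
    return 4 * x * a + 2 * b, 2 * a + 4 * y * b

x, y = 0.0, 0.0
for step in range(200):
    gx, gy = himmelblau_grad(x, y)
    x -= 0.01 * gx  # w' = w - lr * grad
    y -= 0.01 * gy
print(x, y, himmelblau(x, y))
```

From this start point the iterates settle at the minimum near (3, 2); other start points can end in one of the function's other minima.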
Hands-on: Fashion MNIST with Dense layers
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) / 255.
    y = tf.cast(y, dtype=tf.int32)
    return x,y

(x, y), (x_test, y_test) = datasets.fashion_mnist.load_data()
batchsz = 128
db = tf.data.Dataset.from_tensor_slices((x,y))
db = db.map(preprocess).shuffle(10000).batch(batchsz)
db_test = tf.data.Dataset.from_tensor_slices((x_test,y_test))
db_test = db_test.map(preprocess).batch(batchsz)
model = Sequential([
    layers.Dense(256, activation=tf.nn.relu), # [b, 784] => [b, 256]
    layers.Dense(128, activation=tf.nn.relu), # [b, 256] => [b, 128]
    layers.Dense(64, activation=tf.nn.relu), # [b, 128] => [b, 64]
    layers.Dense(32, activation=tf.nn.relu), # [b, 64] => [b, 32]
    layers.Dense(10) # [b, 32] => [b, 10], 330 parameters = 32*10 + 10
])
model.build(input_shape=[None, 28*28])
model.summary()
# w = w - lr*grad
optimizer = optimizers.Adam(learning_rate=1e-3)

def main():
    for epoch in range(30):
        for step, (x,y) in enumerate(db):
            # x: [b, 28, 28] => [b, 784]
            # y: [b]
            x = tf.reshape(x, [-1, 28*28])
            with tf.GradientTape() as tape:
                # [b, 784] => [b, 10]
                logits = model(x)
                y_onehot = tf.one_hot(y, depth=10)
                # [b]
                loss_mse = tf.reduce_mean(tf.losses.MSE(y_onehot, logits))
                loss_ce = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss_ce = tf.reduce_mean(loss_ce)
            grads = tape.gradient(loss_ce, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            if step % 100 == 0:
                print(epoch, step, 'loss:', float(loss_ce), float(loss_mse))
        # test
        total_correct = 0
        total_num = 0
        for x,y in db_test:
            # x: [b, 28, 28] => [b, 784]
            # y: [b]
            x = tf.reshape(x, [-1, 28*28])
            # [b, 10]
            logits = model(x)
            # logits => prob, [b, 10]
            prob = tf.nn.softmax(logits, axis=1)
            # [b, 10] => [b], int64
            pred = tf.argmax(prob, axis=1)
            pred = tf.cast(pred, dtype=tf.int32)
            # pred: [b]
            # y: [b]
            # correct: [b], True: equal, False: not equal
            correct = tf.equal(pred, y)
            correct = tf.reduce_sum(tf.cast(correct, dtype=tf.int32))
            total_correct += int(correct)
            total_num += x.shape[0]
        acc = total_correct / total_num
        print(epoch, 'test acc:', acc)

if __name__ == '__main__':
    main()
6. TensorBoard visualization
tensorboard --logdir logs
import datetime
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = 'logs/' + current_time
summary_writer = tf.summary.create_file_writer(log_dir)
if step % 100 == 0:
    print(step, 'loss:', float(loss))
    with summary_writer.as_default():
        tf.summary.scalar('train-loss', float(loss), step=step)
...
# image_grid and plot_to_image are helper functions that are not shown in these notes
with summary_writer.as_default():
    tf.summary.scalar('test-acc', float(total_correct/total), step=step)
    tf.summary.image("val-onebyone-images:", val_images, max_outputs=25, step=step)
    val_images = tf.reshape(val_images, [-1, 28, 28])
    figure = image_grid(val_images)
    tf.summary.image('val-images:', plot_to_image(figure), step=step)

7. Keras high-level API
datasets
layers
losses
metrics
optimizers
7.1 Metrics
acc_metric = metrics.Accuracy()
acc_metric.update_state(y, pred)
acc_metric.result().numpy() # result() returns a tensor
acc_metric.reset_states()
7.2 Standard workflow
compile, fit, evaluate, predict
network.compile(optimizer=..., loss=..., metrics=[...])
network.fit(data, epochs=..., validation_data=...)
network.evaluate(x, y)
network.predict(x)
7.3 Custom networks
keras.Sequential is a subclass of keras.Model
net=Sequential([Layer])
net.build(input_shape=(...)) has the same effect as calling net(x) once
model.trainable_variables
model.call(): a custom model implements this method to define the forward pass; this is what enables the model(x) syntax, because model(x) invokes model.__call__(x), which in turn calls model.call()
keras.layers.Layer
keras.Model
After subclassing either one, implement __init__ and call
7.4 Saving and loading models
- save/load weights: the lightweight option
model.save_weights('path') (.ckpt)
model=create_model() => model.load_weights('path')
- save/load entire model: the brute-force option that saves everything
model.save('xx.h5')
model=tf.keras.models.load_model('xx.h5')
- saved_model: the general-purpose format for production environments
tf.saved_model.save(model, 'path')
imported=tf.saved_model.load('path')
Hands-on: CIFAR10 with a custom network
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'

def preprocess(x, y):
    x=tf.cast(x, dtype=tf.float32)/255.
    y=tf.squeeze(y) # the raw labels have shape (50000, 1); the trailing dimension is redundant
    y=tf.one_hot(y, depth=10)
    y=tf.cast(y, dtype=tf.int32)
    return x, y

batch_size=128
(x, y), (x_val, y_val)=datasets.cifar10.load_data()
train_db=tf.data.Dataset.from_tensor_slices((x,y))
train_db=train_db.map(preprocess).shuffle(10000).batch(batch_size)
val_db=tf.data.Dataset.from_tensor_slices((x_val,y_val))
val_db=val_db.map(preprocess).shuffle(10000).batch(batch_size)

class MyDense(layers.Layer):
    def __init__(self, in_dim, out_dim):
        super(MyDense, self).__init__()
        # add_weight replaces the deprecated add_variable; this layer has no bias
        self.kernel=self.add_weight(name='w', shape=[in_dim, out_dim])
    def call(self, inputs, training=None):
        x=inputs@self.kernel
        return x

class MyNet(keras.Model):
    def __init__(self):
        super(MyNet, self).__init__()
        self.fc1=MyDense(32*32*3, 256)
        self.fc2=MyDense(256,128)
        self.fc3=MyDense(128,64)
        self.fc4=MyDense(64,32)
        self.fc5=MyDense(32,10)
    def call(self, inputs, training=None):
        x=tf.reshape(inputs, [-1, 32*32*3])
        x=self.fc1(x)
        x=tf.nn.relu(x)
        x=self.fc2(x)
        x=tf.nn.relu(x)
        x=self.fc3(x)
        x=tf.nn.relu(x)
        x=self.fc4(x)
        x=tf.nn.relu(x)
        x=self.fc5(x)
        return x

model = MyNet()
model.compile(
    optimizer=optimizers.Adam(),
    loss=tf.losses.CategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)
model.fit(train_db, epochs=10, validation_data=val_db)
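8. Convolutional neural networks
The concepts are not covered here; the focus is on the API
keras.layers.Conv2D(): the class-based implementation
padding='same' # output h and w equal the input h and w (with strides=1)
tf.nn.conv2d: the functional implementation

keras.layers.MaxPool2D
keras.layers.UpSampling2D
layers.ReLU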
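The spatial size after a convolution or pooling layer follows a fixed rule for each padding mode. The sketch below writes those rules out in plain Python (the helper name is made up) and traces a 32x32 input through five stages of 'same' convolution followed by 2x2, stride-2 pooling.

```python
import math

def conv_output_size(n, kernel, stride, padding):
    """Output length along one spatial axis for a convolution or pooling layer."""
    if padding == 'same':
        return math.ceil(n / stride)        # kernel size does not matter
    if padding == 'valid':
        return (n - kernel) // stride + 1   # assumes n >= kernel
    raise ValueError("padding must be 'same' or 'valid'")

size = 32
trace = [size]
for _ in range(5):
    size = conv_output_size(size, kernel=3, stride=1, padding='same')  # conv keeps h, w
    size = conv_output_size(size, kernel=2, stride=2, padding='same')  # pooling halves h, w
    trace.append(size)
print(trace)  # [32, 16, 8, 4, 2, 1]
```

A 32x32 image therefore shrinks to 1x1 after five such stages, which is why a Flatten layer can follow directly.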
Hands-on: CIFAR100 with VGG13

import tensorflow as tf
from tensorflow.keras import layers, optimizers, datasets, Sequential
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
tf.random.set_seed(2019)
conv_layer=[
layers.Conv2D(64, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
layers.Conv2D(64, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
layers.MaxPool2D(pool_size=(2,2), strides=2, padding='same'),
layers.Conv2D(128, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
layers.Conv2D(128, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
layers.MaxPool2D(pool_size=(2,2), strides=2, padding='same'),
layers.Conv2D(256, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
layers.Conv2D(256, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
layers.MaxPool2D(pool_size=(2,2), strides=2, padding='same'),
layers.Conv2D(512, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
layers.Conv2D(512, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
layers.MaxPool2D(pool_size=(2,2), strides=2, padding='same'),
layers.Conv2D(512, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
layers.Conv2D(512, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
layers.MaxPool2D(pool_size=(2,2), strides=2, padding='same'),
layers.Flatten(),
layers.Dense(4096, activation=tf.nn.relu),
layers.Dense(4096, activation=tf.nn.relu),
layers.Dense(100, activation=None)
]
model = Sequential(layers=conv_layer)
model.build(input_shape=[None, 32,32,3])
# When running a model, first feed it some arbitrary input and check that the output shape is what you expect
x=tf.random.normal([4,32,32,3])
out=model(x)
print(out.shape)

def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) / 255.
    y = tf.cast(y, dtype=tf.int32)
    y = tf.squeeze(y, axis=0) # per-sample label has shape (1,) because map runs before batch
    y = tf.cast(tf.one_hot(y, depth=100),dtype=tf.int32)
    return x, y

(x, y), (x_test, y_test) = datasets.cifar100.load_data()
print(y.shape)
train_db=tf.data.Dataset.from_tensor_slices((x,y)).shuffle(1000).map(preprocess).batch(128)
test_db=tf.data.Dataset.from_tensor_slices((x_test,y_test)).map(preprocess).batch(128)
optimizer = optimizers.Adam(learning_rate=1e-4)
variables = model.trainable_variables
for epoch in range(50):
    for step, (x,y) in enumerate(train_db):
        with tf.GradientTape() as tape:
            # [b, 32, 32, 3] => [b, 100]
            logits = model(x)
            # compute loss
            loss = tf.losses.categorical_crossentropy(y, logits, from_logits=True)
            loss = tf.reduce_mean(loss)
        grads = tape.gradient(loss, variables)
        optimizer.apply_gradients(zip(grads, variables))
        if step %100 == 0:
            print(epoch, step, 'loss:', float(loss))
    total_num = 0
    total_correct = 0
    for x,y in test_db:
        logits = model(x)
        prob = tf.nn.softmax(logits, axis=1)
        pred = tf.argmax(prob, axis=1)
        pred = tf.cast(pred, dtype=tf.int32)
        correct = tf.cast(tf.equal(pred, tf.cast(tf.argmax(y, axis=1), dtype=tf.int32)), dtype=tf.int32)
        correct = tf.reduce_sum(correct)
        total_num += x.shape[0]
        total_correct += int(correct)
    acc = total_correct / total_num
    print(epoch, 'acc:', acc)

Hands-on: CIFAR100 with ResNet
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Sequential
class BasicBlock(layers.Layer):
    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()
        if stride != 1:
            # 1x1 convolution so that the shortcut matches the downsampled main path
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:
            self.downsample = lambda x:x

    def call(self, inputs, training=None):
        # [b, h, w, c]
        out = self.conv1(inputs)
        out = self.bn1(out, training=training)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out, training=training)
        identity = self.downsample(inputs)
        output = layers.add([out, identity])
        output = tf.nn.relu(output)
        return output

class ResNet(keras.Model):
    def __init__(self, layer_dims, num_classes=100): # [2, 2, 2, 2]
        super(ResNet, self).__init__()
        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
                                ])
        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
        # output: [b, h, w, 512]
        self.avgpool = layers.GlobalAveragePooling2D()
        self.fc = layers.Dense(num_classes)

    def call(self, inputs, training=None):
        x = self.stem(inputs, training=training)
        x = self.layer1(x, training=training)
        x = self.layer2(x, training=training)
        x = self.layer3(x, training=training)
        x = self.layer4(x, training=training)
        # [b, c]
        x = self.avgpool(x)
        # [b, 100]
        x = self.fc(x)
        return x

    def build_resblock(self, filter_num, blocks, stride=1):
        res_blocks = Sequential()
        # may down sample
        res_blocks.add(BasicBlock(filter_num, stride))
        for _ in range(1, blocks):
            res_blocks.add(BasicBlock(filter_num, stride=1))
        return res_blocks

def resnet18():
    return ResNet([2, 2, 2, 2])

def resnet34():
    return ResNet([3, 4, 6, 3])
from tensorflow.keras import optimizers, datasets
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
tf.random.set_seed(2345)
def preprocess(x, y):
    # scale to [-0.5, 0.5]
    x = tf.cast(x, dtype=tf.float32) / 255. - 0.5
    y = tf.cast(y, dtype=tf.int32)
    return x,y

(x,y), (x_test, y_test) = datasets.cifar100.load_data()
y = tf.squeeze(y, axis=1)
y_test = tf.squeeze(y_test, axis=1)
print(x.shape, y.shape, x_test.shape, y_test.shape)
train_db = tf.data.Dataset.from_tensor_slices((x,y))
train_db = train_db.shuffle(1000).map(preprocess).batch(128)
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db = test_db.map(preprocess).batch(128)
sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape,
      tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))
# [b, 32, 32, 3] => [b, 100]
model = resnet18()
model.build(input_shape=(None, 32, 32, 3))
model.summary()
optimizer = optimizers.Adam(learning_rate=1e-3)
for epoch in range(500):
    for step, (x,y) in enumerate(train_db):
        with tf.GradientTape() as tape:
            # [b, 32, 32, 3] => [b, 100]
            # training=True so that BatchNormalization uses batch statistics
            logits = model(x, training=True)
            # [b] => [b, 100]
            y_onehot = tf.one_hot(y, depth=100)
            # compute loss
            loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
            loss = tf.reduce_mean(loss)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        if step %50 == 0:
            print(epoch, step, 'loss:', float(loss))
    total_num = 0
    total_correct = 0
    for x,y in test_db:
        logits = model(x, training=False)
        prob = tf.nn.softmax(logits, axis=1)
        pred = tf.argmax(prob, axis=1)
        pred = tf.cast(pred, dtype=tf.int32)
        correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
        correct = tf.reduce_sum(correct)
        total_num += x.shape[0]
        total_correct += int(correct)
    acc = total_correct / total_num
    print(epoch, 'acc:', acc)
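9. Recurrent neural networks
- Sequence embedding: word => vector; semantically similar words should have vectors that are close together
layers.Embedding(input_dim, output_dim)
input_dim: size of the vocabulary (the total number of words that can be handled), i.e. maximum integer index + 1
output_dim: dimension of the dense embedding
The Embedding layer is trainable as well, so its semantic representation keeps improving
- layers.SimpleRNNCell(units): units is the output dimension; the state h must be managed by hand
layers.SimpleRNN(): no need to manage h
h_t = tanh(x_t@w1 + h_{t-1}@w2 + b), with tanh as the default activation; for example, with 4-dimensional inputs and units=3, w1 (applied to x) has shape (4,3) and w2 (applied to h) has shape (3,3)
The intermediate state h is always a list in TensorFlow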
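The recurrence can be stepped by hand to see how the state carries information from one time step to the next. This is a plain-Python sketch with made-up weights, using 4-dimensional inputs and 3 units as in the shapes above; it is not how SimpleRNNCell is implemented internally.

```python
import math

def matvec(x, w):
    """x: [in], w: [in][out] -> [out], the single-sample version of x @ w."""
    return [sum(x[i] * w[i][j] for i in range(len(x))) for j in range(len(w[0]))]

def simple_rnn_step(x_t, h_prev, w1, w2, b):
    """h_t = tanh(x_t @ w1 + h_prev @ w2 + b)"""
    xw = matvec(x_t, w1)
    hw = matvec(h_prev, w2)
    return [math.tanh(p + q + r) for p, q, r in zip(xw, hw, b)]

w1 = [[0.1] * 3 for _ in range(4)]                                   # shape (4, 3)
w2 = [[0.5 if i == j else 0.0 for j in range(3)] for i in range(3)]  # shape (3, 3)
b = [0.0] * 3

h = [0.0] * 3                                        # initial state
sequence = [[1.0, 1.0, 1.0, 1.0], [0.0, 0.0, 0.0, 0.0]]
for x_t in sequence:
    h = simple_rnn_step(x_t, h, w1, w2, b)
print(h)
```

The second input is all zeros, so the final state is non-zero only because h carried the first step forward through w2.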
RNN hands-on: sentiment classification

Low-level implementation
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
batchsz = 128
# the most frequest words
total_words = 10000
max_review_len = 80
embedding_len = 100
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
# x_train:[b, 80]
# x_test: [b, 80]
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)
class MyRNN(keras.Model):
    def __init__(self, units):
        super(MyRNN, self).__init__()
        # [b, 64]
        self.state0 = [tf.zeros([batchsz, units])]
        self.state1 = [tf.zeros([batchsz, units])]
        # transform text to embedding representation
        # [b, 80] => [b, 80, 100]
        self.embedding = layers.Embedding(total_words, embedding_len,
                                          input_length=max_review_len)
        # [b, 80, 100] , h_dim: 64
        # RNN: cell1 ,cell2, cell3
        # SimpleRNN
        self.rnn_cell0 = layers.SimpleRNNCell(units, dropout=0.5)
        self.rnn_cell1 = layers.SimpleRNNCell(units, dropout=0.5)
        # fc, [b, 80, 100] => [b, 64] => [b, 1]
        self.outlayer = layers.Dense(1)

    def call(self, inputs, training=None):
        """
        net(x) net(x, training=True) :train mode
        net(x, training=False): test
        :param inputs: [b, 80]
        :param training:
        :return:
        """
        # [b, 80]
        x = inputs
        # embedding: [b, 80] => [b, 80, 100]
        x = self.embedding(x)
        # rnn cell compute
        # [b, 80, 100] => [b, 64]
        state0 = self.state0
        state1 = self.state1
        for word in tf.unstack(x, axis=1): # word: [b, 100]
            # h1 = x*wxh+h0*whh
            # out0: [b, 64]
            out0, state0 = self.rnn_cell0(word, state0, training)
            # out1: [b, 64]
            out1, state1 = self.rnn_cell1(out0, state1, training)
        # out: [b, 64] => [b, 1]
        x = self.outlayer(out1)
        # p(y is pos|x)
        prob = tf.sigmoid(x)
        return prob

def main():
    units = 64
    epochs = 4
    model = MyRNN(units)
    model.compile(optimizer = keras.optimizers.Adam(0.001),
                  loss = tf.losses.BinaryCrossentropy(),
                  metrics=['accuracy'])
    model.fit(db_train, epochs=epochs, validation_data=db_test)
    model.evaluate(db_test)

if __name__ == '__main__':
    main()
Layer-based implementation
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
batchsz = 128
# the most frequest words
total_words = 10000
max_review_len = 80
embedding_len = 100
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
# x_train:[b, 80]
# x_test: [b, 80]
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)
class MyRNN(keras.Model):
    def __init__(self, units):
        super(MyRNN, self).__init__()
        # transform text to embedding representation
        # [b, 80] => [b, 80, 100]
        self.embedding = layers.Embedding(total_words, embedding_len,
                                          input_length=max_review_len)
        # [b, 80, 100] , h_dim: 64
        self.rnn = keras.Sequential([
            layers.SimpleRNN(units, dropout=0.5, return_sequences=True, unroll=True),
            layers.SimpleRNN(units, dropout=0.5, unroll=True)
        ])
        # fc, [b, 80, 100] => [b, 64] => [b, 1]
        self.outlayer = layers.Dense(1)

    def call(self, inputs, training=None):
        """
        net(x) net(x, training=True) :train mode
        net(x, training=False): test
        :param inputs: [b, 80]
        :param training:
        :return:
        """
        # [b, 80]
        x = inputs
        # embedding: [b, 80] => [b, 80, 100]
        x = self.embedding(x)
        # rnn cell compute
        # x: [b, 80, 100] => [b, 64]
        x = self.rnn(x)
        # out: [b, 64] => [b, 1]
        x = self.outlayer(x)
        # p(y is pos|x)
        prob = tf.sigmoid(x)
        return prob

def main():
    units = 64
    epochs = 4
    model = MyRNN(units)
    model.compile(optimizer = keras.optimizers.Adam(0.001),
                  loss = tf.losses.BinaryCrossentropy(),
                  metrics=['accuracy'])
    model.fit(db_train, epochs=epochs, validation_data=db_test)
    model.evaluate(db_test)

if __name__ == '__main__':
    main()
Gradient clipping
grads=[tf.clip_by_norm(g, 15) for g in grads] # 15 is an empirical value; gradient norms below about 10 are generally healthy
LSTM
The unroll parameter: a performance optimization
GRU
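10. AutoEncoder
Unsupervised learning
- Dimensionality reduction, compression, preprocessing, visualization
- Denoising AutoEncoder: recovers the true semantics from noisy input
- Dropout AutoEncoder: a dropout rate of 0.2 works well
- Adversarial AutoEncoder: makes z not only support reconstruction but also follow a preset distribution, such as a normal distribution, as closely as possible
- Variational AutoEncoder: the first term of the loss is the MSE (reconstruction error) and the second is the KL divergence, i.e. the closer the distribution of z|x is to the prior distribution of z, the better
- The results cannot match a GAN, but training is much more stable than GAN training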
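The KL term of the VAE loss has a closed form when the encoder outputs a mean and a log-variance and the prior is a standard normal. The sketch below evaluates that formula for a single latent dimension and adds the reparameterization step that turns (mu, log_var) into a sample; the function names are made up for this illustration.

```python
import math
import random

def kl_to_standard_normal(mu, log_var):
    """KL( N(mu, exp(log_var)) || N(0, 1) ) for one latent dimension."""
    return -0.5 * (1.0 + log_var - mu ** 2 - math.exp(log_var))

def reparameterize(mu, log_var, rng):
    """z = mu + std * eps, with eps ~ N(0, 1), so the sampling noise is separate from mu and log_var."""
    std = math.exp(0.5 * log_var)
    return mu + std * rng.gauss(0.0, 1.0)

print(kl_to_standard_normal(0.0, 0.0))  # 0: the posterior already equals the prior
print(kl_to_standard_normal(1.0, 0.0))  # 0.5: the mean has moved away from 0
z = reparameterize(1.0, 0.0, random.Random(0))
```

The divergence is zero only when mu = 0 and log_var = 0, so minimizing it pulls every encoded distribution toward the prior.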
VAE hands-on (the code below loads Fashion-MNIST)
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import Sequential, layers
from PIL import Image
from matplotlib import pyplot as plt
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
def save_images(imgs, name):
    # paste 100 images of 28x28 pixels into one 280x280 grid
    new_im = Image.new('L', (280, 280))
    index = 0
    for i in range(0, 280, 28):
        for j in range(0, 280, 28):
            im = imgs[index]
            im = Image.fromarray(im, mode='L')
            new_im.paste(im, (i, j))
            index += 1
    new_im.save(name)
h_dim = 20
batchsz = 512
lr = 1e-3
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()
x_train, x_test = x_train.astype(np.float32) / 255., x_test.astype(np.float32) / 255.
# we do not need label
train_db = tf.data.Dataset.from_tensor_slices(x_train)
train_db = train_db.shuffle(batchsz * 5).batch(batchsz)
test_db = tf.data.Dataset.from_tensor_slices(x_test)
test_db = test_db.batch(batchsz)
print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)
z_dim = 10
class VAE(keras.Model):
def __init__(self):
super(VAE, self).__init__()
# Encoder
self.fc1 = layers.Dense(128)
self.fc2 = layers.Dense(z_dim) # get mean prediction
self.fc3 = layers.Dense(z_dim)
# Decoder
self.fc4 = layers.Dense(128)
self.fc5 = layers.Dense(784)
def encoder(self, x):
h = tf.nn.relu(self.fc1(x))
# get mean
mu = self.fc2(h)
# get variance
log_var = self.fc3(h)
return mu, log_var
def decoder(self, z):
out = tf.nn.relu(self.fc4(z))
out = self.fc5(out)
return out
def reparameterize(self, mu, log_var):
eps = tf.random.normal(log_var.shape)
std = tf.exp(log_var*0.5)
z = mu + std * eps
return z
def call(self, inputs, training=None):
# [b, 784] => [b, z_dim], [b, z_dim]
mu, log_var = self.encoder(inputs)
# reparameterization trick
z = self.reparameterize(mu, log_var)
x_hat = self.decoder(z)
return x_hat, mu, log_var
model = VAE()
model.build(input_shape=(4, 784))
optimizer = tf.optimizers.Adam(lr)
for epoch in range(1000):
    for step, x in enumerate(train_db):
        x = tf.reshape(x, [-1, 784])
        with tf.GradientTape() as tape:
            x_rec_logits, mu, log_var = model(x)
            rec_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=x, logits=x_rec_logits)
            rec_loss = tf.reduce_sum(rec_loss) / x.shape[0]
            # KL divergence between N(mu, exp(log_var)) and the prior N(0, 1)
            # https://stats.stackexchange.com/questions/7440/kl-divergence-between-two-univariate-gaussians
            kl_div = -0.5 * (log_var + 1 - mu**2 - tf.exp(log_var))
            kl_div = tf.reduce_sum(kl_div) / x.shape[0]
            loss = rec_loss + 1. * kl_div
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        if step % 100 == 0:
            print(epoch, step, 'kl div:', float(kl_div), 'rec loss:', float(rec_loss))
    # evaluation 1: decode latent vectors sampled from the prior N(0, 1)
    z = tf.random.normal((batchsz, z_dim))
    logits = model.decoder(z)
    x_hat = tf.sigmoid(logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    x_hat = x_hat.astype(np.uint8)
    save_images(x_hat, 'vae_images/sampled_epoch%d.png' % epoch)
    # evaluation 2: reconstruct one batch of test images
    x = next(iter(test_db))
    x = tf.reshape(x, [-1, 784])
    x_hat_logits, _, _ = model(x)
    x_hat = tf.sigmoid(x_hat_logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    x_hat = x_hat.astype(np.uint8)
    save_images(x_hat, 'vae_images/rec_epoch%d.png' % epoch)
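The KL term in the training loop uses the closed form for the divergence between N(mu, exp(log_var)) and N(0, 1), applied per latent dimension. As a sanity check, here is a small sketch in plain Python that compares that closed form with a numerical integral for one dimension. The helper names, integration range and step count are assumptions made for this illustration.

```python
import math

def kl_closed_form(mu, log_var):
    """KL( N(mu, exp(log_var)) || N(0, 1) ) for a single latent dimension."""
    return -0.5 * (log_var + 1.0 - mu ** 2 - math.exp(log_var))

def kl_numeric(mu, log_var, lo=-12.0, hi=12.0, n=20000):
    """Midpoint-rule estimate of the same KL, used only as a check."""
    var = math.exp(log_var)
    dx = (hi - lo) / n
    total = 0.0
    for i in range(n):
        x = lo + (i + 0.5) * dx
        log_q = -0.5 * (math.log(2 * math.pi * var) + (x - mu) ** 2 / var)
        log_p = -0.5 * (math.log(2 * math.pi) + x ** 2)
        total += math.exp(log_q) * (log_q - log_p) * dx
    return total

print(kl_closed_form(0.0, 0.0))                        # posterior equals the prior
print(kl_closed_form(1.0, 0.5), kl_numeric(1.0, 0.5))  # the two values should agree closely
```

The divergence is zero only when mu = 0 and log_var = 0, so this term pulls every encoded distribution towards the standard normal prior.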
11. GAN
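Essence
- A GAN makes the generated distribution approximate the real one, whether the gap is measured with KL divergence or JS divergence
- Training often fails to get started because, when the two distributions are too far apart, the gradient of the divergence stays at 0 and gives the generator no learning signal
- WGAN-GP addresses this by using the Wasserstein distance together with a gradient penalty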
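The zero-gradient problem can be illustrated with discrete distributions. The sketch below is plain Python with made-up point-mass distributions, an assumption chosen for simplicity. While the supports of the two distributions do not overlap, the JS divergence stays at log 2 no matter how far apart they are, whereas the Wasserstein-1 distance between two point masses is simply the distance between their bins.

```python
import math

def kl(p, q):
    """KL(p || q) for discrete distributions; terms with p_i = 0 contribute 0.
    Assumes q_i > 0 wherever p_i > 0, which holds for the mixture used in js()."""
    return sum(pi * math.log(pi / qi) for pi, qi in zip(p, q) if pi > 0)

def js(p, q):
    """Jensen-Shannon divergence between two discrete distributions."""
    m = [(pi + qi) / 2 for pi, qi in zip(p, q)]
    return 0.5 * kl(p, m) + 0.5 * kl(q, m)

def point_mass(index, size=10):
    """A distribution with all of its mass on one bin."""
    return [1.0 if i == index else 0.0 for i in range(size)]

real = point_mass(0)
positions = (9, 5, 1, 0)                            # generated mass slides towards the real one
js_values = [js(real, point_mass(k)) for k in positions]
w1_values = [abs(k - 0) for k in positions]         # Wasserstein-1 between two point masses
print(js_values)  # constant until the supports coincide
print(w1_values)  # shrinks steadily as the distributions get closer
```

A loss that is flat while the generator improves gives no gradient to follow, which is the motivation for a distance that keeps changing with the gap.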
WGAN in practice
class Generator(keras.Model):
    def __init__(self):
        super(Generator, self).__init__()
        # z: [b, 100] => [b, 3*3*512] => [b, 3, 3, 512] => [b, 64, 64, 3]
        self.fc = layers.Dense(3*3*512)
        self.conv1 = layers.Conv2DTranspose(256, 3, 3)
        self.bn1 = layers.BatchNormalization()
        self.conv2 = layers.Conv2DTranspose(128, 5, 2)
        self.bn2 = layers.BatchNormalization()
        self.conv3 = layers.Conv2DTranspose(3, 4, 3)
    def call(self, inputs, training=None):
        x = self.fc(inputs)
        x = tf.reshape(x, [-1, 3, 3, 512])
        x = tf.nn.leaky_relu(x)
        x = tf.nn.leaky_relu(self.bn1(self.conv1(x), training=training))
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x), training=training))
        x = self.conv3(x)
        x = tf.tanh(x)  # squash to [-1, 1], the input range the Discriminator expects
        return x
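The shape comment in the Generator ([b, 3, 3, 512] => [b, 64, 64, 3]) follows from the output-size rule for a transposed convolution with padding='valid', which is the Keras default. A small sketch of that arithmetic in plain Python, where the helper name is an assumption for this note:

```python
def conv_transpose_out(size, kernel, stride):
    """Output length of a transposed convolution with padding='valid'."""
    return size * stride + max(kernel - stride, 0)

size = 3
gen_sizes = [size]
for kernel, stride in [(3, 3), (5, 2), (4, 3)]:  # conv1, conv2, conv3
    size = conv_transpose_out(size, kernel, stride)
    gen_sizes.append(size)
print(gen_sizes)  # spatial size after the reshape and after each layer
```

The kernel sizes and strides are chosen so that three layers take the 3x3 feature map to exactly 64x64.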
class Discriminator(keras.Model):
    def __init__(self):
        super(Discriminator, self).__init__()
        # [b, 64, 64, 3] => [b, 1]
        self.conv1 = layers.Conv2D(filters=64, kernel_size=5, strides=3, padding='valid')
        self.conv2 = layers.Conv2D(128, 5, 3)
        self.bn2 = layers.BatchNormalization()
        self.conv3 = layers.Conv2D(256, 5, 3)
        self.bn3 = layers.BatchNormalization()
        # [b, h, w, c] => [b, h*w*c]
        self.flatten = layers.Flatten()
        self.fc = layers.Dense(1)
    def call(self, inputs, training=None):
        x = tf.nn.leaky_relu(self.conv1(inputs))
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x), training=training))
        x = tf.nn.leaky_relu(self.bn3(self.conv3(x), training=training))
        x = self.flatten(x)
        logits = self.fc(x)
        return logits
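The Discriminator shrinks the 64x64 input with three stride-3 convolutions that use padding='valid'. The sketch below, in plain Python with an assumed helper name, traces the spatial size through the layers and the number of features that reach the final Dense layer.

```python
def conv_out(size, kernel, stride):
    """Output length of a convolution with padding='valid'."""
    return (size - kernel) // stride + 1

size = 64
disc_sizes = [size]
for kernel, stride in [(5, 3), (5, 3), (5, 3)]:  # conv1, conv2, conv3
    size = conv_out(size, kernel, stride)
    disc_sizes.append(size)
flat_features = size * size * 256  # channels of conv3
print(disc_sizes, flat_features)
```

After the third convolution the feature map is a single spatial position, so Flatten simply hands the 256 channels to the Dense layer.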
def celoss_ones(logits):
    # logits: [b, 1]
    # labels: all ones, same shape as logits
    loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=logits,
                                                   labels=tf.ones_like(logits))
    return tf.reduce_mean(loss)
def celoss_zeros(logits):
    # logits: [b, 1]
    # labels: all zeros, same shape as logits
    loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=logits,
                                                   labels=tf.zeros_like(logits))
    return tf.reduce_mean(loss)
def gradient_penalty(discriminator, batch_x, fake_image):
    batchsz = batch_x.shape[0]
    # one interpolation coefficient per sample, broadcast to [b, h, w, c]
    t = tf.random.uniform([batchsz, 1, 1, 1])
    t = tf.broadcast_to(t, batch_x.shape)
    # random points on the lines between real and generated images
    interpolate = t * batch_x + (1 - t) * fake_image
    with tf.GradientTape() as tape:
        tape.watch([interpolate])
        d_interpolate_logits = discriminator(interpolate)
    grads = tape.gradient(d_interpolate_logits, interpolate)
    # grads: [b, h, w, c] => [b, -1]
    grads = tf.reshape(grads, [grads.shape[0], -1])
    gp = tf.norm(grads, axis=1)  # [b], per-sample gradient norm
    gp = tf.reduce_mean((gp - 1)**2)  # penalize norms that deviate from 1
    return gp
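To see what the penalty measures, consider a toy linear critic D(x) = w . x, whose gradient with respect to x is w everywhere. The sketch below is plain Python, with finite differences standing in for tf.GradientTape and every name and number an assumption for illustration. It follows the same steps as gradient_penalty: interpolate, take the gradient at the interpolated point, and penalize the squared deviation of its norm from 1.

```python
import math
import random

def critic(x, w):
    """Toy linear critic D(x) = w . x (a stand-in for the Discriminator)."""
    return sum(wi * xi for wi, xi in zip(w, x))

def numeric_grad(f, x, eps=1e-6):
    """Central finite-difference gradient of f at x."""
    grad = []
    for i in range(len(x)):
        hi = x[:i] + [x[i] + eps] + x[i + 1:]
        lo = x[:i] + [x[i] - eps] + x[i + 1:]
        grad.append((f(hi) - f(lo)) / (2 * eps))
    return grad

def gradient_penalty_toy(w, real_batch, fake_batch, rng):
    penalties = []
    for real, fake in zip(real_batch, fake_batch):
        t = rng.random()  # one interpolation coefficient per sample
        x_hat = [t * r + (1 - t) * f for r, f in zip(real, fake)]
        grad = numeric_grad(lambda x: critic(x, w), x_hat)
        norm = math.sqrt(sum(g * g for g in grad))
        penalties.append((norm - 1.0) ** 2)
    return sum(penalties) / len(penalties)

rng = random.Random(0)
real_batch = [[rng.uniform(-1, 1) for _ in range(4)] for _ in range(8)]
fake_batch = [[rng.uniform(-1, 1) for _ in range(4)] for _ in range(8)]
gp_unit = gradient_penalty_toy([0.5, 0.5, 0.5, 0.5], real_batch, fake_batch, rng)  # ||w|| = 1
gp_big = gradient_penalty_toy([3.0, 0.0, 4.0, 0.0], real_batch, fake_batch, rng)   # ||w|| = 5
print(gp_unit, gp_big)
```

A critic whose gradient norm is already 1 pays no penalty, while one with norm 5 pays (5 - 1)^2 = 16, so the term pushes the critic towards gradients of unit norm.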
def d_loss_fn(generator, discriminator, batch_z, batch_x, is_training):
    # 1. treat real image as real
    # 2. treat generated image as fake
    fake_image = generator(batch_z, is_training)
    d_fake_logits = discriminator(fake_image, is_training)
    d_real_logits = discriminator(batch_x, is_training)
    d_loss_real = celoss_ones(d_real_logits)
    d_loss_fake = celoss_zeros(d_fake_logits)
    gp = gradient_penalty(discriminator, batch_x, fake_image)
    loss = d_loss_fake + d_loss_real + 5. * gp
    return loss, gp
def g_loss_fn(generator, discriminator, batch_z, is_training):
    fake_image = generator(batch_z, is_training)
    d_fake_logits = discriminator(fake_image, is_training)
    loss = celoss_ones(d_fake_logits)
    return loss
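The loss functions above keep the sigmoid cross-entropy GAN loss and add the gradient penalty to it. For comparison, the Wasserstein formulation works directly on the critic's raw scores. The sketch below shows that variant in plain Python on lists of scores. It is an alternative for illustration, not the code used in these notes; the function names are assumptions, and the default penalty weight of 5 mirrors the factor used in d_loss_fn.

```python
def mean(values):
    return sum(values) / len(values)

def wgan_d_loss(d_real_logits, d_fake_logits, gp, gp_weight=5.0):
    """Critic loss: raise scores on real samples, lower them on fakes, add the penalty."""
    return mean(d_fake_logits) - mean(d_real_logits) + gp_weight * gp

def wgan_g_loss(d_fake_logits):
    """Generator loss: raise the critic's score on generated samples."""
    return -mean(d_fake_logits)

d_loss = wgan_d_loss([2.0, 4.0], [1.0, -1.0], gp=0.1)
g_loss = wgan_g_loss([1.0, 3.0])
print(d_loss, g_loss)
```

In a TensorFlow version, mean would be tf.reduce_mean applied to the logits returned by the Discriminator.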
z_dim = 100
epochs = 3000000
batch_size = 512
learning_rate = 0.002
is_training = True
generator = Generator()
generator.build(input_shape = (None, z_dim))
discriminator = Discriminator()
discriminator.build(input_shape=(None, 64, 64, 3))
g_optimizer = tf.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)
d_optimizer = tf.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)
for epoch in range(epochs):
    batch_z = tf.random.uniform([batch_size, z_dim], minval=-1., maxval=1.)
    batch_x = next(db_iter)
    # train D
    with tf.GradientTape() as tape:
        d_loss, gp = d_loss_fn(generator, discriminator, batch_z, batch_x, is_training)
    grads = tape.gradient(d_loss, discriminator.trainable_variables)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_variables))
    # train G
    with tf.GradientTape() as tape:
        g_loss = g_loss_fn(generator, discriminator, batch_z, is_training)
    grads = tape.gradient(g_loss, generator.trainable_variables)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_variables))
    if epoch % 100 == 0:
        print(epoch, 'd-loss:', float(d_loss), 'g-loss:', float(g_loss),
              'gp:', float(gp))
        # sample z from the same [-1, 1] range used during training
        z = tf.random.uniform([100, z_dim], minval=-1., maxval=1.)
        fake_image = generator(z, training=False)
        img_path = os.path.join('images', 'wgan-%d.png' % epoch)
        save_result(fake_image.numpy(), 10, img_path, color_mode='P')










