One Article Is All You Need: TensorFlow 2.0 in Practice (continuously updated)

Opening: forget TensorFlow 1, it is all in the past
pip install tensorflow==2.0.0-alpha
Ecosystem
  • TensorFlow 2.0
    @tf.function converts a Python function into a computation graph
    An explanation of tf.function

  • TensorFlow Lite

  • TensorFlow.JS

  • TensorFlow Extended

  • TensorFlow Probability

  • TPU Cloud

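1. Data types

Data containers

list: supports mixed element types, but is slow
np.array: a container for elements of one type; efficient, but has no GPU support and no automatic differentiation
tf.Tensor: TensorFlow's container for large blocks of contiguous data

Basic data types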

tf.int32: tf.constant(1)
tf.float32: tf.constant(1.)
tf.float64: tf.constant(1., dtype=tf.double)
tf.bool: tf.constant([True, False])
tf.string: tf.constant('hello')

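Basic tensor attributes
with tf.device("cpu"):
    a=tf.range(4)
a.device # '/job:localhost/replica:0/task:0/device:CPU:0'
aa=a.gpu() # copies a to the GPU; later 2.x releases deprecate this in favor of tf.identity under tf.device
a.numpy() # array([0, 1, 2, 3], dtype=int32)
a.ndim # 1  (0 would mean a scalar)
a.shape # TensorShape([4])
a.name # AttributeError: Tensor.name is meaningless when eager execution is enabled. 
tf.rank(tf.ones([3,4,2])) # <tf.Tensor: id=466672, shape=(), dtype=int32, numpy=3>
tf.is_tensor(a) # True
a.dtype # tf.int32

  • tf.rank and ndim differ only in return type: tf.rank returns a tensor, ndim a Python int
  • The name attribute is meaningless in TensorFlow 2, because the Python variable name already serves as the name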
Type conversion
a=np.arange(5)
a.dtype # dtype('int64')
aa=tf.convert_to_tensor(a) # <tf.Tensor: id=466678, shape=(5,), dtype=int64, numpy=array([0, 1, 2, 3, 4])>
aa=tf.convert_to_tensor(a, dtype=tf.int32) # <tf.Tensor: id=466683, shape=(5,), dtype=int32, numpy=array([0, 1, 2, 3, 4], dtype=int32)>
tf.cast(aa, tf.float32)

b=tf.constant([0,1])
tf.cast(b, tf.bool) # <tf.Tensor: id=466697, shape=(2,), dtype=bool, numpy=array([False,  True])>

a=tf.ones([])
a.numpy()
int(a) # a scalar tensor can be converted directly like this
float(a)

Trainable type (tf.Variable)
a=tf.range(5)
b=tf.Variable(a)
b.dtype # tf.int32
b.name # 'Variable:0', not really useful
b.trainable # True

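2. Creating tensors

tf.convert_to_tensor(data)
tf.zeros(shape)
tf.ones(1): a 1-D tensor containing a single 1
tf.ones([]): the scalar 1
tf.ones([2]): a 1-D tensor containing two 1s
tf.ones_like(a): equivalent to tf.ones(a.shape)
tf.fill([3,4], 9): fills every element with 9
tf.random.normal([3,4], mean=1, stddev=1)
tf.random.truncated_normal([3,4], mean=0, stddev=1): truncated normal distribution (samples more than two standard deviations from the mean are redrawn). With sigmoid activations, for example, initializing from an untruncated normal makes vanishing gradients more likely.
tf.random.uniform([3,4], minval=0, maxval=100, dtype=tf.int32): uniform distribution

idx=tf.range(5)
idx=tf.random.shuffle(idx)
a=tf.random.normal([10,784])
b=tf.random.uniform([10])
a=tf.gather(a, idx) # the first 5 rows of a, in shuffled order
b=tf.gather(b, idx) # the matching 5 elements of b, in the same order
  • 3-D tensor example: natural language processing, [b, 5, 5]: b sentences, each with 5 words, each word represented by a 5-dimensional vector
  • 4-D tensor: images [b, h, w, c]
  • 5-D tensor: meta-learning [task_b, b, h, w, c] (multiple tasks)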

Free practice time starts here

out=tf.random.uniform([4,10]) # simulated outputs for 4 images, 10 class scores each
y=tf.range(4)
y=tf.one_hot(y, depth=10) # simulated true classes for the 4 images
loss=tf.keras.losses.mse(y, out) 
loss=tf.reduce_mean(loss) # compute the loss

A simple x@w+b

from tensorflow.keras import layers
net=layers.Dense(10)
net.build((4,8))  # 4 is the batch size; the previous layer has 8 units
net.kernel # w, shape=(8, 10)
net.bias # b, shape=(10,)

Remember: W has shape [input_dim, output_dim] and b has shape [output_dim]

End of free practice time

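3. Tensor operations

3.1 Indexing

Basic: a[idx][idx][idx]
NumPy style: a[idx,idx,idx], which is more readable

3.2 Slicing

Largely the same as NumPy
a[start:end:positive_step]
a[end:start:negative_step]
a[0, 1, ..., 0]: the ... stands for any number of : and is valid as long as that number can be inferred

Selective indexing
  • tf.gather
    Scenario: from a [4, 28, 28, 3] tensor, sample rows [3, 27, 9, 13] (along the first 28 axis) in that order
    Use tf.gather(a, axis=1, indices=[3,27,9,13])

  • tf.gather_nd (W3Cschool explanation)
    Scenario: from a [4, 28, 28, 3] tensor, sample rows [3, 27] of the second dimension combined with columns [20, 8] of the third dimension
    Conceptually tf.gather_nd(a, indices=[[:,3,20,:],[:,3,8,:],[:,27,20,:],[:,27,8,:]]). This is pseudocode, since : cannot appear inside indices; one working form moves the sampled axes to the front first: tf.gather_nd(tf.transpose(a, [1,2,0,3]), [[3,20],[3,8],[27,20],[27,8]]), giving shape [4, 4, 3]

    More examples
  • tf.boolean_mask
    tf.boolean_mask(a, mask=[True, True, False], axis=3) keeps only the R and G channels, where a has shape [4, 28, 28, 3]. The mask can also be a nested list covering several dimensions, in which case it works somewhat like tf.gather_nd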
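The three selective-indexing calls above are easiest to compare by looking at the shapes they return. The following is a minimal sketch, assuming a random [4, 28, 28, 3] tensor as a stand-in for a batch of images; the variable names are illustrative only.

```python
import tensorflow as tf

a = tf.random.normal([4, 28, 28, 3])  # stand-in for 4 RGB images

# tf.gather: pick rows 3, 27, 9, 13 along axis 1, in that order.
rows = tf.gather(a, axis=1, indices=[3, 27, 9, 13])
print(rows.shape)  # (4, 4, 28, 3)

# tf.gather_nd: each index addresses leading dimensions only, so move
# h and w to the front before sampling the four (row, column) points.
a_hw_first = tf.transpose(a, perm=[1, 2, 0, 3])  # [28, 28, 4, 3]
points = tf.gather_nd(a_hw_first, [[3, 20], [3, 8], [27, 20], [27, 8]])
print(points.shape)  # (4, 4, 3): 4 points x 4 images x 3 channels

# tf.boolean_mask: keep channels 0 and 1 (R and G) of the last axis.
rg = tf.boolean_mask(a, mask=[True, True, False], axis=3)
print(rg.shape)  # (4, 28, 28, 2)
```

The transpose is one possible way to express "all images, all channels" for a fixed pixel; slicing with a[:, 3, 20, :] per point and stacking the results would give the same values.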

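3.3 Reshaping and transposing

  • a.shape, a.ndim
  • tf.transpose: for example, swapping an image's rows and columns (a reflection across the diagonal; a true 90° rotation also needs a flip)
a=tf.random.normal([4, 3, 2, 1])
tf.transpose(a, perm=[0, 1, 3, 2]) # swaps the last two dimensions
  • tf.reshape
a=tf.random.normal([4, 28, 28, 3])
tf.reshape(a, [4, 784, 3])
tf.reshape(a, [4, -1, 3]) # same result as the line above
tf.reshape(a, [4, -1]) 
  • tf.expand_dims: adds a dimension (dim and axis mean much the same thing)
a=tf.random.normal([4, 35, 8])
tf.expand_dims(a, axis=3)  # the new dimension is the 4th (3+1); the shape becomes [4, 35, 8, 1]
  • tf.squeeze: removes dimensions; by default it drops every dimension of length 1, and axis can select a specific one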

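3.4 Broadcasting

  • When tensors are combined, their shapes are first right-aligned, missing leading dimensions are inserted with length 1, and every dimension of length 1 is expanded to the matching length
    [Figure: broadcasting illustration]
  • Scenario: leading dimensions usually represent higher-level concepts than trailing ones, as in [class, student, score]; broadcasting extends a value defined over the small trailing dimensions across the larger leading ones.
  • Benefits: concise code and lower memory use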

tf.broadcast_to(a, [2,3,4])
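As a small illustration of the right-align, insert, expand rule, the sketch below adds a per-channel bias of shape [3] to a batch of shape [4, 32, 32, 3]. The tensor sizes are assumptions chosen for the example.

```python
import tensorflow as tf

x = tf.random.normal([4, 32, 32, 3])
bias = tf.constant([0.1, 0.2, 0.3])  # shape [3]

# [3] is right-aligned to [1, 1, 1, 3], then expanded to [4, 32, 32, 3].
# The expansion is implicit, so no full-size copy of bias is stored.
y = x + bias

# tf.broadcast_to performs the same expansion explicitly.
bias_full = tf.broadcast_to(bias, [4, 32, 32, 3])
same = bool(tf.reduce_all(tf.equal(y, x + bias_full)))
print(y.shape, bias_full.shape, same)
```

Implicit broadcasting is usually preferable; tf.broadcast_to is mainly useful when the expanded tensor itself is needed.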

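3.5 Merging and splitting

  • tf.concat([a, b], axis=0): joins along an existing dimension; all other dimensions must have equal lengths. For example, [4,35,8] concat [2,35,8] => [6,35,8]
  • tf.stack([a, b], axis=0): creates a new dimension at position 0 with length 2 (because only a and b are stacked here); every dimension of the inputs must match
  • res=tf.unstack(c, axis=3): breaks c apart along dimension 3 into separate tensors, one per element of that dimension
  • tf.split(c, axis=3, num_or_size_splits=[2,3,2]): more flexible than unstack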
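The four operations above can be checked by their output shapes. This is a minimal sketch with made-up shapes; note that the sizes passed to num_or_size_splits must add up to the length of the split axis, so [2, 4, 2] is used here for an axis of length 8.

```python
import tensorflow as tf

a = tf.ones([4, 35, 8])
b = tf.ones([2, 35, 8])

# concat: lengths add up along axis 0, other dimensions must agree.
c = tf.concat([a, b], axis=0)
print(c.shape)  # (6, 35, 8)

# stack: inputs must have identical shapes; a new axis of length 2 appears.
d = tf.stack([a, a], axis=0)
print(d.shape)  # (2, 4, 35, 8)

# unstack: one tensor per element of axis 3, with that axis removed.
parts = tf.unstack(d, axis=3)
print(len(parts), parts[0].shape)  # 8 (2, 4, 35)

# split: the axis is kept, and the chunk sizes are chosen freely.
splits = tf.split(d, axis=3, num_or_size_splits=[2, 4, 2])
print([s.shape[3] for s in splits])  # [2, 4, 2]
```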

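3.6 Statistics

tf.norm(a): the norm of a; the 2-norm by default
tf.norm(a, ord=1, axis=1): the 1-norm computed along axis 1
tf.reduce_min: the reduce prefix is a reminder that these operations reduce dimensions
tf.reduce_max
tf.reduce_mean
tf.argmax(a): returns the index of the maximum along axis=0 by default
tf.argmin(a)
tf.equal(a,b): element-wise comparison
tf.reduce_sum(tf.cast(tf.equal(a,b), dtype=tf.int32)): counts the number of equal elements
tf.unique(a): returns the unique values plus an index array (which can be used to rebuild a)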

3.7 Sorting

tf.sort(a, direction='DESCENDING'): sorts along the last dimension
tf.argsort(a): the indices in the original array of the elements in ascending order
tf.gather(a, tf.argsort(a))
res=tf.math.top_k(a,2), then res.indices and res.values: used for top-k accuracy
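One way to turn tf.math.top_k into a top-k accuracy is sketched below. The helper name top_k_accuracy and the toy logits are assumptions made for illustration, not part of the original notes.

```python
import tensorflow as tf

def top_k_accuracy(logits, labels, k):
    """Fraction of rows whose true label is among the k largest logits."""
    topk = tf.math.top_k(logits, k).indices                     # [b, k], int32
    labels = tf.cast(tf.reshape(labels, [-1, 1]), topk.dtype)   # [b, 1]
    hit = tf.reduce_any(tf.equal(topk, labels), axis=1)         # [b]
    return float(tf.reduce_mean(tf.cast(hit, tf.float32)))

logits = tf.constant([[0.1, 0.7, 0.2],
                      [0.5, 0.3, 0.2],
                      [0.2, 0.3, 0.5]])
labels = tf.constant([1, 1, 0])

top1 = top_k_accuracy(logits, labels, 1)  # only the first row is a hit
top2 = top_k_accuracy(logits, labels, 2)  # the second row becomes a hit too
print(top1, top2)
```

Comparing against a [b, 1] label column relies on broadcasting, so no explicit loop over k is needed.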

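3.8 Padding and tiling

  • tf.pad(a, [[1,1],[1,1], ...]): the amount of padding before and after along each dimension
a=tf.random.normal([4,28,28,3])
b=tf.pad(a, [[0, 0], [2, 2], [2, 2], [0, 0]]) # pad two pixels on every side of each image
  • tf.tile(a, [ ]): the second argument gives the number of copies along each dimension; 1 leaves the dimension unchanged and 2 duplicates it once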

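3.9 Clipping

  • tf.maximum(a, 2): every element becomes at least 2; a simple ReLU is tf.maximum(a, 0)

  • tf.minimum(a, 8)

  • tf.clip_by_value(a, 2, 8)

  • new_grads, total_norm = tf.clip_by_global_norm(grads, 15): rescales all gradients by the same factor, so their relative proportions and the overall gradient direction are unchanged; commonly used to tame exploding gradients (it only ever shrinks gradients, so it does not address vanishing ones)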
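The "same factor for every gradient" behaviour of tf.clip_by_global_norm can be seen on a tiny hand-checkable case. The gradient values and the threshold 6.5 below are made up so that the global norm is exactly 13.

```python
import tensorflow as tf

# Two fake gradients; global norm = sqrt(3^2 + 4^2 + 12^2) = 13.
grads = [tf.constant([3.0, 4.0]), tf.constant([12.0])]

# Clipping to 6.5 multiplies every gradient by 6.5 / 13 = 0.5.
clipped, total_norm = tf.clip_by_global_norm(grads, 6.5)

print(float(total_norm))   # 13.0, the norm before clipping
print(clipped[0].numpy())  # [1.5 2. ]
print(clipped[1].numpy())  # [6.]
```

By contrast, tf.clip_by_value clips each element independently and can therefore change the direction of the overall gradient.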

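3.10 Other advanced operations

  • indices=tf.where(a>0): returns the coordinates of all True elements; pair it with tf.gather_nd(a, indices)

  • tf.where(cond, A, B): picks each element from A or B according to cond

  • tf.scatter_nd(indices, updates, shape)

    Writes the elements of updates into an all-zero tensor of the given shape at the positions given by indices

  • points_x, points_y = tf.meshgrid(x, y)
    points=tf.stack([points_x, points_y], axis=2)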
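The operations in this subsection compose naturally. The sketch below, on a made-up 2x2 tensor, shows that gathering the positive entries and scattering them back reproduces a ReLU, and that tf.meshgrid plus tf.stack yields a grid of (x, y) points.

```python
import tensorflow as tf

a = tf.constant([[1.0, -2.0],
                 [-3.0, 4.0]])

# Coordinates of the positive entries, then the entries themselves.
indices = tf.where(a > 0)               # [[0, 0], [1, 1]], dtype int64
positives = tf.gather_nd(a, indices)    # [1., 4.]

# Three-argument tf.where: element-wise choice between two tensors.
relu = tf.where(a > 0, a, tf.zeros_like(a))

# scatter_nd writes the values back into an all-zero tensor.
restored = tf.scatter_nd(indices, positives, tf.shape(a, out_type=tf.int64))

# meshgrid: every combination of 3 x-values and 5 y-values.
x = tf.linspace(-1.0, 1.0, 3)
y = tf.linspace(-1.0, 1.0, 5)
points_x, points_y = tf.meshgrid(x, y)           # each [5, 3]
points = tf.stack([points_x, points_y], axis=2)  # [5, 3, 2]
print(restored.numpy(), points.shape)
```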

Hands-on: MNIST with the low-level API
import  tensorflow as tf
from    tensorflow import keras
from    tensorflow.keras import datasets, layers, optimizers
import  os

os.environ['TF_CPP_MIN_LOG_LEVEL']='2'

def preprocess(x, y):
    # [b, 28, 28], [b]
    x = tf.cast(x, dtype=tf.float32) / 255.
    x = tf.reshape(x, [-1, 28*28])
    y = tf.cast(y, dtype=tf.int32)
    y = tf.one_hot(y, depth=10)
    return x,y


(x, y), (x_test, y_test) = datasets.mnist.load_data()
print('x:', x.shape, 'y:', y.shape, 'x test:', x_test.shape, 'y test:', y_test.shape)
train_db = tf.data.Dataset.from_tensor_slices((x, y))
train_db = train_db.shuffle(60000).batch(128).map(preprocess).repeat(30)

test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_db = test_db.shuffle(10000).batch(128).map(preprocess)
x,y = next(iter(train_db))
print('train sample:', x.shape, y.shape)
# print(x[0], y[0])

def main():

    # learning rate
    lr = 1e-3
    # 784 => 512
    w1, b1 = tf.Variable(tf.random.truncated_normal([784, 512], stddev=0.1)), tf.Variable(tf.zeros([512])) # gradients are tracked only for tf.Variable objects
    # Without tf.Variable, tape.watch(w) must be called inside
    # `with tf.GradientTape() as tape:`, otherwise no gradient is computed.
    # 512 => 256
    w2, b2 = tf.Variable(tf.random.truncated_normal([512, 256], stddev=0.1)), tf.Variable(tf.zeros([256]))
    # 256 => 10
    w3, b3 = tf.Variable(tf.random.truncated_normal([256, 10], stddev=0.1)), tf.Variable(tf.zeros([10])) # the small stddev here avoids exploding gradients

    for step, (x,y) in enumerate(train_db):

        # [b, 28, 28] => [b, 784]
        x = tf.reshape(x, (-1, 784))

        with tf.GradientTape() as tape:

            # layer1.
            h1 = x @ w1 + b1
            h1 = tf.nn.relu(h1)
            # layer2
            h2 = h1 @ w2 + b2
            h2 = tf.nn.relu(h2)
            # output
            out = h2 @ w3 + b3
            # out = tf.nn.relu(out)

            # compute loss
            # [b, 10] - [b, 10]
            loss = tf.square(y-out)
            # [b, 10] => [b]
            loss = tf.reduce_mean(loss, axis=1)
            # [b] => scalar
            loss = tf.reduce_mean(loss)

        # compute gradient
        grads = tape.gradient(loss, [w1, b1, w2, b2, w3, b3])
        # for g in grads:
        #     print(tf.norm(g))
        # update w' = w - lr*grad
        for p, g in zip([w1, b1, w2, b2, w3, b3], grads):
            p.assign_sub(lr * g) # assign_sub updates in place, so p stays a tf.Variable

        if step % 100 == 0:
            print(step, 'loss:', float(loss))

        # evaluate
        if step % 500 == 0:
            total, total_correct = 0., 0

            for x, y in test_db:  # avoid reusing `step`, which would shadow the training step
                # layer1.
                h1 = x @ w1 + b1
                h1 = tf.nn.relu(h1)
                # layer2
                h2 = h1 @ w2 + b2
                h2 = tf.nn.relu(h2)
                # output
                out = h2 @ w3 + b3
                # [b, 10] => [b]
                pred = tf.argmax(out, axis=1)
                # convert one_hot y to number y
                y = tf.argmax(y, axis=1)
                # bool type
                correct = tf.equal(pred, y)
                # bool tensor => int tensor => numpy
                total_correct += tf.reduce_sum(tf.cast(correct, dtype=tf.int32)).numpy()
                total += x.shape[0]

            print(step, 'Evaluate Acc:', total_correct/total)

if __name__ == '__main__':
    main()

4. Neural networks and fully connected layers

4.1 Loading data

  • keras.datasets

(x, y), (x_test, y_test) = keras.datasets.mnist.load_data() # returns NumPy arrays
y_onehot = tf.one_hot(y, depth=10)

(x, y), (x_test, y_test) = keras.datasets.cifar10.load_data()

  • tf.data.Dataset.from_tensor_slices
db=tf.data.Dataset.from_tensor_slices((x, y)).batch(16).repeat(2) # repeat(2) iterates over the data twice, i.e. doubles it
itr = iter(db)
for i in range(10):
    print(next(itr)[0][15][16,16,0])  # one pixel of the last image in the batch
  • db=db.shuffle(10000)

4.2 Fully connected layers

  • net=tf.keras.layers.Dense(units)
    net.build(input_shape=(None, 784)) creates all of the layer's variables (w and b) from the input shape
    net(x) # x is the actual input

  • model = keras.Sequential([keras.layers.Dense(2, activation='relu'), keras.layers.Dense(4, activation='relu')])

  • model.summary() prints the network structure

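4.3 Output activations

  • tf.sigmoid: keeps each output in [0, 1]
  • prob=tf.nn.softmax(logits): makes the outputs sum to 1; logits usually means the output of the last layer before any activation
  • tf.tanh: keeps each output in [-1, 1]

4.4 Computing the loss

  • MSE
    tf.reduce_mean(tf.losses.MSE(y, out))
  • Cross entropy, -log(q_i)
    tf.losses.categorical_crossentropy(y, logits, from_logits=True): in most cases pass from_logits=True so that softmax does not have to be applied by hand
    tf.losses.binary_crossentropy(y_true, y_pred)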
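To make the from_logits point concrete, the sketch below compares three ways of computing the same cross entropy on made-up logits: from logits directly, from hand-applied softmax probabilities, and from the -log(q_i) definition.

```python
import tensorflow as tf

y = tf.one_hot([0, 1], depth=3)                 # true classes 0 and 1
logits = tf.constant([[2.0, 1.0, 0.1],
                      [0.5, 2.5, 0.3]])

# Softmax is folded into the loss.
ce_from_logits = tf.keras.losses.categorical_crossentropy(
    y, logits, from_logits=True)

# Softmax applied by hand, then the loss on probabilities.
prob = tf.nn.softmax(logits)
ce_from_probs = tf.keras.losses.categorical_crossentropy(y, prob)

# The definition: minus the log of the probability of the true class.
ce_manual = -tf.math.log(tf.reduce_sum(y * prob, axis=1))

print(ce_from_logits.numpy(), ce_from_probs.numpy(), ce_manual.numpy())
```

The three agree on well-behaved inputs like these; the from_logits=True form is generally preferred because it avoids the precision loss of computing softmax and log separately when logits are large.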

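5. Gradient descent and loss functions

Derivative => partial derivative (the derivative along one coordinate direction) => gradient (the vector of partial derivatives along every coordinate direction)

5.1 Automatic differentiation

with tf.GradientTape() as tape:
    loss= ...
[w_grad] = tape.gradient(loss, [w])  # w is the parameter to differentiate with respect to

with tf.GradientTape(persistent=True) as tape: allows tape.gradient to be called more than once

Second-order derivatives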
with tf.GradientTape() as t1:
    with tf.GradientTape() as t2:
        y = x * w + b
    dy_dw, dy_db = t2.gradient(y, [w, b])

d2y_dw2 = t1.gradient(dy_dw, w)
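For the linear y = x * w + b the second derivative with respect to w is identically zero, so the nested-tape pattern is easier to verify on a nonlinear function. The following sketch assumes y = w^3 purely for illustration.

```python
import tensorflow as tf

w = tf.Variable(2.0)

with tf.GradientTape() as t1:
    with tf.GradientTape() as t2:
        y = w ** 3
    # The first derivative is taken inside t1 so that t1 records it.
    dy_dw = t2.gradient(y, w)      # 3 * w^2

d2y_dw2 = t1.gradient(dy_dw, w)    # 6 * w

print(float(dy_dw), float(d2y_dw2))  # 12.0 12.0
```

Both values are 12 at w = 2 because 3 * 2^2 and 6 * 2 happen to coincide there.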

5.2 反向傳播

Single-output perceptron
[Figure: backpropagated gradient of the single-output perceptron; E is the loss]
x=tf.random.normal([1,3])
w=tf.ones([3,1])
b=tf.ones([1])
y = tf.constant([1])

with tf.GradientTape() as tape:

    tape.watch([w, b])
    logits = tf.sigmoid(x@w+b) 
    loss = tf.reduce_mean(tf.losses.MSE(y, logits))

grads = tape.gradient(loss, [w, b])
print('w grad:', grads[0])
print('b grad:', grads[1])
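The gradient that the tape returns for a single-output perceptron can be checked against the hand-derived chain-rule result. The sketch below assumes the loss E = (O - t)^2 with O = sigmoid(x@w + b), for which dE/dw = 2 (O - t) O (1 - O) x^T; the variable names are illustrative.

```python
import tensorflow as tf

tf.random.set_seed(0)
x = tf.random.normal([1, 3])
w = tf.Variable(tf.ones([3, 1]))
b = tf.Variable(tf.ones([1]))
t = tf.constant([[1.0]])                        # target

with tf.GradientTape() as tape:
    out = tf.sigmoid(x @ w + b)                 # O, shape [1, 1]
    loss = tf.reduce_sum(tf.square(out - t))    # E

auto_dw, auto_db = tape.gradient(loss, [w, b])

# Chain rule by hand: dE/dO = 2 (O - t), dO/dz = O (1 - O), dz/dw = x.
delta = 2.0 * (out - t) * out * (1.0 - out)     # [1, 1]
manual_dw = tf.transpose(x) @ delta             # [3, 1]
manual_db = tf.reshape(delta, [1])              # [1]

print(auto_dw.numpy().ravel(), manual_dw.numpy().ravel())
```

Because w and b are tf.Variable objects here, no tape.watch call is needed.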
Multi-output perceptron
[Figure: gradient propagation in the multi-output perceptron]
x=tf.random.normal([1,3])
w=tf.ones([3,2])
b=tf.ones([2])
y = tf.constant([0, 1])

with tf.GradientTape() as tape:

    tape.watch([w, b])
    logits = tf.sigmoid(x@w+b) 
    loss = tf.reduce_mean(tf.losses.MSE(y, logits))

grads = tape.gradient(loss, [w, b])
print('w grad:', grads[0])
print('b grad:', grads[1])

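5.3 Chain rule

[Figure: the chain rule]
Multi-layer perceptron
[Figure: gradient derivation for the multi-layer perceptron]

Hands-on: optimizing a loss function

Suppose the loss is the Himmelblau function: f(x,y)=(x^{2}+y-11)^{2}+(x+y^{2}-7)^{2}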

import  numpy as np
from    matplotlib import pyplot as plt
from    mpl_toolkits.mplot3d import Axes3D

def loss(x):
    return (x[0] ** 2 + x[1] - 11) ** 2 + (x[0] + x[1] ** 2 - 7) ** 2

x = np.arange(-6, 6, 0.1)
y = np.arange(-6, 6, 0.1)
X, Y = np.meshgrid(x, y)
Z = loss([X, Y])

fig = plt.figure('loss')
ax = fig.add_subplot(projection='3d')  # fig.gca(projection=...) is rejected by recent matplotlib releases
ax.plot_surface(X, Y, Z)
ax.view_init(45, -60)
ax.set_xlabel('x')
ax.set_ylabel('y')
plt.show()
[Figure: surface plot of the loss]
import  tensorflow as tf
x = tf.constant([0., 0.])

for step in range(200):

    with tf.GradientTape() as tape:
        tape.watch([x])
        y = loss(x)

    grads = tape.gradient(y, [x])[0]  # derivative of y with respect to x
    x -= 0.01*grads

    if step % 20 == 0:
        print ('step {}: x = {}, f(x) = {}'
               .format(step, x.numpy(), y.numpy()))

Hands-on: Fashion MNIST with Dense layers

import tensorflow as tf
from    tensorflow import keras
from    tensorflow.keras import datasets, layers, optimizers, Sequential, metrics

import  os

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

def preprocess(x, y):

    x = tf.cast(x, dtype=tf.float32) / 255.
    y = tf.cast(y, dtype=tf.int32)
    return x,y


(x, y), (x_test, y_test) = datasets.fashion_mnist.load_data()

batchsz = 128

db = tf.data.Dataset.from_tensor_slices((x,y))
db = db.map(preprocess).shuffle(10000).batch(batchsz)

db_test = tf.data.Dataset.from_tensor_slices((x_test,y_test))
db_test = db_test.map(preprocess).batch(batchsz)

model = Sequential([
    layers.Dense(256, activation=tf.nn.relu), # [b, 784] => [b, 256]
    layers.Dense(128, activation=tf.nn.relu), # [b, 256] => [b, 128]
    layers.Dense(64, activation=tf.nn.relu), # [b, 128] => [b, 64]
    layers.Dense(32, activation=tf.nn.relu), # [b, 64] => [b, 32]
    layers.Dense(10) # [b, 32] => [b, 10], 330 = 32*10 + 10
])
model.build(input_shape=[None, 28*28])
model.summary()
# w = w - lr*grad
optimizer = optimizers.Adam(learning_rate=1e-3)  # `lr` is a deprecated alias

def main():
    for epoch in range(30):
        for step, (x,y) in enumerate(db):

            # x: [b, 28, 28] => [b, 784]
            # y: [b]
            x = tf.reshape(x, [-1, 28*28])

            with tf.GradientTape() as tape:
                # [b, 784] => [b, 10]
                logits = model(x)
                y_onehot = tf.one_hot(y, depth=10)
                # [b]
                loss_mse = tf.reduce_mean(tf.losses.MSE(y_onehot, logits))
                loss_ce = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss_ce = tf.reduce_mean(loss_ce)

            grads = tape.gradient(loss_ce, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))


            if step % 100 == 0:
                print(epoch, step, 'loss:', float(loss_ce), float(loss_mse))


        # test
        total_correct = 0
        total_num = 0
        for x,y in db_test:

            # x: [b, 28, 28] => [b, 784]
            # y: [b]
            x = tf.reshape(x, [-1, 28*28])
            # [b, 10]
            logits = model(x)
            # logits => prob, [b, 10]
            prob = tf.nn.softmax(logits, axis=1)
            # [b, 10] => [b], int64
            pred = tf.argmax(prob, axis=1)
            pred = tf.cast(pred, dtype=tf.int32)
            # pred:[b]
            # y: [b]
            # correct: [b], True: equal, False: not equal
            correct = tf.equal(pred, y)
            correct = tf.reduce_sum(tf.cast(correct, dtype=tf.int32))

            total_correct += int(correct)
            total_num += x.shape[0]

        acc = total_correct / total_num
        print(epoch, 'test acc:', acc)

if __name__ == '__main__':
    main()

6. TensorBoard visualization

tensorboard --logdir logs

import datetime
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = 'logs/' + current_time
summary_writer = tf.summary.create_file_writer(log_dir) 

    if step % 100 == 0:

        print(step, 'loss:', float(loss))
        with summary_writer.as_default(): 
            tf.summary.scalar('train-loss', float(loss), step=step) 

...
        with summary_writer.as_default():
            tf.summary.scalar('test-acc', float(total_correct/total), step=step)
            tf.summary.image("val-onebyone-images:", val_images, max_outputs=25, step=step)
            
            val_images = tf.reshape(val_images, [-1, 28, 28])
            figure  = image_grid(val_images)
            tf.summary.image('val-images:', plot_to_image(figure), step=step)

[Figure: TensorBoard example]
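The fragments above come from a larger training loop. A self-contained sketch of just the logging part might look like the following; the temporary directory and the fake loss values are assumptions standing in for 'logs/' and a real training loss.

```python
import datetime
import os
import tempfile

import tensorflow as tf

# Assumption: a temporary directory replaces the 'logs/' folder used above.
log_root = tempfile.mkdtemp()
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = os.path.join(log_root, current_time)
summary_writer = tf.summary.create_file_writer(log_dir)

with summary_writer.as_default():
    for step in range(5):
        fake_loss = 1.0 / (step + 1)   # stand-in for a real training loss
        tf.summary.scalar('train-loss', fake_loss, step=step)

summary_writer.flush()
event_files = os.listdir(log_dir)
print(log_dir, event_files)
```

Pointing `tensorboard --logdir` at the parent directory then shows one run per timestamped subfolder, which is why the timestamp is part of the path.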

7. Keras high-level API

datasets
layers
losses
metrics
optimizers

7.1 Metrics

acc_metric =metrics.Accuracy()
acc_metric.update_state(y, pred)
acc_metric.result().numpy()   # result() returns a tensor
acc_metric.reset_states()
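Metrics accumulate state across update_state calls, which is what makes them convenient inside a batch loop. A minimal sketch with made-up labels and predictions:

```python
from tensorflow.keras import metrics

acc_metric = metrics.Accuracy()

# First "batch": 3 of 4 predictions match the labels.
acc_metric.update_state([0, 1, 2, 3], [0, 1, 1, 3])
# Second "batch": 1 of 2 matches; the metric now covers 4 of 6 overall.
acc_metric.update_state([1, 1], [1, 0])
acc = float(acc_metric.result())

# metrics.Mean keeps a running average, e.g. of per-step loss values.
loss_metric = metrics.Mean()
loss_metric.update_state(2.0)
loss_metric.update_state(4.0)
mean_loss = float(loss_metric.result())

print(acc, mean_loss)
```

Resetting the metric at the start of each epoch keeps the reported numbers from mixing epochs together.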

7.2 Standard workflow

compile fit evaluate predict

network.compile(optimizer=..., loss=..., metrics=[...])
network.fit(data, epochs=..., validation_data=...)

network.evaluate(x, y)
network.predict(x)

7.3 Custom networks

  • keras.Sequential: a subclass of keras.Model
    net=Sequential([Layer])
    net.build(input_shape=(...)) or net(x): either one creates the weights
    model.trainable_variables
    model.call(): implement this method in a custom model to define the forward pass. This is what makes model(x) work: model(x) invokes model.__call__(x), which in turn calls model.call()

  • keras.layers.Layer, keras.Model
    Subclass either one and implement __init__ and call

Custom layer

Custom model
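A small custom layer and custom model following the __init__ plus call pattern could be sketched as below. The class names, the layer sizes, and the use of add_weight with default initializers are assumptions made for this example.

```python
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class MyDense(layers.Layer):
    """A bare-bones fully connected layer: inputs @ w + b."""

    def __init__(self, in_dim, out_dim):
        super().__init__()
        # add_weight registers the variables so Keras can track them.
        self.kernel = self.add_weight(name='w', shape=[in_dim, out_dim])
        self.bias = self.add_weight(name='b', shape=[out_dim],
                                    initializer='zeros')

    def call(self, inputs, training=None):
        return inputs @ self.kernel + self.bias

class MyModel(keras.Model):
    def __init__(self):
        super().__init__()
        self.fc1 = MyDense(8, 4)
        self.fc2 = MyDense(4, 2)

    def call(self, inputs, training=None):
        x = tf.nn.relu(self.fc1(inputs))
        return self.fc2(x)

model = MyModel()
out = model(tf.random.normal([3, 8]))   # __call__ dispatches to call()
print(out.shape, len(model.trainable_variables))
```

Because the sub-layers are stored as attributes, model.trainable_variables collects both kernels and both biases automatically.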

7.4 Saving and loading models

  • save/load weights: lightweight
    model.save_weights('path') (.ckpt)
    model=create_model() => model.load_weights('path')
  • save/load the entire model: the brute-force option that stores everything
    model.save('xx.h5')
    model=tf.keras.models.load_model('xx.h5')
  • saved_model: the general-purpose format for production
    tf.saved_model.save(model, 'path')
    imported=tf.saved_model.load('path')
Hands-on: CIFAR10 with a custom network
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
import os

os.environ['TF_CPP_MIN_LOG_LEVEL']='2'

def preprocess(x, y):
    x=tf.cast(x, dtype=tf.float32)/255.
    y=tf.squeeze(y)  # the raw labels have shape (50000, 1), so each sample is (1,); the extra dimension is redundant
    y=tf.one_hot(y, depth=10)
    y=tf.cast(y, dtype=tf.int32)
    return x, y

batch_size=128
(x, y), (x_val, y_val)=datasets.cifar10.load_data()

train_db=tf.data.Dataset.from_tensor_slices((x,y))
train_db=train_db.map(preprocess).shuffle(10000).batch(batch_size)

val_db=tf.data.Dataset.from_tensor_slices((x_val,y_val))
val_db=val_db.map(preprocess).shuffle(10000).batch(batch_size)

class MyDense(layers.Layer):
    def __init__(self, in_dim, out_dim):
        super(MyDense, self).__init__()
        self.kernel=self.add_weight(name='w', shape=[in_dim, out_dim])  # add_variable is a deprecated alias of add_weight
        
    def call(self, inputs, training=None):
        x=inputs@self.kernel
        return x
 
class MyNet(keras.Model):
    def __init__(self):
        super(MyNet, self).__init__()
        self.fc1=MyDense(32*32*3, 256)
        self.fc2=MyDense(256,128)
        self.fc3=MyDense(128,64)
        self.fc4=MyDense(64,32)
        self.fc5=MyDense(32,10)
    
    def call(self, inputs, training=None):
        x=tf.reshape(inputs, [-1, 32*32*3])
        x=self.fc1(x)
        x=tf.nn.relu(x)
        x=self.fc2(x)
        x=tf.nn.relu(x)
        x=self.fc3(x)
        x=tf.nn.relu(x)
        x=self.fc4(x)
        x=tf.nn.relu(x)
        x=self.fc5(x)
        
        return x

model = MyNet()
model.compile(
    optimizer=optimizers.Adam(), 
    loss=tf.losses.CategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)
model.fit(train_db, epochs=10, validation_data=val_db)

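8. Convolutional neural networks

The concepts are not covered here; the focus is on the API

keras.layers.Conv2D(): the class-based (layer) implementation
padding='same' # with stride 1 the output h and w equal the input's

tf.nn.conv2d: the functional implementation

[Figure: gradients in a convolutional network]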

keras.layers.MaxPool2D
keras.layers.UpSampling2D
layers.ReLU
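The difference between the layer API and the functional API, and the effect of padding, is easiest to see from output shapes. The sketch below uses a random 32x32 RGB input and 4 filters, both chosen arbitrarily.

```python
import tensorflow as tf
from tensorflow.keras import layers

x = tf.random.normal([1, 32, 32, 3])

# Layer API: the layer creates and owns its kernel and bias.
out_same = layers.Conv2D(4, kernel_size=3, strides=1, padding='same')(x)
out_valid = layers.Conv2D(4, kernel_size=3, strides=1, padding='valid')(x)
print(out_same.shape)   # (1, 32, 32, 4)
print(out_valid.shape)  # (1, 30, 30, 4): a 3x3 kernel trims one pixel per side

# Functional API: the caller supplies the kernel [kh, kw, c_in, c_out].
w = tf.random.normal([3, 3, 3, 4])
b = tf.zeros([4])
out_fn = tf.nn.conv2d(x, w, strides=1, padding='SAME') + b
print(out_fn.shape)     # (1, 32, 32, 4)

# Pooling with stride 2 halves h and w.
pooled = layers.MaxPool2D(pool_size=2, strides=2)(out_same)
print(pooled.shape)     # (1, 16, 16, 4)
```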

Hands-on: CIFAR100 with VGG13
[Figure: VGG network architecture]
import tensorflow as tf
from tensorflow.keras import layers, optimizers, datasets, Sequential
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'

tf.random.set_seed(2019)

conv_layer=[
    layers.Conv2D(64, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
    layers.Conv2D(64, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=(2,2), strides=2, padding='same'),
    
    layers.Conv2D(128, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
    layers.Conv2D(128, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=(2,2), strides=2, padding='same'),
    
    layers.Conv2D(256, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
    layers.Conv2D(256, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=(2,2), strides=2, padding='same'),
    
    layers.Conv2D(512, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
    layers.Conv2D(512, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=(2,2), strides=2, padding='same'),
    
    layers.Conv2D(512, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
    layers.Conv2D(512, kernel_size=(3,3), padding='same', activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=(2,2), strides=2, padding='same'),
    
    layers.Flatten(),
    
    layers.Dense(4096, activation=tf.nn.relu),
    layers.Dense(4096, activation=tf.nn.relu),
    layers.Dense(100, activation=None)
]

model = Sequential(layers=conv_layer)
model.build(input_shape=[None, 32,32,3])
# Sanity check: feed the model an arbitrary input first and confirm
# that the output shape is what you expect.
x=tf.random.normal([4,32,32,3])
out=model(x)
print(out.shape)

def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) / 255.
    y = tf.cast(y, dtype=tf.int32)
    y = tf.squeeze(y, axis=0)
    y = tf.cast(tf.one_hot(y, depth=100),dtype=tf.int32)
    return x, y

(x, y), (x_test, y_test) = datasets.cifar100.load_data()
print(y.shape)

train_db=tf.data.Dataset.from_tensor_slices((x,y)).shuffle(1000).map(preprocess).batch(128)
test_db=tf.data.Dataset.from_tensor_slices((x_test,y_test)).map(preprocess).batch(128)

optimizer = optimizers.Adam(learning_rate=1e-4)  # `lr` is a deprecated alias
variables = model.trainable_variables
for epoch in range(50):

    for step, (x,y) in enumerate(train_db):

        with tf.GradientTape() as tape:
            # [b, 32, 32, 3] => [b, 100]
            logits = model(x)
            # compute loss
            loss = tf.losses.categorical_crossentropy(y, logits, from_logits=True)
            loss = tf.reduce_mean(loss)

        grads = tape.gradient(loss, variables)
        optimizer.apply_gradients(zip(grads, variables))

        if step %100 == 0:
            print(epoch, step, 'loss:', float(loss))
    total_num = 0
    total_correct = 0
    for x,y in test_db:
        logits = model(x)
        prob = tf.nn.softmax(logits, axis=1)
        pred = tf.argmax(prob, axis=1)
        pred = tf.cast(pred, dtype=tf.int32)

        correct = tf.cast(tf.equal(pred, tf.cast(tf.argmax(y, axis=1), dtype=tf.int32)), dtype=tf.int32)
        correct = tf.reduce_sum(correct)

        total_num += x.shape[0]
        total_correct += int(correct)

    acc = total_correct / total_num
    print(epoch, 'acc:', acc)

[Figure: implementation of the basic residual block]
Hands-on: CIFAR100 with ResNet
import  tensorflow as tf
from    tensorflow import keras
from    tensorflow.keras import layers, Sequential

class BasicBlock(layers.Layer):

    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()

        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')

        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()

        if stride != 1:
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:
            self.downsample = lambda x:x

    def call(self, inputs, training=None):

        # [b, h, w, c]
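        out = self.conv1(inputs)
        out = self.bn1(out, training=training)  # pass the mode on so batch norm behaves correctly at test time
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out, training=training)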

        identity = self.downsample(inputs)

        output = layers.add([out, identity])
        output = tf.nn.relu(output)

        return output

class ResNet(keras.Model):

    def __init__(self, layer_dims, num_classes=100): # [2, 2, 2, 2]
        super(ResNet, self).__init__()

        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
                                ])

        self.layer1 = self.build_resblock(64,  layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)

        # output: [b, h, w, 512] (channels last)
        self.avgpool = layers.GlobalAveragePooling2D()
        self.fc = layers.Dense(num_classes)

    def call(self, inputs, training=None):

        x = self.stem(inputs)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # [b, c]
        x = self.avgpool(x)
        # [b, 100]
        x = self.fc(x)

        return x



    def build_resblock(self, filter_num, blocks, stride=1):

        res_blocks = Sequential()
        # may down sample
        res_blocks.add(BasicBlock(filter_num, stride))

        for _ in range(1, blocks):
            res_blocks.add(BasicBlock(filter_num, stride=1))

        return res_blocks


def resnet18():
    return ResNet([2, 2, 2, 2])


def resnet34():
    return ResNet([3, 4, 6, 3])


from    tensorflow.keras import  optimizers, datasets
import  os


os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
tf.random.set_seed(2345)



def preprocess(x, y):
    # scale pixels to [-0.5, 0.5]
    x = tf.cast(x, dtype=tf.float32) / 255. - 0.5
    y = tf.cast(y, dtype=tf.int32)
    return x,y


(x,y), (x_test, y_test) = datasets.cifar100.load_data()
y = tf.squeeze(y, axis=1)
y_test = tf.squeeze(y_test, axis=1)
print(x.shape, y.shape, x_test.shape, y_test.shape)


train_db = tf.data.Dataset.from_tensor_slices((x,y))
train_db = train_db.shuffle(1000).map(preprocess).batch(128)

test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db = test_db.map(preprocess).batch(128)

sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape,
      tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))



# [b, 32, 32, 3] => [b, 100]
model = resnet18()
model.build(input_shape=(None, 32, 32, 3))
model.summary()
optimizer = optimizers.Adam(learning_rate=1e-3)  # `lr` is a deprecated alias

for epoch in range(500):

    for step, (x,y) in enumerate(train_db):

        with tf.GradientTape() as tape:
            # [b, 32, 32, 3] => [b, 100]
            logits = model(x)
            # [b] => [b, 100]
            y_onehot = tf.one_hot(y, depth=100)
            # compute loss
            loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
            loss = tf.reduce_mean(loss)

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if step %50 == 0:
            print(epoch, step, 'loss:', float(loss))

    total_num = 0
    total_correct = 0
    for x,y in test_db:

        logits = model(x)
        prob = tf.nn.softmax(logits, axis=1)
        pred = tf.argmax(prob, axis=1)
        pred = tf.cast(pred, dtype=tf.int32)

        correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
        correct = tf.reduce_sum(correct)

        total_num += x.shape[0]
        total_correct += int(correct)

    acc = total_correct / total_num
    print(epoch, 'acc:', acc)

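9. Recurrent neural networks

  • Sequence embedding: word => vector
    Words with similar meanings should have vectors that are close together
    layers.Embedding(input_dim, output_dim). input_dim: size of the vocabulary (the number of distinct words that can be handled), i.e. maximum integer index + 1. output_dim: dimension of the dense embedding.
    The embedding is trainable too, so its semantic representation improves during training
  • layers.SimpleRNNCell(units): units is the output dimension; the state h must be managed by hand
    layers.SimpleRNN(): h is managed for you

    h_t = tanh(x_t@w1 + h_{t-1}@w2 + b). For example, with 4-dimensional inputs and units=3, w1 (applied to x) has shape (4,3) and w2 (applied to h) has shape (3,3)

    The intermediate state h is always passed as a list in TensorFlow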
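The recurrence and the weight shapes described above can be verified on a single step of a SimpleRNNCell. The sketch assumes 4-dimensional inputs, units=3, and a batch of 2, and recomputes the cell output by hand from the cell's own weights.

```python
import tensorflow as tf
from tensorflow.keras import layers

x = tf.random.normal([2, 4])     # one time step: [batch, input_dim]
h0 = [tf.zeros([2, 3])]          # the state is a list: [batch, units]

cell = layers.SimpleRNNCell(3)
out, h1 = cell(x, h0)            # h1 is again a list holding the new state

print(cell.kernel.shape)             # (4, 3): w1, applied to x
print(cell.recurrent_kernel.shape)   # (3, 3): w2, applied to h

# The same step written out by hand.
manual = tf.tanh(x @ cell.kernel + h0[0] @ cell.recurrent_kernel + cell.bias)
print(out.shape, manual.shape)
```

For SimpleRNNCell the output and the new state are the same tensor; cells such as LSTM carry more than one state tensor, which is why the state is always a list.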

Hands-on RNN: sentiment classification

Scenario: given a review, decide whether it is positive or negative
Low-level implementation
import  os
import  tensorflow as tf
import  numpy as np
from    tensorflow import keras
from    tensorflow.keras import layers

tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')

batchsz = 128

# keep only the most frequent words
total_words = 10000
max_review_len = 80
embedding_len = 100
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
# x_train:[b, 80]
# x_test: [b, 80]
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)

db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)


class MyRNN(keras.Model):

    def __init__(self, units):
        super(MyRNN, self).__init__()

        # [b, 64]
        self.state0 = [tf.zeros([batchsz, units])]
        self.state1 = [tf.zeros([batchsz, units])]

        # transform text to embedding representation
        # [b, 80] => [b, 80, 100]
        self.embedding = layers.Embedding(total_words, embedding_len,
                                          input_length=max_review_len)

        # [b, 80, 100] , h_dim: 64
        # RNN: cell1 ,cell2, cell3
        # SimpleRNN
        self.rnn_cell0 = layers.SimpleRNNCell(units, dropout=0.5)
        self.rnn_cell1 = layers.SimpleRNNCell(units, dropout=0.5)

        # fc, [b, 80, 100] => [b, 64] => [b, 1]
        self.outlayer = layers.Dense(1)

    def call(self, inputs, training=None):
        """
        net(x) net(x, training=True) :train mode
        net(x, training=False): test
        :param inputs: [b, 80]
        :param training:
        :return:
        """
        # [b, 80]
        x = inputs
        # embedding: [b, 80] => [b, 80, 100]
        x = self.embedding(x)
        # rnn cell compute
        # [b, 80, 100] => [b, 64]
        state0 = self.state0
        state1 = self.state1
        for word in tf.unstack(x, axis=1): # word: [b, 100]
            # h1 = x*wxh+h0*whh
            # out0: [b, 64]
            out0, state0 = self.rnn_cell0(word, state0, training)
            # out1: [b, 64]
            out1, state1 = self.rnn_cell1(out0, state1, training)

        # out: [b, 64] => [b, 1]
        x = self.outlayer(out1)
        # p(y is pos|x)
        prob = tf.sigmoid(x)

        return prob

def main():
    units = 64
    epochs = 4

    model = MyRNN(units)
    model.compile(optimizer = keras.optimizers.Adam(0.001),
                  loss = tf.losses.BinaryCrossentropy(),
                  metrics=['accuracy'])
    model.fit(db_train, epochs=epochs, validation_data=db_test)
    model.evaluate(db_test)

if __name__ == '__main__':
    main()
Layer-based implementation
import  os
import  tensorflow as tf
import  numpy as np
from    tensorflow import keras
from    tensorflow.keras import layers


tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')

batchsz = 128

# keep only the most frequent words
total_words = 10000
max_review_len = 80
embedding_len = 100
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words)
# x_train:[b, 80]
# x_test: [b, 80]
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len)

db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(1000).batch(batchsz, drop_remainder=True)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(batchsz, drop_remainder=True)
print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train))
print('x_test shape:', x_test.shape)

class MyRNN(keras.Model):

    def __init__(self, units):
        super(MyRNN, self).__init__()


        # transform text to embedding representation
        # [b, 80] => [b, 80, 100]
        self.embedding = layers.Embedding(total_words, embedding_len,
                                          input_length=max_review_len)

        # [b, 80, 100] , h_dim: 64
        self.rnn = keras.Sequential([
            layers.SimpleRNN(units, dropout=0.5, return_sequences=True, unroll=True),
            layers.SimpleRNN(units, dropout=0.5, unroll=True)
        ])


        # fc, [b, 80, 100] => [b, 64] => [b, 1]
        self.outlayer = layers.Dense(1)

    def call(self, inputs, training=None):
        """
        net(x) net(x, training=True) :train mode
        net(x, training=False): test
        :param inputs: [b, 80]
        :param training:
        :return:
        """
        # [b, 80]
        x = inputs
        # embedding: [b, 80] => [b, 80, 100]
        x = self.embedding(x)
        # rnn cell compute
        # x: [b, 80, 100] => [b, 64]
        # pass the training flag explicitly so dropout is only active during training
        x = self.rnn(x, training=training)

        # out: [b, 64] => [b, 1]
        x = self.outlayer(x)
        # p(y is pos|x)
        prob = tf.sigmoid(x)

        return prob

def main():
    units = 64
    epochs = 4

    model = MyRNN(units)
    model.compile(optimizer = keras.optimizers.Adam(0.001),
                  loss = tf.losses.BinaryCrossentropy(),
                  metrics=['accuracy'])
    model.fit(db_train, epochs=epochs, validation_data=db_test)

    model.evaluate(db_test)

if __name__ == '__main__':
    main()
Gradient clipping
grads = [tf.clip_by_norm(g, 15) for g in grads]  # 15 is an empirical threshold; gradient norms below roughly 10 are generally healthy
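To make the clipping rule concrete, here is a pure-Python sketch of the arithmetic behind `tf.clip_by_norm` (each tensor clipped on its own, as in the line above) next to the joint variant that `tf.clip_by_global_norm` provides. It illustrates the math only and is not TensorFlow's implementation; the gradient values are made up.

```python
import math

def clip_by_norm(g, clip_norm):
    """Rescale vector g so its L2 norm is at most clip_norm (direction is kept)."""
    norm = math.sqrt(sum(v * v for v in g))
    if norm <= clip_norm:
        return list(g)
    scale = clip_norm / norm
    return [v * scale for v in g]

def clip_by_global_norm(grads, clip_norm):
    """Rescale all vectors jointly, based on the norm of their concatenation."""
    global_norm = math.sqrt(sum(v * v for g in grads for v in g))
    scale = clip_norm / max(global_norm, clip_norm)
    return [[v * scale for v in g] for g in grads], global_norm

grads = [[30.0, 40.0], [0.3, 0.4]]  # L2 norms 50 and 0.5

per_tensor = [clip_by_norm(g, 15) for g in grads]
print(per_tensor)  # the first vector shrinks to norm 15, the second is untouched

jointly, gnorm = clip_by_global_norm(grads, 15)
print(gnorm, jointly)  # both vectors are scaled by the same factor
```

Per-tensor clipping changes the relative size of the gradients across layers, while global-norm clipping keeps their ratios intact; which one suits a model is an empirical choice.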

LSTM

The unroll parameter: a performance optimization

GRU

10. AutoEncoder

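Unsupervised learning
  • Dimensionality reduction, compression, preprocessing, visualization
  • Denoising AutoEncoder
    Learns to recover the true semantics from a corrupted input
  • Dropout AutoEncoder
    A dropout rate of about 0.2 works well
  • Adversarial AutoEncoder
    Makes z not only support reconstruction but also follow a preset distribution, such as a normal distribution, as closely as possible
  • Variational AutoEncoder
    The first term of the loss is the reconstruction error (MSE); the second term is the KL divergence, which pulls the distribution of z|x toward the distribution of z
    (Figure: KL divergence illustration)
    z follows a normal distribution with parameters mu and sigma. The encoder outputs mu and sigma, and z is obtained by sampling. Sampling itself is not differentiable, so z is computed as z = mu + sigma * epsilon, and backprop treats epsilon as a constant
  • Results cannot compete with GAN, but training is much more stable than with GAN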
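The reparameterization step and the KL term described in the Variational AutoEncoder notes can be checked with a few lines of plain Python. This is a one-dimensional sketch under the usual assumption that the encoder predicts `mu` and `log_var = log(sigma^2)`; the function names are illustrative. The KL expression is algebraically the same as the `-0.5 * (log_var + 1 - mu**2 - exp(log_var))` form used in VAE code.

```python
import math
import random

def reparameterize(mu, log_var, rng):
    """Sample z = mu + sigma * eps with eps ~ N(0, 1).

    All randomness lives in eps, so z is a differentiable
    function of mu and log_var.
    """
    eps = rng.gauss(0.0, 1.0)
    sigma = math.exp(0.5 * log_var)
    return mu + sigma * eps

def kl_to_standard_normal(mu, log_var):
    """KL( N(mu, sigma^2) || N(0, 1) ) for one latent dimension."""
    return 0.5 * (mu ** 2 + math.exp(log_var) - log_var - 1.0)

rng = random.Random(22)
samples = [reparameterize(2.0, math.log(0.25), rng) for _ in range(20000)]
mean = sum(samples) / len(samples)
var = sum((s - mean) ** 2 for s in samples) / len(samples)
print(mean, var)  # should land close to mu = 2.0 and sigma^2 = 0.25

print(kl_to_standard_normal(0.0, 0.0))  # 0.0: the distribution already equals N(0, 1)
print(kl_to_standard_normal(1.0, 0.0))  # 0.5: shifting the mean by 1 costs mu^2 / 2
```

The KL term is zero only when mu = 0 and sigma = 1, which is why it acts as a regularizer that keeps the latent codes near the standard normal prior.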
VAE in practice (Fashion-MNIST)
import  os
import  tensorflow as tf
import  numpy as np
from    tensorflow import keras
from    tensorflow.keras import Sequential, layers
from    PIL import Image
from    matplotlib import pyplot as plt

tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')


def save_images(imgs, name):
    new_im = Image.new('L', (280, 280))

    index = 0
    for i in range(0, 280, 28):
        for j in range(0, 280, 28):
            im = imgs[index]
            im = Image.fromarray(im, mode='L')
            new_im.paste(im, (i, j))
            index += 1

    new_im.save(name)


h_dim = 20
batchsz = 512
lr = 1e-3


(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()
x_train, x_test = x_train.astype(np.float32) / 255., x_test.astype(np.float32) / 255.
# we do not need label
train_db = tf.data.Dataset.from_tensor_slices(x_train)
train_db = train_db.shuffle(batchsz * 5).batch(batchsz)
test_db = tf.data.Dataset.from_tensor_slices(x_test)
test_db = test_db.batch(batchsz)

print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)

z_dim = 10

class VAE(keras.Model):

    def __init__(self):
        super(VAE, self).__init__()

        # Encoder
        self.fc1 = layers.Dense(128)
        self.fc2 = layers.Dense(z_dim) # predicts the mean
        self.fc3 = layers.Dense(z_dim) # predicts the log-variance

        # Decoder
        self.fc4 = layers.Dense(128)
        self.fc5 = layers.Dense(784)

    def encoder(self, x):

        h = tf.nn.relu(self.fc1(x))
        # get mean
        mu = self.fc2(h)
        # get log-variance
        log_var = self.fc3(h)

        return mu, log_var

    def decoder(self, z):

        out = tf.nn.relu(self.fc4(z))
        out = self.fc5(out)

        return out

    def reparameterize(self, mu, log_var):

        # tf.shape also works when the batch dimension is unknown
        eps = tf.random.normal(tf.shape(log_var))

        std = tf.exp(log_var*0.5)

        z = mu + std * eps
        return z

    def call(self, inputs, training=None):

        # [b, 784] => [b, z_dim], [b, z_dim]
        mu, log_var = self.encoder(inputs)
        # reparameterization trick
        z = self.reparameterize(mu, log_var)

        x_hat = self.decoder(z)

        return x_hat, mu, log_var


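model = VAE()
model.build(input_shape=(4, 784))
optimizer = tf.optimizers.Adam(lr)

# make sure the output directory used by save_images exists
os.makedirs('vae_images', exist_ok=True)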

for epoch in range(1000):

    for step, x in enumerate(train_db):

        x = tf.reshape(x, [-1, 784])

        with tf.GradientTape() as tape:
            x_rec_logits, mu, log_var = model(x)

            rec_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=x, logits=x_rec_logits)
            rec_loss = tf.reduce_sum(rec_loss) / x.shape[0]

            # compute kl divergence (mu, var) ~ N (0, 1)
            # https://stats.stackexchange.com/questions/7440/kl-divergence-between-two-univariate-gaussians
            kl_div = -0.5 * (log_var + 1 - mu**2 - tf.exp(log_var))
            kl_div = tf.reduce_sum(kl_div) / x.shape[0]

            loss = rec_loss + 1. * kl_div

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))


        if step % 100 == 0:
            print(epoch, step, 'kl div:', float(kl_div), 'rec loss:', float(rec_loss))


    # evaluation
    z = tf.random.normal((batchsz, z_dim))
    logits = model.decoder(z)
    x_hat = tf.sigmoid(logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() *255.
    x_hat = x_hat.astype(np.uint8)
    save_images(x_hat, 'vae_images/sampled_epoch%d.png'%epoch)

    x = next(iter(test_db))
    x = tf.reshape(x, [-1, 784])
    x_hat_logits, _, _ = model(x)
    x_hat = tf.sigmoid(x_hat_logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() *255.
    x_hat = x_hat.astype(np.uint8)
    save_images(x_hat, 'vae_images/rec_epoch%d.png'%epoch)
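The training loop above feeds the raw decoder outputs (logits) to `tf.nn.sigmoid_cross_entropy_with_logits` and applies `tf.sigmoid` only when saving images. The sketch below shows, in plain Python, why the loss is computed from logits: the stable form `max(x, 0) - x * z + log(1 + exp(-|x|))` documented for that function stays finite where the probability-based form breaks. The helper names are illustrative.

```python
import math

def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))

def bce_naive(label, logit):
    """Cross-entropy computed from the probability; breaks for large |logit|."""
    p = sigmoid(logit)
    return -(label * math.log(p) + (1.0 - label) * math.log(1.0 - p))

def bce_with_logits(label, logit):
    """Stable form: max(x, 0) - x * z + log(1 + exp(-|x|))."""
    return max(logit, 0.0) - logit * label + math.log1p(math.exp(-abs(logit)))

# The two forms agree for moderate logits ...
print(bce_naive(0.8, 1.5), bce_with_logits(0.8, 1.5))

# ... but only the logit form survives a saturated sigmoid.
print(bce_with_logits(0.0, 40.0))  # about 40
try:
    bce_naive(0.0, 40.0)  # sigmoid(40) rounds to 1.0, so log(1 - p) is log(0)
except ValueError as err:
    print('naive form failed:', err)
```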

11. GAN

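Essence

  • Make one distribution approximate another, whether the gap is measured with KL divergence or JS divergence
  • Training often fails to take off because, when the two distributions are too far apart, the gradient of the divergence stays at 0 and provides no learning signal
  • WGAN-GP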
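The vanishing-gradient point above can be seen numerically. The sketch below compares the JS divergence with the 1-D Wasserstein (earth mover's) distance for two discrete distributions that do not overlap, using a toy unit-spaced grid chosen purely for illustration. JS stays at log 2 however far apart the two are, so it gives no signal about which direction to move, whereas the Wasserstein distance grows with the gap.

```python
import math

def kl(p, q):
    """KL(p || q) for discrete distributions; terms with p_i == 0 contribute 0."""
    return sum(pi * math.log(pi / qi) for pi, qi in zip(p, q) if pi > 0)

def js(p, q):
    """Jensen-Shannon divergence: average KL to the mixture m = (p + q) / 2."""
    m = [(pi + qi) / 2 for pi, qi in zip(p, q)]
    return 0.5 * kl(p, m) + 0.5 * kl(q, m)

def wasserstein_1d(p, q):
    """Earth mover's distance on a unit-spaced 1-D grid: sum of |CDF_p - CDF_q|."""
    dist, cdf_p, cdf_q = 0.0, 0.0, 0.0
    for pi, qi in zip(p, q):
        cdf_p += pi
        cdf_q += qi
        dist += abs(cdf_p - cdf_q)
    return dist

def block(start, size=10):
    """Uniform mass on two neighbouring grid cells starting at `start`."""
    return [0.5 if start <= i < start + 2 else 0.0 for i in range(size)]

real = block(0)
for shift in (2, 4, 6):
    fake = block(shift)  # never overlaps `real` for these shifts
    print(shift, js(real, fake), wasserstein_1d(real, fake))
```

Every row prints the same JS value (log 2, about 0.693) while the Wasserstein distance equals the shift, which is the motivation for replacing the divergence with a Wasserstein-based objective in WGAN.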
WGAN in practice
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class Generator(keras.Model):
    def __init__(self):
        super(Generator, self).__init__()
        # z: [b, 100] => [b, 3*3*512] => [b, 3, 3, 512] => [b, 64, 64, 3]
        self.fc = layers.Dense(3*3*512)
        # spatial size: 3 => 9
        self.conv1 = layers.Conv2DTranspose(256, 3, 3)
        self.bn1 = layers.BatchNormalization()

        # 9 => 21
        self.conv2 = layers.Conv2DTranspose(128, 5, 2)
        self.bn2 = layers.BatchNormalization()

        # 21 => 64
        self.conv3 = layers.Conv2DTranspose(3, 4, 3)
        
        
        
    def call(self, inputs, training=None):
        x = self.fc(inputs)
        x = tf.reshape(x, [-1, 3, 3, 512])
        x = tf.nn.leaky_relu(x)
        x = tf.nn.leaky_relu(self.bn1(self.conv1(x), training=training))
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x), training=training))
        x = self.conv3(x)
        x = tf.tanh(x) # [-1, 1]; the Discriminator expects inputs in [-1, 1]
        return x

class Discriminator(keras.Model):
    def __init__(self):
        super(Discriminator, self).__init__()
        # [b, 64, 64, 3] => [b,1]
        self.conv1 = layers.Conv2D(filters=64, kernel_size=5, strides=3, padding='valid')
        self.conv2 = layers.Conv2D(128, 5, 3)
        self.bn2 = layers.BatchNormalization()
        self.conv3 = layers.Conv2D(256, 5, 3)
        self.bn3 = layers.BatchNormalization()
        # [b, h, w, c] => [b, -1]
        self.flatten = layers.Flatten()
        self.fc = layers.Dense(1)
        
    def call(self, inputs, training=None):
        x = tf.nn.leaky_relu(self.conv1(inputs))
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x), training=training))
        x = tf.nn.leaky_relu(self.bn3(self.conv3(x), training=training))
        x= self.flatten(x)
        logits = self.fc(x)
        return logits
    
def celoss_ones(logits):
    # logits: [b, 1]
    # labels: all ones, same shape as logits
    loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=logits,
                                                   labels=tf.ones_like(logits))
    return tf.reduce_mean(loss)


def celoss_zeros(logits):
    # logits: [b, 1]
    # labels: all zeros, same shape as logits
    loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=logits,
                                                   labels=tf.zeros_like(logits))
    return tf.reduce_mean(loss)

def gradient_penalty(discriminator, batch_x, fake_image):
    batchsz = batch_x.shape[0]
    # one interpolation weight per image: [b, 1, 1, 1] => [b, h, w, c]
    t = tf.random.uniform([batchsz, 1, 1, 1])
    t = tf.broadcast_to(t, batch_x.shape)
    # random points on the lines between real and fake images
    interpolate = t * batch_x + (1 - t) * fake_image
    with tf.GradientTape() as tape:
        tape.watch([interpolate])
        d_interpolate_logits = discriminator(interpolate)
    grads = tape.gradient(d_interpolate_logits, interpolate)

    # grads:[b, h, w, c] => [b, -1]
    grads = tf.reshape(grads, [grads.shape[0], -1])
    gp = tf.norm(grads, axis=1) #[b]
    gp = tf.reduce_mean( (gp-1)**2 )

    return gp

def d_loss_fn(generator, discriminator, batch_z, batch_x, is_training):
    # 1. treat real image as real
    # 2. treat generated image as fake
    fake_image = generator(batch_z, is_training)
    d_fake_logits = discriminator(fake_image, is_training)
    d_real_logits = discriminator(batch_x, is_training)

    d_loss_real = celoss_ones(d_real_logits)
    d_loss_fake = celoss_zeros(d_fake_logits)
    gp = gradient_penalty(discriminator, batch_x, fake_image)
    loss = d_loss_fake + d_loss_real + 5. * gp

    return loss, gp


def g_loss_fn(generator, discriminator, batch_z, is_training):

    fake_image = generator(batch_z, is_training)
    d_fake_logits = discriminator(fake_image, is_training)
    loss = celoss_ones(d_fake_logits)

    return loss

z_dim = 100
epochs = 3000000
batch_size = 512
learning_rate = 0.002
is_training = True

generator = Generator()
generator.build(input_shape = (None, z_dim))
discriminator = Discriminator()
discriminator.build(input_shape=(None, 64, 64, 3))
g_optimizer = tf.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)
d_optimizer = tf.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)

for epoch in range(epochs):

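    batch_z = tf.random.uniform([batch_size, z_dim], minval=-1., maxval=1.)
    # db_iter is not defined in this excerpt; it is expected to be an iterator
    # that yields batches of real images shaped [b, 64, 64, 3] and scaled to [-1, 1]
    batch_x = next(db_iter)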

    # train D
    with tf.GradientTape() as tape:
        d_loss, gp = d_loss_fn(generator, discriminator, batch_z, batch_x, is_training)
    grads = tape.gradient(d_loss, discriminator.trainable_variables)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_variables))


    # train G
    with tf.GradientTape() as tape:
        g_loss = g_loss_fn(generator, discriminator, batch_z, is_training)
    grads = tape.gradient(g_loss, generator.trainable_variables)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_variables))

    if epoch % 100 == 0:
        print(epoch, 'd-loss:',float(d_loss), 'g-loss:', float(g_loss),
                  'gp:', float(gp))

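        # sample z from the same [-1, 1] range used during training
        z = tf.random.uniform([100, z_dim], minval=-1., maxval=1.)
        fake_image = generator(z, training=False)
        img_path = os.path.join('images', 'wgan-%d.png'%epoch)
        # save_result is a helper that is not shown in this excerpt
        save_result(fake_image.numpy(), 10, img_path, color_mode='P')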
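Note that `d_loss_fn` and `g_loss_fn` in the listing above combine sigmoid cross-entropy with the gradient penalty. The WGAN-GP formulation instead works on raw critic scores: the critic minimizes mean(D(fake)) - mean(D(real)) plus the weighted penalty, and the generator minimizes -mean(D(fake)). The plain-Python sketch below shows only that arithmetic. It is an alternative to the listing's losses, not the listing author's method; the function names and scores are hypothetical, and the penalty weight of 5 is taken from the listing.

```python
def mean(values):
    return sum(values) / len(values)

def wgan_d_loss(d_real, d_fake, gp, gp_weight=5.0):
    """Critic loss: push scores of real samples up and of fake samples down."""
    return mean(d_fake) - mean(d_real) + gp_weight * gp

def wgan_g_loss(d_fake):
    """Generator loss: raise the critic's score on generated samples."""
    return -mean(d_fake)

# Hypothetical raw critic outputs (no sigmoid) for a batch of four.
d_real = [1.2, 0.8, 1.5, 0.9]
d_fake = [-0.7, -1.1, -0.4, -0.6]

print(wgan_d_loss(d_real, d_fake, gp=0.02))
print(wgan_g_loss(d_fake))
```

In the TensorFlow listing this would correspond to replacing the `celoss_ones` and `celoss_zeros` calls with `tf.reduce_mean` over the logits, keeping the gradient penalty term as it is.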