A Day in the Life

Keras を使わずに TensorFlow 2 を使い素朴な全層結合ニューラルネットワークを作る

TensorFlow では、高レベルAPIであるKerasを使うことで、簡単にニューラルネットワークのモデル作成~訓練、その他NNで行いたい様々なことを実現できる。しかしながら、自分のようなNN初心者にとっては何をやってるか解らないで使ってしまっていたため、簡単な順伝播型のNNを、Keras を使わず TensorFlow の API のみを用いて実装する。

なおこの記事は、ゼロから作るDeep Learningを参考に実装している。また、自分で理解するための忘備録的に残しており、きちんと理解したい人書籍ゼロから作るDeep Learningと、TensorFlow ガイドを読んだほうが良いであろう。また、この記事の元の jupyter notebookはこちら


一通り手を動かして自分で作ってみることで、どの関数がどう影響するのか、訓練を手動でやるとどんな感じなのか、自動微分とその使い方、keras の嬉しさ、などなどのTFやNNの基本が理解が進み、一昔前はよく分からなかったガイドページの内容も、だいたい読めるようになった。

いままで見てきたすごい人は、論理を知るだけで苦なくその論理をプログラムに実装できたりする。それでなくとも、大抵のできる人はジャンルとしたことは知らない分野の書籍を読むだけで理解し実装できたりする。自分の場合はそれでも理解できないことも多く、実際に手を動かしてみて挙動を把握して初めて理解できることが多い、ということをあらためて思ったのであった。


今回実装する簡単なNNはこんな感じ。

  • (入力数, ユニット数) のウェイトと、(ユニット数,) の重みバイアスの状態、この2つの重みパラメータ(weights)を持ったレイヤを作る
    • 順方向伝搬時には、入力とウェイトの積に重みバイアスを足したものに、活性化(activation)関数を適用する
  • レイヤーを取りまとめるネットワークを作る
    • 推論時は、レイヤーを順番に適用(順方向伝搬)した結果を出力する
    • 推論がどれだけ正しいかの指標として、損失関数(loss関数)を適用する
    • 学習(= 訓練データから、最適な重みパラメータを取得し、学習率で反映する)時に、損失関数が小さくなるように勾配を求め、逆誤差伝搬法でレイヤーを逆順にパラメータ少しづつ更新する。逆誤差伝搬法は TF の autodiffを使う。
  • このネットワークに学習データを与え、トレーニングする

まず最初の、単純なレイヤの実装を行う。

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

# GPU 使わない設定
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

# seed の固定
np.random.seed(42)
tf.random.set_seed(42)
class SimpleLayer():
    def __init__(self, input_dim, units, activation):
        # ウェイトを正規分布で初期化。Variable で更新可能な値として作る。
        self.w = tf.Variable(tf.random.normal([input_dim, units]) * 0.01, name='weight')
        # バイアスを 0 で初期化
        self.b = tf.Variable(tf.zeros([units]), name="bais")
        # 活性化関数
        self.activation = activation
    
    @property
    def weights(self):
        return [self.w, self.b]

    def forward(self, x):
        y = x @ self.w + self.b # y = tf.matmul(x, self.w) + self.b # と同等
        return self.activation(y)

    def __call__(self, x):
        return self.forward(x)
identify_function = lambda x: x
zero_function = lambda x: x * 0

l1 = SimpleLayer(2, 2, identify_function)
l2 = SimpleLayer(2, 1, zero_function)

print(f'l1 weights: {l1.weights}')
a1 = l1([[10, 20]]) # SimpleLayer.__call__ を呼び出す
print(f'a1: {a1}')
a2 = l2(a1)
print(f'a2: {a2}')

print(l2(l1([[10, 20]])))

l1 weights: [<tf.Variable 'weight:0' shape=(2, 2) dtype=float32, numpy=
array([[ 0.00327469, -0.00842626],
       [ 0.00319434, -0.01407552]], dtype=float32)>, <tf.Variable 'bais:0' shape=(2,) dtype=float32, numpy=array([0., 0.], dtype=float32)>]
a1: [[ 0.09663358 -0.36577296]]
a2: [[0.]]
tf.Tensor([[0.]], shape=(1, 1), dtype=float32)

続いて、シンプルな活性化関数をいくつか実装する。

def step_function(x:tf.Tensor):
    return tf.cast(x > 0, tf.uint8)

step_function(tf.constant([1, 0, 3, -3]))
<tf.Tensor: shape=(4,), dtype=uint8, numpy=array([1, 0, 1, 0], dtype=uint8)>
def sigmoid(x:tf.Tensor):
    return 1 / (1 + tf.exp(-x))

sigmoid(tf.constant([0, 1.0, -2.0]))
<tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.5       , 0.7310586 , 0.11920292], dtype=float32)>
def relu(x: tf.Tensor):
    return tf.maximum(0.0, x)

relu(tf.constant([-2.0, -1.0, 1.0, 2.0]))
<tf.Tensor: shape=(4,), dtype=float32, numpy=array([0., 0., 1., 2.], dtype=float32)>
def tanh(x: tf.Tensor):
    return (tf.exp(x) - tf.exp(-x)) / (tf.exp(x) + tf.exp(-x))

tanh(tf.constant([-3.0,-1.0, 0.0, 1.0,3.0]))
<tf.Tensor: shape=(5,), dtype=float32, numpy=
array([-0.9950547, -0.7615942,  0.       ,  0.7615942,  0.9950547],
      dtype=float32)>

出力層の活性化関数として、恒等関数(なにもしない)と、分類問題で使うソフトマックス関数を実装する。

def identity(x: tf.Tensor):
    return x

identity(tf.constant([1.0, 0.0, -1.0, -3.0]))
<tf.Tensor: shape=(4,), dtype=float32, numpy=array([ 1.,  0., -1., -3.], dtype=float32)>
def softmax(x:tf.Tensor):
    e = tf.exp(x -tf.reduce_max(x))
    s = tf.reduce_sum(e)
    return e / s

print(softmax(tf.constant([0.3,2.9,4.0])))
print(softmax(tf.constant([1010.0, 1000, 990])))
tf.Tensor([0.01821127 0.24519183 0.73659694], shape=(3,), dtype=float32)
tf.Tensor([9.999546e-01 4.539787e-05 2.061060e-09], shape=(3,), dtype=float32)

続いて、損失関数の実装。分類モデルのための二乗和誤差と交差エントロピー誤差関数、回帰モデルのための二乗平均平方誤差関数作る。

def sum_squared_error(x:tf.Tensor, y: tf.Tensor):
    return tf.reduce_mean(0.5 * tf.reduce_sum((x-y) ** 2, axis=tf.rank(x)-1))

y1 = [0.0, 0, 1, 0, 0, 0, 0, 0, 0, 0]
y2 = [0.0, 0, 0, 0, 0, 0, 0, 1, 0, 0]
x1 = [0.1, 0.05, 0.6, 0.0, 0.05, 0.1, 0.0, 0.1, 0.0, 0.0]

print(sum_squared_error(tf.constant(x1), tf.constant(y1)))
print(sum_squared_error(tf.constant(x1), tf.constant(y2)))
print(sum_squared_error(tf.constant([x1, x1]), tf.constant([y1, y2])))
tf.Tensor(0.0975, shape=(), dtype=float32)
tf.Tensor(0.59749997, shape=(), dtype=float32)
tf.Tensor(0.34749997, shape=(), dtype=float32)
def cross_entropy_error(x:tf.Tensor, y: tf.Tensor):
    delta = tf.constant(1e-7)
    if tf.rank(x) == 1:
        x = tf.reshape(x, (1, tf.size(x)))
        y = tf.reshape(y, (1, tf.size(y)))
    batch_size = x.shape[0]
    return -tf.reduce_sum(y * tf.math.log(x + 1e-7)) / batch_size

print(cross_entropy_error(tf.constant(x1), tf.constant(y1)))
print(cross_entropy_error(tf.constant(x1), tf.constant(y2)))
print(cross_entropy_error(tf.constant([x1, x1]), tf.constant([y1, y2])))
tf.Tensor(0.5108254, shape=(), dtype=float32)
tf.Tensor(2.3025842, shape=(), dtype=float32)
tf.Tensor(1.4067048, shape=(), dtype=float32)
def root_mean_squared_error(x:tf.Tensor, y: tf.Tensor):
    diff = y - x
    return tf.sqrt(tf.reduce_mean((diff)**2))

y = [[100.0], [160], [60]]
x = [[80.0], [100], [100]]

print(root_mean_squared_error(tf.constant(x), tf.constant(y)))
tf.Tensor(43.204937, shape=(), dtype=float32)

続いて、自動微分を使った勾配計算について、TF の挙動確認をする。

以下の関数f1の微分した導関数でx=3を求めると40である。

f1 = lambda x: x**3 + 2*x**2 + x

x = tf.Variable(3.0)
with tf.GradientTape() as tape:
    z = f1(x)
tape.gradient(z, [x])
[<tf.Tensor: shape=(), dtype=float32, numpy=40.0>]

続いて偏微分。以下の関数f2の、w1 と w2 が 5, 3 の時の w1, w2 に対する偏微分を求めると 36, 10。なおこのコードは Chapter 12 – Custom Models and Training with TensorFlow より。

def f2(w1, w2):
    return 3 * w1**2 + 2*w1 * w2

w1, w2 = tf.Variable(5.0), tf.Variable(3.0)

with tf.GradientTape() as tape:
    z = f2(w1, w2)
print(tape.gradient(z, [w1, w2]))

try:
    print(tape.gradient(z, [w1, w2]))
except RuntimeError:
    print('二回目の呼び出し時には、リソースが削除されていてエラーになる')

with tf.GradientTape(persistent=True) as tape:
    z = f2(w1, w2)

print(tape.gradient(z, [w1]))
print(tape.gradient(z, [w2]))
del tape # 開放する
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>, <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
二回目の呼び出し時には、リソースが削除されていてエラーになる
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>]
[<tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

今ままで作った関数やレイヤーが意図した通りに動くか、試しに簡単な線形関数の予測ができるか確認する。2x + 10 に小さなランダムノイズを乗せたデータを作る。

x = np.arange(-50, 50, 2)
line_2x_1 = 2 * x  + 10
noise = -10 * np.random.rand(len(x)) + 5
dots_2x_1 = line_2x_1 + noise
plt.plot(x, line_2x_1)
plt.plot(x, dots_2x_1, 'o')
plt.show()

y = tf.expand_dims(tf.constant(dots_2x_1, dtype=tf.float32), axis=1)
X = tf.expand_dims(tf.constant(x, dtype=tf.float32), axis=1)

img

まずはNNを使わず、sklearn でうまく動くか確認する。

from sklearn.linear_model import LinearRegression
reg = LinearRegression().fit(X, y)
print(reg.score(X, y))
reg.predict([[-50], [0], [50], [100]])
0.9975753493086111





array([[-89.75652361],
       [ 10.54682827],
       [110.85018015],
       [211.15353203]])

続いて、2層レイヤーのNNの回帰モデルとして学習させる。sklearn と同じように回帰予想できてればOK。

layer1 = SimpleLayer(1, 32, relu)
layer2 = SimpleLayer(32, 1, identify_function)

loss_function = root_mean_squared_error
lr = 0.003

predict = lambda x: layer2(layer1(x))
for i in range(10000):
    # 勾配を求める
    with tf.GradientTape() as tape:
        y_pred = predict(X)
        z = loss_function(y_pred, y)
    (l1_w_grads, l1_b_grads), (l2_w_grads, l2_b_grads) = tape.gradient(z, [layer1.weights, layer2.weights])
    # SDG で勾配を学習する
    layer1.w.assign_sub(lr * l1_w_grads)
    layer1.b.assign_sub(lr * l1_b_grads)
    layer2.w.assign_sub(lr * l2_w_grads)
    layer2.b.assign_sub(lr * l2_b_grads)
    if (i % 1000 == 0):
        print('iter {} / train loss: {:.3}'.format(i, z.numpy()))

print('train loss: {:.3}'.format(loss_function(predict(X), y)))
print(predict(tf.constant([[-50], [0], [50], [100]], dtype=tf.float32)))
iter 0 / train loss: 58.6
iter 1000 / train loss: 5.08
iter 2000 / train loss: 3.71
iter 3000 / train loss: 3.79
iter 4000 / train loss: 3.5
iter 5000 / train loss: 3.3
iter 6000 / train loss: 3.18
iter 7000 / train loss: 3.09
iter 8000 / train loss: 3.03
iter 9000 / train loss: 2.98
train loss: 2.94
tf.Tensor(
[[-92.20102  ]
 [ 10.1016445]
 [109.65451  ]
 [209.20737  ]], shape=(4, 1), dtype=float32)

うまく動いているようだ。次に、先程の学習をよしなにあつかえるネットワークの実装する。

class SimpleSequenceNetwork:
    def __init__(self, layers, loss_function, lr=0.01):
        self.layers = layers
        self.loss_function = loss_function
        self.lr = lr
    
    def predict(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

    def loss(self, x, target):
        y = self.predict(x)
        return self.loss_function(y, target)

    def accuracy(self, x, target):
        y = self.predict(x)
        y = tf.argmax(y, axis=1)
        target = tf.argmax(target, axis=1)

        accuracy = tf.math.count_nonzero(y == target) / x.shape[0]
        return accuracy
    
    @property
    def all_weights(self):
        return tf.nest.flatten([layer.weights for layer in self.layers])

    # 損失関数の、重みパラメータに対する勾配を求める
    def gradient(self, x, target):
        with tf.GradientTape() as tape:
            tape.watch(x)
            z = self.loss(x, target)
        return tape.gradient(z, self.all_weights)

    # 単純な勾配下降法(SDG)で、レイヤーの重みパラメータを更新する
    def update_variables_by_sdg(self, grads):
        for (grad, val) in zip(grads, self.all_weights):
            val.assign_sub(self.lr * grad)

    # 勾配を求め、パラメータを更新する
    def training(self, x, target):
        grads = self.gradient(x, target)
        self.update_variables_by_sdg(grads)

訓練するためのデータセットをロードする。おなじみ mnist の0-9の数字データを使う。

import tensorflow_datasets as tfds

ds = tfds.load("mnist", as_supervised=True)
test_ds = ds['test']
train_ds = ds['train']

for (i, (image, label)) in enumerate(train_ds.take(12)):
    plt.subplot(3, 4, i+1)
    plt.imshow(image, cmap='gray')
    plt.subplots_adjust(wspace=0, hspace=1)
    plt.title(label.numpy())
    plt.axis('off')
plt.show()

def preprocess(image, label):
    # 画像は (28,28,1) を (784,) にして、0.~1. の範囲へ
    image = tf.cast(tf.reshape(image, (-1,)), tf.float32) / 255.0
    # ラベルはワンホットベクトルに
    label = tf.one_hot(label, 10, dtype=tf.float32)
    return image, label

train_ds = train_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE).cache()
test_ds = test_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE).cache()


print((len(train_ds), len(test_ds)))

img

(60000, 10000)

データセットをもとに、ネットワークを訓練する関数を作る。


def fit(network, train_ds: tf.data.Dataset, test_ds: tf.data.Dataset, epochs=20, batch_size=32):
    history_train_loss, history_train_accuracy, history_test_accuracy = [], [], []
    for epoch in range(1, epochs + 1):
        train_loss, train_accuracy, test_accuracy = [], [], []
        for (X_batch, y_batch) in train_ds.shuffle(1000).batch(batch_size).prefetch(1):
            network.training(X_batch, y_batch)
            train_loss.append(network.loss(X_batch, y_batch))
            train_accuracy.append(network.accuracy(X_batch, y_batch))
        for (X_batch, y_batch) in test_ds.shuffle(1000).batch(batch_size).prefetch(1):
            test_accuracy.append(network.accuracy(X_batch, y_batch))
        
        print("train acc, test acc, train loss | {:.4}, {:.4}, {:.4}".format(
            tf.reduce_mean(train_accuracy).numpy(),
            tf.reduce_mean(test_accuracy).numpy(),
            tf.reduce_mean(train_loss).numpy()
        ))
        history_train_loss.extend(train_loss)
        history_train_accuracy.extend(train_accuracy)
        history_test_accuracy.extend(test_accuracy)
    return {
        'train_loss': np.array(history_train_loss),
        'train_accuracy': np.array(history_train_accuracy),
        'test_accuracy': np.array(history_test_accuracy)
    }

実際にレイヤーとNNを作って学習させる。

%%time
input_layer = SimpleLayer(784, 100, relu)
hidden_layer = SimpleLayer(100, 50, relu)
output_layer = SimpleLayer(50, 10, softmax) 
network = SimpleSequenceNetwork([input_layer, hidden_layer, output_layer], cross_entropy_error, lr=0.1)

history = fit(network, train_ds, test_ds, epochs=5, batch_size=32)
plt.plot(history['train_loss'])
plt.show()
train acc, test acc, train loss | 0.7391, 0.9364, 4.336
train acc, test acc, train loss | 0.9798, 0.9572, 3.604
train acc, test acc, train loss | 0.9897, 0.9674, 3.547
train acc, test acc, train loss | 0.9936, 0.9695, 3.524
train acc, test acc, train loss | 0.9954, 0.9706, 3.511

img

Wall time: 49.9 s

別の活性化関数に差し替えてみる。

%%time
input_layer = SimpleLayer(784, 100, tanh)
hidden_layer = SimpleLayer(100, 50, tanh)
output_layer = SimpleLayer(50, 10, softmax) 
network = SimpleSequenceNetwork([input_layer, hidden_layer, output_layer], cross_entropy_error, lr=0.1)

history = fit(network, train_ds, test_ds, epochs=5, batch_size=32)
plt.plot(history['train_loss'])
plt.show()
train acc, test acc, train loss | 0.774, 0.9139, 4.245
train acc, test acc, train loss | 0.953, 0.9455, 3.687
train acc, test acc, train loss | 0.973, 0.9567, 3.607
train acc, test acc, train loss | 0.9815, 0.9625, 3.57
train acc, test acc, train loss | 0.9869, 0.9663, 3.548

img

Wall time: 1min

素朴なNNで、mnist数値のラベル分類もよしなに動かすことができた。いちばん大変な逆誤差伝搬法による重みの更新も、TF の autodiff を使えば自分で処理を書かなくて良いため楽に行える。

記事の一覧 >

関連するかもエントリー

ChatGPT の Noteable プラグインで Iris データセットを分析する
先程書いたOpenCALMのデータはいくらなんでもデータが少なすぎだったので、みんな大好き世界で1億回は分析されてきたであろうIris(あやめ)のデータセットを Noteable で分析させてみる。と思ったった通りの結果だったのだけど、グラフ描画やいろんなアルゴリズムを試してもら...
先程書いたOpenCALMのデータはいくらなんでもデータが少なすぎだったので、みんな大好き世界で1億回は分析されてきたであろうIris(あや...
Ruby で 10進数 <=> 62進数 などのコンバータ生成クラス
Ruby で 10進数 62進数 などのコンバータ生成クラスなんか無性に Ruby が書きたくなったので、良くある短縮 URL ぽい 62 進数 (0-9, a-z, A-Z) とかに変換できるコンバータ生成クラスを作ってみた。# 0-9, a-z, A-Z な char を渡...
Ruby で 10進数 62進数 などのコンバータ生成クラスなんか無性に Ruby が書きたくなったので、良くある短縮 URL ぽい 62...