22.01.23 LSTM, GRU

2022. 1. 23. 15:34카테고리 없음

LSTM, GRU 모두 RNN 모델

LSTM은 Vanilla RNN의 기울기 소실 문젤즐 해결

(Long Short Term Memory 장단기 메모리)

LSTM 특징 Cell State 두번쨰 hidden state느낌

연관이 있는 단어 등

Gate 3종류의 게이트를 4개의 FC layer로 구성

Forget Gate 이전 hidden state와 현재 입력을 가지고 어떤 정보를 잊을지 결정

 

Input Gate 현재 입력받은 정보에서 어떤 것을 cell state에 저장할지 결정

망각 + input = 새로운 cell state

Output  Gate 다음 hidden state와 출력값 계싼

GRU Gated Recureent Unit

LSTM이 가지는 게이트를 간소화하고, Cell State를 아예 없애서 속도가 빠르다 성능은 비슷함

Reset Gate 기존 hidden state의 정보를 얼마나 초기화할지 결정하는 게이트

Update Gate 기존 hidden state의 정보를 얼마나 사용할지 결정

새로운 Hidden State 계산하기

RNN 모델 활용

회귀 분석 : 각 시점의 출력값이 어느 정도일지 예측  ex 주가,기온예측

분류 작업 : 각 시점의 데이터가 어느 클래스일지 예측(love 동사,명사,형용사..) ex 문장에서 다음단어예측, 품사예측

 

이 작업들을 하기 위해 모델 학습을 위한 손실 함수를 계산해야 한다

실습1 장기 의존성 문제 확인

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam

import pandas as pd
import numpy as np

def load_data(window_size):
    raw_data = pd.read_csv("./daily-min-temperatures.csv")
    raw_temps = raw_data["Temp"]

    mean_temp = raw_temps.mean()
    stdv_temp = raw_temps.std(ddof=0)
    raw_temps = (raw_temps - mean_temp) / stdv_temp

    X, y = [], []
    for i in range(len(raw_temps) - window_size):
        cur_temps = raw_temps[i:i + window_size]
        target = raw_temps[i + window_size]

        X.append(list(cur_temps))
        y.append(target)

    X = np.array(X)
    y = np.array(y)
    X = X[:, :, np.newaxis]

    total_len = len(X)
    train_len = int(total_len * 0.8)

    X_train, y_train = X[:train_len], y[:train_len]
    X_test, y_test = X[train_len:], y[train_len:]

    return X_train, X_test, y_train, y_test

def build_rnn_model(window_size):
    model = Sequential()

    # TODO: [지시사항 1번] Simple RNN과 Fully-connected Layer로 구성된 모델을 완성하세요.
    model.add(layers.SimpleRNN(128, input_shape=(window_size, 1)))
    model.add(layers.Dense(32, activation='relu'))
    model.add(layers.Dense(1))

    return model

def build_lstm_model(window_size):
    model = Sequential()

    # TODO: [지시사항 2번] LSTM과 Fully-connected Layer로 구성된 모델을 완성하세요.
    model.add(layers.LSTM(128, input_shape=(window_size,1))) #hidden state크기128, input_shape
    model.add(layers.Dense(32, activation='relu'))
    model.add(layers.Dense(1))

    return model

def build_gru_model(window_size):
    model = Sequential()

    # TODO: [지시사항 3번] GRU와 Fully-connected Layer로 구성된 모델을 완성하세요.
    model.add(layers.GRU(128, input_shape=(window_size, 1)))
    model.add(layers.Dense(32, activation='relu'))
    model.add(layers.Dense(1))

    return model

def run_model(model, X_train, X_test, y_train, y_test, epochs=10, model_name=None):
    # TODO: [지시사항 4번] 모델 학습을 위한 optimizer와 loss 함수를 설정하세요.
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='mse')
    
    # TODO: [지시사항 5번] 모델 학습을 위한 hyperparameter를 설정하세요.
    hist = model.fit(X_train, y_train, epochs=epochs, batch_size=64, shuffle=True, verbose=2)
    
    # 테스트 데이터셋으로 모델을 테스트합니다.
    test_loss = model.evaluate(X_test, y_test, verbose=0)
    
    return test_loss, optimizer, hist

def main(window_size):
    tf.random.set_seed(2022)
    X_train, X_test, y_train, y_test = load_data(window_size)

    rnn_model = build_rnn_model(window_size)
    lstm_model = build_lstm_model(window_size)
    gru_model = build_gru_model(window_size)

    rnn_test_loss, _, _ = run_model(rnn_model, X_train, X_test, y_train, y_test, model_name="RNN")
    lstm_test_loss, _, _ = run_model(lstm_model, X_train, X_test, y_train, y_test, model_name="LSTM")
    gru_test_loss, _, _ = run_model(gru_model, X_train, X_test, y_train, y_test, model_name="GRU")
    
    return rnn_test_loss, lstm_test_loss, gru_test_loss

if __name__ == "__main__":
    # 10일치 데이터를 보고 다음날의 기온을 예측합니다.
    rnn_10_test_loss, lstm_10_test_loss, gru_10_test_loss = main(10)
    
    # 300일치 데이터를 보고 다음날의 기온을 예측합니다.
    rnn_300_test_loss, lstm_300_test_loss, gru_300_test_loss = main(300)
    
    print("=" * 20, "시계열 길이가 10 인 경우", "=" * 20)
    print("[RNN ] 테스트 MSE = {:.5f}".format(rnn_10_test_loss))
    print("[LSTM] 테스트 MSE = {:.5f}".format(lstm_10_test_loss))
    print("[GRU ] 테스트 MSE = {:.5f}".format(gru_10_test_loss))
    print()
    
    print("=" * 20, "시계열 길이가 300 인 경우", "=" * 20)
    print("[RNN ] 테스트 MSE = {:.5f}".format(rnn_300_test_loss))
    print("[LSTM] 테스트 MSE = {:.5f}".format(lstm_300_test_loss))
    print("[GRU ] 테스트 MSE = {:.5f}".format(gru_300_test_loss))
    print()

실습2 LSTM으로 IMDB 데이터 학습

from elice_utils import EliceUtils

elice_utils = EliceUtils()

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.datasets import imdb
from tensorflow.keras.preprocessing.sequence import pad_sequences

def load_data(num_words, max_len):
    # TODO: [지시사항 1번] IMDB 데이터셋을 불러오세요.
    (X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=num_words)
    X_train = pad_sequences(X_train, maxlen=max_len)
    X_test = pad_sequences(X_test, maxlen=max_len)

    return X_train, X_test, y_train, y_test

def build_lstm_model(num_words, embedding_len):
    model = Sequential()

    # TODO: [지시사항 2번] LSTM 기반 모델을 구성하세요.
    model.add(layers.Embedding(num_words, embedding_len))
    model.add(layers.LSTM(16))
    model.add(layers.Dense(1,activation='sigmoid'))
    return model
    
def run_model(model, X_train, X_test, y_train, y_test, epochs=5):
    # TODO: [지시사항 3번] 모델 학습을 위한 optimizer, loss 함수, 평가 지표를 설정하세요.
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    
    # TODO: [지시사항 4번] 모델 학습을 위한 hyperparameter를 설정하세요.
    hist = model.fit(X_train, y_train, epochs=epochs, batch_size=128, shuffle=True, verbose=2)
    
    # 모델을 테스트 데이터셋으로 테스트합니다.
    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
    print()
    print("테스트 loss: {:.5f}, 테스트 정확도: {:.3f}%".format(test_loss, test_acc * 100))
    
    return optimizer, hist

def main():
    tf.random.set_seed(2022)

    num_words = 6000
    max_len =  130
    embedding_len = 100

    X_train, X_test, y_train, y_test = load_data(num_words, max_len)

    model = build_lstm_model(num_words, embedding_len)
    run_model(model, X_train, X_test, y_train, y_test)

if __name__ == "__main__":
    main()

실습3 GRU를 통한 항공승객수 분석

from elice_utils import EliceUtils

elice_utils = EliceUtils()

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def load_data(window_size):
    raw_data = pd.read_csv("./airline-passengers.csv")
    raw_passengers = raw_data["Passengers"].to_numpy()

    # 데이터의 평균과 표준편차 값으로 정규화(표준화) 합니다.
    mean_passenger = raw_passengers.mean()
    stdv_passenger = raw_passengers.std(ddof=0)
    raw_passengers = (raw_passengers - mean_passenger) / stdv_passenger
    plot_data = {"month": raw_data["Month"], "mean": mean_passenger, "stdv": stdv_passenger}

    # window_size 개의 데이터를 불러와 입력 데이터(X)로 설정하고
    # window_size보다 한 시점 뒤의 데이터를 예측할 대상(y)으로 설정하여
    # 데이터셋을 구성합니다.
    X, y = [], []
    for i in range(len(raw_passengers) - window_size):
        cur_passenger = raw_passengers[i:i + window_size]
        target = raw_passengers[i + window_size]

        X.append(list(cur_passenger))
        y.append(target)

    # X와 y를 numpy array로 변환합니다.
    X = np.array(X)
    y = np.array(y)

    # 각 입력 데이터는 sequence 길이가 window_size이고, featuer 개수는 1개가 되도록
    # 마지막에 새로운 차원을 추가합니다.
    # 즉, (전체 데이터 개수, window_size) -> (전체 데이터 개수, window_size, 1)이 되도록 변환합니다.
    X = X[:, :, np.newaxis]

    # 학습 데이터는 전체 데이터의 80%, 테스트 데이터는 20%로 설정합니다.
    total_len = len(X)
    train_len = int(total_len * 0.8)

    X_train, y_train = X[:train_len], y[:train_len]
    X_test, y_test = X[train_len:], y[train_len:]

    return X_train, X_test, y_train, y_test, plot_data

def build_gru_model(window_size):
    model = Sequential()

    # TODO: [지시사항 1번] GRU 기반 모델을 구성하세요.
    model.add(layers.GRU(4, input_shape=(window_size, 1)))
    model.add(layers.Dense(1))

    return model

def build_rnn_model(window_size):
    model = Sequential()

    # TODO: [지시사항 2번] SimpleRNN 기반 모델을 구성하세요.
    model.add(layers.SimpleRNN(4, input_shape=(window_size,1)))
    model.add(layers.Dense(1))
    return model

def run_model(model, X_train, X_test, y_train, y_test, epochs=100, name=None):
    # TODO: [지시사항 3번] 모델 학습을 위한 optimizer와 loss 함수를 설정하세요.
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='mse')

    # TODO: [지시사항 4번] 모델 학습을 위한 hyperparameter를 설정하세요.
    hist = model.fit(X_train, y_train, batch_size=8, epochs=epochs, shuffle=True, verbose=2)

    # 테스트 데이터셋으로 모델을 테스트합니다.
    test_loss = model.evaluate(X_test, y_test, verbose=0)
    print()
    print("테스트 MSE: {:.5f}".format(test_loss))
    print()

    return optimizer, hist
    
def plot_result(model, X_true, y_true, plot_data, name):
    y_pred = model.predict(X_true)

    # 표준화된 결과를 다시 원래 값으로 변환합니다.
    y_true_orig = (y_true * plot_data["stdv"]) + plot_data["mean"]
    y_pred_orig = (y_pred * plot_data["stdv"]) + plot_data["mean"]

    # 테스트 데이터에서 사용한 날짜들만 가져옵니다.
    test_month = plot_data["month"][-len(y_true):]

    # 모델의 예측값을 실제값과 함께 그래프로 그립니다.
    fig = plt.figure(figsize=(8, 6))
    ax = plt.gca()
    ax.plot(y_true_orig, color="b", label="True")
    ax.plot(y_pred_orig, color="r", label="Prediction")
    ax.set_xticks(list(range(len(test_month))))
    ax.set_xticklabels(test_month, rotation=45)
    ax.set_title("{} Result".format(name))
    ax.legend(loc="upper left")
    plt.savefig("airline_{}.png".format(name.lower()))

def main():
    tf.random.set_seed(2022)

    window_size = 4
    X_train, X_test, y_train, y_test, plot_data = load_data(window_size)

    gru_model = build_gru_model(window_size)
    run_model(gru_model, X_train, X_test, y_train, y_test, name="GRU")
    plot_result(gru_model, X_test, y_test, plot_data, name="GRU")

    rnn_model = build_rnn_model(window_size)
    run_model(rnn_model, X_train, X_test, y_train, y_test, name="RNN")
    plot_result(rnn_model, X_test, y_test, plot_data, name="RNN")
    
    elice_utils.send_image("airline_{}.png".format("gru"))
    elice_utils.send_image("airline_{}.png".format("rnn"))


if __name__ == "__main__":
    main()

실습4 RNN 기반 모델 분류 작업 아마존 식품 리뷰 데이터셋

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split

import pandas as pd

def load_data(max_len):
    data = pd.read_csv("./review_score.csv")
    # 리뷰 문장을 입력 데이터로, 해당 리뷰의 평점을 라벨 데이터로 설정합니다.
    X = data['Review']
    y = data['Score']
    y = y - 1 # 값을 1~5에서 0~4로 변경

    # 문장 내 각 단어를 숫자로 변환하는 Tokenizer를 적용합니다.
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(X)
    X = tokenizer.texts_to_sequences(X) #각 단어를 숫자로 tokenizer 활용

    # 전체 단어 중에서 가장 큰 숫자로 mapping된 단어의 숫자를 가져옵니다.
    # 즉, max_features는 전체 데이터셋에 등장하는 겹치지 않는 단어의 개수 + 1과 동일합니다.
    max_features = max([max(_in) for _in in X]) + 1

    # 불러온 데이터셋을 학습 데이터 80%, 테스트 데이터 20%로 분리합니다.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # 모든 문장들을 가장 긴 문장의 단어 개수가 되게 padding을 추가합니다.
    X_train = pad_sequences(X_train, maxlen=max_len)
    X_test = pad_sequences(X_test, maxlen=max_len)

    return X_train, X_test, y_train, y_test, max_features

def build_rnn_model(max_features, embedding_size):
    model = Sequential()

    # TODO: [지시사항 1번] Simple RNN 기반의 모델을 완성하세요.
    model.add(layers.Embedding(max_features, embedding_size))
    model.add(layers.SimpleRNN(20))
    model.add(layers.Dense(5, activation='softmax')) # 5개의 클래스 리뷰 5점까지
    

    return model

def build_lstm_model(max_features, embedding_size):
    model = Sequential()

    # TODO: [지시사항 2번] LSTM 기반의 모델을 완성하세요.
    model.add(layers.Embedding(max_features, embedding_size))
    model.add(layers.LSTM(20))
    model.add(layers.Dense(5, activation='softmax'))

    return model

def build_gru_model(max_features, embedding_size):
    model = Sequential()

    # TODO: [지시사항 3번] GRU 기반의 모델을 완성하세요.
    model.add(layers.Embedding(max_features, embedding_size))
    model.add(layers.GRU(20))
    model.add(layers.Dense(5, activation='softmax'))

    return model

def run_model(model, X_train, X_test, y_train, y_test, epochs=10):
    # TODO: [지시사항 4번] 모델 학습을 위한 optimizer, loss 함수, 평가 지표를 설정하세요.
    optimizer = Adam(learning_rate= 0.001)
    model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    # TODO: [지시사항 5번] 모델 학습을 위한 hyperparameter를 설정하세요.
    hist = model.fit(X_train, y_train, epochs=epochs, batch_size=256, shuffle=True, verbose=2)
    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)

    return test_loss, test_acc, optimizer, hist

def main():
    tf.random.set_seed(2022)
    max_len = 150
    embedding_size = 128

    X_train, X_test, y_train, y_test, max_features = load_data(max_len)
    rnn_model = build_rnn_model(max_features, embedding_size)
    lstm_model = build_lstm_model(max_features, embedding_size)
    gru_model = build_gru_model(max_features, embedding_size)

    rnn_test_loss, rnn_test_acc, _, _ = run_model(rnn_model, X_train, X_test, y_train, y_test)
    lstm_test_loss, lstm_test_acc, _, _ = run_model(lstm_model, X_train, X_test, y_train, y_test)
    gru_test_loss, gru_test_acc, _, _ = run_model(gru_model, X_train, X_test, y_train, y_test)

    print()
    print("=" * 20, "모델 별 Test Loss와 정확도", "=" * 20)
    print("[RNN ] 테스트 Loss: {:.5f}, 테스트 Accuracy: {:.3f}%".format(rnn_test_loss, rnn_test_acc * 100))
    print("[LSTM] 테스트 Loss: {:.5f}, 테스트 Accuracy: {:.3f}%".format(lstm_test_loss, lstm_test_acc * 100))
    print("[GRU ] 테스트 Loss: {:.5f}, 테스트 Accuracy: {:.3f}%".format(gru_test_loss, gru_test_acc * 100))

if __name__ == "__main__":
    main()

실습5 RNN 기반 모델을 통한 회귀분석 나스닥 상장 기업 주가 데이터셋

from elice_utils import EliceUtils

elice_utils = EliceUtils()

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam

from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

def load_data(window_size):
    raw_data_df = pd.read_csv("./AAPL.csv", index_col="Date")
    
    # 데이터 전체를 표준화합니다.
    scaler = StandardScaler()
    raw_data = scaler.fit_transform(raw_data_df)
    plot_data = {"mean": scaler.mean_[3], "var": scaler.var_[3], "date": raw_data_df.index}

    # 입력 데이터(X)는 시작가, 일 최고가, 일 최저가, 종가 데이터를 사용하고
    # 라벨 데이터(y)는 4번째 컬럼에 해당하는 종가 데이터만 사용합니다.
    raw_X = raw_data[:, :4]
    raw_y = raw_data[:, 3]

    # window_size 개의 데이터를 불러와 입력 데이터(X)로 설정하고
    # window_size보다 한 시점 뒤의 데이터를 예측할 대상(y)으로 설정하여
    # 데이터셋을 구성합니다.
    X, y = [], []
    for i in range(len(raw_X) - window_size):
        cur_prices = raw_X[i:i + window_size, :]
        target = raw_y[i + window_size]

        X.append(list(cur_prices))
        y.append(target)

    # X와 y를 numpy array로 변환합니다.
    X = np.array(X)
    y = np.array(y)

    # 학습 데이터는 전체 데이터의 80%, 테스트 데이터는 20%로 설정합니다.
    total_len = len(X)
    train_len = int(total_len * 0.8)

    X_train, y_train = X[:train_len], y[:train_len]
    X_test, y_test = X[train_len:], y[train_len:]

    return X_train, X_test, y_train, y_test, plot_data

def build_rnn_model(window_size, num_features):
    model = Sequential()

    # TODO: [지시사항 1번] SimpleRNN 기반 모델을 구성하세요.
    model.add(layers.SimpleRNN(256, input_shape=(window_size, num_features)))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(16, activation='relu'))
    model.add(layers.Dense(1))


    return model

def build_lstm_model(window_size, num_features):
    model = Sequential()

    # TODO: [지시사항 2번] LSTM 기반 모델을 구성하세요.
    model.add(layers.LSTM(256, input_shape=(window_size, num_features)))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(16, activation='relu'))
    model.add(layers.Dense(1))
    return model

def build_gru_model(window_size, num_features):
    model = Sequential()

    # TODO: [지시사항 3번] GRU 기반 모델을 구성하세요.
    model.add(layers.GRU(256, input_shape=(window_size, num_features)))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(16, activation='relu'))
    model.add(layers.Dense(1))

    return model

def run_model(model, X_train, X_test, y_train, y_test, epochs=10, name=None):
    # TODO: [지시사항 4번] 모델 학습을 위한 optimizer와 loss 함수를 설정하세요.
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='mse')

    # TODO: [지시사항 5번] 모델 학습을 위한 hyperparameter를 설정하세요.
    hist = model.fit(X_train, y_train, batch_size=128, epochs=epochs, shuffle=True, verbose=2)
    
    # 테스트 데이터셋으로 모델을 테스트합니다.
    test_loss = model.evaluate(X_test, y_test, verbose=0)
    print("[{}] 테스트 loss: {:.5f}".format(name, test_loss))
    print()

    return optimizer, hist

def plot_result(model, X_true, y_true, plot_data, name):
    y_pred = model.predict(X_true)

    # 표준화된 결과를 다시 원래 값으로 변환합니다.
    y_true_orig = (y_true * np.sqrt(plot_data["var"])) + plot_data["mean"]
    y_pred_orig = (y_pred * np.sqrt(plot_data["var"])) + plot_data["mean"]

    # 테스트 데이터에서 사용한 날짜들만 가져옵니다.
    test_date = plot_data["date"][-len(y_true):]

    # 모델의 예측값을 실제값과 함께 그래프로 그립니다.
    fig = plt.figure(figsize=(12, 8))
    ax = plt.gca()
    ax.plot(y_true_orig, color="b", label="True")
    ax.plot(y_pred_orig, color="r", label="Prediction")
    ax.set_xticks(list(range(len(test_date))))
    ax.set_xticklabels(test_date, rotation=45)
    ax.xaxis.set_major_locator(ticker.MultipleLocator(100))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(100))
    ax.set_title("{} Result".format(name))
    ax.legend(loc="upper left")
    plt.tight_layout()
    plt.savefig("apple_stock_{}".format(name.lower()))
    
    elice_utils.send_image("apple_stock_{}.png".format(name.lower()))

def main():
    tf.random.set_seed(2022)

    window_size = 30
    X_train, X_test, y_train, y_test, plot_data = load_data(window_size)
    num_features = X_train[0].shape[1]

    rnn_model = build_rnn_model(window_size, num_features)
    lstm_model = build_lstm_model(window_size, num_features)
    gru_model = build_gru_model(window_size, num_features)

    run_model(rnn_model, X_train, X_test, y_train, y_test, name="RNN")
    run_model(lstm_model, X_train, X_test, y_train, y_test, name="LSTM")
    run_model(gru_model, X_train, X_test, y_train, y_test, name="GRU")

    plot_result(rnn_model, X_test, y_test, plot_data, name="RNN")
    plot_result(lstm_model, X_test, y_test, plot_data, name="LSTM")
    plot_result(gru_model, X_test, y_test, plot_data, name="GRU")

if __name__ == "__main__":
    main()