2022. 1. 23. 15:34ㆍ카테고리 없음
LSTM, GRU 모두 RNN 모델
LSTM은 Vanilla RNN의 기울기 소실 문젤즐 해결
(Long Short Term Memory 장단기 메모리)
LSTM 특징 Cell State 두번쨰 hidden state느낌
연관이 있는 단어 등
Gate 3종류의 게이트를 4개의 FC layer로 구성
Forget Gate 이전 hidden state와 현재 입력을 가지고 어떤 정보를 잊을지 결정
Input Gate 현재 입력받은 정보에서 어떤 것을 cell state에 저장할지 결정
망각 + input = 새로운 cell state
Output Gate 다음 hidden state와 출력값 계싼
GRU Gated Recureent Unit
LSTM이 가지는 게이트를 간소화하고, Cell State를 아예 없애서 속도가 빠르다 성능은 비슷함
Reset Gate 기존 hidden state의 정보를 얼마나 초기화할지 결정하는 게이트
Update Gate 기존 hidden state의 정보를 얼마나 사용할지 결정
새로운 Hidden State 계산하기
RNN 모델 활용
회귀 분석 : 각 시점의 출력값이 어느 정도일지 예측 ex 주가,기온예측
분류 작업 : 각 시점의 데이터가 어느 클래스일지 예측(love 동사,명사,형용사..) ex 문장에서 다음단어예측, 품사예측
이 작업들을 하기 위해 모델 학습을 위한 손실 함수를 계산해야 한다
실습1 장기 의존성 문제 확인
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
def load_data(window_size):
raw_data = pd.read_csv("./daily-min-temperatures.csv")
raw_temps = raw_data["Temp"]
mean_temp = raw_temps.mean()
stdv_temp = raw_temps.std(ddof=0)
raw_temps = (raw_temps - mean_temp) / stdv_temp
X, y = [], []
for i in range(len(raw_temps) - window_size):
cur_temps = raw_temps[i:i + window_size]
target = raw_temps[i + window_size]
X.append(list(cur_temps))
y.append(target)
X = np.array(X)
y = np.array(y)
X = X[:, :, np.newaxis]
total_len = len(X)
train_len = int(total_len * 0.8)
X_train, y_train = X[:train_len], y[:train_len]
X_test, y_test = X[train_len:], y[train_len:]
return X_train, X_test, y_train, y_test
def build_rnn_model(window_size):
model = Sequential()
# TODO: [지시사항 1번] Simple RNN과 Fully-connected Layer로 구성된 모델을 완성하세요.
model.add(layers.SimpleRNN(128, input_shape=(window_size, 1)))
model.add(layers.Dense(32, activation='relu'))
model.add(layers.Dense(1))
return model
def build_lstm_model(window_size):
model = Sequential()
# TODO: [지시사항 2번] LSTM과 Fully-connected Layer로 구성된 모델을 완성하세요.
model.add(layers.LSTM(128, input_shape=(window_size,1))) #hidden state크기128, input_shape
model.add(layers.Dense(32, activation='relu'))
model.add(layers.Dense(1))
return model
def build_gru_model(window_size):
model = Sequential()
# TODO: [지시사항 3번] GRU와 Fully-connected Layer로 구성된 모델을 완성하세요.
model.add(layers.GRU(128, input_shape=(window_size, 1)))
model.add(layers.Dense(32, activation='relu'))
model.add(layers.Dense(1))
return model
def run_model(model, X_train, X_test, y_train, y_test, epochs=10, model_name=None):
# TODO: [지시사항 4번] 모델 학습을 위한 optimizer와 loss 함수를 설정하세요.
optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss='mse')
# TODO: [지시사항 5번] 모델 학습을 위한 hyperparameter를 설정하세요.
hist = model.fit(X_train, y_train, epochs=epochs, batch_size=64, shuffle=True, verbose=2)
# 테스트 데이터셋으로 모델을 테스트합니다.
test_loss = model.evaluate(X_test, y_test, verbose=0)
return test_loss, optimizer, hist
def main(window_size):
tf.random.set_seed(2022)
X_train, X_test, y_train, y_test = load_data(window_size)
rnn_model = build_rnn_model(window_size)
lstm_model = build_lstm_model(window_size)
gru_model = build_gru_model(window_size)
rnn_test_loss, _, _ = run_model(rnn_model, X_train, X_test, y_train, y_test, model_name="RNN")
lstm_test_loss, _, _ = run_model(lstm_model, X_train, X_test, y_train, y_test, model_name="LSTM")
gru_test_loss, _, _ = run_model(gru_model, X_train, X_test, y_train, y_test, model_name="GRU")
return rnn_test_loss, lstm_test_loss, gru_test_loss
if __name__ == "__main__":
# 10일치 데이터를 보고 다음날의 기온을 예측합니다.
rnn_10_test_loss, lstm_10_test_loss, gru_10_test_loss = main(10)
# 300일치 데이터를 보고 다음날의 기온을 예측합니다.
rnn_300_test_loss, lstm_300_test_loss, gru_300_test_loss = main(300)
print("=" * 20, "시계열 길이가 10 인 경우", "=" * 20)
print("[RNN ] 테스트 MSE = {:.5f}".format(rnn_10_test_loss))
print("[LSTM] 테스트 MSE = {:.5f}".format(lstm_10_test_loss))
print("[GRU ] 테스트 MSE = {:.5f}".format(gru_10_test_loss))
print()
print("=" * 20, "시계열 길이가 300 인 경우", "=" * 20)
print("[RNN ] 테스트 MSE = {:.5f}".format(rnn_300_test_loss))
print("[LSTM] 테스트 MSE = {:.5f}".format(lstm_300_test_loss))
print("[GRU ] 테스트 MSE = {:.5f}".format(gru_300_test_loss))
print()
실습2 LSTM으로 IMDB 데이터 학습
from elice_utils import EliceUtils
elice_utils = EliceUtils()
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.datasets import imdb
from tensorflow.keras.preprocessing.sequence import pad_sequences
def load_data(num_words, max_len):
# TODO: [지시사항 1번] IMDB 데이터셋을 불러오세요.
(X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=num_words)
X_train = pad_sequences(X_train, maxlen=max_len)
X_test = pad_sequences(X_test, maxlen=max_len)
return X_train, X_test, y_train, y_test
def build_lstm_model(num_words, embedding_len):
model = Sequential()
# TODO: [지시사항 2번] LSTM 기반 모델을 구성하세요.
model.add(layers.Embedding(num_words, embedding_len))
model.add(layers.LSTM(16))
model.add(layers.Dense(1,activation='sigmoid'))
return model
def run_model(model, X_train, X_test, y_train, y_test, epochs=5):
# TODO: [지시사항 3번] 모델 학습을 위한 optimizer, loss 함수, 평가 지표를 설정하세요.
optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
# TODO: [지시사항 4번] 모델 학습을 위한 hyperparameter를 설정하세요.
hist = model.fit(X_train, y_train, epochs=epochs, batch_size=128, shuffle=True, verbose=2)
# 모델을 테스트 데이터셋으로 테스트합니다.
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
print()
print("테스트 loss: {:.5f}, 테스트 정확도: {:.3f}%".format(test_loss, test_acc * 100))
return optimizer, hist
def main():
tf.random.set_seed(2022)
num_words = 6000
max_len = 130
embedding_len = 100
X_train, X_test, y_train, y_test = load_data(num_words, max_len)
model = build_lstm_model(num_words, embedding_len)
run_model(model, X_train, X_test, y_train, y_test)
if __name__ == "__main__":
main()
실습3 GRU를 통한 항공승객수 분석
from elice_utils import EliceUtils
elice_utils = EliceUtils()
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
def load_data(window_size):
raw_data = pd.read_csv("./airline-passengers.csv")
raw_passengers = raw_data["Passengers"].to_numpy()
# 데이터의 평균과 표준편차 값으로 정규화(표준화) 합니다.
mean_passenger = raw_passengers.mean()
stdv_passenger = raw_passengers.std(ddof=0)
raw_passengers = (raw_passengers - mean_passenger) / stdv_passenger
plot_data = {"month": raw_data["Month"], "mean": mean_passenger, "stdv": stdv_passenger}
# window_size 개의 데이터를 불러와 입력 데이터(X)로 설정하고
# window_size보다 한 시점 뒤의 데이터를 예측할 대상(y)으로 설정하여
# 데이터셋을 구성합니다.
X, y = [], []
for i in range(len(raw_passengers) - window_size):
cur_passenger = raw_passengers[i:i + window_size]
target = raw_passengers[i + window_size]
X.append(list(cur_passenger))
y.append(target)
# X와 y를 numpy array로 변환합니다.
X = np.array(X)
y = np.array(y)
# 각 입력 데이터는 sequence 길이가 window_size이고, featuer 개수는 1개가 되도록
# 마지막에 새로운 차원을 추가합니다.
# 즉, (전체 데이터 개수, window_size) -> (전체 데이터 개수, window_size, 1)이 되도록 변환합니다.
X = X[:, :, np.newaxis]
# 학습 데이터는 전체 데이터의 80%, 테스트 데이터는 20%로 설정합니다.
total_len = len(X)
train_len = int(total_len * 0.8)
X_train, y_train = X[:train_len], y[:train_len]
X_test, y_test = X[train_len:], y[train_len:]
return X_train, X_test, y_train, y_test, plot_data
def build_gru_model(window_size):
model = Sequential()
# TODO: [지시사항 1번] GRU 기반 모델을 구성하세요.
model.add(layers.GRU(4, input_shape=(window_size, 1)))
model.add(layers.Dense(1))
return model
def build_rnn_model(window_size):
model = Sequential()
# TODO: [지시사항 2번] SimpleRNN 기반 모델을 구성하세요.
model.add(layers.SimpleRNN(4, input_shape=(window_size,1)))
model.add(layers.Dense(1))
return model
def run_model(model, X_train, X_test, y_train, y_test, epochs=100, name=None):
# TODO: [지시사항 3번] 모델 학습을 위한 optimizer와 loss 함수를 설정하세요.
optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss='mse')
# TODO: [지시사항 4번] 모델 학습을 위한 hyperparameter를 설정하세요.
hist = model.fit(X_train, y_train, batch_size=8, epochs=epochs, shuffle=True, verbose=2)
# 테스트 데이터셋으로 모델을 테스트합니다.
test_loss = model.evaluate(X_test, y_test, verbose=0)
print()
print("테스트 MSE: {:.5f}".format(test_loss))
print()
return optimizer, hist
def plot_result(model, X_true, y_true, plot_data, name):
y_pred = model.predict(X_true)
# 표준화된 결과를 다시 원래 값으로 변환합니다.
y_true_orig = (y_true * plot_data["stdv"]) + plot_data["mean"]
y_pred_orig = (y_pred * plot_data["stdv"]) + plot_data["mean"]
# 테스트 데이터에서 사용한 날짜들만 가져옵니다.
test_month = plot_data["month"][-len(y_true):]
# 모델의 예측값을 실제값과 함께 그래프로 그립니다.
fig = plt.figure(figsize=(8, 6))
ax = plt.gca()
ax.plot(y_true_orig, color="b", label="True")
ax.plot(y_pred_orig, color="r", label="Prediction")
ax.set_xticks(list(range(len(test_month))))
ax.set_xticklabels(test_month, rotation=45)
ax.set_title("{} Result".format(name))
ax.legend(loc="upper left")
plt.savefig("airline_{}.png".format(name.lower()))
def main():
tf.random.set_seed(2022)
window_size = 4
X_train, X_test, y_train, y_test, plot_data = load_data(window_size)
gru_model = build_gru_model(window_size)
run_model(gru_model, X_train, X_test, y_train, y_test, name="GRU")
plot_result(gru_model, X_test, y_test, plot_data, name="GRU")
rnn_model = build_rnn_model(window_size)
run_model(rnn_model, X_train, X_test, y_train, y_test, name="RNN")
plot_result(rnn_model, X_test, y_test, plot_data, name="RNN")
elice_utils.send_image("airline_{}.png".format("gru"))
elice_utils.send_image("airline_{}.png".format("rnn"))
if __name__ == "__main__":
main()
실습4 RNN 기반 모델 분류 작업 아마존 식품 리뷰 데이터셋
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
import pandas as pd
def load_data(max_len):
data = pd.read_csv("./review_score.csv")
# 리뷰 문장을 입력 데이터로, 해당 리뷰의 평점을 라벨 데이터로 설정합니다.
X = data['Review']
y = data['Score']
y = y - 1 # 값을 1~5에서 0~4로 변경
# 문장 내 각 단어를 숫자로 변환하는 Tokenizer를 적용합니다.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(X)
X = tokenizer.texts_to_sequences(X) #각 단어를 숫자로 tokenizer 활용
# 전체 단어 중에서 가장 큰 숫자로 mapping된 단어의 숫자를 가져옵니다.
# 즉, max_features는 전체 데이터셋에 등장하는 겹치지 않는 단어의 개수 + 1과 동일합니다.
max_features = max([max(_in) for _in in X]) + 1
# 불러온 데이터셋을 학습 데이터 80%, 테스트 데이터 20%로 분리합니다.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# 모든 문장들을 가장 긴 문장의 단어 개수가 되게 padding을 추가합니다.
X_train = pad_sequences(X_train, maxlen=max_len)
X_test = pad_sequences(X_test, maxlen=max_len)
return X_train, X_test, y_train, y_test, max_features
def build_rnn_model(max_features, embedding_size):
model = Sequential()
# TODO: [지시사항 1번] Simple RNN 기반의 모델을 완성하세요.
model.add(layers.Embedding(max_features, embedding_size))
model.add(layers.SimpleRNN(20))
model.add(layers.Dense(5, activation='softmax')) # 5개의 클래스 리뷰 5점까지
return model
def build_lstm_model(max_features, embedding_size):
model = Sequential()
# TODO: [지시사항 2번] LSTM 기반의 모델을 완성하세요.
model.add(layers.Embedding(max_features, embedding_size))
model.add(layers.LSTM(20))
model.add(layers.Dense(5, activation='softmax'))
return model
def build_gru_model(max_features, embedding_size):
model = Sequential()
# TODO: [지시사항 3번] GRU 기반의 모델을 완성하세요.
model.add(layers.Embedding(max_features, embedding_size))
model.add(layers.GRU(20))
model.add(layers.Dense(5, activation='softmax'))
return model
def run_model(model, X_train, X_test, y_train, y_test, epochs=10):
# TODO: [지시사항 4번] 모델 학습을 위한 optimizer, loss 함수, 평가 지표를 설정하세요.
optimizer = Adam(learning_rate= 0.001)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# TODO: [지시사항 5번] 모델 학습을 위한 hyperparameter를 설정하세요.
hist = model.fit(X_train, y_train, epochs=epochs, batch_size=256, shuffle=True, verbose=2)
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
return test_loss, test_acc, optimizer, hist
def main():
tf.random.set_seed(2022)
max_len = 150
embedding_size = 128
X_train, X_test, y_train, y_test, max_features = load_data(max_len)
rnn_model = build_rnn_model(max_features, embedding_size)
lstm_model = build_lstm_model(max_features, embedding_size)
gru_model = build_gru_model(max_features, embedding_size)
rnn_test_loss, rnn_test_acc, _, _ = run_model(rnn_model, X_train, X_test, y_train, y_test)
lstm_test_loss, lstm_test_acc, _, _ = run_model(lstm_model, X_train, X_test, y_train, y_test)
gru_test_loss, gru_test_acc, _, _ = run_model(gru_model, X_train, X_test, y_train, y_test)
print()
print("=" * 20, "모델 별 Test Loss와 정확도", "=" * 20)
print("[RNN ] 테스트 Loss: {:.5f}, 테스트 Accuracy: {:.3f}%".format(rnn_test_loss, rnn_test_acc * 100))
print("[LSTM] 테스트 Loss: {:.5f}, 테스트 Accuracy: {:.3f}%".format(lstm_test_loss, lstm_test_acc * 100))
print("[GRU ] 테스트 Loss: {:.5f}, 테스트 Accuracy: {:.3f}%".format(gru_test_loss, gru_test_acc * 100))
if __name__ == "__main__":
main()
실습5 RNN 기반 모델을 통한 회귀분석 나스닥 상장 기업 주가 데이터셋
from elice_utils import EliceUtils
elice_utils = EliceUtils()
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from tensorflow.keras import layers, Sequential
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
def load_data(window_size):
raw_data_df = pd.read_csv("./AAPL.csv", index_col="Date")
# 데이터 전체를 표준화합니다.
scaler = StandardScaler()
raw_data = scaler.fit_transform(raw_data_df)
plot_data = {"mean": scaler.mean_[3], "var": scaler.var_[3], "date": raw_data_df.index}
# 입력 데이터(X)는 시작가, 일 최고가, 일 최저가, 종가 데이터를 사용하고
# 라벨 데이터(y)는 4번째 컬럼에 해당하는 종가 데이터만 사용합니다.
raw_X = raw_data[:, :4]
raw_y = raw_data[:, 3]
# window_size 개의 데이터를 불러와 입력 데이터(X)로 설정하고
# window_size보다 한 시점 뒤의 데이터를 예측할 대상(y)으로 설정하여
# 데이터셋을 구성합니다.
X, y = [], []
for i in range(len(raw_X) - window_size):
cur_prices = raw_X[i:i + window_size, :]
target = raw_y[i + window_size]
X.append(list(cur_prices))
y.append(target)
# X와 y를 numpy array로 변환합니다.
X = np.array(X)
y = np.array(y)
# 학습 데이터는 전체 데이터의 80%, 테스트 데이터는 20%로 설정합니다.
total_len = len(X)
train_len = int(total_len * 0.8)
X_train, y_train = X[:train_len], y[:train_len]
X_test, y_test = X[train_len:], y[train_len:]
return X_train, X_test, y_train, y_test, plot_data
def build_rnn_model(window_size, num_features):
model = Sequential()
# TODO: [지시사항 1번] SimpleRNN 기반 모델을 구성하세요.
model.add(layers.SimpleRNN(256, input_shape=(window_size, num_features)))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(16, activation='relu'))
model.add(layers.Dense(1))
return model
def build_lstm_model(window_size, num_features):
model = Sequential()
# TODO: [지시사항 2번] LSTM 기반 모델을 구성하세요.
model.add(layers.LSTM(256, input_shape=(window_size, num_features)))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(16, activation='relu'))
model.add(layers.Dense(1))
return model
def build_gru_model(window_size, num_features):
model = Sequential()
# TODO: [지시사항 3번] GRU 기반 모델을 구성하세요.
model.add(layers.GRU(256, input_shape=(window_size, num_features)))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(16, activation='relu'))
model.add(layers.Dense(1))
return model
def run_model(model, X_train, X_test, y_train, y_test, epochs=10, name=None):
# TODO: [지시사항 4번] 모델 학습을 위한 optimizer와 loss 함수를 설정하세요.
optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss='mse')
# TODO: [지시사항 5번] 모델 학습을 위한 hyperparameter를 설정하세요.
hist = model.fit(X_train, y_train, batch_size=128, epochs=epochs, shuffle=True, verbose=2)
# 테스트 데이터셋으로 모델을 테스트합니다.
test_loss = model.evaluate(X_test, y_test, verbose=0)
print("[{}] 테스트 loss: {:.5f}".format(name, test_loss))
print()
return optimizer, hist
def plot_result(model, X_true, y_true, plot_data, name):
y_pred = model.predict(X_true)
# 표준화된 결과를 다시 원래 값으로 변환합니다.
y_true_orig = (y_true * np.sqrt(plot_data["var"])) + plot_data["mean"]
y_pred_orig = (y_pred * np.sqrt(plot_data["var"])) + plot_data["mean"]
# 테스트 데이터에서 사용한 날짜들만 가져옵니다.
test_date = plot_data["date"][-len(y_true):]
# 모델의 예측값을 실제값과 함께 그래프로 그립니다.
fig = plt.figure(figsize=(12, 8))
ax = plt.gca()
ax.plot(y_true_orig, color="b", label="True")
ax.plot(y_pred_orig, color="r", label="Prediction")
ax.set_xticks(list(range(len(test_date))))
ax.set_xticklabels(test_date, rotation=45)
ax.xaxis.set_major_locator(ticker.MultipleLocator(100))
ax.yaxis.set_major_locator(ticker.MultipleLocator(100))
ax.set_title("{} Result".format(name))
ax.legend(loc="upper left")
plt.tight_layout()
plt.savefig("apple_stock_{}".format(name.lower()))
elice_utils.send_image("apple_stock_{}.png".format(name.lower()))
def main():
tf.random.set_seed(2022)
window_size = 30
X_train, X_test, y_train, y_test, plot_data = load_data(window_size)
num_features = X_train[0].shape[1]
rnn_model = build_rnn_model(window_size, num_features)
lstm_model = build_lstm_model(window_size, num_features)
gru_model = build_gru_model(window_size, num_features)
run_model(rnn_model, X_train, X_test, y_train, y_test, name="RNN")
run_model(lstm_model, X_train, X_test, y_train, y_test, name="LSTM")
run_model(gru_model, X_train, X_test, y_train, y_test, name="GRU")
plot_result(rnn_model, X_test, y_test, plot_data, name="RNN")
plot_result(lstm_model, X_test, y_test, plot_data, name="LSTM")
plot_result(gru_model, X_test, y_test, plot_data, name="GRU")
if __name__ == "__main__":
main()