2017-05-29 1 views
3

En dépit de passer par multipleexamples, je ne comprends toujours pas comment classer des séquences de longueur variable en utilisant Keras, similar to this question. Je peux former un réseau qui détecte les fréquences de sinusoïde avec différentes longueurs, en utilisant le masquage:Classer des séquences de différentes longueurs

from keras import models 
from keras.layers.recurrent import LSTM 
from keras.layers import Dense, Masking 
from keras.optimizers import RMSprop 
from keras.losses import categorical_crossentropy 
from keras.preprocessing.sequence import pad_sequences 

import numpy as np 


def gen_noise(noise_len, mag): 
    return np.random.uniform(size=noise_len) * mag 


def gen_sin(t_val, freq): 
    return 2 * np.sin(2 * np.pi * t_val * freq) 


def train_rnn(x_train, y_train, max_len, mask, number_of_categories): 
    epochs = 3 
    batch_size = 500 

    # three hidden layers of 256 each 
    vec_dims = 1 
    hidden_units = 256 
    in_shape = (max_len, vec_dims) 

    model = models.Sequential() 

    model.add(Masking(mask, name="in_layer", input_shape=in_shape,)) 
    model.add(LSTM(hidden_units, return_sequences=False)) 
    model.add(Dense(number_of_categories, input_shape=(number_of_categories,), 
       activation='softmax', name='output')) 

    model.compile(loss=categorical_crossentropy, optimizer=RMSprop()) 

    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, 
       validation_split=0.05) 

    return model 


def gen_sig_cls_pair(freqs, t_stops, num_examples, noise_magnitude): 
    x = [] 
    y = [] 

    num_cat = len(freqs) 

    dt = 0.01 
    max_t = int(np.max(t_stops)/dt) 

    for f_i, f in enumerate(freqs): 
     for t_stop in t_stops: 
      t_range = np.arange(0, t_stop, dt) 
      t_len = t_range.size 

      for _ in range(num_examples): 
       sig = gen_sin(f, t_range) + gen_noise(t_len, noise_magnitude) 
       x.append(sig) 

       one_hot = np.zeros(num_cat, dtype=np.bool) 
       one_hot[f_i] = 1 
       y.append(one_hot) 

    pad_kwargs = dict(padding='post', maxlen=max_t, value=np.NaN, dtype=np.float32) 
    return pad_sequences(x, **pad_kwargs), np.array(y) 


if __name__ == '__main__': 
    noise_mag = 0.01 
    mask_val = -10 
    frequencies = (5, 7, 10) 
    signal_lengths = (0.8, 0.9, 1) 

    x_in, y_in = gen_sig_cls_pair(frequencies, signal_lengths, 50, noise_mag) 
    mod = train_rnn(x_in[:, :, None], y_in, 100, mask_val, len(frequencies)) 

Cependant, je ne comprends pas comment je suis censé dire Keras sur les autres séquences. J'ai pensé que je pourrais les masquer aussi, mais quand j'essaye, ils sortent juste NaN.

testing_dat, expected = gen_sig_cls_pair(frequencies, signal_lengths, 1, 0) 
res = mod.predict(testing_dat[:, :, None]) 

fig, axes = plt.subplots(3) 
axes[0].plot(np.concatenate(testing_dat), label="input") 

axes[1].plot(np.argmax(res, axis=1), "ro", label="result", alpha=0.2) 
axes[1].plot(np.argmax(expected, axis=1), "bo", label="expected", alpha=0.2) 
axes[1].legend(bbox_to_anchor=(1.1, 1)) 

axes[2].plot(res) 

plt.show() 

Comment créer un réseau capable d'évaluer des entrées de longueur variable?

Répondre

2

Vous pouvez pad les séquences d'entrée (habituellement avec des zéros) ou vous pouvez utiliser des lots de taille 1 avec différentes tailles d'entrée, comme indiqué dans answer de fchollet sur le github Keras:

for seq, label in zip(sequences, y): 
    model.train(np.array([seq]), [label]) 

Alternativement, si votre type de problème le permet, vous extrayez des sous-séquences de la série chronologique originale avec une longueur inférieure à la longueur des séquences les plus courtes. La troisième option vous permet également d'ajouter de la redondance à l'ensemble de données si vous avez peu d'échantillons et de réduire les risques de surapprentissage.

EDIT:

Seanny123 (OP) a souligné au-dessus des lignes de cette fchollet contiennent model.train, ce qui est un code valide. Il a résolu le problème en utilisant des lots de taille 1 et le code suivant:

from keras.models import Sequential 
from keras.layers import LSTM, Dense 
import numpy as np 


def gen_sig(num_samples, seq_len): 
    one_indices = np.random.choice(a=num_samples, size=num_samples // 2, replace=False) 

    x_val = np.zeros((num_samples, seq_len), dtype=np.bool) 
    x_val[one_indices, 0] = 1 

    y_val = np.zeros(num_samples, dtype=np.bool) 
    y_val[one_indices] = 1 

    return x_val, y_val 


N_train = 100 
N_test = 10 
recall_len = 20 

X_train, y_train = gen_sig(N_train, recall_len) 

X_test, y_test = gen_sig(N_train, recall_len) 

print('Build STATEFUL model...') 
model = Sequential() 
model.add(LSTM(10, batch_input_shape=(1, 1, 1), return_sequences=False, stateful=True)) 
model.add(Dense(1, activation='sigmoid')) 
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy']) 

print('Train...') 
for epoch in range(15): 
    mean_tr_acc = [] 
    mean_tr_loss = [] 

    for seq_idx in range(X_train.shape[0]): 
     start_val = X_train[seq_idx, 0] 
     assert y_train[seq_idx] == start_val 
     assert tuple(np.nonzero(X_train[seq_idx, :]))[0].shape[0] == start_val 

     y_in = np.array([y_train[seq_idx]], dtype=np.bool) 

     for j in range(np.random.choice(a=np.arange(5, recall_len+1))): 
      x_in = np.array([[[X_train[seq_idx][j]]]]) 
      tr_loss, tr_acc = model.train_on_batch(x_in, y_in) 

      mean_tr_acc.append(tr_acc) 
      mean_tr_loss.append(tr_loss) 

     model.reset_states() 

    print('accuracy training = {}'.format(np.mean(mean_tr_acc))) 
    print('loss training = {}'.format(np.mean(mean_tr_loss))) 
    print('___________________________________') 

    mean_te_acc = [] 
    mean_te_loss = [] 
    for seq_idx in range(X_test.shape[0]): 
     start_val = X_test[seq_idx, 0] 
     assert y_test[seq_idx] == start_val 
     assert tuple(np.nonzero(X_test[seq_idx, :]))[0].shape[0] == start_val 

     y_in = np.array([y_test[seq_idx]], dtype=np.bool) 

     for j in range(np.random.choice(a=np.arange(5, recall_len+1))): 
      te_loss, te_acc = model.test_on_batch(np.array([[[X_test[seq_idx][j]]]], dtype=np.bool), y_in) 
      mean_te_acc.append(te_acc) 
      mean_te_loss.append(te_loss) 
     model.reset_states() 

    print('accuracy testing = {}'.format(np.mean(mean_te_acc))) 
    print('loss testing = {}'.format(np.mean(mean_te_loss))) 
print('___________________________________') 
+0

Rembourrage la séquence d'entrée ne fonctionne pas pour moi, comme je démontrerai dans la question. Cependant, je vais essayer demain d'utiliser la taille de lot de 1 tour. Un lien vers une démonstration de ce tour serait apprécié. – Seanny123

+0

Ajouté le code dans ma réponse! – michetonu

+0

Vous avez raison, je l'ai copypassée à partir de la réponse de fchollet (le créateur de Keras) et j'ai juste supposé que ce serait correct. Je vais éditer ma réponse avec votre solution, c'est génial! – michetonu