Passing a symbolic Theano tensor to a compiled theano.function

I am trying to reorganize my code so that it is easier to change architectures. I am currently creating a recurrent neural network as follows.

import numpy
import theano
import theano.tensor as T

# network sizes (hypothetical values: n hidden units, nin inputs, nout outputs)
n, nin, nout = 50, 10, 5

# input (where first dimension is time)
x = T.matrix()
# target (where first dimension is time)
t = T.matrix()

# recurrent weights as a shared variable
W_hh = theano.shared(numpy.random.uniform(size=(n, n), low=-.01, high=.01))
# input to hidden layer weights
W_hx = theano.shared(numpy.random.uniform(size=(n, nin), low=-.01, high=.01))
# hidden to output layer weights
W_yh = theano.shared(numpy.random.uniform(size=(nout, n), low=-.01, high=.01))
# hidden layer bias weights
b_h = theano.shared(numpy.zeros((n,)))
# output layer bias weights
b_y = theano.shared(numpy.zeros((nout,)))
# initial hidden state of the RNN
h0 = theano.shared(numpy.zeros((n,)))

# recurrent function
def step(x_t, h_tm1):
    h_t = T.nnet.sigmoid(T.dot(W_hx, x_t) + T.dot(W_hh, h_tm1) + b_h)
    y_t = T.nnet.sigmoid(T.dot(W_yh, h_t) + b_y)
    return h_t, y_t

# loop over the recurrent function for the entire sequence
[h, y], _ = theano.scan(step,
                        sequences=x,
                        outputs_info=[h0, None])

# predict function outputs y for a given x
predict = theano.function(inputs=[x,], outputs=y)
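
Once compiled, predict consumes and produces plain numpy arrays; for example (a minimal sketch, using a hypothetical 20-step sequence):

# a 20-step input sequence, one row per time step (hypothetical data)
xs = numpy.random.uniform(size=(20, nin))
ys = predict(xs)   # numpy array of shape (20, nout)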

This works great. But the problem with this implementation is that I have to hard-code the sizes and re-check that all the math is correct with every change in architecture. Inspired by the layered approach of Theano's multilayer perceptron tutorial, I tried to refactor my code by introducing a Layer class.

class Layer:
    def __init__(self, inputs=None, nins=None, nout=None, Ws=None, b=None, activation=T.tanh):
        """
        inputs:               an array of theano symbolic vectors
        activation:           the activation function for the hidden layer
        nins, nout, Ws, b:    either pass the dimensions of the inputs and outputs, or pass
                              the shared theano tensors for the weights and bias.
        """
        # avoid mutable default arguments, which would be shared across calls
        inputs = inputs or []
        nins = nins or []
        Ws = Ws or []

        n = len(inputs)
        assert n != 0

        self.inputs = inputs
        self.activation = activation

        # create the shared weights if necessary
        if len(Ws) == 0:
            assert len(nins) == n
            assert nout is not None
            for i in range(n):
                nin = nins[i]
                W = theano.shared(
                    numpy.random.uniform(
                        size=(nout, nin),
                        low=-numpy.sqrt(6. / (nin + nout)),
                        high=numpy.sqrt(6. / (nin + nout))
                    ),
                )
                Ws.append(W)

        # create the shared biases if necessary
        if b is None:
            assert(nout is not None)
            b = theano.shared(numpy.zeros((nout,)))

        self.Ws = Ws
        self.b = b
        self.params = self.Ws + [b]
        self.weights = Ws

        linear = self.b
        for i in range(n):
            linear += T.dot(self.Ws[i], self.inputs[i])

        if self.activation:
            self.output = self.activation(linear)
        else:
            self.output = linear

This lets me write RNN code that is much cleaner, less error-prone, and much easier to adapt to new architectures.

# one step of the input
x = T.vector()
# the previous hidden layer
h_tm1 = T.vector()

# the input and the hidden layer go into the input layer
hiddenLayer = Layer(inputs=[x, h_tm1],
                    nins=[nin, n],
                    nout=n,
                    activation=T.nnet.sigmoid)

# the hidden layer vector
h = hiddenLayer.output

# the hidden layer output goes to the output
outputLayer = Layer(inputs=[h],
                    nins=[n],
                    nout=nout,
                    activation=T.nnet.sigmoid)

# the output layer vector
y = outputLayer.output

# recurrent function
step = theano.function(inputs=[x, h_tm1],
                       outputs=[h, y])

# next we need to scan over all steps for a given array of observations
# input (where first dimension is time)
Xs = T.matrix()
# initial hidden state of the RNN
h0 = theano.shared(numpy.zeros((n,)))

# loop over the recurrent function for the entire sequence
[Hs, Ys], _ = theano.scan(step,
                          sequences=Xs,
                          outputs_info=[h0, None])

# predict function outputs y for a given x
predict = theano.function(inputs=[Xs,], outputs=Ys)

However, when I run my program, I get an error

TypeError: ('Bad input argument to theano function at index 0(0-based)', 'Expected an array-like object, but found a Variable: maybe you are trying to call a function on a (possibly shared) variable instead of a numeric array?')

The error is raised by the theano.scan call itself, where the compiled step function is handed a symbolic variable (a slice of Xs).

Thinking about it, the error makes sense. step is a compiled theano.function over the four symbolic variables (x, h_tm1, h, y), so it only accepts numeric arrays, yet scan hands it the symbolic slices of Xs. In other words, theano.scan seems to require an uncompiled symbolic expression rather than a compiled theano.function.
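
The mismatch can be reproduced directly (a minimal sketch; h0.get_value() extracts the numeric array stored inside the shared variable):

# calling the compiled step with numeric arrays works...
h_val, y_val = step(numpy.ones(nin), h0.get_value())

# ...but calling it with symbolic variables, which is what theano.scan
# does internally, raises the TypeError above
step(Xs[0], h0)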

To test this theory, I rewrote the power example from the scan documentation so that the inner step is a compiled theano.function rather than a symbolic expression.

import theano
import theano.tensor as T
import numpy

# one step of the computation is defined elsewhere, perhaps in a class.
A = T.vector("A")
prior_result = T.vector('p')
next_result = prior_result*A

# Now we compile a function to represent one step of the computation given
# some symbolic variables
step = theano.function(inputs=[prior_result, A], outputs=next_result)

# Finally, we have to loop through that step k times
k = T.iscalar("k")

result, updates = theano.scan(step, 
                              outputs_info=T.ones_like(A),
                              n_steps=k,
                              non_sequences=[A])

final_result = result[-1]

# And now we can define our power function
power = theano.function(inputs=[A,k], outputs=final_result)

print(power(range(10), 2))
print(power(range(10), 4))

Sure enough, this fails with the same TypeError. So my question is: is there any way to pass a compiled theano.function to theano.scan? And if not, is there a better way to modularize the step computation (such as the Layer class above) so that scan can still consume it?
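
For reference, the only style I know scan accepts is an uncompiled Python function that returns symbolic expressions. A sketch of what that would look like here (my untested assumption; symbolic_step is a hypothetical name, and it reuses the shared weights of the layers above):

# a plain Python function that builds fresh symbolic expressions from
# the same shared weights, instead of a compiled theano.function
def symbolic_step(x_t, h_tm1):
    hidden = Layer(inputs=[x_t, h_tm1], Ws=hiddenLayer.Ws, b=hiddenLayer.b,
                   activation=T.nnet.sigmoid)
    output = Layer(inputs=[hidden.output], Ws=outputLayer.Ws, b=outputLayer.b,
                   activation=T.nnet.sigmoid)
    return hidden.output, output.output

[Hs, Ys], _ = theano.scan(symbolic_step, sequences=Xs, outputs_info=[h0, None])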

