AppEngine: Migrating Entity Schema with Pipelines

I'm curious what works best for migrating entity schemas on Google App Engine. We use the pipeline library a lot, and my inclination was to build a pipeline task to handle the migration. This is what I came up with (in this example I store whether the user's age is prime):

from google.appengine.ext import ndb
import pipeline

class MigrateUsers(pipeline.Pipeline):
  def run(self, keys):
    # Load one batch of users, set the new field, and write them back.
    futures = []
    users = ndb.get_multi(keys)
    for user in users:
      user.age_is_prime = is_prime(user.age)
      futures.append(user.put_async())
    ndb.Future.wait_all(futures)

class Migration(pipeline.Pipeline):
  def run(self):
    all_results = []

    q = ds.User.query()  # ds is our models module
    more = True
    next_cursor = None

    # Fetch user keys in batch and create MigrateUsers jobs
    while more:
      user_keys, next_cursor, more = \
        q.fetch_page(500, keys_only=True, start_cursor=next_cursor)
      all_results.append((yield MigrateUsers(keys=user_keys)))

    # Wait for them all to finish
    pipeline.After(*all_results)

My question is, am I doing this right? It feels clumsy that my Migration task iterates over every user just to create the segmented MigrateUsers tasks. I did look at the mapreduce library, but it didn't feel like the right fit. I'd appreciate any advice, and if you do use mapreduce and wouldn't mind converting my example, I'd really appreciate that too.

MapReduce is perfect for this. You don't need the whole pipeline, though: there is nothing to shuffle, sort, or reduce here, so all you need is the "map" part of mapreduce.

The mapreduce library takes care of querying, batching, and retries for you. The MR SDK ships with a DatastoreInputReader() that feeds every entity of a given kind to your map function, which would look something like this:

from mapreduce import operation as op

def prime_age_map(user_entity):
    # Called once per User entity by the mapreduce framework.
    user_entity.age_is_prime = is_prime(user_entity.age)
    if user_entity.age_is_prime:
        # Only write back the users whose flag is set; the rest
        # keep the model's default value.
        yield op.db.Put(user_entity)

Note that, unlike the hand-rolled pipeline above, the SDK shards the work across tasks for you and handles batching, throttling, retries, and progress reporting, so there is no cursor bookkeeping to write.

Incidentally, since only the first 30 or so primes (http://primes.utm.edu/lists/small/1000.txt) can ever match a human age, is_prime() doesn't need to do any real math; a precomputed set is enough. To run the migration, configure a MapReduce job with the DatastoreInputReader, your map function, and your entity Kind.
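
A minimal sketch of that precomputed-set idea; the is_prime name matches the question's code, and cutting off at the 30th prime (113) is my assumption about the largest plausible age:

# The first 30 primes (everything up to 113), plenty for any human age.
_SMALL_PRIMES = frozenset([
    2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53,
    59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113])

def is_prime(n):
    # Ages are tiny, so set membership beats any primality test.
    return n in _SMALL_PRIMES

And a sketch of kicking the job off with the MR SDK's control.start_map(); the handler and kind paths ("migrations.prime_age_map", "models.User") are placeholders for wherever your code actually lives:

from mapreduce import control

def start_migration():
    # The framework splits the key range across shard_count parallel tasks.
    return control.start_map(
        name="Backfill User.age_is_prime",
        handler_spec="migrations.prime_age_map",
        reader_spec="mapreduce.input_readers.DatastoreInputReader",
        mapper_parameters={"entity_kind": "models.User"},
        shard_count=16)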

TaskQueues would get the job done too. But next to MapReduce, hand-rolling your own batching on a backend is just extra work, IMO. In short: use MapReduce.
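
For completeness, a minimal sketch of the task-queue route using the deferred library; migrate_batch is a hypothetical handler, and the ds.User model and 500-key page size are borrowed from the question, not from this answer:

from google.appengine.ext import deferred, ndb

def migrate_batch(cursor=None):
    # Process one page of users, then re-enqueue ourselves for the next page.
    keys, next_cursor, more = ds.User.query().fetch_page(
        500, keys_only=True, start_cursor=cursor)
    users = ndb.get_multi(keys)
    for user in users:
        user.age_is_prime = is_prime(user.age)
    ndb.put_multi(users)
    if more:
        deferred.defer(migrate_batch, next_cursor)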
