BigQuery can't copy a dataset directly between locations, but you can move one by extracting the tables to Cloud Storage, doing a bucket-to-bucket transfer, and loading the files on the other side.
The steps are:
- Extract each table in the BigQuery dataset to a Cloud Storage bucket in the same location as the dataset. (Avro format preserves the table schemas.)
- Transfer the files from that bucket to a bucket in the destination location with the Storage Transfer Service.
- Load the files from the destination bucket into a BigQuery dataset in the destination location.
Here is the whole thing in Python:
import datetime
import json
import sys
import time

import googleapiclient.discovery
import pytz
from google.cloud import bigquery
# Settings: change these to match your own project, datasets, and buckets.
PROJECT_ID = 'swast-scratch'
FROM_LOCATION = 'US'
FROM_DATASET = 'workflow_test_us'
FROM_BUCKET = 'swast-scratch-us'
TO_LOCATION = 'EU'
TO_DATASET = 'workflow_test_eu'
TO_BUCKET = 'swast-scratch-eu'

# Clients for the BigQuery API and the Storage Transfer API.
bq_client = bigquery.Client(project=PROJECT_ID)
transfer_client = googleapiclient.discovery.build('storagetransfer', 'v1')
def extract_tables():
    # Extract every table in the source dataset to the source bucket.
    print('Extracting {}:{} to bucket {}'.format(
        PROJECT_ID, FROM_DATASET, FROM_BUCKET))

    tables = list(bq_client.list_tables(bq_client.dataset(FROM_DATASET)))
    extract_jobs = []
    for table in tables:
        job_config = bigquery.ExtractJobConfig()
        # Avro keeps the schema, so the load step can recreate the tables.
        job_config.destination_format = bigquery.DestinationFormat.AVRO
        extract_job = bq_client.extract_table(
            table.reference,
            ['gs://{}/{}.avro'.format(FROM_BUCKET, table.table_id)],
            location=FROM_LOCATION,
            job_config=job_config)
        extract_jobs.append(extract_job)

    # Wait for all extract jobs to finish.
    for job in extract_jobs:
        job.result()

    return tables
def transfer_buckets():
    # Copy the extracted files to the destination bucket with the
    # Storage Transfer Service.
    print('Transferring bucket {} to {}'.format(FROM_BUCKET, TO_BUCKET))

    now = datetime.datetime.now(pytz.utc)
    transfer_job = {
        'description': '{}-{}-{}_once'.format(
            PROJECT_ID, FROM_BUCKET, TO_BUCKET),
        'status': 'ENABLED',
        'projectId': PROJECT_ID,
        'transferSpec': {
            'transferOptions': {
                'overwriteObjectsAlreadyExistingInSink': True,
            },
            'gcsDataSource': {
                'bucketName': FROM_BUCKET,
            },
            'gcsDataSink': {
                'bucketName': TO_BUCKET,
            },
        },
        # Setting the end date equal to the start date runs the job once.
        'schedule': {
            'scheduleStartDate': {
                'year': now.year,
                'month': now.month,
                'day': now.day,
            },
            'scheduleEndDate': {
                'year': now.year,
                'month': now.month,
                'day': now.day,
            },
        },
    }
    transfer_job = transfer_client.transferJobs().create(
        body=transfer_job).execute()
    print('Returned transferJob: {}'.format(
        json.dumps(transfer_job, indent=4)))

    # Find the operation that the service created for this job.
    job_filter = {
        'project_id': PROJECT_ID,
        'job_names': [transfer_job['name']],
    }
    response = {}
    while ('operations' not in response) or (not response['operations']):
        time.sleep(1)
        response = transfer_client.transferOperations().list(
            name='transferOperations', filter=json.dumps(job_filter)).execute()
    operation = response['operations'][0]
    print('Returned transferOperation: {}'.format(
        json.dumps(operation, indent=4)))

    # Poll until the transfer operation completes.
    print('Waiting ', end='')
    while operation['metadata']['status'] == 'IN_PROGRESS':
        print('.', end='')
        sys.stdout.flush()
        time.sleep(5)
        operation = transfer_client.transferOperations().get(
            name=operation['name']).execute()
    print()
    print('Finished transferOperation: {}'.format(
        json.dumps(operation, indent=4)))
def load_tables(tables):
    # Load the Avro files from the destination bucket into the
    # destination dataset.
    print('Loading tables from bucket {} to {}:{}'.format(
        TO_BUCKET, PROJECT_ID, TO_DATASET))

    load_jobs = []
    for table in tables:
        dest_table = bq_client.dataset(TO_DATASET).table(table.table_id)
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.AVRO
        load_job = bq_client.load_table_from_uri(
            ['gs://{}/{}.avro'.format(TO_BUCKET, table.table_id)],
            dest_table,
            location=TO_LOCATION,
            job_config=job_config)
        load_jobs.append(load_job)

    # Wait for all load jobs to finish.
    for job in load_jobs:
        job.result()
# Run the whole pipeline: extract, transfer, then load.
tables = extract_tables()
transfer_buckets()
load_tables(tables)
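After the script completes, a quick way to sanity-check the copy is to list the tables that landed in the destination dataset. A minimal sketch of my own, not part of the original workflow, reusing the placeholder project and dataset names from the script:

from google.cloud import bigquery

# Hypothetical check: print each table now present in the EU dataset.
bq_client = bigquery.Client(project='swast-scratch')
for table in bq_client.list_tables(bq_client.dataset('workflow_test_eu')):
    print(table.table_id)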
The script uses the google-cloud-bigquery client library to drive the BigQuery API and google-api-python-client to call the Storage Transfer API.
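If you don't already have them, both libraries (plus pytz, used for the UTC timestamp) are available from PyPI:

pip install google-cloud-bigquery google-api-python-client pytz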