What is the best way to process data stored in different places in Google BigQuery?

My current workflow in BigQuery is as follows:

(1) query data in a public repository (stored in the US), (2) write it to a table in my repository, (3) export a CSV to a cloud bucket, (4) copy the CSV to the server I'm working on, and (5) work with it on the server.

The problem is that the server I'm working on is in the EU, so I have to pay quite a lot in data transfer fees between my US bucket and my EU server. I could locate my bucket in the EU instead, but then I would still be transferring data from the US (BigQuery) to the EU (bucket). I could also set my BigQuery dataset to be located in the EU, but then I could no longer run my queries, because the data in the public repository is located in the US and queries across different locations are not allowed.

Does anyone have an idea on how to approach this?

+4
source share
2

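One way to copy a BigQuery dataset from one location to another is to use the Storage Transfer Service. The data still has to cross regions, but it moves as a direct bucket-to-bucket transfer rather than passing through your own server.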

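The flow is:

  • Extract all the BigQuery tables to a bucket in the same location as the dataset. (The example below uses the Avro format.)
  • Run a storage transfer job to copy the extracted files from that bucket to a bucket in the destination location.
  • Load all the files into a BigQuery dataset in the destination location.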

Python:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import sys
import time

import googleapiclient.discovery
from google.cloud import bigquery
import json
import pytz


# ---------------------------------------------------------------------------
# Configuration. Every value below is a placeholder; replace each one with
# the names used in your own project before running.
# ---------------------------------------------------------------------------
PROJECT_ID = 'swast-scratch'  # TODO: your Google Cloud project name

# Source side: BigQuery location, dataset, and a bucket in that same location.
FROM_LOCATION = 'US'  # TODO: BigQuery location of the source dataset
FROM_DATASET = 'workflow_test_us'  # TODO: source BigQuery dataset name
FROM_BUCKET = 'swast-scratch-us'  # TODO: bucket in the source location

# Destination side: BigQuery location, dataset, and a bucket in that location.
TO_LOCATION = 'EU'  # TODO: destination BigQuery location
TO_DATASET = 'workflow_test_eu'  # TODO: destination dataset name
TO_BUCKET = 'swast-scratch-eu'  # TODO: bucket in the destination location

# API clients shared by all of the steps below.
bq_client = bigquery.Client(project=PROJECT_ID)
transfer_client = googleapiclient.discovery.build(
    serviceName='storagetransfer', version='v1')


def extract_tables():
    """Export every table in FROM_DATASET to FROM_BUCKET as an Avro file.

    One extract job is started per table, writing to
    ``gs://FROM_BUCKET/<table_id>.avro``. All jobs are started first and
    only then awaited, so the exports can run concurrently.

    Returns:
        The list of table items that ``list_tables`` returned for the
        source dataset.
    """
    print('Extracting {}:{} to bucket {}'.format(
        PROJECT_ID, FROM_DATASET, FROM_BUCKET))

    source_dataset = bq_client.dataset(FROM_DATASET)
    tables = list(bq_client.list_tables(source_dataset))

    def start_extract(source_table):
        # Build a fresh config per job and kick the job off; this call
        # returns once the job has been started, not when it finishes.
        avro_config = bigquery.ExtractJobConfig()
        avro_config.destination_format = bigquery.DestinationFormat.AVRO
        destination_uri = 'gs://{}/{}.avro'.format(
            FROM_BUCKET, source_table.table_id)
        return bq_client.extract_table(
            source_table.reference,
            [destination_uri],
            location=FROM_LOCATION,  # Available in 0.32.0 library.
            job_config=avro_config)

    pending_jobs = [start_extract(source_table) for source_table in tables]

    # Block until every export has completed.
    for pending in pending_jobs:
        pending.result()

    return tables


def transfer_buckets():
    """Copy the contents of FROM_BUCKET to TO_BUCKET and wait for the result.

    Creates a one-off Storage Transfer Service job (start and end date both
    set to today, UTC), waits for the job's operation to show up, then polls
    that operation until it reaches a terminal status.

    Raises:
        RuntimeError: If the transfer operation finishes with any status
            other than ``SUCCESS``. Continuing in that case would make the
            following load step read missing or stale files.
    """
    print('Transferring bucket {} to {}'.format(FROM_BUCKET, TO_BUCKET))
    now = datetime.datetime.now(pytz.utc)
    transfer_job = {
        'description': '{}-{}-{}_once'.format(
            PROJECT_ID, FROM_BUCKET, TO_BUCKET),
        'status': 'ENABLED',
        'projectId': PROJECT_ID,
        'transferSpec': {
            'transferOptions': {
                'overwriteObjectsAlreadyExistingInSink': True,
            },
            'gcsDataSource': {
                'bucketName': FROM_BUCKET,
            },
            'gcsDataSink': {
                'bucketName': TO_BUCKET,
            },
        },
        # Set start and end date to today (UTC) without a time part to start
        # the job immediately.
        'schedule': {
            'scheduleStartDate': {
                'year': now.year,
                'month': now.month,
                'day': now.day,
            },
            'scheduleEndDate': {
                'year': now.year,
                'month': now.month,
                'day': now.day,
            },
        },
    }
    transfer_job = transfer_client.transferJobs().create(
        body=transfer_job).execute()
    print('Returned transferJob: {}'.format(
        json.dumps(transfer_job, indent=4)))

    # Find the operation created for the job.
    job_filter = {
        'project_id': PROJECT_ID,
        'job_names': [transfer_job['name']],
    }

    # Wait until the operation has started. The job is created first and its
    # operation appears later, so keep listing until one is returned.
    response = {}
    while not response.get('operations'):
        time.sleep(1)
        response = transfer_client.transferOperations().list(
            name='transferOperations', filter=json.dumps(job_filter)).execute()

    operation = response['operations'][0]
    print('Returned transferOperation: {}'.format(
        json.dumps(operation, indent=4)))

    # Wait for the transfer to complete. Poll until a terminal status is
    # reported rather than only while the status is IN_PROGRESS: an operation
    # reported in another non-terminal state (e.g. QUEUED or PAUSED) has not
    # copied the files yet, and returning early would let the load step run
    # against an incomplete bucket.
    terminal_statuses = ('SUCCESS', 'FAILED', 'ABORTED')
    print('Waiting ', end='')
    while operation['metadata']['status'] not in terminal_statuses:
        print('.', end='')
        sys.stdout.flush()
        time.sleep(5)
        operation = transfer_client.transferOperations().get(
            name=operation['name']).execute()
    print()

    print('Finished transferOperation: {}'.format(
        json.dumps(operation, indent=4)))

    # Surface failures instead of silently carrying on to the load step.
    final_status = operation['metadata']['status']
    if final_status != 'SUCCESS':
        raise RuntimeError(
            'Transfer operation {} ended with status {}'.format(
                operation['name'], final_status))


def load_tables(tables):
    """Load the transferred Avro files into tables in TO_DATASET.

    For each item in ``tables`` a load job is started that reads
    ``gs://TO_BUCKET/<table_id>.avro`` into a table with the same id in the
    destination dataset. All jobs are started before any of them is awaited.

    Args:
        tables: Iterable of objects exposing a ``table_id`` attribute, such
            as the list returned by ``extract_tables``.
    """
    print('Loading tables from bucket {} to {}:{}'.format(
        TO_BUCKET, PROJECT_ID, TO_DATASET))

    def start_load(source_table):
        # Resolve the destination, then start (but do not wait for) the job.
        target = bq_client.dataset(TO_DATASET).table(source_table.table_id)
        avro_config = bigquery.LoadJobConfig()
        avro_config.source_format = bigquery.SourceFormat.AVRO
        source_uri = 'gs://{}/{}.avro'.format(
            TO_BUCKET, source_table.table_id)
        return bq_client.load_table_from_uri(
            [source_uri],
            target,
            location=TO_LOCATION,  # Available in 0.32.0 library.
            job_config=avro_config)

    pending_jobs = [start_load(source_table) for source_table in tables]

    # Block until every load has completed.
    for pending in pending_jobs:
        pending.result()


# Actually run the script: export the source tables to FROM_BUCKET, copy the
# exported files to TO_BUCKET, then load them into the destination dataset.
# The order matters because each step reads what the previous one wrote.
tables = extract_tables()
transfer_buckets()
load_tables(tables)

This example uses the google-cloud-bigquery library for the BigQuery API and the google-api-python-client library for the Storage Transfer API.

, .

+5

, , , , :

  • , , .

  • , BQ . , , , .

, . , , , , . , ( ) .

, Google , , , , .

0

All Articles