UPDATE November 2018:
The answer to this question is already outdated, as the Google cloud client has changed significantly since the last message.
; , ( Python ).
Original Answer:
, BQ.
, python-api, , : bq-python-api ( , , , ).
2 , BQ. - , , . :
import uuid
def stream_data(self, table, data, schema):
r = self.service.tables().list(projectId=your_project_id,
datasetId=your_dataset_id).execute()
table_exists = [row['tableReference']['tableId'] for row in
r['tables'] if
row['tableReference']['tableId'] == table]
if not table_exists:
body = {
'tableReference': {
'tableId': table,
'projectId': your_project_id,
'datasetId': your_dataset_id
},
'schema': schema
}
self.service.tables().insert(projectId=your_project_id,
datasetId=your_dataset_id,
body=body).execute()
body = {
'rows': [
{
'json': data,
'insertId': str(uuid.uuid4())
}
]
}
self.service.tabledata().insertAll(projectId=your_project_id),
datasetId=your_dataset_id,
tableId=table,
body=body).execute(num_retries=5)
self.service service.
data, :
data = {u'days_validated': '20', u'days_trained': '80', u'navigated_score': '1', u'description': 'First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5', u'init_cv_date': '2016-03-06', u'metric': 'rank', u'unix_date': '1461610020241117', u'purchased_score': '10', u'result': '0.32677139316724546', u'date': '2016-04-25', u'carted_score': '3', u'end_cv_date': '2016-03-25'}
schema:
schema = {u'fields': [{u'type': u'STRING', u'name': u'date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'unix_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'init_cv_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'end_cv_date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_trained', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_validated', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'navigated_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'carted_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'purchased_score', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'description', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'metric', u'mode': u'NULLABLE'}, {u'type': u'FLOAT', u'name': u'result', u'mode': u'NULLABLE'}]}
- . , . , , :
def create_table_from_query(self,
query,
dest_table,
how):
body = {
'configuration': {
'query': {
'destinationTable': {
'projectId': your_project_id,
'tableId': dest_table,
'datasetId': your_dataset_id
},
'writeDisposition': how,
'query': query,
},
}
}
response = self.connector.jobs().insert(projectId=self._project_id,
body=body).execute()
self.wait_job_completion(response['jobReference']['jobId'])
def wait_job_completion(self, job_id):
while True:
response = self.connector.jobs().get(projectId=self._project_id,
jobId=job_id).execute()
if response['status']['state'] == 'DONE':
return
how (, "WRITE_TRUNCATE" "WRITE_APPEND").
CSV , , configuration :
"configuration": {
"load": {
"fieldDelimiter": "\t"
"sourceFormat": "CSV"
"destinationTable": {
"projectId": your_project_id,
"tableId": table_id,
"datasetId": your_dataset_id
},
"writeDisposition": "WRITE_TRUNCATE"
"schema": schema,
"sourceUris": file_location_in_google_cloud_storage
},
}
( CSV , . JSON, ).
() ( wait_job_completion). , .
, ,