Over the past few days I have been working on improving the google_bigquery connector in Logstash. So far I have been able to add features such as error handling (for malformed lines), better connection management and a few other things.
The last, and most important, feature I worked on is loading compressed data into BigQuery, and the API documentation around this is terrible.
I can upload CSV files directly to BQ using the Jobs.insert method, and I noticed that the documentation clearly states that the data can be loaded compressed.
The only question is whether I can do this without using Google Cloud Storage: the whole point of the compressed option is to reduce network bandwidth and its cost, so adding another hop (and another cost) such as GCS makes no sense.
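The only knob I can find on the direct-upload path is the declared content type of the media. Below is a minimal sketch of what I would try, declaring the media as gzip instead of the plain octet stream my current code uses; the path is made up, and whether jobs.insert actually accepts gzip for a direct, non-GCS upload is exactly what I cannot confirm from the docs.

require 'google/api_client'

# Sketch only: the same UploadIO call the plugin already uses, but declaring
# the media as gzip instead of "application/octet-stream".
# The file path is illustrative.
gzip_media = Google::APIClient::UploadIO.new(
  "/tmp/bq-logstash.part000.log.gz",
  "application/x-gzip"
)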
The error I get is:
BQ: job failed, please enable debug and check the full response (probably some sort of schema incompatibility). NOT deleting local file. {:job_id=>"job_OvWTWOXGv9yGnLKfrTfGfukLytM", :filename=>"/Users/dave.e/Logstash/TMP/Bq-logstash_daves-mpb.local_2014-08-26.part000.log.gz", :job_status=>{"state"=>"DONE", "errorResult"=>{"reason"=>"internalError", "location"=>"File: 0", "message"=>"Unexpected. Please try again."}, "errors"=>[{"reason"=>"internalError", "location"=>"File: 0", "message"=>"Unexpected. Please try again."}]}, :level=>:error}
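The "Unexpected. Please try again." message tells me nothing, so to see everything the API has on the job I pull the full job resource back with jobs.get. This is a quick debugging snippet reusing the same @client, @bq and @project_id the plugin already holds; the job id is the one from the log above.

require 'json'

# Fetch the full job resource for the failed load and dump its status section,
# which sometimes carries more detail than the logged errorResult.
job = @client.execute(
  :api_method => @bq.jobs.get,
  :parameters => {
    'projectId' => @project_id,
    'jobId'     => 'job_OvWTWOXGv9yGnLKfrTfGfukLytM'
  }
)
puts JSON.pretty_generate(JSON.parse(job.response.body)["status"])

For reference, this is the upload method as it currently stands: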
def upload_object(filename)
  @logger.debug("entering upload_object")
  begin
    @logger.debug("1")
    require 'json'
    @logger.debug("2")
    table_id = @table_prefix + "_" + get_date_pattern(filename)
    @logger.debug("3")
    # BQ table names cannot contain ':' or '-', so normalise them to '_'
    table_id = table_id.gsub(':', '_').gsub('-', '_')
    @logger.debug("table name has been modified")
    @logger.debug("BQ: upload object.",
                  :filename => filename,
                  :table_id => table_id)
    # multipart media upload of the local file
    media = Google::APIClient::UploadIO.new(filename, "application/octet-stream")
    body = {
      "configuration" => {
        "load" => {
          "sourceFormat" => "NEWLINE_DELIMITED_JSON",
          "schema" => @json_schema,
          "destinationTable" => {
            "projectId" => @project_id,
            "datasetId" => @dataset,
            "tableId" => table_id
          },
          'createDisposition' => 'CREATE_IF_NEEDED',
          'writeDisposition' => 'WRITE_APPEND',
          'maxBadRecords' => 99
        }
      }
    }
    @logger.debug("Execution details: ",
                  :body_object => body,
                  :parameters => {
                    'uploadType' => 'multipart',
                    'projectId' => @project_id
                  },
                  :media => media)
    datasetId = @project_id + ":" + @dataset
    # make sure the target dataset exists before submitting the load job
    verify_dataset = @client.execute(:api_method => @bq.datasets.get,
                                     :parameters => {
                                       'projectId' => @project_id,
                                       'datasetId' => datasetId })
    status = JSON.parse(verify_dataset.response.body)["id"]
    if status != datasetId
      @logger.info("BQ: dataset doesn't exist, creating it instead")
      create_dataset = @client.execute(:api_method => @bq.datasets.insert,
                                       :parameters => { 'projectId' => @project_id },
                                       :body_object => { 'id' => datasetId })
      sleep 10
    end
    insert_result = @client.execute(:api_method => @bq.jobs.insert,
                                    :body_object => body,
                                    :parameters => {
                                      'uploadType' => 'multipart',
                                      'projectId' => @project_id
                                    },
                                    :media => media)
    job_id = JSON.parse(insert_result.response.body)["jobReference"]["jobId"]
    @logger.debug("BQ: multipart insert",
                  :job_id => job_id)
    return job_id
  rescue => e
    @logger.error("BQ: failed to upload file", :exception => e)
    sleep 1
    retry if File.exist?(filename)
  end
end
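In case it matters, this is roughly how one would produce the .gz part file before upload_object picks it up; a sketch using Ruby's standard Zlib, where paths and naming are illustrative and not the plugin's actual rotation code.

require 'zlib'

# Compress a rotated part file and return the path of the .gz it produced.
def gzip_part_file(plain_path)
  gz_path = "#{plain_path}.gz"
  Zlib::GzipWriter.open(gz_path) do |gz|
    File.open(plain_path, "rb") do |file|
      while (chunk = file.read(16 * 1024))
        gz.write(chunk)
      end
    end
  end
  gz_path
end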