Upload compressed data to Google BigQuery using the API

Over the past few days, I have been working on improving the google_bigquery connector in logstash. So far I have managed to add features such as error handling (bad rows), better connection management and a few other things.

The last, and most important, feature I worked on is loading compressed data into BigQuery, and the API documentation for it is terrible.

While I can upload CSV files directly to BQ using the Jobs.insert method, I noticed that the documentation clearly states that the data can be uploaded compressed.
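
The data in my case is newline-delimited JSON, gzipped before upload (as declared via "sourceFormat" in the plugin code below). For reference, a minimal sketch of producing such a file; the file name and event fields here are made up:

  require 'json'
  require 'zlib'

  # Write a couple of sample events as gzipped newline-delimited JSON,
  # the format the load job below expects.
  events = [
    { 'message' => 'hello world', 'host' => 'daves-mpb.local' },
    { 'message' => 'second line', 'host' => 'daves-mpb.local' }
  ]

  Zlib::GzipWriter.open('bq-logstash_sample.part000.log.gz') do |gz|
    events.each { |event| gz.puts(event.to_json) }
  end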

The only question is whether I can do this without going through Google Cloud Storage. The compressed option is presumably there to reduce network bandwidth and its cost, so adding another hop (and another cost) such as GCS makes no sense.

The error I get is:

  BQ: job failed, please enable debug and check full response (probably the issue is an incompatible schema). NOT deleting local file. {:job_id=>"job_OvWTWOXGv9yGnLKfrTfGfukLytM", :filename=>"/Users/dave.e/logstash/tmp/bq-logstash_daves-mpb.local_2014-08-26.part000.log.gz", :job_status=>{"state"=>"DONE", "errorResult"=>{"reason"=>"internalError", "location"=>"File: 0", "message"=>"Unexpected. Please try again."}, "errors"=>[{"reason"=>"internalError", "location"=>"File: 0", "message"=>"Unexpected. Please try again."}]}, :level=>:error}
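
To see the full errorResult rather than this one truncated log line, it helps to poll the job directly. A minimal sketch, assuming the same @client, @bq and @project_id objects used in upload_object below:

  require 'json'

  # Fetch the full status of a load job, including "errorResult" and
  # "errors", via Jobs.get.
  def job_status(job_id)
    result = @client.execute(:api_method => @bq.jobs.get,
                             :parameters => {
                               'projectId' => @project_id,
                               'jobId'     => job_id
                             })
    JSON.parse(result.response.body)['status']
  end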

  # Uploads a local file to the configured bucket.
  def upload_object(filename)
    @logger.debug("entering upload_object")
    begin
      @logger.debug("1")
      require 'json'
      @logger.debug("2")
      table_id = @table_prefix + "_" + get_date_pattern(filename)
      @logger.debug("3")
      # BQ does not accept anything other than alphanumeric and _
      # Ref: https://developers.google.com/bigquery/browser-tool-quickstart?hl=en
      table_id = table_id.gsub(':','_').gsub('-', '_')
      @logger.debug("table bane has been modified")

      @logger.debug("BQ: upload object.",
                    :filename => filename,
                    :table_id => table_id)
      media = Google::APIClient::UploadIO.new(filename, "application/octet-stream")
      body = {
        "configuration" => {
          "load" => {
            "sourceFormat" => "NEWLINE_DELIMITED_JSON",
            "schema" => @json_schema,
            "destinationTable"  =>  {
              "projectId" => @project_id,
              "datasetId" => @dataset,
              "tableId" => table_id
            },
            'createDisposition' => 'CREATE_IF_NEEDED',
            'writeDisposition' => 'WRITE_APPEND',
            'maxBadRecords' => 99
          }
        }
      }

      @logger.debug("Execution details: ",
                   :body_object => body,
                   :parameters => {
                     'uploadType' => 'multipart',
                     'projectId' => @project_id
                   },
                  :media => media)

      datasetId = @project_id + ":" + @dataset

      verify_dataset = @client.execute(:api_method => @bq.datasets.get,
                                       :parameters => {
                                         'projectId' => @project_id,
                                         'datasetId' => datasetId })

      status = JSON.parse(verify_dataset.response.body)["id"]
      # datasets.get returns the id as "project:dataset", so compare
      # against datasetId (the original compared against an undefined
      # local variable `dataset`)
      if status != datasetId
         @logger.info("BQ: dataset doesn't exist, creating it instead")
         create_dataset = @client.execute(:api_method => @bq.datasets.insert,
                                          :parameters => { 'projectId' => @project_id },
                                          :body_object => { 'id' => datasetId })
         sleep 10
      end

      insert_result = @client.execute(:api_method => @bq.jobs.insert,
                                      :body_object => body,
                                      :parameters => {
                                        'uploadType' => 'multipart',
                                        'projectId' => @project_id
                                      },
                                      :media => media)

      job_id = JSON.parse(insert_result.response.body)["jobReference"]["jobId"]
      @logger.debug("BQ: multipart insert",
                    :job_id => job_id)
      return job_id
    rescue => e
      @logger.error("BQ: failed to upload file", :exception => e)
      # TODO(rdc): limit retries?
      sleep 1
      if File.exist?(filename)
        retry
      end
    end
  end
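
For what it's worth, the only gzip-specific change I can think to try is declaring the media type as gzip instead of a generic octet-stream. This is an untested guess, not a confirmed fix; everything else mirrors upload_object above:

  # Same multipart Jobs.insert call, but with the media declared as
  # gzip ("application/x-gzip" is an assumption, not documented behavior).
  media = Google::APIClient::UploadIO.new(filename, "application/x-gzip")
  insert_result = @client.execute(:api_method => @bq.jobs.insert,
                                  :body_object => body,
                                  :parameters => {
                                    'uploadType' => 'multipart',
                                    'projectId' => @project_id
                                  },
                                  :media => media)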