How to do an aggregation transformation in a Kiba ETL script (kiba gem)?

I want to write a Kiba ETL script with a CSV source and a CSV destination, plus a list of transform rules, where the second transform is an aggregation performing something like: select name, sum(euro) group by name.

Kiba ETL script file

    source CsvSource, 'users.csv', col_sep: ';', headers: true, header_converters: :symbol
    transform VerifyFieldsPresence, [:name, :euro]
    transform AggregateFields, { sum: :euro, group_by: :name }
    transform RenameField, from: :euro, to: :total_amount
    destination CsvDestination, 'result.csv', [:name, :total_amount]

users.csv

    date;euro;name
    7/3/2015;10;Jack
    7/3/2015;85;Jill
    8/3/2015;6;Jack
    8/3/2015;12;Jill
    9/3/2015;99;Mack

result.csv (expected result)

    total_amount;name
    16;Jack
    97;Jill
    99;Mack

Since ETL transforms are executed one row at a time, while the behavior of the second transform depends on the whole set of rows, I cannot implement it inside the class that is passed to the transform method:

    transform AggregateFields, { sum: :euro, group_by: :name }
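For context: a class-based Kiba transform only ever sees the current row in its process method, which is why the aggregation does not fit naturally there. A minimal sketch of such a transform class (my illustration, not code from the actual project):

    class AggregateFields
      def initialize(sum:, group_by:)
        @sum = sum
        @group_by = group_by
      end

      # Kiba calls process once per row; nothing here
      # exposes the whole dataset at once
      def process(row)
        row
      end
    end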

Is it possible to achieve this behavior using the kiba gem?
Thanks in advance.

1 answer

Kiba author here! You can achieve this in different ways, depending on the size of the data and your actual needs. Here are a few possibilities.

Aggregating using a variable in your Kiba script

    require 'awesome_print'
    require 'bigdecimal'

    transform do |r|
      r[:amount] = BigDecimal.new(r[:amount])
      r
    end

    total_amounts = Hash.new(0)

    transform do |r|
      total_amounts[r[:name]] += r[:amount]
      r
    end

    post_process do
      # pretty print here, but you could save to a CSV too
      ap total_amounts
    end

This is the simplest way, yet it remains quite flexible.

It will keep your aggregates in memory, which may or may not be good enough, depending on your scenario. Note that Kiba is currently single-threaded (although Kiba Pro will be multi-threaded), so there is no need to add a lock or use a thread-safe structure for the aggregate, for now.
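If you want an actual CSV file instead of the pretty print, the post_process block can write the hash out with Ruby's standard CSV library. A minimal sketch (the output file name and separator are my assumptions):

    require 'csv'

    post_process do
      CSV.open('result.csv', 'w', col_sep: ';') do |csv|
        csv << [:name, :total_amount]
        total_amounts.each do |name, total|
          # BigDecimal#to_s('F') prints plain decimal notation
          csv << [name, total.to_s('F')]
        end
      end
    end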

Calling TextQL from post_process blocks

Another quick and easy way to aggregate is to generate a non-aggregated CSV file first, then use TextQL to actually perform the aggregation, for example:

    destination CsvDestination, 'non-aggregated-output.csv', [:name, :amount]

    post_process do
      query = <<SQL
      select
        name,
        /* apparently sqlite has reduced precision, round to 2 for now */
        round(sum(amount), 2) as total_amount
      from tbl
      group by name
    SQL
      textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
    end

The following helpers are defined:

    def system!(cmd)
      raise "Failed to run command #{cmd}" unless system(cmd)
    end

    def textql(source_file, query, output_file)
      system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
      # this one uses csvfix to pretty print the table
      system! "cat #{output_file} | csvfix ascii_table"
    end

Be careful with precision when doing the computations, though.

Writing an in-memory aggregating destination

A useful trick that might work here is to wrap a given destination with a class in order to perform aggregation. Here's what it might look like:

    class InMemoryAggregate
      def initialize(sum:, group_by:, destination:)
        @aggregate = Hash.new(0)
        @sum = sum
        @group_by = group_by
        # this relies a bit on the internals of Kiba, but not too much
        @destination = destination.shift.new(*destination)
      end

      def write(row)
        # do not write, but count here instead
        @aggregate[row[@group_by]] += row[@sum]
      end

      def close
        # use close to actually do the writing
        @aggregate.each do |k, v|
          # reformat BigDecimal additions here
          value = '%0.2f' % v
          @destination.write(@group_by => k, @sum => value)
        end
        @destination.close
      end
    end

which you can use as follows:

    # convert your string into an actual number
    transform do |r|
      r[:amount] = BigDecimal.new(r[:amount])
      r
    end

    destination CsvDestination, 'non-aggregated.csv', [:name, :amount]
    destination InMemoryAggregate, sum: :amount, group_by: :name,
      destination: [CsvDestination, 'aggregated.csv', [:name, :amount]]

    post_process do
      system!("cat aggregated.csv | csvfix ascii_table")
    end

The nice thing about this version is that you can reuse the aggregator with different destinations (for example, a database, or something else entirely).
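For instance, swapping the output only means changing the wrapped destination array; a hypothetical sketch (SqlDestination is an invented name, not a real Kiba class):

    # any class responding to write(row) and close can be wrapped
    destination InMemoryAggregate, sum: :amount, group_by: :name,
      destination: [SqlDestination, 'sqlite://aggregates.db', :totals]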

Note that this will keep all the aggregates in memory, just like the first version.

Pushing the data to a store with aggregation capabilities

Another way (especially useful for very large volumes) is to push the resulting data into something that will be able to aggregate the data for you: a regular SQL database, Redis, or anything fancier, which you could then query as needed.
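As an illustration, such a destination could let the database do the summing via an upsert. A minimal sketch using the sqlite3 gem (table and column names are my assumptions; ON CONFLICT requires SQLite 3.24+):

    require 'sqlite3'

    class SqliteAggregateDestination
      def initialize(db_path)
        @db = SQLite3::Database.new(db_path)
        @db.execute('CREATE TABLE IF NOT EXISTS totals (name TEXT PRIMARY KEY, total REAL)')
      end

      def write(row)
        # the database accumulates the sum, so nothing stays in Ruby memory
        @db.execute(
          'INSERT INTO totals (name, total) VALUES (?, ?) ' \
          'ON CONFLICT(name) DO UPDATE SET total = total + excluded.total',
          [row[:name], row[:amount].to_f]
        )
      end

      def close
        @db.close
      end
    end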

So, as I said, the implementation will largely depend on your real needs. Hope you find something that works for you here!
