Kiba author here! You can achieve this in different ways, depending mainly on the size of the data and your actual needs. Here are a few possibilities.
Aggregating using a variable in your Kiba script
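    require 'awesome_print'
    require 'bigdecimal'

    # convert the amount string into an exact decimal number
    transform do |r|
      r[:amount] = BigDecimal(r[:amount])
      r
    end

    total_amounts = Hash.new(0)

    # accumulate the per-name totals as rows flow through
    transform do |r|
      total_amounts[r[:name]] += r[:amount]
      r
    end

    post_process do
      # pretty-print the totals (you could also save them to a file or database)
      ap total_amounts
    end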
This is the simplest way, yet it is quite flexible.
It keeps your aggregates in memory, though, which may or may not be good enough depending on your scenario. Note that Kiba is currently single-threaded (though Kiba Pro will be multithreaded), so for now there is no need to add a lock or use a thread-safe structure for the aggregate.
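To see the aggregation logic on its own, outside a Kiba script, here is a minimal sketch in plain Ruby. The sample rows and the `aggregate_totals` helper name are made up for illustration; inside Kiba the same work is split across the two `transform` blocks.

```ruby
require 'bigdecimal'

# Hypothetical helper: sums :amount per :name, as the transform blocks do.
def aggregate_totals(rows)
  totals = Hash.new(0) # missing keys start at 0
  rows.each do |row|
    totals[row[:name]] += BigDecimal(row[:amount])
  end
  totals
end

# Made-up sample rows, with amounts as strings (as they would come out of a CSV).
rows = [
  { name: 'alice', amount: '10.50' },
  { name: 'bob',   amount: '3.25' },
  { name: 'alice', amount: '0.75' }
]

totals = aggregate_totals(rows)
totals.each { |name, total| puts "#{name}: #{total.to_s('F')}" }
```

The hash holds one entry per distinct name, so memory use grows with the number of groups rather than with the number of rows.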
Calling TextQL from post_process blocks
Another quick and easy way to aggregate is to generate a non-aggregated CSV file first, then use TextQL to do the actual aggregation, for example:
    destination CsvDestination, 'non-aggregated-output.csv', [:name, :amount]

    post_process do
      query = <<~SQL
        select name, round(sum(amount), 2) as total_amount
        from tbl
        group by name
      SQL

      textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
    end
The following helpers are defined:
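    # run a shell command and raise if it exits with a failure status
    def system!(command)
      raise "Failed to run command #{command}" unless system(command)
    end

    # pipe a CSV file through textql and write the query result to another CSV
    def textql(source_file, query, output_file)
      system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
    end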
Be careful with precision, though, when doing the computations.
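To illustrate the kind of precision issue to watch for: as far as I know, TextQL runs the query through SQLite, which sums non-integer values as binary floating point. The sketch below reproduces the effect in plain Ruby with made-up values; it does not call TextQL.

```ruby
require 'bigdecimal'

amounts = Array.new(10, '0.1') # made-up sample: ten amounts of 0.1

# Plain float addition accumulates binary rounding error.
float_total = amounts.reduce(0.0) { |total, a| total + a.to_f }

# Decimal addition stays exact for these values.
decimal_total = amounts.reduce(BigDecimal('0')) { |total, a| total + BigDecimal(a) }

puts float_total == 1.0               # the float sum is not exactly 1.0
puts decimal_total == BigDecimal('1') # the decimal sum is exactly 1
puts float_total.round(2)             # rounding at the end hides the small error
```

Rounding the final sum, as `round(sum(amount), 2)` does in the query, is usually enough for small drifts like this one, but it is worth checking against your own data.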
Writing an in-memory aggregating destination
A useful trick that can work here is to wrap a given destination with a class that performs the aggregation. Here is how it could look:
    class InMemoryAggregate
      def initialize(sum:, group_by:, destination:)
        @aggregate = Hash.new(0)
        @sum = sum
        @group_by = group_by
which you can use as follows:
    # convert your string into an actual number
    transform do |r|
      r[:amount] = BigDecimal(r[:amount])
      r
    end

    destination CsvDestination, 'non-aggregated.csv', [:name, :amount]

    destination InMemoryAggregate,
      sum: :amount, group_by: :name,
      destination: [
        CsvDestination, 'aggregated.csv', [:name, :amount]
      ]

    post_process do
      system!("cat aggregated.csv | csvfix ascii_table")
    end
The nice thing about this version is that you can reuse the aggregator with different destinations (for example, a database or something else).
Note, though, that this keeps all the aggregates in memory, like the first version.
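For reference, here is one way the wrapper could be filled out. This is a sketch under stated assumptions, not necessarily the original implementation: it assumes the usual Kiba destination contract (a `write(row)` method plus a `close` method), assumes the wrapped destination is passed as `[klass, *args]`, and uses a hypothetical `ArrayDestination` so that it runs without Kiba or any files.

```ruby
require 'bigdecimal'

# Hypothetical destination that collects rows in an array (stand-in for a CSV one).
class ArrayDestination
  def initialize(rows)
    @rows = rows
  end

  def write(row)
    @rows << row
  end

  def close; end
end

class InMemoryAggregate
  def initialize(sum:, group_by:, destination:)
    @aggregate = Hash.new(0)
    @sum = sum
    @group_by = group_by
    # Assumption: destination is [klass, *constructor_args]
    klass, *args = destination
    @destination = klass.new(*args)
  end

  # Accumulate the total per group instead of writing each row.
  def write(row)
    @aggregate[row[@group_by]] += row[@sum]
  end

  # Emit one row per group once all rows have been seen.
  def close
    @aggregate.each do |key, total|
      @destination.write(@group_by => key, @sum => total)
    end
    @destination.close
  end
end

# Made-up rows, fed by hand the way Kiba would call write and close.
output = []
aggregate = InMemoryAggregate.new(sum: :amount, group_by: :name,
                                  destination: [ArrayDestination, output])
aggregate.write(name: 'alice', amount: BigDecimal('10.50'))
aggregate.write(name: 'bob', amount: BigDecimal('3.25'))
aggregate.write(name: 'alice', amount: BigDecimal('0.75'))
aggregate.close

output.each { |row| puts "#{row[:name]}: #{row[:amount].to_s('F')}" }
```

Deferring the writes to `close` is what makes the wrapper work: the totals are only known once every row has gone through `write`.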
Inserting into a store with aggregation capabilities
Another way (especially useful if you have very large volumes) is to send the resulting data to something that can aggregate it for you. It could be a regular SQL database, Redis, or something fancier, which you could then query as needed.
So, as I said, the implementation will largely depend on your real needs. Hope you find something that works for you here!