Compute differences between successive records in Hadoop with Hive queries

I have a Hive table that contains data about customer calls. For simplicity, assume it has two columns: the first contains the customer id and the second contains the call timestamp (a unix timestamp).
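To make the examples concrete, here is a hypothetical DDL sketch for such a table (the column names customer_id and call_time are the ones used in the queries below; the exact types are an assumption):

 CREATE TABLE mytable (
   customer_id STRING,
   call_time   BIGINT  -- unix timestamp of the call
 );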

I can query this table to find all the calls for each customer:

SELECT * FROM mytable SORT BY customer_id, call_time; 

Result:

 Customer1  timestamp11
 Customer1  timestamp12
 Customer1  timestamp13
 Customer2  timestamp21
 Customer3  timestamp31
 Customer3  timestamp32
 ...

Is it possible to write a Hive query that returns, for each customer, starting from the second call, the time interval between two successive calls? For the example above, the query should return:

 Customer1  timestamp12 - timestamp11
 Customer1  timestamp13 - timestamp12
 Customer3  timestamp32 - timestamp31
 ...

I tried to adapt solutions from the SQL world, but I run up against Hive's limitations: it accepts subqueries only in FROM, and joins must contain only equalities.

Thanks.

EDIT1:

I tried writing the following Hive UDF:

 import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.io.LongWritable;

 public class DeltaComputerUDF extends UDF {
   private String previousCustomerId;
   private long previousCallTime;

   public String evaluate(String customerId, LongWritable callTime) {
     long callTimeValue = callTime.get();
     String timeDifference = null;

     // emit a delta only when this row belongs to the same customer as the previous row
     if (customerId.equals(previousCustomerId)) {
       timeDifference = Long.toString(callTimeValue - previousCallTime);
     }

     // remember the current row for the next invocation
     previousCustomerId = customerId;
     previousCallTime = callTimeValue;

     return timeDifference;
   }
 }

and registered it under the name "delta", along the lines of the snippet below.
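A minimal registration sketch (the jar name and path are assumptions; the full steps appear in the solution at the end of this post):

 ADD JAR /tmp/myjar.jar;
 CREATE TEMPORARY FUNCTION delta AS 'DeltaComputerUDF';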

But it seems (from the logs and the results) that it runs during the MAP phase. Two problems follow from this:

First: the table data must be sorted by customer id and timestamp before the function is used. The query:

  SELECT customer_id, call_time, delta(customer_id, call_time)
  FROM mytable
  DISTRIBUTE BY customer_id
  SORT BY customer_id, call_time;

does not work, because the sorting is performed during the REDUCE phase, long after my function has run.

I can sort the table data before using the function, but I am not happy with this because of the overhead, which I hope to avoid.

Second: with a distributed Hadoop configuration, the data is split among the available nodes. So there will be several instances of this function, one per mapper, and the same customer's data can end up split between two mappers. In that case I will lose customer call deltas, which is unacceptable.

I do not know how to solve this problem. I know that DISTRIBUTE BY guarantees that all rows with a given value are sent to the same reducer (which ensures that SORT works as expected); does anyone know if there is something similar for the mapper?

Next, I plan to follow libjack's suggestion to use a reduce script. This "computation" is needed in between some other Hive queries, so I want to try everything Hive offers before moving to another tool, as Balaswamy Vaddeman suggested.

EDIT2:

I started investigating the custom-scripts solution. But on the first page of chapter 14 of the Programming Hive book (the chapter that presents custom scripts), I found the following paragraph:

Streaming is usually less efficient than coding the comparable UDFs or InputFormat objects. Serializing and deserializing data to pass it in and out of the pipe is relatively inefficient. It is also harder to debug the whole program in a unified manner. However, it is useful for fast prototyping and for leveraging existing code that is not written in Java. For Hive users who don't want to write Java code, it can be a very effective approach.

So it became clear that custom scripts are not the optimal solution in terms of efficiency.
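For reference, this is roughly what the streaming approach would have looked like (a sketch; the reducer script delta.py is hypothetical, and would read the sorted (customer_id, call_time) pairs from stdin and emit one (customer_id, call_time, time_difference) line for every call that has a predecessor):

 ADD FILE delta.py;
 SELECT TRANSFORM (customer_id, call_time)
 USING 'python delta.py'
 AS customer_id, call_time, time_difference
 FROM (
   SELECT customer_id, call_time
   FROM mytable
   DISTRIBUTE BY customer_id
   SORT BY customer_id, call_time
 ) t;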

But how can I keep my UDF and still make sure it works as expected in a distributed Hadoop configuration? I found the answer in the UDF Internals section of the Hive wiki page. If I write my query as:

  SELECT customer_id, call_time, delta(customer_id, call_time)
  FROM (
    SELECT customer_id, call_time
    FROM mytable
    DISTRIBUTE BY customer_id
    SORT BY customer_id, call_time
  ) t;

then the UDF runs during the REDUCE phase, and the DISTRIBUTE BY and SORT BY constructs guarantee that all the records from the same customer are processed by the same reducer, in call order.

So the UDF above, together with this query construct, solves my problem.

(Sorry, I cannot add links because I do not have enough reputation points.)

3 answers

This is an old question, but for future reference I am adding another suggestion here:

Hive windowing functions allow you to use the previous/next values in your query.

Example pseudo-code might be:

SELECT customer_id, call_time - LAG(call_time, 1, 0) OVER (PARTITION BY customer_id ORDER BY call_time) FROM mytable;
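A fuller sketch along the same lines (assuming Hive 0.11 or later, where windowing functions are available), which also drops each customer's first call, for which LAG returns NULL:

 SELECT customer_id, call_time, time_difference
 FROM (
   SELECT customer_id, call_time,
          call_time - LAG(call_time) OVER (PARTITION BY customer_id ORDER BY call_time) AS time_difference
   FROM mytable
 ) t
 WHERE time_difference IS NOT NULL;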


You can use explicit MAP-REDUCE with another programming language such as Java or Python: in the map phase emit {customer_id, call_time}, and in the reducer you receive {customer_id, list<call_time>}; there you can sort the timestamps and process the data.


In case someone else has a similar requirement, here is the solution I found:

1) Create a custom function:

 package com.example;

 // imports; the exact packages may depend on the Hive version
 import java.util.ArrayList;
 import java.util.List;

 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.io.Text;

 @Description(name = "delta",
     value = "_FUNC_(customer id column, call time column) "
         + "- computes the time passed between two successive records from the same customer. "
         + "It generates 3 columns: the first contains the customer id, the second contains the call time "
         + "and the third contains the time passed since the previous call. This function returns only "
         + "the records that have a previous call from the same customer (the requirement does not apply "
         + "to the first call)",
     extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS "
         + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable "
         + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;")
 public class DeltaComputerUDTF extends GenericUDTF {
   private static final int NUM_COLS = 3;

   private Text[] retCols;             // array of returned column values
   private ObjectInspector[] inputOIs; // input ObjectInspectors
   private String prevCustomerId;
   private Long prevCallTime;

   @Override
   public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException {
     if (ois.length != 2) {
       throw new UDFArgumentException(
           "There must be 2 arguments: customer id column name and call time column name");
     }

     inputOIs = ois;

     // construct the output column data holders
     retCols = new Text[NUM_COLS];
     for (int i = 0; i < NUM_COLS; ++i) {
       retCols[i] = new Text();
     }

     // construct the output object inspector
     List<String> fieldNames = new ArrayList<String>(NUM_COLS);
     List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS);
     for (int i = 0; i < NUM_COLS; ++i) {
       // the column names can be anything, since they will be renamed by the UDTF's AS clause
       fieldNames.add("c" + i);
       // all returned columns are of type Text
       fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
     }

     return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
   }

   @Override
   public void process(Object[] args) throws HiveException {
     String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]);
     Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]);

     // forward a row only when the current record has a predecessor from the same customer
     if (customerId.equals(prevCustomerId)) {
       retCols[0].set(customerId);
       retCols[1].set(callTime.toString());
       retCols[2].set(Long.toString(callTime - prevCallTime));
       forward(retCols);
     }

     // store the current customer data, for the next row
     prevCustomerId = customerId;
     prevCallTime = callTime;
   }

   @Override
   public void close() throws HiveException {
     // nothing to close
   }
 }

2) Create a jar containing this function. Suppose the jar name is myjar.jar.

3) Copy the jar to the machine where Hive runs. Suppose it is placed in /tmp.

4) Define the custom function inside Hive:

 ADD JAR /tmp/myjar.jar;
 CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF';

5) Run the query:

 SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference)
 FROM (
   SELECT customer_id, call_time
   FROM mytable
   DISTRIBUTE BY customer_id
   SORT BY customer_id, call_time
 ) t;

Notes:

a. I assumed that the call_time column stores the data as bigint. In case it is a string, in the process function we would retrieve it as a string (as we do with the customerId) and then parse it to a Long.
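A sketch of an alternative that avoids changing the Java code (assuming call_time is a string holding a unix timestamp): cast it to bigint in the inner query instead:

 SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference)
 FROM (
   SELECT customer_id, CAST(call_time AS BIGINT) AS call_time
   FROM mytable
   DISTRIBUTE BY customer_id
   SORT BY customer_id, call_time
 ) t;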

b. I decided to use a UDTF instead of a UDF because this way it generates only the data it needs. Otherwise (with a UDF) the generated data has to be filtered to skip the NULL values. So, using the UDF (DeltaComputerUDF) described in the first edit of this post, the query would look like this:

 SELECT customer_id, call_time, time_difference
 FROM (
   SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference)
   FROM (
     SELECT customer_id, call_time
     FROM mytable
     DISTRIBUTE BY customer_id
     SORT BY customer_id, call_time
   ) t
 ) u
 WHERE time_difference IS NOT NULL;

c. Both functions (the UDF and the UDTF) work as desired regardless of the order of the rows inside the table (so the table data does not have to be sorted by customer id and call time before the delta functions are used).

