I am trying to run a Pig script that calls a user-defined function (UDF) written in Java. I am testing the script with a very small input file (264 bytes), yet the job fails with Java heap space errors. Running it with the -Xms1024M option makes it work for smaller files, but it still fails on a larger file. My cluster is more than powerful enough to handle files this small, so I suspect a memory leak in my UDF. How can I fix it? Can anyone help?
import java.util.HashMap; import java.lang.annotation.Annotation; import java.lang.reflect.Array; import java.lang.reflect.Method; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.Set; import java.text.*; import org.apache.pig.EvalFunc; import org.apache.pig.data.*; import com.tictactec.ta.lib.CoreAnnotated; import com.tictactec.ta.lib.MAType; import com.tictactec.ta.lib.MInteger; import com.tictactec.ta.lib.RetCode; import com.tictactec.ta.lib.meta.annotation.InputParameterInfo; import com.tictactec.ta.lib.meta.annotation.InputParameterType; import com.tictactec.ta.lib.meta.annotation.OptInputParameterInfo; import com.tictactec.ta.lib.meta.annotation.OptInputParameterType; import com.tictactec.ta.lib.meta.annotation.OutputParameterInfo; import com.tictactec.ta.lib.meta.annotation.OutputParameterType; public class taLib extends EvalFunc<DataBag> { private static final int MIN_ARGS = 3; public static CoreAnnotated core = new CoreAnnotated(); private static Method func_ref = null; public DecimalFormat df = new DecimalFormat("#.###"); public DataBag exec(Tuple args) throws IOException { DataBag input=null; MInteger outStart = new MInteger(); MInteger outLen = new MInteger(); Map<String,Object>outputParams=new HashMap<String, Object>(); String func_name; List<Integer> ip_colmns= new ArrayList<Integer>(); List<double[]>ip_list=new ArrayList<double[]>(); List<String>opt_type=new ArrayList<String>(); List<Object>opt_params=new ArrayList<Object>(); ////// long m1=Runtime.getRuntime().freeMemory(); System.out.println(m1); long m2=Runtime.getRuntime().totalMemory(); System.out.println(m2); ////// int ip_noofparams=0; int op_noofparams=0; int opt_noofparams=0; if (args == null || args.size() < MIN_ARGS) throw new IllegalArgumentException("talib: must have at least " + MIN_ARGS + " args"); if(args.get(0) instanceof DataBag) {input = (DataBag)args.get(0);} else{throw new 
IllegalArgumentException("Only a valid bag name can be passed");} // get no of fields in bag Tuple t0=input.iterator().next(); int fields_in_bag=t0.getAll().size(); if(args.get(1) instanceof String) {func_name = (String)args.get(1);} else{throw new IllegalArgumentException("Only valid function name can be passed at arg 1");} func_ref=methodChk(func_name); if (func_ref == null) { throw new IllegalArgumentException("talib: function " + func_name + " was not found"); } for (Annotation[] annotations : func_ref.getParameterAnnotations()) { for (Annotation annotation : annotations) { if(annotation instanceof InputParameterInfo) { InputParameterInfo inputParameterInfo = (InputParameterInfo)annotation; if(inputParameterInfo.type().equals(InputParameterType.TA_Input_Price)) { ip_noofparams=numberOfSetBits(inputParameterInfo.flags()); } else { ip_noofparams++; } } if(annotation instanceof OptInputParameterInfo) { OptInputParameterInfo optinputParameterInfo= (OptInputParameterInfo)annotation; opt_noofparams++; if (optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerRange)) { opt_type.add("Integer"); } else if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_RealRange)) { opt_type.add("Double"); } else if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerList)) { opt_type.add("String"); } else{throw new IllegalArgumentException("whoopsie ...serious mess in opt_annotations");} } if (annotation instanceof OutputParameterInfo) { OutputParameterInfo outputParameterInfo = (OutputParameterInfo) annotation; op_noofparams++; if (outputParameterInfo.type().equals(OutputParameterType.TA_Output_Real)) { outputParams.put(outputParameterInfo.paramName(), new double[(int) input.size()]); } else if (outputParameterInfo.type().equals(OutputParameterType.TA_Output_Integer)) { outputParams.put(outputParameterInfo.paramName(), new int[(int)input.size()]); } } } } int total_params =ip_noofparams+opt_noofparams; 
if((args.size()-2)!=total_params){throw new IllegalArgumentException("Wrong no of argumets passed to UDF");} // get the ip colmns no for(int i=2;i<(2+ip_noofparams);i++) { if(args.get(i) instanceof Integer ) { if((Integer)args.get(i)>=0 && (Integer)args.get(i)<fields_in_bag) { ip_colmns.add((Integer) args.get(i)); } else{throw new IllegalArgumentException("The input colmn specified is invalid..please enter a valid colmn no:0-"+(fields_in_bag-1));} } else{throw new IllegalArgumentException("Wrong arguments entered: Only"+ip_noofparams+"field no of type(integer) allowed for fn"+func_name ); } } // create a list of ip arrays for(int i=0;i<ip_colmns.size();i++) { ip_list.add((double[]) Array.newInstance(double.class, (int)input.size())); } int z=0; int x=0; // fill up the arrays for(Tuple t1: input) { Iterator<double[]> itr=ip_list.iterator(); z=0; while(itr.hasNext()) { if((Double)t1.get(ip_colmns.get(z)) instanceof Double) { ((double[])itr.next())[x]=(Double) t1.get(ip_colmns.get(z++)); } else{throw new IllegalArgumentException("Illegal argument while filling up array...only double typr allowed");} } x++; } //deal with opt params int s=0; for(int i=(2+ip_noofparams);i<(2+ip_noofparams+opt_noofparams);i++) { if(opt_type.get(s).equalsIgnoreCase(args.get(i).getClass().getSimpleName().toString())) { if(opt_type.get(s).equalsIgnoreCase("String")) { String m=args.get(i).toString().toLowerCase(); String ma=m.substring(0, 1).toUpperCase(); String mac=m.substring(1); String macd=ma+mac; MAType type =MAType.valueOf(macd); opt_params.add(type); s++; } else{ opt_params.add(args.get(i)); s++; } } else if(opt_type.get(s).equalsIgnoreCase("Double")) { if(args.get(i).getClass().getSimpleName().toString().equalsIgnoreCase("Integer")) { opt_params.add((Double)((Integer)args.get(i)+0.0)); s++; } else{throw new IllegalArgumentException("Opt arguments do not match for fn:"+func_name+", pls enter opt arguments in right order"); } } else{throw new IllegalArgumentException("Opt arguments do 
not match for fn:"+func_name+", pls enter opt arguments in right order");} } List<Object> ta_argl = new ArrayList<Object>(); ta_argl.add(new Integer(0)); ta_argl.add(new Integer((int)input.size() - 1)); for(double[]in: ip_list) { ta_argl.add(in); } if(opt_noofparams!=0) {ta_argl.addAll(opt_params);} ta_argl.add(outStart); ta_argl.add(outLen); for (Map.Entry<String, Object> entry : outputParams.entrySet()) { ta_argl.add(entry.getValue()); } RetCode rc = RetCode.Success; try { rc = (RetCode)func_ref.invoke(core, ta_argl.toArray()); } catch (Exception e) { assert false : "I died in ta-lib, but Java made me a zombie..."; } assert rc == RetCode.Success : "ret code from " + func_name; if (outLen.value == 0) return null; ////// DataBag ret=null; ret =outTA(input,outputParams,outStart); outputParams.clear(); ip_list.clear(); opt_params.clear(); opt_type.clear(); ip_colmns.clear(); Runtime.getRuntime().gc(); return ret; } public DataBag outTA(DataBag bag,Map<String, Object> outputParams,MInteger outStart) { DataBag nbag=null; TupleFactory mTupleFactory=TupleFactory.getInstance(); BagFactory mBagFactory=BagFactory.getInstance(); nbag=mBagFactory.newDefaultBag(); Tuple tw=bag.iterator().next(); int fieldsintup=tw.getAll().size(); for(Tuple t0: bag) { Tuple t1=mTupleFactory.newTuple(); for(int z=0;z<fieldsintup;z++) { try { t1.append(t0.get(z)); } catch (Exception e) { // TODO Auto-generated catch block System.out.println("Ouch"); } } nbag.add(t1); } int i = 0; int j=0; for (Tuple t2: nbag) { if(i>=outStart.value) { for(Map.Entry<String,Object>entry: outputParams.entrySet()) { t2.append(entry.getKey().substring(3).toString()); if(entry.getValue() instanceof double[]) { t2.append( new Double (df.format(((double[])entry.getValue())[j]))); } else if(entry.getValue() instanceof int[]) { t2.append( ((int[])entry.getValue())[j]); } else{throw new IllegalArgumentException(entry.getValue().getClass()+"not supported");} } i++;j++; } else {t2.append(0.0); i++; } } return nbag; } public 
Method methodChk(String fn) { String fn_name=fn; Method tmp_fn=null; for (Method meth: core.getClass().getDeclaredMethods()) { if (meth.getName().equalsIgnoreCase(fn_name)) { tmp_fn = meth; break; } } return tmp_fn; } public int numberOfSetBits(int i) { i = i - ((i >> 1) & 0x55555555); i = (i & 0x33333333) + ((i >> 2) & 0x33333333); return ((i + (i >> 4) & 0xF0F0F0F) * 0x1010101) >> 24; } }
source share