One whole exception gets split across two mappers when using Hadoop to extract exceptions from raw logs

I want to use Hadoop to extract and analyze exceptions from raw logs. I ran into the problem that some exceptions (spanning multiple lines) fall across two different splits and are therefore handled by two different mappers.

I have an idea for avoiding this problem: I could override the getSplits() method so that each split carries some redundant data. But I think this solution may be too expensive for me.
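For reference, a minimal sketch of that idea (the class name and overlap size are my own illustration; records falling entirely inside the overlap region would still be emitted twice and need filtering):

import java.io.IOException;

import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class OverlappingInputFormat extends TextInputFormat {

    // how much redundant data each split carries (assumed value)
    private static final long OVERLAP_BYTES = 64 * 1024;

    @Override
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        InputSplit[] splits = super.getSplits(job, numSplits);
        InputSplit[] overlapped = new InputSplit[splits.length];
        for (int i = 0; i < splits.length; i++) {
            FileSplit s = (FileSplit) splits[i];
            long fileLen = s.getPath().getFileSystem(job)
                            .getFileStatus(s.getPath()).getLen();
            // extend each split past its end, but never past the end of the
            // file, so an exception crossing the boundary is whole in one split
            long newLen = Math.min(s.getLength() + OVERLAP_BYTES,
                                   fileLen - s.getStart());
            overlapped[i] = new FileSplit(s.getPath(), s.getStart(), newLen,
                                          s.getLocations());
        }
        return overlapped;
    }
}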

Does anyone have a better solution to this problem?

+4

If you can mark where each record begins and ends, you can treat the log like XML and use an XMLInputFormat. (There are existing implementations around, for example in Mahout, but I ended up writing my own.)

My XMLInputFormat handles exactly this case. The core idea is:

a record reader may read past the end of its InputSplit to finish a record that starts inside it, while a record that starts before the split is skipped, so every record is read exactly once.

First, the input format, which does nothing but hand out the custom record reader:

package org.undercloud.mapreduce.example3;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class XmlInputFormat extends FileInputFormat<Text, XmlContent> {

  public RecordReader<Text, XmlContent> getRecordReader(InputSplit input, JobConf job, Reporter reporter)
      throws IOException {

    reporter.setStatus(input.toString());
    return new XmlRecordReader(job, (FileSplit) input);
  }
}

Now the record reader itself. The readUntilMatch method scans forward byte by byte until it sees the given tag, saving what it reads into the buffer when asked to. Note the end-of-split logic: while searching for a start tag the reader stops at the split boundary, but while searching for an end tag it deliberately reads past it, so a record that begins inside the split is always read to completion!

package org.undercloud.mapreduce.example3;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class XmlRecordReader implements RecordReader<Text, XmlContent> {

    // Start/end markers for one record. The original tag strings were lost
    // in transcription; "<exception>" tags match the use case here.
    private String startTagS = "<exception>";
    private String endTagS = "</exception>";
    private byte[] startTag;
    private byte[] endTag;
    private long start;
    private long end;
    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private LineRecordReader lineReader;

    public XmlRecordReader(JobConf job, FileSplit split) throws IOException {
      lineReader = new LineRecordReader(job, split);
      startTag = startTagS.getBytes();
      endTag = endTagS.getBytes();

      // Open the file and seek to the start of the split
      start = split.getStart();
      end = start + split.getLength();
      Path file = split.getPath();
      FileSystem fs = file.getFileSystem(job);
      fsin = fs.open(split.getPath());
      fsin.seek(start);
    }

    public boolean next(Text key, XmlContent value) throws IOException {
        // only look for records that *start* inside this split
        if (fsin.getPos() < end) {
            if (readUntilMatch(startTag, false)) {
                try {
                    buffer.write(startTag);
                    // the end tag may lie beyond the split boundary;
                    // readUntilMatch(..., true) deliberately reads past it
                    if (readUntilMatch(endTag, true)) {
                        key.set(Long.toString(fsin.getPos()));
                        value.bufferData = buffer.getData();
                        value.offsetData = 0;
                        value.lengthData = buffer.getLength();
                        return true;
                    }
                }
                finally {
                    buffer.reset();
                }
            }
        }
        return false;
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
        int i = 0;
        while (true) {
            int b = fsin.read();
            // end of file: no match
            if (b == -1) return false;
            // inside a record, save every byte to the buffer
            if (withinBlock) buffer.write(b);
            if (b == match[i]) {
                i++;
                if (i >= match.length) return true;
            } else i = 0;
            // only stop at the split boundary when searching for a *start*
            // tag; an end tag is allowed to lie beyond it
            if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
        }
    }

  public Text createKey() {
     return new Text("");
  }

  public XmlContent createValue() {
    return new XmlContent();
  }

  public long getPos() throws IOException {
    return fsin.getPos();
  }

  public void close() throws IOException {
    fsin.close();
    lineReader.close();
  }

  public float getProgress() throws IOException {
    return lineReader.getProgress();
  }
}

Finally, a simple Writable that carries the matched bytes out to the mapper:

package org.undercloud.mapreduce.example3;

import java.io.*;

import org.apache.hadoop.io.*;

public class XmlContent implements Writable {

    public byte[] bufferData;
    public int offsetData;
    public int lengthData;

    public XmlContent(byte[] bufferData, int offsetData, int lengthData) {
        this.bufferData = bufferData;
        this.offsetData = offsetData;
        this.lengthData = lengthData;
    }

    public XmlContent() {
        this(null, 0, 0);
    }

    public void write(DataOutput out) throws IOException {
        // write the length first so readFields knows how much to allocate;
        // only the used portion of the buffer is serialized
        out.writeInt(lengthData);
        out.write(bufferData, offsetData, lengthData);
    }

    public void readFields(DataInput in) throws IOException {
        lengthData = in.readInt();
        offsetData = 0;
        bufferData = new byte[lengthData];
        in.readFully(bufferData);
    }

    public String toString() {
        return offsetData + ", " + lengthData + ", "
            + new String(bufferData, offsetData, lengthData);
    }

}
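For completeness, a minimal driver sketch (everything here beyond XmlInputFormat and XmlContent is my own illustration) showing how the classes could be wired together with the old mapred API:

package org.undercloud.mapreduce.example3;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class XmlJobDriver {

    // trivial mapper: emit each matched block keyed by its file offset
    public static class ExceptionMapper extends MapReduceBase
            implements Mapper<Text, XmlContent, Text, Text> {
        public void map(Text key, XmlContent value,
                        OutputCollector<Text, Text> out, Reporter reporter)
                throws IOException {
            out.collect(key, new Text(value.toString()));
        }
    }

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(XmlJobDriver.class);
        conf.setJobName("extract-exceptions");
        conf.setInputFormat(XmlInputFormat.class);  // the format above
        conf.setMapperClass(ExceptionMapper.class);
        conf.setNumReduceTasks(0);                  // map-only job
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}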

Because the reader finishes any record that starts inside its split and never starts one that begins before it, each exception ends up in exactly one mapper, with no duplicates. Hope this helps.

+2

Neither TextInputFormat nor NLineInputFormat will solve this by itself. TextInputFormat hands each line to the mapper as a separate record, so a multi-line exception is broken up (and can still straddle a split boundary). NLineInputFormat only fixes how many lines each mapper receives, and exceptions do not have a fixed number of lines.

So the raw log needs a pre-processing (pre-parsing) step that turns each exception into a single record.

Mahout ships an XmlInputFormat that can then do the rest. In the pre-processing pass, find each exception with a regular expression and wrap it in <exception></exception> tags, and Mahout's XmlInputFormat will deliver every tagged block as one record:

// requires java.util.regex.Pattern, java.util.regex.Matcher, java.io.BufferedWriter
String input;      // the raw log contents
String regex;      // a regex matching one whole exception
BufferedWriter bw; // where the pre-processed output goes

Pattern p = Pattern.compile(regex); // compile once, not on every iteration
Matcher m = p.matcher(input);
// wrap each successive match in <exception> tags
while (m.find()) {
    bw.write("<exception>" + m.group() + "</exception>");
}
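One way to wire it up (file names and the regex are illustrative only; a real stack-trace pattern will need tuning for your log format, and the snippet assumes java.nio.file.Files, java.nio.file.Paths, and java.nio.charset.StandardCharsets):

String input = new String(Files.readAllBytes(Paths.get("app.log")),
                          StandardCharsets.UTF_8);
// naive pattern: an "...Exception..." line followed by indented "at ..." frames
String regex = "(?m)^.*Exception[^\\n]*(?:\\n\\s+at .*)*";
BufferedWriter bw = Files.newBufferedWriter(Paths.get("wrapped.log"),
                                            StandardCharsets.UTF_8);

Then run the loop above and close bw; the wrapped file becomes the job's input.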
+1

The top answer says:

"a record reader may read past the end of its InputSplit to finish a record that starts inside it, while a record that starts before the split is skipped."

This is exactly how the built-in LineRecordReader behaves: when a line crosses a split boundary, the reader for the first split runs past the end of its InputSplit to finish the line, and the reader for the second split skips the partial line at its start, so nothing is lost or read twice. You can reuse that trick by adapting LineRecordReader to read a bounded amount of redundant data beyond the split boundary.

For example, this is nextKeyValue() from LineRecordReader:

public boolean nextKeyValue() throws IOException {
  if (key == null) {
    key = new LongWritable();
  }
  key.set(pos);
  if (value == null) {
    value = new Text();
  }
  int newSize = 0;
  while (pos < end) {
    newSize = in.readLine(value, maxLineLength,
                          Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                                   maxLineLength));
    // ... rest of the method unchanged ...

Change the line "while (pos < end)" to "while (pos < end + param)".

Here param is the amount of redundant data the record reader is allowed to read past the split boundary, i.e. an upper bound on how far one record can extend into the next split.
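A sketch of how param could be supplied (the configuration property name is my own invention; the elided parts are the stock LineRecordReader code from the new mapreduce API, copied unchanged):

// inside the copied LineRecordReader
private long overlap; // bytes of redundant data to read past the split

public void initialize(InputSplit genericSplit, TaskAttemptContext context)
        throws IOException {
    // ... original initialization unchanged ...
    overlap = context.getConfiguration().getLong("log.split.overlap.bytes", 4096);
}

public boolean nextKeyValue() throws IOException {
    // ... as above, but with the relaxed bound:
    while (pos < end + overlap) {
        // ... original loop body unchanged ...
    }
    // ...
}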

0