Troubleshooting an R mapper script on Amazon Elastic MapReduce: results not as expected

I am trying to use Amazon Elastic MapReduce to run a series of simulations over several million cases. This is an Rscript streaming job with no real reducer, so I pass the identity reducer in my EMR call: --reducer org.apache.hadoop.mapred.lib.IdentityReducer.

The script works fine when I test it locally from the command line on a Linux box, passing in one line manually with echo "1,2443,2442,1,5" | ./mapper.R; I get the one line of results I expect. However, when I ran the simulation on EMR with an input file of about 10,000 cases (lines), I got output for only a dozen or so of the 10k input lines. I have tried several times and cannot work out why. The Hadoop job completes without errors. It looks as if input lines are being skipped, or perhaps something is going wrong in the identity reducer. The results are correct for the cases that do produce output.

My input file is a csv with the following data format, a series of five integers separated by commas:

1,2443,2442,1,5
2,2743,4712,99,8
3,2443,861,282,3177
etc...

Here is my R script for mapper.R

#! /usr/bin/env Rscript

# Define Functions
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
# function to read in the relevant data from needed data files
get.data <- function(casename) {
    list <- lapply(casename, function(x) {
        read.csv(file = paste("./inputdata/", x, ".csv", sep = ""),
                 header = TRUE,
                 stringsAsFactors = FALSE)})
    return(data.frame(list))
}

con <- file("stdin")            
line <- readLines(con, n = 1, warn = FALSE) 
line <- trimWhiteSpace(line)
values <- unlist(strsplit(line, ","))
lv <- length(values)
cases <- as.numeric(values[2:lv])
simid <- paste("sim", values[1], ":", sep = "")
l <- length(cases)                      # for indexing

## create a vector for the case names
names.vector <- paste("case", cases, sep = ".")

## read in metadata and necessary data columns using get.data function
metadata <- read.csv(file = "./inputdata/metadata.csv", header = TRUE,
                     stringsAsFactors = FALSE)
d <- cbind(metadata[,1:3], get.data(names.vector))

## Calculations that use df d and produce a string called 'output' 
## in the form of "id: value1 value2 value3 ..." to be used at a 
## later time for aggregation.

cat(output, "\n")
close(con)

Here is my EMR call:

ruby elastic-mapreduce --create --stream --input s3n://bucket/project/input.txt --output s3n://bucket/project/output --mapper s3n://bucket/project/mapper.R --reducer org.apache.hadoop.mapred.lib.IdentityReducer --cache-archive s3n://bucket/project/inputdata.tar.gz#inputdata --name Simulation --num-instances 2

Any insights or suggestions regarding the R script would be appreciated.

For reference, the approach follows JD Long's R/EMR script.

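Can you run the job on a small input file of only 10 lines? Pick 10 lines that produced no output in the big run, so you can rule out the possibility that particular inputs make the R script return nothing.

Debugging on EMR is not easy.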

EDIT:

One way to debug an EMR job is to bring up a cluster through AWS, ssh into a node, and run the mapper by hand:

cat infile.txt | yourMapper.R > outfile.txt

This lets you see what the mapper does with infile outside the EMR Hadoop machinery.
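The `cat infile.txt | yourMapper.R > outfile.txt` check is most useful when the input has more than one line and you compare line counts on both sides. Below is a minimal sketch of that comparison. Both mappers are hypothetical shell stand-ins, not the R script from the question: `one_line_mapper` reads a single line and stops, and `looping_mapper` reads until stdin is exhausted. On a machine with Rscript installed you would pipe the file into ./mapper.R instead.

```shell
#!/bin/sh
# Hypothetical stand-in for a mapper that reads only ONE line from stdin
# and then exits, no matter how many lines it is given.
one_line_mapper() {
    IFS= read -r line
    printf 'sim%s: done\n' "${line%%,*}"
}

# Hypothetical stand-in for a mapper that loops until stdin is exhausted.
looping_mapper() {
    while IFS= read -r line; do
        printf 'sim%s: done\n' "${line%%,*}"
    done
}

# Small multi-line input in the same CSV format as the question.
infile=$(mktemp)
printf '%s\n' '1,2443,2442,1,5' '2,2743,4712,99,8' '3,2443,861,282,3177' > "$infile"

# Count lines going in and lines coming out of each mapper.
in_count=$(wc -l < "$infile" | tr -d '[:space:]')
one_count=$(one_line_mapper < "$infile" | wc -l | tr -d '[:space:]')
loop_count=$(looping_mapper < "$infile" | wc -l | tr -d '[:space:]')

echo "input lines:            $in_count"
echo "one-line mapper output: $one_count"
echo "looping mapper output:  $loop_count"

rm -f "$infile"
```

With three input lines, the one-line stand-in emits one result and the looping stand-in emits three. A gap between input and output counts in this local test points at the script rather than at Hadoop or the reducer.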

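EDIT 2:

Looking at it again, your script reads just one line from stdin and then finishes. Hadoop streaming hands each mapper a whole chunk of input lines on stdin, so only the first line of each chunk gets processed, and the rest are silently dropped. Had you tested with a multi-line infile.txt, the cat test above would have shown this right away!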

For comparison, here is Pete's word count example in R:

#! /usr/bin/env Rscript

trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))

## **** could do with a single readLines or in blocks
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    words <- splitIntoWords(line)
    ## **** can be done as cat(paste(words, "\t1\n", sep=""), sep="")
    for (w in words)
        cat(w, "\t1\n", sep="")
}
close(con)

Your script should follow the same pattern and loop over stdin, something like this:

while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    #do your dance
    #do your dance quick
    #come on everybody tell me what the word
    #word up
}

And yes, those comments are lyrics from Cameo's Word Up!, which you can hear here:

http://www.youtube.com/watch?v=MZjAantupsA
