foreach %dopar% + RPostgreSQL

I am using RPostgreSQL to connect to a local database. The setup works fine on my Linux machine (R 2.11.1, Postgres 8.4).

I have been experimenting with foreach using a multicore (doMC) parallel backend to farm out some repeated queries (a few thousand) and append the results to a data structure. Curiously, it works if I use %do%, but it fails when I switch to %dopar%, except when there is only one iteration (as shown below).

I wondered whether this was related to having a single connection object, so I created 10 connection objects and, depending on the value of i, used a specific connection for each query, chosen by i modulo 10. (The snippet below uses just two connection objects.) The expression evaluated by eval(expr.01) contains the SQL query, which depends on the value of i.

I cannot make sense of these error messages. Is there a way to make this work?

Thanks.
Vishal Belsare

R snippet follows:

```r
> id.qed2.foreach <- foreach(i = 1588:1588, .inorder=FALSE) %dopar% {
+   if (i %% 2 == 0) {con <- con0}
+   if (i %% 2 == 1) {con <- con1}
+   fetch(dbSendQuery(con, eval(expr.01)), n=-1)$idreuters
+ }
> id.qed2.foreach
[[1]]
  [1]   411   414  2140  2406  4490  4507  4519  4570  4571  4572  4703  4731
...
[109] 48765 84312 91797

> id.qed2.foreach <- foreach(i = 1588:1589, .inorder=FALSE) %dopar% {
+   if (i %% 2 == 0) {con <- con0}
+   if (i %% 2 == 1) {con <- con1}
+   fetch(dbSendQuery(con, eval(expr.01)), n=-1)$idreuters
+ }
Error in stop(paste("expired", class(con))) :
  no function to return from, jumping to top level
Error in stop(paste("expired", class(con))) :
  no function to return from, jumping to top level
Error in { : task 1 failed - "error in evaluating the argument 'res' in
  selecting a method for function 'fetch'"
```

EDIT: I changed a few things (still without success), but something interesting emerged. Connection objects created inside the loop without being closed via dbDisconnect leave connections hanging, which is obvious in /var/log for Postgres. When I do disconnect them, several new error messages appear:

```r
> system.time(
+   id.qed2.foreach <- foreach(i = 1588:1590, .inorder=FALSE,
+                              .packages=c("DBI", "RPostgreSQL")) %dopar% {
+     drv0 <- dbDriver("PostgreSQL")
+     con0 <- dbConnect(drv0, dbname='nseindia')
+     list(idreuters=fetch(dbSendQuery(con0, eval(expr.01)), n=-1)$idreuters)
+     dbDisconnect(con0)
+   })
Error in postgresqlExecStatement(conn, statement, ...) :
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) :
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) :
  no function to return from, jumping to top level
Error in { : task 1 failed - "error in evaluating the argument 'res' in
  selecting a method for function 'fetch'"
```
+6
foreach parallel-processing r postgresql
2 answers

The following works and is ~1.5x faster than the sequential version. As a next step, I wonder whether it is possible to attach a connection object to each of the workers created by registerDoMC. If so, there would be no need to create and destroy a connection on every task, which would avoid hammering the PostgreSQL server with connections.

```r
pgparquery <- function(i) {
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname='nsdq')
  lst <- eval(expr.01)   # contains the SQL query, which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dt  <- dates.qed2[i]
  dbDisconnect(con)
  result <- list(date=dt, idreuters=tmp$idreuters)
  return(result)
}

id.qed.foreach <- foreach(i = 1588:3638, .inorder=FALSE,
                          .packages=c("DBI", "RPostgreSQL")) %dopar% {
  pgparquery(i)
}
```

— Vishal Belsare
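On the follow-up question of attaching a connection to each doMC worker: since doMC forks the workers and, by default, preschedules the tasks into one chunk per core, a lazily created, cached connection would be opened at most once per fork. A sketch of that idea (untested, reusing the names from the answer above; the cache environment and getCon helper are hypothetical, not part of any package):

```r
library(doMC)
library(DBI)
library(RPostgreSQL)
registerDoMC(cores = 4)

# Each fork gets its own copy-on-write copy of this environment, so a
# connection opened inside a fork is created once and then reused for
# all tasks that fork processes.
.conn.cache <- new.env()

getCon <- function(dbname = "nsdq") {
  if (is.null(.conn.cache$con)) {
    drv <- dbDriver("PostgreSQL")
    .conn.cache$con <- dbConnect(drv, dbname = dbname)
  }
  .conn.cache$con
}

id.qed.foreach <- foreach(i = 1588:3638, .inorder = FALSE) %dopar% {
  con <- getCon()
  lst <- eval(expr.01)   # SQL query depending on 'i'
  tmp <- fetch(dbSendQuery(con, lst), n = -1)
  list(date = dates.qed2[i], idreuters = tmp$idreuters)
}
```

One caveat of this sketch: the cached connections are never explicitly passed to dbDisconnect; they are only torn down when the forked workers exit at the end of the loop, so the server briefly holds one connection per core.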

+2

It is more efficient to create a database connection once per worker rather than once per task. Unfortunately, mclapply does not provide a mechanism for initializing workers before executing tasks, so this is not easy to do with the doMC backend. But if you use the doParallel backend instead, you can initialize the workers using clusterEvalQ. Here is an example of how to restructure the code:

```r
library(doParallel)
cl <- makePSOCKcluster(detectCores())
registerDoParallel(cl)

clusterEvalQ(cl, {
  library(DBI)
  library(RPostgreSQL)
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname="nsdq")
  NULL
})

id.qed.foreach <- foreach(i=1588:3638, .inorder=FALSE, .noexport="con",
                          .packages=c("DBI", "RPostgreSQL")) %dopar% {
  lst <- eval(expr.01)   # contains the SQL query, which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dt  <- dates.qed2[i]
  list(date=dt, idreuters=tmp$idreuters)
}

clusterEvalQ(cl, {
  dbDisconnect(con)
})
```

Since doParallel and clusterEvalQ use the same cluster object cl, the foreach loop will have access to the database connection con when executing tasks.
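One small addition, assuming the cluster is no longer needed after the loop: the PSOCK workers themselves stay alive until the cluster is shut down, so after the dbDisconnect step you would typically stop the cluster as well.

```r
# After the final clusterEvalQ(cl, { dbDisconnect(con) }) above,
# shut down the PSOCK worker processes (assumes `cl` from the example):
stopCluster(cl)
```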

+12
