Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1 ClassNotFoundException

I'm trying to connect to Phoenix via Spark, and I keep getting the following exception when opening a connection through the JDBC driver (shortened here; the full stack trace is below):

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)

The class in question is provided by phoenix-core-4.3.1.jar (even though it lives in the HBase package namespace; I assume they need it for the HBase integration).

There are plenty of questions on SO about Spark ClassNotFoundExceptions. I've tried the uber-jar approach (with both the maven-assembly and maven-shade plugins; I inspected the jars, and they do contain ClientRpcControllerFactory), and I've tried a lean jar with the dependency jars specified on the command line. For the latter, the command I used is as follows:

/opt/mapr/spark/spark-1.3.1/bin/spark-submit --jars lib/spark-streaming-kafka_2.10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/metrics-core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true
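
(For reference, the jar check mentioned above can be reproduced like this, using the same relative path as in the submit command:)

jar tf lib/phoenix-core-4.3.1.jar | grep ClientRpcControllerFactory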

I also dumped the classpath from within the code, and the first classloader in the hierarchy already knows about the Phoenix jar:

2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO  nl.work.kafkastreamconsumer.phoenix.LinePersister - [file:/home/work/projects/customer/KafkaStreamConsumer.jar, file:/home/work/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar, file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar, file:/home/work/projects/customer/lib/zkclient-0.3.jar, file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar, file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar, file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]
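
(A complementary diagnostic, a hypothetical helper in the spirit of the dumpClasspath method shown further down: instead of listing a classloader's URLs, ask it directly where it would resolve the class from.)

// Hypothetical helper: reports where (if anywhere) the given loader finds the class.
static void locateClass(ClassLoader loader) {
    java.net.URL url = loader.getResource(
            "org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.class");
    LOGGER.info("ClientRpcControllerFactory resolves from: " + url); // null = not visible
}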

So the question is: what am I missing here? Why can't Spark load the correct class? There should be only one version of the class on the classpath (namely the one from phoenix-core), so I doubt it's a version conflict.

[Executor task launch worker-3] ERROR nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while processing line
java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
        at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:40)
        at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:32)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:362)
        at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:166)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1831)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1810)
        at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
        at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
        at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
        at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
        at java.sql.DriverManager.getConnection(DriverManager.java:571)
        at java.sql.DriverManager.getConnection(DriverManager.java:233)
        at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
        ... 25 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
        at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
        ... 36 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
        ... 39 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
        at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
        ... 43 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:191)
        at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
        ... 46 more

/edit

Upgraded to 4.4.0-HBase-0.98; same result. The phoenix-spark saveToPhoenix() method is not usable from the Java API yet, and since this is only a POC, I went with the plain JDBC driver instead. The relevant code:

public class PhoenixConnection implements AutoCloseable, Serializable {
    private static final long serialVersionUID = -4491057264383873689L;
    private static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";

    static {
        try {
            Class.forName(PHOENIX_DRIVER);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    private Connection connection;

    public PhoenixConnection(final String jdbcUri) {

        try {
            connection = DriverManager.getConnection(jdbcUri);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public List<Map<String, Object>> executeQuery(final String sql) throws SQLException {

        ArrayList<Map<String, Object>> resultList = new ArrayList<>();
        try (PreparedStatement statement = connection.prepareStatement(sql); ResultSet resultSet = statement.executeQuery()) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                Map<String, Object> row = new HashMap<>(metaData.getColumnCount());
                // JDBC column indices are 1-based
                for (int column = 1; column <= metaData.getColumnCount(); ++column) {
                    final String columnLabel = metaData.getColumnLabel(column);
                    row.put(columnLabel, resultSet.getObject(columnLabel));
                }
                // collect the completed row
                resultList.add(row);
            }
        }
        resultList.trimToSize();

        return resultList;
    }

    @Override
    public void close() {
        try {
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

}

public class LinePersister implements Function<JavaRDD<String>, Void> {
    private static final long serialVersionUID = -2529724617108874989L;
    private static final Logger LOGGER = Logger.getLogger(LinePersister.class);
    private static final String TABLE_NAME = "mail_events";

    private final String jdbcUrl;

    public LinePersister(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }



    @Override
    public Void call(JavaRDD<String> dataSet) throws Exception {
        LOGGER.info(String.format(
                "Starting conversion on rdd with %d elements", dataSet.count()));

        List<Void> collectResult = dataSet.map(new Function<String, Void>() {

            private static final long serialVersionUID = -6651313541439109868L;

            @Override
            public Void call(String line) throws Exception {
                LOGGER.info("Writing line " + line);
                Event event = EventParser.parseLine(line);
                try (PhoenixConnection connection = new PhoenixConnection(
                        jdbcUrl)) {
                    connection.executeQuery(event
                            .createUpsertStatement(TABLE_NAME));
                } catch (Exception e) {
                    LOGGER.error("Error while processing line", e);
                    dumpClasspath(this.getClass().getClassLoader());

                }
                return null;
            }
        }).collect();

        LOGGER.info(String.format("Got %d results: ", collectResult.size()));

        return null;
    }

    public static void dumpClasspath(ClassLoader loader)
    {
        LOGGER.info("Classloader " + loader + ":");

        if (loader instanceof URLClassLoader)
        {
            URLClassLoader ucl = (URLClassLoader)loader;
            LOGGER.info(Arrays.toString(ucl.getURLs()));
        }
        else
            LOGGER.error("cannot display components as not a URLClassLoader)");

        if (loader.getParent() != null)
            dumpClasspath(loader.getParent());
    }
}
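
For completeness, here is a minimal sketch of how a Function<JavaRDD<String>, Void> such as LinePersister is typically wired into a Spark 1.3 Java streaming job. The KafkaPhoenixConnector driver is not shown in the question, so everything below is hypothetical; group id, batch interval, and receiver parallelism are illustrative.

import java.util.Collections;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaPhoenixConnectorSketch {
    public static void main(String[] args) throws InterruptedException {
        final String zkQuorum = args[0]; // e.g. node1:5181
        final String topic = args[1];
        final String jdbcUrl = args[2];  // e.g. jdbc:phoenix:node1:5181

        JavaStreamingContext ssc = new JavaStreamingContext(
                new SparkConf().setAppName("KafkaStreamConsumer"), Durations.seconds(10));

        // One receiver thread for the topic; adjust parallelism as needed.
        JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(
                ssc, zkQuorum, "kafka-phoenix-consumer", Collections.singletonMap(topic, 1));

        // Keep only the message payload and hand each micro-batch to LinePersister.
        kafkaStream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        }).foreachRDD(new LinePersister(jdbcUrl));

        ssc.start();
        ssc.awaitTermination();
    }
}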

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>nl.work</groupId>
    <artifactId>KafkaStreamConsumer</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <spark.version>1.3.1</spark.version>
        <hibernate.version>4.3.10.Final</hibernate.version>
        <phoenix.version>4.4.0-HBase-0.98</phoenix.version>
        <hbase.version>0.98.9-hadoop2</hbase.version>
        <spark-hbase.version>0.0.2-clabs-spark-1.3.1</spark-hbase.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>${phoenix.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>${phoenix.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.cloudera</groupId>
            <artifactId>spark-hbase</artifactId>
            <version>${spark-hbase.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> 
                <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> 
                <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> 
                <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> 
                <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> 
                </execution> </executions> </plugin> -->
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>unknown-jars-temp-repo</id>
            <name>A temporary repository created by NetBeans for libraries and jars it could not identify. Please replace the dependencies in this repository with correct ones and delete this repository.</name>
            <url>file:${project.basedir}/lib</url>
        </repository>
    </repositories>
</project>

/edit2 I also tried the saveAsNewAPIHadoopFile approach (https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-java), but it fails with the same exception:

java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
        at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
        at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
        at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
        at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
        at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
        at java.sql.DriverManager.getConnection(DriverManager.java:571)
        at java.sql.DriverManager.getConnection(DriverManager.java:187)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
        at org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWriter.java:49)
        at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
        ... 8 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
        at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
        ... 23 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
        ... 26 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
        at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
        ... 31 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:191)
        at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
        ... 34 more

The answer came from the Phoenix mailing list:

"Rather than bundling the Phoenix JAR into your application, try adding it to SPARK_CLASSPATH, or set these values in the conf (they replace the deprecated SPARK_CLASSPATH): spark.driver.extraClassPath and spark.executor.extraClassPath"

https://www.mail-archive.com/user@spark.apache.org/msg29978.html
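
Applied to the submit command from the question, that would look roughly like this (a sketch; the path to the Phoenix client jar is hypothetical and depends on your installation). Note the difference from --jars: the extraClassPath properties put the jar on the JVM application classpath of the driver and executors, and the stack trace shows the lookup going through sun.misc.Launcher$AppClassLoader, which does not see jars added via --jars:

/opt/mapr/spark/spark-1.3.1/bin/spark-submit \
  --conf spark.driver.extraClassPath=/path/to/phoenix-4.4.0-HBase-0.98-client.jar \
  --conf spark.executor.extraClassPath=/path/to/phoenix-4.4.0-HBase-0.98-client.jar \
  --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector \
  KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true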


Spark integration is not part of Apache Phoenix 4.3.1; it arrived with the Spark plugin in 4.4.0, which only appeared in Maven recently (May 29).

Note that the stock Phoenix 4.4.0 artifacts will not work as-is on CDH; they have to be rebuilt against the CDH dependencies.

PS: there is a JIRA issue about this on the Phoenix tracker.


One more cause of this exception is worth mentioning here: Phoenix ships its own rpc controller factory and writes it into the HBase configuration, so any client that picks up that configuration without Phoenix on its classpath fails to load ClientRpcControllerFactory.

In some cases, Phoenix-enabled clusters are used from pure-HBase client applications, resulting in exactly this ClassNotFoundException in MapReduce applications or other jobs. Because the hbase configuration is shared between Phoenix clients and plain HBase clients, the controller factory that Phoenix injects is picked up on the client side as well, and that is why you get this exception. The issue has been fixed by HBASE-14960. If your HBase version is older than 2.0.0, 1.2.0, 1.3.0, or 0.98.17, you can point the client rpc controller back to the HBase default with this property in hbase-site.xml:

  <property>
    <name>hbase.rpc.controllerfactory.class</name>
    <value>org.apache.hadoop.hbase.ipc.RpcControllerFactory</value>  
  </property> 
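
The same override can also be applied programmatically on the client side. A minimal sketch, assuming the standard HBase client API (the wrapper class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PlainHBaseClientConfig {
    public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        // Point the rpc controller factory back at the HBase default,
        // overriding the Phoenix-injected ClientRpcControllerFactory.
        conf.set("hbase.rpc.controllerfactory.class",
                "org.apache.hadoop.hbase.ipc.RpcControllerFactory");
        return conf;
    }
}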