Re: [geomesa-users] Fwd: trouble with spark-shell interactive session with geomesa

Hi Jason,

Thanks for your suggestion. I had already tried "--jars" instead of "--driver-class-path" before, unfortunately without success.

Is the example code
geomesa/geomesa-compute/scripts/test/scala/localtest.scala
working for anyone?
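
Both failures below look like serializer configuration problems: the first run fell back to plain Java serialization on the executor, and the second hit an unregistered Kryo class ID on the driver. So one more thing I want to rule out is passing the Kryo settings explicitly when launching the shell. A sketch of what I mean (the registrator class name is only my guess for 1.2.3):

spark-1.6.1/bin/spark-shell \
  --jars /data/bigdata/installs/geomesa/geomesa-compute/target/geomesa-compute-1.2.3-shaded.jar \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=org.locationtech.geomesa.compute.spark.GeoMesaSparkKryoRegistrator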

Thanks so far,
Martin

On 08/01/2016 06:17 PM, Jason Brown wrote:

Hi Martin,

It may be the specification of --driver-class-path that is at issue. Try omitting that parameter:

spark-shell --jars ~/Downloads/geomesa-compute-1.2.3-shaded.jar
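
A side note, in case it helps: unlike --driver-class-path, --jars also ships the jar to the executors, and spark-shell adds it to the REPL classpath as well. If both jars are needed, --jars takes a comma-separated list (the paths here are just illustrative):

spark-shell --jars ~/Downloads/geomesa-compute-1.2.3-shaded.jar,/path/to/geomesa-accumulo-distributed-runtime-1.2.3.jar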

Let us know if that works for you!

-Jason

-------- Forwarded Message --------
Subject: [geomesa-users] trouble with spark-shell interactive session with geomesa
Date: Mon, 1 Aug 2016 13:12:57 +0200
From: Martin <mfleck@xxxxxxxxxxxxxxxxxxxxxxx>
Reply-To: Geomesa User discussions <geomesa-users@xxxxxxxxxxxxxxxx>
To: geomesa-users@xxxxxxxxxxxxxxxx


Hello everybody,

I want to run an interactive Spark session with GeoMesa RDDs. For this I use
geomesa/geomesa-compute/scripts/test/scala/localtest.scala
as an example.

I start the shell using:

spark-1.6.1/bin/spark-shell \
  --driver-class-path /data/bigdata/installs/geomesa/geomesa-compute/target/geomesa-compute-1.2.3-shaded.jar \
  --jars /data/bigdata/installs/accumulo-1.7.2/lib/ext/geomesa-accumulo-distributed-runtime-1.2.3.jar

As a minimal example, I paste the following into the spark-shell:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.factory.CommonFactoryFinder
import org.locationtech.geomesa.accumulo.data._
import org.locationtech.geomesa.compute.spark.GeoMesaSpark
import org.opengis.filter.Filter
import scala.collection.JavaConversions._


val feature = "event_s"

// Get a handle to the data store
val params = Map(
  "instanceId" -> "GISCIENCE",
  "zookeepers" -> "localhost",
  "user"       -> "root",
  "password"   -> "CRAZYPASS1234",
  "tableName"  -> "gdelt_s")
val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]

// We'll grab everything...but usually you want some CQL filter here (e.g. bbox)
val q = new Query(feature, Filter.INCLUDE)
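// A sketch of such a filter, in case it is useful ("geom" is only an assumed
// name for the geometry attribute of the event_s schema):
//   import org.geotools.filter.text.ecql.ECQL
//   val q = new Query(feature, ECQL.toFilter("BBOX(geom, -80, 35, -70, 40)"))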

// Configure Spark to run locally with 4 threads
val conf = new Configuration
val sconf = GeoMesaSpark.init(new SparkConf(true).setAppName("localtest").setMaster("local[4]"), ds)
val sc = new SparkContext(sconf)

// Create an RDD from a query
val queryRDD = org.locationtech.geomesa.compute.spark.GeoMesaSpark.rdd(conf, sc, params, q)
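
A quick sanity check that can be pasted right after this, to confirm that GeoMesaSpark.init actually configured Kryo (a sketch using SparkConf's getter; "<unset>" is just an illustrative default):

// Both properties should have been set by GeoMesaSpark.init. If either comes
// back unset, the executors fall back to Java serialization, and
// ScalaSimpleFeature (which is not java.io.Serializable) cannot be shipped
// back to the driver.
println(sconf.get("spark.serializer", "<unset>"))
println(sconf.get("spark.kryo.registrator", "<unset>"))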

When I try to use the RDD (e.g. to look at the first entry), I get the following error:

scala> queryRDD.first()
16/08/01 13:04:20 INFO spark.SparkContext: Starting job: first at <console>:47
16/08/01 13:04:20 INFO scheduler.DAGScheduler: Got job 0 (first at <console>:47) with 1 output partitions
16/08/01 13:04:20 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (first at <console>:47)
16/08/01 13:04:20 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/08/01 13:04:20 INFO scheduler.DAGScheduler: Missing parents: List()
16/08/01 13:04:20 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at GeoMesaSpark.scala:132), which has no missing parents
16/08/01 13:04:20 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.5 KB, free 156.3 KB)
16/08/01 13:04:20 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1544.0 B, free 157.8 KB)
16/08/01 13:04:20 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:39451 (size: 1544.0 B, free: 11.3 GB)
16/08/01 13:04:20 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/08/01 13:04:20 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at GeoMesaSpark.scala:132)
16/08/01 13:04:20 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/08/01 13:04:20 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
16/08/01 13:04:20 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
16/08/01 13:04:20 INFO executor.Executor: Fetching http://127.0.0.1:45135/jars/geomesa-accumulo-distributed-runtime-1.2.3.jar with timestamp 1470049368771
16/08/01 13:04:21 INFO util.Utils: Fetching http://127.0.0.1:45135/jars/geomesa-accumulo-distributed-runtime-1.2.3.jar to /tmp/spark-9827b734-7a02-4ece-ae96-4b351e2da4bc/userFiles-cf854054-1223-4a59-9f73-64fca9da6f8e/fetchFileTemp3282897940080641691.tmp
16/08/01 13:04:21 INFO executor.Executor: Adding file:/tmp/spark-9827b734-7a02-4ece-ae96-4b351e2da4bc/userFiles-cf854054-1223-4a59-9f73-64fca9da6f8e/geomesa-accumulo-distributed-runtime-1.2.3.jar to class loader
16/08/01 13:04:21 INFO rdd.NewHadoopRDD: Input split: mapreduce.GroupedSplit[localhost](2)
16/08/01 13:04:21 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.locationtech.geomesa.features.ScalaSimpleFeature
Serialization stack:
    - object not serializable (class: org.locationtech.geomesa.features.ScalaSimpleFeature, value: ScalaSimpleFeature:c0ff22f9-31fe-4e85-bd28-7ead2f65dead)
    - element of array (index: 0)
    - array (class [Lorg.opengis.feature.simple.SimpleFeature;, size 1)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/08/01 13:04:21 ERROR scheduler.TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.locationtech.geomesa.features.ScalaSimpleFeature
Serialization stack:
    - object not serializable (class: org.locationtech.geomesa.features.ScalaSimpleFeature, value: ScalaSimpleFeature:c0ff22f9-31fe-4e85-bd28-7ead2f65dead)
    - element of array (index: 0)
    - array (class [Lorg.opengis.feature.simple.SimpleFeature;, size 1); not retrying
16/08/01 13:04:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/08/01 13:04:21 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
16/08/01 13:04:21 INFO scheduler.DAGScheduler: ResultStage 0 (first at <console>:47) failed in 0.498 s
16/08/01 13:04:21 INFO scheduler.DAGScheduler: Job 0 failed: first at <console>:47, took 0.577452 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.locationtech.geomesa.features.ScalaSimpleFeature
Serialization stack:
    - object not serializable (class: org.locationtech.geomesa.features.ScalaSimpleFeature, value: ScalaSimpleFeature:c0ff22f9-31fe-4e85-bd28-7ead2f65dead)
    - element of array (index: 0)
    - array (class [Lorg.opengis.feature.simple.SimpleFeature;, size 1)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
  at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1342)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
  at org.apache.spark.rdd.RDD.first(RDD.scala:1341)
  ... 52 elided

scala>


Or, when using RDD.count(), I get:


scala> queryRDD.count()
16/08/01 13:08:10 INFO spark.SparkContext: Starting job: count at <console>:47
16/08/01 13:08:10 INFO scheduler.DAGScheduler: Got job 0 (count at <console>:47) with 2 output partitions
16/08/01 13:08:10 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (count at <console>:47)
16/08/01 13:08:10 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/08/01 13:08:10 INFO scheduler.DAGScheduler: Missing parents: List()
16/08/01 13:08:10 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at GeoMesaSpark.scala:132), which has no missing parents
16/08/01 13:08:10 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.3 KB, free 156.2 KB)
16/08/01 13:08:10 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1471.0 B, free 157.6 KB)
16/08/01 13:08:10 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:37410 (size: 1471.0 B, free: 11.3 GB)
16/08/01 13:08:10 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/08/01 13:08:10 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at GeoMesaSpark.scala:132)
16/08/01 13:08:10 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/08/01 13:08:10 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
16/08/01 13:08:10 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 2666 bytes)
16/08/01 13:08:10 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
16/08/01 13:08:10 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
16/08/01 13:08:10 INFO executor.Executor: Fetching http://127.0.0.1:39288/jars/geomesa-accumulo-distributed-runtime-1.2.3.jar with timestamp 1470049670427
16/08/01 13:08:10 INFO util.Utils: Fetching http://127.0.0.1:39288/jars/geomesa-accumulo-distributed-runtime-1.2.3.jar to /tmp/spark-c08b0161-bf99-43c7-a895-1e7707d258af/userFiles-853eca44-c73f-4635-b325-601448df0243/fetchFileTemp7891527735922005126.tmp
16/08/01 13:08:10 INFO executor.Executor: Adding file:/tmp/spark-c08b0161-bf99-43c7-a895-1e7707d258af/userFiles-853eca44-c73f-4635-b325-601448df0243/geomesa-accumulo-distributed-runtime-1.2.3.jar to class loader
16/08/01 13:08:10 INFO rdd.NewHadoopRDD: Input split: mapreduce.GroupedSplit[localhost](2)
16/08/01 13:08:10 INFO rdd.NewHadoopRDD: Input split: mapreduce.GroupedSplit[localhost](2)
16/08/01 13:08:10 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2082 bytes result sent to driver
16/08/01 13:08:10 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 2082 bytes result sent to driver
16/08/01 13:08:10 ERROR scheduler.TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/08/01 13:08:10 ERROR scheduler.TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/08/01 13:08:10 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/08/01 13:08:10 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/08/01 13:08:10 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
16/08/01 13:08:10 INFO scheduler.DAGScheduler: ResultStage 0 (count at <console>:47) failed in 0.524 s
16/08/01 13:08:10 INFO scheduler.DAGScheduler: Job 0 failed: count at <console>:47, took 0.637746 s
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
  ... 52 elided

scala>

What is going wrong here?
I would be very thankful for any help. Let me know if more information is needed.

Best regards,
Martin