Re: [geomesa-users] Problem with spark example

Jim,

I should have mentioned a few things earlier.  First, the issue with the core.index package object seems to occur when the package object is initialized.  This seems to take place before my Spark job is initiated.  Here is the complete output of the error:

Exception in thread "main" java.lang.ExceptionInInitializerError
        at org.locationtech.geomesa.core.data.AccumuloDataStore$$anonfun$getSchema$1.apply(AccumuloDataStore.scala:712)
        at org.locationtech.geomesa.core.data.AccumuloDataStore$$anonfun$getSchema$1.apply(AccumuloDataStore.scala:705)
        at scala.Option.map(Option.scala:145)
        at org.locationtech.geomesa.core.data.AccumuloDataStore.getSchema(AccumuloDataStore.scala:705)
        at org.locationtech.geomesa.core.data.AccumuloDataStore.getSchema(AccumuloDataStore.scala:701)
        at runnables.GeomesaInputRunnable.readInput(GeomesaInputRunnable.scala:51)
        at com.apex.CEngine$class.run(CEngine.scala:48)
        at runnables.GeomesaInputRunnable.run(GeomesaInputRunnable.scala:36)
        at com.apex.CRun$.runJob(CRun.scala:120)
        at com.apex.CRun$.run(CRun.scala:105)
        at com.apex.CRun$.main(CRun.scala:49)
        at com.apex.CRun.main(CRun.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArithmeticException: Adding time zone offset caused overflow
        at org.joda.time.DateTimeZone.convertUTCToLocal(DateTimeZone.java:965)
        at org.joda.time.chrono.ZonedChronology$ZonedDateTimeField.get(ZonedChronology.java:422)
        at org.joda.time.base.BaseDateTime.<init>(BaseDateTime.java:129)
        at org.joda.time.base.BaseDateTime.<init>(BaseDateTime.java:97)
        at org.joda.time.DateTime.<init>(DateTime.java:209)
        at org.locationtech.geomesa.core.index.package$.<init>(index.scala:36)
        at org.locationtech.geomesa.core.index.package$.<clinit>(index.scala)
        ... 19 more
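
For reference, the overflow can be reproduced outside of GeoMesa.  A minimal sketch, assuming joda-time 2.4 on the classpath and a JVM default time zone with a non-zero offset (the same Long.MinValue/MaxValue pattern used in index.scala, per my earlier message quoted below):

    import org.joda.time.DateTime

    // Under joda-time 2.4 these constructors fail while converting UTC to the
    // local (zoned) chronology with:
    //   java.lang.ArithmeticException: Adding time zone offset caused overflow
    val minDate = new DateTime(Long.MinValue)
    val maxDate = new DateTime(Long.MaxValue)
    // Under joda-time 2.3 the same calls succeed, which is why forcing 2.3 into
    // the uber jar lets the core.index package object initialize.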


Second, I'd like to point you to Sotera's Aggregate Micro Paths GitHub project (https://github.com/Sotera/aggregate-micro-paths).  I've used it as a template of sorts for creating my Spark analytics.  If you look at their Spark project you'll notice it includes a runnable package.  This is where they handle input formats.  I'm essentially creating a new runnable to work with GeoMesa.  I've also updated my analytic template to work with Spark-Jobserver (https://github.com/ooyala/spark-jobserver).  While my project works with Jobserver, I find it easier (for now) to just use spark-submit.

Finally, you asked if I got my Spark job to run successfully.  No, I haven't just yet.  After figuring out the issue with my use of DateTime, I'm now getting an Accumulo error:

Exception in thread "main" java.io.IOException: org.apache.accumulo.core.client.AccumuloException: Table is online geomesa(e) cannot scan table in offline mode
        at org.apache.accumulo.core.client.mapreduce.InputFormatBase.getSplits(InputFormatBase.java:868)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:90)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094)
        at org.apache.spark.rdd.RDD.count(RDD.scala:847)
        at com.apex.CEngine$class.run(CEngine.scala:49)
        at runnables.GeomesaInputRunnable.run(GeomesaInputRunnable.scala:36)
        at com.apex.CRun$.runJob(CRun.scala:120)
        at com.apex.CRun$.run(CRun.scala:105)
        at com.apex.CRun$.main(CRun.scala:49)
        at com.apex.CRun.main(CRun.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.accumulo.core.client.AccumuloException: Table is online geomesa(e) cannot scan table in offline mode
        at org.apache.accumulo.core.client.mapreduce.InputFormatBase.binOfflineTable(InputFormatBase.java:712)
        at org.apache.accumulo.core.client.mapreduce.InputFormatBase.getSplits(InputFormatBase.java:841)
        ... 30 more

The table I'm working with inside GeoMesa is called geomesa.  I've just started to review this error.  The output before the error is here:

spark.default.parallelism 8
spark.akka.frameSize 200
spark.storage.memoryFraction 0.5
check point dir: /tmp/checkpoints
Running on geomesa table: Some(geomesa)
Scanning ST index table for feature type geomesa
Filter: [ geom bbox POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5)) ]
Geometry filters: List([ geom bbox POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5)) ])
Temporal filters: List()
Other filters: List()
Tweaked geom filters are List([ Location bbox POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5)) ])
GeomsToCover: List(POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5)))
Planning query
Random Partition Planner: Vector(00, 01, 02, 03, 04, 05, 06, 07, 08, 09, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
ConstPlanner: geomesa
GeoHashKeyPlanner: 4 : List(..., t.., tt., ttb)
DatePlanner: start: 00000101 end: 99991231
Total ranges: 400
ColumnFamily Planner: 109 : List(.., 8., 8y, 8z, 9., 9n, 9p, 9q, 9r, 9w, 9x, 9y, 9z, b., bb, bc, bf, bg, bu, bv)
STII Filter: [ Location bbox POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5)) ]
Interval:  No interval
Filter: SpatialFilter(GEOMETRYCOLLECTION (POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5))))
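
From a quick look at InputFormatBase, getSplits only goes through binOfflineTable when the job has been configured for an offline table scan, so it looks like that flag is getting set somewhere while the geomesa table is still online.  A sketch of the two workarounds I can see, assuming Accumulo 1.5's mapreduce API (the setOfflineTableScan call is my guess at the relevant knob, not something I've confirmed in the GeoMesa code):

    import org.apache.accumulo.core.client.mapreduce.InputFormatBase
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.Job

    // Option 1: clear the offline-scan flag in the Hadoop configuration the RDD
    // is built from, so the online "geomesa" table can be scanned directly.
    val conf = new Configuration()
    val job = new Job(conf)  // Accumulo's mapreduce setters operate on a Job
    InputFormatBase.setOfflineTableScan(job, false)
    // ...then pass job.getConfiguration on to newAPIHadoopRDD as before.

    // Option 2: leave the flag alone and take the table offline before the job
    // runs, e.g. from the Accumulo shell: offline -t geomesa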

Please let me know if you've seen this error before.  If not, I'll report what I discover during my investigation.

Thanks
-- Adam





On Mon, Nov 24, 2014 at 4:29 PM, Jim Hughes <jnh5y@xxxxxxxx> wrote:
Hi Adam,

I was just about to write to share what I know.  I tried to replicate your issue, and I made it a little further than what you reported.  Thanks for sharing what you learned about the Joda DateTime versions being slightly out of sync.

My efforts focused on using the Spark shell, and I ran into some other serialization issues.  I worked with Andrew Hulbert for a few minutes this evening to try to address them.  We'll be looking into it some more; he said he'd have a little more time to dig in tomorrow.

Out of curiosity, did you manage to get your Spark job to run successfully?  If so, how have you been submitting your jobs?

Thanks for asking a great question,

Jim



On 11/24/2014 04:58 PM, Adam F wrote:
I've been working on the error and discovered it was due to conflicting versions of DateTime.  My Spark job was requiring joda-time 2.4 and GeoMesa requires 2.3.  When I packaged my job into an uber jar, it was only packaging 2.4.  This is an issue because joda-time 2.4 apparently has problems with using Long.MinValue and Long.MaxValue when initializing a DateTime object (see https://github.com/JodaOrg/joda-time/issues/190).  This same approach is used to initialize a few DateTime objects in the org.locationtech.geomesa.core.index package object (index.scala, lines 36-37).
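
For anyone else who hits this, one way to avoid the conflict is to pin joda-time to the 2.3 version GeoMesa expects when building the uber jar.  A sketch, assuming an sbt build (Maven users would do the equivalent in dependencyManagement):

    // build.sbt: force the joda-time version GeoMesa is built against so the
    // assembly does not pick up 2.4 transitively.
    dependencyOverrides += "joda-time" % "joda-time" % "2.3"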

Thanks
-- Adam
 

On Fri, Nov 21, 2014 at 9:17 PM, Adam Fraser <tigclem00@xxxxxxxxx> wrote:

I'm working with Spark and attempting to emulate the Spark example (http://www.geomesa.org/2014/08/05/spark/), but am having problems. 

The example creates a dataStore using:

val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
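
where params is the usual Accumulo connection map from the example.  A rough sketch with placeholder values (the key names are as I recall them from the blog post):

    import scala.collection.JavaConverters._

    // Accumulo/GeoMesa connection parameters (placeholder values):
    val params = Map(
      "instanceId" -> "myInstance",
      "zookeepers" -> "zoo1,zoo2,zoo3",
      "user"       -> "myUser",
      "password"   -> "***",
      "tableName"  -> "geomesa").asJava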

 It then sends ds into an init function found in compute.spark.GeoMesaSpark. This init function calls ds.getSchema.  This is where the problem is occurring.  I'm getting the following error:

 

Exception in thread "main" java.lang.ExceptionInInitializerError
        at org.locationtech.geomesa.core.data.AccumuloDataStore$$anonfun$getSchema$2.apply(AccumuloDataStore.scala:712)
        at org.locationtech.geomesa.core.data.AccumuloDataStore$$anonfun$getSchema$2.apply(AccumuloDataStore.scala:703)
        at scala.Option.map(Option.scala:145)
        at org.locationtech.geomesa.core.data.AccumuloDataStore.getSchema(AccumuloDataStore.scala:703)
        at org.locationtech.geomesa.core.data.AccumuloDataStore.getSchema(AccumuloDataStore.scala:701)

After some debugging, I've pretty much determined that this error is occurring because of the use of the core.index package object.  If you look inside the getSchema function in AccumuloDataStore you'll see it is used several times to retrieve a few strings and call a function.

It seems as though the core.index package object hasn't been initialized.

Any ideas for how I can get getSchema to work?  Also, is it possible for you to post a complete solution to the Spark Example on GitHub?

 

Thanks

-- Adam

 


Sent from my iPad




_______________________________________________
geomesa-users mailing list
geomesa-users@xxxxxxxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
http://www.locationtech.org/mailman/listinfo/geomesa-users

