Re: [geomesa-users] Ingest data through Spark

Hi Yikai,

It looks like you are on the right track.  We solved the same problem a little differently here (1).  The big difference is that the code there closes the writers and disposes of the datastore connection at the end of the write.  Starting in GeoMesa 1.2.4, we write pre-computed stats at ingest time if the stats config field is set (which it is by default); datastore.dispose cleans up those stats writers.
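As a rough illustration of that close-then-dispose ordering, here is a minimal, self-contained Java sketch.  It does not use the GeoMesa or GeoTools APIs: DemoStore and DemoWriter are hypothetical stand-ins, and DemoStore.close() merely plays the role of datastore.dispose.  The point is only that each partition opens its own resources and releases them, writer first and store second, before the task returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionLifecycle {

    /** Hypothetical data store handle; close() plays the role of dispose(). */
    static final class DemoStore implements AutoCloseable {
        private final List<String> events;

        DemoStore(List<String> events) {
            this.events = events;
            events.add("store opened");
        }

        DemoWriter openWriter() {
            return new DemoWriter(events);
        }

        @Override
        public void close() {
            events.add("store disposed");
        }
    }

    /** Hypothetical writer; close() flushes whatever is still pending. */
    static final class DemoWriter implements AutoCloseable {
        private final List<String> events;
        private int pending = 0;

        DemoWriter(List<String> events) {
            this.events = events;
            events.add("writer opened");
        }

        void write(String record) {
            pending++; // a real writer would buffer the record here
        }

        @Override
        public void close() {
            events.add("writer closed, flushed " + pending);
            pending = 0;
        }
    }

    /** Models the body of one foreachPartition call and returns the event log. */
    static List<String> writePartition(Iterator<String> partition) {
        List<String> events = new ArrayList<>();
        // try-with-resources closes in reverse order: writer first, then store
        try (DemoStore store = new DemoStore(events);
             DemoWriter writer = store.openWriter()) {
            while (partition.hasNext()) {
                writer.write(partition.next());
            }
        }
        return events;
    }

    public static void main(String[] args) {
        for (String event : writePartition(Arrays.asList("a", "b", "c").iterator())) {
            System.out.println(event);
        }
    }
}
```

Because everything is opened and closed inside a single partition's task, nothing depends on a later pass over the RDD to do the cleanup.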

As a quick suggestion, have you tried adding a 'writer.flush()' after the while loop?  (This may not address the stats writer issue I mention above, but it should be a good way to make sure your data is written properly.)
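To make the flush suggestion concrete, here is a minimal, self-contained Java sketch of why a flush after the loop matters.  It does not use the GeoMesa or Accumulo APIs: BufferingWriter is a hypothetical stand-in for any writer that batches records on the client side, and the batch size of 3 is arbitrary.  Records still sitting in the buffer when the loop ends never reach the store unless something flushes or closes the writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class FlushAfterLoop {

    /** Hypothetical stand-in for a batching writer; NOT the GeoMesa API. */
    static final class BufferingWriter {
        private final List<String> store;                    // what actually reached the backend
        private final List<String> buffer = new ArrayList<>(); // records not yet sent
        private final int batchSize;

        BufferingWriter(List<String> store, int batchSize) {
            this.store = store;
            this.batchSize = batchSize;
        }

        void write(String record) {
            buffer.add(record);
            if (buffer.size() >= batchSize) {
                flush(); // a full batch is sent automatically
            }
        }

        void flush() {
            store.addAll(buffer);
            buffer.clear();
        }
    }

    /** Writes one partition, flushing once after the loop if requested. */
    static void writePartition(Iterator<String> partition, BufferingWriter writer, boolean flushAtEnd) {
        while (partition.hasNext()) {
            writer.write(partition.next());
        }
        if (flushAtEnd) {
            writer.flush();
        }
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d", "e");

        List<String> withoutFlush = new ArrayList<>();
        writePartition(records.iterator(), new BufferingWriter(withoutFlush, 3), false);

        List<String> withFlush = new ArrayList<>();
        writePartition(records.iterator(), new BufferingWriter(withFlush, 3), true);

        System.out.println("without flush: " + withoutFlush.size() + " of " + records.size());
        System.out.println("with flush:    " + withFlush.size() + " of " + records.size());
    }
}
```

In this toy model the run without a final flush stores 3 of 5 records and the run with it stores all 5.  A single flush per partition also avoids the cost of flushing after every write.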

I've been around Spark for a while, but I don't get to work in it daily.  If I had to guess why your approach is dropping data, I'd point out that RDDs are meant to capture a dataset and how to build it, rather than to maintain state.

Stepping outside that paradigm can be useful!  For GeoMesa SimpleFeatures, the serialization depends on knowing the SimpleFeatureType of the data.  We use code like (2) to broadcast all the SFTs we wish to work with and then call foreachPartition to put the SFT data into the GeoMesaSparkKryoRegistrator singleton object.  While I'm thinking through this right now, I could imagine things going wrong in some corner cases, but things have largely worked out for us since we are pushing around very little state.
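The register-in-every-partition idea can be sketched in plain Java as follows.  This is an illustration under stated assumptions, not the code at (2): TypeRegistry is a hypothetical per-JVM singleton standing in for the registrator object, the map of type name to spec string stands in for the broadcast value, and the spec string itself is only an example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TypeRegistrySketch {

    /** Hypothetical per-JVM singleton; NOT GeoMesa's registrator class. */
    static final class TypeRegistry {
        private static final Map<String, String> SPECS = new ConcurrentHashMap<>();

        /** Idempotent, so every partition landing on this JVM can call it safely. */
        static void registerAll(Map<String, String> broadcastSpecs) {
            broadcastSpecs.forEach(SPECS::putIfAbsent);
        }

        static String specFor(String typeName) {
            return SPECS.get(typeName);
        }

        static int size() {
            return SPECS.size();
        }
    }

    /** Models the body of a foreachPartition call: register first, then look up. */
    static String processPartition(Map<String, String> broadcastSpecs, String typeName) {
        TypeRegistry.registerAll(broadcastSpecs);
        return TypeRegistry.specFor(typeName);
    }

    public static void main(String[] args) {
        Map<String, String> specs = new HashMap<>();
        specs.put("points", "name:String,dtg:Date,*geom:Point:srid=4326"); // example spec only

        // Two "partitions" on the same JVM: the second registration is a no-op.
        System.out.println(processPartition(specs, "points"));
        System.out.println(processPartition(specs, "points"));
        System.out.println("registered types: " + TypeRegistry.size());
    }
}
```

The only state pushed into the singleton is small, immutable, and safe to register more than once, which is why this pattern tends to be more forgiving than sharing a live writer across tasks.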

Other than that, I'd like to point out our other Spark documentation and resources.  For more basics, check out (3) and (4).  Reading through the GeoMesa Spark code (5) might give you some more ideas; some of that code helps support the data aggregation tutorial (6).  As I mentioned on the list earlier, GeoMesa's Spark support can be wrapped up into a kernel for use in Jupyter.  This is a new capability, and we've documented it on GeoMesa's Confluence page (7).  Jupyter is definitely awesome since you can share your work in an interactive way.  (It should impress professors...;))

Great question!  All of this is still developing, so feel free to ask anything.  If you are interested in contributing, this is a great area to help with since things are evolving fast!

Cheers,

Jim

1.  https://github.com/locationtech/geomesa/blob/master/geomesa-compute/src/main/scala/org/locationtech/geomesa/compute/spark/GeoMesaSpark.scala#L153-L177
2.  https://github.com/locationtech/geomesa/blob/master/geomesa-compute/src/main/scala/org/locationtech/geomesa/compute/spark/GeoMesaSpark.scala#L197-L202
3.  http://www.geomesa.org/documentation/user/spark.html
4.  http://www.geomesa.org/documentation/tutorials/spark.html
5.  https://github.com/locationtech/geomesa/blob/master/geomesa-compute/src/main/scala/org/locationtech/geomesa/compute/spark/GeoMesaSpark.scala
6.  http://www.geomesa.org/documentation/current/tutorials/shallow-join.html
    NB: The 'current' in the URL points to the 1.2.5-SNAPSHOT docs.
7.  https://geomesa.atlassian.net/wiki/display/GEOMESA/How-To+Articles
    https://geomesa.atlassian.net/wiki/display/GEOMESA/How+to+install+the+Scala+Spark+%28Apache+Toree%29+Jupyter+kernel+with+GeoMesa+support

On 8/13/2016 2:47 AM, Yikai Gong wrote:
Hi,

May I ask a question about using AccumuloFeatureWriter to ingest data in Spark?  The idea is to make the AccumuloFeatureWriter a singleton on each Spark executor so that it can be reused and closed at the end.

I am relatively new to Spark and my code (in Java) looks like this:

dataRDD.foreachPartition(iterator -> {
    // get or init the writer, held as a static variable in MyTool.class
    AppendAccumuloFeatureWriter writer = MyTool.getSingletonWriter(...);
    while (iterator.hasNext()) {
        // build a feature from each record so that the iterator advances
        SimpleFeature feature = buildFeature(iterator.next());
        SimpleFeature toWrite = FeatureUtils.copyToWriter(writer, feature, true);
        writer.write();
    }
});

dataRDD.foreachPartition(iterator -> {
    // close the writer if it hasn't been closed
    MyTool.closeWriter();
});


Problem: random subsets of the data may not be saved to Accumulo this way.  I found one solution, which is to always call writer.flush() after writer.write(), but that is very inefficient.  I would like to ask whether there are better ways to use AccumuloFeatureWriter, and what might cause this odd data loss during writes.  Thank you!

Regards
Yikai Gong

