
Re: [geomesa-users] Write to Geomesa from Spark

Hi Adam,

I haven't seen an example of writing to GeoMesa via Spark.  Since a SimpleFeature is written as 3 or more Accumulo mutations across 2 or more tables, it will probably take a bit of work to sort out an appropriate Spark output format.*  We may or may not be able to turn something like that around quickly.

That said, there are two other options:

First, if you are running locally or if the data is small enough, you can 'collect' the SimpleFeatures you wish to write to GeoMesa and then write them directly by getting a FeatureStore and calling addFeatures.  Admittedly, this approach has some scale limitations.  I only suggest it while we work out something a little more robust.
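
Concretely, something like this (untested; it assumes queryRDD is a reasonably small RDD[SimpleFeature], params is the java.util.Map of GeoMesa connection parameters, and the "OutputFeatureName" type already exists in the catalog):

import org.geotools.data.DataStoreFinder
import org.geotools.data.collection.ListFeatureCollection
import org.locationtech.geomesa.core.data.{AccumuloDataStore, AccumuloFeatureStore}

// Pull the (small) result set back to the driver ...
val features = queryRDD.collect()

// ... and write it through a plain GeoTools FeatureStore on the driver.
val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
val fs = ds.getFeatureSource("OutputFeatureName").asInstanceOf[AccumuloFeatureStore]
fs.addFeatures(new ListFeatureCollection(fs.getSchema, features))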

Second, one should be able to distribute the map of parameters required to build a GeoMesa AccumuloDataStore to the Spark servers, construct a FeatureStore on each node, and write features from there.  I'm thinking something like ...

queryRDD.foreachPartition { iter => // foreachPartition hands us an Iterator[SimpleFeature] per partition
  // Build the (non-serializable) DataStore inside the closure, once per partition rather than once per feature.
  val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
  val fs = ds.getFeatureSource("OutputFeatureName").asInstanceOf[AccumuloFeatureStore]
  iter.foreach { sf =>
    fs.addFeatures(new ListFeatureCollection(fs.getSchema, Array(sf)))
  }
}

I say 'one should be able to' because when I tried it tonight, I received errors like

14/12/09 18:20:01 INFO scheduler.DAGScheduler: Failed to run collect at <console>:41
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.locationtech.geomesa.core.data.AccumuloDataStore
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
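
My guess (and it is only a guess) is that this is the usual Spark closure-capture problem: if the DataStore or FeatureStore is built on the driver and then referenced inside the closure, Spark has to serialize it along with the task, and AccumuloDataStore isn't Serializable.  Roughly:

// Built on the driver ...
val driverDs = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
val driverFs = driverDs.getFeatureSource("OutputFeatureName").asInstanceOf[AccumuloFeatureStore]
// ... and captured by the closure below, so the task itself can't be serialized.
queryRDD.foreach { sf =>
  driverFs.addFeatures(new ListFeatureCollection(driverFs.getSchema, Array(sf)))
}

Keeping the construction inside foreachPartition, as in the snippet above, should sidestep that, though in the spark-shell the REPL can sometimes drag outer values into the closure anyway.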

Thanks again for the questions about Spark! 

Jim

* One would need to get a FeatureStore inside the output class/object to create the actual Accumulo mutations.
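
To make that footnote a bit more concrete, here is an untested skeleton of the idea.  Everything in it is an assumption rather than anything that exists in GeoMesa today: the class name, the "geomesa.param." Configuration-key convention, and writing features one at a time with no batching.

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, OutputFormat, RecordWriter, TaskAttemptContext}
import org.geotools.data.DataStoreFinder
import org.geotools.data.collection.ListFeatureCollection
import org.locationtech.geomesa.core.data.{AccumuloDataStore, AccumuloFeatureStore}
import org.opengis.feature.simple.SimpleFeature

class GeoMesaSketchOutputFormat extends OutputFormat[Void, SimpleFeature] {

  // Assumed convention: connection parameters live in the job Configuration under a
  // "geomesa.param." prefix, and the target type name under "geomesa.typeName".
  private def connectionParams(conf: Configuration): java.util.Map[String, java.io.Serializable] = {
    val m = new java.util.HashMap[String, java.io.Serializable]()
    conf.iterator().asScala
      .filter(_.getKey.startsWith("geomesa.param."))
      .foreach(e => m.put(e.getKey.stripPrefix("geomesa.param."), e.getValue))
    m
  }

  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Void, SimpleFeature] =
    new RecordWriter[Void, SimpleFeature] {
      // The FeatureStore lives on the task, so GeoMesa (not us) turns each
      // SimpleFeature into the underlying Accumulo mutations.
      private val ds = DataStoreFinder.getDataStore(connectionParams(context.getConfiguration))
        .asInstanceOf[AccumuloDataStore]
      private val fs = ds.getFeatureSource(context.getConfiguration.get("geomesa.typeName"))
        .asInstanceOf[AccumuloFeatureStore]

      override def write(key: Void, sf: SimpleFeature): Unit =
        fs.addFeatures(new ListFeatureCollection(fs.getSchema, Array(sf)))

      override def close(taskContext: TaskAttemptContext): Unit = ds.dispose()
    }

  override def checkOutputSpecs(context: JobContext): Unit = {}

  // Nothing is written to files, so the committer is a no-op.
  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = new OutputCommitter {
    override def setupJob(jobContext: JobContext): Unit = {}
    override def setupTask(taskContext: TaskAttemptContext): Unit = {}
    override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
    override def commitTask(taskContext: TaskAttemptContext): Unit = {}
    override def abortTask(taskContext: TaskAttemptContext): Unit = {}
  }
}

The real work, of course, is wiring the parameters into the Configuration and doing sensible batching/flushing, which is why I say it will take some sorting out.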

On 12/09/2014 05:29 PM, Adam F wrote:
Does anyone have an example of how to write to GeoMesa from Spark?  This is what I have so far.  I'd welcome any feedback.

val job = new Job()
val config = new Configuration()
ConfiguratorBase.setConnectorInfo(classOf[AccumuloOutputFormat], config, ds.connector.whoami(), ds.authToken)
ConfiguratorBase.setZooKeeperInstance(classOf[AccumuloOutputFormat], config, ds.connector.getInstance().getInstanceName, ds.connector.getInstance().getZooKeepers)
OutputConfigurator.setDefaultTableName(classOf[AccumuloOutputFormat], config, ds.getSpatioTemporalIdxTableName(sft))

val output = inputRDD.map(toFeature)
  .saveAsNewAPIHadoopFile(config.getString("accumulo.instance"),
                          classOf[Void],
                          classOf[SimpleFeature],
                          classOf[AccumuloOutputFormat],
                          job.getConfiguration)


Assume I already have the GeoMesa tables created and a handle to the data store as ds.  I also have the SimpleFeatureType as sft.  Something tells me I have the default table name identified incorrectly.  Also, I'm not sure what should go in all the classOf calls.

Thanks
-- Adam


_______________________________________________
geomesa-users mailing list
geomesa-users@xxxxxxxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
http://www.locationtech.org/mailman/listinfo/geomesa-users

