2017-04-04 2 views
0

J'écris des données dans le stockage accumulo en utilisant nativement Geomesa Native Client. Voici mon code javaExécution de la méthode flush, pour envoyer des mutations à accumulo sans fermer l'auteur

package org.locationtech.geomesa.api; 

import com.google.common.base.Function; 
import com.google.common.collect.ImmutableMap; 
import com.google.common.collect.Iterables; 
import com.google.common.collect.Lists; 
import com.google.gson.Gson; 
import com.vividsolutions.jts.geom.Coordinate; 
import com.vividsolutions.jts.geom.Geometry; 
import com.vividsolutions.jts.geom.GeometryFactory; 
import org.apache.accumulo.core.client.Connector; 
import org.apache.accumulo.core.client.mock.MockInstance; 
import org.apache.accumulo.core.client.security.tokens.PasswordToken; 
import org.apache.accumulo.core.security.Authorizations; 
import org.geotools.factory.CommonFactoryFinder; 
import org.geotools.feature.AttributeTypeBuilder; 
import org.geotools.geometry.jts.JTSFactoryFinder; 
import org.junit.Assert; 
import org.junit.Test; 
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore; 
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex; 
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$; 
import org.locationtech.geomesa.utils.index.IndexMode$; 
import org.opengis.feature.simple.SimpleFeature; 
import org.opengis.feature.type.AttributeDescriptor; 
import org.opengis.filter.FilterFactory2; 

import javax.annotation.Nullable; 
import java.time.ZonedDateTime; 
import java.util.ArrayList; 
import java.util.Date; 
import java.util.List; 
import java.util.Map; 
import java.util.SortedSet; 
import java.util.TreeSet; 

public class WorkerBeta { 
    public static void main(String[] args){ 
     try { 
      DomainObjectValueSerializer dovs = new DomainObjectValueSerializer(); 
      final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
      "aj_v14", 
      "localhost:2181", 
      "hps", 
      "root", "9869547580", 
      false, 
      dovs, 
      new SimpleFeatureView<DomainObject>() { 
       AttributeTypeBuilder atb = new AttributeTypeBuilder(); 
       private List<AttributeDescriptor> attributeDescriptors = 
       Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId") 
        , atb.binding(String.class).buildDescriptor("dId") 
        , atb.binding(Integer.class).buildDescriptor("s") 
        , atb.binding(Integer.class).buildDescriptor("a") 
        , atb.binding(Integer.class).buildDescriptor("e") 
       ); 
       @Override 
       public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) { 
       f.setAttribute("rId", domainObject.rideId); 
       f.setAttribute("dId", domainObject.deviceId); 
       f.setAttribute("s", domainObject.speed); 
       f.setAttribute("a", domainObject.angle); 
       f.setAttribute("e", domainObject.error); 
       } 

       @Override 
       public List<AttributeDescriptor> getExtraAttributes() { 
       return attributeDescriptors; 
       } 
      } 
     ); 

     //Inserting 
     final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1); 
     final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory(); 
     System.out.println(index.insert(
       one, 
       gf.createPoint(new Coordinate(-74.0, 34.0)), 
       date("2017-03-31T01:15:00.000Z") 
      )); 

      //Read 
      GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder() 
       .within(-90.0, -180, 90, 180) 
       .during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z")) 
       .build(); 
      Iterable<DomainObject> results = index.query(q); 
      int counter = 0; 
      for(DomainObject dm : results){ 
       counter += 1; 
       System.out.println("result counter: " + counter); 
       dovs.toBytes(dm); 
      } 
     } 
     catch (Exception ex){ 
      ex.printStackTrace(); 
     } 
     index.close(); 
    } 
    public static class DomainObject { 
     public final int rideId; 
     public final String deviceId; 
     public final int angle; 
     public final int speed; 
     public final int error; 

     public DomainObject(int rideId, String deviceId, int angle, int speed, int error) { 
      this.rideId = rideId; 
      this.deviceId = deviceId; 
      this.angle = angle; 
      this.speed = speed; 
      this.error = error; 
     } 
    } 
    public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> { 
     public static final Gson gson = new Gson(); 
     @Override 
     public byte[] toBytes(DomainObject o) { 
      return gson.toJson(o).getBytes(); 
     } 
     @Override 
     public DomainObject fromBytes(byte[] bytes) { 
      return gson.fromJson(new String(bytes), DomainObject.class); 
     } 
    } 
    public static Date date(String s) { 
     return Date.from(ZonedDateTime.parse(s).toInstant()); 
    } 
} 

Le problème avec ce code est, je dois créer index objet à chaque fois pour une nouvelle demande d'insertion et appeler index.close() pour refléter la même chose. Mais je ne peux pas exécuter insert(), une fois que index.close() est appelée. Cependant, je vais accepter la demande d'insertion de la file d'attente à un débit très élevé et je ne veux pas créer index objet à chaque fois. Comment puis je faire ça?

En bref comment je peux vider les écritures sans appeler close().

Répondre

0

J'ai créé le fichier de classe geomesa Client pour utiliser nativement geomesa. Voici la mise en œuvre partielle de la même qui montre comment vous pouvez vider avec AccumuloAppendFeatureWriter sans appeler pour fermer.

public class GeomesaClient { 
    private AccumuloDataStore ds = null; 
    private AccumuloAppendFeatureWriter fw = null; 
    private SimpleFeatureSource sfs = null; 
    private String tableName = ""; 
    private FeatureStore fst = null; 
    private SimpleFeatureType sft; 

    public GeomesaClient(Map<String, String> dsConf) throws Exception { 
    this.ds = (AccumuloDataStore) DataStoreFinder.getDataStore(dsConf); 
    this.tableName = dsConf.get("tableName"); 

    sft = createFeatureType(); 
    if(!Arrays.asList(this.ds.getTypeNames()).contains(sft.getTypeName())){ 
     ds.createSchema(sft); 
    } 
    this.fst = (FeatureStore)sfs; 
    this.fw = (AccumuloAppendFeatureWriter) (this.ds.getFeatureWriterAppend(sft.getTypeName(), 
     Transaction.AUTO_COMMIT)); 
    this.sfs = ds.getFeatureSource(sft.getTypeName()); 
    } 
    /* 
     Flush with AccumuloAppendFeatureWriter 
    */ 
    public void flush(boolean force) { 
    fw.flush(); 
    } 
}