2017-09-19 6 views
0

J'essaye de créer la liste de groupe basée sur certaines conditions dans Rxjava.Comment grouper par et renvoyer la liste dans rx java

ci-dessous est ma réponse:

{ 
    "dates":[ 
     { 
     "date":18, 
     "value":"1" 
     }, 
     { 
     "date":18, 
     "value":"2" 
     }, 
     { 
     "date":18, 
     "value":"3" 
     }, 
     { 
     "date":19, 
     "value":"1" 
     }, 
     { 
     "date":19, 
     "value":"2" 
     }, 
     { 
     "date":19, 
     "value":"3" 
     }, 
     { 
     "date":19, 
     "value":"4" 
     } 
    ] 
} 

Comment peut-groupe par les valeurs 18 [valeur 1, valeur 2, la valeur 3, highestvalue = 3, lowestvalue = 1] 19 [valeur 1, la valeur 2, Note: Je peux créer en utilisant pour boucle mais la réponse sera récupérée du serveur et puisqu'elle renvoie la pensée observable d'employer la fonctionnalité de rx java. Rxjava

Toute aide sera grandement appréciée.

Merci, Shanthi

Répondre

3

Regardez dans la fonctionnalité group by.

Voici l'exemple pour tous ceux qui est curieux:

class DateModel implements Comparable<DateModel>{ 
    Integer date; 
    Integer value; 

    public DateModel(int date, int value){ 
     this.date = date; 
     this.value = value; 
    } 

    @Override 
    public int compareTo(DateModel o) { 
     return value.compareTo(o.value); 
    } 
} 

Et si nous devons regrouper une liste de ces objets modèle:

// example list 
List<DateModel> dateList = Arrays.asList(
    new DateModel(18,1), 
    new DateModel(18,2), 
    new DateModel(18,3), 
    new DateModel(19,1), 
    new DateModel(19,2), 
    new DateModel(19,3), 
    new DateModel(19,4) 
); 

// the following observable will give you an emission for every grouping 
// for the example data above, you should get two emissions (group 18 and 19) 
Observable<PriorityQueue<DateModel>> observable = 
    Observable.from(dateList) 
    .groupBy(dateModel -> dateModel.date) 
    .flatMap(groups -> groups.collect(PriorityQueue::new, PriorityQueue::add)); 

PriorityQueue était juste un exemple de la structure utilisée pour collecte. Si vous sortez de la file d'attente, vous aurez 18-1, 18-2, 18-3 etc (dans l'ordre que vous avez demandé). Vous pouvez utiliser une structure différente dans le seul but de trouver le maximum & min.

-1

Merci pour votre réponse.

mais j'ai été capable de le résoudre en utilisant le code ci-dessous.

Map<String, List<Date>> grouped = dates.body().getDate() 
         .stream() 
         .collect(Collectors.groupingBy(x -> { 
          return x.getTime().getMday(); // we can use any logic here 
         })); 
+1

cela utilise des flux dans java8 pas Rxjava façon – Raghunandan

+0

oui cela utilise des flux. Comme je n'effectue aucune opération asynchrone, j'ai essayé cette approche. –

0

Cela peut facilement être récupéré comme ci-dessous:

List<Date> list = Arrays.asList(new Date[]{ 
      new Date(18, 1), new Date(18, 2), new Date(18, 3), new Date(19, 1), new Date(19, 2) 
    }); 

    Observable 
      .fromArray(list) 
      .map(new Function<List<Date>, List<Date>>() { 
       @Override 
       public List<Date> apply(@NonNull List<Date> dates) throws Exception { 
        TreeMap<Integer, List<Date>> treeMap = new TreeMap<Integer, List<Date>>(); 
        for (Date date : dates) { 
         List<Date> storedDates = treeMap.get(date.date); 
         if (storedDates == null) { 
          storedDates = new ArrayList<Date>(); 
          treeMap.put(date.date, storedDates); 
         } 
         storedDates.add(date); 
        } 

        List<Date> result = new ArrayList<Date>(); 
        for (Integer integer : treeMap.keySet()) { 
         result.addAll(treeMap.get(integer)); 
        } 
        return result; 
       } 
      }); 
1

Inspirés par la réponse de ce qui fonctionne @ Jon. Voici un code de démonstration complet pour Rxjava2 et la sortie.

  • Observable#collect() pour Observable
  • + Single#blockingGet() pour Flowable

La sortie:

----------------------byCollect 
[2017/11/16 20:42:43.548 CST][ 1 -       main] - flatMapSingle : : 1 
[2017/11/16 20:42:43.590 CST][ 1 -       main] - flatMapSingle : : 2 
[2017/11/16 20:42:43.591 CST][ 1 -       main] - flatMapSingle : : 0 
[2017/11/16 20:42:43.592 CST][ 1 -       main] - subscribe : onNext : {0=[3, 6, 9]} 
[2017/11/16 20:42:43.593 CST][ 1 -       main] - subscribe : onNext : {1=[1, 4, 7]} 
[2017/11/16 20:42:43.593 CST][ 1 -       main] - subscribe : onNext : {2=[2, 5, 8]} 
[2017/11/16 20:42:43.597 CST][ 1 -       main] - subscribe : onComplete : 
----------------------byParallelAndBlockingGet 
[2017/11/16 20:42:43.629 CST][ 13 -  RxComputationThreadPool-1] - flatMap : : 1 
[2017/11/16 20:42:43.629 CST][ 15 -  RxComputationThreadPool-3] - flatMap : : 0 
[2017/11/16 20:42:43.629 CST][ 14 -  RxComputationThreadPool-2] - flatMap : : 2 
[2017/11/16 20:42:43.632 CST][ 15 -  RxComputationThreadPool-3] - subscribe : onNext : {0=[3, 6, 9]} 
[2017/11/16 20:42:43.632 CST][ 15 -  RxComputationThreadPool-3] - subscribe : onNext : {1=[1, 4, 7]} 
[2017/11/16 20:42:43.633 CST][ 15 -  RxComputationThreadPool-3] - subscribe : onNext : {2=[2, 5, 8]} 
[2017/11/16 20:42:43.633 CST][ 15 -  RxComputationThreadPool-3] - subscribe : onComplete : 

La source: Demo.java

import io.reactivex.*; 
import io.reactivex.Observable; 
import io.reactivex.schedulers.*; 

import java.time.*; 
import java.time.format.*; 
import java.util.*; 

/** 
* List<Integer>    // [1..9] 
* -> 
* Map<Integer,List<Integer> // {0: [3,6,9], 1: [1,4,7], 2: [2,5,8] } 
*/ 
public class Demo { 
    public static void main(String[] args) throws InterruptedException { 
     byCollect(); 
     byParallelAndBlockingGet(); 
    } 

    public static void byCollect() throws InterruptedException { 
     System.out.println("----------------------byCollect"); 
     Observable.range(1, 9) 
       .groupBy(i -> i % 3) 
       .flatMapSingle(f -> { // GroupedObservable<Integer, List<Integer>> 

        // Look output : all runs on same thread, 
        print("flatMapSingle : ", f.getKey()); 

        // "onComplete" has not been triggered. 
        // blockingGet will block current thread. 
        //return Observable.just(Collections.singletonMap(f.getKey(), f.toList().blockingGet())) 

        return f.collect(
          // (Callable<Map<Integer, List<Integer>>>) 
          () -> Collections.singletonMap(f.getKey(), new ArrayList<Integer>()), 

          // (BiConsumer<Map<Integer, List<Integer>>, Integer>) 
          (m, i) -> m.get(f.getKey()).add(i) 
        ); 

       }) 
       .subscribe(
         i -> print("subscribe : onNext", i), 
         err -> print("subscribe : onError", err), 
         () -> print("subscribe : onComplete", "") 
       ) 
     ; 
    } 

    public static void byParallelAndBlockingGet() throws InterruptedException { 
     System.out.println("----------------------byParallelAndBlockingGet"); 
     Flowable.range(1, 9) 
       .groupBy(i -> i % 3) 
       .parallel() // There's no `parallel` method on `Observable` class 
       .runOn(Schedulers.computation()) // Important!!! 
       .flatMap(f -> { // ParallelFlowable<GroupedFlowable<Integer, List<Integer>> 
        // Look output : runs on different thread each. 
        print("flatMap : ", f.getKey()); 
        return Flowable.just(Collections.singletonMap(f.getKey(), f.toList().blockingGet())); 
       }) 
       .sequential() 
       .subscribe(
         i -> print("subscribe : onNext", i), 
         err -> print("subscribe : onError", err), 
         () -> print("subscribe : onComplete", "") 
       ) 
     ; 
     Thread.sleep(500); 
    } 

    public static void print(String step, Object data) { 
     ZonedDateTime zdt = ZonedDateTime.now(); 
     String now = zdt.format(DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS z")); 
     System.out.printf("[%s][%4d - %30s] - %10s : %s%n", 
       now, 
       Thread.currentThread().getId(), 
       Thread.currentThread().getName(), 
       step, 
       data 
     ); 
    } 
}