2017-10-20 33 views
1

Je cherche à convertir une certaine logique dans mon application (Android) pour utiliser RxJava, mais j'ai du mal à trouver un moyen de faire de la logique plus avancée.Mettre à jour et conserver les données existantes avec RxJava

Le cas d'utilisation est le suivant: Je veux montrer une sorte de flux à l'utilisateur. Le flux se compose d'éléments provenant de différentes sources, par ex. messages, articles, etc. En raison des restrictions de l'API, l'application elle-même doit rassembler les sources individuelles et les afficher dans une liste.

Par exemple, supposons que mes articles dans l'alimentation sont les suivantes:

class FeedItem { 
    Type feedItem; // Type of the item, e.g. article, message, etc. 
    ... 
} 

Actuellement, l'alimentation est construit sur un thread séparé, et l'interface utilisateur est averti en utilisant un écouteur lorsque l'alimentation a été mis à jour. Pour vous donner une idée de la façon dont cela est fait, voici un code (pseudo) Java (threading et autre code administratif est omis pour plus de clarté).

class FeedProducer { 
    List<FeedItem> currentData = new ArrayList(); 

    public void refreshData() { 
     for (FeedSource source: getFeedSources()) { 
      Type sourceType = source.getType(); 
      // Remove existing items 
      currentData.removeIf(item -> item.feedItem.equals(sourceType)); 
      List<FeedItem> newItems = source.produceItems(); 
      // Add the new items 
      currentData.addAll(newItems); 
      // Notify the UI things have changed 
      notifyDataChanged(currentData); 
     } 
     // Notify the UI we are done loading 
     notifyLoadingComplete(); 
    } 
} 

Cette méthode refreshData() sera appelée à chaque fois que l'utilisateur souhaite actualiser les données. De cette façon, il est possible de mettre à jour seulement certaines sources, tandis que d'autres restent les mêmes (par exemple en changeant la valeur de retour de getFeedSources()).

Ces sources sont également utilisées individuellement dans d'autres parties de l'application; Je les ai convertis en observables là-bas. Cela a rendu les choses beaucoup plus faciles, par ex. Si la base de données change, les changements sont simplement poussés à l'interface utilisateur par l'Observable.

Ma question est donc de savoir comment je peux (élégamment) fusionner ces sources observables en un observable, mais où il y a un état "global" des résultats précédents. J'ai examiné les différents opérateurs de combinaison, mais je n'ai pas trouvé ce dont j'avais besoin. Mes excuses si je néglige quelque chose d'évident, comme je suis assez nouveau à RxJava.

Répondre

1

l'option Naive serait, appelant le stockage dernière liste et en donnant comme paramètre lorsque vous demandez de nouvelles données:

public class ReactiveMultipleSources { 

    // region Classes 
    public enum SourceType { 
     TYPE_ARTICLE, 
     TYPE_MESSAGE, 
     TYPE_VIDEO 
    } 

    public static class Feed { 
     private SourceType sourceType; 
     private String content; 

     Feed(SourceType sourceType, String content) { 
      this.sourceType = sourceType; 
      this.content = content; 
     } 

     SourceType getSourceType() { 
      return sourceType; 
     } 
    } 
    // endregion 

    public static void main(String[] args) throws InterruptedException { 
     final List<Feed>[] currentList = new List[]{new ArrayList()}; 

     // Simulate refresh 
     refreshContent(currentList[0]) 
       .subscribe(feeds -> { 
        currentList[0] = feeds; 

        for (int i = 0; i < currentList[0].size(); i++) { 
         System.out.println(currentList[0].get(i).content); 
        } 
       }); 
     Thread.sleep(2000); 
     System.out.println(); 

     // Simulate refresh 
     refreshContent(currentList[0]) 
       .subscribe(feeds -> { 
        currentList[0] = feeds; 

        for (int i = 0; i < currentList[0].size(); i++) { 
         System.out.println(currentList[0].get(i).content); 
        } 
       }); 
     Thread.sleep(2000); 


    } 

    private static Observable<List<Feed>> refreshContent(@NotNull List<Feed> currentFeed) { 
     return Observable.fromIterable(getSourceTypes()) 
       .observeOn(Schedulers.io()) 
       // Get List<Feed> forEach sourceType 
       .concatMap(ReactiveMultipleSources::getFeedItemsBySourceType) 
       .observeOn(Schedulers.computation()) 
       // Get list of "List of Feed for sourceType", = List<List<Feed>> 
       .toList() 
       .map(lists -> { 
        for (List<Feed> list : lists) { 
         SourceType sourceType = list.get(0).getSourceType(); 
         // Remove items of currentFeed whose sourceType has new List<Feed> 
         currentFeed.removeIf(temp -> temp.getSourceType() == sourceType); 
         // Add new items 
         currentFeed.addAll(list); 
        } 
        return currentFeed; 
       }) 
       .toObservable(); 
    } 

    // region Helper 
    private static List<SourceType> getSourceTypes() { 
     return new ArrayList<>(Arrays.asList(SourceType.values())); 
    } 

    private static Observable<List<Feed>> getFeedItemsBySourceType(SourceType sourceType) { 
     String content; 
     if (sourceType == SourceType.TYPE_ARTICLE) 
      content = "article "; 
     else if (sourceType == SourceType.TYPE_MESSAGE) 
      content = "message "; 
     else if (sourceType == SourceType.TYPE_VIDEO) 
      content = "video "; 
     else 
      content = "article "; 

     Feed feed1 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed2 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed3 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed4 = new Feed(sourceType, content + createRandomInt()); 

     return Observable.just(Arrays.asList(feed1, feed2, feed3, feed4)); 
    } 

    // For simulating different items each time List<Feed> is required 
    private static int createRandomInt() { 
     return ThreadLocalRandom.current().nextInt(0, 21); 
    } 
    // endregion 
} 

Exemple de sortie:

article 19 
article 15 
article 18 
article 18 
message 3 
message 2 
message 9 
message 1 
video 19 
video 17 
video 18 
video 11 

article 0 
article 4 
article 18 
article 15 
message 11 
message 16 
message 16 
message 4 
video 1 
video 7 
video 20 
video 2 
1

Si vous avez 3 tâches individuelles qui renvoient [0, 1], [10, 11] et [20, 21], vous souhaitez les fusionner en une seule liste. Dans ce cas, vous pouvez utiliser l'opération zip.

public class TestRx { 
    public static void main(String[] args) { 
     // some individual observables. 
     Observable<List<Integer>> observable1 = Observable.just(Arrays.asList(0, 1)); 
     Observable<List<Integer>> observable2 = Observable.just(Arrays.asList(10, 11)); 
     Observable<List<Integer>> observable3 = Observable.just(Arrays.asList(20, 21)); 

     Observable.zip(observable1, observable2, observable3, 
       new Func3<List<Integer>, List<Integer>, List<Integer>, List<Integer>>() { 
        @Override 
        public List<Integer> call(List<Integer> list1, List<Integer> list2, List<Integer> list3) { 
         // TODO: Remove existing items 

         // merge all lists 
         List<Integer> mergedList = new ArrayList<>(); 
         mergedList.addAll(list1); 
         mergedList.addAll(list2); 
         mergedList.addAll(list3); 
         return mergedList; 
        } 
       }) 
       .subscribe(new Observer<List<Integer>>() { 
        @Override 
        public void onNext(List<Integer> mergedList) { 
         System.out.println(mergedList); 
         // TODO: notifyDataChanged(mergedList) 
        } 

        @Override 
        public void onError(Throwable throwable) { 
         System.out.println(throwable.toString()); 
         // TODO: handle exceptions 
        } 

        @Override 
        public void onCompleted() { 
         // TODO: notifyLoadingComplete() 
        } 
       }); 
    } 
} 

En conséquence, il imprimer comme ça [0, 1, 10, 11, 20, 21].