2015-08-03 3 views
3

Je souhaite obtenir une appartenance à un ensemble approximatif rapide, basée sur une fonction de chaîne de caractères appliquée à un grand RDD Spark de vecteurs de chaîne (enregistrements ~ 1B). Fondamentalement, l'idée serait de réduire en Bloom filter. Ce filtre bloom pourrait ensuite être diffusé aux travailleurs pour une utilisation ultérieure.Réduction avec un filtre de bloom

Plus précisément, j'ai actuellement

rdd: RDD[Vector[String]] 
f: Vector[String] => String 
val uniqueVals = rdd.map(f).distinct().collect() 
val uv = sc.broadcast(uniqueVals) 

Mais uniqueVals est trop grand pour être pratique, et je voudrais le remplacer par quelque chose de plus petite taille (et connu), à savoir un filtre bloom.

Mes questions:

  • est-il possible de réduire dans un filtre Bloom, ou dois-je recueillir d'abord, puis le construire dans le pilote?

  • Y a-t-il une implémentation de filtre Scala/Java Bloom mature qui conviendrait?

Répondre

9

Oui, les filtres Bloom peuvent être réduits, car ils ont quelques bonnes propriétés (ils sont monoids). Cela signifie que vous pouvez effectuer toutes les opérations d'agrégation en parallèle, en effectuant un seul passage sur les données pour construire le BloomFilter pour chaque partition, puis réduire ces BloomFilters ensemble pour obtenir un seul BloomFilter pour lequel vous pouvez demander contains.

Il existe au moins deux implémentations de BloomFilter dans Scala et les deux semblent des projets matures (ne les ont pas réellement utilisés en production). Le premier est Breeze et le second est Twitter's Algebird. Les deux contiennent des implémentations de croquis différents et beaucoup plus.

Voici un exemple comment faire avec Breeze:

import breeze.util.BloomFilter 

val nums = List(1 to 20: _*).map(_.toString) 
val rdd = sc.parallelize(nums, 5) 

val bf = rdd.mapPartitions { iter => 
    val bf = BloomFilter.optimallySized[String](10000, 0.001) 
    iter.foreach(i => bf += i) 
    Iterator(bf) 
}.reduce(_ | _) 

println(bf.contains("5")) // true 
println(bf.contains("31")) // false 
+1

Un problème avec cette solution: Il envoie tous les filtres de floraison pour toutes les partitions au conducteur avant de les fusionner qui peut facilement causer le conducteur à courir Mémoire insuffisante. 'treeReduce (_ | _, depth = DEPTH)' aide à résoudre ce problème en réduisant de manière arborescente. – anthonybell

+0

Excellente solution. Vous devriez également ajouter une coalesce entre la carte et la réduction pour une meilleure performance. Comme il n'y a qu'un seul filtre de bloom par partition, la réduction envoie directement tous les filtres de bloom au pilote pour la fusion finale. S'il y a beaucoup de partitions, cela peut être lent ou même aller au MOO. Coalescence avec le nombre de partition k tel que k * k ~ = le nombre initial de partitions sera optimal, même si certains exécutants ne sont pas utilisés. – Boris