J'ai un ensemble de données de la formeObtenez ensemble aléatoire de lignes de PCollection
user_id, date, other_columns
1, 2017-03-10, ...
2, 2017-03-10, ...
3, 2017-03-10, ...
...
et je dois faire ce qui suit: Pour chaque ligne de l'ensemble de données que je veux générer une nouvelle ligne qui contiendra le ligne courante et un sous-ensemble aléatoire de lignes N pour le même jour correspondant aux différents utilisateurs comme suit:
row, other_rows
{'user_id': 1, 'date': '2017-03-10', ...}, [{'user_id': 2,...},...]
{'user_id': 2, 'date': '2017-03-10', ...}, [{'user_id': 1,...},...]
...
Je l'ai mis en œuvre comme suit, mais il est très lent pour les grands ensembles de données lorsque exécuté sur le nuage.
dataset
| 'map-to-date' >> beam.Map(lambda x: (x['date'], x))
| 'group-by-date' >> beam.GroupByKey()
| 'generate-output' >> beam.ParDo(GenerateOutputRows())
où GenerateOutputRows
est défini comme:
class GenerateOutputRows(beam.DoFn):
def process(self, element):
(date, rows) = element
for r in rows:
other_users_rows = list(filter(lambda x: x['user_id'] != r['user_id'],
rows))
yield (r, random.sample(other_users_rows, N))
Pouvez-vous penser à une autre façon pour obtenir plus performant le résultat désiré?
Avez-vous réellement besoin de cela pour chaque rangée? ou juste une fois par utilisateur-jour? – CasualT
Oui, j'ai besoin de cela pour chaque rangée. Je génère un jeu de données d'apprentissage pour un modèle ML, et chaque ligne sera un exemple d'apprentissage. – pnezis
Quelle est la taille de votre jeu de données et quelle est l'opération la plus lente? Combien de travailleurs utilisez-vous? Et avez-vous un identifiant de travail? – Pablo