2017-04-04 1 views
0

J'ai besoin d'aide de votre part dans ce problème. J'ai lu que le bec est responsable de lire les données ou de les préparer pour le traitement dans Bolt. donc j'ai écrit du code dans le bec pour ouvrir le fichier et lire ligne par ligneerreur dans la logique du bec

class SimSpout(storm.Spout): 
    # Not much to do here for such a basic spout 
    def initialize(self, conf, context): 
    ## Open the file with read only permit 
     self.f = open('data.txt', 'r') 
    ## Read the first line 
     self._conf = conf 
     self._context = context 
     storm.logInfo("Spout instance starting...") 
    # Process the next tuple 
    def nextTuple(self): 
     # check if it reach at the EOF to close it 
     for line in self.f.readlines(): 
     # Emit a random sentence 
     storm.logInfo("Emiting %s" % line) 
     storm.emit([line]) 

# Start the spout when it's invoked 
SimSpout().run() 

Est-ce exact?

Répondre

0

Vous écrivez Spout, dont la responsabilité dans Storm est d'émettre des tuples pour les boulons aval à traiter.

La tâche nextTuple de Spout est d'émettre un seul événement chaque fois qu'il est appelé. Dans votre code, vous émettez toutes les lignes d'un fichier. Si votre tuple unique est une seule ligne. Vous devez garder un décalage dans le fichier et lire cette ligne de décalage et émettre, mise à jour offset = offset + 1. quelque chose comme ci-dessous

class SimSpout(storm.Spout): 

    # Not much to do here for such a basic spout 
    def initialize(self, conf, context): 
    ## Open the file with read only permit 
    self.f = open('data.txt', 'r') 
    ## Read the first line 
    self._conf = conf 
    self._context = context 
    self._offset = 0 
    storm.logInfo("Spout instance starting...") 

# Process the next tuple 
def nextTuple(self): 
    # check if it reach at the EOF to close it 
    with open(self.f) as f: 
     f.readlines()[self._offset] 
     #Emit a random sentence 
     storm.logInfo("Emiting %s" % line) 
     storm.emit([line]) 
    self._offset = self._offset + 1 
+0

Un grand merci pour la réponse, je suis arrivé ce « avec open (self.f) TypeError: contraindre en Unicode: besoin de chaîne ou de tampon, fichier trouvé " – user3188912

+0

l'erreur que j'ai résolu parce que j'ai ouvert le fichier deux fois, mais j'ai eu un autre AttributeError: l'objet 'SimSpout' n'a pas d'attribut '_offset' i cherché pour le résoudre, mais n'a rien trouvé d'aide – user3188912