2012-03-25 5 views
0

Je crois avoir tout fait correctement. Je crée un tuyau, passe le puits à un thread d'écriture, enregistre la source sur mon sélecteur avec OP_READ, démarre mon sélecteur. Tout fonctionne mais dès que j'écris quelque chose à l'évier, je reçois une exception de tuyau cassé. Pourquoi !!!??? Il n'y a pas de tuyau cassé ici. Je suis frustré. Comment puis-je déboguer/comprendre ce qui se passe ici? Est-ce que quelqu'un a un exemple de tuyau simple que je peux exécuter pour tester si cela fonctionne. Un fil écrit sur l'évier et le sélecteur le lisant.NIO Pipe lance "Broken Pipe" quand j'écris dans le lavabo sans raison! Comment déboguer?

EDIT: J'ai suivi à peu près la suggestion here. Il est difficile de trouver des exemples concrets de tuyaux NIO sur Internet.

import java.io.*; 
import java.nio.ByteBuffer; 
import java.nio.channels.*; 
import java.util.Iterator; 

public class SystemOutPipe extends Thread { 

    public static void main(String[] args) 
    { 
    try { 
     SystemOutPipe sop = new SystemOutPipe(); 
     sop.start(); 
     System.out.println("This message should be redirected to System.err\nNow waiting 5 seconds ..."); 
     Thread.sleep(5000L); 
     sop.setStopped(true); 
     sop.join(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
    } 

    private Selector selector; 
    private Pipe pipe; 
    private boolean stopped = false; 

    public SystemOutPipe() throws IOException { 
    super("SystemOutPipe"); 
    pipe = Pipe.open(); 
    System.setOut(new PrintStream(new PipeOutputStream(pipe))); 
    selector = Selector.open(); 
    pipe.source().configureBlocking(false); 
    pipe.source().register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); 
    } 

    @Override 
    public void run() { 
    try { 
     while (!isStopped()) { 
     int n = selector.select(1L); 
     if (n > 0) { 
      Iterator<SelectionKey> it = selector.selectedKeys().iterator(); 
      while (it.hasNext()) { 
      SelectionKey key = it.next(); 
      it.remove(); 
      if (key.isReadable()) { 
       new ReadHandler(key).run(); 
      } 
      } 
     } 
     } 
    } catch (Exception e) { 
     e.printStackTrace(); // writes to System.err ! 
    } 
    } 

    public synchronized boolean isStopped() { 
    return stopped; 
    } 

    public synchronized void setStopped(final boolean stopped) { 
    this.stopped = stopped; 
    } 

    public class ReadHandler implements Runnable { 
    private final SelectionKey key; 

    public ReadHandler(final SelectionKey key) { 
     this.key = key; 
    } 

    @Override 
    public void run() { 
     ByteBuffer bbuf = (ByteBuffer) key.attachment(); 
     ReadableByteChannel channel = (ReadableByteChannel) key.channel(); 
     try 
     { 
     int count = 0; 
     do { 
      bbuf.clear(); 
      count = channel.read(bbuf); 
      if (count > 0) System.err.write(bbuf.array(), 0, count); 
     } while(count > 0); 
     } catch (IOException e) { 
     e.printStackTrace(); 
     key.cancel(); 
     } 
    } 
    } 

    public class PipeOutputStream extends OutputStream { 
    private final Pipe pipe; 

    public PipeOutputStream(final Pipe pipe) { 
     this.pipe = pipe; 
    } 

    @Override 
    public void write(final int b) throws IOException { 
     write(new byte[] { (byte) b }); 
    } 

    @Override 
    public void write(final byte[] b) throws IOException { 
     write(b, 0, b.length); 
    } 

    @Override 
    public void write(final byte[] b, final int off, final int len) throws IOException { 
     ByteBuffer bbuf = ByteBuffer.wrap(b, off, len); 
     bbuf.position(len); 
     bbuf.flip(); 
     int count = 0; 
     while (count < len) { 
     int n = pipe.sink().write(bbuf); 
     if (n == 0) { 
      // let's wait a bit and not consume cpu 
      try { 
      Thread.sleep(1L); 
      } catch (InterruptedException e) { 
      throw new IOException(e); 
      } 
     } 
     else count += n; 
     } 
    } 
    } 
} 

EXCEPTION:

java.io.IOException: Broken pipe 
    at sun.nio.ch.FileDispatcher.write0(Native Method) 
    at sun.nio.ch.FileDispatcher.write(FileDispatcher.java:39) 
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:72) 
    at sun.nio.ch.IOUtil.write(IOUtil.java:43) 
    at sun.nio.ch.SinkChannelImpl.write(SinkChannelImpl.java:149) 
    at com.niostuff.util.GCLogInterceptor.fileModified(GCLogInterceptor.java:180) 
    at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$WatchData.notifyFileModified(Unknown Source) 
    at net.contentobjects.jnotify.linux.JNotifyAdapterLinux.notifyChangeEvent(Unknown Source) 
    at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$1.notify(Unknown Source) 
    at net.contentobjects.jnotify.linux.JNotify_linux.callbackProcessEvent(Unknown Source) 
    at net.contentobjects.jnotify.linux.JNotify_linux.nativeNotifyLoop(Native Method) 
    at net.contentobjects.jnotify.linux.JNotify_linux.access$000(Unknown Source) 
    at net.contentobjects.jnotify.linux.JNotify_linux$1.run(Unknown Source) 
+1

Comment pouvez-vous obtenir de l'aide sans afficher votre code? –

+0

@JimGarrison Désolé mais le code est énorme avec de nombreuses ramifications. C'est le problème, mais l'opération de base que j'ai décrite est là. Un tuyau cassé est une erreur terrible, car il ne vous dit rien sur le problème. Je suppose que c'est une bonne raison de rester loin des tuyaux et d'utiliser des files d'attente non bloquantes. – chrisapotek

+0

@chrisapotek - Si le code est trop grand, réduisez-le en quelque chose de plus petit qui pose toujours le problème. –

Répondre

0

Ok, donc je trouve le problème. D'abord merci pour tout le monde qui essaie d'aider. J'espère que vous apprendrez de mon erreur. La chaîne d'événements était:

1 - Je ne drainais pas le tampon de réception (celui dans lequel le canal source est lu) et il a finalement été plein.

2 - Maintenant qu'il est plein, pipeSourceChannel.read (readBuffer) renvoie 0 octet. Il y a des données à lire mais il ne peut pas lire sur un tampon complet.

3 - Cela a provoqué la fermeture du canal (je le faisais moi-même sur bytesRead == 0) et le BrokenPipe.

Une leçon que j'ai apprise ici: Les tuyaux sont difficiles. Je pense que les files d'attente simultanées non bloquantes sont beaucoup plus simples à utiliser comme ce type ici une fois mentionné: Java NIO Pipe vs BlockingQueue

+1

On dirait que j'étais en train de déboguer les mauvaises parties tout le temps = D Bon d'entendre que vous l'avez résolu, je vais supprimer ma réponse afin que les futurs visiteurs de cette question ne soient pas confondus – esaj

+0

Ce n'est pas un problème avec les pipes NIO en particulier, c'est juste une mauvaise utilisation de NIO en général.Vous devez toujours compacter ou effacer le tampon de lecture – EJP

+0

@EJP Je suis d'accord, mais je pense que les files d'attente non bloquantes sont plus simples les mêmes résultats Pas sûr de comparaison de performance entre eux, mais vous pouvez vérifier ce que j'ai fait ici: http://stackoverflow.com/a/9863066/962872 – chrisapotek