2010-02-24 3 views
2

ObjectInputStream bloque à sa création jusqu'à ce qu'il ait reçu et vérifié l'en-tête du flux de sérialisation envoyé par l'autre extrémité. J'essayais d'écrire mon premier programme utilisant des sockets lorsque j'ai constaté ce comportement. J'ai utilisé un objet factice pour qu'il ne bloque pas. Le code est ci-dessous. ObjectInputStream : est-il correct de le débloquer de cette façon ?

import java.io.*;      
import java.net.*;      
import java.util.*;      

/**
 * Stateless serializable placeholder object.
 *
 * <p>It declares no fields; instances only exist so that something can be
 * written to an object stream.
 */
class Dummy implements Serializable {
}

/**
 * Serializable holder for a single {@code int}.
 */
class X_Int implements Serializable {
    /** The held value; package-visible and mutable, initially 0. */
    int x;
}

/**
 * Minimal object-stream server.
 *
 * <p>Accepts a single connection on port 5879, exchanges one {@link Dummy}
 * object with the peer and then sends the current {@link Date}.
 */
class Server {
    /**
     * Runs the server for exactly one client and then exits.
     *
     * @param args unused
     * @throws Exception if any socket or serialization operation fails
     */
    public static void main(String args[]) throws Exception {
        ServerSocket ss = new ServerSocket(5879);
        try {
            Socket client = ss.accept();
            try {
                ObjectOutputStream out = new ObjectOutputStream(client.getOutputStream());
                // The peer's ObjectInputStream constructor blocks until it has read
                // our stream header, so push the header out before we construct our
                // own ObjectInputStream (which blocks on the peer's header in turn).
                out.flush();

                // The Dummy exchange is kept only so the bytes on the wire stay the
                // same as before; with the flush above it is presumably no longer
                // required to avoid blocking.
                out.writeObject(new Dummy());
                out.flush();

                ObjectInputStream in = new ObjectInputStream(client.getInputStream());
                in.readObject(); // discard the peer's Dummy

                out.writeObject(new Date());
                out.flush();
                out.close();
            } finally {
                // Release the connection even when the exchange fails half-way.
                client.close();
            }
        } finally {
            // The listening socket was previously never closed.
            ss.close();
        }
    }
}

/**
 * Minimal object-stream client.
 *
 * <p>Connects to {@code localhost:5879}, exchanges one {@link Dummy} object
 * with the server, then reads a {@link Date} and prints it.
 */
class Client {
    /**
     * Connects, performs the exchange and prints the received date.
     *
     * @param args unused
     * @throws Exception if any socket or serialization operation fails
     */
    public static void main(String args[]) throws Exception {
        Socket server = new Socket("localhost", 5879);
        try {
            ObjectOutputStream out = new ObjectOutputStream(server.getOutputStream());
            // Send our stream header immediately: the server's ObjectInputStream
            // constructor blocks until it has read it.
            out.flush();

            // The Dummy exchange is kept only so the bytes on the wire stay the
            // same as before; with the flush above it is presumably no longer
            // required to avoid blocking.
            out.writeObject(new Dummy());
            out.flush();

            ObjectInputStream in = new ObjectInputStream(server.getInputStream());
            in.readObject(); // discard the server's Dummy

            Date d = (Date) in.readObject();
            System.out.println(d);
        } finally {
            // The socket was previously never closed.
            server.close();
        }
    }
}

Est-ce la bonne façon de procéder ? Vos commentaires sont les bienvenus.

Répondre

2

Une meilleure façon est de supprimer la cause du blocage à la source. Utilisez plutôt ces classes, des deux côtés de la connexion, si vous le pouvez :

/**
 * An {@link ObjectInputStream} that does not read a stream header.
 *
 * <p>{@link #readStreamHeader()} is overridden with an empty body, so
 * constructing an instance consumes no header bytes from the underlying
 * stream. The data fed to it must therefore have been written without a
 * stream header.
 */
public class ObjInputStream extends ObjectInputStream {

    /**
     * Wraps the given stream.
     *
     * @param in the stream to read serialized objects from
     * @throws IOException if the superclass constructor fails
     */
    public ObjInputStream(InputStream in) throws IOException {
        super(in);
    }

    /**
     * Intentionally empty: no header is read or validated.
     *
     * @see java.io.ObjectInputStream#readStreamHeader()
     */
    @Override
    protected void readStreamHeader() throws IOException, StreamCorruptedException {
        // nothing to read
    }
}

et

/**
 * An {@link ObjectOutputStream} that does not write a stream header.
 *
 * <p>{@link #writeStreamHeader()} is overridden with an empty body, so
 * constructing an instance emits no header bytes to the underlying stream.
 * The reading side must likewise not expect a stream header.
 */
public class ObjOutputStream extends ObjectOutputStream {

    /**
     * Wraps the given stream.
     *
     * @param out the stream to write serialized objects to
     * @throws IOException if the superclass constructor fails
     */
    public ObjOutputStream(OutputStream out) throws IOException {
        super(out);
    }

    /**
     * Intentionally empty: no header is written.
     *
     * @see java.io.ObjectOutputStream#writeStreamHeader()
     */
    @Override
    protected void writeStreamHeader() throws IOException {
        // nothing to write
    }
}

Cela supprime les méthodes appelées pour vérifier les informations de version du flux, entre autres. En outre, comme vous utilisez TCP, la fragmentation des paquets fait que vos objets n'arriveront pas forcément « entiers » à l'autre extrémité : TCP est un socket de flux. Ce dont vous avez besoin, c'est d'une classe d'entrée/sortie supplémentaire qui délimite les trames. Heureusement, je l'ai déjà codée :)

/** 
* 
*/ 
package objtest; 

import java.io.IOException; 
import java.io.InputStream; 
import java.nio.BufferUnderflowException; 
import java.nio.ByteBuffer; 
import java.util.ArrayDeque; 
import java.util.Queue; 

import kokuks.KokuKS; 

/**
 * UnrealConceptTest - FramedInputStream.
 *
 * <p>Reassembles length-prefixed frames from bytes pushed in through
 * {@link #putByte(byte)} / {@link #putBuffer(ByteBuffer)} and exposes the
 * content of the current frame through the {@link InputStream} API. Completed
 * frames are queued; {@link #nextFrame()} makes the oldest queued frame the one
 * served by the {@code read} methods.
 *
 * <p>Wire layout as parsed by {@link #putByte(byte)}: the 8 bytes of
 * {@link #HEADER_BYTES}, where the byte that completes the header is also
 * consumed as the most-significant byte of a 4-byte big-endian length, then the
 * 3 remaining length bytes, then {@code length} frame bytes.
 *
 * @version 1.0
 */
public class FramedInputStream extends InputStream {
    /** Initial capacity, in bytes, of the frame assembly buffer. */
    public static final int INITIAL_BUFFER_SIZE = 8 << 1;

    public static final int FRAME_HEADER_1 = 0xBEEFFACE;
    public static final int FRAME_HEADER_2 = 0xFACEBEEF;

    /** Byte sequence that marks the start of a frame; filled by the static initializer. */
    public static final byte[] HEADER_BYTES = new byte[4 * 2];
    /** Not used inside this class. */
    protected static final byte[] CURR_HEADER_BUFF = new byte[HEADER_BYTES.length];

    static {
        ByteBuffer b = ByteBuffer.allocateDirect(8);

        b.putInt(FRAME_HEADER_1);
        b.putInt(FRAME_HEADER_2);

        ByteBuffer b2 = (ByteBuffer) b.flip();

        b2.get(HEADER_BYTES, 0, 4);
        // NOTE(review): destination offset 3 (not 4) makes the second word
        // overwrite HEADER_BYTES[3] and leaves HEADER_BYTES[7] at 0, giving
        // BE EF FA FA CE BE EF 00. FramedOutputStream builds its header with the
        // same offsets, and putByte() relies on the last header byte doubling as
        // the high length byte, so this must not be changed on one side only.
        b2.get(HEADER_BYTES, 3, 4);
    }

    /** Length of the frame being assembled, accumulated from the length bytes. */
    protected int size = 0;
    /** Number of consecutive header bytes matched so far. */
    protected int chain = 0;
    /** True once a complete header has been seen and the frame is not yet complete. */
    protected boolean inFrame = false;
    /** True while the 4 length bytes are being accumulated. */
    protected boolean readingSize = false;
    /** Index (0-3) of the next length byte. */
    protected int sizePos = 0;

    /** Not used inside this class. */
    protected int dbgput = 0;

    /** Assembly buffer for the frame currently being received. */
    protected ByteBuffer bb = ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE);
    /** Completed frames waiting for {@link #nextFrame()}. */
    protected Queue<ByteBuffer> bbq = new ArrayDeque<ByteBuffer>();
    /** Frame currently served by the {@code read} methods, or null before the first {@link #nextFrame()}. */
    protected ByteBuffer currBuff = null;

    /** When true, header matching keeps running inside a frame (see {@link #putByte(byte)}). */
    protected final boolean recoverFromError;

    /**
     * Creates a stream with the given recovery behaviour.
     *
     * @param recoverFromError if true, a header sequence seen while a frame is
     *        being assembled discards that frame and starts a new one
     */
    public FramedInputStream(boolean recoverFromError) {
        this.recoverFromError = recoverFromError;
    }

    /** Creates a stream with {@code recoverFromError} set to true. */
    public FramedInputStream() {
        this(true);
    }

    /**
     * Makes sure the assembly buffer can hold at least {@code min} bytes.
     *
     * <p>Capacity grows by doubling. When the buffer is replaced, the bytes
     * written so far and the write position are carried over; the limit of the
     * new buffer is its capacity. (Previously {@code min} was used as a shift
     * exponent, {@code 1 << min}, so the resulting capacity bore no relation to
     * the requested size.)
     *
     * @param min required capacity in bytes
     * @return true if a larger buffer was allocated, false if the current one suffices
     */
    protected boolean ensureFramebufferCapacity(int min) {
        if (min <= bb.capacity()) {
            return false;
        }

        int num = bb.capacity();
        while (num < min) {
            if (num > (Integer.MAX_VALUE >> 1)) {
                // doubling would overflow; settle for exactly what was asked
                num = min;
                break;
            }
            num <<= 1;
        }

        ByteBuffer grown = ByteBuffer.allocateDirect(num);
        // copy [0, position) so that grown.position() equals the old position
        bb.flip();
        grown.put(bb);
        bb = grown;

        if (KokuKS.DEBUG_MODE) System.out.println("modified buffer size to: " + num);

        return true;
    }

    /**
     * @return the recoverFromError
     */
    public boolean isRecoverFromError() {
        return recoverFromError;
    }

    /**
     * Reads the next byte of the current frame.
     *
     * @return the byte as a value in 0-255, or -1 if there is no current frame
     *         or it is exhausted
     * @see java.io.InputStream#read()
     */
    @Override
    public int read() throws IOException {
        if (currBuff == null || !currBuff.hasRemaining()) {
            return -1;
        }

        // Mask to an unsigned value: returning the raw (signed) byte made 0xFF
        // indistinguishable from the -1 end-of-stream marker.
        return currBuff.get() & 0xFF;
    }

    /**
     * Feeds every remaining byte of {@code source} through {@link #putByte(byte)}.
     *
     * <p>The assembly buffer is sized from each frame's length prefix inside
     * {@code putByte}, so no capacity adjustment is done here; growing the
     * buffer at this point could replace it in the middle of a frame.
     *
     * @param source buffer whose remaining bytes are consumed
     */
    public void putBuffer(ByteBuffer source) {
        while (source.hasRemaining()) {
            putByte(source.get());
        }
    }

    /**
     * @return true if at least one completed frame is waiting for {@link #nextFrame()}
     */
    public boolean checkCompleteFrame() {
        return !bbq.isEmpty();
    }

    /**
     * @return the number of unread bytes in the current frame, 0 if there is none
     * @see java.io.InputStream#available()
     */
    @Override
    public int available() throws IOException {
        return currBuff != null ? currBuff.remaining() : 0;
    }

    /**
     * Fills {@code data} completely from the current frame.
     *
     * @param data destination array
     * @return {@code data.length}, or -1 if there is no current frame or it is exhausted
     * @throws BufferUnderflowException if fewer than {@code data.length} bytes
     *         remain in the current frame (no partial read is performed)
     */
    @Override
    public int read(byte[] data) {
        if (currBuff == null || !currBuff.hasRemaining()) {
            return -1;
        }

        if (data.length > currBuff.remaining()) {
            throw new BufferUnderflowException();
        }

        currBuff.get(data);

        return data.length;
    }

    /**
     * Makes the oldest queued frame the current one.
     *
     * @return true if a frame was available, false if the queue was empty (the
     *         current frame is then left unchanged)
     */
    public boolean nextFrame() {
        ByteBuffer bbf = bbq.poll();

        if (bbf == null) {
            return false;
        }

        currBuff = ByteBuffer.allocateDirect(bbf.limit());
        currBuff.put(bbf).flip();
        bbf.rewind();

        currBuff.rewind();
        // Skip the first frame byte. NOTE(review): with FramedOutputStream this
        // appears to be the last padding byte, which its flip() counts in the
        // frame length - confirm against the writer.
        currBuff.position(1);

        return true;
    }

    /**
     * Advances the frame parser by one byte.
     *
     * <p>Header matching runs when no frame is open, and also inside a frame
     * when {@code recoverFromError} is true. Once all {@link #HEADER_BYTES}
     * matched, parser state is reset and the same byte is then processed as the
     * first length byte. After 4 length bytes the following {@code size} bytes
     * are collected; when the last one arrives a copy of the frame is queued.
     *
     * @param b the next byte received
     */
    public void putByte(byte b) {
        if (recoverFromError || !inFrame) {
            if (b == HEADER_BYTES[chain++]) {

                if (chain >= (HEADER_BYTES.length)) {
                    if (KokuKS.DEBUG_MODE) System.out.println("got header!" + (inFrame ? " (recovered)" : ""));

                    // we have a header! hurrah.
                    inFrame = true;
                    sizePos = 0;
                    size = 0;
                    readingSize = true;
                    chain = 0;

                    bb.clear();
                }
            } else {
                chain = 0;
            }
        }

        if (!inFrame) {
            return;
        }

        if (readingSize) {
            // big-endian: first byte is the most significant
            size += (b & 0xFF) << ((8 * 3) - (8 * sizePos));
            sizePos++;

            if (sizePos >= 4) {
                // we've read the size :)
                readingSize = false;
                sizePos = 0;

                if (size <= 0) {
                    // An empty frame can never fill the buffer; the next byte
                    // used to fail with BufferOverflowException. Drop the frame
                    // and go back to looking for a header.
                    inFrame = false;
                    size = 0;
                    chain = 0;
                    bb.clear();
                    return;
                }

                ensureFramebufferCapacity(size);
                bb.clear();
                bb.limit(size); // frame is complete when the buffer is full up to here
            }
        } else {
            bb.put(b);

            if (!bb.hasRemaining()) {
                bb.flip();

                // queue a private copy so bb can be reused for the next frame
                ByteBuffer newbuf = ByteBuffer.allocateDirect(bb.limit());
                newbuf.put(bb).flip(); // we have to flip this
                bbq.offer(newbuf);

                inFrame = false;
                readingSize = false;
                size = 0;
                sizePos = 0;
                chain = 0;

                bb.clear();

                if (KokuKS.DEBUG_MODE) System.out.println("FIS: complete object");
            }
        }
    }
}

et

/** 
* 
*/ 
package objtest; 

import java.io.IOException; 
import java.nio.ByteBuffer; 

import koku.util.io.ByteBufferOutputStream; 

/** 
* UnrealConceptTest - FramedOutputStream.
*
* Output stream that reserves room for a frame header and a length field in
* front of everything written to it, and fills the length in when the buffer
* is flipped. The constructor and {@link #reset()} write
* {@link #HEADER_BYTES} (8 bytes) followed by {@link #SIZE_PAD} (4 bytes);
* {@link #flip()} then stores the frame length over buffer indices 7-10.
*
* NOTE(review): the superclass (koku.util.io.ByteBufferOutputStream) is not
* part of this file; the comments below assume its buffer starts empty, that
* write() appends to it, and that {@code _buffer} is the java.nio.ByteBuffer
* being filled - confirm.
*
* @version 1.0 
* @author Chris Dennett 
*/ 
public class FramedOutputStream extends ByteBufferOutputStream { 
    public static final int FRAME_HEADER_1 = 0xBEEFFACE; 
    public static final int FRAME_HEADER_2 = 0xFACEBEEF; 

    /** Byte sequence written at the start of every frame; filled by the static initializer. */
    public static final byte[] HEADER_BYTES = new byte[4 * 2]; 
    /** Not used inside this class. */
    public static final byte[] CURR_HEADER_BUFF = new byte[HEADER_BYTES.length]; 

    /* We pad the beginning of our buffer so that we can write the frame 
    * length when the time comes. */ 
    protected static final byte[] SIZE_PAD = new byte[4]; 

    static { 
     ByteBuffer b = ByteBuffer.allocate(8); 

     b.putInt(FRAME_HEADER_1); 
     b.putInt(FRAME_HEADER_2); 

     ByteBuffer b2 = (ByteBuffer) b.flip(); 

     b2.get(HEADER_BYTES, 0, 4); 
     // NOTE(review): destination offset 3 (not 4) makes this second copy
     // overwrite HEADER_BYTES[3] and leaves HEADER_BYTES[7] at 0, giving
     // BE EF FA FA CE BE EF 00. FramedInputStream builds its header the same
     // way, so both sides agree; change them together or not at all.
     b2.get(HEADER_BYTES, 3, 4); 
    } 

    /** 
    * Creates the stream and immediately writes the header bytes and the
    * 4-byte length placeholder. An IOException from either write is reported
    * on standard output and otherwise ignored.
    */ 
    public FramedOutputStream() { 
     try { 
      write(HEADER_BYTES); 
      write(SIZE_PAD); 
     } catch (IOException e) { 
      System.out.println("Couldn't write header padding!"); 
     } 
    } 

    /**
    * Flips the buffer, writes the frame length into it and returns it
    * rewound to position 0.
    *
    * @return the underlying buffer, positioned at 0
    * @see koku.util.io.ByteBufferOutputStream#flip() 
    */ 
    @Override 
    public ByteBuffer flip() { 
     // flip the buffer which will limit it to it's current position 
     super.flip(); 

     // then write the frame length and rewind back to the start of the 
     // buffer so that all the data is available   

     // The length is everything from index 11 up to the limit. Index 11 is
     // the last SIZE_PAD byte, so it is counted in the length.
     _buffer.position(11); 
     int size = _buffer.remaining(); 

     // The 4 length bytes go to indices 7-10. Index 7 is also the last of the
     // 8 header bytes written by the constructor / reset().
     // NOTE(review): FramedInputStream treats the byte that completes its
     // header match as the first length byte, which lines up with this
     // offset - confirm before changing either number.
     _buffer.position(7); 

     putSizeAsBytes(size, _buffer); 

     // putSizeAsBytes() has already advanced the position to 11; the calls
     // below leave the buffer at position 0 with the limit unchanged.
     _buffer.position(11); 

     _buffer.rewind(); 

     _buffer.rewind(); 

     return _buffer; 
    } 

    /**
    * Resets the superclass buffer and writes the header bytes and the length
    * placeholder again. An IOException from either write is reported on
    * standard output and otherwise ignored.
    *
    * @see koku.util.io.ByteBufferOutputStream#reset() 
    */ 
    @Override 
    public void reset() { 
     super.reset(); 

     try { 
      write(HEADER_BYTES); 
      write(SIZE_PAD); 
     } catch (IOException e) { 
      System.out.println("Couldn't write header padding!"); 
     } 
    } 

    /**
    * Writes {@code size} as 4 bytes, most-significant byte first, at the
    * buffer's current position using relative puts (the position advances by 4).
    *
    * @param size the value to encode
    * @param bb the buffer to write into
    */
    public static void putSizeAsBytes(int size, ByteBuffer bb) { 
     // encode: shifts of 24, 16, 8, 0 bits
     for (int i = 0; i < 4; i++) { 
      bb.put((byte)((size >>> ((8 * 3) - (8 * i))) & 0xFF)); 
     } 
    } 
} 

BBOS:

// 
// $Id: ByteBufferOutputStream.java 5829 2009-06-20 21:09:34Z mdb $ 
// 
// Narya library - tools for developing networked games 
// Copyright (C) 2002-2009 Three Rings Design, Inc., All Rights Reserved 
// http://www.threerings.net/code/narya/ 
// 
// This library is free software; you can redistribute it and/or modify it 
// under the terms of the GNU Lesser General Public License as published 
// by the Free Software Foundation; either version 2.1 of the License, or 
// (at your option) any later version. 
// 
// This library is distributed in the hope that it will be useful, 
// but WITHOUT ANY WARRANTY; without even the implied warranty of 
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
// Lesser General Public License for more details. 
// 
// You should have received a copy of the GNU Lesser General Public 
// License along with this library; if not, write to the Free Software 
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

package misc; 

import java.io.OutputStream; 
import java.nio.BufferOverflowException; 
import java.nio.ByteBuffer; 

import org.apache.mina.core.buffer.IoBuffer; 

/**
 * An {@link OutputStream} that collects everything written to it in an {@link IoBuffer},
 * enlarging that buffer whenever a write does not fit.
 */
public class ByteBufferOutputStream extends OutputStream
{
    /** Initial capacity, in bytes, of the internal buffer. */
    protected static final int INITIAL_BUFFER_SIZE = 32;

    /** Holds the bytes written so far. */
    protected IoBuffer _buffer;

    /**
     * Creates a stream backed by a buffer of {@link #INITIAL_BUFFER_SIZE} bytes.
     */
    public ByteBufferOutputStream()
    {
        _buffer = IoBuffer.allocate(INITIAL_BUFFER_SIZE);
    }

    /**
     * Returns the internal buffer itself (not a copy).
     */
    public IoBuffer getBuffer()
    {
        return _buffer;
    }

    /**
     * Flips the internal buffer and returns it. Call {@link #reset} before writing again.
     */
    public IoBuffer flip()
    {
        IoBuffer flipped = _buffer.flip();
        return flipped;
    }

    /**
     * Clears the internal buffer.
     */
    public void reset()
    {
        _buffer.clear();
    }

    /**
     * Appends the low-order byte of {@code b}; if the buffer reports an overflow, it is
     * expanded by one byte of room and the put is retried.
     */
    @Override
    public void write (int b)
    {
        final byte value = (byte)b;
        try {
            _buffer.put(value);
        } catch (BufferOverflowException boe) {
            expand(1);
            _buffer.put(value);
        }
    }

    /**
     * Appends {@code len} bytes of {@code b} starting at {@code off}; if the buffer reports an
     * overflow, it is expanded by {@code len} bytes of room and the put is retried.
     *
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a range
     *         inside {@code b}
     */
    @Override
    public void write (byte[] b, int off, int len)
    {
        // reject ranges that fall outside the array (or whose end overflows int)
        final boolean outOfRange = (off < 0) || (off > b.length) || (len < 0)
            || ((off + len) > b.length) || ((off + len) < 0);
        if (outOfRange) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return;
        }

        try {
            _buffer.put(b, off, len);
        } catch (BufferOverflowException boe) {
            expand(len);
            _buffer.put(b, off, len);
        }
    }

    /**
     * Asks the internal buffer to expand by passing {@code needed} to its {@code expand} method.
     */
    protected final void expand (int needed)
    {
        _buffer.expand(needed);
    }
}
0

Il vous suffit d'appeler flush() sur le flux de sortie avant de créer l'ObjectInputStream. Vous n'avez pas besoin d'envoyer d'objets factices.

Questions connexes