2017-07-17 5 views
0

J'ai construit un client websocket basé sur netty websocket client example pour intégrer avec un flux tiers, le code est presque le même que l'exemple officiel, seulement changé le URI et la manipulation TextWebSocketFrame.Netty WebSocket Client Channel devient toujours inactif sur Linux Server

Ce client fait partie d'un Spring Boot webapp, et cela fonctionne très bien sur mon MacBook ou PC Windows 7, cependant, une fois que je déploie la guerre Linux serveur (version Linux Red 2.6.32-504.3.3.el6.x86_64 Hat 4.4.7-11), le canal deviendrait inactif directement au démarrage de l'application.

J'ai vérifié le réseau, la version de JDK (1.8.x), la version de Tomcat (8.5.16), la version de Netty (4.1.13.Final), semble aucun problème, vraiment confus.

WebSocketClientHandler.java:

public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { 

    private final Logger logger = LoggerFactory.getLogger("feedlogger"); 

    private final WebSocketClientHandshaker handshaker; 
    private ChannelPromise handshakeFuture; 

    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) { 
     this.handshaker = handshaker; 
    } 

    public ChannelFuture handshakeFuture() { 
     return handshakeFuture; 
    } 

    @Override 
    public void handlerAdded(ChannelHandlerContext ctx) { 
     handshakeFuture = ctx.newPromise(); 
    } 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     handshaker.handshake(ctx.channel()); 
    } 

    @Override 
    public void channelInactive(ChannelHandlerContext ctx) { 
     // always reaches here immediately on Linux 
     logger.error("webSocket Client disconnected!"); 
    } 

    @Override 
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
     Channel ch = ctx.channel(); 
     if (!handshaker.isHandshakeComplete()) { 
      handshaker.finishHandshake(ch, (FullHttpResponse) msg); 
      logger.info("finishHandshake and webSocket Client Connected!"); 
      handshakeFuture.setSuccess(); 
      return; 
     } 

     if (msg instanceof FullHttpResponse) { 
      FullHttpResponse response = (FullHttpResponse) msg; 
      logger.error("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" 
       + response.content().toString(CharsetUtil.UTF_8) + ')'); 
      return; 
     } 

     WebSocketFrame frame = (WebSocketFrame) msg; 
     if (frame instanceof TextWebSocketFrame) { 
      TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; 
      logger.info(textFrame.text()); 
      List<String> messages = FeedJsonUtils.deserializeAsyncMessage(textFrame.text()); 
      if (!CollectionUtils.isEmpty(messages)) { 
       messages.stream().forEach(message -> { 
         FeedReceiver.onMessage(message); 
       }); 
      } 
     } else if (frame instanceof PongWebSocketFrame) { 
      logger.error("WebSocket Client received pong"); 
     } else if (frame instanceof CloseWebSocketFrame) { 
      logger.error("WebSocket Client received closing"); 
      ch.close(); 
     } 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
    { 
     logger.error("exception caught: ", cause); 
     cause.printStackTrace(); 
     if (!handshakeFuture.isDone()) { 
      handshakeFuture.setFailure(cause); 
     } 
     ctx.close(); 
    } 
} 

WebSocketClientImpl.java:

import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.net.URI; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.scheduling.annotation.Async; 
import org.springframework.stereotype.Component; 
import org.springframework.util.StringUtils; 

import io.netty.bootstrap.Bootstrap; 
import io.netty.buffer.Unpooled; 
import io.netty.channel.Channel; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelPipeline; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioSocketChannel; 
import io.netty.handler.codec.http.DefaultHttpHeaders; 
import io.netty.handler.codec.http.HttpClientCodec; 
import io.netty.handler.codec.http.HttpObjectAggregator; 
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; 
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; 
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; 
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; 
import io.netty.handler.codec.http.websocketx.WebSocketFrame; 
import io.netty.handler.codec.http.websocketx.WebSocketVersion; 
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; 
import io.netty.handler.ssl.SslContext; 
import io.netty.handler.ssl.SslContextBuilder; 
import io.netty.handler.ssl.util.InsecureTrustManagerFactory; 


@Component 
public class WebsocketClientImpl { 
    private final Logger logger = LoggerFactory.getLogger("feedlogger"); 

    private static final String WSS_URL = "wss://api.xxx.com/ws/?token="; 

    private Channel ch = null; 

    @Async 
    public void connectWebSocket(String token) throws Exception { 

     URI uri = new URI(WSS_URL + token); 
     String scheme = uri.getScheme() == null ? "wss" : uri.getScheme(); 
     final String host = uri.getHost() == null ? "api.mollybet.com" : uri.getHost(); 

     final int port; 
     if (uri.getPort() == -1) { 
      if ("ws".equalsIgnoreCase(scheme)) { 
       port = 80; 
      } else if ("wss".equalsIgnoreCase(scheme)) { 
       port = 443; 
      } else { 
       port = -1; 
      } 
     } else { 
      port = uri.getPort(); 
     } 

     if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) { 
      logger.error("Only WS(S) is supported."); 
      return; 
     } 

     // SSL handling 
     final boolean ssl = "wss".equalsIgnoreCase(scheme); 
     final SslContext sslCtx; 
     if (ssl) { 
      sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); 
     } else { 
      sslCtx = null; 
     } 

     EventLoopGroup group = new NioEventLoopGroup(); 
     try { 
     // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 
     // or V00. 
     // If you change it to V00, ping is not supported and remember to 
     // change 
     // HttpResponseDecoder to WebSocketHttpResponseDecoder in the 
     // pipeline. 
     final WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory 
       .newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders())); 

     Bootstrap b = new Bootstrap(); 
     b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { 
      @Override 
      protected void initChannel(SocketChannel ch) { 
       ChannelPipeline p = ch.pipeline(); 
       if (sslCtx != null) { 
        p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); 
       } 
       p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), 
         WebSocketClientCompressionHandler.INSTANCE, handler); 
       } 
      }); 

      ch = b.connect(uri.getHost(), port).sync().channel(); 
      handler.handshakeFuture().sync(); 

      BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); 
      while (true) { 
       String msg = console.readLine(); 
       if (msg == null) { 
        break; 
       } else if ("bye".equals(msg.toLowerCase())) { 
        ch.writeAndFlush(new CloseWebSocketFrame()); 
        ch.closeFuture().sync(); 
        break; 
       } else if ("ping".equals(msg.toLowerCase())) { 
        WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 })); 
       ch.writeAndFlush(frame); 
       } else { 
        WebSocketFrame frame = new TextWebSocketFrame(msg); 
        ch.writeAndFlush(frame); 
       } 
      } 
     } finally { 
      group.shutdownGracefully(); 
     } 
    } 

    public void send(String message) { 
     if (ch == null) { 
      logger.error("channel unavailable"); 
      return; 
     } 
     WebSocketFrame frame = new TextWebSocketFrame(message); 
     ch.writeAndFlush(frame); 
    } 


} 

FeedServiceImpl.java: (websocket connecter automatiquement lorsque l'application commence)

@Service 
@PropertySource("classpath:api.properties") 
public class FeedServiceImpl implements FeedService { 
    private final static Logger logger = LoggerFactory.getLogger("feedlogger"); 

    @Autowired 
    private Environment env; 

    @Autowired 
    private WebsocketClientImpl websocketClientImpl; 

    @Override 
    @PostConstruct 
    public void connectWs() { 
     String token = env.getProperty("ws.token", String.class); 
     websocketClientImpl.connectWebSocket(token); 
    } 

} 

}

Répondre

0

Enfin, je résolu ce problème, ma WebSocketClient classe est presque identique à la WebSocketClient dans le netty websocket client example, après avoir remplacé le code suivant:

BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); 
while (true) { 
    String msg = console.readLine(); 
    if (msg == null) { 
     break; 
    } else if ("bye".equals(msg.toLowerCase())) { 
     ch.writeAndFlush(new CloseWebSocketFrame()); 
     ch.closeFuture().sync(); 
     break; 
    } else if ("ping".equals(msg.toLowerCase())) { 
     WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 })); 
     ch.writeAndFlush(frame); 
    } else { 
     WebSocketFrame frame = new TextWebSocketFrame(msg); 
     ch.writeAndFlush(frame); 
    } 
} 

avec:

Thread.sleep(Integer.MAX_VALUE); 

la connexion websocket retour à la normale maintenant.

+0

Aucun code n'apparaît dans votre question. – EJP

+0

@EJP, désolé, seulement implicitement mentionné le code avant (exemple de client NetSocket websocket), j'ai mis à jour la question et ajouté le code connexe. – qianlei