J'ai construit un client websocket basé sur netty websocket client example pour intégrer avec un flux tiers, le code est presque le même que l'exemple officiel, seulement changé le URI
et la manipulation TextWebSocketFrame
.Netty WebSocket Client Channel devient toujours inactif sur Linux Server
Ce client fait partie d'un Spring Boot
webapp, et cela fonctionne très bien sur mon MacBook ou PC Windows 7, cependant, une fois que je déploie la guerre Linux
serveur (version Linux Red 2.6.32-504.3.3.el6.x86_64 Hat 4.4.7-11), le canal deviendrait inactif directement au démarrage de l'application.
J'ai vérifié le réseau, la version de JDK (1.8.x), la version de Tomcat (8.5.16), la version de Netty (4.1.13.Final), semble aucun problème, vraiment confus.
WebSocketClientHandler.java:
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private final Logger logger = LoggerFactory.getLogger("feedlogger");
private final WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture;
public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
handshaker.handshake(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// always reaches here immediately on Linux
logger.error("webSocket Client disconnected!");
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
logger.info("finishHandshake and webSocket Client Connected!");
handshakeFuture.setSuccess();
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
logger.error("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
+ response.content().toString(CharsetUtil.UTF_8) + ')');
return;
}
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
logger.info(textFrame.text());
List<String> messages = FeedJsonUtils.deserializeAsyncMessage(textFrame.text());
if (!CollectionUtils.isEmpty(messages)) {
messages.stream().forEach(message -> {
FeedReceiver.onMessage(message);
});
}
} else if (frame instanceof PongWebSocketFrame) {
logger.error("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
logger.error("WebSocket Client received closing");
ch.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
logger.error("exception caught: ", cause);
cause.printStackTrace();
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
}
}
WebSocketClientImpl.java:
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
@Component
public class WebsocketClientImpl {
private final Logger logger = LoggerFactory.getLogger("feedlogger");
private static final String WSS_URL = "wss://api.xxx.com/ws/?token=";
private Channel ch = null;
@Async
public void connectWebSocket(String token) throws Exception {
URI uri = new URI(WSS_URL + token);
String scheme = uri.getScheme() == null ? "wss" : uri.getScheme();
final String host = uri.getHost() == null ? "api.mollybet.com" : uri.getHost();
final int port;
if (uri.getPort() == -1) {
if ("ws".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("wss".equalsIgnoreCase(scheme)) {
port = 443;
} else {
port = -1;
}
} else {
port = uri.getPort();
}
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
logger.error("Only WS(S) is supported.");
return;
}
// SSL handling
final boolean ssl = "wss".equalsIgnoreCase(scheme);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
EventLoopGroup group = new NioEventLoopGroup();
try {
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08
// or V00.
// If you change it to V00, ping is not supported and remember to
// change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the
// pipeline.
final WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE, handler);
}
});
ch = b.connect(uri.getHost(), port).sync().channel();
handler.handshakeFuture().sync();
BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String msg = console.readLine();
if (msg == null) {
break;
} else if ("bye".equals(msg.toLowerCase())) {
ch.writeAndFlush(new CloseWebSocketFrame());
ch.closeFuture().sync();
break;
} else if ("ping".equals(msg.toLowerCase())) {
WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 }));
ch.writeAndFlush(frame);
} else {
WebSocketFrame frame = new TextWebSocketFrame(msg);
ch.writeAndFlush(frame);
}
}
} finally {
group.shutdownGracefully();
}
}
public void send(String message) {
if (ch == null) {
logger.error("channel unavailable");
return;
}
WebSocketFrame frame = new TextWebSocketFrame(message);
ch.writeAndFlush(frame);
}
}
FeedServiceImpl.java: (websocket connecter automatiquement lorsque l'application commence)
@Service
@PropertySource("classpath:api.properties")
public class FeedServiceImpl implements FeedService {
private final static Logger logger = LoggerFactory.getLogger("feedlogger");
@Autowired
private Environment env;
@Autowired
private WebsocketClientImpl websocketClientImpl;
@Override
@PostConstruct
public void connectWs() {
String token = env.getProperty("ws.token", String.class);
websocketClientImpl.connectWebSocket(token);
}
}
}
Aucun code n'apparaît dans votre question. – EJP
@EJP, désolé, seulement implicitement mentionné le code avant (exemple de client NetSocket websocket), j'ai mis à jour la question et ajouté le code connexe. – qianlei