2016-07-15 3 views
0

J'essaie de créer un serveur TCP qui lit des données périodiquement à partir d'une base de données (Redis) et l'envoie au client approprié.Netty et Scheduled Executor Service

Cependant, étant donné que je suis assez nouveau pour Netty, je ne sais pas comment pourrais-je planifier cela. Je sais que je dois utiliser un service comme celui-ci Exécuteur planifiée:

ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
e.scheduleAtFixedRate(() -> { 
    System.out.println("Calling..."); 
    // Do something 
}, 1, 1, TimeUnit.SECONDS); 

Cependant, quand j'ai essayé de mettre cela dans le code du serveur, il est seulement appeler une fois la méthode. J'ai essayé de mettre cela dans un endroit différent mais je n'arrive toujours pas à le faire correctement. Que devrais-je faire?

Voici le code du serveur:

package com.example.test.app; 

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

public class Server { 

    public static void main(String[] args) throws Exception 
    { 
     EventLoopGroup bossGroup = new NioEventLoopGroup(); 
     EventLoopGroup workerGroup = new NioEventLoopGroup(); 

     final ServerHandler handler = new ServerHandler(); 

     try { 

      ServerBootstrap b = new ServerBootstrap(); 
      b.group(bossGroup, workerGroup); 
      b.channel(NioServerSocketChannel.class); 
      b.childHandler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       protected void initChannel(SocketChannel ch) throws Exception 
       { 
        ch.pipeline().addLast(handler); 
       } 

      }); 
      b.option(ChannelOption.SO_BACKLOG, 128); 
      b.childOption(ChannelOption.SO_KEEPALIVE, true); 

      ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
      e.scheduleAtFixedRate(() -> { 
       System.out.println("Calling..."); 
       handler.saySomething(); 
      }, 1, 1, TimeUnit.SECONDS); 

      ChannelFuture f = b.bind(1337).sync(); 
      f.channel().closeFuture().sync(); 

     } finally { 
      workerGroup.shutdownGracefully(); 
      bossGroup.shutdownGracefully(); 
     } 
    } 

} 

Et voici le gestionnaire de serveur:

package com.example.test.app; 

import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 

public class ServerHandler extends ChannelInboundHandlerAdapter { 

    private ChannelHandlerContext ctx; 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) 
    { 
     this.ctx = ctx; 
     System.out.println("Someone's connedted!"); 
    } 

    public void saySomething() 
    { 
     final ChannelFuture f = ctx.writeAndFlush("Sup!"); 
     f.addListener((ChannelFutureListener) (ChannelFuture future) -> { 
      System.out.println("Something has been said!"); 
     }); 
    } 

} 

Répondre

1

La méthode saySomething() génère NullPointerException pour appeler final ChannelFuture f = ctx.writeAndFlush("Sup!"); en ctx est nulle. EventExecutorGroup.scheduleAtFixedRate javadoc La description indique que "Si une exécution de la tâche rencontre une exception, les exécutions suivantes sont supprimées". Donc, c'est pourquoi vous obtenez est appelé une seule fois ...

Aussi, semble que Netty vous permet de réutiliser une instance de gestionnaire pour différentes instances de pipeline uniquement si vous annotez la classe de ce gestionnaire comme @Sharable. Sinon, il va lancer une exception. Si votre gestionnaire est sans état (ce qui n'est pas votre cas, comme le vôtre a le membre ctx), vous devez l'annoter comme @Sharable et le réutiliser dans tous les pipelines créés. Si c'est le cas, créez une nouvelle instance pour chaque nouveau pipeline (nouvelle connexion client). Enfin, pour planifier votre tâche pour chaque client connecté, vous pouvez utiliser l'exécuteur qui peut être référencé par le ctx du canal du client connecté (par défaut, comme dans votre cas, le EventLoop du canal) sur votre implémentation channelActive(). Cet exécuteur exécute ScheduledExecutorService, donc vous avez également scheduleAtFixedRate. Jetez un oeil à ma version de votre code et voyez si cela vous convient.

Serveur:

package com.example.test.app; 

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

public class Server { 

    public static void main(String[] args) throws Exception 
    { 
     EventLoopGroup bossGroup = new NioEventLoopGroup(); 
     EventLoopGroup workerGroup = new NioEventLoopGroup(); 

     try { 

      ServerBootstrap b = new ServerBootstrap(); 
      b.group(bossGroup, workerGroup); 
      b.channel(NioServerSocketChannel.class); 
      b.childHandler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       protected void initChannel(SocketChannel ch) throws Exception 
       { 
        ch.pipeline().addLast(new ServerHandler()); 
       } 

      }); 
      b.option(ChannelOption.SO_BACKLOG, 128); 
      b.childOption(ChannelOption.SO_KEEPALIVE, true); 

//   ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
//   e.scheduleAtFixedRate(() -> { 
//    System.out.println("Calling..."); 
//    handler.saySomething(); 
//   }, 1, 1, TimeUnit.SECONDS); 

      ChannelFuture f = b.bind(1337).sync(); 
      f.channel().closeFuture().sync(); 

     } finally { 
      workerGroup.shutdownGracefully(); 
      bossGroup.shutdownGracefully(); 
     } 
    } 

} 

ServerHandler:

package com.example.test.app; 

import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.util.concurrent.ScheduledFuture; 

import java.util.concurrent.TimeUnit; 

public class ServerHandler extends ChannelInboundHandlerAdapter { 

    private ScheduledFuture sf; 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) 
    { 
     System.out.println("Someone's connedted! "+ctx.channel()); 
     sf = ctx.executor().scheduleAtFixedRate(() -> { 
      System.out.println("Calling..."); 
      saySomething(ctx); 
     }, 1, 1, TimeUnit.SECONDS); 
    } 

    @Override 
    public void channelInactive(ChannelHandlerContext ctx) { 
     System.out.println("Someone's disconnected! "+ctx.channel()); 
     sf.cancel(false); 
    } 

    private void saySomething(ChannelHandlerContext ctx) 
    { 
      final ChannelFuture f = ctx.writeAndFlush("Sup!"); 
      f.addListener((ChannelFutureListener) (ChannelFuture future) -> { 
       System.out.println("Something has been said!"); 
      }); 
    } 

} 
+0

Merci, je ne savais pas que le ChannelHandlerContext a son propre exécuteur testamentaire. – Furunomoe