1

Bottom line:Certains événements générés par le serveur ne sont pas livrés au client dans la production sur websockets

Certains événements générés par le serveur ne sont pas livrés au client dans la production sur websockets. Cependant, les connexions websocket sont bien établies.

Etude de cas:

J'ouvrir Google Chrome et vous connecter à notre serveur. Ouvrez les devtools. Sous l'onglet WS, je vois que la connexion s'est bien établie, mais je ne reçois aucune image quand, disons, le serveur a besoin de mettre à jour quelque chose sur la page. J'attends un moment et parfois (parfois seulement) je reçois des événements avec beaucoup de retard. Cela fonctionne comme prévu localement cependant.

Question:

Quelqu'un at-il vu le comportement de websocket similaire et a des suggestions sur la façon d'éliminer les variables de cette enquête.

Infrastructure:

Serveur: Linux Tomcat

Deux serveurs qui gèrent: 1. trafic à partir de périphériques (Communicantes sur TCP/IP avec le serveur) 2. Le trafic des Utilisateurs

Les utilisateurs et les appareils sont de plusieurs à plusieurs relation. Si un utilisateur est connecté à un serveur auquel aucun périphérique n'est connecté. Ce serveur regarde sur l'autre serveur et gère l'échange d'informations.

Il y a un pare-feu devant les serveurs.

code:

https://github.com/kino6052/websockets-issue

WebSocketServerEndpoint.java

@ServerEndpoint("/actions") 
    public class WebSocketServerEndpoint { 
     static private final org.slf4j.Logger logger = LoggerFactory.getLogger(WebSocketServerEndpoint.class); 


     @OnOpen 
     public void open(Session session) { 
      WebSocketSessionHandler.addSession(session); 
     } 

     @OnClose 
     public void close(Session session) { 
      WebSocketSessionHandler.removeSession(session); 
     } 

     @OnError 
     public void onError(Throwable error) { 
      //Logger.getLogger(WebSocketServerEndpoint.class.getName()).log(Level.SEVERE, null, error); 
     } 

     @OnMessage 
     public void handleMessage(String message, Session session) { 
      try (JsonReader reader = Json.createReader(new StringReader(message))) { 
       JsonObject jsonMessage = reader.readObject(); 
       Long userId = null; 
       Long tenantId = null; 
       switch (WebSocketActions.valueOf(jsonMessage.getString("action"))){ 
        case SaveUserId: 
         userId = getUserId(jsonMessage); 
         tenantId = getTenantId(jsonMessage); 
         Long userIdKey = WebSocketSessionHandler.saveUserId(userId, session); 
         Long tenantUserKey = WebSocketSessionHandler.saveTenantUser(tenantId, userId); 
         WebSocketSessionHandler.updateUserSessionKeys(session, tenantUserKey, userIdKey); // Needed for Making Weak Maps Keep Their Keys if Session is Currently Active 
       } 
      } catch (Exception e) { 
       logger.error(e.toString()); 
      } 
     } 

     private Long getUserId(JsonObject jsonMessage) { 
      Long userId = null; 
      try { 
       userId = Long.parseLong(((Integer) jsonMessage.getInt("userId")).toString()); 
       return userId; 
      } catch (Exception e) { 
       logger.error(e.getMessage()); 
       return userId; 
      } 
     } 

     private Long getTenantId(JsonObject jsonMessage) { 
      Long tenantId = null; 
      try { 
       tenantId = Long.parseLong(((Integer) jsonMessage.getInt("tenantId")).toString()); 
       return tenantId; 
      } catch (Exception e) { 
       logger.error(e.getMessage()); 
       return tenantId; 
      } 
     } 
    } 

WebSocketService.java

@Singleton 
    public class WebSocketService { 
     private static final Logger logger = LoggerFactory.getLogger(WebSocketService.class); 

     public enum WebSocketEvents{ 
      OnConnection, 
      OnActivity, 
      OnAccesspointStatus, 
      OnClosedStatus, 
      OnConnectedStatus, 
      OnAlert, 
      OnSessionExpired 
     } 

     public enum WebSocketActions{ 
      SaveUserId 
     } 

     @WebPost("/lookupWebSocketSessions") 
     public WebResponse lookupWebSocketSessions(@JsonArrayParam("userIds") List<Integer> userIds, @WebParam("message") String message){ 
      try { 
       for (Integer userIdInt : userIds) { 
        Long userId = Long.parseLong(userIdInt.toString()); 
        if (WebSocketSessionHandler.sendToUser(userId, message) == 0) { 
        } else { 
         //logger.debug("Couldn't Send to User"); 
        } 
       } 
      } catch (ClassCastException e) { 
       //logger.error(e.getMessage()); 
       return webResponseBuilder.fail(e); 
      } catch (Exception e) { 
       //logger.error(e.getMessage()); 
       return webResponseBuilder.fail(e); 
      } 

      return webResponseBuilder.success(message); 
     } 

     @WebPost("/lookupWebSocketHistorySessions") 
     public WebResponse lookupWebSocketHistorySessions(@JsonArrayParam("userIds") List<Integer> userIds, @WebParam("message") String message){ 
      try { 
       for (Integer userIdInt : userIds) { 
        Long userId = Long.parseLong(userIdInt.toString()); 
        if (WebSocketHistorySessionHandler.sendToUser(userId, message) == 0) { 
        } else { 
         //logger.debug("Couldn't Send to User"); 
        } 
       } 
      } catch (ClassCastException e) { 
       //logger.error(e.getMessage()); 
       return webResponseBuilder.fail(e); 
      } catch (Exception e) { 
       //logger.error(e.getMessage()); 
       return webResponseBuilder.fail(e); 
      } 

      return webResponseBuilder.success(message); 
     } 

     // Kick Out a User if Their Session is no Longer Valid 
     public void sendLogout(User user) { 
      try { 
       Long userId = user.getId(); 
       List<Long> userIds = new ArrayList<>(); 
       userIds.add(userId); 
       JSONObject result = new JSONObject(); 
       result.put("userId", userId); 
       JSON message = WebSocketSessionHandler.createMessage(WebSocketEvents.OnSessionExpired, result); 
       lookOnOtherServers(userIds, message); 
      } catch (Exception e) { 
       logger.error("Couldn't Logout User"); 
      } 
     } 

     // Send History after Processing Data 
     // Returns "0" if success, "-1" otherwise 
     public int sendHistory(Activity activity) { 
      try { 
       TimezoneService.TimeZoneConfig timeZoneConfig = timezoneService.getTimezoneConfigsByAp(null, activity.getAccesspointId()); 
       JSONObject result = (JSONObject) JSONSerializer.toJSON(activity); 
       String timezoneId = timezoneService.convertTimezoneConfigToTimezoneId(timeZoneConfig); 
       result.put("timezoneString", timezoneId); 
       result.put(
         "profileId", 
         userDao.getUserProfileId(activity.getUserId()) 
       ); 
       JSON message = WebSocketHistorySessionHandler.createMessage(WebSocketEvents.OnActivity, result); 
       List<Long> userIds = getUsersSubscribedToActivity(activity.getTenantId()); 
       lookOnOtherServersHistory(userIds, message); 
       return 0; 
      } catch (Exception e) { 
       //logger.error("Couldn't Send History"); 
       return -1; 
      } 
     } 

     // SendAlertUpdate after Processing Data 
     public void sendAlertUpdate(Alert alert) { 
      try { 
       List<Long> userIds = getUsersUnderTenantByAccesspointId(alert.getAccesspointId()); 
       JSONObject result = JSONObject.fromObject(alert); 
       JSON message = WebSocketSessionHandler.createMessage(WebSocketEvents.OnAlert, result); 
       lookOnOtherServers(userIds, message); 
      } catch (Exception e) { 
       //logger.error("Couldn't Send Aleart"); 
      } 
     } 

     // Send Connected Status after Processing Data 
     public void sendConnectedStatus(Long accesspointId, Boolean isConnected) { 
      try { 
       List<Long> userIds = getUsersUnderTenantByAccesspointId(accesspointId); 
       JSONObject result = new JSONObject(); 
       result.put("accesspointId", accesspointId); 
       result.put("isConnected", isConnected); 
       JSON message = WebSocketSessionHandler.createMessage(WebSocketEvents.OnConnectedStatus, result); 
       lookOnOtherServers(userIds, message); 
      } catch (Exception e) { 
       //logger.error("Couldn't Send Connected Status"); 
      } 
     } 


     public int sendHistory(CredentialActivity activity) { 
      try { 
       TimezoneService.TimeZoneConfig timeZoneConfig = timezoneService.getTimezoneConfigsByAp(null, activity.getAccesspointId()); 
       JSONObject result = (JSONObject) JSONSerializer.toJSON(activity); 
       String timezoneId = timezoneService.convertTimezoneConfigToTimezoneId(timeZoneConfig); 
       result.put("timezoneString", timezoneId); 
       result.put(
         "profileId", 
         userDao.getUserProfileId(activity.getUserId()) 
       ); 
       JSON message = WebSocketHistorySessionHandler.createMessage(WebSocketEvents.OnActivity, result); 
       List<Long> userIds = getUsersUnderTenantByAccesspointId(activity.getAccesspointId()); 
       lookOnOtherServersHistory(userIds, message); 
       return 0; 
      } catch (Exception e) { 
       return -1; 
      } 
     } 

     public Boolean isUserSessionAvailable(Long id) { 
      return WebSocketSessionHandler.isUserSessionAvailable(id); 
     } 

     public void lookOnOtherServers(List<Long> userId, JSON data){ 
      List<String> urls = awsService.getServerURLs(); 
      for (String url : urls) { 
       postJSONDataToUrl(url, userId, data); 
      } 
     } 

     public void lookOnOtherServersHistory(List<Long> userId, JSON data){ 
      List<String> urls = awsService.getServerURLsHistory(); 
      for (String url : urls) { 
       postJSONDataToUrl(url, userId, data); 
      } 
     } 

     public int sendClosedStatus(AccesspointStatus accesspointStatus){ 
      try { 
       JSONObject accesspointStatusJSON = new JSONObject(); 
       accesspointStatusJSON.put("accesspointId", accesspointStatus.getAccesspointId()); 
       accesspointStatusJSON.put("openStatus", accesspointStatus.getOpenStatus()); 
       List<Long> userIds = getUsersUnderTenantByAccesspointId(accesspointStatus.getAccesspointId()); 
       lookOnOtherServers(userIds, accesspointStatusJSON); 
       return 0; 
      } catch (Exception e) { 
       return -1; 
      } 
     } 

     public List<Long> getUsersSubscribedToActivity(Long tenantId) { 
      List<Long> userList = WebSocketSessionHandler.getUsersForTenant(tenantId); 
      return userList; 
     } 

     private List<Long> getUsersUnderTenantByAccesspointId(Long accesspointId) { 
      List<Long> userList = new ArrayList<>(); 
      User user = userDao.getBackgroundUserByAccesspoint(accesspointId); 
      List<Record> recordList = tenantDao.getTenantsByUser(user, user.getId()); 
      for (Record record : recordList) { 
       Long tenantId = (Long) record.get("id"); 
       userList.addAll(getUsersSubscribedToActivity(tenantId)); 
      } 
      return userList; 
     } 

     public void postJSONDataToUrl(String url, List<Long> userId, JSON data) throws AppException { 
      List<NameValuePair> parameters; 
      HttpResponse httpResponse; 
      HttpClientService.SimpleHttpClient simpleHttpClient = httpClientService.createHttpClient(url); 
      try { 
       parameters = httpClientService.convertJSONObjectToNameValuePair(userId, data); 
      } catch (Exception e) { 
       throw new AppException("Couldn't Convert Input Parameters"); 
      } 
      try { 
       httpResponse = simpleHttpClient.sendHTTPPost(parameters); 
      } catch (Exception e) { 
       throw new AppException("Couldn't Get Data from the Server"); 
      } 
      if (httpResponse == null) { 
       throw new AppException("Couldn't Send to Another Server"); 
      } else { 
       //logger.error(httpResponse.getStatusLine().toString()); 
      } 
     } 
    } 

WebSocketSessionHandler.java

public class WebSocketSessionHandler { 

     // Apparently required to instantiate the dialogue, 
     // ideally it would be better to just create session map where sessions are mapped to userId, 
     // however, userId will be send only after the session is created. 
     // TODO: Investigate Instantiation of WebSocket Session Further 

     // WeakHashMap is Used for Automatic Memory Management (So That Removal of Keys That are no Longer Used Can be Automatically Performed) 
     // NOTE: However, it Requires Certain Precautions to Make Sure Their Keys Don't Expire Unexpectedly, Look for the Commented Code Below 
     private static final Map<Long, Set<Session>> sessionMap = new WeakHashMap<>(); 

     private static final Map<Long, Set<Long>> tenantUserMap = new WeakHashMap<>(); 

     public WebSocketSessionHandler() {} 

     public static List<Long> getUsersForTenant(Long tenantId) { 
      List<Long> userIds = new ArrayList<>(); 
      Set<Long> userIdsSet = tenantUserMap.get(tenantId); 
      if (userIdsSet != null) { 
       for (Long userId : userIdsSet){ 
        userIds.add(userId); 
       } 
      } 
      return userIds; 
     } 

     public static Boolean isUserSessionAvailable(Long id){ 
      Set<Session> userSessions = sessionMap.get(id); 
      if (userSessions == null || userSessions.size() == 0) { 
       return false; 
      } else { 
       return true; 
      } 
     } 

     // addSession() should add "session" to "sessions" set 
     // returns: "0" if success and "-1" otherwise 
     public static int addSession(Session session) { 
      int output; 
      try { 
       final long ONE_DAY = 86400000; 
       session.setMaxIdleTimeout(ONE_DAY); 
       sessions.put(session, new ArrayList<>()); 
       return sendToSession(session, createMessage(WebSocketEvents.OnConnection, "Successfully Connected")); 
      } catch (Exception e) { 
       logger.error("Couldn't Add Session"); 
       return -1; 
      } 
     } 

     // removeSession() should remove "session" from "sessions" set 
     // Scenarios: 
     // sessions is null? 
     // returns: "0" if success and "-1" otherwise 
     public static int removeSession(Session session) { 
      try { 
       closeSessionProperly(session); 
       if (sessions.remove(session) != null) { 
        return 0; 
       } else { 
        return -1; 
       } 
      } catch (Exception e) { 
       logger.error("Couldn't Remove Session"); 
       return -1; 
      } 
     } 

     private static void closeSessionProperly(Session session) { 
      try { 
       session.close(); 
      } catch (IOException ex) { 

      } 
     } 

     public static Long getKeyFromMap(Map map, Long key){ // Needed for Weak Maps 
      Set<Long> keySet = map.keySet(); 
      for (Long keyReference : keySet) { 
       if (keyReference == key) { 
        return keyReference; 
       } 
      } 
      return key; // If Not Found Return the Value Passed in 
     } 

     // saveUserId() should create an { userId -> session } entry in sessionMap 
     public static Long saveUserId(Long userId, Session session){ 
      // Test Scenarios: 
      // Can userId be null or wrong? 
      // Can session be null or wrong? 
      try { 
       userId = getKeyFromMap(sessionMap, userId); // Required for Weak Maps to Work Correctly 
       Set<Session> sessionsForUser = sessionMap.get(userId); 
       if (sessionsForUser == null) { 
        sessionsForUser = new HashSet<>(); 
       } 
       sessionsForUser.add(session); 
       sessionMap.put(userId, sessionsForUser); 
       return userId; 
      } catch (Exception e) { 
       logger.error("Couldn't Save User Id"); 
       return null; 
      } 
     } 

     // saveUserId() should create an { userId -> session } entry in sessionMap 
     public static Long saveTenantUser(Long tenantId, Long userId){ 
      // Test Scenarios: 
      // Can userId be null or wrong? 
      // Can session be null or wrong? 
      try { 
       tenantId = getKeyFromMap(tenantUserMap, tenantId); // Required for Weak Maps to Work Correctly 
       Set<Long> users = tenantUserMap.get(tenantId); 
       if (users == null) { 
        users = new HashSet<>(); 
       } 
       users.add(userId); 
       tenantUserMap.put(tenantId, users); 
       return tenantId; 
      } catch (Exception e) { 
       logger.error("Couldn't Save Tenant User"); 
       return null; 
      } 
     } 

     public static void updateUserSessionKeys(Session session, Long tenantId, Long userId) { 
      try { 
       List<Long> userSessionKeys = sessions.get(session); 
       userSessionKeys.add(0, tenantId); 
       userSessionKeys.add(1, userId); 
      } catch (Exception e) { 
       logger.error("Couldn't Update User Session Keys"); 
      } 
     } 

     // removeUserId() should remove an { userId -> session } entry in sessionMap 
     // returns: "0" if success and "-1" otherwise 
     public static int removeUserId(Long userId) { 
      try { 
       sessionMap.remove(userId); 
       return 0; 
      } catch (Exception e) { 
       return -1; 
      } 
     } 

     // sendAccesspointStatus() should compose JSON message and pass it to sendToUser() 
     // returns: "0" if success and "-1" otherwise 
     public static int sendClosedStatus(Long userId, JSONObject accesspointStatus) { 
      try { 
       JSONObject accesspointStatusEventMessage = (JSONObject) createMessage(WebSocketEvents.OnClosedStatus, accesspointStatus); 
       sendToUser(userId, accesspointStatusEventMessage); 
       return 0; 
      } catch (Exception e) { 
       return -1; 
      } 
     } 

     // sendToUser() sends message to session that is mapped to userId 
     // returns: "0" if success and "-1" otherwise 
     public static int sendToUser(Long userId, JSON message) { 

      if (sessionMap.containsKey(userId)) { 
       Set<Session> sessionsForUser = sessionMap.get(userId); 
       for (Session session : sessionsForUser) { 
        if (!session.isOpen()) { 
         sessions.remove(session); 
         continue; 
        } 
        sendToSession(session, message); 
       } 
       return 0; 
      } else { 
       return -1; 
      } 
     } 


     // sendToSession() sends string message to session 
     // returns: "0" if success and "-1" otherwise 
     private static int sendToSession(Session session, JSON message){ 
      try { 
       try { 
        Long tenantId = sessions.get(session).get(0); 
        ((JSONObject) message).put("tenantId", tenantId); 
       } catch (Exception e) { 
        logger.error("No tenantId Found"); 
       } 
       session.getBasicRemote().sendText(message.toString()); 
       return 0; 
      } catch (IOException e) { 
       try { 
        session.close(); 
       } catch (IOException ex) { 

       } 

       closeSessionProperly(session); 
       sessions.remove(session); 

       return -1; 
      } 
     } 

     // sendToSession() sends string message to session 
     // returns: "0" if success and "-1" otherwise 
     private static int sendToSession(Session session, String message){ 
      try { 
       JSONObject newMessage = JSONObject.fromObject(message); 
       try { 
        Long tenantId = sessions.get(session).get(0); 
        newMessage.put("tenantId", tenantId); 
       } catch (Exception e) { 
        logger.error("No tenantId Found"); 
       } 
       session.getBasicRemote().sendText(newMessage.toString()); 
       return 0; 
      } catch (IOException e) { 
       closeSessionProperly(session); 
       sessions.remove(session); 
       return -1; 
      } 
     } 
    } 
+1

Probablement pas le seul bug, mais votre 'classe WebSocketSessionHandler' n'est pas thread-safe. Il utilise en interne 'WeakHashMap' qui n'est pas synchronisé. L'accès simultané à ces cartes peut entraîner un comportement inattendu, qui peut ou non entraîner les effets que vous voyez. – defnull

+0

@defnull merci beaucoup pour votre réponse! Je vais vérifier si cela résout le problème et donnera une mise à jour. –

+0

@defnull, vous étiez correct en soulignant le weakhashmap comme un bug potentiel. S'il vous plaît poster votre réponse afin que je puisse le changer. –

Répondre

0

Probablement pas le seul bug, mais votre classe WebSocketSessionHandler est pas thread-safe. Il utilise en interne WeakHashMap qui n'est pas synchronisé. L'accès simultané à ces cartes peut entraîner un comportement inattendu, qui peut ou non entraîner les effets que vous voyez.

Il s'avère que c'était une supposition correcte. Règle générale: Comportement inattendu ~ Condition de course

0

Probablement pas le seul bogue, mais votre classe WebSocketSessionHandler n'est pas adaptée aux threads.Il utilise en interne WeakHashMap qui n'est pas synchronisé. L'accès simultané à ces cartes peut entraîner un comportement inattendu, qui peut ou non entraîner les effets que vous voyez.

(copié de mon commentaire. Transforme ce fut la solution)