2

Mon équipe a été bloquée avec l'exécution d'un algorithme de logique floue sur deux grands ensembles de données. Le premier (sous-ensemble) est d'environ 180K lignes contient des noms, des adresses et des courriels pour les personnes que nous devons correspondre dans le second (surensemble). Le surensemble contient 2,5M enregistrements. Tous les deux ont la même structure et les données ont été déjà nettoyé, adresses soit analysées, les noms normalisés, etc.Logique floue sur les grands ensembles de données utilisant Python

  • ContactID int,
  • FullName varchar (150),
  • Adresse varchar (100) ,
  • Email varchar (100)

Le but est de faire correspondre les valeurs dans une rangée de sous-ensemble pour les valeurs correspondantes dans superset, de sorte que la sortie combinerait le sous-ensemble et le surensemble et les pourcentages de similarité correspondants pour chaque champ (jeton).

  • ContactID,
  • LookupContactID,
  • FullName,
  • LookupFullName,
  • FullName_Similarity,
  • Adresse,
  • lookupAddress,
  • Address_Similarity,
  • Email
  • ,
  • LookupEmail,
  • Email_Similarity

Pour simplifier et tester le code d'abord, nous avons des chaînes concaténées et nous savons que le code fonctionne sur de très petite surensemble; Cependant, une fois que nous augmentons le nombre d'enregistrements, il reste bloqué. Nous avons essayé différents algorithmes, Levenshtein, FuzzyWuzzy, etc. en vain. Le problème, à mon avis, est que Python le fait rangée par rangée; Cependant, je ne suis pas sûr. Nous avons même essayé de l'exécuter sur notre cluster Hadoop en utilisant le streaming; Cependant, il n'a donné aucun résultat positif.

#!/usr/bin/env python 
import sys 
from fuzzywuzzy import fuzz 
import datetime 
import time 
import Levenshtein 

#init for comparison 
with open('normalized_set_record_set.csv') as normalized_records_ALL_file: 
# with open('delete_this/xab') as normalized_records_ALL_file: 
    normalized_records_ALL_dict = {} 
    for line in normalized_records_ALL_file: 
     key, value = line.strip('\n').split(':', 1) 
     normalized_records_ALL_dict[key] = value 
     # normalized_records_ALL_dict[contact_id] = concat_record 

def score_it_bag(target_contact_id, target_str, ALL_records_dict): 
    ''' 
    INPUT target_str, ALL_records_dict 
    OUTPUT sorted list by highest fuzzy match 
    ''' 
    return sorted([(value_str, contact_id_index_str, fuzz.ratio(target_str, value_str)) 
     for contact_id_index_str, value_str in ALL_records_dict.iteritems()], key=lambda x:x[2])[::-1] 

def score_it_closest_match_pandas(target_contact_id, target_str, place_holder_delete): 
    ''' 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match 
    ''' 
    # simply drop this index target_contact_id 
    df_score = df_ALL.concat_record.apply(lambda x: fuzz.ratio(target_str, x)) 

    return df_ALL.concat_record[df_score.idxmax()], df_score.max(), df_score.idxmax() 

def score_it_closest_match_L(target_contact_id, target_str, ALL_records_dict_input): 
    ''' 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str) 
    ''' 
    best_score = 100 

    #score it 
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems(): 
     if target_contact_id != comparison_contactid: 
      current_score = Levenshtein.distance(target_str, comparison_record_str) 


      if current_score < best_score: 
       best_score = current_score 
       best_match_id = comparison_contactid 
       best_match_str = comparison_record_str 

    return (best_match_str, best_score, best_match_id) 



def score_it_closest_match_fuzz(target_contact_id, target_str, ALL_records_dict_input): 
    ''' 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str) 
    ''' 
    best_score = 0 

    #score it 
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems(): 
     if target_contact_id != comparison_contactid: 
      current_score = fuzz.ratio(target_str, comparison_record_str) 

      if current_score > best_score: 
       best_score = current_score 
       best_match_id = comparison_contactid 
       best_match_str = comparison_record_str 

    return (best_match_str, best_score, best_match_id) 

def score_it_threshold_match(target_contact_id, target_str, ALL_records_dict_input): 
    ''' 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str) 
    ''' 
    score_threshold = 95 

    #score it 
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems(): 
     if target_contact_id != comparison_contactid: 
      current_score = fuzz.ratio(target_str, comparison_record_str) 

      if current_score > score_threshold: 
       return (comparison_record_str, current_score, comparison_contactid) 

    return (None, None, None) 


def score_it_closest_match_threshold_bag(target_contact_id, target_str, ALL_records_dict): 
    ''' 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match 
    ''' 
    threshold_score = 80 
    top_matches_list = [] 
    #score it 
    #iterate through dictionary 
    for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems(): 
     if target_contact_id != comparison_contactid: 
      current_score = fuzz.ratio(target_str, comparison_record_str) 

      if current_score > threshold_score: 
       top_matches_list.append((comparison_record_str, current_score, comparison_contactid)) 


    if len(top_matches_list) > 0: return top_matches_list 

def score_it_closest_match_threshold_bag_print(target_contact_id, target_str, ALL_records_dict): 
    ''' 
    INPUT target_str, ALL_records_dict 
    OUTPUT closest match 
    ''' 
    threshold_score = 80 


    #iterate through dictionary 
    for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems(): 
     if target_contact_id != comparison_contactid: 

      #score it 
      current_score = fuzz.ratio(target_str, comparison_record_str) 
      if current_score > threshold_score: 
       print target_contact_id + ':' + str((target_str,comparison_record_str, current_score, comparison_contactid)) 


    pass 


#stream in all contacts ie large set 
for line in sys.stdin: 
    # ERROR DIAG TOOL 
    ts = time.time() 
    st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') 
    print >> sys.stderr, line, st 

    contact_id, target_str = line.strip().split(':', 1) 

    score_it_closest_match_threshold_bag_print(contact_id, target_str, normalized_records_ALL_dict) 
    # output = (target_str, score_it_closest_match_fuzz(contact_id, target_str, normalized_records_ALL_dict)) 
    # output = (target_str, score_it_closest_match_threshold_bag(contact_id, target_str, normalized_records_ALL_dict)) 
    # print contact_id + ':' + str(output) 

Répondre

0

Votre approche nécessite que vous effectuiez 180 000 * 2 500 000 = 450 000 000 000 comparaisons.

450 milliards c'est beaucoup.

Pour réduire le nombre de comparaisons, vous pouvez d'abord regrouper les enregistrements ayant certaines caractéristiques en commun, tels que les cinq premiers caractères d'un champ d'adresse ou un jeton commun. Ensuite, comparez uniquement les enregistrements qui partagent une fonctionnalité. Cette idée est appelée «blocage» et réduira habituellement le nombre de comparaisons totales que vous devez faire à quelque chose de gérable.

Le problème général que vous essayez de résoudre s'appelle "record linkage". Puisque vous utilisez python, vous pouvez regarder le qui fournit une approche complète.