2016-02-13 1 views
1

Le problème est que j'ai une matrice A énorme, et donné un tableau (assez grand) entier, par exemple, disons que ma matrice est: [0,0,0,0 , 0,0,0,0, 1,1,1,1,1,1,1,1, 2,2,2,2,2,2,2,2, 3,3,3, 3,3,3,3,3, 4,4,4,4,4,4,4,4, ...............]Comment accumuler efficacement des tableaux de données en C

et la nombre entier est [0, 2, 4]

Ensuite, la réponse désirée est [6,6,6,6,6,6,6,6] en accumulant [0,0,0,0,0,0 , 0,0], [2,2,2,2,2,2,2,2], [4,4,4,4,4,4,4,4]

Ceci est un problème simple, mais une implémentation C naïve semble être très lente. C'est particulièrement le cas lorsque vous accumulez beaucoup de lignes.

manuellement loop_unrolling ne semble pas aider. Je ne suis pas familier avec l'assemblage en ligne, des suggestions? Je me demande s'il existe aussi une bibliothèque connue pour de telles opérations.

Ci-dessous mon implémentation actuelle:

void accumulateRows(int* js, int num_j, Dtype* B, int nrow, int ncol, int incRowB, Dtype* buffer){ 

int i = 0; 
int num_accumulated_rows = (num_j/8) * 8; 
int remaining_rows = num_j - num_accumulated_rows; 

// unrolling factor of 8, each time, accumulate 8 rows 
for(; i < num_accumulated_rows; i+=8){ 
    int r1 = js[i]; 
    int r2 = js[i+1]; 
    int r3 = js[i+2]; 
    int r4 = js[i+3]; 
    int r5 = js[i+4]; 
    int r6 = js[i+5]; 
    int r7 = js[i+6]; 
    int r8 = js[i+7]; 
    register Dtype* B1_row = &B[r1*incRowB]; 
    register Dtype* B2_row = &B[r2*incRowB]; 
    register Dtype* B3_row = &B[r3*incRowB]; 
    register Dtype* B4_row = &B[r4*incRowB]; 
    register Dtype* B5_row = &B[r5*incRowB]; 
    register Dtype* B6_row = &B[r6*incRowB]; 
    register Dtype* B7_row = &B[r7*incRowB]; 
    register Dtype* B8_row = &B[r8*incRowB]; 
    for(int j = 0; j < ncol; j+=1){ 
     register Dtype temp = B1_row[j] + B2_row[j] + B3_row[j] + B4_row[j]; 
     temp += B5_row[j] + B6_row[j] + B7_row[j] + B8_row[j]; 
     buffer[j] += temp; 
    } 
} 

// left_over from the loop unrolling 
for(; i < remaining_rows; i++){ 
    int r = js[i]; 
    Dtype* B_row = &B[r*incRowB]; 
    for(int i = 0; i < n; i++){ 
     buffer[i] += B_row[i]; 
    } 
} 

}

EDIT Je pense que cette accumulation est très courante dans la base de données, par exemple lorsque l'on veut faire une requête sur les ventes totales Je sais que gcc prend en charge Intel SSE, et je cherche à apprendre comment l'appliquer à ce problème, car il est très

+1

Êtes-vous allez partager votre « ralentir la mise en œuvre naïve C »? – mauro

+1

On ne sait pas très bien ce que vous voulez dire quand vous dites que vous voulez que les * rangées de "A" soient dans un seul tampon *. Pouvez-vous donner un exemple? – lurker

+0

Cette quantité de redondance est-elle commune ou attendue dans votre matrice? Si c'est le cas, additionnez simplement le tableau d'entiers à '6' et * puis * diffusez le résultat à' 6,6,6,6, ... '. Si ce n'est pas le cas, votre code C semble s'auto-vectoriser raisonnablement bien. Tant que vous chargez des données contiguës, tout devrait bien se passer. Une collecte est lente, mais les index consécutifs d'un tableau peuvent être chargés efficacement. –

Répondre

0

ici est une façon de mettre en œuvre la fonction, ainsi que quelques suggestions sur speedups autres

#include <stdlib.h> // size_t 

typedef int Dtype; 

// Note: 
// following function assumes a 'contract' with the caller 
// that no entry in 'whichRows[]' 
// is larger than (number of rows in 'baseArray[][]' -1) 

void accumulateRows(
    // describe source 2d array 
    /* size_t numRows */ size_t numCols, Dtype BaseArray[][ numCols ], 

    // describe row selector array 
    size_t numSelectRows, size_t whichRows[ numSelectRows ], 

    // describe result array 
    Dtype resultArray[ numCols ]) 
{ 
    size_t colIndex; 
    size_t selectorIndex; 

    // initialize resultArray to all 0 
    for(colIndex = 0; colIndex < numCols; colIndex++) 
    { 
     resultArray[colIndex] = 0; 
    } 

    // accumulate totals for each column of selected rows 
    for(selectorIndex = 0; selectorIndex < numSelectRows; selectorIndex++) 
    { 
     for(colIndex = 0; colIndex < numCols; colIndex++) 
     { 
      resultArray[colIndex] += BaseArray[ whichRows[selectorIndex] ][colIndex]; 
     } // end for each column 
    } // end for each selected row 
} 

#if 0 
// you might want to unroll the "initialize resultArray" loop 
// by replacing the loop with 
    resultArray[0] = 0; 
    resultArray[1] = 0; 
    resultArray[2] = 0; 
    resultArray[3] = 0; 
    resultArray[4] = 0; 
    resultArray[5] = 0; 
    resultArray[6] = 0; 
    resultArray[7] = 0; 
// however, that puts a constraint on the number of columns always being 8 
#endif 

#if 0 
// you might want to unroll the 'sum of columns' loop by replacing the loop with 
    resultArray[0] += BaseArray[ whichRows[selectorIndex] ][0]; 
    resultArray[1] += BaseArray[ whichRows[selectorIndex] ][1]; 
    resultArray[2] += BaseArray[ whichRows[selectorIndex] ][2]; 
    resultArray[3] += BaseArray[ whichRows[selectorIndex] ][3]; 
    resultArray[4] += BaseArray[ whichRows[selectorIndex] ][4]; 
    resultArray[5] += BaseArray[ whichRows[selectorIndex] ][5]; 
    resultArray[6] += BaseArray[ whichRows[selectorIndex] ][6]; 
    resultArray[7] += BaseArray[ whichRows[selectorIndex] ][7]; 
// however, that puts a constraint on the number of columns always being 8 
#endif 

#if 0 
// on Texas Instrument DSPs , 
// could use a #pragma to unroll the loop 
// or (better) 
// make use of the built-in loop table 
// to massively speed up the execution of the loop(s) 
#endif