Wie ist numpy so schnell?

Lesezeit: 21 Minuten

Benutzeravatar von jere0111
jere0111

Ich versuche zu verstehen, wie numpy kann so schnell sein, basierend auf meinem schockierenden Vergleich mit optimiertem C/C++-Code, der noch weit davon entfernt ist, die Geschwindigkeit von numpy zu reproduzieren.

Betrachten Sie das folgende Beispiel: Gegeben sei ein 2D-Array mit shape=(N, N) und dtype=float32, die eine Liste von N Vektoren mit N Dimensionen darstellt, berechne ich die paarweisen Differenzen zwischen jedem Paar von Vektoren. Verwenden numpy Rundfunk, das schreibt sich einfach so:

def pairwise_sub_numpy( X ):
    return X - X[:, None, :]

Verwenden timeit Ich kann die Leistung für messen N=512: Auf meinem Laptop dauert es 88 ms pro Anruf.

Nun, in C/C++ schreibt eine naive Implementierung wie folgt:

#define X(i, j)     _X[(i)*N + (j)]
#define res(i, j, k)  _res[((i)*N + (j))*N + (k)]

float* pairwise_sub_naive( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            for (int k = 0; k < N; k++)
                res(i,j,k) = X(i,k) - X(j,k);
          }
    }
    return _res;
}

Kompilieren mit gcc 7.3.0 mit -O3 flag, ich bekomme 195 ms pro anruf dafür pairwise_sub_naive(X)was angesichts der Einfachheit des Codes nicht so schlimm ist, aber etwa 2-mal langsamer als numpy.

Jetzt werde ich ernst und füge einige kleine Optimierungen hinzu, indem ich die Zeilenvektoren direkt indiziere:

float* pairwise_sub_better( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));
    
    for (int i = 0; i < N; i++) {
        const float* xi = & X(i,0);

        for (int j = 0; j < N; j++) {
            const float* xj = & X(j,0);
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k++)
                 r[k] = xi[k] - xj[k];
        }
    }
    return _res;
}

Die Geschwindigkeit bleibt mit 195 ms gleich, was bedeutet, dass der Compiler so viel berechnen konnte. Lassen Sie uns nun SIMD-Vektoranweisungen verwenden:

float* pairwise_sub_simd( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    // create caches for row vectors which are memory-aligned
    float* xi = (float*)aligned_alloc(32, N * sizeof(float));
    float* xj = (float*)aligned_alloc(32, N * sizeof(float));
    
    for (int i = 0; i < N; i++) {
        memcpy(xi, & X(i,0), N*sizeof(float));
        
        for (int j = 0; j < N; j++) {
            memcpy(xj, & X(j,0), N*sizeof(float));
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k += 256/sizeof(float)) {
                const __m256 A = _mm256_load_ps(xi+k);
                const __m256 B = _mm256_load_ps(xj+k);
                _mm256_store_ps(r+k, _mm256_sub_ps( A, B ));
            }
        }
    }
    free(xi); 
    free(xj);
    return _res;
}

Dadurch ergibt sich nur ein kleiner Boost (178 ms statt 194 ms pro Funktionsaufruf).

Dann habe ich mich gefragt, ob ein “blockweiser” Ansatz, wie er zur Optimierung von Punktprodukten verwendet wird, von Vorteil sein könnte:

float* pairwise_sub_blocks( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    #define B 8
    float cache1[B*B], cache2[B*B];

    for (int bi = 0; bi < N; bi+=B)
      for (int bj = 0; bj < N; bj+=B)
        for (int bk = 0; bk < N; bk+=B) {
        
            // load first 8x8 block in the cache
            for (int i = 0; i < B; i++)
              for (int k = 0; k < B; k++)
                cache1[B*i + k] = X(bi+i, bk+k);

            // load second 8x8 block in the cache
            for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                cache2[B*j + k] = X(bj+j, bk+k);

            // compute local operations on the caches
            for (int i = 0; i < B; i++)
             for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                 res(bi+i,bj+j,bk+k) = cache1[B*i + k] - cache2[B*j + k];
         }
    return _res;
}

Und überraschenderweise ist dies die bisher langsamste Methode (258 ms pro Funktionsaufruf).

Zusammenfassend kann ich trotz einiger Bemühungen mit optimiertem C++-Code nicht annähernd die 88 ms/Aufruf erreichen, die numpy mühelos erreicht. Irgendeine Idee warum?

Hinweis: Ich deaktiviere übrigens numpy Multithreading und überhaupt ist diese Art von Operation nicht Multithreading.

Bearbeiten: Exakter Code zum Benchmarken des numpy-Codes:

import numpy as np

def pairwise_sub_numpy( X ):
    return X - X[:, None, :]

N = 512
X = np.random.rand(N,N).astype(np.float32)

import timeit
times = timeit.repeat('pairwise_sub_numpy( X )', globals=globals(), number=1, repeat=5)
print(f">> best of 5 = {1000*min(times):.3f} ms")

Vollständiger Benchmark für C-Code:

#include <stdio.h>
#include <string.h>
#include <xmmintrin.h>   // compile with -mavx -msse4.1
#include <pmmintrin.h>
#include <immintrin.h>
#include <time.h>


#define X(i, j)     _x[(i)*N + (j)]
#define res(i, j, k)  _res[((i)*N + (j))*N + (k)]

float* pairwise_sub_naive( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            for (int k = 0; k < N; k++)
                res(i,j,k) = X(i,k) - X(j,k);
          }
    }
    return _res;
}

float* pairwise_sub_better( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));
    
    for (int i = 0; i < N; i++) {
        const float* xi = & X(i,0);

        for (int j = 0; j < N; j++) {
            const float* xj = & X(j,0);
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k++)
                 r[k] = xi[k] - xj[k];
        }
    }
    return _res;
}

float* pairwise_sub_simd( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    // create caches for row vectors which are memory-aligned
    float* xi = (float*)aligned_alloc(32, N * sizeof(float));
    float* xj = (float*)aligned_alloc(32, N * sizeof(float));
    
    for (int i = 0; i < N; i++) {
        memcpy(xi, & X(i,0), N*sizeof(float));
        
        for (int j = 0; j < N; j++) {
            memcpy(xj, & X(j,0), N*sizeof(float));
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k += 256/sizeof(float)) {
                const __m256 A = _mm256_load_ps(xi+k);
                const __m256 B = _mm256_load_ps(xj+k);
                _mm256_store_ps(r+k, _mm256_sub_ps( A, B ));
            }
        }
    }
    free(xi); 
    free(xj);
    return _res;
}


float* pairwise_sub_blocks( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    #define B 8
    float cache1[B*B], cache2[B*B];

    for (int bi = 0; bi < N; bi+=B)
      for (int bj = 0; bj < N; bj+=B)
        for (int bk = 0; bk < N; bk+=B) {
        
            // load first 8x8 block in the cache
            for (int i = 0; i < B; i++)
              for (int k = 0; k < B; k++)
                cache1[B*i + k] = X(bi+i, bk+k);

            // load second 8x8 block in the cache
            for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                cache2[B*j + k] = X(bj+j, bk+k);

            // compute local operations on the caches
            for (int i = 0; i < B; i++)
             for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                 res(bi+i,bj+j,bk+k) = cache1[B*i + k] - cache2[B*j + k];
         }
    return _res;
}

int main() 
{
    const int N = 512;
    float* _x = (float*) malloc( N * N * sizeof(float) );
    for( int i = 0; i < N; i++)
      for( int j = 0; j < N; j++)
        X(i,j) = ((i+j*j+17*i+101) % N) / float(N);

    double best = 9e9;
    for( int i = 0; i < 5; i++)
    {
        struct timespec start, stop;
        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &start);
        
        //float* res = pairwise_sub_naive( _x, N );
        //float* res = pairwise_sub_better( _x, N );
        //float* res = pairwise_sub_simd( _x, N );
        float* res = pairwise_sub_blocks( _x, N );

        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &stop);

        double t = (stop.tv_sec - start.tv_sec) * 1e6 + (stop.tv_nsec - start.tv_nsec) / 1e3;    // in microseconds
        if (t < best) best = t;
        free( res );
    }
    printf("Best of 5 = %f ms\n", best / 1000);

    free( _x );
    return 0;
}

Kompiliert mit gcc 7.3.0 gcc -Wall -O3 -mavx -msse4.1 -o test_simd test_simd.c

Zusammenfassung der Timings auf meiner Maschine:

Implementierung Zeit
taub 88 ms
C++ naiv 194 ms
C++ besser 195 ms
C++ SIMD 178 ms
C++ blockiert 258 ms
C++ blockiert (gcc 8.3.1) 217 ms

  • Ihr C++-Code weist viel Speicher zu. pairwise_sub_numpy tut nichts davon. Messen Sie Zuordnungen und tatsächliche Berechnungen separat

    – 463035818_ist_keine_Nummer

    25. Januar 2021 um 16:26 Uhr


  • Leistungsfragen erfordern viele Details. Wie hast du gemessen? Wie lautet der vollständige Numpy-Code (wie nennen Sie ihn? Was genau messen Sie?) Wie kompilieren Sie den C++-Code? Welcher Compiler? Welche Flaggen?

    – 463035818_ist_keine_Nummer

    25. Januar 2021 um 16:32 Uhr

  • @sweenish -O3 fügt weitere Optimierungen hinzu, was oft zu a führt größer binär (obwohl Sie Recht haben, ist es nicht unbedingt schneller und manchmal langsamer!); die (selten verwendet) -Os ist für kleinere Binärdateien gcc.gnu.org/onlinedocs/gcc/Optimize-Options.html stackoverflow.com/questions/19689014/…

    – ti7

    25. Januar 2021 um 16:48 Uhr


  • Bist du sicher, dass du bei deinen Messungen nichts vermasselt hast? Ihr Numpy-Code benötigt hier 83 ms und Ihr C ++ – Code 92 ms für den naiven und 60 ms für die Blöcke …

    – Florestan

    25. Januar 2021 um 17:42 Uhr

  • aligned_alloc( 32, N*N*N*sizeof(float)); ist riskant: N*N*N könnte den Bereich von überschreiten int. Du solltest schreiben sizeof(float)*N*N*N wodurch die vollständige Berechnung gezwungen wird, den potenziell größeren Typ zu verwenden size_t.

    – chqrlie

    25. Januar 2021 um 20:58 Uhr


Benutzeravatar von celavek
celavek

Wie in einigen Kommentaren erwähnt, verwendet numpy SIMD in seiner Implementierung und weist zum Zeitpunkt der Berechnung keinen Speicher zu. Wenn ich die Speicherzuweisung aus Ihrer Implementierung eliminiere und alle Puffer vor der Berechnung vorab zuweise, bekomme ich eine bessere Zeit im Vergleich zu numpy, selbst mit der Scaler-Version (das ist die Version ohne Optimierungen).

Auch in Bezug auf SIMD und warum Ihre Implementierung nicht viel besser abschneidet als der Scaler, liegt daran, dass Ihre Speicherzugriffsmuster nicht ideal für die SIMD-Nutzung sind – Sie machen Memcopy und laden in SIMD-Register von weit voneinander entfernten Orten – z Sie füllen Vektoren aus Zeile 0 und Zeile 511, die möglicherweise nicht gut mit dem Cache oder dem SIMD-Prefetcher funktionieren.

Es gibt auch einen Fehler beim Laden der SIMD-Register (wenn ich richtig verstanden habe, was Sie zu berechnen versuchen): Ein 256-Bit-SIMD-Register kann 8 Gleitkommazahlen mit einfacher Genauigkeit laden 8 * 32 = 256aber in deiner Schleife springst du k vorbei “256/sizeof(float)” welches ist 256/4 = 64; _x und _res sind Float-Zeiger und die SIMD-Intrinsik erwartet auch Float-Zeiger als Argumente. Anstatt alle Elemente aus diesen Zeilen alle 8 Floats zu lesen, lesen Sie sie alle 64 Floats.

Die Berechnung kann weiter optimiert werden, indem Sie die Zugriffsmuster ändern, aber auch darauf achten, dass Sie einige Berechnungen wiederholen: zB beim Iterieren mit Zeile0 als Basis berechnen Sie Zeile0 – Zeile1 aber zu einem späteren Zeitpunkt, wenn mit iteriert wird Linie 1 Als Basis müssen Sie berechnen Zeile1 – Zeile0 das ist im Grunde -(Zeile0 – Zeile1)das ist für jede Zeile danach Zeile0 Viele Ergebnisse aus früheren Berechnungen konnten wiederverwendet werden. Häufig erfordert die SIMD-Nutzung oder -Parallelisierung eine Änderung des Datenzugriffs oder der Argumentation, um sinnvolle Verbesserungen zu erzielen.

Hier ist, was ich als ersten Schritt basierend auf Ihrer anfänglichen Implementierung getan habe, und es ist schneller als das numpy (stören Sie das OpenMP-Zeug nicht, da es nicht so ist, wie es gemacht werden soll, ich wollte nur sehen, wie es sich beim Versuch verhält naive Art).

C++
Time scaler version: 55 ms
Time SIMD version: 53 ms
**Time SIMD 2 version: 33 ms**
Time SIMD 3 version: 168 ms
Time OpenMP version: 59 ms

Python numpy
>> best of 5 = 88.794 ms


#include <cstdlib>
#include <xmmintrin.h>   // compile with -mavx -msse4.1
#include <pmmintrin.h>
#include <immintrin.h>

#include <numeric>
#include <algorithm>
#include <chrono>
#include <iostream>
#include <cstring>

using namespace std;

float* pairwise_sub_naive (const float* input, float* output, int n) 
{
    for (int i = 0; i < n; i++) {
        for (int j = 0; j < n; j++) {
            for (int k = 0; k < n; k++)
                output[(i * n + j) * n + k] = input[i * n + k] - input[j * n + k];
          }
    }
    return output;
}

float* pairwise_sub_simd (const float* input, float* output, int n) 
{    
    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int j = 0; j < n; j++)
        {
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx * n + k, _mm256_sub_ps( A, B ));
            }
        }
    }
    
    return output;
}

float* pairwise_sub_simd_2 (const float* input, float* output, int n) 
{
    float* line_buffer = (float*) aligned_alloc(32, n * sizeof(float));

    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int j = 0; j < n; j++)
        {
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(line_buffer + k, _mm256_sub_ps( A, B ));
            }
            memcpy(output + outidx * n, line_buffer, n);
        }
    }
    
    return output;
}

float* pairwise_sub_simd_3 (const float* input, float* output, int n) 
{    
    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int k = 0; k < n; k += 8) 
        {
            __m256 A = _mm256_load_ps(input + idxi + k);
            for (int j = 0; j < n; j++)
            {
                const int idxj = j * n;
                const int outidx = (idxi + j) * n;
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx + k, _mm256_sub_ps( A, B     ));
             }
        }
    }

    return output;
}

float* pairwise_sub_openmp (const float* input, float* output, int n)
{
    int i, j;
    #pragma omp parallel for private(j)
    for (i = 0; i < n; i++) 
    {
        for (j = 0; j < n; j++)
        {
            const int idxi = i * n; 
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx * n + k, _mm256_sub_ps( A, B ));
            }
        }
    }
    /*for (i = 0; i < n; i++) 
    {
        for (j = 0; j < n; j++) 
        {
            for (int k = 0; k < n; k++)
            {
                output[(i * n + j) * n + k] = input[i * n + k] - input[j * n + k];
            }
        }
    }*/
    
    return output;
}

int main ()
{
    constexpr size_t n = 512;
    constexpr size_t input_size = n * n;
    constexpr size_t output_size = n * n * n;

    float* input = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output = (float*) aligned_alloc(32, output_size * sizeof(float));

    float* input_simd = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output_simd = (float*) aligned_alloc(32, output_size * sizeof(float));

    float* input_par = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output_par = (float*) aligned_alloc(32, output_size * sizeof(float));

    iota(input, input + input_size, float(0.0));
    fill(output, output + output_size, float(0.0));

    iota(input_simd, input_simd + input_size, float(0.0));
    fill(output_simd, output_simd + output_size, float(0.0));
    
    iota(input_par, input_par + input_size, float(0.0));
    fill(output_par, output_par + output_size, float(0.0));

    std::chrono::milliseconds best_scaler{100000};
    for (int i = 0; i < 5; ++i)
    {
        auto start = chrono::high_resolution_clock::now();
        pairwise_sub_naive(input, output, n);
        auto stop = chrono::high_resolution_clock::now();

        auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
        if (duration < best_scaler)
        {
            best_scaler = duration;
        }
    }
    cout << "Time scaler version: " << best_scaler.count() << " ms\n";

    std::chrono::milliseconds best_simd{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd)
    {
        best_simd = duration;
    }
}
cout << "Time SIMD version: " << best_simd.count() << " ms\n";

std::chrono::milliseconds best_simd_2{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd_2(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd_2)
    {
        best_simd_2 = duration;
    }
}
cout << "Time SIMD 2 version: " << best_simd_2.count() << " ms\n";

std::chrono::milliseconds best_simd_3{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd_3(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd_3)
    {
        best_simd_3 = duration;
    }
}
cout << "Time SIMD 3 version: " << best_simd_3.count() << " ms\n";

    std::chrono::milliseconds best_par{100000};
    for (int i = 0; i < 5; ++i)
    {
        auto start = chrono::high_resolution_clock::now();
        pairwise_sub_openmp(input_par, output_par, n);
        auto stop = chrono::high_resolution_clock::now();

        auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
         if (duration < best_par)
        {
            best_par = duration;
        }
    }
    cout << "Time OpenMP version: " << best_par.count() << " ms\n";

    cout << "Verification\n";
    if (equal(output, output + output_size, output_simd))
    {
        cout << "PASSED\n";
    }
    else
    {
        cout << "FAILED\n";
    }

    return 0;
}

Bearbeiten: Kleine Korrektur, da es einen falschen Aufruf im Zusammenhang mit der zweiten Version der SIMD-Implementierung gab.

Wie Sie jetzt sehen können, ist die zweite Implementierung die schnellste, da sie sich aus Sicht der Referenzlokalität des Caches am besten verhält. Die Beispiele 2 und 3 der SIMD-Implementierungen sollen Ihnen veranschaulichen, wie sich die Änderung der Speicherzugriffsmuster auf die Leistung Ihrer SIMD-Optimierungen auswirkt. Zusammenfassend (wobei ich weiß, dass ich mit meinen Ratschlägen weit davon entfernt bin, vollständig zu sein): Achten Sie auf Ihre Speicherzugriffsmuster und auf die Lade- und Speichervorgänge in/von der SIMD-Einheit. Die SIMD ist eine andere Hardwareeinheit im Prozessorkern, daher gibt es eine Strafe beim Hin- und Herschieben von Daten. Wenn Sie also ein Register aus dem Speicher laden, versuchen Sie, so viele Operationen wie möglich mit diesen Daten durchzuführen, und seien Sie nicht zu eifrig beim Speichern es zurück (in Ihrem Beispiel könnte das natürlich alles sein, was Sie mit den Daten tun müssen). Denken Sie auch daran, dass nur eine begrenzte Anzahl von SIMD-Registern verfügbar ist, und wenn Sie zu viele laden, werden sie “überlaufen”, dh sie werden hinter den Kulissen an temporären Orten im Hauptspeicher gespeichert und alle Ihre Gewinne werden zerstört. SIMD-Optimierung, das ist ein wahrer Balanceakt!

Es gibt einige Anstrengungen, einen plattformübergreifenden Intrinsic-Wrapper in den Standard aufzunehmen (ich habe in meiner glorreichen Vergangenheit selbst einen Closed-Source-Wrapper entwickelt), und selbst er ist noch lange nicht vollständig, aber es lohnt sich, einen Blick darauf zu werfen (lesen Sie die Begleitpapiere, wenn Sie ‘ wirklich daran interessiert zu erfahren, wie SIMD funktioniert).
https://github.com/VcDevel/std-simd

  • Vielen Dank für Ihre Antwort. In der Tat, wenn ich die Zuordnung aus meinem naiven Code entferne, erhalte ich die gleiche Leistung wie Sie (40 ms pro Funktionsaufruf auf meinem Laptop). Aber ich bin immer noch verwirrt: Wenn ich eine leere Funktion ausführe, die nur das Array allokiert und zurückgibt, ist es superschnell (0,003 ms pro Aufruf). Wie kann das sein?

    – jere0111

    27. Januar 2021 um 10:06 Uhr

  • Es liegt an den Optimierungen, die der Compiler vornehmen kann. Wenn Sie mehr Code um Ihre Zuweisungen haben, kann der Compiler möglicherweise nicht viel über den Kontext annehmen, als wenn Sie nur eine Zuweisung haben. Wenn du tiefer in den Kaninchenbau gehen willst, dann nimm deinen Code aus beiden Situationen, stecke ihn in den [Compiler Explorer|godbolt.org/] und schauen Sie sich den Assembler-Code sowohl mit als auch ohne Optimierungen an, um zu sehen, was der Compiler hinter den Kulissen tut – Sie könnten vom Ergebnis überrascht sein.

    – Celavek

    27. Januar 2021 um 10:11 Uhr

  • OK. Außerdem sagten Sie, dass numpy “zum Zeitpunkt der Berechnung keinen Speicher zuweist”. Was bedeutet das? Bedeutet das, dass numpy Speicher vorbelegt? Aber es kann die resultierende Größe vor der von mir durchgeführten Operation nicht im Voraus kennen, oder? Es sollte also zum Zeitpunkt der Ausführung der Operation immer noch den Speicher zuweisen, der in der endgültig gemessenen Zeit erscheinen sollte … (?)

    – jere0111

    27. Januar 2021 um 10:23 Uhr

  • “N = 512 X = np.random.rand(N,N).astype(np.float32)” Hier weist numpy (hinter den Kulissen) Speicher für Sie zu, der sich nicht am Berechnungspunkt befindet, dort befinden Sie sich Führen Sie Ihre paarweise Subtraktion tatsächlich in der Funktion “pairwise_sub_numpy” durch.

    – Celavek

    27. Januar 2021 um 14:46 Uhr


Dies ist eine Ergänzung zu der Antwort von @celakev . Ich glaube, ich habe endlich verstanden, was genau das Problem war. Das Problem war nicht über die Zuweisung des Speichers in der Hauptfunktion, die die Berechnung durchführt.

Was eigentlich Zeit brauchte, ist zu auf neuen (frischen) Speicher zugreifen. Ich glaube, dass die malloc call gibt Speicherseiten zurück, die virtuell sind, dh die nicht dem tatsächlichen physischen Speicher entsprechen – bis explizit darauf zugegriffen wird. Was tatsächlich Zeit braucht, ist der Prozess der spontanen Zuweisung von physischem Speicher (was meiner Meinung nach auf Betriebssystemebene ist), wenn im Funktionscode darauf zugegriffen wird.

Hier ist ein Beweis. Betrachten Sie die beiden folgenden trivialen Funktionen:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

float* just_alloc( size_t N ) 
{
    return (float*) aligned_alloc( 32, sizeof(float)*N );
}

void just_fill( float* _arr, size_t N ) 
{
    for (size_t i = 0; i < N; i++)
        _arr[i] = 1;
}

#define Time( code_to_benchmark, cleanup_code ) \
    do { \
        double best = 9e9; \
        for( int i = 0; i < 5; i++) { \
            struct timespec start, stop; \
            clock_gettime(CLOCK_THREAD_CPUTIME_ID, &start); \
            code_to_benchmark; \
            clock_gettime(CLOCK_THREAD_CPUTIME_ID, &stop); \
            double t = (stop.tv_sec - start.tv_sec) * 1e3 + (stop.tv_nsec - start.tv_nsec) / 1e6; \
            printf("Time[%d] = %f ms\n", i, t); \
            if (t < best) best = t; \
            cleanup_code; \
        } \
        printf("Best of 5 for '" #code_to_benchmark "' = %f ms\n\n", best); \
    } while(0)

int main() 
{
    const size_t N = 512;

    Time( float* arr = just_alloc(N*N*N), free(arr) );
    
    float* arr = just_alloc(N*N*N);
    Time( just_fill(arr, N*N*N), ; );
    free(arr);

    return 0;
}

Ich erhalte die folgenden Zeitangaben, die ich nun für jeden der Aufrufe aufführe:

Time[0] = 0.000931 ms
Time[1] = 0.000540 ms
Time[2] = 0.000523 ms
Time[3] = 0.000524 ms
Time[4] = 0.000521 ms
Best of 5 for 'float* arr = just_alloc(N*N*N)' = 0.000521 ms

Time[0] = 189.822237 ms
Time[1] = 45.041083 ms
Time[2] = 46.331428 ms
Time[3] = 44.729433 ms
Time[4] = 42.241279 ms
Best of 5 for 'just_fill(arr, N*N*N)' = 42.241279 ms

Wie Sie sehen können, ist die Zuweisung von Speicher blitzschnell, aber beim ersten Zugriff auf den Speicher ist es 5-mal langsamer als die anderen Male. Der Grund dafür, dass mein Code langsam war, war also, dass ich jedes Mal neuen Speicher neu zuordnete, der noch keine physische Adresse hatte. (Korrigieren Sie mich, wenn ich falsch liege, aber ich denke, das ist das Wesentliche!)

  • „Also, der Grund dafür, dass mein Code langsam war, war, dass ich jedes Mal neuen Speicher neu zuwies, der noch keine physische Adresse hatte. (Korrigieren Sie mich, wenn ich falsch liege, aber ich denke, das ist das Wesentliche!) Speicherzuordnung am Ende 🙂

    – Celavek

    27. Januar 2021 um 14:22 Uhr

  • Der gerade beschriebene Prozess ist Teil der Speicherzuweisung – malloc & friends rufen ein Betriebssystem-Primitives (betriebssystemspezifisch) auf, um den Speicher tatsächlich zuzuweisen, da es das Betriebssystem ist, das dies verwaltet. Das hat auch einen Preis, da Sie vom Benutzerbereich zum Kernelbereich “durchqueren”. Einige Leute verwenden benutzerdefinierte Zuweisungen, um einen Teil davon zu “vermeiden”.

    – Celavek

    27. Januar 2021 um 14:51 Uhr

Benutzeravatar von tboschi
tboschi

Ein bisschen spät zur Party, aber ich wollte noch eine hinzufügen pairwise Methode mit Eigen, das C++ eine High-Level-Algebra-Manipulationsfähigkeit verleihen und SIMD unter der Haube verwenden soll. Genau wie numpy.

Hier ist die Umsetzung

#include <iostream>
#include <vector>
#include <chrono>
#include <algorithm>
        
#include <Eigen/Dense>

auto pairwise_eigen(const Eigen::MatrixXf &input, std::vector<Eigen::MatrixXf> &output) {
        for (int k = 0; k < input.cols(); ++k)
                output[k] = input
                          // subtract matrix with repeated k-th column
                          - input.col(k) * Eigen::RowVectorXf::Ones(input.cols());

}

int main() {
        constexpr size_t n = 512;
        
        // allocate input and output 
        Eigen::MatrixXf input = Eigen::MatrixXf::Random(n, n);
        std::vector<Eigen::MatrixXf> output(n);

        std::chrono::milliseconds best_eigen{100000};
        for (int i = 0; i < 5; ++i) {
                auto start = std::chrono::high_resolution_clock::now();
                pairwise_eigen(input, output);
                auto end = std::chrono::high_resolution_clock::now();
         
                auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end-start);      
                if (duration < best_eigen)
                        best_eigen = duration;
        }

        std::cout << "Time Eigen version: " << best_eigen.count() << " ms\n";

        return 0;
}

Die vollständigen Benchmark-Tests, die von @celavek auf meinem System vorgeschlagen wurden, sind

Time scaler version: 57 ms
Time SIMD version: 58 ms
Time SIMD 2 version: 40 ms
Time SIMD 3 version: 58 ms
Time OpenMP version: 58 ms

Time Eigen version: 76 ms

Numpy >> best of 5 = 118.489 ms

Mit Eigen gibt es immer noch eine spürbare Verbesserung in Bezug auf Numpy, aber nicht so beeindruckend im Vergleich zu den “rohen” Implementierungen (es gibt sicherlich etwas Overhead). Eine zusätzliche Optimierung besteht darin, dem Ausgabevektor Kopien der Eingabe zuzuweisen und dann direkt von jedem Vektoreintrag zu subtrahieren, indem einfach die folgenden Zeilen ersetzt werden

// inside the pairwise method
        for (int k = 0; k < input.cols(); ++k)
                output[k] -= input.col(k) * Eigen::RowVectorXf::Ones(input.cols());


// at allocation time
        std::vector<Eigen::MatrixXf> output(n, input);

Dies drückt das Beste von 5 auf 60 ms.

1413830cookie-checkWie ist numpy so schnell?

This website is using cookies to improve the user-friendliness. You agree by using the website further.

Privacy policy