2015-03-05 13 views
5

Ben cython.parallel ile iş parçacığı yerel ndarrays başlatmak için mücadele ediyorum:cython.parallel: iş parçacığı yerel ndarray arabelleği nasıl başlatılır?

Sözde kod:

cdef: 
    ndarray buffer 

with nogil, parallel(): 
    buffer = np.empty(...) 

    for i in prange(n): 
     with gil: 
      print "Thread %d: data address: 0x%x" % (threadid(), <uintptr_t>buffer.data) 

     some_func(buffer.data) # use thread-local buffer 

cdef void some_func(char * buffer_ptr) nogil: 
    (... works on buffer contents...) 

Benim sorun olduğunu tüm evrelerde aynı adrese buffer.data puan. Son olarak buffer atanan iş parçacığının adresi. buffer rağmen

parallel() (ya da alternatif olarak prange) blok içinde tahsis edilir, Cython bir private veya iplik yerel değişken buffer olmak ancak shared değişken olarak tutar. Sonuç olarak, buffer.databuffer.data, aynı bellek bölgesine algoritmamda hasara yol açıyor.

Bu, yalnızca ndarray nesnelerle ilgili bir sorun değildir, ancak görünen tüm cdef class nesnelerle tanımlanır.

Bu sorunu nasıl çözebilirim?

+1

Eğer gil olmadan 'np.empty' arayabilir miyim? –

+1

belki de [bu yanıt] (http://stackoverflow.com/a/20520295/832621) istediğini getiriyor ... –

+1

@BiRico Bu retorik bir soru mu :)? Hayır, 'nogil' bloğu içinde bir numpy dizisini (veya bir bellek görüntülemesini) tam olarak başlatamazsınız (aksi halde dizi Python'un yönetilen hafızasına tahsis edilmez ve çöp toplanamaz vs.) –

cevap

2

Sonunda sevdiğim bu soruna çözüm bulduğumu düşünüyorum.

(number_of_threads, ...<whatever shape you need in the thread>...) Sonra openmp.omp_get_thread_num arayıp bir "iş parçacığı yerel" alt dizisini almak için endekse dizi kullanan: kısa sürümü şekle sahip bir dizi oluşturmaktır. Bu, her döngü endeksi için ayrı bir diziye sahip olmayı önler (ki bu çok büyük olabilir) ancak aynı zamanda iş parçacıklarının birbirinin üzerine yazılmasını da engeller.

İşte ne yaptım kaba bir versiyonu:

import numpy as np 
import multiprocessing 

from cython.parallel cimport parallel 
from cython.parallel import prange 
cimport openmp 

cdef extern from "stdlib.h": 
    void free(void* ptr) 
    void* malloc(size_t size) 
    void* realloc(void* ptr, size_t size) 

... 

cdef int num_items = ... 
num_threads = multiprocessing.cpu_count() 
result_array = np.zeros((num_threads, num_items), dtype=DTYPE) # Make sure each thread uses separate memory 
cdef c_numpy.ndarray result_cn 
cdef CDTYPE ** result_pointer_arr 
result_pointer_arr = <CDTYPE **> malloc(num_threads * sizeof(CDTYPE *)) 
for i in range(num_threads): 
    result_cn = result_array[i] 
    result_pointer_arr[i] = <CDTYPE*> result_cn.data 

cdef int thread_number 
for i in prange(num_items, nogil=True, chunksize=1, num_threads=num_threads, schedule='static'): 
    thread_number = openmp.omp_get_thread_num() 
    some_function(result_pointer_arr[thread_number]) 
İlgili konular