2015-08-19 27 views
6

Bir RDD'deki öğelerin sırasını rastgele sıralamaya çalışıyorum. Şu anki yaklaşımım, öğelerin karıştırılmış tam sayıların bir RDD'si ile sıkıştırılması ve daha sonra bu tam sayılarla birleştirilmesidir. Bununla birlikte, pyspark sadece 100000000 tam sayı ile düşmektedir. Aşağıdaki kodu kullanıyorum.Pyspark: shuffle RDD

Soruma şudur: rastgele dizinle sıkıştırmak veya başka bir şekilde karıştırmak için daha iyi bir yol var mı?

Rasgele bir anahtarla sıralama yapmayı denedim, ancak çalışıyor.

def random_indices(n): 
    """ 
    return an iterable of random indices in range(0,n) 
    """ 
    indices = range(n) 
    random.shuffle(indices) 
    return indices 

şu pyspark olur:

Using Python version 2.7.3 (default, Jun 22 2015 19:33:41) 
SparkContext available as sc. 
>>> import clean 
>>> clean.sc = sc 
>>> clean.random_indices(100000000) 
Killed 

cevap

5

Olası bir yaklaşım mapParitions

import os 
import numpy as np 

swap = lambda x: (x[1], x[0]) 

def add_random_key(it): 
    # make sure we get a proper random seed 
    seed = int(os.urandom(4).encode('hex'), 16) 
    # create separate generator 
    rs = np.random.RandomState(seed) 
    # Could be randint if you prefer integers 
    return ((rs.rand(), swap(x)) for x in it) 

rdd_with_keys = (rdd 
    # It will be used as final key. If you don't accept gaps 
    # use zipWithIndex but this should be cheaper 
    .zipWithUniqueId() 
    .mapPartitions(add_random_key, preservesPartitioning=True)) 

Sonraki bölümlerini yeniden yapabilirsiniz kullanarak sıralama her bölüm ve özü değerlerini rastgele tuşlara eklemektir:

n = rdd.getNumPartitions() 
(rdd_with_keys 
    # partition by random key to put data on random partition 
    .partitionBy(n) 
    # Sort partition by random value to ensure random order on partition 
    .mapPartitions(sorted, preservesPartitioning=True) 
    # Extract (unique_id, value) pairs 
    .values()) 

Bölme başına sıralama hala yavaşsa, Fisher – Yates shuffle ile değiştirilebilir. sadece rastgele veriler gerekiyorsa

o zaman mllib.RandomRDDs

from pyspark.mllib.random import RandomRDDs 

RandomRDDs.uniformRDD(sc, n) 
Teorik

o girdi rdd ile sıkıştırılmış olabilir kullanabilirsiniz ama bölüm başına elemanların sayısını eşleşen gerektirecektir.

+0

Teşekkürler, bu yararlıdır. Anahtarların gerçekten benzersiz olmasına ihtiyacım var. – Marcin

+0

Başka gereksiniminiz var mı? Eğer değilse, daha sonra zipWithIndex 'zipWithUniqueId' yazabilirsiniz. Başka bir dönüşüm ekliyor, ancak son derece pahalı değil. – zero323

+0

Hem rastgele hem de benzersiz olması için anahtarlara ihtiyacım var. Rastgele bir anahtar ile sıralayabilirim, ancak bu oldukça yavaş olduğunu kanıtlıyor. – Marcin