2017-02-15 25 views
5

100 milyon satır ve 5000+ sütun içeren bir DF sahibiyim. Colx ve kalan 5000+ sütun arasındaki düzeltmeyi bulmaya çalışıyorum.pyspark düzeltmesi (5K sütuntan fazla)

aggList1 = [mean(col).alias(col + '_m') for col in df.columns] #exclude keys 
df21= df.groupBy('key1', 'key2', 'key3', 'key4').agg(*aggList1) 
df = df.join(broadcast(df21),['key1', 'key2', 'key3', 'key4'])) 
df= df.select([func.round((func.col(colmd) - func.col(colmd + '_m')), 8).alias(colmd)\ 
        for colmd in all5Kcolumns]) 


aggCols= [corr(colx, col).alias(col) for col in colsall5K] 
df2 = df.groupBy('key1', 'key2', 'key3').agg(*aggCols) 

Şu anda bu kıvılcım 64KB kod hatası sorunu nedeniyle çalışmıyor (hatta kıvılcım 2.2). Bu yüzden her 300 sütun için döngü yapıyorum ve sonunda bir araya geliyorum. Fakat 40 düğümden oluşan bir kümede 30 saatten fazla sürüyor (her biri 10 çekirdek ve her bir düğüm 100GB). Bunu ayarlamak için herhangi bir yardım? şeylerin Aşağıda

zaten çalıştı - bölüm DF Re 10,000 için - Checkpoint her döngüde yılında - önbellek her döngü

+1

Gruplar 100 satırdan 5-10 milyona kadar değişebilir, grup sayısı 7000 olacaktır. – Harish

cevap

1

Sen numpy ve RDDs biraz ile deneyebilirsiniz içinde. ithalatın Önce bir demet:

keys = ["key1", "key2", "key3"] # list of key column names 
xs = ["x1", "x2", "x3"] # list of column names to compare 
y = "y"       # name of the reference column 

Ve bazı yardımcıları:

def as_pair(keys, y, xs): 
    """ Given key names, y name, and xs names 
    return a tuple of key, array-of-values""" 
    key = itemgetter(*keys) 
    value = itemgetter(y, * xs) # Python 3 syntax 

    def as_pair_(row): 
     return key(row), np.array(value(row)) 
    return as_pair_ 

def init(x): 
    """ Init function for combineByKey 
    Initialize new StatCounter and merge first value""" 
    return StatCounter().merge(x) 

def center(means): 
    """Center a row value given a 
    dictionary of mean arrays 
    """ 
    def center_(row): 
     key, value = row 
     return key, value - means[key] 
    return center_ 

def prod(arr): 
    return arr[0] * arr[1:] 

def corr(stddev_prods): 
    """Scale the row to get 1 stddev 
    given a dictionary of stddevs 
    """ 
    def corr_(row): 
     key, value = row 
     return key, value/stddev_prods[key] 
    return corr_ 

ve çiftlerinin RDD için DataFrame dönüştürmek:

pairs = df.rdd.map(as_pair(keys, y, xs)) 

from operator import itemgetter 
import numpy as np 
from pyspark.statcounter import StatCounter 

en birkaç değişken tanımlayalım

Sonraki grubunun başına istatistiklerini hesaplamak let:

stats = (pairs 
    .combineByKey(init, StatCounter.merge, StatCounter.mergeStats) 
    .collectAsMap()) 

means = {k: v.mean() for k, v in stats.items()} 

Not: 5000 özellikleri ve 7000 grup ile orada olmalıdır bellekte bu yapıyı tutulması ile hiçbir sorunu. Daha büyük veri kümeleriyle RDD ve join'u kullanmanız gerekebilir, ancak bu daha yavaş olacaktır.

Merkezi veriler:

centered = pairs.map(center(means)) 

hesaplayın kovaryans:

covariance = (centered 
    .mapValues(prod) 
    .combineByKey(init, StatCounter.merge, StatCounter.mergeStats) 
    .mapValues(StatCounter.mean)) 

Ve nihayet korelasyon:

stddev_prods = {k: prod(v.stdev()) for k, v in stats.items()} 

correlations = covariance.map(corr(stddev_prods)) 

Örnek veriler:

df = sc.parallelize([ 
    ("a", "b", "c", 0.5, 0.5, 0.3, 1.0), 
    ("a", "b", "c", 0.8, 0.8, 0.9, -2.0), 
    ("a", "b", "c", 1.5, 1.5, 2.9, 3.6), 
    ("d", "e", "f", -3.0, 4.0, 5.0, -10.0), 
    ("d", "e", "f", 15.0, -1.0, -5.0, 10.0), 
]).toDF(["key1", "key2", "key3", "y", "x1", "x2", "x3"]) 
katılan biraz oldukça elastik olduğu ve kolay bir şekilde idare etmek için ayarlanabilir süre,

correlations.collect() 
[(('a', 'b', 'c'), array([ 1.  , 0.99723002, 0.65133607])), 
(('d', 'e', 'f'), array([-1., -1., 1.]))] 

Bu çözelti: DataFrame ile

Sonuçlar:

+----+----+----+-----------+------------------+------------------+ 
|key1|key2|key3|corr(y, x1)|  corr(y, x2)|  corr(y, x3)| 
+----+----+----+-----------+------------------+------------------+ 
| d| e| f|  -1.0|    -1.0|    1.0| 
| a| b| c|  1.0|0.9972300220940342|0.6513360726920862| 
+----+----+----+-----------+------------------+------------------+ 

df.groupBy(*keys).agg(*[corr(y, x) for x in xs]).show() 
ve yöntem, yukarıda verilen farklı veri dağılımları. JIT ile daha fazla destek verilmesi de mümkün olmalıdır.