Sen numpy ve RDDs biraz ile deneyebilirsiniz içinde. ithalatın Önce bir demet:
keys = ["key1", "key2", "key3"] # list of key column names
xs = ["x1", "x2", "x3"] # list of column names to compare
y = "y" # name of the reference column
Ve bazı yardımcıları:
def as_pair(keys, y, xs):
""" Given key names, y name, and xs names
return a tuple of key, array-of-values"""
key = itemgetter(*keys)
value = itemgetter(y, * xs) # Python 3 syntax
def as_pair_(row):
return key(row), np.array(value(row))
return as_pair_
def init(x):
""" Init function for combineByKey
Initialize new StatCounter and merge first value"""
return StatCounter().merge(x)
def center(means):
"""Center a row value given a
dictionary of mean arrays
"""
def center_(row):
key, value = row
return key, value - means[key]
return center_
def prod(arr):
return arr[0] * arr[1:]
def corr(stddev_prods):
"""Scale the row to get 1 stddev
given a dictionary of stddevs
"""
def corr_(row):
key, value = row
return key, value/stddev_prods[key]
return corr_
ve çiftlerinin RDD
için DataFrame
dönüştürmek:
pairs = df.rdd.map(as_pair(keys, y, xs))
from operator import itemgetter
import numpy as np
from pyspark.statcounter import StatCounter
en birkaç değişken tanımlayalım
Sonraki grubunun başına istatistiklerini hesaplamak let:
stats = (pairs
.combineByKey(init, StatCounter.merge, StatCounter.mergeStats)
.collectAsMap())
means = {k: v.mean() for k, v in stats.items()}
Not: 5000 özellikleri ve 7000 grup ile orada olmalıdır bellekte bu yapıyı tutulması ile hiçbir sorunu. Daha büyük veri kümeleriyle RDD ve join
'u kullanmanız gerekebilir, ancak bu daha yavaş olacaktır.
Merkezi veriler:
centered = pairs.map(center(means))
hesaplayın kovaryans:
covariance = (centered
.mapValues(prod)
.combineByKey(init, StatCounter.merge, StatCounter.mergeStats)
.mapValues(StatCounter.mean))
Ve nihayet korelasyon:
stddev_prods = {k: prod(v.stdev()) for k, v in stats.items()}
correlations = covariance.map(corr(stddev_prods))
Örnek veriler:
df = sc.parallelize([
("a", "b", "c", 0.5, 0.5, 0.3, 1.0),
("a", "b", "c", 0.8, 0.8, 0.9, -2.0),
("a", "b", "c", 1.5, 1.5, 2.9, 3.6),
("d", "e", "f", -3.0, 4.0, 5.0, -10.0),
("d", "e", "f", 15.0, -1.0, -5.0, 10.0),
]).toDF(["key1", "key2", "key3", "y", "x1", "x2", "x3"])
katılan biraz oldukça elastik olduğu ve kolay bir şekilde idare etmek için ayarlanabilir süre,
correlations.collect()
[(('a', 'b', 'c'), array([ 1. , 0.99723002, 0.65133607])),
(('d', 'e', 'f'), array([-1., -1., 1.]))]
Bu çözelti: DataFrame
ile
Sonuçlar:
+----+----+----+-----------+------------------+------------------+
|key1|key2|key3|corr(y, x1)| corr(y, x2)| corr(y, x3)|
+----+----+----+-----------+------------------+------------------+
| d| e| f| -1.0| -1.0| 1.0|
| a| b| c| 1.0|0.9972300220940342|0.6513360726920862|
+----+----+----+-----------+------------------+------------------+
df.groupBy(*keys).agg(*[corr(y, x) for x in xs]).show()
ve yöntem, yukarıda verilen farklı veri dağılımları. JIT ile daha fazla destek verilmesi de mümkün olmalıdır.
Gruplar 100 satırdan 5-10 milyona kadar değişebilir, grup sayısı 7000 olacaktır. – Harish