
I want to scale my data with StandardScaler (from pyspark.mllib.feature import StandardScaler). Right now I can do this by passing the values of the RDD to the transform function, but the problem is that I want to preserve the key. Is there a way to scale my data while keeping its key? Is it possible to scale the data by group with Spark?

Sample dataset

0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal. 
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal. 
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,smurf. 

Imports

import sys 
import os 
from collections import OrderedDict 
from numpy import array 
from math import sqrt 
try: 
    from pyspark import SparkContext, SparkConf 
    from pyspark.mllib.clustering import KMeans 
    from pyspark.mllib.feature import StandardScaler 
    from pyspark.statcounter import StatCounter 

    print ("Successfully imported Spark Modules") 
except ImportError as e: 
    print ("Can not import Spark Modules", e) 
    sys.exit(1) 

Code portion

sc = SparkContext(conf=conf)
raw_data = sc.textFile(data_file)
parsed_data = raw_data.map(Parseline)

The Parseline function:

def Parseline(line):
    line_split = line.split(",")
    # keep the first field plus the numeric fields, skipping the
    # categorical columns (1-3) and the trailing label
    clean_line_split = [line_split[0]] + line_split[4:-1]
    # return (label, feature vector)
    return (line_split[-1], array([float(x) for x in clean_line_split]))
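
For context, a rough sketch of the value-only approach described in the question, where the key gets lost (the variable names below are illustrative, not from the original post):

# Assumption: this is roughly what "passing the RDD values to transform" looks like.
features = parsed_data.values()                       # drop the label keys
scaler = StandardScaler(withMean=True, withStd=True).fit(features)
scaled_features = scaler.transform(features)          # scaled, but no longer paired with labels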

Answer

Not exactly a pretty solution, but you can adapt my answer to the similar Scala question. Let's start with some sample data:

import numpy as np 

np.random.seed(323) 

keys = ["foo"] * 50 + ["bar"] * 50 
values = (
    np.vstack([np.repeat(-10, 500), np.repeat(10, 500)]).reshape(100, -1) + 
    np.random.rand(100, 10) 
) 

rdd = sc.parallelize(zip(keys, values)) 

Unfortunately MultivariateStatisticalSummary is just a wrapper around a JVM model and is not really Python friendly. Fortunately, with NumPy arrays we can use the standard StatCounter to compute the statistics by key:

from pyspark.statcounter import StatCounter

def compute_stats(rdd):
    return rdd.aggregateByKey(
        StatCounter(), StatCounter.merge, StatCounter.mergeStats
    ).collectAsMap()
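
As a quick usage note (my addition, not part of the original answer): collectAsMap returns a plain Python dict mapping each key to a StatCounter, and because the aggregated values are NumPy arrays, mean() and stdev() come back as per-feature arrays:

# Usage sketch: inspect the per-key statistics on the driver.
stats = compute_stats(rdd)
print(stats["foo"].mean())   # per-feature means for key "foo"
print(stats["foo"].stdev())  # per-feature standard deviations for key "foo"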

Finally, we can map to normalize:

def scale(rdd, stats):
    def scale_(kv):
        k, v = kv
        return (v - stats[k].mean()) / stats[k].stdev()
    return rdd.map(scale_)

scaled = scale(rdd, compute_stats(rdd)) 
scaled.first() 

## array([ 1.59879188, -1.66816084, 1.38546532, 1.76122047, 1.48132643, 
## 0.01512487, 1.49336769, 0.47765982, -1.04271866, 1.55288814]) 
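
Since the question is specifically about keeping the key, here is a small variant (my addition, not part of the answer above) that returns (key, scaled vector) pairs instead of bare vectors:

# Variant of scale() that preserves the key: emit (key, scaled feature vector) pairs.
def scale_keep_key(rdd, stats):
    def scale_(kv):
        k, v = kv
        return (k, (v - stats[k].mean()) / stats[k].stdev())
    return rdd.map(scale_)

scaled_with_keys = scale_keep_key(rdd, compute_stats(rdd))
scaled_with_keys.first()  # e.g. ('foo', array([...]))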

When I try to use this code it gives me this error: "TypeError: unbound method merge() must be called with StatCounter instance as first argument (got NoneType instance instead)" — any idea? – Iman


Do you have missing values in your data? What are the types? – zero323


The data structure is something like this: [Label, array([list of numeric float values])]; each label is either normal or attack, and there are no missing values – Iman
