2015-02-28 29 views
42

Spark uygulamasında yeniyim ve Spark ile bir dosyadan CSV verilerini okumaya çalışıyorum. Ben bana dosyanın iki birinci sütunların listesini vermek için bu çağrıyı beklediğinizSpark ile CSV dosyasını yükle

sc.textFile('file.csv') 
    .map(lambda line: (line.split(',')[0], line.split(',')[1])) 
    .collect() 

ama bu hatayı alıyorum::

File "<ipython-input-60-73ea98550983>", line 1, in <lambda> 
IndexError: list index out of range 

benim CSV rağmen İşte ben ne yapıyorum var Birden fazla sütun olarak dosya.

cevap

35

tüm satırlarının en az 2 sütuna sahip olduğundan emin misiniz? Eğer (varsa) suçlu baskı olabilir, sadece kontrol etmek ?:

sc.textFile("file.csv") \ 
    .map(lambda line: line.split(",")) \ 
    .filter(lambda line: len(line)>1) \ 
    .map(lambda line: (line[0],line[1])) \ 
    .collect() 

Alternatif gibi bir şey deneyebilirsiniz: Şimdi

sc.textFile("file.csv") \ 
    .map(lambda line: line.split(",")) \ 
    .filter(lambda line: len(line)<=1) \ 
    .collect() 
+0

O kadardı, tek bir sütun içeren bir satır, teşekkürler. – Kernael

+1

Yerleşik 'csv' kütüphanesini kullanarak tüm çıkışları işlemek için ayrıştırmak daha iyidir çünkü basitçe virgülle bölme eğer değerlerin içinde virgüller varsa işe yaramaz. – sudo

+2

Csv'yi ayrıştıracak pek çok araç var, tekerleği yeniden icat etmeyin – Stephen

2

, aynı zamanda herhangi bir genel csv dosyası için bir başka seçenek var: https://github.com/seahboonsiew/pyspark-csv aşağıdaki gibi:

aşağıdaki bağlama sahip varsayalım

sc = SparkContext 
sqlCtx = SQLContext or HiveContext 

Öncelikle

import pyspark_csv as pycsv 
sc.addPyFile('pyspark_csv.py') 

Oku csv veri SparkContext aracılığıyla SparkContext

kullanarak emirleri yerine getirenlerin pyspark-csv.py dağıtmak ve DataFrame

için
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv') 
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd) 
9

Ve pandalar kullanarak CSV dosyası okuma oluşur henüz başka seçeneği dönüştürmek ve daha sonra Pandas DataFrame'i Spark'e aktarıyor. Örneğin

: csv veri alanlarının hiçbirinde yeni satır içermiyorsa olursa

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 

sc = SparkContext('local','example') # if using locally 
sql_sc = SQLContext(sc) 

pandas_df = pd.read_csv('file.csv') # assuming the file contains a header 
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header 
s_df = sql_sc.createDataFrame(pandas_df) 
+3

Pandalarda veri yükleyebiliyorsa neden bu kıvılcım yapmak istesin? – WoodChopper

+1

Akademik amaçlar – bluerubez

+0

Her kıvılcım kümesine bağımlılık yüklemek veya belirtmek istememek .... – SummerEla

2

, sen textFile() ile verilerinizi yüklemek ve ona

import csv 
import StringIO 

def loadRecord(line): 
    input = StringIO.StringIO(line) 
    reader = csv.DictReader(input, fieldnames=["name1", "name2"]) 
    return reader.next() 

input = sc.textFile(inputFile).map(loadRecord) 
87

Spark 2.0 ayrıştırma yapabilir. 0+

Yerleşik csv veri kaynağını doğrudan kullanabilirsiniz:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema 
) 

veya herhangi bir dış bağımlılıkları dahil olmadan

(spark.read 
    .schema(schema) 
    .option("header", "true") 
    .option("mode", "DROPMALFORMED") 
    .csv("some_input_file.csv")) 

.

Spark < 2.0.0:

yerine genel bir durumda önemsiz değildir olduğunu manuel ayrıştırma ait

, ben spark-csv öneriyoruz:

Kıvılcım CSV yolu dahil olduğundan emin olun (--packages, --jars, --driver-class-path)

ve aşağıdaki gibi veri yüklemek:

(df = sqlContext 
    .read.format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("inferschema", "true") 
    .option("mode", "DROPMALFORMED") 
    .load("some_input_file.csv")) 

Bu hatalı biçimlendirilmiş çizgileri bırakarak, yükleme, şema çıkarımı işleyebilir ve p gerektirmez Python'dan JVM'ye veri aktarmak.

Not:

Şemayı biliyorsanız, o şema çıkarımı önlemek ve DataFrameReader onu geçmek daha iyidir. Eğer üç sütun varsayarsak - tamsayı, çift ve dize: alanları (örn a,b,"1,2,3",c) içindedir

from pyspark.sql.types import StructType, StructField 
from pyspark.sql.types import DoubleType, IntegerType, StringType 

schema = StructType([ 
    StructField("A", IntegerType()), 
    StructField("B", DoubleType()), 
    StructField("C", StringType()) 
]) 

(sqlContext 
    .read 
    .format("com.databricks.spark.csv") 
    .schema(schema) 
    .option("header", "true") 
    .option("mode", "DROPMALFORMED") 
    .load("some_input_file.csv")) 
+3

Bunu yaparsanız, pyspark kabuğunu açtığınızda veya kıvılcım gönderimini kullandığınızda, databricks csv paketini dahil etmeyi unutmayın. Örneğin, pyspark --packages com.databricks: spark-csv_2.11: 1.4.0' (veri tabanlarını/kıvılcım sürümlerini yüklediğiniz programlara değiştirdiğinizden emin olun). –

9

Basitçe virgül ile bölme da bölünmüş olacak virgül, bu nedenle tavsiye edilmez. Eğer DataFrames API kullanmak istiyorsanız zero323's answer iyidir, ama baz Spark sadık istiyorsanız, csv modülü ile taban Python CSV'leri ayrıştırabilir:

# works for both python 2 and 3 
import csv 
rdd = sc.textFile("file.csv") 
rdd = rdd.mapPartitions(lambda x: csv.reader(x)) 

DÜZENLEME: Açıklamalarda belirtildiği @muon gibi Bu, başlığı başka herhangi bir satır gibi ele alacaktır, böylece el ile ayıklamanız gerekir. Örneğin, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (filtre değerlendirilmeden önce header'u değiştirmediğinizden emin olun). Ama bu noktada, yerleşik bir csv çözümleyici kullanarak muhtemelen daha iyisin.

+1

DataFrames'i kullanmak için Hive'e ihtiyacınız yok. Çözümünüzle ilgili olarak: a) 'StringIO'ya gerek yoktur. 'csv' herhangi bir yinelenebilir b kullanabilir' '__next__' doğrudan kullanılmamalıdır ve boş satırda başarısız olur. FlatMap'e bir göz atın c) Her satırda okuyucuyu başlatmak yerine 'mapPartitions' kullanmak çok daha verimli olurdu :) – zero323

+0

Düzeltmeler için çok teşekkürler! Cevabımı düzenlemeden önce, tamamen anladığımdan emin olmak istiyorum. 1) Neden rdd.mapPartitions (lambda x: csv.reader (x)) '' rdd.map (lambda x: csv.reader (x)) 'çalışırken bir hata veriyor? Her ikisinin de aynı şeyi atmasını bekledim: TypeError: _csv.reader nesnelerini alamıyorum. Ayrıca "mapPartitions", "csv.reader" nesnesindeki "readlines" ile eşdeğer bir şekilde "map" ile, "csv.reader" listesinden liste almak için açıkça "__next__" aramam gerekiyordu. . 2) flatMap' nerededir? Sadece 'mapPartitions' tek başına arama benim için çalıştı. –

+1

'rdd.mapPartitions (lambda x: csv.reader (x))' mapPartitions' 'Iterable' nesnesini beklediği için çalışır. Açık olmak istiyorsan, ifadeyi anlayabilir veya üretebilirdin. 'map' tek başına çalışmıyor çünkü nesne üzerinde yinelenmiyor. Bu yüzden benim flatmap (lambda x: csv.reader ([x])) 'i kullanma önerisi okuyucu üzerinde yinelenecek. Fakat 'mapPartitions' burada daha iyi. – zero323

2

Bu, Panda'ların kullanımıyla ilgili olarak JP Mercier initially suggested ile uyumludur, ancak büyük bir değişiklikle: Pandaların içine veri parçaları okursanız daha eğlendirici olması gerekir. Anlamı, Pandalardan çok daha büyük bir dosyayı ayrıştırabilmeniz, aslında tek bir parça olarak işleyebilir ve daha küçük boyutlarda Spark'e aktarabilir. (Bu aynı zamanda onlar zaten Pandalar her şeyi yükleyebilirsiniz eğer bir Spark kullanmak isteyeyim neden yorumunu yanıtlar.) Genellikle elle CSV'leri ayrıştırmak çalışıyorum hakkında gitmek istemiyorum

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 

sc = SparkContext('local','example') # if using locally 
sql_sc = SQLContext(sc) 

Spark_Full = sc.emptyRDD() 
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000) 
# if you have headers in your csv file: 
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns) 

for chunky in chunk_100k: 
    Spark_Full += sc.parallelize(chunky.values.tolist()) 

YourSparkDataFrame = Spark_Full.toDF(headers) 
# if you do not have headers, leave empty instead: 
# YourSparkDataFrame = Spark_Full.toDF() 
YourSparkDataFrame.show() 
3
from pyspark.sql import SparkSession 

spark = SparkSession \ 
    .builder \ 
    .appName("Python Spark SQL basic example") \ 
    .config("spark.some.config.option", "some-value") \ 
    .getOrCreate() 

df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|"); 

print(df.collect()) 
0

. Eğer bir dataframe olarak csv yüklemek isterseniz

import csv # Python standard CSV library 
def csv_to_rdd(csv_filename): 
    return sc.textFile(csv_filename) \ 
    .map(lambda line: tuple(list(csv.reader([line]))[0])) 
-3
import pandas as pd 

data1 = pd.read_csv("test1.csv") 
data2 = pd.read_csv("train1.csv") 
+0

Bu, Spark değil, Panda'lardır. Orijinal yazar, verilerin tek bir makine değil, dağıtılmış bir bellek Spark kümesine yüklenmesini ister. – ZakJ

-1

sonra aşağıdakileri yapabilirsiniz: Burada düzgün herhangi kaçan gibi tırnakla idare edecek bir bağımlılık içermeyen bir çözüm

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

df = sqlContext.read.format('com.databricks.spark.csv') \ 
    .options(header='true', inferschema='true') \ 
    .load('sampleFile.csv') # this is your csv file 

O benim için iyi çalıştı.

İlgili konular