2016-04-07 29 views
-1

PySpark kullanıyorum. S3'e ulaşmam, dönüştürmem ve sonra da parka s3'e ihraç etmem gereken s3 üzerinde gziped json dosyaları var. Her json dosyası yaklaşık 100k satır içerir, bu yüzden çok fazla anlam ifade etmeyecektir (fakat buna paralel olarak açılıyorum), fakat paralelleştirdiğim yaklaşık 5k dosya var. Benim yaklaşımım, json dosya listesini komut dosyasına geçiriyor -> listeye paralel koş -> run map (? Burası engellendiğim yerdir). jsonun dönüştürülmüş jsondan bir DF oluşturmasına ve onu parke olarak s3'e nasıl aktaracağım ve dönüştürebileceğim.kıvrık çalışan işlerde kıvılcım ile çalışan

+0

Bir okuyucunun yollarının birleştirilmiş bir koma listesini iletebilirsiniz. – zero323

+0

Kıvılcım kurulumunuza bağlı olarak, s3'ü şu şekilde kıvılcımdan okuyabilirsiniz: rawtext = sc.textFile ('s3: // bucket/file') ' – Paul

+0

Parke döktüğümde her json karşısında olmalı anlam 1.json => 1. parşömen, ben virgül dosya isimleri virgülle eğer bu tutarlılık kaybolacaktır. – Sar009

cevap

0

Json'u dağıtılmış bir şekilde okumak için, bahsettiğiniz gibi anahtarlarınızı paralel hale getirmeniz gerekecektir. Bunu s3'ten okurken yapmak için boto3'ü kullanmanız gerekir. Aşağıda bunun nasıl yapılacağının bir iskelet taslağıdır. Kullanım durumunuza uyacak şekilde dağıtılmış JsonRead'i değiştirmeniz gerekecektir.

import boto3 
import json 
from pyspark.sql import Row 

def distributedJsonRead(s3Key): 
    s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=key) 
    contents = json.loads(s3obj.get()['Body'].read().decode('utf-8')) 
    return Row(**contents) 

pkeys = sc.parallelize(keyList) #keyList is a list of s3 keys 
dataRdd = pkeys.map(distributedJsonRead) 

Boto3 Referans: http://boto3.readthedocs.org/en/latest/guide/quickstart.html

Düzenleme: 1 adrese:

Daha sonra çıkış dosyalarına girdi dosyalarının 1 eşleştirmesini, birleştirilmiş parke veri setini sahip daha kolay olacağını olasıdır birlikte çalışmak. json 1 eşleştirmesini için: Bu bunu yapmak için gereken yoldur Ama eğer sana bir 1 isterseniz bu işlemleri paralel hale mümkün olmayacaktır sanmıyorum bu

for k in keyList: 
    rawtext = sc.read.json(k) # or whichever method you need to use to read in the data 
    outpath = k[:-4]+'parquet' 
    rawtext.write.parquet(outpath) 

gibi bir şey deneyebilirsiniz parke dosyaları. Spark'in okuma/yazma işlevi, sürücü tarafından çağrılacak şekilde tasarlanmıştır ve sc ve sqlContext'e erişmeye ihtiyaç duyar. Bu, 1 parke dizinine sahip olmanın yolunun muhtemelen başka bir nedenidir.

+0

ama aynı zamanda parkenin hangi dosyadan üretildiğini bilmek istiyorum, yani jsonun s3: // bucket/json/12/3/abc.json'da olduğunu varsayalım, sonuç s3: // bucket/json/12/3/abc.parquet' – Sar009

+0

@ Sar009 Bu – David

+0

adresine yanıtımı düzenledim, günde yaklaşık 20 GB veri üretildi. Ve genellikle bu konuda bir ya da iki veriyi analiz etmek zorundayım. Yani temelde, dosya okumasını paralelleştiremediğimi söylüyorsun. – Sar009