İlk olarak, çalışmaya başlamak için biraz rasgele veri oluşturmama izin verin.
import random
number_of_rows = int(1e6)
line_error = "error line"
text = []
for i in range(number_of_rows):
choice = random.choice([1,2,3,4])
if choice == 1:
line = line_error
elif choice == 2:
line = "1 2 3 4 5 6 7 8 9_1"
elif choice == 3:
line = "1 2 3 4 5 6 7 8 9_2"
elif choice == 4:
line = "1 2 3 4 5 6 7 8 9_3"
text.append(line)
Şimdi bir dize text
görünüyor var
1 2 3 4 5 6 7 8 9_2
error line
1 2 3 4 5 6 7 8 9_3
1 2 3 4 5 6 7 8 9_2
1 2 3 4 5 6 7 8 9_3
1 2 3 4 5 6 7 8 9_1
error line
1 2 3 4 5 6 7 8 9_2
....
gibi Çözümün:
from operator import add
def myfunction(l):
try:
return (l.split(' ')[8],1)
except:
return ('MYERROR', 1)
log.map(myfunction).reduceByKey(add).collect()
#[('9_3', 250885), ('9_1', 249307), ('MYERROR', 250036), ('9_2', 249772)]
: Burada
def wrapException(a):
try:
return a[8]
except:
return 'error'
log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect()
#[('9_3', 250885), ('9_1', 249307), ('9_2', 249772)]
benim çözümdür
Yorum:
(1) Çok fazla ek yük eklemeyeceğinden ve hata kontrolü için de kullanılabileceğinden "hata" ile satırları da hesaplamanızı tavsiye ederim, örneğin, tüm sayımlar eklenmelidir günlüğündeki toplam satır sayısına, eğer bu satırları filtrelerseniz, bunların gerçekten bozuk satırlar olduğunu veya kodlama mantığınızda yanlış giden bir şey olduğunu bilmiyorsunuz.
(2) map
, filter
işlevlerinin zincirlenmesini önlemek için tüm satır düzeyi işlemlerini tek bir işlevde paketlemeye çalışacağım, bu nedenle daha okunabilir.
(3) Performans açısından baktığımda, 1M kayıtlarından oluşan bir örnek oluşturdum ve kodum 3 saniyede bitti ve 2 saniyede sizin verileriniz çok küçük olduğu için ve bu da benim kümem oldukça rahatsız edici olduğundan Daha büyük bir dosya (1e12?) Oluşturmanızı ve sizin için bir referans noktası oluşturmanızı öneririm.