2015-10-28 28 views
6

Çok berbat bir HTTPD access_log var ve sadece "berbat" satırlarını atlamak istiyorum. piton içinpyspark scala.util.Try ile eşdeğer nedir?

import scala.util.Try 

val log = sc.textFile("access_log") 

log.map(_.split(' ')).map(a => Try(a(8))).filter(_.isSuccess).map(_.get).map(code => (code,1)).reduceByKey(_ + _).collect() 

açıkça "lambda" gösterimi kullanılarak aksine bir işlev tanımlayarak aşağıdaki çözüm var ben:

log = sc.textFile("access_log") 

def wrapException(a): 
    try: 
     return a[8] 
    except: 
     return 'error' 

log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect() 

orada mı

scala bu pürüzsüz olup pyspark'ta bunu yapmak daha iyi bir yol (örneğin Scala'da olduğu gibi)?

Çok teşekkürler!

cevap

2

İlk olarak, çalışmaya başlamak için biraz rasgele veri oluşturmama izin verin.

import random 
number_of_rows = int(1e6) 
line_error = "error line" 
text = [] 
for i in range(number_of_rows): 
    choice = random.choice([1,2,3,4]) 
    if choice == 1: 
     line = line_error 
    elif choice == 2: 
     line = "1 2 3 4 5 6 7 8 9_1" 
    elif choice == 3: 
     line = "1 2 3 4 5 6 7 8 9_2" 
    elif choice == 4: 
     line = "1 2 3 4 5 6 7 8 9_3" 
    text.append(line) 

Şimdi bir dize text görünüyor var

1 2 3 4 5 6 7 8 9_2 
    error line 
    1 2 3 4 5 6 7 8 9_3 
    1 2 3 4 5 6 7 8 9_2 
    1 2 3 4 5 6 7 8 9_3 
    1 2 3 4 5 6 7 8 9_1 
    error line 
    1 2 3 4 5 6 7 8 9_2 
    .... 

gibi Çözümün:

from operator import add 
def myfunction(l): 
    try: 
     return (l.split(' ')[8],1) 
    except: 
     return ('MYERROR', 1) 
log.map(myfunction).reduceByKey(add).collect() 
#[('9_3', 250885), ('9_1', 249307), ('MYERROR', 250036), ('9_2', 249772)] 
: Burada
def wrapException(a): 
    try: 
     return a[8] 
    except: 
     return 'error' 

log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect() 

#[('9_3', 250885), ('9_1', 249307), ('9_2', 249772)] 

benim çözümdür

Yorum:

(1) Çok fazla ek yük eklemeyeceğinden ve hata kontrolü için de kullanılabileceğinden "hata" ile satırları da hesaplamanızı tavsiye ederim, örneğin, tüm sayımlar eklenmelidir günlüğündeki toplam satır sayısına, eğer bu satırları filtrelerseniz, bunların gerçekten bozuk satırlar olduğunu veya kodlama mantığınızda yanlış giden bir şey olduğunu bilmiyorsunuz.

(2) map, filter işlevlerinin zincirlenmesini önlemek için tüm satır düzeyi işlemlerini tek bir işlevde paketlemeye çalışacağım, bu nedenle daha okunabilir.

(3) Performans açısından baktığımda, 1M kayıtlarından oluşan bir örnek oluşturdum ve kodum 3 saniyede bitti ve 2 saniyede sizin verileriniz çok küçük olduğu için ve bu da benim kümem oldukça rahatsız edici olduğundan Daha büyük bir dosya (1e12?) Oluşturmanızı ve sizin için bir referans noktası oluşturmanızı öneririm.

6

Daha iyi öznel bir terimdir, ancak deneyebileceğiniz birkaç yaklaşım vardır.

  • Bu özel durumda yapabileceğiniz en basit şey, istisnalardan kaçınmaktır. Tek ihtiyacınız olan bir flatMap ve bazı dilimleme geçerli: Gördüğünüz gibi

    log.flatMap(lambda s : s.split(' ')[8:9]) 
    

    bir istisna işleme veya sonraki filter gerek demektir.

  • Önceki fikir, basit bir sarmalayıcı

    def seq_try(f, *args, **kwargs): 
        try: 
         return [f(*args, **kwargs)] 
        except: 
         return [] 
    

    ve örnek kullanım

    from operator import div # FYI operator provides getitem as well. 
    
    rdd = sc.parallelize([1, 2, 0, 3, 0, 5, "foo"]) 
    
    rdd.flatMap(lambda x: seq_try(div, 1., x)).collect() 
    ## [1.0, 0.5, 0.3333333333333333, 0.2] 
    
  • nihayet daha OO yaklaşımı ile uzatılabilir:

    import inspect as _inspect 
    
    class _Try(object): pass  
    
    class Failure(_Try): 
        def __init__(self, e): 
         if Exception not in _inspect.getmro(e.__class__): 
          msg = "Invalid type for Failure: {0}" 
          raise TypeError(msg.format(e.__class__)) 
         self._e = e 
         self.isSuccess = False 
         self.isFailure = True 
    
        def get(self): raise self._e 
    
        def __repr__(self): 
         return "Failure({0})".format(repr(self._e)) 
    
    class Success(_Try): 
        def __init__(self, v): 
         self._v = v 
         self.isSuccess = True 
         self.isFailure = False 
    
        def get(self): return self._v 
    
        def __repr__(self): 
         return "Success({0})".format(repr(self._v)) 
    
    def Try(f, *args, **kwargs): 
        try: 
         return Success(f(*args, **kwargs)) 
        except Exception as e: 
         return Failure(e) 
    

    ve örnek kullanımı:

    tries = rdd.map(lambda x: Try(div, 1.0, x)) 
    tries.collect() 
    ## [Success(1.0), 
    ## Success(0.5), 
    ## Failure(ZeroDivisionError('float division by zero',)), 
    ## Success(0.3333333333333333), 
    ## Failure(ZeroDivisionError('float division by zero',)), 
    ## Success(0.2), 
    ## Failure(TypeError("unsupported operand type(s) for /: 'float' and 'str'",))] 
    
    tries.filter(lambda x: x.isSuccess).map(lambda x: x.get()).collect() 
    ## [1.0, 0.5, 0.3333333333333333, 0.2] 
    

    Hatta bu yaklaşım gibi ben GitHub ve pypi için biraz daha eksiksiz uygulanmasını itti ettiyseniz multipledispatch

    from multipledispatch import dispatch 
    from operator import getitem 
    
    @dispatch(Success) 
    def check(x): return "Another great success" 
    
    @dispatch(Failure) 
    def check(x): return "What a failure" 
    
    a_list = [1, 2, 3] 
    
    check(Try(getitem, a_list, 1)) 
    ## 'Another great success' 
    
    check(Try(getitem, a_list, 10)) 
    ## 'What a failure' 
    

    ile eşleşen deseni kullanabilirsiniz.