2014-11-18 14 views

cevap

31

olarak bildiğim kadarıyla mevkiinde düzey verilerine erişim tipi yapılmıştır ettiğini gösterir. Bir düğüm tüm çalışmalarını bitirdiğinde ve CPU boşta kaldığında, Spark diğer yerlerden veri almayı gerektiren diğer bekleyen görevi başlatmaya karar verebilir. İdeal olarak, tüm görevleriniz daha düşük veri erişim gecikmesi ile ilişkili olduğu için süreç yerel olmalıdır. , Parametreler hakkında

spark.locality.wait 

fazla bilgi farklı seviyelerde ilgili olarak Spark Configuration docs

bulunabilir PROCESS_LOCAL, NODE_LOCAL:

kullanarak diğer yerellik seviyelere geçmeden önce bekleme süresini yapılandırabilirsiniz RACK_LOCAL veya HERHANGİ ben yılında yöntemler findTask ve findSpeculativeTask org.apache.spark.scheduler.TaskSetManager ho göstermek düşünüyorum w Spark, görev seviyelerine göre görevleri seçer. İlk önce aynı yürütme sürecinde başlatılacak PROCESS_LOCAL görevlerini kontrol edecektir. Değilse, aynı düğümdeki diğer yürütücülerde olabilecek NODE_LOCAL görevlerini veya HDFS, önbelleğe alınmış vb. Sistemlerden alınması gereken görevleri denetler. RACK_LOCAL, verilerin başka bir düğümde olduğunu ve bu nedenle önceden aktarılması gerektiğini belirtir. yürütme. Son olarak, ANY sadece geçerli düğümde çalışabilecek bekleyen herhangi bir görevi üstlenecektir.

/** 
    * Dequeue a pending task for a given node and return its index and locality level. 
    * Only search for tasks matching the given locality constraint. 
    */ 
    private def findTask(execId: String, host: String, locality: TaskLocality.Value) 
    : Option[(Int, TaskLocality.Value)] = 
    { 
    for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) { 
     return Some((index, TaskLocality.PROCESS_LOCAL)) 
    } 

    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { 
     for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) { 
     return Some((index, TaskLocality.NODE_LOCAL)) 
     } 
    } 

    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { 
     for { 
     rack <- sched.getRackForHost(host) 
     index <- findTaskFromList(execId, getPendingTasksForRack(rack)) 
     } { 
     return Some((index, TaskLocality.RACK_LOCAL)) 
     } 
    } 

    // Look for no-pref tasks after rack-local tasks since they can run anywhere. 
    for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) { 
     return Some((index, TaskLocality.PROCESS_LOCAL)) 
    } 

    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { 
     for (index <- findTaskFromList(execId, allPendingTasks)) { 
     return Some((index, TaskLocality.ANY)) 
     } 
    } 

    // Finally, if all else has failed, find a speculative task 
    findSpeculativeTask(execId, host, locality) 
    } 
+0

"Bekleyen görevler" ile ne demek istediğinizi açıklayabilir misiniz? Bir işçi düğümünün tek işinin görev zamanlayıcısı tarafından sağlanan görevleri yürütmek olduğunu düşünürdüm. Bu görevleri yerine getirdikten sonra (belki de kıvılcım uygulaması çalıştırıldığında), o zaman boşta kalır. Bekleyen görevler o zaman nedir? – user3376961

+0

@ user3376961 Aşağıdaki sorunun bir görevin kıvılcımlandığını açıklayabileceğini düşünüyorum. Ayrıca, bir miktar elastikiyetle çalışabileceğiniz ve aynı zamanda bire bir ilişkinin olmamasının önemini de göz önünde bulundurarak aklınızda bulundurun. http://stackoverflow.com/q/25276409/91042 –