2011-05-18 19 views
5

httpsb2 ve lxml kullanarak python'da bir web kazıyıcı yazıyorum (evet - scrapy kullanabileceğimi biliyorum. Hadi bunu geçmişe taşıyalım ...) Kazıyıcıyı ayrıştırmak için yaklaşık 15000 sayfa var yaklaşık 400.000 ürün. Anında çalışacak öğeleri hemen hemen ayrıştırmak için kodum var (neredeyse) ama sayfayı sunucudan yükleyen bölüm hala çok yavaş. Bunu eşzamanlı olarak aşmak istiyorum. Ancak, HER zaman ayrıştırılması gereken HER sayfa güvenemem. Tek bir ThreadPool ile çalıştım (multiprocessing.pool gibi, ama iş parçacıklarıyla yapıldı - bu bir G/Ç bağlı işlem olduğundan iyi olmalıydı), ama almanın zarif (ya da çalışan) bir yolunu düşünemedim. Son dizin öğesinin tarihi işlem yaptığımız öğeden daha büyük olduğunda durdurulan tüm iş parçacıkları. Şu anda, ThreadPool'un iki örneğini kullanan bir yöntem üzerinde çalışıyorum - biri her sayfayı indirmek için, diğeri de sayfaları ayrıştırmak için. Basitleştirilmiş bir kod örnektir:Python - çoklu eşzamanlı threadpooller

#! /usr/bin/env python2 

import httplib2 
from Queue import PriorityQueue 
from multiprocessing.pool import ThreadPool 
from lxml.html import fromstring 

pages = [x for x in range(1000)] 
page_queue = PriorityQueue(1000) 

url = "http://www.google.com" 

def get_page(page): 
    #Grabs google.com 
    h = httplib2.Http(".cache") 
    resp, content = h.request(url, "GET") 
    tree = fromstring(str(content), base_url=url) 
    page_queue.put((page, tree)) 
    print page_queue.qsize() 

def parse_page(): 
    page_num, page = page_queue.get() 
    print "Parsing page #" + str(page_num) 
    #do more stuff with the page here 
    page_queue.task_done() 

if __name__ == "__main__": 
    collect_pool = ThreadPool() 
    collect_pool.map_async(get_page, pages) 
    collect_pool.close() 

    parse_pool = ThreadPool() 
    parse_pool.apply_async(parse_page) 
    parse_pool.close() 


    parse_pool.join() 
    collect_pool.join() 
    page_queue.join() 

ne beklemek yapmaz, ancak bu kodu Running - iki threadpools ateşlemesi olan: biri kuyruğunu doldurma ve ayrıştırmak için başka çekerek. Toplama havuzu başlar ve onun üzerinden çalışır ve sonra parse_pool başlar ve onunla çalışır (Ben varsayım, kod parse_pool almak için yeterince uzun koşmak izin vermedi - nokta bu collect_pool koşuyor gibi görünüyor). Oldukça eminim katılma çağrılarının sırası ile bir şeyleri karıştırdım(), ama yaşamam için ne sıraya gireceğimi tahmin edemem. Sorumu esasen bu : Buradaki doğru ağacı havlıyor muyum? ve eğer öyleyse, ne yapıyorum yanlış? Olmazsa - önerileriniz ne olurdu?

+1

map_async - tüm işleri işleyene kadar engeller. –

+0

Bu, neden çalışmadığını açık bir şekilde ele alıyor, ama tüm sorumu cevaplamıyor. Bu, "Bunu yapmanın deli bir yolu mu?" Eğer cevap 'hayır' ise, ben yakınım ve bunu bitirme yöntemlerini mükemmelleştirmem gerekiyor. Eğer evet ise, bunu başarılı bir şekilde nasıl gerçekleştirebileceğimi gösteren bazı işaretçiler istiyorum. – bbenne10

cevap

6

Her şeyden önce tasarımınız yüksek seviyede doğru görünüyor. Sayfaları toplamak için bir threadpool kullanımı, httlib2 modülünün eşzamanlı yapısıyla doğrulanır. (Eşzamansız bir kitaplık ile bir iş parçacığı yeterli olur; httpsb2 ve havuzun en az bir toplayıcı iş parçacığı ile GIL nedeniyle herhangi bir zamanda çalıştığı unutulmamalıdır.) Ayrıştırma havuzu, Cx ile yazılmış lxml modülü tarafından doğrulanır. C++ (ve böylece Global Interpreter Lock sayfanın ayrıştırılması sırasında yayınlandığını varsayarak - bu lxml docs veya kod içinde kontrol edilmelidir!). Eğer bu ikincisi doğru değilse, o zaman sadece bir ipliğin GIL'i alabilmesi için ayrılmış bir ayrıştırma havuzuna sahip olmanın hiçbir performans kazancı olmayacaktır. Bu durumda bir işlem havuzu kullanmak daha iyi olurdu.

ThreadPool uygulamasına aşina değilim, ancak çok işlemcili modülde Pool sınıfına benzer olduğunu varsayalım. Bu temelde sorun, parse_pool için sadece tek bir iş öğesi oluşturduğunuz ve parse_page'in ilk sayfayı işledikten sonra, daha sonra başka sayfaların deşifre edilmesini denemediği görülmektedir. Ek iş öğeleri bu havuza gönderilmez, bu nedenle işlem durur ve parse_pool.close() çağrısından sonra (boş) havuzun iş parçacığı sonlandırır.

Çözüm, page_queue öğesini ortadan kaldırmaktır. Get_page() işlevi, topladığı her sayfa için page_queue içine beslemek yerine, apply_async() öğesini çağırmak suretiyle parse_pool'a bir iş öğesi yerleştirmelidir.

Ana iş parçacığı collection_queue boş olana kadar beklemelidir (yani, collect_pool.join() çağrısı döndürülür), daha sonra parse_pool'u kapatmalıdır (ayrıştırıcı için daha fazla işin gönderilemeyeceğinden emin olabiliriz). Daha sonra parse_pool'un parse_pool.join() öğesini çağırarak boş kalmasını beklemeli ve sonra çıkmalıdır. Eş zamanlı olarak daha fazla http isteği işlemek için connect_pool'daki iş parçacığı sayısını artırmanız gerekir. Havuzdaki iş parçacıklarının varsayılan sayısı, CPU sayısıdır; Şu anda bu kadar çok talepte bulunamazsınız. Binlerce veya onda bin kadar olan değerleri deneyebilirsiniz; Havuzun CPU tüketimini gözlemlemek; 1 CPU'ya yaklaşmamalıdır.

+0

Eğer yapabilirsem bunu kabul ederim. Müthiş cevap - Teşekkürler. – bbenne10

+0

@ bbenne10: Bu çok iyi bir cevap. Onu kabul etmelisin. –

İlgili konular