2012-06-01 15 views
8

Arka plan: Bir akış API JSON nesneleri kapmak ve pymongo kullanılarak MongoDB içinde (her seferinde 25 kütle ekleme) saklamak için kurulmuş bir python modülü vardır. Karşılaştırma için, aynı akış API'sinden curl'a ve pipe'a mongoimport'a bir bash komutu da var. Her iki yaklaşım da verileri ayrı koleksiyonlarda saklar.Optimizasyon: Mongo bir Akış API JSON Damping

Periyodik olarak, nasıl ücret aldıklarını kontrol etmek için koleksiyonların count() izlerim.

Şu ana kadar, curl | mongoimport yaklaşımının arkasındaki 1000 JSON nesnesi tarafından kalan python modülünü görüyorum.

Sorun: nasıl curl | mongoimport ile senkronize ~ olmak benim python modülünü optimize edebilirsiniz?

Twitter API kullanmıyor, ancak 3. taraf yayın hizmeti kullanmadığım için tweetstream kullanamıyorum.

Lütfen birisi bana yardım edebilir mi?

Python modülü: okuma için


class StreamReader: 
    def __init__(self): 
     try: 
      self.buff = "" 
      self.tweet = "" 
      self.chunk_count = 0 
      self.tweet_list = [] 
      self.string_buffer = cStringIO.StringIO() 
      self.mongo = pymongo.Connection(DB_HOST) 
      self.db = self.mongo[DB_NAME] 
      self.raw_tweets = self.db["raw_tweets_gnip"] 
      self.conn = pycurl.Curl() 
      self.conn.setopt(pycurl.ENCODING, 'gzip') 
      self.conn.setopt(pycurl.URL, STREAM_URL) 
      self.conn.setopt(pycurl.USERPWD, AUTH) 
      self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data) 
      self.conn.perform() 
     except Exception as ex: 
      print "error ocurred : %s" % str(ex) 

    def handle_data(self, data): 
     try: 
      self.string_buffer = cStringIO.StringIO(data) 
      for line in self.string_buffer: 
       try: 
        self.tweet = json.loads(line) 
       except Exception as json_ex: 
        print "JSON Exception occurred: %s" % str(json_ex) 
        continue 

       if self.tweet: 
        try: 
         self.tweet_list.append(self.tweet) 
         self.chunk_count += 1 
         if self.chunk_count % 1000 == 0 
          self.raw_tweets.insert(self.tweet_list) 
          self.chunk_count = 0 
          self.tweet_list = [] 

        except Exception as insert_ex: 
         print "Error inserting tweet: %s" % str(insert_ex) 
         continue 
     except Exception as ex: 
      print "Exception occurred: %s" % str(ex) 
      print repr(self.buff) 

    def __del__(self): 
     self.string_buffer.close() 

teşekkürler.

+0

Eklemekte olduğunuz belgelerin "_id" alanı var mı? –

+0

@AsyaKamsky Evet, yaparlar. –

+0

Hangi mongo sürümü ve hangi pymongo sürümünü kullanıyorsunuz? –

cevap

1

StringIO kütüphanesinde kurtuldum bulunmaktadır.WRITEFUNCTION geri arama handle_data olarak, bu durumda, her satır için çağrılır, sadece JSON doğrudan yükleyin. Ancak, bazen, verilerde bulunan iki JSON nesne olabilir. Üzgünüm, kimlik bilgilerimizi içerdiğinden kullandığım curl komutunu gönderemiyorum. Ancak, söylediğim gibi, bu herhangi bir akış API'sine uygulanan genel bir sorundur.


def handle_data(self, buf): 
    try: 
     self.tweet = json.loads(buf) 
    except Exception as json_ex: 
     self.data_list = buf.split('\r\n') 
     for data in self.data_list: 
      self.tweet_list.append(json.loads(data))  
3

Aslında kodunuzda bir hata oluştu.

Chunk_count'u sıfırladınız, ancak tweet_list öğesini sıfırlamıyorsunuz. Böylece ikinci kez 100 ürün eklemeyi deneyin (50 tane daha öncekinin 50'sine önceden gönderilmişti). Bunu düzelttiniz, ancak yine de performansta bir fark görüyorsunuz.

Bütün parti büyüklüğü, kırmızı bir ringa olduğu ortaya çıkıyor. Büyük bir json dosyası kullanmayı denedim ve python ile yükleyip mongoimport ile yükledim ve Python her zaman daha hızlıydı (hatta güvenli modda - aşağıya bakın).

Kodunuza daha yakından baktığınızda, sorunun gerçekte akış API'sinin size verileri toplu halde teslim ettiği gerçeğinin farkına vardım. Bu parçaları almanız ve onları veritabanına eklemeniz (mongoimport'un yaptığı şey). Python'unuzun akışı ayırmak, bir listeye eklemek ve Mongo'ya periyodik olarak periyodik olarak göndermek için yaptığınız ekstra çalışma, muhtemelen gördüğüm ve gördüğünüz arasındaki farktır.

nota

def handle_data(self, data): 
    try: 
     string_buffer = StringIO(data) 
     tweets = json.load(string_buffer) 
    except Exception as ex: 
     print "Exception occurred: %s" % str(ex) 
    try: 
     self.raw_tweets.insert(tweets) 
    except Exception as ex: 
     print "Exception occurred: %s" % str(ex) 

Bir şey sizin handle_data() için bu pasajı deneyin o senin python inserts are not running in "safe mode" - Değiştirmek gerektiğini ekinizin açıklamaya bir argüman safe=True ekleyerek. Ardından, başarısız olan herhangi bir ek üzerinde bir istisna alacaksınız ve try/catch'unuz sorunu ortaya çıkaran hatayı yazdıracaktır.

Ya performansında çok mal olmaz - Şu anda bir testi çalıştıran ve yaklaşık beş dakika sonra ben iki koleksiyon boyutları 14120 14113.

+0

btw, Kodunuzu denedim - düzeltmeyle birlikte, Python mongoimport olarak iki kat daha hızlı veri ekliyor. Çünkü varsayılan olarak "güvenli" ekler kapalıdır. Güvenli yazmaları açarak (geçerken güvenli = eki yerleştirmek için) Python ekleri, mongoimport sürelerinin yaklaşık% 75'ini oluşturuyordu. –

+0

Onları işaretlediğiniz için teşekkürler! Gerekli değişiklikleri (yukarıda da güncellenen kodu) yaptım: self.chunk_count = 0'dan sonra "self.tweet_list = []" eklendi ve yığın boyutunu 1000'e artırdı. Bu hala gecikmeli görünüyor - python modülü sayısı 5000 iken curl mongoimport combo 5718. (4000: 5662 idi). Herhangi bir anlayış? Mükemmel yorumlar için –

+0

+1! –