2015-06-24 14 views
5

Aşağıdaki kod C# ile yazılmıştır ancak buna göre Oracle veritabanından Elasticsearch'e veri taşımak 4-5 gün sürecektir. Kayıtları 100'lük partiler halinde ekliyorum. 4 milyon kayıtların geçişinin daha hızlı gerçekleşmesi (mümkünse bir günden az bir süre içinde) başka bir yolu var mı?C# kullanarak Oracle'dan Elasticsearch tablosuna 4 milyon kayıt daha hızlı nasıl eklenir?

public static void Selection() 
     { 
      for(int i = 1; i < 4000000; i += 1000) 
      { 
       for(int j = i; j < (i+1000); j += 100) 
       { 
        OracleCommand cmd = new OracleCommand(BuildQuery(j), 
                oracle_connection); 
        OracleDataReader reader = cmd.ExecuteReader(); 
        List<Record> list=CreateRecordList(reader); 
        insert(list); 
       } 
      } 
     } 

    private static List<Record> CreateRecordList(OracleDataReader reader) 
     { 
      List<Record> l = new List<Record>(); 
      string[] str = new string[7]; 
      try 
      { 
       while (reader.Read()) 
       { 
        for (int i = 0; i < 7; i++) 
        { 
         str[i] = reader[i].ToString(); 
        } 

        Record r = new Record(str[0], str[1], str[2], str[3],        
           str[4], str[5], str[6]); 
        l.Add(r); 
       } 
      } 
      catch (Exception er) 
      { 
       string msg = er.Message; 
      } 
      return l; 
     } 

    private static string BuildQuery(int from) 
     { 
      int to = from + change - 1; 
      StringBuilder builder = new StringBuilder(); 
      builder.AppendLine(@"select * from"); 
      builder.AppendLine("("); 
      builder.AppendLine("select FIELD_1, FIELD_2, 
      FIELD_3, FIELD_4, FIELD_5, FIELD_6, 
      FIELD_7, "); 
      builder.Append(" row_number() over(order by FIELD_1) 
      rn"); 
      builder.AppendLine(" from tablename"); 
      builder.AppendLine(")"); 
      builder.AppendLine(string.Format("where rn between {0} and {1}", 
      from, to)); 
      builder.AppendLine("order by rn"); 
      return builder.ToString(); 
     } 

    public static void insert(List<Record> l) 
     { 
      try 
      { 
       foreach(Record r in l) 
        client.Index<Record>(r, "index", "type"); 
      } 
      catch (Exception er) 
      { 
       string msg = er.Message; 
      } 
     } 
+0

ait olsun değiştirin gelmez okumak için okuma ve
paralel yazmak Fakat maksimum boyutu kullanmak istiyorsanız bir BlockingCollection kullanabilirsiniz 'client.Index' client.IndexMany (..) ile birlikte ve toplu insert için en uygun yığın boyutunu bulmaya çalışın https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html#_how_big_is_too_big – Rob

+1

* 4-5 gün sürdüm * .. koştunuz ve 4M satırlarını geçirmenin 4/5 günü gerçekten alıp almadığını görüyor musunuz? – Rahul

+2

'ROW_NUMBER()' işlevi performansı olumsuz etkileyecek ve binlerce kez çalıştırıyorsunuz. Zaten bir "OracleDataReader" kullanıyorsunuz - dört milyon satırın tümünü aynı anda makinenize çekmeyecek, temel olarak bunları bir veya birkaçında akışa tabi tutuyor. Bunun yerine tek bir sorguya sahip olmalısınız ve her bir 100 ya da 500 ya da 1000 değerindeki "Kayıt" nesnelerini oluştururken (örneğin her bir döngüyü artıran bir saymayı saklayın), bunları yapın (örneğin, sayın% 500 == 0 '). Bu, dakikalar veya saatler içinde değil, günler içinde yapılmalıdır. –

cevap

4

ROW_NUMBER() fonksiyon performansı olumsuz etkilemeye devam ediyor ve bunu binlerce kez yapıyoruz. Zaten bir OracleDataReader kullanıyorsunuz - dört milyon satırın tümünü makinenize bir kerede çekmeyecek, temel olarak bunları bir veya birkaçında akış halinde izliyor.

Bu, dakikalar veya saatler içinde değil, birkaç gün içinde yapılabilir olmalıdır - bir Sybase ile SQL sunucusu arasında milyonlarca kaydı benzer şekilde hareket ettiren birkaç işlemimiz vardır ve beş dakikadan daha az sürer.

İşte
OracleCommand cmd = new OracleCommand("SELECT ... FROM TableName", oracle_connection); 
int batchSize = 500;  
using (OracleDataReader reader = cmd.ExecuteReader()) 
{ 
    List<Record> l = new List<Record>(batchSize); 
    string[] str = new string[7]; 
    int currentRow = 0; 

    while (reader.Read()) 
    { 
     for (int i = 0; i < 7; i++) 
     { 
      str[i] = reader[i].ToString(); 
     } 

     l.Add(new Record(str[0], str[1], str[2], str[3], str[4], str[5], str[6])); 

     // Commit every time batchSize records have been read 
     if (++currentRow == batchSize) 
     { 
      Commit(l); 
      l.Clear(); 
      currentRow = 0; 
     } 
    } 

    // commit remaining records 
    Commit(l); 
} 

Commit gibi göründüğü aşağıda:

public void Commit(IEnumerable<Record> records) 
{ 
    // TODO: Use ES's BULK features, I don't know the exact syntax 

    client.IndexMany<Record>(records, "index", "type"); 
    // client.Bulk(b => b.IndexMany(records))... something like this 
} 
+0

Yardımlarınız için çok teşekkürler! Bahsettiğiniz gibi ES Toplu özelliklerini kullandım. Çok yüksek bir hızda kayıt eklemeye başladı ancak bir süre sonra Argüman boş bir hata veremiyordu. Soruyu buna göre düzenledim. –

+0

@AakritiMittal: Sorunuzun üzerine yazmamalısınız - yeni bir sorun yaşıyorsanız, lütfen yeni bir soru sorun (isterseniz bunu belirtin). Bu bir Soru-Cevap sitesidir ve sorunuzu değiştirerek diğer cevapları geçersiz kılarsınız. Değişikliklerinizi geri aldım. –

+0

Üzerine yazdığım için üzgünüm, ancak bu hatayı çözdüm.Ayrıca, uygulamanızın Sybase ve SQL sunucusu arasındaki milyonlarca kaydı 5 dakikadan daha kısa sürede nasıl gerçekleştirdiğini sormak isterim. Yeni uygulamam, 4 milyonu Oracle'dan Elasticsearch'e taşımak için 2 gün sürüyor, çünkü tabloda çok sayıda sütun var. –

3

Ama bir tane ekleme edilmektedir Sonunda 100
gruplar halinde ekleme değildir

Belki bu denemek bir zaman
(ve bu, eklemek için doğru kod bile olmayabilir)

Önümüzdeki toplu
Oku daima daha hızlı yazma daha (neredeyse) 'dir alırken insert Sadece lag sunuyoruz seferinde
az bir satır ise okumadak

Bütün bu girations hiçbir şey

OracleCommand cmd = new OracleCommand(BuildQuery(all), oracle_connection); 
OracleDataReader reader = cmd.ExecuteReader(); 
while (reader.Read()) 
{ 
    client.Index<Record>(new Record(reader.GetSting(0), 
         reader.GetSting(1), reader.GetSting(2), reader.GetSting(3),  
         reader.GetSting(4), reader.GetSting(5), reader.GetSting(6), 
         "index", "type"); 
} 
reader.Close(); 

sen çok ileride yazma

İlgili konular