2014-04-16 23 views
6

Azure Blob Storage'e yüklemem gereken birkaç yüz dosyam var.
Paralel görev kitaplığını kullanmak istiyorum.
Ancak, tüm 100 iş parçacığını dosya listesinde bir foreach'a yüklemek yerine, kullanabileceği maksimum iş parçacığı sayısına nasıl bir sınır koyabilir ve işi paralel olarak bitirebilirim. veya işleri otomatik olarak dengeliyor mu?Görev Paralel Kitaplığı'ndaki Threads sayısını sınırla

+1

Bunun için tüm konuları. Doğal olarak senkronize olmayan bir 'Görev' tabanlı API var: [CloudBlockBlob.UploadFromFileAsync] (http://msdn.microsoft.com/en-us/library/dn451828.aspx). VS2010 ile sınırlı ve 'async/await' kullanamazsınız (bu yüzden soruyu" C# 4.0 "ile etiketlediniz)? – Noseratio

+0

Eğer doğru hatırlıyorsam, kullanılabilir çekirdek olarak çok sayıda iş parçacığı kullanacaktır. Yine de nerede okuduğumu hatırlayamıyorum. MS blogu ya da merak ettiğim zaman SO üzerine bir cevap olabilir. Paralel'i kullanarak 100 inçlik bir liste ile test uygulamasında deneyebilirsiniz. – Dbl

+1

@Noseratio VS2010 ile sınırlı değil .. C# 5.0'yi de kullanabilirim .. etiket olarak ekleyeyim .. – Seenu

cevap

9

Bunun için iş parçacığı kullanmamalısınız. Bunun için doğal olarak eşzamansız olan Task tabanlı bir API vardır: CloudBlockBlob.UploadFromFileAsync. Paralel yüklemelerin sayısını azaltmak için async/await ve SemaphoreSlim ile kullanın.

Örnek (denenmemiş):

Esasen
const MAX_PARALLEL_UPLOADS = 5; 

async Task UploadFiles() 
{ 
    var files = new List<string>(); 
    // ... add files to the list 

    // init the blob block and 
    // upload files asynchronously 
    using (var blobBlock = new CloudBlockBlob(url, credentials)) 
    using (var semaphore = new SemaphoreSlim(MAX_PARALLEL_UPLOADS)) 
    { 
     var tasks = files.Select(async(filename) => 
     { 
      await semaphore.WaitAsync(); 
      try 
      { 
       await blobBlock.UploadFromFileAsync(filename, FileMode.Create); 
      } 
      finally 
      { 
       semaphore.Release(); 
      } 
     }).ToArray(); 

     await Task.WhenAll(tasks); 
    } 
} 
2

MaxDegreeOfParallelism'i kullanmayı denediniz mi? Şunun gibi:

System.Threading.Tasks.Parallel.Invoke(
new Tasks.ParallelOptions {MaxDegreeOfParallelism = 5 }, actionsArray) 
0

Bu çalıştırarak öğrenebilirsiniz:

class Program 
{ 
    static void Main(string[] args) 
    { 
     var list = new List<int>(); 

     for (int i = 0; i < 100; i++) 
     { 
      list.Add(i); 
     } 

     var runningIndex = 0; 

     Task.Factory.StartNew(() => Action(ref runningIndex)); 

     Parallel.ForEach(list, i => 
     { 
      runningIndex ++; 
      Console.WriteLine(i); 
      Thread.Sleep(3000); 
     }); 

     Console.ReadKey(); 
    } 

    private static void Action(ref int number) 
    { 
     while (true) 
     { 
      Console.WriteLine("worked through {0}", number); 
      Thread.Sleep(2900); 
     } 
    } 
} 

Eğer paralellik sayısı başlangıcında küçüktür görebileceğiniz gibi, büyür ve sonuna doğru giderek küçülür. Yani kesinlikle bir çeşit otomatik optimizasyon var.

0

sen sayısının sınırlandırılması, o listeyi, her dosya yüklemek için bir Eylem veya Görev oluşturmak bir List koyun ve sonra işlemek istiyorum gidiyoruz Paralel işlenebilir.

My blog post hem Görevler hem de Eylemler ile bunun nasıl yapıldığını gösterir ve her ikisini de görebilmek için indirip çalıştırabileceğiniz bir örnek proje sağlar. Eylemler kullanarak Eylemler

Şunla

, yerleşik .Net Parallel.Invoke işlevini kullanabilirsiniz. Burada paralel olarak en fazla 5 dişte çalışmasını sınırlıyoruz.

var listOfActions = new List<Action>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(() => blobBlock.UploadFromFileAsync(localFile, FileMode.Create))); 
} 

var options = new ParallelOptions {MaxDegreeOfParallelism = 5}; 
Parallel.Invoke(options, listOfActions.ToArray()); 

Bu seçenek olsa UploadFromFileAsync eşzamansız doğanın kullanımı yapmaz, bu yüzden aşağıda Görev örneği kullanmak isteyebilirsiniz. Görevler ile Görevler

ile

yerleşik bir işlevi yoktur. Ancak, blogumda sağladığımı kullanabilirsiniz.

/// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); 
    } 

    /// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. 
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para> 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. 
     var tasks = tasksToRun.ToList(); 

     using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) 
     { 
      var postTaskTasks = new List<Task>(); 

      // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. 
      tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); 

      // Start running each task. 
      foreach (var task in tasks) 
      { 
       // Increment the number of tasks currently running and wait if too many are running. 
       await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); 

       cancellationToken.ThrowIfCancellationRequested(); 
       task.Start(); 
      } 

      // Wait for all of the provided tasks to complete. 
      // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. 
      await Task.WhenAll(postTaskTasks.ToArray()); 
     } 
    } 

Sonra Görevler listenizi oluştururken ve bir seferde 5 eş zamanlı olarak maksimum demek olan, onları çalıştırmak zorunda işlevini çağırarak, bunu yapabilirdi: kullanarak olmamalıdır

var listOfTasks = new List<Task>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(async() => await blobBlock.UploadFromFileAsync(localFile, FileMode.Create))); 
} 
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 5); 

İlgili konular