2012-01-13 23 views
21

FTP aracılığıyla yarım milyon dosya oluşturmak ve yüklemek için bir C# programı yazıyorum. Makinenin 4 çekirdeği olduğundan ve dosya oluşturmanın çok uzun sürdüğü için 4 dosyayı paralel olarak işlemek istiyorum. Aşağıdaki Powershell örneğini C# olarak dönüştürmek mümkün mü? Veya C# (F # MailboxProcessor gibi) Aktör çerçevesi gibi daha iyi bir çerçeve var mı? Powershell exampleParalel iş parçacığı sayısını sınırla C#

$maxConcurrentJobs = 3; # Read the input and queue it up $jobInput = get-content .\input.txt $queue = [System.Collections.Queue]::Synchronized((New-Object System.Collections.Queue)) foreach($item in $jobInput) { $queue.Enqueue($item) } # Function that pops input off the queue and starts a job with it function RunJobFromQueue { if($queue.Count -gt 0) { $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue() Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null } } # Start up to the max number of concurrent jobs # Each job will take care of running the rest for($i = 0; $i -lt $maxConcurrentJobs; $i++) { RunJobFromQueue } 

Güncelleme:
uzak FTP sunucusuna bağlantı yavaş olabilir bu yüzden FTP yükleme işlemini sınırlamak istiyor.

+0

Paralel görevlerin sayısını sınırlamak isterseniz, neden TPL'yi kullanmıyorsunuz? –

+1

İş parçacığı havuzu sizin için bunu işlemek için yeterince akıllı olmalıdır. Neden kendiniz yönetmeye çalışıyorsunuz? –

+3

[PLINQ] 'yu (http://msdn.microsoft.com/en-us/library/dd460688.aspx) kullanabilir ve [WithDegreeOfParallelism] (http://msdn.microsoft.com/en-us/library/ adresini) kullanabilirsiniz. dd383719.aspx) buna göre. –

cevap

5

Eğer bir Parallel Foreach for instance ya da Buradan yapabilirsiniz have a look to PLinq bir comparison between the two

kullanarak dosyaların yarısından milyon Throug yapabilirsiniz "paralel" yinelemeyi yineleme ediyoruz Supposing Parallel library

kullanabilirsiniz

+0

Lütfen -1 değerini doğrulayın. –

+0

Bu soru C# -4.0 ile etiketlenmiştir, hem uzantıları hem de .NET 4 kullanarak familar olduğu açıktır. Tek bir cümle sorusuna cevap vermez. –

+0

O C# 4.0 kullanıyor, ancak Paralel kütüphaneye aşina olduğu belli değil, ptherwise bir soru sormayacak. Ayrıca, cevabım, diğerinin az ya da çok aynı bilgilerini içerir. Lütfen -1'i doğrulayın. –

16

Görev Paralel Kitaplığı burada arkadaşınız. Kullanabileceğinizleri açıklayan this bağlantısına bakın. Temel olarak, çerçeve (4), esasen arka plandaki iplik havuzlanmış dişleri, çalışan makinedeki işlemcilerin sayısına göre optimize eder.

çizgisinde

Belki bir şey: Esasen

Parallel.Invoke(options, 
() => new WebClient().Upload("http://www.linqpad.net", "lp.html"), 
() => new WebClient().Upload("http://www.jaoo.dk", "jaoo.html")); 
2

yüklemek her dosya için bir Eylem veya Görev oluşturmak istiyorum gidiyoruz: sevdiği döngü şey Ardından

ParallelOptions options = new ParallelOptions(); 

options.MaxDegreeOfParallelism = 4; 

Onları bir Liste'ye koyun ve ardından bu listeyi paralel olarak işlenebilecek sayıyı sınırlandırarak işleyin.

My blog post hem Görevler hem de Eylemler ile bunun nasıl yapıldığını gösterir ve her ikisini de görebilmek için indirip çalıştırabileceğiniz bir örnek proje sağlar. Eylemler kullanarak Eylemler

Şunla

, yerleşik .Net Parallel.Invoke işlevini kullanabilirsiniz. Burada paralel olarak en fazla 4 iplik ile çalışmasını sınırlandırıyoruz.

var listOfActions = new List<Action>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(() => UploadFile(localFile))); 
} 

var options = new ParallelOptions {MaxDegreeOfParallelism = 4}; 
Parallel.Invoke(options, listOfActions.ToArray()); 

Bu seçenek olsa asenk desteklemez ve aşağıdaki Görev örneği kullanmak isteyebilirsiniz yüzden, sen FileUpload fonksiyonu olacaktır farzederek söylüyorum. Görevler ile Görevler

ile

yerleşik bir işlevi yoktur. Ancak, blogumda sağladığımı kullanabilirsiniz.Ayrıca

var listOfTasks = new List<Task>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(async() => await UploadFile(localFile))); 
} 
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4); 

bu çünkü:

/// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); 
    } 

    /// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. 
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para> 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. 
     var tasks = tasksToRun.ToList(); 

     using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) 
     { 
      var postTaskTasks = new List<Task>(); 

      // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. 
      tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); 

      // Start running each task. 
      foreach (var task in tasks) 
      { 
       // Increment the number of tasks currently running and wait if too many are running. 
       await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); 

       cancellationToken.ThrowIfCancellationRequested(); 
       task.Start(); 
      } 

      // Wait for all of the provided tasks to complete. 
      // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. 
      await Task.WhenAll(postTaskTasks.ToArray()); 
     } 
    } 

Sonra Görevler listenizi oluştururken ve bir seferde 4 eş zamanlı olarak maksimum demek olan, onları çalıştırmak zorunda işlevini çağırarak, bunu yapabilirdi yöntem, uyumsuzluğu destekler, Parallel.Invoke veya Parallel.ForEach kullanır gibi UI iş parçacığı engellemez.

0

Aşağıda, BlockingCollection'ı iş parçacığı sayımı yöneticisi olarak kullandığım teknik kodlanmış. İşi uygulamak ve ele almak oldukça basittir. Basitçe Görev nesnelerini kabul eder ve listeyi engellemek için bir tamsayı değeri ekler, çalışan iş parçacığı sayısını 1 artırır. İş parçacığı bittiğinde, nesneyi düşürür ve yaklaşan görevler için ekleme işlemi sırasında bloğu serbest bırakır.

 public class BlockingTaskQueue 
     { 
      private BlockingCollection<int> threadManager { get; set; } = null; 
      public bool IsWorking 
      { 
       get 
       { 
        return threadManager.Count > 0 ? true : false; 
       } 
      } 

      public BlockingTaskQueue(int maxThread) 
      { 
       threadManager = new BlockingCollection<int>(maxThread); 
      } 

      public async Task AddTask(Task task) 
      { 
       Task.Run(() => 
       { 
        Run(task); 
       }); 
      } 

      private bool Run(Task task) 
      { 
       try 
       { 
        threadManager.Add(1); 
        task.Start(); 
        task.Wait(); 
        return true; 

       } 
       catch (Exception ex) 
       { 
        return false; 
       } 
       finally 
       { 
        threadManager.Take(); 
       } 

      } 

     } 
İlgili konular