2012-08-28 20 views
6

izleme iplik ve boost::asio::io_service::run() çağırarak bazı sayı boost::thread nesneler. Bununla birlikte, benim verdiğim bir şart, tüm konuları "sağlık" için izlemenin bir yolu olmalı. Amacım, iş parçacığı havuzundan geçirilebilen basit bir sentinel nesnesi yapmaktır - eğer bunu gerçekleştirirse, iş parçacığının hala iş işlediğini varsayabiliriz.boost :: asio, iplik havuzları ve ben <code>boost::asio</code> kullanarak bir iş parçacığı havuzu uyguladık

Ancak, benim uygulama göz önüne alındığında, ben (varsa) ben güvenilir havuzda bütün konuları izleyebilirsiniz emin değilim. Sadece bu kadar aslında nöbetçi almak ve iş yapacak iş parçacığı garanti etmeyeceğini io_service örneğine bir gözcü nesnesi gönderme, boost::asio::io_service::run() için iplik işlevini delege ettik.

Bir seçenek sadece periyodik nöbetçi eklemek olacak ve bir kısım zaman makul miktarda en az bir kez her iş parçacığı tarafından alınan, ama bu besbelli ideal değildir alır umut olabilir.

aşağıdaki örneğini ele alalım. Diğerleri neredeyse olacak ise işleyici kodlanmış bu şekilde nedeniyle, bu durumda her iplik çalışması aynı miktarda yapacak görebilirsiniz, ama gerçekte ben işleyici uygulamasının kontrole sahip olmayacak, bazı uzun çalışan olabilir derhal.

#include <iostream> 
#include <boost/asio.hpp> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 

void handler() 
{ 
    std::cout << boost::this_thread::get_id() << "\n"; 
    boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 
} 

int main(int argc, char **argv) 
{ 
    boost::asio::io_service svc(3); 

    std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(svc)); 

    boost::thread one(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread two(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread three(boost::bind(&boost::asio::io_service::run, &svc)); 

    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 

    work.reset(); 

    three.join(); 
    two.join(); 
    one.join(); 

    return 0; 
} 

cevap

2

ben sırt havuzu nesneleri uygulanmasını kendi gerçeğine dayanır kullanılan çözüm:

Yapabileceğiniz senin parçacığı havuzunda bir görevi göndermek için. İstatistikleri güncelleyen ve iş parçacığı havuzuna gönderilen kullanıcı tanımlı işleyicileri kopyalayan bir sarmalayıcı türü oluşturdum. Yalnızca bu sarmalayıcı türü, temeldeki io_service'a gönderilir. Bu yöntem, kullanıcı kodu içine müdahaleci olmaksızın, gönderilen/yürütülen işleyicilerin kaydını tutmamı sağlar.

İşte tam olmayan bir ve basitleştirilmiş örnek:

#include <iostream> 
#include <memory> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/asio.hpp> 

// Supports scheduling anonymous jobs that are 
// executable as returning nothing and taking 
// no arguments 
typedef std::function<void(void)> functor_type; 

// some way to store per-thread statistics 
typedef std::map<boost::thread::id, int> thread_jobcount_map; 

// only this type is actually posted to 
// the asio proactor, this delegates to 
// the user functor in operator() 
struct handler_wrapper 
{ 
    handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics) 
     : user_functor_(user_functor) 
     , statistics_(statistics) 
    { 
    } 

    void operator()() 
    { 
     user_functor_(); 

     // just for illustration purposes, assume a long running job 
     boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 

     // increment executed jobs 
     ++statistics_[boost::this_thread::get_id()]; 
    } 

    functor_type   user_functor_; 
    thread_jobcount_map& statistics_; 
}; 

// anonymous thread function, just runs the proactor 
void thread_func(boost::asio::io_service& proactor) 
{ 
    proactor.run(); 
} 

class ThreadPool 
{ 
public: 
    ThreadPool(size_t thread_count) 
    { 
     threads_.reserve(thread_count); 

     work_.reset(new boost::asio::io_service::work(proactor_)); 

     for(size_t curr = 0; curr < thread_count; ++curr) 
     { 
     boost::thread th(thread_func, boost::ref(proactor_)); 

     // inserting into this map before any work can be scheduled 
     // on it, means that we don't have to look it for lookups 
     // since we don't dynamically add threads 
     thread_jobcount_.insert(std::make_pair(th.get_id(), 0)); 

     threads_.emplace_back(std::move(th)); 
     } 
    } 

    // the only way for a user to get work into 
    // the pool is to use this function, which ensures 
    // that the handler_wrapper type is used 
    void schedule(const functor_type& user_functor) 
    { 
     handler_wrapper to_execute(user_functor, thread_jobcount_); 
     proactor_.post(to_execute); 
    } 

    void join() 
    { 
     // join all threads in pool: 
     work_.reset(); 
     proactor_.stop(); 

     std::for_each(
     threads_.begin(), 
     threads_.end(), 
     [] (boost::thread& t) 
     { 
     t.join(); 
     }); 
    } 

    // just an example showing statistics 
    void log() 
    { 
     std::for_each(
     thread_jobcount_.begin(), 
     thread_jobcount_.end(), 
     [] (const thread_jobcount_map::value_type& it) 
     { 
     std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n"; 
     }); 
    } 

private: 
    std::vector<boost::thread> threads_; 
    std::unique_ptr<boost::asio::io_service::work> work_; 
    boost::asio::io_service proactor_; 
    thread_jobcount_map  thread_jobcount_; 
}; 

struct add 
{ 
    add(int lhs, int rhs, int* result) 
     : lhs_(lhs) 
     , rhs_(rhs) 
     , result_(result) 
    { 
    } 

    void operator()() 
    { 
     *result_ = lhs_ + rhs_; 
    } 

    int lhs_,rhs_; 
    int* result_; 
}; 

int main(int argc, char **argv) 
{ 
    // some "state objects" that are 
    // manipulated by the user functors 
    int x = 0, y = 0, z = 0; 

    // pool of three threads 
    ThreadPool pool(3); 

    // schedule some handlers to do some work 
    pool.schedule(add(5, 4, &x)); 
    pool.schedule(add(2, 2, &y)); 
    pool.schedule(add(7, 8, &z)); 

    // give all the handlers time to execute 
    boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); 

    std::cout 
     << "x = " << x << "\n" 
     << "y = " << y << "\n" 
     << "z = " << z << "\n"; 

    pool.join(); 

    pool.log(); 
} 

Çıktı:

x = 9 
y = 4 
z = 15 
Thread: 0000000000B25430 executed 1 jobs 
Thread: 0000000000B274F0 executed 1 jobs 
Thread: 0000000000B27990 executed 1 jobs 
+0

Kodu, cevabınıza @Chad ekleyebilir misiniz? –

+0

Bitti. Herhangi bir geri bildirim için mutlu. – Chad

6

Tüm iş parçacıkları ve her evreye özel io_service örneği arasında ortak io_service örneğini kullanabilirsiniz. Her iplik böyle bir yöntemi çalıştırır:

void Mythread::threadLoop() 
{ 
    while(/* termination condition */) 
    { 
     commonIoService.run_one(); 
     privateIoService.run_one(); 

     commonConditionVariable.timed_wait(time); 
    } 
} 

bu arada, bazı görev dizisindeki bir şekilde gerçekleştirilmesini temin etmek istiyorsanız, sadece onun sahip olduğu io_service bu görevi sonrası gerekir.

void MyThreadPool::post(Hander handler) 
{ 
    commonIoService.post(handler); 
    commonConditionVariable.notify_all(); 
} 
+0

İlginç bir yaklaşım, ama biraz daha yalındır şey arıyorum. Önümüzdeki birkaç gün içinde başka bir şey gelmezse, bu cevabı kabul edebilirim. – Chad

+0

Bence boot asio kullanan daha basit bir çözüm yok. Birkaç kodla böyle bir çözüm geliştirdim ve işe yarıyor. –

İlgili konular