2016-04-07 16 views
0

RabbitMQ kullanmaya başladım ve öğreticilerle birlikte takip ettikten sonra, şimdi ihtiyacım olan şekilde çalışmaya çalışıyorum ve zorlukla karşılaşıyorum. Sahip olduğum kurulum, önce bir RPC yapabilmem ve daha sonra, müşterinin bir iş kuyruğuna başka bir mesaj yollayacağı (ya da vermeyecek) tepkisine dayanmak zorunda olduğumdur. müşteri). Ne yazık ki, bu çalışmayı bir araya getirme çabalarım istediğim gibi yürümüyor gibi görünmüyor. Sunucu tarafında, (Ben hepsi aynı sorunları olan birçok varyasyonu denedim) böyle bir şey var:Birlikte çalışmak için iş kuyruğu ve RPC alma

var factory = new ConnectionFactory() { HostName = "localhost" }; 
connection = factory.CreateConnection(); 
channel = connection.CreateModel(); 
channel.ExchangeDeclare(exchange: "jobs", type: "direct", durable: true); 

// I started with a named queue, not sure if that's better or worse for this 
var queueName = channel.QueueDeclare().QueueName; 

channel.QueueBind(queue: queueName, 
    exchange: "jobs", 
    routingKey: "saveJob_queue"); 

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 

var consumer = new EventingBasicConsumer(channel); 
consumer.Received += (model, ea) => 
{ 
    // save stuff that was sent with the saveJob_queue routingKey 
} 

channel.BasicConsume(queue: queueName, 
    noAck: false, 
    consumer: consumer); 

// set up channel for RPC 
// Not sure if this has to have another channel, but it wasn't working on the same channel either 
rpcChannel = connection.CreateModel(); 
var rpcQueueName = rpcChannel.QueueDeclare().QueueName; 

rpcChannel.QueueBind(queue: rpcQueueName, 
    exchange: "jobs", 
    routingKey: "rpc_CheckJob_queue"); 

var rpcConsumer = new EventingBasicConsumer(rpcChannel); 

rpcConsumer.Received += (model, ea) => 
{ 
    // do my remote call and send back a response 
} 

Ben sorun olduğunu hala yönlendirme tuşu rpc_CheckJob_queue ile jobs alışverişi gönderilen bir mesaj ilk kanalda Recieved olayını tetikleyerek, sadece saveJob_queue rotalarını almasına rağmen sona erer. Bu işleyicide ea.RoutingKey'u kontrol edebilir ve sadece bu mesajları görmezden gelebilirim, ama neden ilk etapta niçin orada olduklarını anlamıyorum.

Bağlantı kurmak için doğru yol ne olurdu, böylece hem iş kuyruğu iletileri hem de RPC iletileri alabilir ve bunları doğru şekilde işleyebilir?

+0

"RpcChannel .BasicConsume (rpcQueueName: queueName,'?) Gibi bir şeyi kaçırıyor musunuz? Ayrıca, – cantSleepNow

cevap

0

Bu yüzden bundan vazgeçtim ve Received olayına filtre koymaya karar verdim. Sorun şu ki, RabbitMQ sadece kanalı kanalında bir Received olayına sahip, ancak kuyruk'da değil. Yani Received olayı iki şekilde çarpıyor. Bu yüzden şu an var:

channel.QueueDeclare(queue: queueName, 
     durable: true, 
     exclusive: false, 
     autoDelete: false, 
     arguments: null); 

channel.QueueDeclare(queue: rpcQueueName, 
     durable: false, 
     exclusive: false, 
     autoDelete: false, 
     arguments: null); 

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 

var consumer = new EventingBasicConsumer(channel); 
consumer.Received += (model, ea) => 
{ 
    switch (ea.RoutingKey) 
    { 
     case queueName: 
      SaveJob(ea); 
      break; 
     case rpcQueueName: 
      CheckJob(ea); 
      break; 
    } 
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 
}; 

channel.BasicConsume(queue: queueName, 
    noAck: false, 
    consumer: consumer); 

channel.BasicConsume(queue: rpcQueueName, 
        noAck: false, 
        consumer: consumer); 

Daha iyi önerilere açığım çünkü bu durum biraz kapalı.

Yani gönderme adildir:

düzenli çalışma işi için
var properties = channel.CreateBasicProperties(); 
properties.Persistent = true; 

channel.BasicPublish(exchange: "", 
        routingKey: queueName, 
        basicProperties: properties, 
        body: body); 

ve: RPC için

var corrId = Guid.NewGuid().ToString(); 
var props = channel.CreateBasicProperties(); 
props.ReplyTo = replyQueueName; 
props.CorrelationId = corrId; 

var messageBytes = Encoding.UTF8.GetBytes(msg); 
channel.BasicPublish(exchange: "", 
        routingKey: rpcQueueName, 
        basicProperties: props, 
        body: messageBytes); 

while (true) 
{ 
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
    if (ea.BasicProperties.CorrelationId == corrId) 
    { 
     return ea.Body != null && ea.Body.Any() ? BitConverter.ToInt32(ea.Body,0) : (int?)null; 
    } 
} 

.

0

Kuyruğunuz için bir ad belirtmediğiniz için aynı kuyruğu iki kez aldığınızdan şüpheleniyorum. Yani bence neler oluyor aslında bu.

işler -> saveJob_queue -> SomeSystemQueue
işler -> rpc_CheckJob_queue -> SomeSystemQueue

iki ayrı kuyruk isimleri toplama ve yeniden kodunuzu çalıştırmayı deneyin. Bunun yerine bu :

var queueName = channel.QueueDeclare().QueueName; 

channel.QueueBind(queue: queueName, 
    exchange: "jobs", 
    routingKey: "saveJob_queue"); 

var:

var name = "Queue A"; 
channel.QueueDeclare(name); 
channel.QueueBind(queue: queueName, 
     exchange: "jobs", 
     routingKey: "saveJob_queue"); 

Sonra başka ikinci kuyruk bir isim ve bu deneyin.

+0

mesajlarını göndermek için kodunuzu gönderebilirseniz, orijinal olarak (farklı bir şekilde) adında iki sıraya sahiptim ama aynıydı –

+0

Tamam, buna tamamen geri dönmek isteyebilirsiniz: Gönderme mantığınızı da gönderebilir misiniz? Böylece herkesin hangi yönlendirme anahtarını görebileceğinizi vb. –