2015-11-17 15 views
9

Kafka konusunu kullanan kodda bazı JUnit testlerim var. Denediğim sahte kafka konuları çalışmıyor ve çevrimiçi olarak bulunan örnekler çok eski ve bu nedenle de 0.8.2.1 ile çalışmazlar. 0.8.2.1 kullanarak sahte bir kafka konusunu nasıl oluşturabilirim?Bir Mock Kafka Konusunu cinayet testleri için nasıl uygulayabilirim?

Netleştirmek için: Nesnenin mockito'da alay etmek yerine gerçek bir örnekle test etmek için konunun gerçek gömülü bir örneğini kullanmayı seçiyorum. Bu yüzden özel kodlayıcılarım ve kod çözücülerimin gerçekten çalıştığını test edebilirim ve gerçek bir kafka örneğini kullanmaya başladığımda başarısız olmaz.

cevap

6

https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java

Bu örnek yeni 0.8.2.2 versiyonunda çalışıyor güncellendi. İşte maven bağımlılıkları ile kod snippit geçerli:

pom.xml:

<dependencies> 
<dependency> 
    <groupId>junit</groupId> 
    <artifactId>junit</artifactId> 
    <version>4.12</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <classifier>test</classifier> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 
</dependencies> 

KafkaProducerTest.java:

import java.nio.charset.StandardCharsets; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import org.I0Itec.zkclient.ZkClient; 
import org.junit.Test; 
import kafka.admin.TopicCommand; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.producer.KeyedMessage; 
import kafka.producer.Producer; 
import kafka.producer.ProducerConfig; 
import kafka.server.KafkaConfig; 
import kafka.server.KafkaServer; 
import kafka.utils.MockTime; 
import kafka.utils.TestUtils; 
import kafka.utils.TestZKUtils; 
import kafka.utils.Time; 
import kafka.utils.ZKStringSerializer$; 
import kafka.zk.EmbeddedZookeeper; 
import static org.junit.Assert.*; 

/** 
* For online documentation 
* see 
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
* https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala 
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala 
*/ 
public class KafkaProducerTest { 

    private int brokerId = 0; 
    private String topic = "test"; 

    @Test 
    public void producerTest() throws InterruptedException { 

     // setup Zookeeper 
     String zkConnect = TestZKUtils.zookeeperConnect(); 
     EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect); 
     ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$); 

     // setup Broker 
     int port = TestUtils.choosePort(); 
     Properties props = TestUtils.createBrokerConfig(brokerId, port, true); 

     KafkaConfig config = new KafkaConfig(props); 
     Time mock = new MockTime(); 
     KafkaServer kafkaServer = TestUtils.createServer(config, mock); 

     String [] arguments = new String[]{"--topic", topic, "--partitions", "1","--replication-factor", "1"}; 
     // create topic 
     TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments)); 

     List<KafkaServer> servers = new ArrayList<KafkaServer>(); 
     servers.add(kafkaServer); 
     TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000); 

     // setup producer 
     Properties properties = TestUtils.getProducerConfig("localhost:" + port); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     Producer producer = new Producer(producerConfig); 

     // setup simple consumer 
     Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1); 
     ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties)); 

     // send message 
     KeyedMessage<Integer, byte[]> data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8)); 

     List<KeyedMessage> messages = new ArrayList<KeyedMessage>(); 
     messages.add(data); 

     producer.send(scala.collection.JavaConversions.asScalaBuffer(messages)); 
     producer.close(); 

     // deleting zookeeper information to make sure the consumer starts from the beginning 
     // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka 
     zkClient.delete("/consumers/group0"); 

     // starting consumer 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, 1); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); 
     ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 

     if(iterator.hasNext()) { 
      String msg = new String(iterator.next().message(), StandardCharsets.UTF_8); 
      System.out.println(msg); 
      assertEquals("test-message", msg); 
     } else { 
      fail(); 
     } 

     // cleanup 
     consumer.shutdown(); 
     kafkaServer.shutdown(); 
     zkClient.close(); 
     zkServer.shutdown(); 
    } 
} 

sizin mvn bağımlılığı mutlaka kontrol edin: çakışan kütüphaneler için ağaç.

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <classifier>test</classifier> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.2</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 

ben apache Küratörü kullandığını içine arıyorum Başka bir seçenek: Ben SLF ve log4j için dışlamaları eklemek zorunda Is it possible to start a zookeeper server instance in process, say for unit tests?

<dependency> 
    <groupId>org.apache.curator</groupId> 
    <artifactId>curator-test</artifactId> 
    <version>2.2.0-incubating</version> 
    <scope>test</scope> 
</dependency> 

TestingServer zkTestServer; 

@Before 
public void startZookeeper() throws Exception { 
    zkTestServer = new TestingServer(2181); 
    cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000)); 
} 

@After 
public void stopZookeeper() throws IOException { 
    cli.close(); 
    zkTestServer.stop(); 
} 
+0

, lütfen 0.11.0.2 sürümü için kod çalıştırabilirsiniz. Yukarıdaki kod çalışmıyor – dhroove

2

Mockito gibi alaycı bir çerçeve kullanarak kafka tüketici nesnelerini alay etmeye çalıştınız mı?

+0

Daha doğrusu bir sahte sürümünü olurdu kafka biliyorum bu yüzden üreticiler ve tüketiciler bununla çalışıyor. Burada ve çevrimiçi olarak bazı örnekler var (örneğin: https://ransilberman.wordpress.com/2013/07/19/how-to-unit-test-kafka). Ancak, daha eski sürümler içindir, bu nedenle artık 0.8.2.1 ile çalışmaz. – Chip

İlgili konular