2015-06-09 14 views
5

Kafka 0.8.2.1 SimpleConsumer kullanıyorum. Birisi SimpleConsumer ve FetchRequestBuilder için birkaç config parametresinin anlamını açıklayabilir mi? KAfka'nın kaynak kodunu okumadan kısa bir süre sonra herhangi bir doküman bulamadım. Bunun anlamı ne - Ben Int 'soTimeout' parametresini bakınız SimpleConsumer yapıcısının imzasında: -kafka: SimpleConsumer için 'soTimeout', 'bufferSize' ve 'minBytes' ne demek?

S1: - (ancak hiç şans ben kafka kullanıcı grubuna bu soruyu gönderme çalıştı) zaman aşımı? Bu Kafka brokerına bağlanmak için bir zaman aşımı mı? Herhangi bir [veya özel ??] isteğinden Kafka'ya (FetchRequest gibi) yanıt almak için zaman aşımı? Başka bir şey?

kafka.javaapi.consumer.SimpleConsumer 
    (val host: String, 
    val port: Int, 
    val soTimeout: Int, 
    val bufferSize: Int, 
    val clientId: String) 

- S2: aynı zamanda, SimpleConsumer yapıcı Int 'BufferSize' parametresini alır. Bunun anlamı nedir? Bir fetchRequest yayımlandığında bu kaç bayt SimpleConsumer okuyacak mı? Ya da Kafka'dan bir getirme başına okunan maksimum bayt sayısı - ve daha fazla veri mevcutsa birden fazla getirme gerçekleşecek mi?

- FetchRequestBuilder (aşağıya bakınız) aracılığıyla FetchRequest oluştururken, ben de 'fetchSize' belirtmek gerekir:

FetchRequest req= newFetchRequestBuilder() 
    .clientId(kafkaGroupId) 
    .addFetch(topic, partition, offset, fetchSizeInBytes) 
    .build(); 

FetchRequestBuilder kaynak koduna baktığımızda, ben değilim (düşünmek bir Scala pro) bu çağrıları aşağıdaki yöntem çağrılarına çevirir - ve burada FetchRequest'e iletilen son parametre ' minBytes' olarak adlandırılır, bu muhtemelen tam getirme boyutu değil, ima eder? . En azından 'minBytes' verisi mevcut olmadığı sürece herhangi bir şey almayacağı anlamına mı geliyor?

class FetchRequestBuilder(): 
    def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) 

    def build() = { 
     val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap) 

FetchRequest(versionId: Short = FetchRequest.CurrentVersion, 
    correlationId: Int = FetchRequest.DefaultCorrelationId, 
    clientId: String = ConsumerConfig.DefaultClientId, 
    replicaId: Int = Request.OrdinaryConsumerId, 
    maxWait: Int = FetchRequest.DefaultMaxWait, 
    **minBytes: Int = FetchRequest.DefaultMinBytes**, 
...) 

Yani, benim son soru:

- Q3: nasıl 'BufferSize' ve 'fetchSize/minBytes' ilişkilendirilir? Tam olarak ne tanımlar? 'u yapmak zorunda mıyım? Birinin diğerinden daha küçük mi yoksa daha mı?

sayesinde

Marina

cevap

2

soTimeout verilen broker bir bağlantı için beklenecek milisaniye cinsinden süredir. Doğrulamadan başka bir özel durumun olup olmadığını bilerek, orada birtakım eylemler gerçekleştirmeye hazır olan bir komisyoncu var.

Kurucuda kullanılan bufferSize, aracı tarafından gönderilen verileri almak için istemci tarafındaki soket tarafından kullanılan arabelleğin boyutu olduğuna inanıyorum.

Son sorunuz için, bir getirme isteği ile herhangi bir sebepten ötürü toplam döndürülen bayt sayısı, istenen soket arabelleği boyutundan büyükse, tüm verileri almak için birden fazla alt düzeyli çağrı olması gerekir. Yine de daha yüksek seviyeli bir getirme çağrısı var.

+0

Teşekkürler, Chris. Soketin bufferSize değerinden daha fazla veri olduğunda birden fazla düşük seviyeli getirmenin olacağı anlamına gelir. Yine de 'fetchSize/minBytes' öğelerinin ne anlama geldiğini açık değilim. Örneğin, minBytes = 200 ise, ancak tüketim için 100 baytlık mesaj var - tüketici en az 200 bayt olana kadar bekleyecek mi?Ya da bir düşük düzey getirme çağrısı Kafka gelen veri alma önerilen "yığın" boyutudur? Teşekkürler!! – Marina

+0

Kullanılabilir mesaj yoksa, getirme sırasında bir istisna atılır (bu bekleme süresi için belirli bir zaman aşımı değeri vardır). Aksi halde, getirme, sayısı fetchSize tarafından işaret edilen bazı mesajlar alır. İdeal olarak, döndürülen mesajlar getirme boyutundan daha büyük bir toplam boyuta sahip olacaktır, ancak yeterli mesaj yoksa, o zaman verilen zaman periyodundan sonra bu sayı döndürülür. –

+0

harika, teşekkürler! – Marina