Kafka 0.8.2.1 SimpleConsumer kullanıyorum. Birisi SimpleConsumer ve FetchRequestBuilder için birkaç config parametresinin anlamını açıklayabilir mi? KAfka'nın kaynak kodunu okumadan kısa bir süre sonra herhangi bir doküman bulamadım. Bunun anlamı ne - Ben Int 'soTimeout' parametresini bakınız SimpleConsumer yapıcısının imzasında: -kafka: SimpleConsumer için 'soTimeout', 'bufferSize' ve 'minBytes' ne demek?
S1: - (ancak hiç şans ben kafka kullanıcı grubuna bu soruyu gönderme çalıştı) zaman aşımı? Bu Kafka brokerına bağlanmak için bir zaman aşımı mı? Herhangi bir [veya özel ??] isteğinden Kafka'ya (FetchRequest gibi) yanıt almak için zaman aşımı? Başka bir şey?
kafka.javaapi.consumer.SimpleConsumer
(val host: String,
val port: Int,
val soTimeout: Int,
val bufferSize: Int,
val clientId: String)
- S2: aynı zamanda, SimpleConsumer yapıcı Int 'BufferSize' parametresini alır. Bunun anlamı nedir? Bir fetchRequest yayımlandığında bu kaç bayt SimpleConsumer okuyacak mı? Ya da Kafka'dan bir getirme başına okunan maksimum bayt sayısı - ve daha fazla veri mevcutsa birden fazla getirme gerçekleşecek mi?
- FetchRequestBuilder (aşağıya bakınız) aracılığıyla FetchRequest oluştururken, ben de 'fetchSize' belirtmek gerekir:
FetchRequest req= newFetchRequestBuilder()
.clientId(kafkaGroupId)
.addFetch(topic, partition, offset, fetchSizeInBytes)
.build();
FetchRequestBuilder kaynak koduna baktığımızda, ben değilim (düşünmek bir Scala pro) bu çağrıları aşağıdaki yöntem çağrılarına çevirir - ve burada FetchRequest'e iletilen son parametre '
minBytes' olarak adlandırılır, bu muhtemelen tam getirme boyutu değil, ima eder? . En azından 'minBytes' verisi mevcut olmadığı sürece herhangi bir şey almayacağı anlamına mı geliyor?
class FetchRequestBuilder():
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)
def build() = {
val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
**minBytes: Int = FetchRequest.DefaultMinBytes**,
...)
Yani, benim son soru:
- Q3: nasıl 'BufferSize' ve 'fetchSize/minBytes' ilişkilendirilir? Tam olarak ne tanımlar? 'u yapmak zorunda mıyım? Birinin diğerinden daha küçük mi yoksa daha mı?
sayesinde
Marina
Teşekkürler, Chris. Soketin bufferSize değerinden daha fazla veri olduğunda birden fazla düşük seviyeli getirmenin olacağı anlamına gelir. Yine de 'fetchSize/minBytes' öğelerinin ne anlama geldiğini açık değilim. Örneğin, minBytes = 200 ise, ancak tüketim için 100 baytlık mesaj var - tüketici en az 200 bayt olana kadar bekleyecek mi?Ya da bir düşük düzey getirme çağrısı Kafka gelen veri alma önerilen "yığın" boyutudur? Teşekkürler!! – Marina
Kullanılabilir mesaj yoksa, getirme sırasında bir istisna atılır (bu bekleme süresi için belirli bir zaman aşımı değeri vardır). Aksi halde, getirme, sayısı fetchSize tarafından işaret edilen bazı mesajlar alır. İdeal olarak, döndürülen mesajlar getirme boyutundan daha büyük bir toplam boyuta sahip olacaktır, ancak yeterli mesaj yoksa, o zaman verilen zaman periyodundan sonra bu sayı döndürülür. –
harika, teşekkürler! – Marina