Kafka'dan 10 saniyelik aralıklarla kıvılcım akışı okuma olay verilerini okudum. Bu etkinlik verilerini bir postgres tablosundaki mevcut verilerle tamamlamak istiyorum. ...Birden çok kaynağı akıtma, veri alanını yeniden yükle
val sqlContext = new SQLContext(sc)
val data = sqlContext.load("jdbc", Map(
"url" -> url,
"dbtable" -> query))
val broadcasted = sc.broadcast(data.collect())
Ve sonra ben şöyle geçebilir:
Ben böyle bir şeyle postgres tablosunu yükleyebilirsiniz
val db = sc.parallelize(data.value)
val dataset = stream_data.transform{ rdd => rdd.leftOuterJoin(db)}
isterim Mevcut veri akışımı devam ettirmek ve bu tabloyu 6 saatte bir yeniden yüklemek. Apache şu anda kıvılcım oluşturduğundan çoklu çalışma ortamlarını desteklemiyor, bunu nasıl başarabilirim? Herhangi bir geçici çözüm var mı? Ya da veriyi yeniden yüklemek istediğimde sunucuyu yeniden başlatmam gerekecek mi? Bu böyle basit bir kullanım durumu ...:/
Ben de bunun için bir cevap arıyorum, herhangi bir başarınız var, @ user838681? –
Postgre tablosunu yeniden yüklediğinizde, geçmiş kafka etkinliklerini önemsiyor musunuz, yoksa sadece yeni kafka verisine, en son postgres'lerin yeniden yüklenme zamanından itibaren katılmaya mı çalışıyorsunuz? –
@HamelKothari Geçmişteki Kafka etkinliklerini güncellemeye veya yeniden işlemeye gerek yok. SQL tablosunu güncellediğimde, sadece Kafka'nın gelecekteki herhangi bir etkinliğinde kullanmak istiyorum. –