6

Bir konudan gelen PubSub iletileri verilerini Google Cloud Dataflow kullanarak BigQuery tablosuna eklemek istiyorum. Her şey harika çalışıyor, ancak BigQuery tablosunda "߈ " gibi okunamayan dizeleri görebiliyorum. Bu benim boru hattı:PubSub iletilerini Google Cloud Dataflow aracılığıyla BigQuery'ye ekleyin

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name")) 
.apply(ParDo.named("Transformation").of(new StringToRowConverter())) 
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table") 
    .withSchema(schema) 
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)) 

ve benim basit StringToRowConverter fonksiyonudur:

class StringToRowConverter extends DoFn<String, TableRow> { 
private static final long serialVersionUID = 0; 

@Override 
public void processElement(ProcessContext c) { 
    for (String word : c.element().split(",")) { 
     if (!word.isEmpty()) { 
      System.out.println(word); 
     c.output(new TableRow().set("data", word)); 
     } 
    } 
} 
} 

Ve bu bir POST isteği aracılığıyla gönderilen mesajdır: ben eksik

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish 
{ 
"messages": [ 
    { 
    "attributes":{ 
"key": "tablet, smartphone, desktop", 
"value": "eng" 
    }, 
    "data": "34gf5ert" 
    } 
] 
} 

? Teşekkür ederiz!

+0

[This] (https://github.com/bomboradata/pubsub-to-bigquery) pub/sub yönünü BQ'ye yönlendirmek için kullanabileceğiniz açık bir kaynaktır. – PUG

cevap

6

https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage'a göre, pubsub iletisinin JSON yükü base64 kodludur. Dataflow'daki PubsubIO varsayılan olarak String UTF8 kodlayıcısını kullanır. "34gf5ert" sağladığınız örnek dizgisi, base64 kodu çözüldüğünde ve bir UTF-8 dizesi olarak yorumlandığında, tam olarak "߈ " değerini verir. Benim PubSub mesajlar açma am nasıl budur

2

:

@Override 
public void processElement(ProcessContext c) { 

    String json = c.element(); 

    HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType()); 
    String unpacked = items.get("JsonKey"); 

size onun yararlıdır Umut.

İlgili konular