2016-03-24 17 views
0

Ben özel WindowAssigher uyguladık:Özel için kaydetme noktaları nasıl uygulanır?

public class SessionWindowAssigner extends WindowAssigner<LogItem, SessionWindow> { 
    @Override 
    public Collection<SessionWindow> assignWindows(LogItem element, long timestamp) { 
     return Collections.singletonList(new SessionWindow(element.getSessionUid())); 
    } 

    @Override 
    public Trigger<LogItem, SessionWindow> getDefaultTrigger(StreamExecutionEnvironment env) { 
     return new SessionTrigger(60_000L); 
    } 

    @Override 
    public TypeSerializer<SessionWindow> getWindowSerializer(ExecutionConfig executionConfig) { 
     return new SessionWindow.Serializer(); 
    } 
} 

, Window:

public class SessionWindow extends Window { 
    private final String sessionUid; 

    public SessionWindow(String sessionUid) { 
     this.sessionUid = sessionUid; 
    } 

    public String getSessionUid() { 
     return sessionUid; 
    } 

    @Override 
    public long maxTimestamp() { 
     return Long.MAX_VALUE; 
    } 

    @Override 
    public boolean equals(Object o) { 
     if (this == o) return true; 
     if (o == null || getClass() != o.getClass()) return false; 

     SessionWindow that = (SessionWindow) o; 

     return sessionUid.equals(that.sessionUid); 
    } 

    @Override 
    public int hashCode() { 
     return sessionUid.hashCode(); 
    } 

    public static class Serializer extends TypeSerializer<SessionWindow> { 
     private static final long serialVersionUID = 1L; 

     @Override 
     public boolean isImmutableType() { 
      return true; 
     } 

     @Override 
     public TypeSerializer<SessionWindow> duplicate() { 
      return this; 
     } 

     @Override 
     public SessionWindow createInstance() { 
      return null; 
     } 

     @Override 
     public SessionWindow copy(SessionWindow from) { 
      return from; 
     } 

     @Override 
     public SessionWindow copy(SessionWindow from, SessionWindow reuse) { 
      return from; 
     } 

     @Override 
     public int getLength() { 
      return 0; 
     } 

     @Override 
     public void serialize(SessionWindow record, DataOutputView target) throws IOException { 
      target.writeUTF(record.sessionUid); 
     } 

     @Override 
     public SessionWindow deserialize(DataInputView source) throws IOException { 
      return new SessionWindow(source.readUTF()); 
     } 

     @Override 
     public SessionWindow deserialize(SessionWindow reuse, DataInputView source) throws IOException { 
      return new SessionWindow(source.readUTF()); 
     } 

     @Override 
     public void copy(DataInputView source, DataOutputView target) throws IOException { 
      target.writeUTF(source.readUTF()); 
     } 

     @Override 
     public boolean equals(Object obj) { 
      return obj instanceof Serializer; 
     } 

     @Override 
     public boolean canEqual(Object obj) { 
      return obj instanceof Serializer; 
     } 

     @Override 
     public int hashCode() { 
      return 0; 
     } 
    } 
} 

ve Trigger:

public class SessionTrigger extends Trigger<LogItem, SessionWindow> { 
    private final long sessionTimeout; 

    private final ValueStateDescriptor<Long> previousFinishTimestampDesc = new ValueStateDescriptor<>("SessionTrigger.timestamp", LongSerializer.INSTANCE, null); 

    public SessionTrigger(long sessionTimeout) { 
     this.sessionTimeout = sessionTimeout; 
    } 

    @Override 
    public TriggerResult onElement(LogItem element, long timestamp, SessionWindow window, TriggerContext ctx) throws Exception { 
     ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc); 

     Long previousFinishTimestamp = previousFinishTimestampState.value(); 
     Long newFinisTimestamp = timestamp + sessionTimeout; 

     if (previousFinishTimestamp != null) { 
      ctx.deleteEventTimeTimer(previousFinishTimestamp); 
     } 

     ctx.registerEventTimeTimer(newFinisTimestamp); 

     previousFinishTimestampState.update(newFinisTimestamp); 

     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public TriggerResult onEventTime(long time, SessionWindow window, TriggerContext ctx) throws Exception { 
     return TriggerResult.FIRE_AND_PURGE; 
    } 

    @Override 
    public TriggerResult onProcessingTime(long time, SessionWindow window, TriggerContext ctx) throws Exception { 
     throw new UnsupportedOperationException("This is not processing time trigger"); 
    } 

    @Override 
    public void clear(SessionWindow window, TriggerContext ctx) throws Exception { 
     ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc); 

     Long previousFinishTimestamp = previousFinishTimestampState.value(); 

     ctx.deleteEventTimeTimer(previousFinishTimestamp); 

     previousFinishTimestampState.clear(); 
    } 
} 

için zaman aşımı yani tarafından oturumun sonunu algılamak son olay N saniye önce olsaydı sonra pencere işlevini değerlendirir. Gördüğünüz gibi, hatadan sonra geri yüklemek istediğim için, son olay zaman damgasını ValueState'de kaydediyorum.

Akışımın yeniden dağıtımı sırasında tetikleme durumunu kaybetmek istemediğim için bu tetikleyicideki kaydetme noktası (ve denetim noktası) anlık görüntülerini kaydetmek/geri yüklemek için Checkpointed arabirimini kullanmalıyım gibi görünüyor.

Peki, doğru şekilde dağıtımı sırasında SessionTrigger tetikleyicisinin (ve muhtemelen ilgili pencerelerin) durumunun nasıl kaydedileceğini herkes anlatabilir mi?

Anladığım kadarıyla sadece SessionTrigger için Checkpointed arabirimini kullanmalıyım çünkü yalnızca durumu vardır. Sağ? SessionWindow -s ve SessionWindowAssigner hakkında? Otomatik olarak dağıtıldıktan sonra geri yüklenecek mi yoksa manuel olarak mı yapmalıyım?

cevap

0

SessionWindowing

private static class SessionTrigger extends Trigger<Tuple3<String, Long, Integer>, GlobalWindow> { 

    private static final long serialVersionUID = 1L; 

    private final Long sessionTimeout; 

    private final ValueStateDescriptor<Long> stateDesc = 
      new ValueStateDescriptor<>("last-seen", Long.class, -1L); 


    public SessionTrigger(Long sessionTimeout) { 
     this.sessionTimeout = sessionTimeout; 

    } 

    @Override 
    public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception { 

     ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc); 
     Long lastSeen = lastSeenState.value(); 

     Long timeSinceLastEvent = timestamp - lastSeen; 

     ctx.deleteEventTimeTimer(lastSeen + sessionTimeout); 

     // Update the last seen event time 
     lastSeenState.update(timestamp); 

     ctx.registerEventTimeTimer(timestamp + sessionTimeout); 

     if (lastSeen != -1 && timeSinceLastEvent > sessionTimeout) { 
      System.out.println("FIRING ON ELEMENT: " + element + " ts: " + timestamp + " last " + lastSeen); 
      return TriggerResult.FIRE_AND_PURGE; 
     } else { 
      return TriggerResult.CONTINUE; 
     } 
    } 

    @Override 
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception { 
     ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc); 
     Long lastSeen = lastSeenState.value(); 

     if (time - lastSeen >= sessionTimeout) { 
      System.out.println("CTX: " + ctx + " Firing Time " + time + " last seen " + lastSeen); 
      return TriggerResult.FIRE_AND_PURGE; 
     } 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception { 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception { 
     ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc); 
     if (lastSeenState.value() != -1) { 
      ctx.deleteEventTimeTimer(lastSeenState.value() + sessionTimeout); 
     } 
     lastSeenState.clear(); 
    } 
} 
Alındığı
İlgili konular