2016-07-07 26 views
7

ES-CQRS mimarimdeki okuma tarafını uygulamaya çalışıyorum. en Böyle bir kalıcı aktör var diyelim: etki devam ettiği aldığındaAkka Persistence Query olay akışı ve CQRS

object UserWrite { 

    sealed trait UserEvent 
    sealed trait State 
    case object Uninitialized extends State 
    case class User(username: String, password: String) extends State 
    case class AddUser(user: User) 
    case class UserAdded(user: User) extends UserEvent 
    case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed]) 
    case class UsersStream(fromSeqNo: Long) 
    case object GetCurrentUser 

    def props = Props(new UserWrite) 
} 

class UserWrite extends PersistentActor { 

    import UserWrite._ 

    private var currentUser: State = Uninitialized 

    override def persistenceId: String = "user-write" 

    override def receiveRecover: Receive = { 
    case UserAdded(user) => currentUser = user 
    } 

    override def receiveCommand: Receive = { 
    case AddUser(user: User) => persist(UserAdded(user)) { 
     case UserAdded(`user`) => currentUser = user 
    } 
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo) 
    case GetCurrentUser => sender() ! currentUser 
    } 

    def publishUserEvents(fromSeqNo: Long) = { 
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
    val userEvents = readJournal 
     .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue) 
     .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event } 
    sender() ! UserEvents(userEvents) 
    } 
} 

Bildiğim kadarıyla anladığım kadarıyla, her zaman, biz Akka Persistence Query aracılığıyla yayınlayabilirsiniz. Şimdi, bu etkinliklere abone olmanın uygun bir yolu olacağından emin değilim, bu yüzden okuyabildiğim yan veritabanımda bunu devam ettirebilirim? Bu fikirlerden biri, okuyucumdaki oyuncudan UserWrite aktör ve "batırıcı" etkinliklerine UsersStream mesajını ilk başta okumaktır. @cmbaxter önerisi ardından

DÜZENLEME

, ben kanattan geliştirdiği bu şekilde okumak uygulamıştır: Olay akışı yavaş gibi görünüyor: gibi bazı sorunlar vardır

object UserRead { 

    case object GetUsers 
    case class GetUserByUsername(username: String) 
    case class LastProcessedEventOffset(seqNo: Long) 
    case object StreamCompleted 

    def props = Props(new UserRead) 
} 

class UserRead extends PersistentActor { 
    import UserRead._ 

    var inMemoryUsers = Set.empty[User] 
    var offset  = 0L 

    override val persistenceId: String = "user-read" 

    override def receiveRecover: Receive = { 
    // Recovery from snapshot will always give us last sequence number 
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo 
    case RecoveryCompleted         => recoveryCompleted() 
    } 

    // After recovery is being completed, events will be projected to UserRead actor 
    def recoveryCompleted(): Unit = { 
    implicit val materializer = ActorMaterializer() 
    PersistenceQuery(context.system) 
     .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
     .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue) 
     .map { 
     case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event 
     } 
     .runWith(Sink.actorRef(self, StreamCompleted)) 
    } 

    override def receiveCommand: Receive = { 
    case GetUsers     => sender() ! inMemoryUsers 
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username) 
    // Match projected event and update offset 
    case (seqNo: Long, UserAdded(user)) => 
     saveSnapshot(LastProcessedEventOffset(seqNo)) 
     inMemoryUsers += user 
    } 
} 

. Yani UserRead aktör, yeni eklenen kullanıcı kaydedilmeden önce kullanıcı grubuyla cevap verebilir.

DÜZENLEME 2

Daha az yavaş olay akışıyla sorunu çözüldü cassandra sorgu derginin yenileme aralığını arttırmıştır. Cassandra olay günlüğünün varsayılan olarak her 3 saniyede bir sorgulandığı görülmektedir. Benim application.conf ben ekledi:

cassandra-query-journal { 
    refresh-interval = 20ms 
} 

DÜZENLEME Aslında 3

, yenileme aralığı azaltmak yoktur. Bu bellek kullanımını artıracak ama bu tehlikeli değil, bir nokta. Genel olarak CQRS kavramı yazma ve okuma tarafının uyumsuz olmasıdır. Bu nedenle, veri yazdıktan sonra, hemen okuma için hiçbir zaman mevcut olmayacaktır. UI ile başa çıkmak? Sadece akışı açtım ve okuduğum tarafın onları kabul ettikten sonra sunucuya gönderilen olaylar aracılığıyla verileri itmesi.

+2

Ben üzerinde bir 'Source' ile robota bir ileti göndermek yerine sizin okuma yan projeksiyon aktör haline okuma dergi tabanlı kod hareket ederim. Ardından o okuyan taraftaki projeksiyon aktöründe bu akışı işleyin ve bu bilgiyi Elasticsearch'a yansıtın. – cmbaxter

+0

@cmbaxter Bunu yaptım. Çok iyi bir fikir gibi görünüyor. Sorunumu güncelledim ve hala bazı şüphelerim olduğu için önerileri kabul ediyorum. –

cevap

4

Bunu yapmanın bazı yolları vardır. Örneğin, uygulamamda, sürekli olarak değişiklikler arayan bir PersistenceQuery'ye sahip bir sorguya sahip bir aktörüm var, ancak aynı sorguya sahip bir iş parçacığınız da olabilir. şey Bunun yerine

val readJournal = 
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
    CassandraReadJournal.Identifier) 

// issue query to journal 
val source: Source[EventEnvelope, NotUsed] = 
    readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue) 

// materialize stream, consuming events 
implicit val mat = ActorMaterializer() 
source.map(_.event).runForeach{ 
    case userEvent: UserEvent => { 
    doSomething(userEvent) 
    } 
} 

, bir PersistenceQuery ve depolar yeni olaylar yükselten bir zamanlayıcı olabilir gerçekleşir gerçekleşmez kalıcı olayı okumak mümkün açık akışını sürdürmek için, ama ben düşünüyorum bir akış açık olan en iyi yolu PersistenceQuery ile çözüm sadece onaylı olmasına rağmen

2

olması nedeniyle ileride sorunlar içerir: kısmi olduğunu

  1. , sunulan EventEnvelopes okumak için tek yöntem yoktur.
  2. Durum anlık görüntüleriyle çalışmaz ve sonuç olarak, CQRS Okuyucu Bölümü, üzerinden devam etmelidir.

ilk çözüm daha iyi, ama şu sorunlara neden olur:

  1. Bu çok karmaşık. Kullanıcının sıra numaraları ile gereksiz yere uğraşmasına neden olur.
  2. Kod, Aktörler uygulamasıyla eşleştirilen durumu (sorgulama/güncelleştirme) ile ilgilidir. Orada

biri daha basit var:

import akka.NotUsed 
import akka.actor.{Actor, ActorLogging} 
import akka.persistence.query.{EventEnvelope, PersistenceQuery} 
import akka.persistence.query.javadsl.{EventsByPersistenceIdQuery, ReadJournal} 
import akka.persistence._ 
import akka.stream.ActorMaterializer 
import akka.stream.javadsl.Source 

/** 
    * Created by alexv on 4/26/2017. 
    */ 
class CQRSTest { 

    // User Command, will be transformed to User Event 
    sealed trait UserCommand 
    // User Event 
    // let's assume some conversion from Command to event here 
    case class PersistedEvent(command: UserCommand) extends Serializable 
    // User State, for simplicity assumed that all State will be snapshotted 
    sealed trait State extends Serializable{ 
    def clear(): Unit 
    def updateState(event: PersistedEvent): Unit 
    def validateCommand(command:UserCommand): Boolean 
    def applyShapshot(newState: State): Unit 
    def getShapshot() : State 
    } 
    case class SaveSnapshot() 

    /** 
    * Common code for Both reader and writer 
    * @param state - State 
    */ 
    abstract class CQRSCore(state: State) extends PersistentActor with ActorLogging { 
    override def persistenceId: String = "CQRSPersistenceId" 

    override def preStart(): Unit = { 
     // Since the state is external and not depends to Actor's failure or restarts it should be cleared. 
     state.clear() 
    } 

    override def receiveRecover: Receive = { 
     case event : PersistedEvent => state.updateState(event) 
     case SnapshotOffer(_, snapshot: State) => state.applyShapshot(snapshot) 
     case RecoveryCompleted => onRecoveryCompleted(super.lastSequenceNr) 
    } 

    abstract def onRecoveryCompleted(lastSequenceNr:Long) 
    } 

    class CQRSWriter(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSWriter Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed") 
    } 

    override def receiveCommand: Receive = { 
     case command: UserCommand => 
     if(state.validateCommand(command)) { 
      // Persist events and call state.updateState with each persisted event 
      persistAll(List(PersistedEvent(command)))(state.updateState) 
     } 
     else { 
      log.error("Validation Failed for Command: {}", command) 
     } 
     case SaveSnapshot => saveSnapshot(state.getShapshot()) 
     case SaveSnapshotSuccess(metadata) => log.debug("Saved snapshot successfully: {}", metadata) 
     case SaveSnapshotFailure(metadata, reason) => log.error("Failed to Save snapshot: {} . Reason: {}", metadata, reason) 
    } 
    } 

    class CQRSReader(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSReader Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed, Starting QueryStream") 

     // ReadJournal type not specified here, so may be used with Cassandra or In-memory Journal (for Tests) 
     val readJournal = PersistenceQuery(context.system).readJournalFor(
     context.system.settings.config.getString("akka.persistence.query.my-read-journal")) 
     .asInstanceOf[ReadJournal 
     with EventsByPersistenceIdQuery] 
     val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(
     OrgPersistentActor.orgPersistenceId, lastSequenceNr + 1, Long.MaxValue) 
     source.runForeach({ envelope => state.updateState(envelope.event.asInstanceOf[PersistedEvent]) },ActorMaterializer()) 

    } 

    // Nothing received since it is Reader only 
    override def receiveCommand: Receive = Actor.emptyBehavior 
    } 
} 
+0

Kabul Edilmiş CQRSRead Bölümü, doğrudan durumu üzerinden sorgulanmalıdır. CQRSReader, durumun CQRSWriter'in bir ile benzer olduğundan emin olur. Ben burada Beton durumunu uygulamamıştım, ama basit Hash Map'den In-memory Graph DB'ye kadar her şey olabilir. –