ES-CQRS mimarimdeki okuma tarafını uygulamaya çalışıyorum. en Böyle bir kalıcı aktör var diyelim: etki devam ettiği aldığındaAkka Persistence Query olay akışı ve CQRS
object UserWrite {
sealed trait UserEvent
sealed trait State
case object Uninitialized extends State
case class User(username: String, password: String) extends State
case class AddUser(user: User)
case class UserAdded(user: User) extends UserEvent
case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
case class UsersStream(fromSeqNo: Long)
case object GetCurrentUser
def props = Props(new UserWrite)
}
class UserWrite extends PersistentActor {
import UserWrite._
private var currentUser: State = Uninitialized
override def persistenceId: String = "user-write"
override def receiveRecover: Receive = {
case UserAdded(user) => currentUser = user
}
override def receiveCommand: Receive = {
case AddUser(user: User) => persist(UserAdded(user)) {
case UserAdded(`user`) => currentUser = user
}
case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
case GetCurrentUser => sender() ! currentUser
}
def publishUserEvents(fromSeqNo: Long) = {
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val userEvents = readJournal
.eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
.map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
sender() ! UserEvents(userEvents)
}
}
Bildiğim kadarıyla anladığım kadarıyla, her zaman, biz Akka Persistence Query
aracılığıyla yayınlayabilirsiniz. Şimdi, bu etkinliklere abone olmanın uygun bir yolu olacağından emin değilim, bu yüzden okuyabildiğim yan veritabanımda bunu devam ettirebilirim? Bu fikirlerden biri, okuyucumdaki oyuncudan UserWrite
aktör ve "batırıcı" etkinliklerine UsersStream
mesajını ilk başta okumaktır. @cmbaxter önerisi ardından
DÜZENLEME
, ben kanattan geliştirdiği bu şekilde okumak uygulamıştır: Olay akışı yavaş gibi görünüyor: gibi bazı sorunlar vardır
object UserRead {
case object GetUsers
case class GetUserByUsername(username: String)
case class LastProcessedEventOffset(seqNo: Long)
case object StreamCompleted
def props = Props(new UserRead)
}
class UserRead extends PersistentActor {
import UserRead._
var inMemoryUsers = Set.empty[User]
var offset = 0L
override val persistenceId: String = "user-read"
override def receiveRecover: Receive = {
// Recovery from snapshot will always give us last sequence number
case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
case RecoveryCompleted => recoveryCompleted()
}
// After recovery is being completed, events will be projected to UserRead actor
def recoveryCompleted(): Unit = {
implicit val materializer = ActorMaterializer()
PersistenceQuery(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
.eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
.map {
case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
}
.runWith(Sink.actorRef(self, StreamCompleted))
}
override def receiveCommand: Receive = {
case GetUsers => sender() ! inMemoryUsers
case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
// Match projected event and update offset
case (seqNo: Long, UserAdded(user)) =>
saveSnapshot(LastProcessedEventOffset(seqNo))
inMemoryUsers += user
}
}
. Yani UserRead
aktör, yeni eklenen kullanıcı kaydedilmeden önce kullanıcı grubuyla cevap verebilir.
DÜZENLEME 2
Daha az yavaş olay akışıyla sorunu çözüldü cassandra sorgu derginin yenileme aralığını arttırmıştır. Cassandra olay günlüğünün varsayılan olarak her 3 saniyede bir sorgulandığı görülmektedir. Benim application.conf
ben ekledi:
cassandra-query-journal {
refresh-interval = 20ms
}
DÜZENLEME Aslında 3
, yenileme aralığı azaltmak yoktur. Bu bellek kullanımını artıracak ama bu tehlikeli değil, bir nokta. Genel olarak CQRS kavramı yazma ve okuma tarafının uyumsuz olmasıdır. Bu nedenle, veri yazdıktan sonra, hemen okuma için hiçbir zaman mevcut olmayacaktır. UI ile başa çıkmak? Sadece akışı açtım ve okuduğum tarafın onları kabul ettikten sonra sunucuya gönderilen olaylar aracılığıyla verileri itmesi.
Ben üzerinde bir 'Source' ile robota bir ileti göndermek yerine sizin okuma yan projeksiyon aktör haline okuma dergi tabanlı kod hareket ederim. Ardından o okuyan taraftaki projeksiyon aktöründe bu akışı işleyin ve bu bilgiyi Elasticsearch'a yansıtın. – cmbaxter
@cmbaxter Bunu yaptım. Çok iyi bir fikir gibi görünüyor. Sorunumu güncelledim ve hala bazı şüphelerim olduğu için önerileri kabul ediyorum. –