Streams
integracja z reactive streams
Przyklad 1 - różne specjalizacje zrobić po pokazaniu Actor sink i actor queue!
- Flow =
- Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
- Sink=
- Sink.foldInt, Int( + )
- Sink.head
- Sink.ignore // for side effects
- Flow =
//efekty uboczne
emailAddresses
.mapAsync(4){address=>
send(Email(to = address, title = "Akka", body = "I like your tweet")))
}.to(Sink.ignore)
.mapAsync(4)(tweet => database ? Save(tweet)
//przykład z future
.map(_ + 1).async - async boundary
Graphs
- MergePreferred
- bufory i dedloki
- ZipWith
- Broadcast,
- Concat
- Exercise : BidiFlow - zaimplementowac komunikację odbiorca-nadawca
- brodcast i zaimplementować log
- Implementing a “println” Sink
- val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
- bidirectional flow
- BidiFlow
- bidiFlow.join(flow) - request/response
- drugi strumień, który zlicza elementy
recovery
val parse: Flow[String, Event, NotUsed] =
Passes the supervisor
through attributes
Flow[String].map(LogStreamProcessor.parseLineEx)
.collect { case Some(e) => e }
.withAttributes(ActorAttributes.supervisionStrategy(decider))