Streams

  • integration with Reactive Streams

  • Example 1 - cover the different specializations after showing the Actor sink and the actor queue!

    • Flow =
      • Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
    • Sink =
      • Sink.fold[Int, Int](0)(_ + _)
      • Sink.head
      • Sink.ignore // for side effects
// side effects
emailAddresses
      .mapAsync(4) { address =>
        send(Email(to = address, title = "Akka", body = "I like your tweet"))
      }.to(Sink.ignore)
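
The Flow and Sink variants listed above can be tried out in a few lines. This is only a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem also supplies the materializer); the object name SinkExamples and the input range 1 to 5 are illustrative and not part of the original notes.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and input data are assumptions.
object SinkExamples {
  // Runs the sinks from the list against Source(1 to 5)
  // and returns (sum of all elements, first element).
  def run(): (Int, Int) = {
    // Assumes Akka 2.6+: the implicit ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("sink-examples")
    try {
      val numbers = Source(1 to 5)

      // Sink.fold materializes a Future holding the accumulated value.
      val sum = numbers.runWith(Sink.fold[Int, Int](0)(_ + _))

      // Sink.head materializes a Future holding the first element.
      val first = numbers.runWith(Sink.head[Int])

      // alsoTo copies every element to a side sink (here println),
      // while Sink.ignore discards the main output.
      val logging = Flow[Int].alsoTo(Sink.foreach[Int](println))
      val done = numbers.via(logging).runWith(Sink.ignore)

      Await.result(done, 3.seconds)
      (Await.result(sum, 3.seconds), Await.result(first, 3.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Sink.ignore is the usual choice when only the side effects matter, because its materialized Future[Done] still signals when the stream has completed.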


.mapAsync(4)(tweet => database ? Save(tweet))

// example with a Future
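
One possible Future-based example is sketched below. It assumes Akka 2.6 or later; the send function is a hypothetical stand-in for the email or database call in the notes, and the addresses are made up.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical demo object; names and data are assumptions.
object MapAsyncExample {
  // Stand-in for an external call: any function returning a Future
  // can be plugged into mapAsync.
  def send(address: String)(implicit ec: ExecutionContext): Future[String] =
    Future(s"sent to $address")

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("map-async")
    import system.dispatcher // ExecutionContext used by the Futures
    try {
      val result = Source(List("a@example.com", "b@example.com", "c@example.com"))
        // At most 4 Futures are in flight at once;
        // results are emitted downstream in input order.
        .mapAsync(4)(address => send(address))
        .runWith(Sink.seq[String])
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

If the order of results does not matter, mapAsyncUnordered emits each result as soon as its Future completes.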

.map(_ + 1).async // async boundary
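
A small sketch of an async boundary in a complete stream, assuming Akka 2.6 or later. The object name and the input are illustrative only.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and data are assumptions.
object AsyncBoundaryExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("async-boundary")
    try {
      val result = Source(1 to 3)
        // Everything up to .async (the source and this map) runs in one actor.
        .map(_ + 1).async
        // Stages after the boundary run in a separate actor,
        // connected to the first part through a buffer.
        .map(_ * 2)
        .runWith(Sink.seq[Int])
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The boundary changes where the stages execute, not what they compute: element order is preserved.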

Graphs

  • MergePreferred
  • buffers and deadlocks
  • ZipWith
  • Broadcast
  • Concat
  • Exercise: BidiFlow - implement receiver-sender communication
    • broadcast and implement a log
  • Implementing a “println” Sink
  • val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
  • bidirectional flow
  • BidiFlow
  • bidiFlow.join(flow) - request/response
  • a second stream that counts the elements
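
Two of the items above, sketched under the assumption of Akka 2.6 or later: a graph that fans out with Broadcast and fans in with ZipWith, and a BidiFlow joined with a flow in request/response style. All names (GraphExamples, squarePlusDouble, codec, server) and the sample data are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{BidiFlow, Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and data are assumptions.
object GraphExamples {
  // Broadcast copies each element to two branches;
  // ZipWith combines one element from each branch.
  val squarePlusDouble: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val bcast = builder.add(Broadcast[Int](2))
      val zip   = builder.add(ZipWith[Int, Int, Int](_ + _))

      bcast.out(0) ~> Flow[Int].map(x => x * x) ~> zip.in0
      bcast.out(1) ~> Flow[Int].map(_ * 2)      ~> zip.in1

      FlowShape(bcast.in, zip.out)
    })

  // Outbound direction encodes Int as String, inbound measures the reply.
  val codec: BidiFlow[Int, String, String, Int, NotUsed] =
    BidiFlow.fromFunctions((i: Int) => i.toString, (s: String) => s.length)

  // Stand-in "server" that answers every request by doubling the text.
  val server: Flow[String, String, NotUsed] = Flow[String].map(s => s + s)

  // join closes the bidirectional flow over the server: request in, response out.
  val requestResponse: Flow[Int, Int, NotUsed] = codec.join(server)

  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("graph-examples")
    try {
      val zipped  = Source(1 to 3).via(squarePlusDouble).runWith(Sink.seq[Int])
      val replies = Source(List(1, 22, 333)).via(requestResponse).runWith(Sink.seq[Int])
      (Await.result(zipped, 3.seconds), Await.result(replies, 3.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Both branches of the first graph emit exactly one element per input, so ZipWith never waits on a starved input. Unbalanced branches or cycles are where the buffer and deadlock questions from the list come in.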

Recovery

// Passes the supervisor through attributes
val parse: Flow[String, Event, NotUsed] =
  Flow[String].map(LogStreamProcessor.parseLineEx)
    .collect { case Some(e) => e }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
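
The decider itself is not shown in the notes. The sketch below is one way it might look, assuming Akka 2.6 or later. A plain toInt conversion stands in for LogStreamProcessor.parseLineEx, and the choice to resume on NumberFormatException is an illustrative assumption.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and data are assumptions.
object SupervisionExample {
  // Resume drops the failing element and keeps the stream running;
  // any other exception stops the stream with a failure.
  val decider: Supervision.Decider = {
    case _: NumberFormatException => Supervision.Resume
    case _                        => Supervision.Stop
  }

  // Stand-in for the parsing flow: toInt throws on malformed input.
  val parse: Flow[String, Int, NotUsed] =
    Flow[String]
      .map(_.trim.toInt)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("supervision")
    try {
      val result = Source(List("1", "oops", "3")).via(parse).runWith(Sink.seq[Int])
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With Supervision.Resume the malformed line is skipped. Supervision.Restart would additionally reset any state accumulated inside the failing stage.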
