Akka Streams Intro

1) The First experience

DEMO FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/streams/intro/StreamsIntroPart1Basics.scala

This file is a quick explanation of basic Akka Stream elements : Source, Flow and Sink. Akka Stream has very good documentation where everything is explained so there is no need to repeat it here : http://doc.akka.io/docs/akka/2.4/scala/stream/stream-quickstart.html

We are going to see in practice how to connect elements mentioned above to create a complete stream :

val source: Source[Int, NotUsed] =Source(1 to 15 by 3)
val flow: Flow[Int, String, NotUsed] =Flow[Int].map(_*2).map(i=>s"as string : $i")
val sink: Sink[Any, Future[Done]] =Sink.foreach(println)

val stream: RunnableGraph[NotUsed] =source.via(flow).to(sink)

The NotUsed type tell us that our stream doesn't materialize any additional values. We are going to take a look at materialization later - for now let's see a quick example how to materialize result future when running a stream.

val streamWithResult: RunnableGraph[Future[Done]] =source.via(flow).toMat(sink)(Keep.right)

Where are actors?

Till now we saw only constructs from Streams API - now let's see how we can integrate this API with actors. We are going to direct stream flow into an actor which is configured as a sink

 class ActorSink extends Actor{
  override def receive: Receive = {
    case msg => println(s"Real actor received $msg")

val actorSink=system.actorOf(Props[ActorSink])
val realActorSubscriber: Sink[Any, NotUsed] = Sink.actorRef(actorSink,"MY_COMPLETED")



FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/streams/intro/StreamsIntroPart1Materializers.scala

Materializers allow stream to return auxiliary objects which can influence the way on how information is moving through stream :

For Example :

val source: Source[Int, ActorRef] =Source.actorRef

This way we have created a Source of Int elements with auxiliary Actor Reference. Now we can use this reference to provide information into the Stream.

The same situation can be seen in case of sink. Because Sink is on the righ side of a stream and by default ("via" method) left materializer is returned, then you need to explicitly state which one auxiliary value do you want to materialize.

val p5: Sink[Int, Future[Int]] = flow.toMat(sink)(Keep.right)

You can also materialize objects from both ends of a stream

val p8: RunnableGraph[(ActorRef, Future[Int])] =source.via(flow).toMat(sink)(Keep.both)



Exercise : Basic Stream

As an input you have :

val numberCandidates: List[String] = List("1", "2", "c", "4", "e", "6", "g", "h", "9")

Filter out letters , keep only numbers, convert to int and add.

Exercise : Reading from database

You have a database with products :

private val data = Map(
        1 -> Product(1, "Techno Greatest Hits 1995-97", BigDecimal(20)),
        2 -> Product(2, "Bazooka", BigDecimal(120)),
        3 -> Product(3, "MAc Book USB adapter", BigDecimal(1000))

Create source from the future :


Extract prices from products and sum them inside actors.


We need to create an alias for immutable Iterable because akka streams expects this type and standard scala library

//in akka stream
type Iterable[A] = scala.collection.immutable.Iterable[A]

//in scala package
type Iterator[+A] = scala.collection.Iterator[A]
val Iterator = scala.collection.Iterator

Exercise : Actor with ACK

To use back pressure we can assign a special protocol to an actor where every new message is explicitly requested

Implement variant of latest exercise with ACK actor

2) Integration

FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/streams/intro/StreamsIntroPart2IntegrationDemo.scala

Next we will see how to integrate Streams with Actors and external system at the beginning and during flow processing.


To send information to an actor or any external system we can use mapAsync method which operate on Future.

So :

  • In case of an actor we need to use ask pattern
  • In case of asynchronous API which returns Future we just need to call it
  • In case of synchronous API we need to trigger call in separate thread and return Future

In the example below we are using ask pattern to receive an info from an actor.

 val flow: Flow[Int, String, NotUsed] =Flow[Int]
      .mapAsync(parallelism = 5)(elem => (actor ? elem).mapTo[String])


FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/streams/intro/StreamsIntroPart2QueueDemo.scala

To Integrate stream generations with actors we can create source connected with a queue :

val source: Source[String, SourceQueueWithComplete[String]] =
      Source.queue[String](bufferSize = 4, overflowStrategy = OverflowStrategy.backpressure)

Queue is materialized when the graph is run and we each element sent to a queue flows through the stream

val queue: SourceQueueWithComplete[String] =graph.run()

val actor=system.actorOf(Props(new Deduplicator(queue)))

class Deduplicator(q:SourceQueueWithComplete[String]) extends Actor{

    var history=Set[String]()

    override def receive: Receive = {
      case s:String if(!history.contains(s)) =>
        history = history + s


Exercise : Integration with DB

FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/streams/intro/exercises/StreamsIntroExercise2DatabaseFlow.scala

Exercise : Log analysis

FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/streams/intro/exercises/StreamsIntroExercise2Logs.scala


DEMO FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/streams/intro/StreamsIntroPart3GraphDSLDemo.scala

Graphs DSL allows you to break from linear data processing. You can easily add additional data processing paths.

    import GraphDSL.Implicits._
    val graphWithDomainLogging=GraphDSL.create(){implicit builder =>
      val broadcast=builder.add(Broadcast[String](2))

      val additionalOutput=Sink.actorRef(domainActor,"COMPLETE_MESSAGE")

      broadcast.out(0) ~> additionalOutput


When graph is constructed you cna easily create a flow from it

val flow=Flow.fromGraph(graphWithDomainLogging)


Exercise : source and sink from graph

EXERCISES : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/test/scala/jug/workshops/reactive/streams/intro/exercises/StreamsIntroExercise3GraphDSLSpec.scala

results matching ""

    No results matching ""