Routers

Routers are a very good place in Akka to see why representing actors through ActorRef, together with location transparency, is such a big advantage: it lets us rearrange the actor structure easily.

During this workshop we are going to learn:

  • How routers help us scale actor logic so that it can handle more messages (and avoid OutOfMemoryError)
  • How to create/declare routers in Akka
  • How to implement our own routers, to better understand the nature of actors and good programming practices in asynchronous programs

Part 1 - Declare a Router

DEMO FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/routing/RoutersPart1CreateDemo.scala

Config file

You can fully configure a router in a config file which will allow you to change those settings without touching the code.

akka.actor.deployment{
  /demoMainActor/routerNameFromConfig1 {
    router = round-robin-pool
    #router = random-pool
    nr-of-instances=5
  }
}

To use a custom config file, you only need to pass it in when creating the ActorSystem:

val demoConfig = ConfigFactory.load("routers/routersdemo")
val system=ActorSystem("routers",demoConfig)

After that, the only difference from standard actor creation is that you need to wrap the worker Props in FromConfig.props:

 val router1=context.actorOf(FromConfig.props(Props[RouterWorker]),"routerNameFromConfig1")

Because the config refers to a specific actor path (written relative to /user), naming the actors is very important. For example, if we use the path /demoMainActor/routerNameFromConfig1 in the config, then:

  • demoMainActor - must be the name of the actor that creates the router
  • routerNameFromConfig1 - must be the exact name of the created router actor
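
The naming rule above can be sketched end to end as follows. This is a minimal sketch, not the workshop's demo file: the worker body, the inline config string (used instead of routers/routersdemo to keep the example self-contained) and the object name FromConfigSketch are assumptions made for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory

object FromConfigSketch {
  // Hypothetical worker: only prints which routee handled the message.
  class RouterWorker extends Actor {
    def receive: Receive = {
      case msg => println(s"${self.path.name} received $msg")
    }
  }

  // Parent actor. It is created below under the name "demoMainActor",
  // which is the first segment of the deployment path.
  class DemoMainActor extends Actor {
    // The router name must match the last segment of the deployment path.
    private val router =
      context.actorOf(FromConfig.props(Props[RouterWorker]), "routerNameFromConfig1")

    def receive: Receive = {
      case msg => router.forward(msg)
    }
  }

  // Inline config, assumed here only so that the sketch needs no extra file.
  val config = ConfigFactory.parseString(
    """
      |akka.actor.deployment {
      |  /demoMainActor/routerNameFromConfig1 {
      |    router = round-robin-pool
      |    nr-of-instances = 5
      |  }
      |}
    """.stripMargin).withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("routers", config)
    val main = system.actorOf(Props[DemoMainActor], "demoMainActor")
    (1 to 5).foreach(i => main ! s"message-$i")
    Thread.sleep(500) // crude wait so the demo output is visible
    system.terminate()
  }
}
```

If either name differs from the path in the config, FromConfig finds no deployment entry for the router and the actor creation fails.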

Declaring router group

In a pool, the router creates and supervises its own workers. We can also build a different routing construction, called a group, which routes to existing workers created by some other actor:

/demoMainActor/group1FromConfig {
  router = round-robin-group
  routees.paths = ["/user/workers/groupWorker1", "/user/workers/groupWorker2"]
}

That's why we have a separate actor responsible only for creating the group workers:

  // creates groupWorker1 and groupWorker2
  class GroupSupervisor extends Actor {

    override def preStart(): Unit = {
      Seq(1, 2).foreach { i =>
        val child = context.actorOf(Props[RouterWorker], s"groupWorker$i")
        println("created worker for group : " + child.path)
      }
    }

    // The excerpt ends before the receive method; an empty behavior is assumed here.
    override def receive: Receive = Actor.emptyBehavior
  }

Create router directly in code

You can create routers (either pool or group) directly in the code. This way you don't need to maintain a config file; however, at deployment time you will not be able to change the configuration without modifying the code.

 //USING POOL FROM CODE
 val router2 = context.actorOf(RoundRobinPool(3).props(Props[RouterWorker]),"router2")
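
A group can be declared in code in a similar way. The sketch below assumes a hypothetical supervisor named "workers" that creates the two routees, mirroring the paths used in the group config; the object, actor and router names are illustrative and not taken from the demo file.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinGroup

object GroupFromCodeSketch {
  // Hypothetical worker: only prints which routee handled the message.
  class RouterWorker extends Actor {
    def receive: Receive = {
      case msg => println(s"${self.path.name} received $msg")
    }
  }

  // Creates the routees; the group router does not supervise them.
  class Workers extends Actor {
    override def preStart(): Unit =
      (1 to 2).foreach(i => context.actorOf(Props[RouterWorker], s"groupWorker$i"))

    def receive: Receive = Actor.emptyBehavior
  }

  // A group is defined by the paths of already existing actors.
  val paths = List("/user/workers/groupWorker1", "/user/workers/groupWorker2")

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("routers")
    system.actorOf(Props[Workers], "workers")
    Thread.sleep(200) // crude wait so that the routees exist before routing starts

    val group = system.actorOf(RoundRobinGroup(paths).props(), "group2")
    (1 to 4).foreach(i => group ! s"message-$i")

    Thread.sleep(500) // crude wait so the demo output is visible
    system.terminate()
  }
}
```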

Part 1-B - Overflow Demo

FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/routing/RoutersPart1OverflowDemo.scala

This is a specially prepared simulation in which one actor produces messages a lot faster than the receiver is able to consume them. Try changing the number of workers to see how routing can help with flooding.

The receiver is a router with just one worker:

//Manipulate RoundRobinPool size to observe difference
val destination=system.actorOf(RoundRobinPool(1).props(Props[Destination]))

It will receive large messages at a rate much higher than its consumption speed:

  object Message{
    val largeString:String=(1 to 100000).map(_.toChar).mkString("")

    def large=Message(largeString+System.currentTimeMillis())
  }

The memory graphs recorded during message consumption compare two setups:

[Figure: memory usage with just one worker]

[Figure: memory usage with five workers]
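
Why more workers flatten the memory curve can be estimated with simple arithmetic. The sketch below is a rough model under stated assumptions, not a measurement: the rates are hypothetical and constant, and it ignores garbage collection and scheduling effects.

```scala
object BacklogSketch {
  /** Messages still waiting in mailboxes after `seconds`, assuming constant rates. */
  def backlog(producedPerSec: Int, consumedPerSecPerWorker: Int, workers: Int, seconds: Int): Long = {
    val growthPerSec = producedPerSec.toLong - consumedPerSecPerWorker.toLong * workers
    // A negative growth means the workers keep up, so nothing accumulates.
    math.max(0L, growthPerSec) * seconds
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical rates: 1000 msg/s produced, 200 msg/s consumed per worker.
    Seq(1, 5).foreach { workers =>
      println(s"$workers worker(s): backlog after 10 s = ${backlog(1000, 200, workers, 10)} messages")
    }
  }
}
```

Every waiting message holds its own large string, so a growing backlog translates directly into growing heap usage.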

Exercises

FILES : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/routing/exercises/RoutersPart1ConfigurationExercise.scala

There is an Endpoint actor which uses two routers for two separate mathematical operations: Addition and Power. The first one is a pool and the second one is a group.

Exercise : configure Routers

  • in file routersexercise.conf configure pool of AddWorkers
  • in file routersexercise.conf configure group of PowWorkers

Timing is important in the tests. They expect the whole operation to finish within a limited time, so you must create enough workers to process the requests in parallel.

Exercise : custom Router

FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/routing/exercises/RoutersPart1CustomRoundRobinRouterExercise.scala

Implement a custom router which cycles messages through a fixed pool of workers.

You can use the provided cycle logic:


  object Cycle{
    /**
      *
      * @param start - inclusive
      * @param end - exclusive
      * @return - cycle iterator : for (0,5) it will cycle [0,1,2,3,4,0,1,2,3,4]
      */
    def cycleBetween(start:Int,end:Int) = Iterator.iterate(start)(i=>(i+1)%end)
  }

Observe how extracting the iteration logic from the actor influences testing.
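
Because the cycle logic is a plain function, it can be checked without starting an actor system. The sketch below copies cycleBetween from the snippet above; the route helper and the worker names are hypothetical and stand in for the ActorRefs a real router would hold.

```scala
object CycleSketch {
  // Same logic as the provided Cycle.cycleBetween.
  def cycleBetween(start: Int, end: Int): Iterator[Int] =
    Iterator.iterate(start)(i => (i + 1) % end)

  // Hypothetical helper: pairs each message with the worker chosen round-robin.
  def route(messages: List[String], workers: Vector[String]): List[(String, String)] = {
    val indexes = cycleBetween(0, workers.size)
    messages.map(msg => msg -> workers(indexes.next()))
  }

  def main(args: Array[String]): Unit = {
    val workers = Vector("worker-0", "worker-1", "worker-2")
    route(List("a", "b", "c", "d"), workers).foreach {
      case (msg, worker) => println(s"$msg -> $worker")
    }
  }
}
```

Note that the provided function wraps around to 0, so it matches the documented range only when start is 0, which is the case when indexing a worker collection.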

Part 2 - ScatterGatherFirst Router

FILES : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/routing/RoutersPart2ScatterGatherDemo.scala

During the demo you can test this router by sending 2 messages to 3 workers (6 messages in total) and receiving back only the 2 fastest responses:

[INFO] [11/13/2016 11:00:00.765] [ScatterGather-akka.actor.default-dispatcher-5] [akka://ScatterGather/user/sender/workerPool/$a] /user/sender/workerPool/$a calculated MessageId(1) for 2000
[INFO] [11/13/2016 11:00:00.765] [ScatterGather-akka.actor.default-dispatcher-6] [akka://ScatterGather/user/sender/workerPool/$c] /user/sender/workerPool/$c calculated MessageId(1) for 2000
[INFO] [11/13/2016 11:00:00.766] [ScatterGather-akka.actor.default-dispatcher-4] [akka://ScatterGather/user/sender] response MessageId(1) received from /user/sender/workerPool/$c
[INFO] [11/13/2016 11:00:02.767] [ScatterGather-akka.actor.default-dispatcher-6] [akka://ScatterGather/user/sender/workerPool/$c] /user/sender/workerPool/$c calculated MessageId(2) for 2000
[INFO] [11/13/2016 11:00:02.767] [ScatterGather-akka.actor.default-dispatcher-4] [akka://ScatterGather/user/sender] response MessageId(2) received from /user/sender/workerPool/$c
[INFO] [11/13/2016 11:00:03.759] [ScatterGather-akka.actor.default-dispatcher-2] [akka://ScatterGather/user/sender/workerPool/$b] /user/sender/workerPool/$b calculated MessageId(1) for 5000
[INFO] [11/13/2016 11:00:05.759] [ScatterGather-akka.actor.default-dispatcher-2] [akka://ScatterGather/user/sender/workerPool/$b] /user/sender/workerPool/$b calculated MessageId(2) for 2000
[INFO] [11/13/2016 11:00:05.766] [ScatterGather-akka.actor.default-dispatcher-5] [akka://ScatterGather/user/sender/workerPool/$a] /user/sender/workerPool/$a calculated MessageId(2) for 5000

The idea is simple: the first response wins. We will take a closer look at it during the exercises.
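
For reference, Akka's built-in version of this router can be declared as a pool. The sketch below is not the demo file: the SlowWorker actor, its random delay and the firstReply helper are assumptions made for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.ScatterGatherFirstCompletedPool
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

object ScatterGatherSketch {
  // Hypothetical worker: each instance answers after its own random delay.
  class SlowWorker extends Actor {
    private val delayMs = 100 + Random.nextInt(400)

    def receive: Receive = {
      case msg =>
        Thread.sleep(delayMs) // blocking only to simulate slow work in a demo
        sender() ! s"$msg handled by ${self.path.name} after $delayMs ms"
    }
  }

  // Sends one message to all three workers and returns only the first reply.
  def firstReply(msg: String): String = {
    val system = ActorSystem("ScatterGather")
    try {
      val pool = system.actorOf(
        ScatterGatherFirstCompletedPool(3, within = 5.seconds).props(Props[SlowWorker]),
        "workerPool")
      implicit val timeout: Timeout = Timeout(5.seconds)
      Await.result(pool ? msg, 5.seconds).toString
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(firstReply("ping"))
}
```

All three workers still process the message; the router simply discards the slower replies.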

Exercises

Exercise : Custom ScatterGatherFirst within small business domain

FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/routing/exercises/RoutersPart2CustomScatterGatherExercise.scala

This exercise is longer.

You need to implement a "language detection service" behind a custom SGFirst router. A worker can receive two kinds of messages:

  • Detect(word) - returns Option[String] - the potential language
  • New(language, word) - adds a new language

When Detect is sent, the router should send it to all workers BUT use only the first response. You should not modify the original messages, because workers may forward them further. One possible solution can be implemented with Future.

When New is received, you need to broadcast it to all workers, and the router should not expect a response.
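
As a hint for the Future-based approach, the "first response wins" rule can be shown with the standard library alone. This is only a sketch of the idea, not the exercise solution: the promises stand in for worker replies, and the language values are made up.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstResponseSketch {
  // Returns the winner observed right after the first reply,
  // and again after the remaining replies have arrived.
  def firstAnswer(): (Option[String], Option[String]) = {
    // One promise per hypothetical worker reply.
    val replies = Vector.fill(3)(Promise[Option[String]]())
    val first: Future[Option[String]] = Future.firstCompletedOf(replies.map(_.future))

    replies(1).success(Some("english")) // the fastest worker answers
    val winner = Await.result(first, 1.second)

    replies(0).success(Some("polish")) // late replies no longer change the result
    replies(2).success(None)
    (winner, Await.result(first, 1.second))
  }

  def main(args: Array[String]): Unit = println(firstAnswer())
}
```

One design question left open by this sketch is whether a fast None should win over a slower Some; the exercise tests decide which behaviour is expected.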

Part 3 - Consistent Hashing

FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/routing/RoutersPart3ConsistentHashing.scala

This type of router routes every message with a given hash key to the same routee.

We have three ways of providing a hash value:

  • From the message itself
  • By configuring hash logic in the router
  • By wrapping the message in a special envelope

During the demo we are going to see the first two, and while doing the exercise you can try the third one.
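
The three options map onto Akka's ConsistentHashable trait, the hashMapping parameter of the router, and ConsistentHashableEnvelope. The sketch below shows all three side by side; the message types and the Basket actor are hypothetical and not taken from the demo file.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.ConsistentHashingPool
import akka.routing.ConsistentHashingRouter.{ConsistentHashMapping, ConsistentHashable, ConsistentHashableEnvelope}

object ConsistentHashingSketch {
  // 1. The message itself provides the hash key.
  final case class AddItem(user: String, item: String) extends ConsistentHashable {
    override def consistentHashKey: Any = user
  }

  // 2. A plain message; the key is extracted by the router's hashMapping.
  final case class RemoveItem(user: String, item: String)

  // 3. A plain message; the key is supplied by an envelope at send time.
  final case class Checkout(user: String)

  val hashMapping: ConsistentHashMapping = {
    case RemoveItem(user, _) => user
  }

  // Hypothetical routee: prints which instance received the message.
  class Basket extends Actor {
    def receive: Receive = {
      case msg => println(s"${self.path.name} received $msg")
    }
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("hashing")
    val router = system.actorOf(
      ConsistentHashingPool(4, hashMapping = hashMapping).props(Props[Basket]),
      "baskets")

    // All three messages carry the key "alice", so they reach the same routee.
    router ! AddItem("alice", "book")
    router ! RemoveItem("alice", "book")
    router ! ConsistentHashableEnvelope(message = Checkout("alice"), hashKey = "alice")

    Thread.sleep(500) // crude wait so the demo output is visible
    system.terminate()
  }
}
```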

Exercises

Exercise : Shop Basket

FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/routing/exercises/RoutersPart3StickyShopBasketExercise.scala

You need to simulate a web sticky session, like in the good old days, so that a given user is always directed to the same basket.

Exercise : Protocol Transmission

FILE:

  • the client sends a 4-byte word to the transmitter
  • the transmitter splits the word and sends it byte by byte to the router
  • the router has to route all bytes of a given word to the same receiver
  • after the receiver translates the word to a string, it is forwarded to the processor
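
The splitting and reassembling steps can be prototyped without actors. The sketch below is one possible message shape, not the exercise's protocol: WordByte, the wordId used as the routing key and the ASCII encoding are all assumptions.

```scala
import java.nio.charset.StandardCharsets

object WordBytesSketch {
  // Hypothetical wire format: wordId would serve as the consistent hash key,
  // position lets the receiver restore the byte order.
  final case class WordByte(wordId: Int, position: Int, value: Byte)

  // Transmitter side: split a word into tagged bytes.
  def split(wordId: Int, word: String): Seq[WordByte] =
    word.getBytes(StandardCharsets.US_ASCII).toSeq.zipWithIndex.map {
      case (byte, index) => WordByte(wordId, index, byte)
    }

  // Receiver side: restore the word once all of its bytes have arrived.
  def assemble(parts: Seq[WordByte]): String =
    new String(parts.sortBy(_.position).map(_.value).toArray, StandardCharsets.US_ASCII)

  def main(args: Array[String]): Unit = {
    val parts = scala.util.Random.shuffle(split(1, "akka"))
    println(assemble(parts))
  }
}
```

Keeping the word identifier in every byte message is what allows a consistent hashing router to keep one word on one receiver.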

Part 4 - Dynamic Router

FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/routing/RoutersPart4DynamicRouterDemo.scala

In the demo config file you can find a dynamic router configuration. Run the demo file and investigate how workers are added or removed according to the configuration and message flow.

akka.actor.deployment {
  /parent/router29 {
    router = round-robin-pool
    resizer {
      lower-bound = 2
      upper-bound = 15
      messages-per-resize = 100
    }
  }
}

The resizer section also accepts further settings, for example:

  lower-bound = 1
  upper-bound = 10
  pressure-threshold = 1
  rampup-rate = 0.25
  backoff-threshold = 0.3
  backoff-rate = 0.1
  messages-per-resize = 10
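
A resizer can also be attached in code through DefaultResizer. The sketch below mirrors the bounds from the config above; the Worker actor, the initial pool size of 5 and the object name are assumptions made for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.{DefaultResizer, RoundRobinPool}

object ResizerSketch {
  // Hypothetical worker: prints which routee handled the message.
  class Worker extends Actor {
    def receive: Receive = {
      case msg => println(s"${self.path.name} received $msg")
    }
  }

  // The pool may shrink to 2 and grow to 15 routees; resizing is
  // evaluated once per 100 messages passing through the router.
  val resizer = DefaultResizer(lowerBound = 2, upperBound = 15, messagesPerResize = 100)

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("dynamic")
    val router = system.actorOf(
      RoundRobinPool(5, Some(resizer)).props(Props[Worker]),
      "router29")

    (1 to 10).foreach(i => router ! s"message-$i")
    Thread.sleep(500) // crude wait so the demo output is visible
    system.terminate()
  }
}
```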

http://doc.akka.io/docs/akka/2.4/scala/routing.html
http://ivoroshilin.com/2013/07/15/distributed-caching-under-consistent-hashing/
