Routers
Routers are a good place in Akka to see why representing actors through ActorRef, together with location transparency, is a big advantage: it makes it easy to rearrange the actor structure.
During this workshop we are going to learn:
- How routers help us scale actor logic so that it can handle more messages (and avoid an OutOfMemoryError)
- How to create/declare routers in Akka
- We will practice our own implementation of routers to better understand the nature of actors and good programming practices in asynchronous programs.
Part 1 - Declare a Router
Config file
You can fully configure a router in a config file which will allow you to change those settings without touching the code.
akka.actor.deployment {
  /demoMainActor/routerNameFromConfig1 {
    router = round-robin-pool
    #router = random-pool
    nr-of-instances = 5
  }
}
To use a custom config file you only need to pass it in when creating the ActorSystem
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
val demoConfig = ConfigFactory.load("routers/routersdemo")
val system = ActorSystem("routers", demoConfig)
and then the only difference compared to standard actor creation is that you wrap the worker Props in FromConfig.props
val router1 = context.actorOf(FromConfig.props(Props[RouterWorker]), "routerNameFromConfig1")
Because the config refers to a specific actor path (deployment paths are relative to /user), naming the actors is very important. For example, if we use the path /demoMainActor/routerNameFromConfig1 in the config, then:
- demoMainActor - this must be the name of the actor that creates the router
- routerNameFromConfig1 - the exact name of the created router actor
Declaring router group
In a pool, the router creates and supervises its own workers. We can also build a different routing construction, called a group, which uses existing workers created by some external actor
/demoMainActor/group1FromConfig {
  router = round-robin-group
  routees.paths = ["/user/workers/groupWorker1", "/user/workers/groupWorker2"]
}
That's why we have a separate actor responsible only for creating the group workers. For the routees.paths above to match, it has to be created as a top-level actor named workers
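import akka.actor.{Actor, Props}

// Creates groupWorker1 and groupWorker2 as children of this actor
class GroupSupervisor extends Actor {
  @scala.throws[Exception](classOf[Exception])
  override def preStart(): Unit = {
    Seq(1, 2).foreach { i =>
      val child = context.actorOf(Props[RouterWorker], s"groupWorker$i")
      println("created worker for group : " + child.path)
    }
  }

  // Message handling is not shown in this excerpt; an empty behavior is assumed here
  override def receive: Receive = Actor.emptyBehavior
}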
Create router directly in code
You can create routers (either pool or group) directly in the code. This way you don't need a config file, but after deployment you will not be able to change the router configuration without modifying the code.
//USING POOL FROM CODE
val router2 = context.actorOf(RoundRobinPool(3).props(Props[RouterWorker]),"router2")
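A group can be declared from code in a similar way. The following is only a minimal sketch: the object name GroupFromCodeSketch and the router name router3 are made up for illustration, and the worker paths are assumed to be the ones created by GroupSupervisor.

```scala
import akka.actor.Props
import akka.routing.RoundRobinGroup

object GroupFromCodeSketch {
  // Paths of workers that some other actor is assumed to have created already
  val workerPaths = List("/user/workers/groupWorker1", "/user/workers/groupWorker2")

  // A group does not create routees, so props() takes no worker Props
  val groupProps: Props = RoundRobinGroup(workerPaths).props()
}

// Inside an actor (hypothetical router name):
// val router3 = context.actorOf(GroupFromCodeSketch.groupProps, "router3")
```

Unlike a pool, the group does not supervise the workers, so their lifecycle stays with the actor that created them.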
Part 1-B - Overflow Demo
This is a specially prepared simulation in which one actor produces messages much faster than the receiver can consume them. Try changing the number of workers to see how routing helps when the receiver is flooded with messages.
You have a receiver, which is a router with just one worker
//Manipulate RoundRobinPool size to observe difference
val destination=system.actorOf(RoundRobinPool(1).props(Props[Destination]))
It will receive large messages at a rate much higher than its consumption speed
object Message {
  val largeString: String = (1 to 100000).map(_.toChar).mkString("")
  def large = Message(largeString + System.currentTimeMillis())
}
Memory usage during message consumption (graphs) when we have:
- just one worker
- five workers
Exercises
There is an Endpoint actor which uses two routers for two separate mathematical operations: Addition and Power. The first one is a pool and the second one is a group.
Exercise : configure Routers
- in file routersexercise.conf configure pool of AddWorkers
- in file routersexercise.conf configure group of PowWorkers
Timing is important in the tests. They expect the whole operation to finish within a limited time, so you must create enough workers to process the requests in parallel.
Exercise : custom Router
Implement a custom router which iterates messages through a fixed pool of workers
You can use the provided cycle logic
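object Cycle {
  /**
    * Wraps back to start (not to 0), so ranges that do not begin at 0 also cycle correctly.
    *
    * @param start - inclusive
    * @param end - exclusive
    * @return - cycle iterator : for (0,5) it will cycle [0,1,2,3,4,0,1,2,3,4]
    */
  def cycleBetween(start: Int, end: Int): Iterator[Int] =
    Iterator.iterate(start)(i => if (i + 1 >= end) start else i + 1)
}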
Observe how extracting the iteration logic from the actor influences testing.
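Because the cycle logic is a plain iterator, it can be checked without starting an actor system. Below is a minimal sketch of that idea; CycleSketch and assign are hypothetical names introduced only for illustration, and cycleBetween is a local stand-in for the provided helper.

```scala
object CycleSketch {
  // Local stand-in for the provided cycle helper, so the sketch is self-contained
  def cycleBetween(start: Int, end: Int): Iterator[Int] =
    Iterator.iterate(start)(i => if (i + 1 >= end) start else i + 1)

  // Hypothetical helper: pairs each message with the index of the worker
  // that a round-robin style router would pick for it
  def assign[A](messages: Seq[A], workers: Int): List[(Int, A)] =
    cycleBetween(0, workers).zip(messages.iterator).toList
}
```

With three workers, four messages would go to workers 0, 1, 2 and then 0 again, and this can be asserted in a plain unit test with no mailboxes or timing involved.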
Part 2 - ScatterGatherFirst Router
During the demo you can test this router by sending 2 messages to 3 workers (6 deliveries in total) and receiving back only the 2 fastest responses
[INFO] [11/13/2016 11:00:00.765] [ScatterGather-akka.actor.default-dispatcher-5] [akka://ScatterGather/user/sender/workerPool/$a] /user/sender/workerPool/$a calculated MessageId(1) for 2000
[INFO] [11/13/2016 11:00:00.765] [ScatterGather-akka.actor.default-dispatcher-6] [akka://ScatterGather/user/sender/workerPool/$c] /user/sender/workerPool/$c calculated MessageId(1) for 2000
[INFO] [11/13/2016 11:00:00.766] [ScatterGather-akka.actor.default-dispatcher-4] [akka://ScatterGather/user/sender] response MessageId(1) received from /user/sender/workerPool/$c
[INFO] [11/13/2016 11:00:02.767] [ScatterGather-akka.actor.default-dispatcher-6] [akka://ScatterGather/user/sender/workerPool/$c] /user/sender/workerPool/$c calculated MessageId(2) for 2000
[INFO] [11/13/2016 11:00:02.767] [ScatterGather-akka.actor.default-dispatcher-4] [akka://ScatterGather/user/sender] response MessageId(2) received from /user/sender/workerPool/$c
[INFO] [11/13/2016 11:00:03.759] [ScatterGather-akka.actor.default-dispatcher-2] [akka://ScatterGather/user/sender/workerPool/$b] /user/sender/workerPool/$b calculated MessageId(1) for 5000
[INFO] [11/13/2016 11:00:05.759] [ScatterGather-akka.actor.default-dispatcher-2] [akka://ScatterGather/user/sender/workerPool/$b] /user/sender/workerPool/$b calculated MessageId(2) for 2000
[INFO] [11/13/2016 11:00:05.766] [ScatterGather-akka.actor.default-dispatcher-5] [akka://ScatterGather/user/sender/workerPool/$a] /user/sender/workerPool/$a calculated MessageId(2) for 5000
The idea is simple: the first response wins. We will take a closer look at it during the exercises.
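For reference, Akka ships a built-in router of this kind, ScatterGatherFirstCompletedPool. The sketch below only shows how such a pool could be declared from code; SketchWorker, ScatterGatherSketch and the 10 second timeout are assumptions made for illustration and are not taken from the demo sources.

```scala
import akka.actor.{Actor, Props}
import akka.routing.ScatterGatherFirstCompletedPool
import scala.concurrent.duration._

// Hypothetical worker that simply echoes every message back to the sender
class SketchWorker extends Actor {
  def receive: Receive = {
    case msg => sender() ! msg
  }
}

object ScatterGatherSketch {
  // Every message is sent to all 3 routees; only the first reply is passed
  // back to the original sender, the later ones are discarded
  val workerPoolProps: Props =
    ScatterGatherFirstCompletedPool(nrOfInstances = 3, within = 10.seconds)
      .props(Props[SketchWorker])
}

// Inside an actor: val pool = context.actorOf(ScatterGatherSketch.workerPoolProps, "workerPool")
```

The within parameter bounds how long the router waits for the first reply; if no routee answers in that time, a timeout failure is reported to the sender.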
Exercises
Exercise : Custom ScatterGatherFirst within small business domain
This exercise is longer.
You need to implement a "language detection service" behind a custom SGFirst router. A worker can receive two kinds of messages:
- Detect(word) - returns Option[String], the potential language
- New(language, word) - adds a new language
When Detect is sent, the router should send it to all workers BUT use only the first response. You should not modify the original messages because workers may forward them further. One possible solution can be implemented using Future.
When New is received, you need to broadcast it to all workers, and the router should not expect a response.
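The "use only the first response" part can be sketched with plain Scala futures, independent of any actor code. This is only an illustration of the idea, not a solution to the exercise: FirstResponseSketch and firstOf are hypothetical names, and in an Akka implementation the futures would typically come from asking each worker.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstResponseSketch {
  // Completes with whichever worker response arrives first;
  // responses that arrive later are simply ignored
  def firstOf(responses: Seq[Future[Option[String]]]): Future[Option[String]] =
    Future.firstCompletedOf(responses)
}
```

Note that Future.firstCompletedOf completes with the first result whatever it is, so a fast None would win over a slower Some(language); whether that is acceptable is a design decision for the exercise.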
Part 3 - Consistent Hashing
This type of router is able to route messages with a specific hash always to the same routee.
We have three ways of providing a hash value:
- From the message itself
- By configuring hash logic in the router
- By wrapping the message in a special envelope
During the demo we are going to see the first two, and in the exercises you can try the third one.
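The three options map onto Akka's consistent hashing API roughly as in the sketch below. The message types Tagged and Plain, the object HashingSketch and the pool size of 5 are assumptions made for illustration and do not come from the demo sources.

```scala
import akka.actor.Props
import akka.routing.ConsistentHashingPool
import akka.routing.ConsistentHashingRouter.{ConsistentHashMapping, ConsistentHashable, ConsistentHashableEnvelope}

// 1. The message itself provides the hash key
final case class Tagged(key: String, payload: String) extends ConsistentHashable {
  override def consistentHashKey: Any = key
}

// A message that knows nothing about hashing (used for options 2 and 3)
final case class Plain(key: String, payload: String)

object HashingSketch {
  // 2. Hash logic configured in the router as a partial function
  val hashMapping: ConsistentHashMapping = {
    case Plain(key, _) => key
  }

  def poolProps(worker: Props): Props =
    ConsistentHashingPool(5, hashMapping = hashMapping).props(worker)

  // 3. Any message wrapped in an envelope that carries the hash key
  def wrap(key: String, message: Any): ConsistentHashableEnvelope =
    ConsistentHashableEnvelope(message, hashKey = key)
}
```

Messages with equal keys end up at the same routee as long as the set of routees does not change.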
Exercises
Exercise : Shop Basket
You need to simulate a web sticky session, like in the good old days, so that a given user is always directed to the same basket
Exercise : Protocol Transmission
FILE:
- the client sends a 4-byte word to the transmitter
- the transmitter splits the word and sends it byte by byte to the router
- the router has to route all bytes of a given word to the same receiver
- after the word is translated to a string, it is forwarded to the processor
Dynamic
In the demo config file you can find a dynamic (resizable) router configuration. Run the demo and investigate how workers are added or removed according to the configuration and the message flow.
akka.actor.deployment {
  /parent/router29 {
    router = round-robin-pool
    resizer {
      lower-bound = 2
      upper-bound = 15
      messages-per-resize = 100
    }
  }
}
lower-bound = 1
upper-bound = 10
pressure-threshold = 1
rampup-rate = 0.25
backoff-threshold = 0.3
backoff-rate = 0.1
messages-per-resize = 10
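A resizer can also be attached to a pool from code. The sketch below mirrors the lower-bound, upper-bound and messages-per-resize values of the /parent/router29 config; ResizerSketch and the initial pool size of 2 are assumptions made for illustration.

```scala
import akka.actor.Props
import akka.routing.{DefaultResizer, RoundRobinPool}

object ResizerSketch {
  // Same bounds as in the config; parameters not listed keep Akka's defaults
  val resizer = DefaultResizer(lowerBound = 2, upperBound = 15, messagesPerResize = 100)

  // The pool starts with 2 routees and is resized between 2 and 15 under load
  def poolProps(worker: Props): Props =
    RoundRobinPool(nrOfInstances = 2, resizer = Some(resizer)).props(worker)
}
```

As with the other code-based declarations, these values can then only be changed by modifying the code.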
Links
- http://doc.akka.io/docs/akka/2.4/scala/routing.html
- http://ivoroshilin.com/2013/07/15/distributed-caching-under-consistent-hashing/