Basic Actors
Learning approach
We will skip theoretical background and go directly to practical exercises with akka library.
With this approach participants of the qworkshop should more quickly gain an intuition on how actors work and also some practical exercises should be more interesting for beginners that pure theory.
The answer to the question why actors work the way they work should reveal itself gradually. Yet, if someone is really interested then some interesting theory may be found here :
People interested in OOP should also find this workshop interesting because actor model brings a fresh perspective to this paradigm.
Plan
Import the code : https://github.com/PawelWlodarski/workshops-reactive
During workshops we are going to :
- Write a simple actor and learn about its implementation
- train pattern matching to understand how to use receive method inside an actor
- train partial functions to understand how an actor is implemented inside
- Implement communication between two actors
- Implement more sophisticated communication between multiple actors and understand better how state is encapsulated inside and actor and how work can be divided between actors in actor system.
- There is also one example in Java
Simple Actor - 1
As the first exercise we are going to write a very simple actor implementation. We will learn how pattern matching allows actor to handle different messages.
At the bottom of the file there is an actor skeleton prepared. We are going to create this actor within actor system and send to it couple messages.
class SimpleActor extends Actor{
override def receive: Receive = {
// case "one" => println("in actor : received one")
// case "two" =>
// println("sending message two to sender")
// sender ! "in actor : received two"
// case msg => println(s"in actor : received unknown message : [value=$msg, type=${msg.getClass} ]")
case _ => println("this catches all messages - uncomment specific cases")
}
}
We will practice scala syntax and more sophisticated pattern matching in the next section. But now for a start simple String matching should be clear enough.
Also notice word sender in the code. By having access to this reference an actor can send response to original sender.
Creation
We don't want actor to exist outside an ActorSystem context that's why we are passing special Props to a factory method so an actor is created outside our control.
The result of actorOf is an ActorRef which is standard reference. During workshops we will explain what are pros/cons of this approach.
val system=ActorSystem("workshops")
val actorProperties = Props[SimpleActor]
val simpleActorInstance: ActorRef = system.actorOf(actorProperties)
Exercises
Exercise : isEven
The actor needs to be able to handle following message
actorUnderTesting ! 40
and return back true if number is even or false if it is odd.
Additional Exercise : adding
The actor needs to be able to handle following message
simpleActorInstance ! (1,2)
and return the result => 1+2
Additional Exercise : a new dedicated actor for additng numbers
Create a new actor called MultiplyingCalculator (sceleton ready at the bottom of the file) which will handle following message
calcActor ! (1,2)
and return Int result : calc (1+2) = 3
Pattern Matching - 2
Now it's time to learn how to handle more sophisticated messages than simple Strings or Ints. We are going to train usage of the mechanism which is working inside actors and which recognizes what type of message was passed into it - Pattern Matching.
In the demonstration part we can see how to handle String messages + default, how to recognize value by type and the most important example how to decompose custom case classes
sealed trait User
case class Customer(email:String,cash:Int) extends User
case class Admin(login:String) extends User
def demonstration3(user:User)=user match {
case Customer(e,c) => s"customer with email ${e} has ${c} credits"
case Admin(l) => s"Admin with login ${l}"
}
In the exercises we will use standard pattern matching outside actors to focus better on its mechanics.
Exercise : matching tuples
Write a function which handles simple mathematical operations on two arguments. Instruction will be passed as a Tuple of type (Int,Int,String)
def primaryExercise(instruction:(Int,Int,String)) = ???
primaryExercise((8, 2, "+")) mustBe 10
primaryExercise((8, 2, "-")) mustBe 6
primaryExercise((8, 2, "*")) mustBe 16
Additional Exercise : matching case classes
This time we are going to use custom case classes in pattern matching :
sealed trait Expression
case class Number(v:Int) extends Expression
case class Add(n1:Number,n2:Number) extends Expression
case class Mult(n1:Number,n2:Number) extends Expression
def additionalExercise(e:Expression) = ???
additionalExercise(Number(7)) mustBe 7
additionalExercise(Add(Number(7), Number(2))) mustBe 9
additionalExercise(Mult(Number(7), Number(2))) mustBe 14
Actors and Threads - 3
Akka actors are executing logic asynchronously in separate threads and that's why we used following printing function in our previous exercises to wait for results:
def waitPrint(msg:String): Unit ={
TimeUnit.MILLISECONDS.sleep(10)
println(msg)
}
No we are going to do a small research on relation between Actors and Threads. Also we will further practice Pattern Matching.
In demo part you will be shown that actor is really working in a different thread by calling :
Thread.currentThread()
also as a preparation to exercise you will learn how to send actor to :
sender ! message //sender of a message
self ! message //send message to itself
other forward message // forward message leaving original sender
Exercise : actors thread info
We have a skeleton for an exercise actor :
object ExerciseActor{
case class ThreadId()
case class ThreadName()
}
class ExerciseActor extends Actor{
import ExerciseActor._
override def receive: Actor.Receive = {
case ThreadId => sender ! ???
case ThreadName => sender! ???
case l:List[_] => ???
}
}
You can notice that actor has a companion object with all messages which define Actor's protocol
You need to implement handler for two mesages which will display actor's thread id, thread name and thread group
Additional Exercise : pattern matching on lists
Try to handle case when multiple messages are passed inside a list
exerciseActor ! List(ThreadId,ThreadName) // should display both actor's thread Id and thread name
Partial Functions - 4
Why we are interested in partial functions when we are talking about actors?
If you took a look into actor's internals you would see that receive method is actually a partial funcion:
def receive: Actor.Receive
where Receive is a defined type type :
object Actor {
/**
* Type alias representing a Receive-expression for Akka Actors.
*/
//#receive
type Receive = PartialFunction[Any, Unit]
Partial function is a function not defined for all possible inputs. Because an actor is a partial function Any=>Unit then anything can go in but not everything is defined in receive.
To explain quickly take a look at this function which seems to be ok :
val head = (l:List[Int]) => l.head
but in practice it will throw an exception for empty lists. So it is not defined for empty lists. We can make this constraint explicit by declaring the function as a partial function
val partialHead=new PartialFunction[List[Int],Int] {
override def isDefinedAt(x: List[Int]): Boolean = x.length>0
override def apply(v1: List[Int]): Int = v1.head
}
of course Scala gives us very nice syntactic sugar to write partial functions in more concise way :
val partialHeadPM:PartialFunction[List[Int],Int]={
case head::tail => head
}
So now we see that pattern matching is also used to define partial functions and actors are using both of those mechanisms.
Exercise : define two aprtial functions
Define two partial functions for adding and mulitplying numbers :
lazy val add:PartialFunction[(Int,Int,String),Int] = ???
lazy val mult:PartialFunction[(Int,Int,String),Int] = ???
Notice how we can compose two partial functions into partial functions which can handle both subsets of arguments
lazy val calc=add orElse mult
And specification :
calc(1,2,"+")==3
calc(6,2,"*")==12
Additional Exercise : modyfying state
Because Akka receive method is of type Any=>Unit so it doesn't return anything. The only way to actually save computation is to modify state outside the function. The role of an actor is to encapsulate modified state .
Here we are only going to simulate actors in the same thread so we have an "ObjectWithstate":
object ObjectWithState{
case class Add(v:Int)
case class Mult(v:Int)
case class PrintState()
}
class ObjectWithState{
import ObjectWithState._
type Receive = PartialFunction[Any, Unit]
private var state:Int=0
val receive:Receive = ???
}
You need to modify state variable accordingly to passed parameters. Because our function is not returning anything (Any=>Unit) so to observe any results we need to modify external state => thats why we had defined testProbe in tests
encapsulatedState.receive(Add(2))
encapsulatedState.receive(Add(3))
encapsulatedState.receive(PrintState(testProbe))
encapsulatedState.receive(Mult(6))
encapsulatedState.receive(PrintState(testProbe))
testProbe.states must contain only(5,30)
Is state variable properly encapsulated? Is there other way to protect internal state without making it private?
Actors with state - 5
We learn about Actors internal structure and now it's time to learn how multiple actors can communicate.
We are going to send messages to Forwarder actor which will have reference to History actor which will keep received messages.
How Forwarder will know to whom send a message? We can inject reference to other actors in a constructor.
class Forwarder(actorRef:ActorRef) extends Actor{
Now we see that state inside actors is protected by a fact that other actors actually don't have direct reference to other actors but they are communicating through sumbolic reference which is of type ActorRef
Exercise : testing actors with state
You need to implement actors :
- Deduplicator which removes words already processed by pipeline
- Word Processor which capitalize letters
Actors In Java - BONUS
You can use Akka also with Java. But because Java lacks native pattern matching you need to deconstruct messages by yourself to properly handle it.
public class JavaActorExample {
public static void main(String[] args){
Props javaActorProps = Props.create(JavaActor.class);
final ActorSystem system = ActorSystem.create("JavaSystem");
ActorRef javaActor = system.actorOf(javaActorProps);
javaActor.tell("hello",ActorRef.noSender());
javaActor.tell("unknown",ActorRef.noSender());
javaActor.tell(20,ActorRef.noSender());
system.terminate();
}
}
class JavaActor extends UntypedActor{
@Override
public void onReceive(Object message) throws Exception {
if("hello".equals(message)){
System.out.println("JAVA ACTOR : hello");
}else if (message instanceof String){
System.out.println("JAVA ACTOR : received some string : "+message);
}else{
System.out.println("JAVA ACTOR :received something else: "+message);
}
}
}
An interesting thing is that you can easily call Java Actor from Scala Actor.
val system=ActorSystem("scala-calling-java")
val javaActor=system.actorOf(Props[JavaActor])
javaActor ! "hello"
javaActor ! "other message"
system.terminate()
Part 6 : Testing
Assertions :
- expectMsg(DemoResponse(2)) - waits for a sp[ecific response
- expectMsgPF - decompose received message
expectMsgPF(){
case DemoResponse(number) => number mustBe 3
}
- receiveWhile - receives messages as long they are accepted by pattern matching
import scala.concurrent.duration._
val receivedNumbers=other.receiveWhile(500 millis){
case DemoResponse(number) => number
}
- other.expectNoMsg(500 millis) - to check that other messages were not sent
Write test for :
- ActorA
- when it receive StartGame message it must start sending Ball
- it will play for n rounds (counter passed in ball)
- ActorB
- it will play for N Rounds (rounds saved in actor state)
Exercise : Divided Computation - 7
EXERCISE FILE : TODO
This time we are going to split computations between Task Actors and then gather final result in the Combiner Actor
- In TaskProcessor handle ComputationTask(start:Long,end:Long) message by calcualting sum of numbers from start to end(exclusive <start,end) ) and the send result to combiner.
- In Combiner handle message Result(v:BigDecimal) sent by processors
- In Combiner handle message DisplayResult(probe) which return result to passed actor reference