Basic Actors

Learning approach

We will skip theoretical background and go directly to practical exercises with akka library.

With this approach participants of the qworkshop should more quickly gain an intuition on how actors work and also some practical exercises should be more interesting for beginners that pure theory.

The answer to the question why actors work the way they work should reveal itself gradually. Yet, if someone is really interested then some interesting theory may be found here :

People interested in OOP should also find this workshop interesting because actor model brings a fresh perspective to this paradigm.

Plan

Import the code : https://github.com/PawelWlodarski/workshops-reactive

During workshops we are going to :

  • Write a simple actor and learn about its implementation
    • train pattern matching to understand how to use receive method inside an actor
    • train partial functions to understand how an actor is implemented inside
  • Implement communication between two actors
  • Implement more sophisticated communication between multiple actors and understand better how state is encapsulated inside and actor and how work can be divided between actors in actor system.
  • There is also one example in Java

Simple Actor - 1

DEMO FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/basics/exercises/BasicsPart1SimpleActor1.scala

As the first exercise we are going to write a very simple actor implementation. We will learn how pattern matching allows actor to handle different messages.

At the bottom of the file there is an actor skeleton prepared. We are going to create this actor within actor system and send to it couple messages.

  class SimpleActor extends Actor{
    override def receive: Receive = {
//      case "one" => println("in actor : received one")
//      case "two" =>
//        println("sending message two to sender")
//        sender ! "in actor : received two"
//      case msg => println(s"in actor : received unknown message : [value=$msg, type=${msg.getClass} ]")
      case _ => println("this catches all messages - uncomment specific cases")
    }
  }

We will practice scala syntax and more sophisticated pattern matching in the next section. But now for a start simple String matching should be clear enough.

Also notice word sender in the code. By having access to this reference an actor can send response to original sender.

Creation

We don't want actor to exist outside an ActorSystem context that's why we are passing special Props to a factory method so an actor is created outside our control.

The result of actorOf is an ActorRef which is standard reference. During workshops we will explain what are pros/cons of this approach.

val system=ActorSystem("workshops")
val actorProperties = Props[SimpleActor]
val simpleActorInstance: ActorRef = system.actorOf(actorProperties)

Exercises

EXERCISE FILe : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/test/scala/jug/workshops/reactive/akka/basics/exercises/BasicsPart1SimpleActorSpecExercise.scala

Exercise : isEven

The actor needs to be able to handle following message

actorUnderTesting ! 40

and return back true if number is even or false if it is odd.

Additional Exercise : adding

The actor needs to be able to handle following message

simpleActorInstance ! (1,2)

and return the result => 1+2

Additional Exercise : a new dedicated actor for additng numbers

Create a new actor called MultiplyingCalculator (sceleton ready at the bottom of the file) which will handle following message

calcActor ! (1,2)

and return Int result : calc (1+2) = 3

Pattern Matching - 2

DEMO FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/basics/exercises/BasicsPart2PatternMatching.scala

Now it's time to learn how to handle more sophisticated messages than simple Strings or Ints. We are going to train usage of the mechanism which is working inside actors and which recognizes what type of message was passed into it - Pattern Matching.

In the demonstration part we can see how to handle String messages + default, how to recognize value by type and the most important example how to decompose custom case classes

  sealed trait User
  case class Customer(email:String,cash:Int) extends User
  case class Admin(login:String) extends User

  def demonstration3(user:User)=user match {
    case Customer(e,c) => s"customer with email ${e} has ${c} credits"
    case Admin(l) => s"Admin with login ${l}"
  }

EXERCISE FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/test/scala/jug/workshops/reactive/akka/basics/exercises/BasicsPart2PatternMatchingSpecExercise.scala

In the exercises we will use standard pattern matching outside actors to focus better on its mechanics.

Exercise : matching tuples

Write a function which handles simple mathematical operations on two arguments. Instruction will be passed as a Tuple of type (Int,Int,String)

  def primaryExercise(instruction:(Int,Int,String)) = ???

  primaryExercise((8, 2, "+")) mustBe 10
  primaryExercise((8, 2, "-")) mustBe 6
  primaryExercise((8, 2, "*")) mustBe 16

Additional Exercise : matching case classes

This time we are going to use custom case classes in pattern matching :

sealed trait Expression
case class Number(v:Int) extends Expression
case class Add(n1:Number,n2:Number) extends Expression
case class Mult(n1:Number,n2:Number) extends Expression

def additionalExercise(e:Expression) = ???

additionalExercise(Number(7)) mustBe 7
additionalExercise(Add(Number(7), Number(2))) mustBe 9
additionalExercise(Mult(Number(7), Number(2))) mustBe 14

Actors and Threads - 3

Akka actors are executing logic asynchronously in separate threads and that's why we used following printing function in our previous exercises to wait for results:

 def waitPrint(msg:String): Unit ={
    TimeUnit.MILLISECONDS.sleep(10)
    println(msg)
  }

No we are going to do a small research on relation between Actors and Threads. Also we will further practice Pattern Matching.

In demo part you will be shown that actor is really working in a different thread by calling :

Thread.currentThread()

also as a preparation to exercise you will learn how to send actor to :

sender ! message //sender of a message
self ! message  //send message to itself
other forward message // forward message leaving original sender

EXERCISE FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/test/scala/jug/workshops/reactive/akka/basics/exercises/BasicsPart3ThreadsAndActorsSpecExercise.scala

Exercise : actors thread info

We have a skeleton for an exercise actor :

object ExerciseActor{
  case class ThreadId()
  case class ThreadName()
}

class ExerciseActor extends Actor{
  import ExerciseActor._

  override def receive: Actor.Receive = {
    case ThreadId => sender ! ???
    case ThreadName => sender! ???
    case l:List[_] => ???
  }

}

You can notice that actor has a companion object with all messages which define Actor's protocol

You need to implement handler for two mesages which will display actor's thread id, thread name and thread group

Additional Exercise : pattern matching on lists

Try to handle case when multiple messages are passed inside a list

exerciseActor ! List(ThreadId,ThreadName) // should display both actor's thread Id and thread name

Partial Functions - 4

EXERCISE FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/basics/exercises/BasicsPart4PartialFunctions.scala

Why we are interested in partial functions when we are talking about actors?

If you took a look into actor's internals you would see that receive method is actually a partial funcion:

 def receive: Actor.Receive

where Receive is a defined type type :

object Actor {
  /**
   * Type alias representing a Receive-expression for Akka Actors.
   */
  //#receive
  type Receive = PartialFunction[Any, Unit]

Partial function is a function not defined for all possible inputs. Because an actor is a partial function Any=>Unit then anything can go in but not everything is defined in receive.

To explain quickly take a look at this function which seems to be ok :

 val head = (l:List[Int]) => l.head

but in practice it will throw an exception for empty lists. So it is not defined for empty lists. We can make this constraint explicit by declaring the function as a partial function

val partialHead=new PartialFunction[List[Int],Int] {
    override def isDefinedAt(x: List[Int]): Boolean = x.length>0
    override def apply(v1: List[Int]): Int = v1.head
  }

of course Scala gives us very nice syntactic sugar to write partial functions in more concise way :

val partialHeadPM:PartialFunction[List[Int],Int]={
   case head::tail => head
}

So now we see that pattern matching is also used to define partial functions and actors are using both of those mechanisms.

Exercise : define two aprtial functions

Define two partial functions for adding and mulitplying numbers :

lazy val add:PartialFunction[(Int,Int,String),Int] = ???
lazy val mult:PartialFunction[(Int,Int,String),Int] = ???

Notice how we can compose two partial functions into partial functions which can handle both subsets of arguments

lazy val calc=add orElse mult

And specification :

calc(1,2,"+")==3
calc(6,2,"*")==12

Additional Exercise : modyfying state

Because Akka receive method is of type Any=>Unit so it doesn't return anything. The only way to actually save computation is to modify state outside the function. The role of an actor is to encapsulate modified state .

Here we are only going to simulate actors in the same thread so we have an "ObjectWithstate":

object ObjectWithState{
  case class Add(v:Int)
  case class Mult(v:Int)
  case class PrintState()
}

class ObjectWithState{
  import ObjectWithState._
  type Receive = PartialFunction[Any, Unit]

  private var state:Int=0

  val receive:Receive = ???
}

You need to modify state variable accordingly to passed parameters. Because our function is not returning anything (Any=>Unit) so to observe any results we need to modify external state => thats why we had defined testProbe in tests

 encapsulatedState.receive(Add(2))
 encapsulatedState.receive(Add(3))
 encapsulatedState.receive(PrintState(testProbe))
 encapsulatedState.receive(Mult(6))
 encapsulatedState.receive(PrintState(testProbe))

 testProbe.states must contain only(5,30)

Is state variable properly encapsulated? Is there other way to protect internal state without making it private?

Actors with state - 5

DEMO FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/basics/exercises/BasicsPart5State.scala

We learn about Actors internal structure and now it's time to learn how multiple actors can communicate.

We are going to send messages to Forwarder actor which will have reference to History actor which will keep received messages.

How Forwarder will know to whom send a message? We can inject reference to other actors in a constructor.

class Forwarder(actorRef:ActorRef) extends Actor{

Now we see that state inside actors is protected by a fact that other actors actually don't have direct reference to other actors but they are communicating through sumbolic reference which is of type ActorRef

Exercise : testing actors with state

EXERCISE FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/test/scala/jug/workshops/reactive/akka/basics/exercises/BasicsPart5StateSpecExercise.scala

You need to implement actors :

  • Deduplicator which removes words already processed by pipeline
  • Word Processor which capitalize letters

Actors In Java - BONUS

You can use Akka also with Java. But because Java lacks native pattern matching you need to deconstruct messages by yourself to properly handle it.

FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/main/scala/jug/workshops/reactive/akka/a1/java/JavaActorExample.java

public class JavaActorExample {

    public static void main(String[] args){
        Props javaActorProps = Props.create(JavaActor.class);
        final ActorSystem system = ActorSystem.create("JavaSystem");

        ActorRef javaActor = system.actorOf(javaActorProps);

        javaActor.tell("hello",ActorRef.noSender());
        javaActor.tell("unknown",ActorRef.noSender());
        javaActor.tell(20,ActorRef.noSender());

        system.terminate();
    }

}

class JavaActor extends UntypedActor{

    @Override
    public void onReceive(Object message) throws Exception {
        if("hello".equals(message)){
            System.out.println("JAVA ACTOR : hello");
        }else if (message instanceof String){
            System.out.println("JAVA ACTOR : received some string : "+message);
        }else{
            System.out.println("JAVA ACTOR :received something else: "+message);
        }
    }
}

An interesting thing is that you can easily call Java Actor from Scala Actor.

 val system=ActorSystem("scala-calling-java")

 val javaActor=system.actorOf(Props[JavaActor])


 javaActor ! "hello"
 javaActor ! "other message"

 system.terminate()

Part 6 : Testing

DEMO FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/test/scala/jug/workshops/reactive/akka/basics/exercises/BasicsPart6Demo.scala

Assertions :

  • expectMsg(DemoResponse(2)) - waits for a sp[ecific response
  • expectMsgPF - decompose received message
expectMsgPF(){
          case DemoResponse(number) => number mustBe 3
}
  • receiveWhile - receives messages as long they are accepted by pattern matching
import scala.concurrent.duration._

val receivedNumbers=other.receiveWhile(500 millis){
  case DemoResponse(number) => number
}
  • other.expectNoMsg(500 millis) - to check that other messages were not sent

EXERCISE FILE : https://github.com/PawelWlodarski/workshops-reactive/blob/master/src/test/scala/jug/workshops/reactive/akka/basics/exercises/BasicsPart6TestingActorExercise.scala

Write test for :

  • ActorA
    • when it receive StartGame message it must start sending Ball
    • it will play for n rounds (counter passed in ball)
  • ActorB
    • it will play for N Rounds (rounds saved in actor state)

Exercise : Divided Computation - 7

EXERCISE FILE : TODO

This time we are going to split computations between Task Actors and then gather final result in the Combiner Actor

  • In TaskProcessor handle ComputationTask(start:Long,end:Long) message by calcualting sum of numbers from start to end(exclusive <start,end) ) and the send result to combiner.
  • In Combiner handle message Result(v:BigDecimal) sent by processors
  • In Combiner handle message DisplayResult(probe) which return result to passed actor reference

results matching ""

    No results matching ""