Posted by virantha on Wed 27 September 2017

Part 2 - Building a Fast Event-Driven Simulator for Concurrent Systems of Hardware Processes Using Curio and Python 3.6

This is the second article in my series on using Curio to build an event-driven simulator for hardware processes (see Part 1 for the first article).

1   Enhancing the Simulator Framework

Now we're going to make this framework a little more robust and less verbose when defining the system. To do this, I'll first introduce the Port class, which defines how Processes are connected. Then we'll write a utility function to start and end the system automatically, and leverage type annotations to make the code cleaner. Finally, we'll introduce a distributed timestamp system to keep track of simulator time.

2   Introducing Ports

Let's add the following classes to our framework:

class Port:
    def __init__(self):
        self.chan = None

class InputPort(Port):
    async def recv(self):
        tok = await self.chan.recv()
        return tok

class OutputPort(Port):
    async def send(self, val):
        await self.chan.send(val)

An OutputPort is meant to connect to the sender side of a Channel, so it has a send method, while the InputPort connects to the receiver side with a recv method.

Both kinds of Port hold a reference to the Channel they are connected to.
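
To try the ports on their own, here is a minimal sketch. It rests on two assumptions: the Channel class from Part 1 is not reproduced here, so QueueChannel is a hypothetical stand-in that only mimics the send/recv coroutine interface, and it is built on asyncio.Queue from the standard library rather than on Curio. The stand-in buffers values, so its blocking behaviour may differ from the real Channel.

```python
import asyncio


class QueueChannel:
    """Hypothetical stand-in for the Part 1 Channel (not the author's class)."""

    def __init__(self, name=''):
        self.name = name
        self._queue = asyncio.Queue()

    async def send(self, item):
        await self._queue.put(item)

    async def recv(self):
        return await self._queue.get()


class Port:
    def __init__(self):
        self.chan = None


class InputPort(Port):
    async def recv(self):
        return await self.chan.recv()


class OutputPort(Port):
    async def send(self, val):
        await self.chan.send(val)


async def demo():
    out_port, in_port = OutputPort(), InputPort()
    # Wire both ports to the same channel by hand
    out_port.chan = in_port.chan = QueueChannel('demo')
    await out_port.send(42)
    return await in_port.recv()


result = asyncio.run(demo())
print(result)
```

The ports add no behaviour of their own here; they simply forward to whatever object sits in chan, which is what lets the wiring be hidden behind a helper function.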

Now, let's introduce a utility function that, given an OutputPort and an InputPort, instantiates a Channel to connect the two ports. This hides the concept of a Channel inside the connect utility function and eliminates the need to instantiate it manually in our simulation, as we did previously.

def connect(a, b, name=''):
    # Connect ports together by instantiating a channel

    chan = Channel(name)

    # Check to make sure the ports have not been connected previously to other channels!
    assert not a.chan, f"Port {a} has already been connected!"
    assert not b.chan, f"Port {b} has already been connected!"

    # Check to make sure the two ports are of opposite type (input/output)
    if isinstance(a, InputPort):
        assert isinstance(b, OutputPort), f"Ports {a} and {b} are both input ports!"

        # Store the ports this channel is connected to
        # b ---chan---> a
        chan.l = b
        chan.r = a
    else:
        assert isinstance(b, InputPort), f"Ports {a} and {b} are both output ports!"

        # Store the ports this channel is connected to
        # a ---chan---> b
        chan.l = a
        chan.r = b

    # Now assign the channel to the two ports
    a.chan = chan
    b.chan = chan

This allows us to add a variety of checks, such as making sure a Port is connected to only one Channel (recall that channels are point-to-point connections), and that one port is an input and the other is an output.
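
The checks can be exercised in isolation with a small sketch. It assumes a placeholder Channel that only stores a name (the real one lives in Part 1), uses a condensed copy of connect, and adds a hypothetical try_connect helper that turns a failed assertion into its message.

```python
class Channel:
    """Placeholder for the Part 1 Channel; only stores a name (assumption)."""

    def __init__(self, name=''):
        self.name = name


class Port:
    def __init__(self):
        self.chan = None


class InputPort(Port):
    pass


class OutputPort(Port):
    pass


def connect(a, b, name=''):
    """Condensed copy of the connect function, with the same checks."""
    chan = Channel(name)
    assert not a.chan, f"Port {a} has already been connected!"
    assert not b.chan, f"Port {b} has already been connected!"
    if isinstance(a, InputPort):
        assert isinstance(b, OutputPort), f"Ports {a} and {b} are both input ports!"
        chan.l, chan.r = b, a
    else:
        assert isinstance(b, InputPort), f"Ports {a} and {b} are both output ports!"
        chan.l, chan.r = a, b
    a.chan = chan
    b.chan = chan


def try_connect(a, b):
    """Hypothetical helper: return None on success, else the assertion message."""
    try:
        connect(a, b)
    except AssertionError as err:
        return str(err)
    return None


src_out, buf_in, other_in = OutputPort(), InputPort(), InputPort()
first = try_connect(src_out, buf_in)            # valid output-to-input link
second = try_connect(src_out, other_in)         # src_out is already wired
third = try_connect(InputPort(), InputPort())   # two inputs cannot be linked

print(first, second, third, sep='\n')
```

Keep in mind that assert statements are skipped when Python runs with the -O flag, so if these checks must always run, raising an exception explicitly is the safer choice.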

Next, let's modify our Processes to perform send and recv actions on their internal ports instead of channels.

class Source(Process):

    def __init__(self, name, length, srcval):
        super().__init__(name)
        self.val = srcval
        self.length = length
        self.R = OutputPort()

    async def exec(self):
        for i in range(self.length):
            self.message(f"sending {self.val}")
            await self.R.send(self.val)
            self.message(f"sent {self.val}")
        self.message("terminated")

class Sink(Process):

    def __init__(self, name):
        super().__init__(name)
        self.L = InputPort()

    async def exec(self):
        tok_count = 0
        try:
            while True:
                tok = await self.L.recv()
                tok_count += 1
                self.message(f"received {tok}")
        except CancelledError:
            self.message(f"{tok_count} tokens received")


class Buffer(Process):

    def __init__(self, name):
        super().__init__(name)
        self.L = InputPort()
        self.R = OutputPort()


    async def exec(self):
        while True:
            tok = await self.L.recv()
            self.message(f"received {tok}")
            self.message(f"sending {tok}")
            await self.R.send(tok)

In each constructor now, we've defined member variables for the Ports that each Process needs, and we've gotten rid of the connect method and delegated that to the new connect function.

At this point, we can rewrite our system and remove any instantiations of Channels, which is pretty neat. And there's much more error checking going on to make sure we don't try to connect mismatched Port types, for example.

async def system():

    N = 10 # How many buffers in our linear pipeline

    # Instantiate the processes
    src = Source('src1', 10, 1)
    buf = [Buffer(f'buf[{i}]') for i in range(N)]
    snk = Sink('snk')

    # Connect the processes with the channels
    connect(src.R, buf[0].L)
    for i in range(1, N):
        connect(buf[i-1].R, buf[i].L)
    connect(snk.L, buf[N-1].R)

    # Start the processes
    p_src = await spawn(src.exec())
    p_snk = await spawn(snk.exec())
    p_buf = [await spawn(buf[i].exec()) for i in range(N)]

    # Wait for the source to finish sending all its values
    await p_src.join()
    # Cancel the remaining processes
    for i in range(N):
        await p_buf[i].cancel()
    await p_snk.cancel()

Running this will yield the same output as our first simulator. You can find the complete code file in this port_gist.

3   Simplifying the execution

One further step we can take now is to eliminate the need to manually spawn and join/cancel the processes to run our system. Wouldn't it be nice if we could just say something like run_all which would automatically start the defined processes?

Well, let's do just that! The basic idea is to keep track of all the instantiated processes in the Process base class. However, in order to join on the correct type of Process (i.e. only Processes that generate values), we'll need to distinguish two kinds of processes: Producers and others (non-producers). We'll make any of the former inherit from a new (abstract) class called Producer so we can keep track of those instances.

class Process:

    next_id = 0
    non_producer_processes = {}
    producer_processes = {}

    def __init__(self, name):
        self.name = name
        self.id = Process.next_id
        Process.next_id += 1
        # Keep track of all source processes (join on these at the end), and non-source processes (cancel on these at the end)
        if isinstance(self, Producer):
            Process.producer_processes[self.id] = self
        else:
            Process.non_producer_processes[self.id] = self

    def __str__(self):
        return f"{self.name}.{self.id}"

    def __repr__(self):
        return f"{type(self).__name__}('{self.name}')"

    def message(self, m):
        print(f"{self}: {m}")

class Producer(Process):
    # All processes that drive the system (by injecting values in on channels unconditionally)
    # must subclass this process
    pass

class Source(Producer):

    def __init__(self, name, length, srcval):
        super().__init__(name)
        self.val = srcval
        self.length = length
        self.R = OutputPort()

    async def exec(self):
        for i in range(self.length):
            self.message(f"sending {self.val}")
            await self.R.send(self.val)
            self.message(f"sent {self.val}")
        self.message("terminated")

Now, the producer_processes dict keeps track of all the Producer-type processes, and non_producer_processes stores the rest. We also change Source to inherit from the new Producer class.
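
Here is a small sketch of how instances end up in the two registries. The Source and Sink classes are stripped down to empty bodies for illustration (no ports, no exec), and reset_registry is a hypothetical helper that is not part of the framework. It is included because the registries are class-level dicts, so they keep every process ever created in the interpreter unless they are cleared.

```python
class Process:
    next_id = 0
    non_producer_processes = {}
    producer_processes = {}

    def __init__(self, name):
        self.name = name
        self.id = Process.next_id
        Process.next_id += 1
        # Producer is looked up at call time, so defining it below is fine
        if isinstance(self, Producer):
            Process.producer_processes[self.id] = self
        else:
            Process.non_producer_processes[self.id] = self


class Producer(Process):
    pass


class Source(Producer):   # stripped-down stand-in
    pass


class Sink(Process):      # stripped-down stand-in
    pass


def reset_registry():
    """Hypothetical helper: forget all registered processes."""
    Process.next_id = 0
    Process.producer_processes.clear()
    Process.non_producer_processes.clear()


src = Source('src1')
snk = Sink('snk')
producers = [p.name for p in Process.producer_processes.values()]
others = [p.name for p in Process.non_producer_processes.values()]
print(producers, others)

reset_registry()
remaining = len(Process.producer_processes) + len(Process.non_producer_processes)
print(remaining)
```

Something like reset_registry would matter if you build more than one system in the same session, since a later run would otherwise also pick up the processes of an earlier one.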

And that's pretty much it. We can now create a generic run_all function like so:

async def run_all():
    source_tasks = []
    other_tasks = []

    for p in Process.producer_processes.values():
        source_tasks.append(await spawn(p.exec()))
    for p in Process.non_producer_processes.values():
        other_tasks.append(await spawn(p.exec()))

    # Now wait for all sources to end
    for task in source_tasks:
        await task.join()
    for task in other_tasks:
        await task.cancel()

And our system reduces to just the following:

from curio import run, spawn
async def system():

    N = 10 # How many buffers in our linear pipeline

    # Instantiate the processes
    src = Source('src1', 10, 1)
    buf = [Buffer(f'buf[{i}]') for i in range(N)]
    snk = Sink('snk')

    # Connect the processes with the channels
    connect(src.R, buf[0].L)
    for i in range(1, N):
        connect(buf[i-1].R, buf[i].L)
    connect(snk.L, buf[N-1].R)

    await run_all()

Much simpler and requires just the Process and Connection definitions! The runnable code is at run_all_gist.
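
The join-the-producers-then-cancel-the-rest pattern is not specific to Curio. As a rough sketch, here is the same idea with asyncio from the standard library (Python 3.7+), swapped in only so the example runs without extra installs. The producer and consumer coroutines and the queue are hypothetical stand-ins for the Source, Sink and Channel of the framework.

```python
import asyncio


async def producer(queue, count):
    # Finite task: this is the kind we wait on
    for i in range(count):
        await queue.put(i)


async def consumer(queue, received):
    # Infinite task: this is the kind we cancel
    while True:
        item = await queue.get()
        received.append(item)
        queue.task_done()


async def run_all(count=5):
    queue = asyncio.Queue()
    received = []
    producer_tasks = [asyncio.create_task(producer(queue, count))]
    other_tasks = [asyncio.create_task(consumer(queue, received))]

    # Wait for every producer to finish
    await asyncio.gather(*producer_tasks)
    # asyncio-specific step: the queue is unbounded, so wait until
    # everything that was put has actually been consumed
    await queue.join()
    # Cancel the tasks that would otherwise run forever
    for task in other_tasks:
        task.cancel()
    await asyncio.gather(*other_tasks, return_exceptions=True)
    return received


received = asyncio.run(run_all())
print(received)  # [0, 1, 2, 3, 4]
```

The extra queue.join() is needed here because a buffered queue lets the producer finish before its values are consumed. Whether a similar drain step is needed in the Curio version depends on how the Part 1 Channel blocks, which this sketch does not model.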

4   Type-annotations for readability

Python 3 introduced type annotations as part of the syntax (and Python 3.6 extended them to variables and class attributes), and I'm going to leverage that to specify the ports on a Process in a cleaner way. For example, instead of instantiating an InputPort and an OutputPort manually in the constructor of the Buffer process, I'm going to add support to do the following:

class Buffer(Process):

    L: InputPort
    R: OutputPort

    def __init__(self, name):
        super().__init__(name)

    async def exec(self):
        while True:
            tok = await self.L.recv()
            self.message(f"received {tok}")
            self.message(f"sending {tok}")
            await self.R.send(tok)

The variable: type syntax is an annotation. Python itself does not enforce any type checks based on it (though there are static type checking tools that do), but we can still access an object's annotations through the __annotations__ member dict, which maps each name to its type like so:

>>> b = Buffer("buf")
>>> b.__annotations__
{'L': <class '__main__.InputPort'>, 'R': <class '__main__.OutputPort'>}

I'm going to modify the Process base class's constructor to take any annotations that are ports, and inject instances of those types into the object like so:

    def __init__(self, name):
        self.name = name
        self.id = Process.next_id
        Process.next_id += 1
        # Keep track of all source processes (join on these at the end), and non-source processes (cancel on these at the end)
        if isinstance(self, Producer):
            Process.producer_processes[self.id] = self
        else:
            Process.non_producer_processes[self.id] = self

        # Inject the ports from the annotations on this instance
        for name, val in self.__annotations__.items():
            if issubclass(val, Port):
                print(f'injecting port({val}) {name} onto {self.__class__}:{self.name}')
                port = val()
                setattr(self, name, port)

Ignoring the print, that's only four extra lines needed to move to this cleaner syntax. Isn't Python awesome?

You can find the complete code at annotate_gist.
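
One caveat worth knowing: self.__annotations__ goes through ordinary attribute lookup, so (at least on Python 3.6, which this series targets) it yields the annotations of the nearest class in the hierarchy that declares any. A subclass that adds a port of its own would then hide the ports declared on its parent. A possible variation, sketched below, uses typing.get_type_hints, which merges the annotations of all classes in the inheritance chain. TappedBuffer is a hypothetical subclass used only to show the effect, and the Process here is reduced to the injection logic.

```python
from typing import get_type_hints


class Port:
    def __init__(self):
        self.chan = None


class InputPort(Port):
    pass


class OutputPort(Port):
    pass


class Process:
    def __init__(self, name):
        self.name = name
        # get_type_hints merges annotations from every class in the MRO
        for attr, hint in get_type_hints(type(self)).items():
            # Guard with isinstance so non-class hints do not break issubclass
            if isinstance(hint, type) and issubclass(hint, Port):
                setattr(self, attr, hint())


class Buffer(Process):
    L: InputPort
    R: OutputPort


class TappedBuffer(Buffer):
    # Hypothetical subclass that adds one more output port
    T: OutputPort


tb = TappedBuffer('tap')
other = TappedBuffer('tap2')
print(sorted(vars(tb)))  # ['L', 'R', 'T', 'name']
```

Each instance gets its own Port objects, so connecting tb.R does not affect other.R.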

5   Adding simulation timestamps to the framework

Finally, let's make our simulator able to model the performance of our hardware system. We can give each Process a timestep that corresponds to how long it takes to execute its code. Each time through its loop, a Process advances its local time by this timestep. Then, when communication occurs, we simply send the local time along with each value. At the receiving end, the local time is either smaller or larger than the message timestamp. Since messages can't have come from the future, if the local time is smaller, we need to update the receiver's local time to the received message's timestamp in order not to violate causality.
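
To make the rule concrete, here is a tiny worked example in plain Python with no simulator involved. The departure_times function is a hypothetical helper: it models a single process that, for each arriving timestamp, first catches up to the message (the max rule) and then spends one timestep processing it.

```python
def departure_times(arrival_stamps, timestep=100):
    """Local time of one process after handling each stamped message."""
    local_time = 0
    departures = []
    for stamp in arrival_stamps:
        # A message cannot come from the future: catch up if we are behind
        local_time = max(local_time, stamp)
        # Model the time spent processing the value
        local_time += timestep
        departures.append(local_time)
    return departures


departures = departure_times([100, 150, 600])
print(departures)  # [200, 300, 700]
```

The second message is stamped 150 but the process is busy until 200, so its local time wins. The third message is stamped 600 while the process has been idle since 300, so the process jumps forward to 600 before processing.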

So what will it take to do this? I'm going to introduce a default timestep variable on the Process class and set it to 100 units. I'm also going to introduce an advance_local_time method that increments the local _time by the timestep when called, like so:

class Process:

    next_id = 0
    non_producer_processes = {}
    producer_processes = {}
    timestep = 100

    def __init__(self, name):
        self.name = name
        self.id = Process.next_id
        Process.next_id += 1
        # Keep track of all source processes (join on these at the end), and non-source processes (cancel on these at the end)
        if isinstance(self, Producer):
            Process.producer_processes[self.id] = self
        else:
            Process.non_producer_processes[self.id] = self

        # Inject the ports from the annotations on this instance
        for name, val in self.__annotations__.items():
            if issubclass(val, Port):
                print(f'injecting port({val}) {name} onto {self.__class__}:{self.name}')
                port = val()
                setattr(self, name, port)
                port.proc = self  # Store the process this port is a part of (used for updating local time in the proc on a receive)

        # Local time
        self._time = 0

    def advance_local_time(self):
        self._time += self.timestep

    def __str__(self):
        return f"{self.name}.{self.id}"

    def __repr__(self):
        return f"{type(self).__name__}('{self.name}')"

    def message(self, m):
        print(f'T:{self._time}: {self} - {m}')

I've also modified the message method to provide the current time on each print while the simulation is running. One other thing to note is that I changed the Port injection code to allow every Port to know its containing Process; this gives every Port the ability to access the local _time variable. Once each Port has this information, we just make the send method transmit a tuple of the value and the local _time, and the receive method can compare each incoming message's timestamp with the local _time, and update it if necessary:

class Port:
    def __init__(self):
        self.chan = None
        self.proc = None

class InputPort(Port):
    async def recv(self):
        tok, timestamp = await self.chan.recv()
        self.proc._time = max(self.proc._time, timestamp)
        return tok

class OutputPort(Port):
    async def send(self, val):
        await self.chan.send((val, self.proc._time))

The only remaining thing to modify is each Process to make sure we advance the local time at the proper place. For example, in the buffer we will advance the time after the receive but before the send, to model the execution time of processing a value like so:

class Buffer(Process):

    L: InputPort
    R: OutputPort

    def __init__(self, name):
        super().__init__(name)

    async def exec(self):
        while True:
            tok = await self.L.recv()
            self.message(f"received {tok}")
            self.advance_local_time()
            self.message(f"sending {tok}")
            await self.R.send(tok)

And that's it. If we run the simulation now, we get the following output, with the simulation time reported in every message:

T:0: src1.0 - sending 1
T:100: src1.0 - sent 1
T:100: src1.0 - sending 1
T:100: buf[0].1 - received 1
T:200: buf[0].1 - sending 1
T:200: buf[1].2 - received 1
T:300: buf[1].2 - sending 1
.
.
.
T:1700: buf[7].8 - received 1
T:1800: buf[7].8 - sending 1
T:1800: buf[9].10 - received 1
T:1900: buf[9].10 - sending 1
T:1800: buf[8].9 - received 1
T:1900: buf[8].9 - sending 1
T:2000: snk.11 - received 1
T:1900: buf[9].10 - received 1
T:2000: buf[9].10 - sending 1
T:2100: snk.11 - received 1
T:2100: snk.11 - 10 tokens received

The complete runnable code is available as a gist - timestamp_gist.
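
Because advance_local_time reads self.timestep and timestep is a class attribute, a stage with a different latency can be modelled by overriding it in a subclass. The sketch below illustrates this under stated assumptions: Process is reduced to its timing fields, SlowBuffer and pass_token are hypothetical names, and the token is walked through the stages sequentially rather than by concurrent tasks.

```python
class Process:
    timestep = 100  # default latency, as in the framework

    def __init__(self, name):
        self.name = name
        self._time = 0

    def advance_local_time(self):
        self._time += self.timestep


class SlowBuffer(Process):
    timestep = 250  # hypothetical slower stage


def pass_token(stages, stamp):
    """Walk one token through the stages and return its final timestamp."""
    for stage in stages:
        stage._time = max(stage._time, stamp)  # receive: catch up to the message
        stage.advance_local_time()             # process the value
        stamp = stage._time                    # send: stamp with the local time
    return stamp


stages = [Process('buf0'), SlowBuffer('slow'), Process('buf1')]
first = pass_token(stages, 100)
second = pass_token(stages, 200)
print(first, second)  # 550 800
```

The two tokens enter 100 units apart but leave 250 units apart, because the slow stage is still busy with the first token when the second one arrives.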

And that brings us to the close of this article. In Part 3, I'll demonstrate how we can build and model a hardware palindrome checker using this framework. Thanks for reading!

© Virantha Ekanayake.