Context

These are just my notes from one of the lectures in Martin Kleppmann’s Distributed Systems series. If you’re interested, the full lecture playlist is here.

1. The two generals problem

Two generals, each with an army, want to capture a city.

Condition

They can only capture the city if both armies attack at the same time.

They can only communicate by messenger. The messengers must pass through territory controlled by the city, so they can be captured.

There are two options:

  1. General-1 always goes ahead with the attack after sending a message to General-2
  2. General-1 always waits for acknowledgement from General-2; In this case, General-2 is in the same spot as General-1 in the first option

The problem is that no matter how many messages are exchanged, neither general can ever be certain that the other army will also turn up at the same time.

What does this demonstrate?

In a distributed system, one node can never be certain about the state of another node: because the network can lose messages, no finite exchange of acknowledgements removes the uncertainty about whether the last message arrived.

2. Describing nodes and network behaviour

A system model specifies our assumptions about the behaviour of nodes, the network, and timing. It can be used to reason about the types of faults that could occur.

  • Two generals problem — model of networks

  • Byzantine generals problem — model of node behaviour

  • ? How is the two generals problem a model of networks? (It models the network: messages can be lost or delayed, regardless of how the nodes behave.)

  • ? How is the byzantine generals problem a model of node behaviour? (It models the nodes: some nodes may lie or behave maliciously, even if messages are delivered reliably.)

System model can be defined by three main properties:

2.1. Network behaviour (e.g. message loss)

Assuming two nodes in a network use bidirectional point-to-point communication, links can be one of:

  • Reliable link - every message that is sent is delivered, and no message is lost or created arbitrarily

    • ! A message is received iff it is sent
    • ! Messages may be reordered
  • Fair-loss links - some messages may be lost, but if you keep retransmitting, a message eventually gets through

    • ! Messages may be lost, duplicated or re-ordered
    • Assumes network partition lasts only for a finite period of time
      • @ Network partition = network interruption
  • ! Arbitrary links - a malicious adversary may interfere with messages (eavesdrop, modify, drop, spoof, replay)

We can achieve reliability from both fair-loss and arbitrary links through:

  1. Fair-loss to Reliable - with retries
    • ! Deduplication
  2. Arbitrary to Fair-loss (assuming we can send messages in the first place) - with TLS
    • Uses cryptographic techniques
    • The Transport Layer Security (TLS) protocol prevents an active adversary from eavesdropping on, modifying, spoofing, or replaying traffic.
      • It cannot prevent the adversary from blocking communication
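The fair-loss-to-reliable upgrade (retries plus deduplication) can be sketched as a toy simulation. All the names here (`FairLossLink`, `reliable_send`, `Receiver`) are invented for illustration, and the random-drop link is just a stand-in for a real lossy network:

```python
import random

random.seed(1)  # make the simulated losses deterministic

class FairLossLink:
    """Drops messages with some probability (fair-loss: not all are lost)."""
    def __init__(self, loss_prob=0.3):
        self.loss_prob = loss_prob

    def transmit(self, msg):
        return None if random.random() < self.loss_prob else msg

class Receiver:
    def __init__(self):
        self.seen = set()   # message ids already processed
        self.log = []       # deduplicated messages

    def deliver(self, msg_id, payload):
        if msg_id not in self.seen:   # deduplication: side effect happens once
            self.seen.add(msg_id)
            self.log.append(payload)
        return ("ack", msg_id)        # ack is idempotent, so resending is safe

def reliable_send(link, receiver, msg_id, payload, max_attempts=50):
    """Retry until an acknowledgement gets through (fair-loss -> reliable)."""
    for _ in range(max_attempts):
        delivered = link.transmit((msg_id, payload))
        if delivered is not None:
            ack = link.transmit(receiver.deliver(*delivered))  # ack may be lost
            if ack is not None:
                return True
    return False

receiver = Receiver()
link = FairLossLink(loss_prob=0.3)
reliable_send(link, receiver, msg_id=1, payload="hello")
reliable_send(link, receiver, msg_id=1, payload="hello")  # a client retry
# receiver.log is ["hello"]: delivered exactly once despite loss and retries
```

Note that when only the ack is lost, the sender retransmits and `deliver` runs again with the same `msg_id`; deduplication is what keeps the retry from producing a duplicate side effect.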

Practical considerations

  • Congestion/contention can cause queuing

2.2. Node behaviour (e.g. crashes)

  • & crash-stop (fail-stop) - Node stops executing in case of failure
    • If a node is faulty and crashes, it stops executing forever
  • & Crash-recovery (fail-recovery)
    • A node may crash, losing its in-memory state, but resumes executing sometime later.
      • ~ Docker container restarts
  • Byzantine (fail-arbitrary) - Node behaviour is unconstrained
    • A node is faulty if it deviates from the algorithm in any way, including crashing or behaving maliciously

Practical considerations

  • OS scheduling issues, e.g. priority inversion
    • ? Priority inversion - a high-priority task is blocked waiting on a resource held by a low-priority task
  • Stop-the-world garbage collection pauses
    • Stops all the running threads and can last for minutes
  • Page faults, swap, thrashing
  • Real-time operating systems (RTOS) provide scheduling guarantees, but most distributed systems do not use RTOS
    • ? difference b/w RTOS vs general-purpose OS (GPOS)

2.3. Synchrony assumption, i.e. Timing (e.g. latency)

  • Synchronous
    • We know the upper bound of latency
    • Execution speed of a node is known (strong assumption)
  • Partially-synchronous
    • Sync + async (hybrid)
  • Async
    • Messages may be delayed arbitrarily, so there are no timing guarantees at all
    • Nodes can pause execution arbitrarily

Practical considerations

  • Usually quite predictable latency
  • ? Switch buffers - queuing in network switch buffers can cause latency spikes

These parts are the basis for any distributed algorithm. If the assumptions are wrong, the system will not work as expected!

Exercise 3. Say you have a client-server RPC system in which a client repeats an RPC request until it receives a response. How could the server deduplicate the requests?

  • The client attaches a unique idempotency ID to the request, and reuses the same ID on every retry until it receives a response.
  • On the server, we use the idempotency ID to avoid doing duplicate work.
    • This can be achieved by caching the idempotency ID (with high-throughput and fast reads, e.g. Redis).
      • Reads >> Writes (we only write to the store the first time; subsequent retries read from the store)
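The idea above, sketched as a minimal in-memory model. A plain dict stands in for a shared cache such as Redis, and `handle_rpc`/`do_work` are hypothetical names:

```python
# Hypothetical in-memory stand-in for a shared cache such as Redis.
response_cache = {}

calls = []  # records each time real work happens (should be once per ID)

def do_work(request):
    """The actual side effect we want to perform exactly once."""
    calls.append(request)
    return f"result:{request}"

def handle_rpc(idempotency_id: str, request):
    """Process each idempotency ID at most once; replay the cached response."""
    if idempotency_id in response_cache:       # duplicate retry: read path only
        return response_cache[idempotency_id]
    response = do_work(request)                # executed only on first arrival
    response_cache[idempotency_id] = response  # the one write per ID
    return response
```

A retried request with the same ID then returns the cached response without repeating the work, which matches the reads >> writes observation above.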

3. Fault tolerance and high availability

  • Availability = uptime = the fraction of time a system is able to respond to requests

    • 99.99% - 4 nines (≈ 53 mins downtime/year: 525,600 mins × 0.0001), etc…
  • Failure - when the system as a whole is completely unresponsive or crashed

    • & Whole system is not working
  • Fault - when some part of the system is not working correctly, e.g. a node crashes or behaves in a byzantine way

    • & Node fault or network fault
  • ! Fault tolerance - the maximum number of faults a system can withstand while continuing to work

  • ! Single point of failure - node/network link whose fault leads to failure

  • SLO - Service level objective

    • e.g. “99.9% of requests in a day get a response in 400ms”
  • SLA - Service level agreement

    • Contract which specifies some SLO, penalties for violation
  • Failure detector - This is really a fault detector: it detects whether one or more nodes are faulty.

    • & In most cases, it periodically sends messages to other nodes, and labels a node as crashed if no response is received within the expected time.
    • Usually assumes a partially-synchronous system.
    • The problem is that it is hard to distinguish between a crashed node, a temporarily unresponsive node, a slow node, and a lost or delayed message.
      • & The two generals problem tells us that timeouts are not a totally accurate way of detecting a crash, because the absence of a response could also be due to message loss or delay.
      • A perfect timeout-based failure detector exists only in a
        • synchronous crash-stop system with
        • reliable links
    • & In partially synchronous systems, the best we can achieve is an eventually perfect failure detector
      • It may wrongly suspect a node for a while (e.g. after a timeout that was too short), but its suspicions eventually become accurate.
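An eventually perfect failure detector can be sketched roughly as follows. This is illustrative only: a logical timestamp `now` is passed in instead of real time, and doubling the timeout after a false suspicion is just one simple back-off rule among many:

```python
class EventuallyPerfectFailureDetector:
    """Suspects a node if no heartbeat arrives within `timeout`.

    If a suspicion turns out to be wrong (a late heartbeat arrives from a
    suspected node), the timeout is grown, so in a partially synchronous
    system the detector eventually stops making false suspicions.
    """
    def __init__(self, timeout=1.0):
        self.timeout = timeout
        self.last_heartbeat = {}   # node -> timestamp of last heartbeat
        self.suspected = set()

    def heartbeat(self, node, now):
        if node in self.suspected:   # we were wrong: node was only slow
            self.suspected.discard(node)
            self.timeout *= 2        # back off to avoid repeating the mistake
        self.last_heartbeat[node] = now

    def check(self, node, now):
        """Return True if `node` is currently suspected of having crashed."""
        last = self.last_heartbeat.get(node, now)
        if now - last > self.timeout:
            self.suspected.add(node)
        return node in self.suspected
```

The key property is the second chance: a suspicion is never final, and every mistake makes the detector more conservative.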

Exercise 4. Reliable network links allow messages to be reordered. Give pseudocode for an algorithm that strengthens the properties of a reliable point-to-point link such that messages are received in the order they were sent (this is called a FIFO link), assuming an asynchronous crash-stop system model.

  • We tag each message with a sequence number at the sender; the receiver delivers messages in sequence order, buffering any that arrive early (e.g. in a min-heap) and ignoring duplicates.
  • ~ This can be docker containers in the network sending and receiving messages.

Assumptions

  1. Only messages sent are received
  2. Messages can be reordered
  3. Both nodes start with a sequence number set to zero
  4. Message delays are arbitrary without any timing guarantees
 
# sender.py - runs on the sending node
 
seq_no = 0
message_queue = Queue()
"""Assume this is a distributed queue (the reliable, reordering link)"""
 
def send(message: str) -> None:
	"""Tag each outgoing message with an incrementing sequence number"""
	global seq_no
 
	message_queue.push((seq_no, message))
	seq_no += 1
 
# receiver.py - runs on the receiving node
import heapq
 
expected_seq_no = 0
"""Next sequence number to deliver"""
 
waiting_heap = []
"""Min-heap buffering out-of-order messages (smallest seq_no on top)"""
 
read_queue = Queue()
"""Messages delivered in FIFO order"""
 
def receiver():
	global expected_seq_no
 
	while message_queue:
		seq_no, message = message_queue.pop()
 
		if seq_no == expected_seq_no:
			read_queue.push((seq_no, message))
			expected_seq_no += 1
 
			# Drain any buffered messages that are now in order
			while waiting_heap and waiting_heap[0][0] == expected_seq_no:
				buffered_seq_no, buffered_msg = heapq.heappop(waiting_heap)
				read_queue.push((buffered_seq_no, buffered_msg))
				expected_seq_no += 1
		elif seq_no > expected_seq_no:
			# Buffer to hold out-of-order messages
			heapq.heappush(waiting_heap, (seq_no, message))
		else:
			# seq_no < expected_seq_no means it's a duplicate message,
			# so it can be safely ignored.
			pass

Exercise 5. How do we need to change the algorithm from Exercise 4 if we assume a crash-recovery model instead of a crash-stop model?

  • As the sequence number is in-memory state, we should persist the node's state in external storage (e.g. Redis), using node_id as the key. On startup, the node syncs with the external storage (assuming it is recovering from a crash) instead of resetting its sequence number to zero.
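A minimal sketch of this, with a plain dict standing in for the external store (e.g. Redis) and `node_id` as the key; `RecoverableReceiver` is an invented name:

```python
# Hypothetical key-value store standing in for Redis. In a real system this
# lives outside the node, so it survives the node's crash.
store = {}

class RecoverableReceiver:
    """On startup, reload the delivery cursor from stable storage so a
    crash-recovery node does not reset its sequence number to zero."""
    def __init__(self, node_id):
        self.node_id = node_id
        self.expected = store.get(node_id, 0)  # recover persisted state

    def deliver(self, seq_no):
        if seq_no == self.expected:
            self.expected += 1
            store[self.node_id] = self.expected  # persist before acknowledging
            return True
        return False  # out-of-order or duplicate
```

Because the cursor is written to the store before the message is acknowledged, a restarted node resumes at the right sequence number and still rejects duplicates sent before the crash.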