Microservices
This article will not be a typical presentation of what microservices are. There are already plenty of such summaries available on the Internet or as books (I recommend Building Microservices, by the way). Microservice is ironically a big word. Many systems call themselves “microservice-based” while actually being more like distributed monoliths or something else. The term is definitely a bit fuzzy, and I don’t intend to resolve that now. Instead, I intend to discuss a few topics that often come up when designing distributed systems such as microservices.
Domain Driven Design
The domain is the heart of business software, and domain driven design puts a big emphasis on it. A microservice is a sound way to implement DDD’s Bounded Context concept. I have a separate article on the DDD topic, so here I’ll just link to it.
Transactions
ACID implies support for transactions. The I stands for isolation, which can have various levels:
- Read Committed - a basic level of isolation where reads and writes are not dirty. However, when one transaction runs in the middle of another, some inconsistency might still be observed. E.g., transaction A might read one table, and transaction B would then update another table. When transaction A reads that second table, the two values it saw might be mutually inconsistent.
- Snapshot - when a transaction starts, it operates entirely on a snapshot of the DB taken at a single point in time (see the sketch after this list).
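To make this concrete, here’s a minimal sketch of selecting an isolation level, assuming Postgres and psycopg2 (the connection string and the orders table are hypothetical). Postgres implements its REPEATABLE READ level as snapshot isolation.

```python
import psycopg2

# Hypothetical connection; adjust dbname/user to your setup.
conn = psycopg2.connect("dbname=shop user=app")

# In Postgres, REPEATABLE READ gives snapshot isolation: the whole
# transaction sees the database as of a single point in time.
conn.set_session(isolation_level="REPEATABLE READ")

with conn, conn.cursor() as cur:
    # Reads inside this transaction observe the same consistent
    # snapshot, even if other transactions commit in between.
    cur.execute("SELECT count(*) FROM orders")
    print(cur.fetchone())
```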
Traditional transactions are not that easy to apply to distributed systems. They work well in systems where a single persistent storage is used. In the microservices world, that is usually not the case. Even if multiple services use the same DB technology, they might be different instances, and more importantly, a microservice’s storage is an implementation detail that other services should not know about. Without traditional transactions, we have to think of other ways to implement atomicity.
Sagas
Saga is an alternative to a traditional transaction. A saga splits one big transaction into a bunch of smaller transactions. One important aspect of sagas is that not all failures are treated uniformly. Unexpected technical failures are not handled by this approach; it only handles situations that are either business cases (e.g. cancellation of an order by the user) or expected technical failures (like some timeout that the developer anticipated and added handling for).
A failure in a saga is handled by one of:
- backward recovery (rollback)
- forward recovery (accepting the failure and carrying on somehow)
- some mix of the approaches above
Backward Recovery
Every sub-operation needs a corresponding compensating operation. E.g., if an item was added to an order, we need to be able to remove it from the order. Not all operations are “cleanly” reversible, though. An example would be sending an e-mail: you (usually) can’t unsend it. Your customer could have already received a notification that “the order is ready”. In such cases, you need alternative approaches, such as sending another e-mail informing the customer that we apologize for the inconvenience of their order being canceled. A sketch of this pattern follows.
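Here’s a minimal backward-recovery sketch (the step names and print statements are hypothetical stand-ins for real service calls). Each step pairs an action with its compensation; on failure, the compensations of the completed steps run in reverse order. Note how the e-mail step can’t be truly undone, so its “compensation” is a follow-up apology e-mail.

```python
# Each saga step: (name, action, compensating action).
steps = [
    ("add item to order",
     lambda: print("item added"),
     lambda: print("item removed")),
    ("charge payment",
     lambda: print("payment charged"),
     lambda: print("payment refunded")),
    ("send confirmation e-mail",
     lambda: print("confirmation sent"),
     lambda: print("apology e-mail sent")),  # can't unsend, so apologize instead
]

def run_saga():
    completed = []
    try:
        for name, action, compensate in steps:
            action()
            completed.append((name, compensate))
    except Exception:
        # Backward recovery: undo the completed steps in reverse order.
        for name, compensate in reversed(completed):
            compensate()
        raise

run_saga()
```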
It’s important to plan the order of operations in the saga well. Operations that are the most likely to fail should (if possible, of course) execute first.
Forward Recovery
Another way to handle failure is to try alternative/fallback handling. For example, if your system couldn’t book a shipment on the date selected by the customer, maybe it could autonomously “make a decision” to try booking the shipment on another date. If that succeeds, the saga may continue. You might need to inform the customer about the change, but they will likely be happier with it than with an order cancellation.
Sagas may be implemented in two ways:
- orchestrated
- choreographed
Orchestrated Sagas
An orchestrated saga uses a central entity to control the flow of the saga’s execution (and rollback). It’s easier to monitor, since the central entity can easily log the progress of the process. However, it introduces a separate component that has to depend on all the other microservices involved in the saga. A big benefit, though, is that it makes the process much easier to understand: the central entity usually codes the saga flow in a procedural style, which makes it easy to see what the process looks like.
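A minimal sketch of such an orchestrator, with hypothetical service stubs; the point is that the whole flow reads like one procedure, and rollback decisions are made in a single place:

```python
class ShippingError(Exception):
    pass

class PaymentService:
    def charge(self, order): print(f"charged {order}")
    def refund(self, order): print(f"refunded {order}")

class ShippingService:
    def book(self, order): raise ShippingError("no capacity")

def place_order_saga(order, payments, shipping):
    """The orchestrator: the whole saga flow in one procedural place."""
    payments.charge(order)
    try:
        shipping.book(order)
    except ShippingError:
        payments.refund(order)  # rollback decided centrally
        print("saga rolled back")

place_order_saga("order-1", PaymentService(), ShippingService())
```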
Choreographed Sagas
As you can guess, this one does not use any central entity to control the flow. The advantages and disadvantages of the orchestrated approach apply here in reverse. This approach is much more “microservice-native”: events trigger the consecutive steps of the flow (and signal failures). Interested microservices await the event messages and act on them.
Monitoring, even though more complex, is still possible. Using some correlation ID is definitely a recommended approach, to correlate events from different services (see the sketch below).
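A minimal choreography sketch with an in-process event bus standing in for a real message broker (all event and handler names are hypothetical). The correlation ID travels with every event, so logs from different services can be tied back to one saga:

```python
import uuid
from collections import defaultdict

handlers = defaultdict(list)

def subscribe(event_type):
    def register(handler):
        handlers[event_type].append(handler)
        return handler
    return register

def publish(event_type, payload):
    # Every log line carries the correlation ID, so it's easy to grep.
    print(f"[{payload['correlation_id']}] {event_type}")
    for handler in handlers[event_type]:
        handler(payload)

@subscribe("OrderPlaced")
def payment_service(event):
    publish("PaymentCharged", event)  # reacts, then emits the next event

@subscribe("PaymentCharged")
def shipping_service(event):
    publish("ShipmentBooked", event)

publish("OrderPlaced", {"correlation_id": str(uuid.uuid4()), "order": "order-1"})
```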
Metrics
It’s a good idea to look at the median of app performance characteristics, e.g. request handling time. The median is the p50 percentile. Knowing the median, we know that 50% of requests are handled below the monitored value, and the other 50% go above it.
It’s important to also consider high percentiles, such as p95, p99, and p999, because it could be that the slower requests belong to users with larger amounts of data, e.g. many items in their basket in an e-commerce system. These are the high-value customers, so it’d be unwise to ignore them just because they are a small fraction of overall traffic! On the other hand, the highest-percentile (bad) results are often caused by transient issues outside of our control.
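For illustration, percentiles are cheap to compute from raw samples; a quick sketch with made-up latency numbers:

```python
import statistics

# Made-up request latencies in milliseconds.
latencies_ms = [12, 15, 14, 13, 250, 16, 11, 900, 14, 13]

p50 = statistics.median(latencies_ms)

# quantiles(n=100) returns the 99 cut points between percentiles.
cuts = statistics.quantiles(latencies_ms, n=100)
p95, p99 = cuts[94], cuts[98]

print(f"p50={p50}ms p95={p95}ms p99={p99}ms")
```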
Databases
Relational vs Document
We often need to select between relational and document DBs. Document DBs allow the data model to be similar to the model in the application’s domain layer. Relational databases, with their tables, are a bit more artificial from the app’s perspective.
Relational DBs are (obviously) good when relations between entities are dominant. However, document DBs also deal well with one-to-many relations, because the “one” document may store the “many” within that same document. Relational DBs are better at the many-to-one and many-to-many kinds of relations. Their JOIN operations are also highly optimized, compared to document DBs, where JOINs might exist but are usually not encouraged. To support many-to-one or many-to-many relations in document DBs, we’d have to duplicate the data, which in some cases might lead to synchronization issues. It’s also worth having a look at graph databases when relations are a core concern.
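To illustrate the one-to-many case, here’s a hypothetical order document with its items embedded; the relational equivalent would spread the same data across two tables and reassemble it with a JOIN:

```python
# One-to-many, document style: the "many" (items) live inside the "one"
# (the order), so reading the whole order needs no JOIN.
order_document = {
    "_id": "order-1",
    "customer": "alice",
    "items": [
        {"sku": "book-42", "qty": 1},
        {"sku": "mug-7", "qty": 2},
    ],
}

# Relational equivalent (two tables, reassembled on read):
#   orders(id, customer)
#   order_items(order_id, sku, qty)  -- JOIN ON order_items.order_id = orders.id
```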
Replication
There are various replication strategies:
- single-leader
- multi-leader
- leaderless (AWS Dynamo-style)
With leaders, writes must go through the leader(s), which replicate the data to the rest of the nodes. If there are many leaders, a write may go to any one of them, and the other leaders get informed asynchronously (eventual consistency).
With multiple leaders, the chance of conflicts increases. Different leaders could receive conflicting writes. One solution to that could be some form of partitioning where a given record would always be processed by the same leader.
In leaderless systems, reads and writes use multiple nodes, and there’s a rule:
```
w + r > n
// w - nodes to write
// r - nodes to read
// n - all nodes
```
It’s the quorum equation. This way, when reading data, we will always get up-to-date data from at least one of the nodes.
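With concrete numbers, a typical Dynamo-style configuration looks like this:

```python
n, w, r = 3, 2, 2  # 3 replicas, write to 2, read from 2

# Any set of 2 written nodes and any set of 2 read nodes must overlap
# in at least one node (2 + 2 > 3), so every read reaches at least one
# replica holding the latest acknowledged write.
assert w + r > n
```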
Tombstone
It is the term used for marking the deletion of records. In distributed systems, concurrent writes might make it problematic to merge a record into some final state. A tombstone is a marker that shows that something was intended to be deleted. Tombstones are also used in the WAL (write-ahead log) to mark deletions. Then, when merging occurs, the system knows that the record was deleted, and the previous values (of the tombstoned ID) may be discarded.
When writing data, we often have a version value, which helps to keep up with changes to records. Some storage systems have built-in optimistic concurrency control, allowing clients to send the last version number they know of. The storage system might discard updates (or use some merge strategy) that are based on stale data. When we have multiple replicas, we use a version vector. It’s created per key and contains version numbers from all replicas for a given key.
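A minimal version-vector sketch, not tied to any particular DB (the replica names are hypothetical). Each replica increments its own counter on a local write; comparing two vectors tells us whether one update supersedes the other or whether they are concurrent:

```python
def dominates(a: dict, b: dict) -> bool:
    """True if vector `a` has seen everything vector `b` has seen."""
    return all(a.get(replica, 0) >= counter for replica, counter in b.items())

def compare(a: dict, b: dict) -> str:
    if dominates(a, b) and dominates(b, a):
        return "equal"
    if dominates(a, b):
        return "a supersedes b"
    if dominates(b, a):
        return "b supersedes a"
    return "concurrent - siblings, conflict must be resolved"

# Two replicas updated the same key independently:
print(compare({"r1": 2, "r2": 1}, {"r1": 1, "r2": 2}))  # concurrent
print(compare({"r1": 2, "r2": 2}, {"r1": 1, "r2": 2}))  # a supersedes b
```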
Handling Concurrent Writes
In some systems, it’s the client’s responsibility to handle conflicts, e.g. in Riak. When you write thinking that the last version was X, but it turns out that the last version was X+Y, the server returns both versions to the client to interpret. These two versions are called siblings. A tombstone is useful here to mark a deletion.
OLTP and OLAP
OLTP refers to typical business operations that are necessary for the application to work, e.g. user management, sales records, inventory. It involves both reads and writes (lots of them). The bottleneck is disk seek time.
OLAP refers to analytics operations, e.g. generating reports, data mining, business intelligence. The main use-case is reads, and they are massive; disk bandwidth is the bottleneck. There will not be many requests, though.
OLTP and OLAP have different usage, access patterns, sizes, etc. Therefore, these are separate systems. OLTP databases are used for day-to-day operations, while OLAP databases are used for analytics. Using the same system for both needs would slow down OLTP and probably make OLAP inefficient to use. Also, in OLTP, different parts of the system might have different databases.
In OLAP, there’s a data warehouse, which is a read-only copy of OLTP data that analytics can play with. The process that loads data from OLTP into OLAP is called ETL (Extract from OLTP, Transform, Load into OLAP).
Usually, OLTP databases are row-stores, while OLAP uses column-stores, which fit typical analytics queries.
Partitioning
Partitioning is also called sharding. It’s all about splitting the database into smaller databases, where each one stores some subset of the overall dataset. Partitioning is often combined with replication, so each partition has its own replicas.
Ideally, by partitioning data into 10 parts (nodes), we should get a 10x increase in throughput and storage. If that’s not the case, it means our partitioning is skewed. A partition with a higher load than the others is called a hot spot.
Partitioning strategies:
- random - writes are balanced, but when reading we have no way to know which partition to talk to.
- by key range - we come up with some way of translating the key (ID) into a partition number. An incorrect translation will lead to skewed partitioning.
- by hash of key - like above, but the key gets hashed, and each partition handles a range of hash values (see the sketch after this list).
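A minimal hash-of-key sketch (the boundaries and key format are hypothetical). The key is hashed, and each partition owns a contiguous range of the hash space, so reads know exactly which partition to ask:

```python
import bisect
import hashlib

NUM_PARTITIONS = 4
HASH_SPACE = 2**32

# Exclusive upper boundary of each partition's hash range.
BOUNDARIES = [HASH_SPACE * (i + 1) // NUM_PARTITIONS for i in range(NUM_PARTITIONS)]

def partition_for(key: str) -> int:
    # Hash the key into [0, 2^32), then find the owning range.
    h = int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big")
    return bisect.bisect_right(BOUNDARIES, h)

print(partition_for("order-12345"))  # deterministic: same key, same partition
```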
Secondary Indexes
When we have partitioning, documents are assigned to partitions based on the document’s ID. Secondary indexes, however, may be built on any other field. Therefore, it’s not possible to say that a given value will live on only one partition. You have to query all the partitions and join the results together to get all the values.
If possible, it’s recommended to include the secondary key in partition selection, so that it’s possible to know which partition(s) to query. That’s not easy, though, especially when querying by multiple fields.
Secondary indexes may be local or global. A local index is per partition: each partition builds its own (see the scatter/gather sketch below). A global index is stored on one partition; the partition choice is based on the partitioning strategy for secondary indexes. An index on a given partition could, for example, collect documents where the trip destination is “Egypt”. It would point at all documents matching this criterion, even those on other partitions.
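With local indexes, a query on a secondary field has to be scattered to every partition and the results gathered; a small sketch with hypothetical data:

```python
# Each partition holds its own secondary index: field -> value -> doc IDs.
partitions = [
    {"destination": {"Egypt": ["doc-1"], "Italy": ["doc-2"]}},
    {"destination": {"Egypt": ["doc-7"]}},
    {"destination": {"Spain": ["doc-9"]}},
]

def query_all(field: str, value: str) -> list:
    """Scatter the query to every partition, then gather the results."""
    results = []
    for local_index in partitions:
        results.extend(local_index.get(field, {}).get(value, []))
    return results

print(query_all("destination", "Egypt"))  # ['doc-1', 'doc-7']
```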
Rebalancing
Our choice of partitioning strategy is important for rebalancing. As the data store grows, or query patterns change, we might need to rebalance our partitions to avoid hot spots.
A potentially easy partitioning strategy is mod N, but it’s not a good one. We might want to partition data across N nodes by applying mod N to the key. The issue is that when the number of nodes changes from N to G, lots of records need to be reassigned to different partitions, because for most keys, key mod N != key mod G.
A good solution is to initially create many more partitions than the number of nodes we have. Then, when new nodes come in, they just take over some partitions from other nodes, but the partition assignment of keys does not change. The number of partitions could be set to the maximum number of nodes that we might ever have. Each partition adds some management overhead, though, so we shouldn’t go crazy with it. A sketch of this scheme follows.
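A minimal sketch of the fixed-partitions scheme (the partition and node counts are hypothetical). The key-to-partition mapping never changes; rebalancing only edits the partition-to-node assignment:

```python
import hashlib

NUM_PARTITIONS = 64  # fixed for the lifetime of the cluster

def partition_for(key: str) -> int:
    # Stable: depends only on the key and the fixed partition count.
    h = int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big")
    return h % NUM_PARTITIONS

# The only mutable piece: which node serves which partition.
assignment = {p: p % 3 for p in range(NUM_PARTITIONS)}  # 3 nodes initially

def node_for(key: str) -> int:
    return assignment[partition_for(key)]

# A 4th node joins: hand over every 4th partition. Keys don't move
# between partitions; only whole partitions move between nodes.
for p in range(0, NUM_PARTITIONS, 4):
    assignment[p] = 3
```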
Some DBs partition data autonomously.
With sharding comes the problem of routing: how do we know which node has the data for a given key? This is not an issue if our dataset is fixed and rebalancing never occurs. One solution is ZooKeeper. It keeps the mapping between partitions and nodes to direct us to the right node. Clients can subscribe to ZooKeeper to know the latest mappings.
Serialization
An optimized approach to serialization might use Thrift or ProtoBuf. They are pretty similar. There is a schema file which lists the fields together with their types (and some other metadata). The actual messages use various bits to encode information. The field names are not included; the fields are numbered instead, and the numbers correspond to the schema file’s numbering.
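To show what “numbered instead of named” means on the wire, here’s a simplified sketch of protobuf-style field tagging (real ProtoBuf has more wire types and rules). Each field is prefixed with a varint key of (field_number << 3) | wire_type; the field name never appears in the message:

```python
def encode_varint(value: int) -> bytes:
    """Little-endian base-128 varint, as used by ProtoBuf."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def encode_int_field(field_number: int, value: int) -> bytes:
    key = (field_number << 3) | 0  # wire type 0 = varint
    return encode_varint(key) + encode_varint(value)

# Field #1 set to 150 - the classic example from the ProtoBuf docs:
print(encode_int_field(1, 150).hex())  # '089601'
```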
Avro is similar, though even more “compressed”. It does not include tag numbers; it relies on ordering instead. The order of fields in the schema file has to be respected in the actual data.