PostgreSQL is a powerful and open-source relational database system known for its reliability, flexibility, and advanced features. It handles complex queries, ensures data integrity, and can be extended to meet various needs, making it a popular choice for many applications. However, as your data grows and the number of transactions increases, scaling PostgreSQL can be challenging. Traditional scaling methods, like adding more resources to a single server (vertical scaling), have limitations and can be costly. That’s where horizontal scaling, or sharding, comes in. Sharding involves breaking the database into smaller, more manageable parts and spreading them across multiple servers. This helps manage large datasets, boosts performance, and keeps the database efficient and responsive.
In this blog, we’ll dive into how to use Citus to implement database sharding in PostgreSQL.
But before we get started, let’s go over some key terms used in Citus.
Coordinator Node
The Coordinator node is the main server responsible for managing and distributing queries. It directs queries to the right worker nodes where the data is stored and ensures the results are combined and returned to the user efficiently. Applications send their queries to the Coordinator node, which then passes them to the appropriate workers and gathers the results.
Worker Node
A Worker node is a server that stores and processes part of the sharded data. It executes the queries sent by the Coordinator node on its local data. Together, Worker nodes manage the distributed workload, ensuring efficient data handling and query performance.
Distributed Tables
Distributed Tables appear like regular tables but are divided into smaller pieces called shards and spread across multiple servers (worker nodes). When you modify the structure (schema) of a distributed table, Citus automatically updates all the shards on the worker nodes to reflect the new structure.
Distribution Column
This is the column in your table that Citus uses to determine how to split the data into shards. The cluster administrator selects this column when setting up the table. Choosing the right distribution column is important because it impacts how evenly the data is distributed and how well the system performs.
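For example, in a multi-tenant application the tenant or customer identifier is usually a good distribution column, because all rows belonging to one tenant land on the same shard and can be filtered and joined locally. As a small sketch with a hypothetical orders table (the table and column names are just for illustration):

CREATE TABLE orders (
    order_id BIGSERIAL,
    customer_id INT NOT NULL,
    total NUMERIC(10, 2),
    PRIMARY KEY (customer_id, order_id)
);
-- Distribute by customer_id so each customer's orders live on one shard
SELECT create_distributed_table('orders', 'customer_id');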
Types of Sharding in Citus
Row-Based Sharding: Stores different groups of data (tenants) as rows within the same table.
Schema-Based Sharding: Uses separate schemas (like separate folders) within the same database for different groups of data (tenants).
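As a rough sketch of schema-based sharding (assuming Citus 12 or later, where this feature and the setting below were introduced; the schema and table names are only examples), each tenant gets its own schema and Citus distributes the whole schema as a unit:

-- Assumes Citus 12+; every schema created afterwards becomes a distributed schema
SET citus.enable_schema_based_sharding TO ON;
CREATE SCHEMA tenant_a;
CREATE TABLE tenant_a.events (id BIGSERIAL PRIMARY KEY, payload TEXT);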
For more information about the Citus architecture, please visit the Citus documentation: https://docs.citusdata.com/
Hands-on
Requirements for Setting Up Citus
I assume you have already set up the following:
- Launched 3 EC2 instances:
- Instance 1: Coordinator Node
- Instance 2: Worker Node 1
- Instance 3: Worker Node 2
- PostgreSQL is installed on each node.
- PostgreSQL port is open on both Worker Nodes for access from the Coordinator Node. This can be done by updating the inbound rules in the EC2 console.
Installing Citus
To install Citus on each node, follow these steps:
Add the Citus Repository
curl https://install.citusdata.com/community/deb.sh | sudo bash
Install Citus
sudo apt-get install postgresql-16-citus
Configure PostgreSQL
Update pg_hba.conf file
Add the following line to the pg_hba.conf file on both Worker nodes to allow access from the Coordinator node:
host all all <COORDINATOR_NODE_IP>/32 trust
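Keep in mind that trust allows connections without any authentication, which is fine for a quick demo but not for production. A more typical production entry would use password authentication instead, for example:

host all all <COORDINATOR_NODE_IP>/32 scram-sha-256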
Update postgresql.conf file
Modify the following parameters in the postgresql.conf file on all nodes:
listen_addresses = '*'
shared_preload_libraries = 'citus'
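If you prefer not to edit postgresql.conf by hand, the same settings can also be applied from psql with ALTER SYSTEM (a restart is still required, since shared_preload_libraries is only read at server startup):

ALTER SYSTEM SET listen_addresses = '*';
ALTER SYSTEM SET shared_preload_libraries = 'citus';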
Restart PostgreSQL
Restart PostgreSQL to apply the changes:
sudo service postgresql restart
Create Citus extension
After completing the steps above, we are ready to create the Citus extension on all of our nodes.
CREATE EXTENSION citus;
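To confirm the extension is available on each node, you can check the installed Citus version:

SELECT citus_version();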
Set Up the Coordinator Node
First, configure the Coordinator node by running the following query on Instance 1:
SELECT citus_set_coordinator_host('<INSTANCE1_PRIVATE_IP>', 5432);
Replace <INSTANCE1_PRIVATE_IP> with the private IP address of Instance 1, and make sure to use the port number where PostgreSQL is running.
Set Up the Worker Nodes
Once the Coordinator node is set up, configure the Worker nodes. In this example, we’ll set up two Worker nodes, but you can add more as needed based on your workload. Run the following queries on your Coordinator node, updating the IP addresses accordingly:
SELECT citus_add_node('<INSTANCE2_PRIVATE_IP>', 5432);
SELECT citus_add_node('<INSTANCE3_PRIVATE_IP>', 5432);
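To verify that both workers were registered, you can inspect the Citus metadata on the Coordinator node; the coordinator and both workers should appear in the list:

SELECT nodeid, nodename, nodeport, noderole FROM pg_dist_node;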
Creating the First Sharded Table
First, let’s create a simple PostgreSQL table:
CREATE TABLE sharded_dummy_data (
    id SERIAL,
    user_id INT,
    name TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, id)
);
We will insert 5 million dummy rows into this table and record the ingestion time:
INSERT INTO sharded_dummy_data (user_id, name, created_at)
SELECT generate_series(1, 5000000),
       'Name ' || generate_series(1, 5000000),
       NOW() + (generate_series(1, 5000000) * interval '1 second');
It took approximately 13.8 seconds to ingest 5 million rows into the standard PostgreSQL table. Next, drop the table and create a sharded table with the same structure:
CREATE TABLE sharded_dummy_data (
    id SERIAL,
    user_id INT,
    name TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, id)
);
Convert the table into a sharded table by using the create_distributed_table function, specifying the table name and the shard column:
SELECT create_distributed_table('sharded_dummy_data', 'user_id');
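By default, Citus splits a distributed table into 32 shards, which is why the query plan later in this post shows 32 tasks. If you wanted a different number of shards, you could set citus.shard_count before calling create_distributed_table, for example:

-- Optional: use 64 shards instead of the default 32
SET citus.shard_count = 64;
SELECT create_distributed_table('sharded_dummy_data', 'user_id');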
Insert the 5 million rows again.
INSERT INTO sharded_dummy_data (user_id, name, created_at)
SELECT generate_series(1, 5000000),
       'Name ' || generate_series(1, 5000000),
       NOW() + (generate_series(1, 5000000) * interval '1 second');
With 1 Coordinator and 2 Worker nodes, it took only 7.9 seconds to ingest 5 million rows. This shows that parallel processing significantly improved performance.
Running a COUNT(*) query
explain analyze select count(*) from sharded_dummy_data;

                                                                       QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.02 rows=1 width=8) (actual time=1528.156..1528.159 rows=1 loops=1)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=1528.110..1528.129 rows=32 loops=1)
         Task Count: 32
         Tuple data received from nodes: 256 bytes
         Tasks Shown: One of 32
         ->  Task
               Tuple data received from node: 8 bytes
               Node: host=172.31.82.155 port=5432 dbname=postgres
               ->  Aggregate  (cost=3100.12..3100.14 rows=1 width=8) (actual time=1445.204..1445.207 rows=1 loops=1)
                     ->  Seq Scan on sharded_dummy_data_102113 sharded_dummy_data  (cost=0.00..2710.50 rows=155850 width=0) (actual time=0.017..845.356 rows=155850 loops=1)
               Planning Time: 0.177 ms
               Execution Time: 1445.234 ms
 Planning Time: 0.361 ms
 Execution Time: 1528.182 ms
- The query was executed in parallel as 32 tasks (one per shard), spread across the worker nodes.
- Each task performed a sequential scan on its respective shard.
- The results from the scans were aggregated both on the worker nodes and on the coordinator node.
- The total execution time was 1528.182 ms, with the majority of the time spent on the sequential scan and aggregation on the worker nodes.
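To see how the shards of the table and their data are spread across the worker nodes, you can query the citus_shards view on the Coordinator (available in recent Citus versions):

SELECT shardid, nodename, nodeport, shard_size
FROM citus_shards
WHERE table_name = 'sharded_dummy_data'::regclass;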
FAQ
Question) How to distribute data evenly across all worker nodes?
Answer) Use the rebalance_table_shards function in Citus to redistribute the shards of a distributed table across all worker nodes. This helps ensure even data distribution and prevents any single worker node from becoming overloaded, improving performance and resource use.
SELECT rebalance_table_shards('YOUR_TABLE_NAME');
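Rebalancing can take a while on large tables; you can monitor a running rebalance from another session with:

SELECT * FROM get_rebalance_progress();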
Question) How to list all worker nodes?
Answer) Run the following query to view all active worker nodes:
SELECT * FROM master_get_active_worker_nodes();
Question) How to remove a worker node from the cluster?
Answer) Use this query to remove a worker node by specifying its IP address and port:
SELECT master_remove_node('<WORKER_NODE_IP>', '<POSTGRESQL_PORT>');
Question) Is it possible to un-shard a sharded table?
Answer) Yes, you can convert a distributed table back into a regular local table with the undistribute_table function. This moves all of the data back to the Coordinator node and drops the shards from the workers:
SELECT undistribute_table('YOUR_TABLE_NAME');
Question) How to check connectivity between all nodes in the cluster?
Answer) To check network connectivity among all nodes, use the following query. It verifies every node-to-node connection in the cluster (with three nodes, that is nine connection pairs):
SELECT * FROM citus_check_cluster_node_health();
Question) How to check sharded table sizes?
Answer) Use this query to view the sizes of all sharded tables:
SELECT * FROM citus_tables;
Question) How to update or change the hostname of a node in the cluster?
Answer) First, get the node ID using:
SELECT * FROM pg_dist_node;
Then, update the node's address with citus_update_node, replacing 123 below with the node ID returned by the previous query:
SELECT citus_update_node(123, 'new-address', 5432);
Benefits of Sharding in Database Management
- Handles large volumes of data by distributing it across multiple servers.
- Reduces the load on individual servers and speeds up data retrieval.
- Keeps the system operational even if one shard fails.
- Isolates and optimizes high-traffic areas separately.
- Helps manage data efficiently and keeps active data streamlined.
- Enables cost-effective and gradual scaling by adding more shards.
Common Use Cases for Sharding
- Web applications: distribute the load for faster response times and high availability.
- IoT and sensor platforms: efficiently manage and store large volumes of sensor data.
- Financial and transactional systems: provide scalability and data integrity for high transaction volumes.
- Content management and delivery: effectively handle and serve large amounts of content.
- Analytics and reporting: distribute data for quicker query processing and analysis.
- SaaS applications: ensure scalability and performance in multi-tenant setups.
When Not to Use Sharding
Sharding may not be ideal for:
- Workloads that fit within a single PostgreSQL node.
- Offline analytics without real-time data needs.
- Analytics applications with low numbers of concurrent users.
- Queries that return large, data-heavy result sets from ETL processes rather than summaries.
- Applications needing complex multi-node transactions or strong consistency.
- Workloads with frequent small updates or low data volumes that don’t benefit from sharding.