A Practical Guide to Using Advisory Locks in Your Application
Databases like PostgreSQL and MySQL provide various locking mechanisms to control concurrency and to avoid data corruption. In this comprehensive guide, we'll take a deep dive into advisory locks, implement them on the database, and see how they work in practice with Go code examples.
What are Advisory Locks and Why are They Special?
Advisory locks are locks provided by the database that you can control via your application. They're a utility that helps you manage concurrency when you want to keep your application stateless.
There are different locks that the database provides out of the box, and most of them are managed by the database until you specify something specific. PostgreSQL provides row level, table level, and page level locks. But these are locks used by the DB and the application doesn't have any control over them. That's where advisory locks come in.
Why Do You Need It?
Before diving into the nuances of advisory locks, let me explain the problem they solve and the use cases where they're used.
Imagine a scenario where you have a distributed system talking to a central database. You need to control the access to the data for all the systems or nodes that are communicating with the database, and no two nodes should alter the data at the same time.
In such a scenario, you can control the flow of the program by using an advisory lock. If a node is able to acquire the lock, you let the node access the data. The rest of the nodes either wait for the node to finish or report back to the application that the lock wasn't acquired.
Think of it this way: the advisory lock is a gateway for certain pieces of code in your application, while the other pieces of code in your application are free to do whatever they want with the data.
Common Use Cases
This scenario can arise when you have to send a notification or a report to someone. The data for that resides in a central database. You might have designed the system to run a cron job and send a report or notification every day. But your application logic resides in multiple nodes, and if they all execute at the same time, multiple reports/notifications will be sent out.
To avoid this, you can have all the nodes try to acquire an advisory lock, and if one of them is able to do that, then execute the logic to send the notification/report.
Keep in mind that these locks are still managed by the database. If you're using PostgreSQL and you try to list all the locks that are acquired on it, you'll be able to view them (we'll see this shortly).
Other use cases for advisory locks include scenarios where you want to run a background process that should be executed by only one worker or a node. Advisory locks can be used to ensure you're not wasting your compute power by executing the process more than once.
Types of Advisory Locks in PostgreSQL
PostgreSQL provides two levels of advisory locks:
1. Session-Level Locks
A session-level lock is released when the session ends or when you manually release it.
2. Transaction-Level Locks
Just like other locks, if the lock is acquired on a transaction level, the lock is released when the transaction is complete.
Blocking vs Non-Blocking Functions
You can acquire an advisory lock using either a blocking or a non-blocking function:
- Non-blocking function: The process will immediately return with a boolean value stating if the lock is acquired or not.
- Blocking function: The process trying to acquire the lock will wait until the lock is acquired.
In the use case mentioned above of sending a report or notification where only one node should execute it, a non-blocking function fits well. When each of the nodes tries to access the lock at the same time, only one node is able to acquire it and the rest of them return a false value indicating that they were not able to acquire it.
A blocking function is useful in scenarios where you need to make sure that a record is getting updated by only a single thread or a node at a time. It will wait for one of the threads or nodes to release the lock and update the records after the previous thread is done updating them, maintaining data consistency.
Hands-On: Trying Advisory Locks in PostgreSQL
Let's try this out in PostgreSQL. Connect to your PostgreSQL database using any client.
Acquiring a Session-Level Advisory Lock
Use the pg_try_advisory_lock()
function and pass any 64-bit number as an argument:
SELECT pg_try_advisory_lock(100);
It will return true
. Now you can query the advisory locks on the database:
SELECT locktype, database, relation,
objid AS key1, objsubid AS key2,
pid, mode, granted
FROM pg_locks
WHERE locktype = 'advisory';
Testing Lock Contention
Now open a new session in the terminal and connect to the database again. Try to run the command to acquire a lock on the same integer (100):
SELECT pg_try_advisory_lock(100);
You will get a false
value in return, indicating that you were not able to acquire the lock. Any new session will not be able to acquire this lock until the old one releases it. The new session can, however, acquire a lock on another integer, let's say 101.
Blocking Function Example
The command we used above (pg_try_advisory_lock
) is a non-blocking function. Let's try acquiring the same lock (integer 100) using the blocking function:
SELECT pg_advisory_lock(100);
You will see that the query hangs, waiting to acquire the lock. The shell will be waiting with no output or new prompt.
Releasing the Lock
Now let's release the lock we acquired in the old shell:
SELECT pg_advisory_unlock(100);
After releasing the lock in the first session, you'll see the blocking query in the second session complete immediately.
Terminating a Process (If Needed)
If you need to forcefully terminate a process holding a lock, you can use:
SELECT pg_terminate_backend(239);
Replace 239
with the actual PID from the pg_locks
query.
Go Implementation for PostgreSQL
The following code snippet demonstrates how to implement advisory locks in Go using the database/sql
package with the pgx
driver. Please test your application thoroughly before deploying it to production.
Here we've created a simple function that attempts to acquire an advisory lock and returns a boolean indicating whether it was successful. The function takes an integer as an argument, which will be used as the lock identifier.
Basic Implementation
package advisory
import (
"context"
"database/sql"
"fmt"
)
type LockManager struct {
db *sql.DB
}
func NewLockManager(db *sql.DB) *LockManager {
return &LockManager{db: db}
}
// TryAcquireLock attempts to acquire a session-level advisory lock
// Returns true if the lock was acquired, false otherwise
func (lm *LockManager) TryAcquireLock(ctx context.Context, lockID int64) (bool, error) {
var acquired bool
query := "SELECT pg_try_advisory_lock($1)"
err := lm.db.QueryRowContext(ctx, query, lockID).Scan(&acquired)
if err != nil {
return false, fmt.Errorf("failed to acquire lock: %w", err)
}
return acquired, nil
}
// ReleaseLock releases a session-level advisory lock
func (lm *LockManager) ReleaseLock(ctx context.Context, lockID int64) error {
var released bool
query := "SELECT pg_advisory_unlock($1)"
err := lm.db.QueryRowContext(ctx, query, lockID).Scan(&released)
if err != nil {
return fmt.Errorf("failed to release lock: %w", err)
}
if !released {
return fmt.Errorf("lock %d was not held", lockID)
}
return nil
}
// TryAcquireTransactionLock attempts to acquire a transaction-level advisory lock
// The lock will be automatically released when the transaction commits or rolls back
func (lm *LockManager) TryAcquireTransactionLock(ctx context.Context, tx *sql.Tx, lockID int64) (bool, error) {
var acquired bool
query := "SELECT pg_try_advisory_xact_lock($1)"
err := tx.QueryRowContext(ctx, query, lockID).Scan(&acquired)
if err != nil {
return false, fmt.Errorf("failed to acquire transaction lock: %w", err)
}
return acquired, nil
}
Usage Example
package main
import (
"context"
"database/sql"
"log"
"time"
_ "github.com/jackc/pgx/v5/stdlib"
)
func main() {
// Initialize database connection
db, err := sql.Open("pgx", "postgres://user:password@localhost:5432/mydb")
if err != nil {
log.Fatal(err)
}
defer db.Close()
lockManager := NewLockManager(db)
ctx := context.Background()
// Example 1: Session-level lock
const dailyReportLockID = 1001
acquired, err := lockManager.TryAcquireLock(ctx, dailyReportLockID)
if err != nil {
log.Printf("Error acquiring lock: %v", err)
return
}
if acquired {
log.Println("Lock acquired, sending daily report...")
// Only one instance will execute this code
sendDailyReport()
// Release the lock when done
if err := lockManager.ReleaseLock(ctx, dailyReportLockID); err != nil {
log.Printf("Error releasing lock: %v", err)
}
} else {
log.Println("Another instance is processing the daily report")
}
}
func sendDailyReport() {
// Your business logic here
log.Println("Daily report sent successfully")
}
// Example 2: Transaction-level lock
func processDataWithTransactionLock(db *sql.DB, lockManager *LockManager) error {
ctx := context.Background()
// Begin transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Try to acquire transaction-level lock
const dataProcessLockID = 2001
acquired, err := lockManager.TryAcquireTransactionLock(ctx, tx, dataProcessLockID)
if err != nil {
return err
}
if !acquired {
log.Println("Another transaction is processing this data")
return nil
}
// Process data
log.Println("Processing data...")
// Your business logic here
// Commit transaction (lock will be automatically released)
return tx.Commit()
}
Using with Goroutines
package main
import (
"context"
"database/sql"
"log"
"sync"
"time"
)
func simulateConcurrentLocking(db *sql.DB) {
lockManager := NewLockManager(db)
const sharedLockID = 3001
var wg sync.WaitGroup
// Simulate 5 concurrent workers trying to acquire the same lock
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
ctx := context.Background()
acquired, err := lockManager.TryAcquireLock(ctx, sharedLockID)
if err != nil {
log.Printf("Worker %d: Error - %v", workerID, err)
return
}
if acquired {
log.Printf("Worker %d: Lock acquired! Processing...", workerID)
// Simulate work
time.Sleep(2 * time.Second)
// Release lock
if err := lockManager.ReleaseLock(ctx, sharedLockID); err != nil {
log.Printf("Worker %d: Error releasing lock - %v", workerID, err)
}
log.Printf("Worker %d: Lock released", workerID)
} else {
log.Printf("Worker %d: Could not acquire lock", workerID)
}
}(i)
}
wg.Wait()
}
Generating Lock Keys from Email or Phone Numbers
In real-world applications, you often need to create locks based on user identifiers like email addresses or phone numbers. Since advisory locks require integer keys, you'll need to convert these strings to integers. Here are practical examples:
Using Hash Functions to Generate Keys
package advisory
import (
"crypto/sha256"
"encoding/binary"
"hash/fnv"
)
// GenerateLockKeyFromEmail generates a consistent int64 key from an email address
// Uses FNV-1a hash algorithm which is fast and provides good distribution
func GenerateLockKeyFromEmail(email string) int64 {
h := fnv.New64a()
h.Write([]byte(email))
return int64(h.Sum64())
}
// GenerateLockKeyFromPhone generates a consistent int64 key from a phone number
func GenerateLockKeyFromPhone(phone string) int64 {
h := fnv.New64a()
h.Write([]byte(phone))
return int64(h.Sum64())
}
// GenerateLockKeyFromString generates a consistent int64 key from any string
// Alternative using SHA256 for more uniform distribution
func GenerateLockKeyFromString(s string) int64 {
hash := sha256.Sum256([]byte(s))
// Take first 8 bytes and convert to int64
return int64(binary.BigEndian.Uint64(hash[:8]))
}
Practical Example: Preventing Duplicate User Registration
package main
import (
"context"
"database/sql"
"fmt"
"log"
)
type UserService struct {
db *sql.DB
lockManager *LockManager
}
func NewUserService(db *sql.DB) *UserService {
return &UserService{
db: db,
lockManager: NewLockManager(db),
}
}
// RegisterUser prevents duplicate registration for the same email
// using advisory locks
func (us *UserService) RegisterUser(ctx context.Context, email, name string) error {
// Generate lock key from email
lockKey := GenerateLockKeyFromEmail(email)
// Begin transaction
tx, err := us.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()
// Try to acquire transaction-level lock
acquired, err := us.lockManager.TryAcquireTransactionLock(ctx, tx, lockKey)
if err != nil {
return fmt.Errorf("failed to acquire lock: %w", err)
}
if !acquired {
return fmt.Errorf("another registration is in progress for email: %s", email)
}
// Check if user already exists
var exists bool
err = tx.QueryRowContext(ctx,
"SELECT EXISTS(SELECT 1 FROM users WHERE email = $1)",
email,
).Scan(&exists)
if err != nil {
return fmt.Errorf("failed to check existing user: %w", err)
}
if exists {
return fmt.Errorf("user with email %s already exists", email)
}
// Create new user
_, err = tx.ExecContext(ctx,
"INSERT INTO users (email, name) VALUES ($1, $2)",
email, name,
)
if err != nil {
return fmt.Errorf("failed to create user: %w", err)
}
// Commit transaction (lock automatically released)
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
log.Printf("Successfully registered user: %s", email)
return nil
}
Example: Phone Number Verification Lock
package main
import (
"context"
"fmt"
"time"
)
type SMSService struct {
lockManager *LockManager
}
// SendVerificationCode ensures only one verification code is sent
// within a time window for the same phone number
func (s *SMSService) SendVerificationCode(ctx context.Context, phoneNumber string) error {
// Generate lock key from phone number
lockKey := GenerateLockKeyFromPhone(phoneNumber)
// Try to acquire session-level lock
acquired, err := s.lockManager.TryAcquireLock(ctx, lockKey)
if err != nil {
return fmt.Errorf("failed to acquire lock: %w", err)
}
if !acquired {
return fmt.Errorf("verification code already being processed for: %s", phoneNumber)
}
defer s.lockManager.ReleaseLock(ctx, lockKey)
// Check rate limiting
lastSent, err := s.getLastVerificationTime(ctx, phoneNumber)
if err != nil {
return err
}
if time.Since(lastSent) < 1*time.Minute {
return fmt.Errorf("please wait before requesting another code")
}
// Send SMS
code := generateVerificationCode()
if err := s.sendSMS(phoneNumber, code); err != nil {
return fmt.Errorf("failed to send SMS: %w", err)
}
// Store verification code
if err := s.storeVerificationCode(ctx, phoneNumber, code); err != nil {
return fmt.Errorf("failed to store code: %w", err)
}
return nil
}
func generateVerificationCode() string {
// Implementation details...
return "123456"
}
func (s *SMSService) sendSMS(phone, code string) error {
// Implementation details...
return nil
}
func (s *SMSService) storeVerificationCode(ctx context.Context, phone, code string) error {
// Implementation details...
return nil
}
func (s *SMSService) getLastVerificationTime(ctx context.Context, phone string) (time.Time, error) {
// Implementation details...
return time.Now().Add(-2 * time.Minute), nil
}
Example: Payment Processing Lock
package main
import (
"context"
"database/sql"
"fmt"
)
type PaymentService struct {
db *sql.DB
lockManager *LockManager
}
// ProcessPayment ensures idempotency using user email + order ID
func (ps *PaymentService) ProcessPayment(ctx context.Context, userEmail, orderID string, amount float64) error {
// Create unique lock key combining email and order ID
lockIdentifier := fmt.Sprintf("payment:%s:%s", userEmail, orderID)
lockKey := GenerateLockKeyFromString(lockIdentifier)
tx, err := ps.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Acquire transaction lock
acquired, err := ps.lockManager.TryAcquireTransactionLock(ctx, tx, lockKey)
if err != nil {
return err
}
if !acquired {
return fmt.Errorf("payment already being processed for order: %s", orderID)
}
// Check if payment already completed
var status string
err = tx.QueryRowContext(ctx,
"SELECT status FROM payments WHERE order_id = $1",
orderID,
).Scan(&status)
if err == nil && status == "completed" {
return fmt.Errorf("payment already completed for order: %s", orderID)
}
// Process payment
if err := ps.chargePayment(userEmail, amount); err != nil {
return err
}
// Update payment status
_, err = tx.ExecContext(ctx,
"INSERT INTO payments (order_id, user_email, amount, status) VALUES ($1, $2, $3, $4) "+
"ON CONFLICT (order_id) DO UPDATE SET status = $4",
orderID, userEmail, amount, "completed",
)
if err != nil {
return err
}
return tx.Commit()
}
func (ps *PaymentService) chargePayment(email string, amount float64) error {
// Payment gateway integration
return nil
}
Testing Hash Collisions
package main
import (
"fmt"
"testing"
)
func TestLockKeyGeneration(t *testing.T) {
testCases := []struct {
input string
generate func(string) int64
}{
{"[email protected]", GenerateLockKeyFromEmail},
{"[email protected]", GenerateLockKeyFromEmail},
{"+1234567890", GenerateLockKeyFromPhone},
{"+9876543210", GenerateLockKeyFromPhone},
}
keys := make(map[int64]string)
for _, tc := range testCases {
key := tc.generate(tc.input)
// Check for collisions
if existingInput, exists := keys[key]; exists {
t.Errorf("Hash collision detected: %s and %s generate the same key: %d",
existingInput, tc.input, key)
}
keys[key] = tc.input
// Verify consistency
key2 := tc.generate(tc.input)
if key != key2 {
t.Errorf("Inconsistent key generation for %s: %d vs %d",
tc.input, key, key2)
}
fmt.Printf("Input: %-25s -> Key: %d\n", tc.input, key)
}
}
Key PostgreSQL Advisory Lock Functions
Function | Type | Description |
---|---|---|
pg_advisory_lock(key) |
Session, Blocking | Acquires lock and waits if unavailable |
pg_try_advisory_lock(key) |
Session, Non-blocking | Returns true/false immediately |
pg_advisory_xact_lock(key) |
Transaction, Blocking | Lock released at transaction end |
pg_try_advisory_xact_lock(key) |
Transaction, Non-blocking | Returns true/false, auto-releases |
pg_advisory_unlock(key) |
- | Manually releases session lock |
Best Practices
- Use Consistent Lock Keys: Define your lock identifiers as constants in your application to avoid conflicts.
- Prefer Transaction-Level Locks: They automatically release when the transaction ends, reducing the risk of deadlocks.
- Monitor Lock Usage: Regularly query
pg_locks
to identify potential issues. - Choose the Right Function: Use non-blocking functions for scenarios where you want immediate feedback, and blocking functions when you need guaranteed sequential execution.
- Document Your Lock Strategy: Clearly document which lock IDs are used for which purposes in your application.
Wrapping Up
While the concept might have been a bit confusing in the beginning, I hope by now you understand how advisory locks work. We've learned about their use cases and also understood how to use them in practical scenarios.
The examples provided were for PostgreSQL, but MySQL has its own version of advisory locks. You can check them out in the official MySQL documentation.
Advisory locks are a powerful tool for managing concurrency in distributed systems. By understanding when and how to use them, you can build more robust and efficient applications that handle concurrent operations gracefully.