Concurrency Control in Production Systems: Go Patterns That Scale

Concurrency Control in Production Systems: Go Patterns That Scale

When building production systems, controlling concurrency isn’t just about performance - it’s about resource management, cost optimization, and system stability. Whether you’re rate-limiting API calls to external services, managing parallel data processing, or orchestrating distributed jobs, proper concurrency control is essential.

The Semaphore Pattern: Elegant Simplicity#

Credit where it’s due: Paul Smith demonstrated this elegant approach that we’ve since deployed across numerous production systems. The pattern uses Go’s channels as counting semaphores - a technique that’s both performant and maintainable.

Original insight from: https://pauladamsmith.com/blog/2016/04/max-clients-go-net-http.html

Core Implementation#

// processBatch runs processRequest and handleResult for every element of
// requests and returns once all of them have finished. At most
// maxConcurrency requests are in flight at any moment.
func processBatch(requests []Request) {
    // Limit concurrent processing to prevent resource exhaustion.
    maxConcurrency := runtime.NumCPU() * 2 // Tune based on workload
    semaphore := make(chan struct{}, maxConcurrency)

    var wg sync.WaitGroup
    for _, req := range requests {
        // Acquire the slot BEFORE starting the goroutine. This bounds the
        // number of live goroutines to maxConcurrency instead of creating
        // one parked goroutine per request up front.
        semaphore <- struct{}{}

        wg.Add(1)
        go func(r Request) {
            defer wg.Done()
            defer func() { <-semaphore }() // Release slot

            // Process request - guaranteed max concurrency
            result := processRequest(r)
            handleResult(result)
        }(req)
    }
    wg.Wait()
}

Production Applications in Distributed Systems#

1. Rate-Limited API Calls to External Services#

// APIClient issues HTTP requests while capping how many calls may be in
// flight at the same time.
type APIClient struct {
    // semaphore is a buffered channel used as a counting semaphore: each
    // FetchResource call holds one element for the duration of the call.
    semaphore chan struct{}
    // client is the shared HTTP client; NewAPIClient configures it with a
    // 30-second timeout.
    client    *http.Client
}

// NewAPIClient returns an APIClient that allows at most rateLimit concurrent
// FetchResource calls and uses an HTTP client with a 30-second timeout.
//
// Values of rateLimit below 1 are treated as 1. A capacity of 0 would create
// an unbuffered channel that no caller could ever acquire, and a negative
// capacity makes make panic.
func NewAPIClient(rateLimit int) *APIClient {
    if rateLimit < 1 {
        rateLimit = 1
    }
    return &APIClient{
        semaphore: make(chan struct{}, rateLimit),
        client:    &http.Client{Timeout: 30 * time.Second},
    }
}

// FetchResource fetches url while holding one of the client's concurrency
// slots. It blocks until a slot is free or ctx is done; in the latter case it
// returns ctx.Err() without issuing the request.
//
// Note that the semaphore bounds the number of simultaneous calls, not the
// number of calls per unit of time. ctx only governs the wait for a slot: it
// is not passed to makeRequest.
func (c *APIClient) FetchResource(ctx context.Context, url string) (*Response, error) {
    // select chooses at random among ready cases, so without this check an
    // already-cancelled ctx could still win a free slot and send the request.
    if err := ctx.Err(); err != nil {
        return nil, err
    }

    // Enforce the concurrency limit across all goroutines.
    select {
    case c.semaphore <- struct{}{}:
        defer func() { <-c.semaphore }()
    case <-ctx.Done():
        return nil, ctx.Err()
    }

    // Make API call - guaranteed to respect the concurrency limit.
    return c.makeRequest(url)
}

2. Parallel Data Processing with Memory Constraints#

// processFiles runs processFile on every element of files, at most
// maxParallel at a time, and returns the results in input order
// (results[i] belongs to files[i]).
func processFiles(files []File) []Result {
    // Limit based on available memory
    maxParallel := 4 // Each operation uses ~4GB RAM
    semaphore := make(chan struct{}, maxParallel)

    results := make([]Result, len(files))
    var wg sync.WaitGroup

    for i, file := range files {
        // Acquire before spawning so that no more than maxParallel
        // goroutines exist at once, rather than one parked goroutine per
        // file.
        semaphore <- struct{}{}

        wg.Add(1)
        go func(idx int, f File) {
            defer wg.Done()
            defer func() { <-semaphore }()

            // Memory-intensive operation. Each goroutine writes only its
            // own index, so the slice needs no extra locking.
            results[idx] = processFile(f)
        }(i, file)
    }

    wg.Wait()
    return results
}

3. Database Connection Pooling#

// DataLoader loads records from a database while capping the number of
// concurrent loads.
type DataLoader struct {
    // dbSemaphore is a channel used as a counting semaphore; LoadBatch holds
    // one element around each loadFromDB call. A nil channel would block
    // every send forever, so it must be initialized before use.
    dbSemaphore chan struct{}
    // db is the database handle.
    // NOTE(review): presumably used by loadFromDB, which is not shown here.
    db          *sql.DB
}

// LoadBatch loads every record in recordIDs concurrently, bounded by the
// capacity of d.dbSemaphore, and waits for all loads to finish.
//
// The returned slice is aligned with recordIDs: element i is whatever
// loadFromDB returned for recordIDs[i]. The returned error is the result of
// combineErrors over the per-record errors.
func (d *DataLoader) LoadBatch(recordIDs []string) ([]*Record, error) {
    results := make([]*Record, len(recordIDs))
    // Named errs rather than errors so the standard errors package is not
    // shadowed inside this function.
    errs := make([]error, len(recordIDs))

    var wg sync.WaitGroup
    for i, id := range recordIDs {
        // Limit concurrent DB connections. The slot is taken before the
        // goroutine starts so the number of live goroutines is bounded too.
        d.dbSemaphore <- struct{}{}

        wg.Add(1)
        go func(idx int, recordID string) {
            defer wg.Done()
            defer func() { <-d.dbSemaphore }()

            // Each goroutine writes only its own index in both slices.
            record, err := d.loadFromDB(recordID)
            results[idx] = record
            errs[idx] = err
        }(i, id)
    }

    wg.Wait()
    return results, combineErrors(errs)
}

Advanced Patterns for Production#

Dynamic Concurrency Adjustment#

// AdaptiveProcessor pairs a semaphore channel with a source of system
// metrics so that the concurrency limit can be tuned at run time.
type AdaptiveProcessor struct {
    // semaphore is the slot channel managed by adjustConcurrency.
    semaphore chan struct{}
    // metrics supplies the system load that adjustConcurrency reads via
    // GetSystemLoad.
    metrics   *SystemMetrics
}

// adjustConcurrency re-evaluates the system load every 10 seconds and tunes
// the effective concurrency limit: it shrinks the limit by about 20% when
// load is above 0.8 and grows it by about 20% when load is below 0.5.
//
// The semaphore channel is never closed or replaced. Workers may be sending
// on it concurrently, a send on a closed channel panics, and swapping the
// field without synchronization would be a data race. Instead, this goroutine
// lowers the limit by parking its own tokens in the channel and raises it by
// taking them back out. The channel's capacity is therefore the hard upper
// bound, and the effective limit never drops below 1.
//
// The method loops for as long as the ticker runs; call it in its own
// goroutine. It returns immediately if the channel has no capacity.
func (p *AdaptiveProcessor) adjustConcurrency() {
    ceiling := cap(p.semaphore)
    if ceiling < 1 {
        return // nil or unbuffered channel: there are no slots to tune
    }

    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    // parked counts the tokens this goroutine currently holds in the
    // channel; the effective limit seen by workers is ceiling - parked.
    parked := 0

    for range ticker.C {
        load := p.metrics.GetSystemLoad()
        current := ceiling - parked

        var target int
        switch {
        case load > 0.8:
            target = int(float64(current) * 0.8)
        case load < 0.5:
            target = int(float64(current) * 1.2)
            if target == current {
                // Truncation would otherwise pin small limits forever.
                target++
            }
        default:
            continue
        }
        if target < 1 {
            target = 1
        }
        if target > ceiling {
            target = ceiling
        }

        // Shrink: park tokens without blocking. Slots that are busy right
        // now are left alone and reconsidered on the next tick.
    shrink:
        for parked < ceiling-target {
            select {
            case p.semaphore <- struct{}{}:
                parked++
            default:
                break shrink
            }
        }

        // Grow: hand parked slots back. The channel holds at least parked
        // tokens, so these receives do not block.
        for parked > ceiling-target {
            <-p.semaphore
            parked--
        }
    }
}

Prioritized Processing#

// PriorityProcessor keeps a separate semaphore channel per priority class so
// that each class has its own concurrency budget.
type PriorityProcessor struct {
    // highPriority is the semaphore used by Process for requests whose
    // Priority equals HIGH.
    highPriority   chan struct{}
    // normalPriority is the semaphore used by Process for all other requests.
    normalPriority chan struct{}
}

// Process handles req while holding one slot from the pool that matches its
// priority: the high-priority pool when req.Priority is HIGH, the normal pool
// otherwise. It blocks until a slot in that pool is available and releases
// the slot when processRequest returns.
func (p *PriorityProcessor) Process(req Request) {
    // Default to the normal pool and upgrade only for HIGH requests.
    slots := p.normalPriority
    if req.Priority == HIGH {
        slots = p.highPriority
    }

    slots <- struct{}{}
    defer func() { <-slots }()

    processRequest(req)
}

Performance Characteristics#

From production deployments serving millions of daily requests:

  • Overhead: ~50ns per acquire/release on modern hardware
  • Memory: effectively none per slot - struct{} is zero-sized, so the buffer only costs the fixed channel header
  • Scalability: Tested up to 10,000 concurrent operations
  • Fair scheduling: the current Go runtime wakes blocked senders in roughly FIFO order, though the language spec does not guarantee it

Key Insights for Production Systems#

  1. Memory Management: Use semaphores to prevent OOM errors during parallel processing
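  2. API Concurrency Limiting: Cap in-flight calls to third-party APIs so you stay within their limits without complex retry logic (a semaphore bounds concurrency, not requests per second)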
  3. Resource Allocation: Balance system resource utilization across workload types
  4. Cost Control: Limit concurrent cloud API calls to manage spend

Monitoring and Observability#

// monitoredSemaphore creates a semaphore channel with the given capacity and
// starts a background goroutine that, every 10 seconds, publishes two gauges
// tagged with "name:<name>": the number of slots in use and the number still
// available.
//
// The reporting goroutine has no stop signal; it keeps running after the
// caller is done with the returned channel.
func monitoredSemaphore(name string, capacity int) chan struct{} {
    slots := make(chan struct{}, capacity)
    nameTag := "name:" + name

    // report samples the channel once and emits both gauges.
    report := func() {
        held := len(slots)
        metrics.Gauge("semaphore.in_use", float64(held), []string{nameTag})
        metrics.Gauge("semaphore.available", float64(capacity-held), []string{nameTag})
    }

    go func() {
        tick := time.NewTicker(10 * time.Second)
        defer tick.Stop()

        for range tick.C {
            report()
        }
    }()

    return slots
}

This three-line pattern has become a cornerstone of production infrastructure. Its simplicity belies its power - from managing memory during parallel processing to orchestrating distributed jobs, the channel-as-semaphore pattern provides reliable concurrency control with minimal overhead.