GoTransactional Message Queue: A Lightweight Solution Based on SQLite

Transactional Message Queue: A Lightweight Solution Based on SQLite What is a Message Queue? A message queue is an asynchronous communication mechanism used to pass messages in distributed systems. It works like a "post office" where senders put messages into the queue and receivers take them out for processing. This pattern offers several benefits: Decoupling: Producers and consumers don't need to communicate directly Peak Shaving: Can buffer sudden traffic spikes Asynchrony: Improves system response time Reliability: Message persistence ensures no data loss What is a Transactional Message Queue? The core of a transactional message queue is to ensure atomicity of message publishing. In traditional message queues, if a transaction rolls back, or if the program crashes after transaction completion but before message sending, messages may be lost. Transactional message queues solve these problems through: Atomicity: Message sending and business operations are in the same transaction - they either both succeed or both fail Rollback Mechanism: If the transaction fails, messages are automatically rolled back and won't be sent Durability: Once committed, messages are permanently saved, even if the program crashes Practical Example: Implementing User Behavior Analysis with NSQite Let's consider a content platform that needs to record user behavior data (views, likes, favorites, etc.) for later analysis. This behavior data has the following characteristics: Large volume but not time-critical Can be retried if processing fails Doesn't affect core business processes Using a message queue, we can: Return success immediately when user behavior occurs Put behavior data into the message queue Process data asynchronously (data cleaning, statistical analysis, etc.) Here's a code example using nsqite: // Define message handler type UserActionHandler struct{} func (h *UserActionHandler) HandleMessage(message *nsqite.Message) error { var action struct { UserID string `json:"user_id"` Action string `json:"action"` ContentID string `json:"content_id"` Timestamp string `json:"timestamp"` } if err := json.Unmarshal(message.Body, &action); err != nil { return err } // Data cleaning and statistical analysis return analyzeUserAction(action) } func main() { db, err := gorm.Open(sqlite.Open("user_actions.db"), &gorm.Config{}) if err != nil { log.Fatal(err) } // Set GORM database connection nsqite.SetGorm(db) const topic = "user_actions" // Create producer p := nsqite.NewProducer() // Create consumer with max retry attempts of 5 c := nsqite.NewConsumer(topic, "consumer1", nsqite.WithConsumerMaxAttempts(5)) // Add 5 concurrent handlers c.AddConcurrentHandlers(&UserActionHandler{}, 5) // Publish message within transaction db.Transaction(func(tx *gorm.DB) error { // Business operation if err := doSomeBusiness(tx); err != nil { return err } // Publish message action := map[string]interface{}{ "user_id": "123", "action": "view", "content_id": "456", "timestamp": time.Now().Format(time.RFC3339), } body, _ := json.Marshal(action) return p.PublishTx(tx, topic, body) }) time.Sleep(5*time.Second) } Why Choose NSQite? NSQite is a message queue implementation based on SQLite with the following features: Lightweight: Based on SQLite, no additional dependencies High Performance: Can process millions of messages per second in single-machine environment Easy Integration: Supports GORM, seamless integration with existing projects Reliability: Supports transactional messages, ensuring data consistency Use Cases: Project Initialization: No need for complex message queue systems Existing SQLite Projects: No additional dependencies required Single-Machine Applications: Maintain simplicity, avoid distributed complexity Resource-Constrained Environments: SQLite's lightweight nature Project Address NSQite is an open-source project available on GitHub: https://github.com/ixugo/nsqite If you find this project helpful, you can: Star the project Submit issues for feedback Contribute code through PRs

Apr 16, 2025 - 21:41
 0
GoTransactional Message Queue: A Lightweight Solution Based on SQLite

Transactional Message Queue: A Lightweight Solution Based on SQLite

What is a Message Queue?

A message queue is an asynchronous communication mechanism used to pass messages in distributed systems. It works like a "post office" where senders put messages into the queue and receivers take them out for processing. This pattern offers several benefits:

  1. Decoupling: Producers and consumers don't need to communicate directly
  2. Peak Shaving: Can buffer sudden traffic spikes
  3. Asynchrony: Improves system response time
  4. Reliability: Message persistence ensures no data loss

What is a Transactional Message Queue?

The core of a transactional message queue is to ensure atomicity of message publishing. In traditional message queues, if a transaction rolls back, or if the program crashes after transaction completion but before message sending, messages may be lost. Transactional message queues solve these problems through:

  1. Atomicity: Message sending and business operations are in the same transaction - they either both succeed or both fail
  2. Rollback Mechanism: If the transaction fails, messages are automatically rolled back and won't be sent
  3. Durability: Once committed, messages are permanently saved, even if the program crashes

Practical Example: Implementing User Behavior Analysis with NSQite

Let's consider a content platform that needs to record user behavior data (views, likes, favorites, etc.) for later analysis. This behavior data has the following characteristics:

  1. Large volume but not time-critical
  2. Can be retried if processing fails
  3. Doesn't affect core business processes

Using a message queue, we can:

  1. Return success immediately when user behavior occurs
  2. Put behavior data into the message queue
  3. Process data asynchronously (data cleaning, statistical analysis, etc.)

Here's a code example using nsqite:

// Define message handler
type UserActionHandler struct{}

func (h *UserActionHandler) HandleMessage(message *nsqite.Message) error {
    var action struct {
        UserID    string `json:"user_id"`
        Action    string `json:"action"`
        ContentID string `json:"content_id"`
        Timestamp string `json:"timestamp"`
    }
    if err := json.Unmarshal(message.Body, &action); err != nil {
        return err
    }
    // Data cleaning and statistical analysis
    return analyzeUserAction(action)
}

func main() {
    db, err := gorm.Open(sqlite.Open("user_actions.db"), &gorm.Config{})
    if err != nil {
        log.Fatal(err)
    }
    // Set GORM database connection
    nsqite.SetGorm(db)

    const topic = "user_actions"
    // Create producer
    p := nsqite.NewProducer()
    // Create consumer with max retry attempts of 5
    c := nsqite.NewConsumer(topic, "consumer1", nsqite.WithConsumerMaxAttempts(5))
    // Add 5 concurrent handlers
    c.AddConcurrentHandlers(&UserActionHandler{}, 5)

    // Publish message within transaction
    db.Transaction(func(tx *gorm.DB) error {
        // Business operation
        if err := doSomeBusiness(tx); err != nil {
            return err
        }
        // Publish message
        action := map[string]interface{}{
            "user_id":    "123",
            "action":     "view",
            "content_id": "456",
            "timestamp":  time.Now().Format(time.RFC3339),
        }
        body, _ := json.Marshal(action)
        return p.PublishTx(tx, topic, body)
    })


    time.Sleep(5*time.Second)
}

Why Choose NSQite?

NSQite is a message queue implementation based on SQLite with the following features:

  1. Lightweight: Based on SQLite, no additional dependencies
  2. High Performance: Can process millions of messages per second in single-machine environment
  3. Easy Integration: Supports GORM, seamless integration with existing projects
  4. Reliability: Supports transactional messages, ensuring data consistency

Use Cases:

  1. Project Initialization: No need for complex message queue systems
  2. Existing SQLite Projects: No additional dependencies required
  3. Single-Machine Applications: Maintain simplicity, avoid distributed complexity
  4. Resource-Constrained Environments: SQLite's lightweight nature

Project Address

NSQite is an open-source project available on GitHub:
https://github.com/ixugo/nsqite

If you find this project helpful, you can:

  1. Star the project
  2. Submit issues for feedback
  3. Contribute code through PRs