Transactional Message Queue: A Lightweight Solution Based on SQLite
What is a Message Queue?
A message queue is an asynchronous communication mechanism used to pass messages in distributed systems. It works like a "post office" where senders put messages into the queue and receivers take them out for processing. This pattern offers several benefits:
- Decoupling: Producers and consumers don't need to communicate directly
- Peak Shaving: Can buffer sudden traffic spikes
- Asynchrony: Improves system response time
- Reliability: Persisting messages protects against data loss if a consumer or the process fails
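As a rough illustration of decoupling and buffering, here is a minimal in-process sketch built on a buffered Go channel. The `Queue` type and its methods are hypothetical names made up for this example, and nothing is persisted, so it does not cover the reliability point.

```go
package main

import (
	"fmt"
	"sync"
)

// Queue is a hypothetical in-process message queue backed by a buffered channel.
type Queue struct {
	ch chan string
}

// NewQueue creates a queue that can buffer up to capacity messages.
func NewQueue(capacity int) *Queue {
	return &Queue{ch: make(chan string, capacity)}
}

// Publish enqueues a message; it blocks only when the buffer is full.
func (q *Queue) Publish(msg string) { q.ch <- msg }

// Close signals that no more messages will be published.
func (q *Queue) Close() { close(q.ch) }

// Consume starts the given number of worker goroutines and returns once the
// queue has been closed and every buffered message has been handled.
func (q *Queue) Consume(workers int, handle func(string)) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range q.ch {
				handle(msg)
			}
		}()
	}
	wg.Wait()
}

func main() {
	q := NewQueue(100)

	// The producer does not wait for a consumer; messages sit in the buffer.
	for i := 0; i < 3; i++ {
		q.Publish(fmt.Sprintf("event-%d", i))
	}
	q.Close()

	var mu sync.Mutex
	processed := 0
	q.Consume(2, func(msg string) {
		mu.Lock()
		processed++
		mu.Unlock()
	})
	fmt.Println("processed:", processed) // prints "processed: 3"
}
```

The producer never calls the consumer directly, and the buffer absorbs a burst of up to 100 messages before `Publish` starts to block.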
What is a Transactional Message Queue?
The core of a transactional message queue is ensuring that message publishing is atomic with the business operation. With a traditional message queue, two failure modes are possible: if the message is sent and the transaction then rolls back, consumers act on a change that never happened; if the program crashes after the transaction commits but before the message is sent, the message is lost. Transactional message queues solve these problems through:
- Atomicity: Message sending and business operations are in the same transaction - they either both succeed or both fail
- Rollback Mechanism: If the transaction fails, messages are automatically rolled back and won't be sent
- Durability: Once committed, messages are permanently saved, even if the program crashes
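The atomicity and rollback properties can be sketched with an in-memory stand-in for a database. This is only a simulation under stated assumptions: `Store`, `Tx`, and their methods are hypothetical names, not NSQite's API, and durability is not modeled because nothing is written to disk.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Store is a hypothetical in-memory stand-in for a database that holds both
// business rows and queued messages.
type Store struct {
	mu       sync.Mutex
	rows     []string
	messages []string
}

// Tx stages writes until the transaction function returns.
type Tx struct {
	rows     []string
	messages []string
}

// Insert stages a business row.
func (tx *Tx) Insert(row string) { tx.rows = append(tx.rows, row) }

// Publish stages a message in the same transaction as the rows.
func (tx *Tx) Publish(msg string) { tx.messages = append(tx.messages, msg) }

// Transaction runs fn and applies the staged rows and messages together only
// if fn returns nil. On error, everything staged is discarded.
func (s *Store) Transaction(fn func(tx *Tx) error) error {
	tx := &Tx{}
	if err := fn(tx); err != nil {
		return err // rollback: nothing staged reaches the store
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rows = append(s.rows, tx.rows...)
	s.messages = append(s.messages, tx.messages...)
	return nil
}

var errDeclined = errors.New("payment declined")

func main() {
	s := &Store{}

	// Committed transaction: the row and the message become visible together.
	_ = s.Transaction(func(tx *Tx) error {
		tx.Insert("order-1")
		tx.Publish("order-1 created")
		return nil
	})

	// Failed transaction: neither the row nor the message is kept.
	err := s.Transaction(func(tx *Tx) error {
		tx.Insert("order-2")
		tx.Publish("order-2 created")
		return errDeclined
	})

	fmt.Println(len(s.rows), len(s.messages), err) // prints "1 1 payment declined"
}
```

In a SQLite-backed queue the same effect comes from writing the message row inside the database transaction that carries the business change, so a single commit or rollback covers both.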
Practical Example: Implementing User Behavior Analysis with NSQite
Let's consider a content platform that needs to record user behavior data (views, likes, favorites, etc.) for later analysis. This behavior data has the following characteristics:
- Large volume but not time-critical
- Can be retried if processing fails
- Doesn't affect core business processes
Using a message queue, we can:
- Return success immediately when user behavior occurs
- Put behavior data into the message queue
- Process data asynchronously (data cleaning, statistical analysis, etc.)
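The asynchronous processing step might look like the following sketch, which cleans raw behavior events and counts them per action type. `UserAction`, `Clean`, and `CountByAction` are hypothetical helpers introduced for illustration; the validation rules are assumptions, not requirements from the platform described above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// UserAction holds the fields of one behavior event.
type UserAction struct {
	UserID    string `json:"user_id"`
	Action    string `json:"action"`
	ContentID string `json:"content_id"`
}

// Clean normalizes the action name and reports whether the event is usable.
func Clean(a UserAction) (UserAction, bool) {
	a.Action = strings.ToLower(strings.TrimSpace(a.Action))
	if a.UserID == "" || a.ContentID == "" || a.Action == "" {
		return a, false
	}
	return a, true
}

// CountByAction decodes raw JSON payloads, drops malformed or incomplete
// events, and returns the number of valid events per action type.
func CountByAction(payloads [][]byte) map[string]int {
	counts := make(map[string]int)
	for _, p := range payloads {
		var a UserAction
		if err := json.Unmarshal(p, &a); err != nil {
			continue // skip malformed payloads
		}
		if cleaned, ok := Clean(a); ok {
			counts[cleaned.Action]++
		}
	}
	return counts
}

func main() {
	payloads := [][]byte{
		[]byte(`{"user_id":"123","action":" View ","content_id":"456"}`),
		[]byte(`{"user_id":"123","action":"like","content_id":"456"}`),
		[]byte(`{"user_id":"","action":"view","content_id":"456"}`), // missing user
		[]byte(`not json`), // malformed
	}
	counts := CountByAction(payloads)
	fmt.Println(counts["view"], counts["like"]) // prints "1 1"
}
```

Because this work happens in the consumer, a slow or failing analysis step never delays the response to the user.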
Here's a code example using NSQite:
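package main

// Import paths are assumed from the project URL and the usual GORM modules;
// adjust them to match your go.mod.
import (
	"encoding/json"
	"log"
	"time"

	"github.com/ixugo/nsqite"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

// UserActionHandler processes user behavior messages.
type UserActionHandler struct{}

// HandleMessage decodes one message and hands it to the analysis step.
// Returning an error marks the message as failed so it can be retried.
func (h *UserActionHandler) HandleMessage(message *nsqite.Message) error {
	var action struct {
		UserID    string `json:"user_id"`
		Action    string `json:"action"`
		ContentID string `json:"content_id"`
		Timestamp string `json:"timestamp"`
	}
	if err := json.Unmarshal(message.Body, &action); err != nil {
		return err
	}
	// Data cleaning and statistical analysis.
	// analyzeUserAction is an application-defined placeholder.
	return analyzeUserAction(action)
}

func main() {
	db, err := gorm.Open(sqlite.Open("user_actions.db"), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}
	// Set GORM database connection
	nsqite.SetGorm(db)

	const topic = "user_actions"
	// Create producer
	p := nsqite.NewProducer()
	// Create consumer with max retry attempts of 5
	c := nsqite.NewConsumer(topic, "consumer1", nsqite.WithConsumerMaxAttempts(5))
	// Add 5 concurrent handlers
	c.AddConcurrentHandlers(&UserActionHandler{}, 5)

	// Publish message within transaction
	err = db.Transaction(func(tx *gorm.DB) error {
		// Business operation (doSomeBusiness is an application-defined placeholder)
		if err := doSomeBusiness(tx); err != nil {
			return err
		}
		// Publish message
		action := map[string]interface{}{
			"user_id":    "123",
			"action":     "view",
			"content_id": "456",
			"timestamp":  time.Now().Format(time.RFC3339),
		}
		body, err := json.Marshal(action)
		if err != nil {
			return err
		}
		// The message is written in the same transaction as the business data.
		return p.PublishTx(tx, topic, body)
	})
	if err != nil {
		log.Fatal(err)
	}

	// Crude wait so the consumer can process the message before the demo exits;
	// a long-running service would block until shutdown instead.
	time.Sleep(5 * time.Second)
}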
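The consumer above is created with a maximum of 5 attempts. How NSQite implements retries internally is not shown in this article, so the following is only a generic sketch of the idea: call the handler again until it succeeds or the attempt limit is reached. `Handler` and `Deliver` are hypothetical names, not part of NSQite.

```go
package main

import (
	"errors"
	"fmt"
)

// Handler processes one message body and returns an error to request a retry.
type Handler func(body []byte) error

// Deliver calls handle until it succeeds or maxAttempts is reached.
// It returns the number of attempts made and the last error (nil on success).
func Deliver(body []byte, handle Handler, maxAttempts int) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // always try at least once
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = handle(body); err == nil {
			return attempt, nil
		}
	}
	return maxAttempts, err
}

var errTemporary = errors.New("temporary failure")

func main() {
	calls := 0
	// A handler that fails twice before succeeding.
	flaky := func(body []byte) error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	}

	attempts, err := Deliver([]byte(`{"action":"view"}`), flaky, 5)
	fmt.Println(attempts, err) // prints "3 <nil>"
}
```

A production queue would typically also wait between attempts and record messages that exhaust their attempts; both are left out here to keep the sketch short.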
Why Choose NSQite?
NSQite is a message queue implementation based on SQLite with the following features:
- Lightweight: Based on SQLite, no additional dependencies
- High Performance: Can process millions of messages per second in a single-machine environment
- Easy Integration: Supports GORM, seamless integration with existing projects
- Reliability: Supports transactional messages, ensuring data consistency
Use Cases:
- Early-Stage Projects: No need to deploy a complex message queue system
- Existing SQLite Projects: No additional dependencies required
- Single-Machine Applications: Maintain simplicity, avoid distributed complexity
- Resource-Constrained Environments: SQLite's small footprint keeps resource usage low
Project Repository
NSQite is an open-source project available on GitHub:
https://github.com/ixugo/nsqite
If you find this project helpful, you can:
- Star the project
- Submit issues for feedback
- Contribute code through PRs