go-nsq简单教程

  1. 1. 概述
  2. 2. 示例
    1. 2.1 单nsqd
    2. 2.2 单nsqlookupd

go-nsq简单教程

1. 概述

go-nsq是NSQ的Go语言客户端库,提供生产者(Producer)和消费者(Consumer)两类API。

2. 示例

2.1 单nsqd

生产者Producer

package main

import (
    "os"
    "os/signal"
    "time"

    "github.com/nsqio/go-nsq"
    "github.com/sirupsen/logrus"
)

// main publishes "hello world" to the "test" topic on a single nsqd once per
// second. It exits when the process receives an interrupt signal or when a
// publish fails.
func main() {
    // signal.Notify never blocks when sending, so the channel needs a buffer
    // or an interrupt arriving while we are busy publishing would be dropped.
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)

    producer, err := nsq.NewProducer("192.168.1.168:4150", nsq.NewConfig())
    if err != nil {
        logrus.WithError(err).Errorln("new producer error")
        return
    }
    defer producer.Stop()

    if err := producer.Ping(); err != nil {
        logrus.WithError(err).Errorln("producer ping err")
        return
    }

    // Create the ticker once; calling time.Tick inside the loop would
    // allocate a fresh ticker on every iteration.
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-c:
            return
        case <-ticker.C:
            if err := producer.Publish("test", []byte("hello world")); err != nil {
                logrus.WithError(err).Errorln("producer publish error")
                return
            }
            logrus.Infoln("publish ok")
        }
    }
}

消费者Consumer

package main

import (
    "github.com/nsqio/go-nsq"
    "github.com/sirupsen/logrus"
)

// MyHandler is the message handler registered on the consumer in main.
type MyHandler struct {
    // Title is a free-form label for the handler; nothing in this example
    // reads it.
    Title string
}

// HandleMessage is called by the consumer for each message it receives (the
// handler is registered through AddHandler in main). It logs the message
// payload and always returns nil.
func (h *MyHandler) HandleMessage(message *nsq.Message) error {
    // Log the body as text; logging the *nsq.Message itself would print the
    // struct's internals with the payload rendered as a list of byte values.
    logrus.Infoln(string(message.Body))
    return nil
}

// main subscribes to the "test" topic on channel "consumer1" by connecting
// directly to a single nsqd, then blocks forever while MyHandler processes
// incoming messages.
func main() {
    consumer, err := nsq.NewConsumer("test", "consumer1", nsq.NewConfig())
    if err != nil {
        logrus.WithError(err).Errorln("new consumer error")
        return
    }
    // NOTE: because of the select {} at the end, this deferred Stop only runs
    // on the early-return error path below; the process is expected to be
    // terminated externally.
    defer consumer.Stop()

    consumer.AddHandler(&MyHandler{Title: "test consumer"})

    if err := consumer.ConnectToNSQD("192.168.1.168:4150"); err != nil {
        // This example talks to nsqd directly, so the message must not
        // mention nsqlookupd.
        logrus.WithError(err).Errorln("connect to nsqd error")
        return
    }

    // Block forever; messages are handled via the registered handler.
    select {}
}

2.2 单nsqlookupd

生产者Producer(与2.1相同,没变化:生产者仍然直接连接nsqd发布消息,nsqlookupd只供消费者用来发现nsqd)

package main

import (
    "os"
    "os/signal"
    "time"

    "github.com/nsqio/go-nsq"
    "github.com/sirupsen/logrus"
)

// main publishes "hello world" to the "test" topic on nsqd once per second.
// It exits when the process receives an interrupt signal or when a publish
// fails.
func main() {
    // signal.Notify never blocks when sending, so the channel needs a buffer
    // or an interrupt arriving while we are busy publishing would be dropped.
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)

    producer, err := nsq.NewProducer("192.168.1.168:4150", nsq.NewConfig())
    if err != nil {
        logrus.WithError(err).Errorln("new producer error")
        return
    }
    defer producer.Stop()

    if err := producer.Ping(); err != nil {
        logrus.WithError(err).Errorln("producer ping err")
        return
    }

    // Create the ticker once; calling time.Tick inside the loop would
    // allocate a fresh ticker on every iteration.
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-c:
            return
        case <-ticker.C:
            if err := producer.Publish("test", []byte("hello world")); err != nil {
                logrus.WithError(err).Errorln("producer publish error")
                return
            }
            logrus.Infoln("publish ok")
        }
    }
}

消费者Consumer

package main

import (
    "github.com/nsqio/go-nsq"
    "github.com/sirupsen/logrus"
)

// MyHandler is the message handler registered on the consumer in main.
type MyHandler struct {
    // Title is a free-form label for the handler; nothing in this example
    // reads it.
    Title string
}

// HandleMessage is called by the consumer for each message it receives (the
// handler is registered through AddHandler in main). It logs the message
// payload and always returns nil.
func (h *MyHandler) HandleMessage(message *nsq.Message) error {
    // Log the body as text; logging the *nsq.Message itself would print the
    // struct's internals with the payload rendered as a list of byte values.
    logrus.Infoln(string(message.Body))
    return nil
}

// main subscribes to the "test" topic on channel "consumer1", connecting
// through nsqlookupd instead of directly to an nsqd, and then blocks forever
// while MyHandler processes incoming messages.
func main() {
    cfg := nsq.NewConfig()
    sub, err := nsq.NewConsumer("test", "consumer1", cfg)
    if err != nil {
        logrus.WithError(err).Errorln("new consumer error")
        return
    }
    defer sub.Stop()

    handler := &MyHandler{Title: "test consumer"}
    sub.AddHandler(handler)

    // The only difference from section 2.1: connect via nsqlookupd rather
    // than to an nsqd address.
    if err = sub.ConnectToNSQLookupd("192.168.1.168:4161"); err != nil {
        logrus.WithError(err).Errorln("connect to nsq lookup error")
        return
    }

    // Park the main goroutine indefinitely.
    select {}
}

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 wind.kaisa@gmail.com

💰

×

Help us with donation