NSQ(A realtime distributed messaging platform)
nsq
重要组件
nsqd
开放tcp:4150,http:4151端口与生产消费者通信,连接nsqlookup的4160端口注册,上报心跳
nsqlookupd
守护进程负责nsqd服务发现 开放http:4161,供nsqadmin和消费者查询当前nsqd列表等信息,开放tcp:4160端口给nsqd注册
nsqadmin
前端可视化页面,开放http:4171端口,连接nsqlookupd的4161端口查询数据
部署测试
docker-compose
官方配置文件没有指定端口映射,以及nsqd的hostname会随机生成字符串导致消费者无法识别
version: '3' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "4160:4160" - "4161:4161" nsqd: image: nsqio/nsq command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 depends_on: - nsqlookupd ports: - "4150:4150" - "4151:4151" hostname: your.cloud nsqadmin: image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 depends_on: - nsqlookupd ports: - "4171:4171"
producer
package main
import (
"github.com/nsqio/go-nsq"
"log"
)
func main() {
// Instantiate a producer.
config := nsq.NewConfig()
producer, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Fatal(err)
}
messageBody := []byte("hello")
topicName := "topic"
// Synchronously publish a single message to the specified topic.
// Messages can also be sent asynchronously and/or in batches.
err = producer.Publish(topicName, messageBody)
if err != nil {
log.Fatal("Publish error:", err)
}
// Gracefully stop the producer when appropriate (e.g. before shutting down the service)
producer.Stop()
}
consumer
package main
import (
"github.com/nsqio/go-nsq"
"log"
"os"
"os/signal"
"syscall"
)
type myMessageHandler struct{}
// HandleMessage implements the Handler interface.
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
// In this case, a message with an empty body is simply ignored/discarded.
return nil
}
// do whatever actual message processing is desired
msg := string(m.Body)
log.Println("consume message->", msg)
// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
return nil
}
func main() {
// Instantiate a consumer that will subscribe to the provided channel.
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("topic", "channel", config)
if err != nil {
log.Fatal(err)
}
// Set the Handler for messages received by this Consumer. Can be called multiple times.
// See also AddConcurrentHandlers.
consumer.AddHandler(&myMessageHandler{})
// Use nsqlookupd to discover nsqd instances.
// See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
if err != nil {
log.Fatal(err)
}
// wait for signal to exit
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
// Gracefully stop the consumer.
consumer.Stop()
}
起多个消费者,相同的topic和channel,消息会随机被一个消费者消费,达到负载均衡的效果