NSQ(A realtime distributed messaging platform)

·

2 min read

nsq

重要组件

nsqd

开放tcp:4150,http:4151端口与生产消费者通信,连接nsqlookup的4160端口注册,上报心跳

nsqlookupd

守护进程负责nsqd服务发现 开放http:4161,供nsqadmin和消费者查询当前nsqd列表等信息,开放tcp:4160端口给nsqd注册

nsqadmin

前端可视化页面,开放http:4171端口,连接nsqlookupd的4161端口查询数据

部署测试

docker-compose

官方配置文件没有指定端口映射,以及nsqd的hostname会随机生成字符串导致消费者无法识别

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160:4160"
      - "4161:4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd  --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150"
      - "4151:4151"
    hostname: your.cloud
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd
    ports:
      - "4171:4171"

producer

package main

import (
    "github.com/nsqio/go-nsq"
    "log"
)

func main() {
    // Instantiate a producer.
    config := nsq.NewConfig()
    producer, err := nsq.NewProducer("127.0.0.1:4150", config)
    if err != nil {
        log.Fatal(err)
    }

    messageBody := []byte("hello")
    topicName := "topic"

    // Synchronously publish a single message to the specified topic.
    // Messages can also be sent asynchronously and/or in batches.
    err = producer.Publish(topicName, messageBody)
    if err != nil {
        log.Fatal("Publish error:", err)
    }

    // Gracefully stop the producer when appropriate (e.g. before shutting down the service)
    producer.Stop()
}

consumer

package main

import (
    "github.com/nsqio/go-nsq"
    "log"
    "os"
    "os/signal"
    "syscall"
)

type myMessageHandler struct{}

// HandleMessage implements the Handler interface.
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
    if len(m.Body) == 0 {
        // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
        // In this case, a message with an empty body is simply ignored/discarded.
        return nil
    }

    // do whatever actual message processing is desired
    msg := string(m.Body)
    log.Println("consume message->", msg)

    // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
    return nil
}

func main() {
    // Instantiate a consumer that will subscribe to the provided channel.
    config := nsq.NewConfig()
    consumer, err := nsq.NewConsumer("topic", "channel", config)
    if err != nil {
        log.Fatal(err)
    }

    // Set the Handler for messages received by this Consumer. Can be called multiple times.
    // See also AddConcurrentHandlers.
    consumer.AddHandler(&myMessageHandler{})

    // Use nsqlookupd to discover nsqd instances.
    // See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
    err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
    if err != nil {
        log.Fatal(err)
    }

    // wait for signal to exit
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan

    // Gracefully stop the consumer.
    consumer.Stop()
}

起多个消费者,相同的topic和channel,消息会随机被一个消费者消费,达到负载均衡的效果