Golang Producer and Consumer with Kafka Simple


2 min read

install package:

go get https://github.com/segmentio/kafka-go

Create Topic In Kafka:

  • create topic name logging

  • topic logging has 1 partition


  • create folder producer:

  • create file main.go in folder producer

copy and paste code:

package main

import (


var (
    BrokerAddress = "localhost:9092"
    TopicLogging  = "logging"

func main() {
func Produce(ctx context.Context) {
    i := 0

    l := log.New(os.Stdout, "kafka writer: ", 0)
    // intialize the writer with the broker addresses, and the topic
    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{BrokerAddress},
        Topic:   TopicLogging,
        // assign the logger to the writer
        Logger: l,
        Async:  false,

    for {
        writeMsg(ctx, w, i)

func writeMsg(ctx context.Context, w *kafka.Writer, i int) {
    msgs := kafka.Message{
        Key:   []byte(strconv.Itoa(i)),
        Value: []byte("timeNow:" + time.Now().Format(time.RFC3339Nano) + " " + "uuid:" + uuid.New().String()),
    err := w.WriteMessages(ctx, msgs)
    if err != nil {
        panic("could not write message " + err.Error())



  • create folder consumer

  • create file main.go in folder consumer

copy and paste code

package main

import (


const (
    topic         = "topic"
    TopicLogging  = "logging"
    BrokerAddress = "localhost:9092"
    Group         = "logging-consumer-group-1"

func main() {
    s := make(chan bool)
    go Consume(context.Background(), TopicLogging, 0)

func Consume(ctx context.Context, topic string, pa int) {
    // create a new logger that outputs to stdout
    // and has the `kafka reader` prefix
    l := log.New(os.Stdout, "kafka reader: ", 0)
    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{BrokerAddress},
        Topic:   topic,
        GroupID: Group,
        // assign the logger to the reader
        Logger: l,
        Dialer: dialer,
    for {

        m, err := r.FetchMessage(ctx)

        if err != nil {
        fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
        if err := r.CommitMessages(ctx, m); err != nil {
            log.Fatal("failed to commit messages:", err)


  • push message to Kafka:

      go run producer/main.go

  • consume message from Kafka:

      go run consumer/main.go

source code: https://github.com/ducnpdev/open-dev/tree/master/kafka

Did you find this article valuable?

Support OpenDev Blog by becoming a sponsor. Any amount is appreciated!