프로그래밍(Web)/업무관련

[바미] Golang Kafka(sarama) + negroni 연동완료.

Bami 2021. 2. 23. 18:33
728x90
반응형
package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

var kafkaClient sarama.AsyncProducer

func main() {
    brokers := []string{"broker:9092"}
    kafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
}

func SendMessage(topic string, message string) {
    kafkaClient.Input() <- &sarama.ProducerMessage{
        Topic: topic,
        Partition: -1,
        Value: sarama.StringEncoder(message),
    }
}

코드는 위의 코드를 이용하면 됐었습니다.

 

main.go의 main()부분에 

	brokers := []string{"brocker:9092", "brocker:9092", "brocker:9092"}
	logweb.KafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)

추가하여 넣었는데 brocker:9092부분에 현재 할당 된 brockerip:port를 써 주면 됩니다. 

 

만약 포트번호는 같은데 ip가 다르다면

 

	brokers := []string{"192.111.222:9092", "192.111.223:9092", "192.111.224:9092"}
	logweb.KafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)

 

이런식으로 넣어주면 됩니다.

 

logweb.KafkaClient인 이유는 KafkaClient변수가 외부 폴더의 파일에 존재하기 때문입니다.

 

그 후 카프카 메세지를 보내는 곳(기존에는 log파일에 데이터 보내는 곳)이 app.go에 있어서

 

func SendMessage(topic string, message string) {
	KafkaClient.Input() <- &sarama.ProducerMessage{
		Topic:     topic,
		Partition: 8,
		Value:     sarama.StringEncoder(message),
	}
	log.Println("[DEBUG]", "Mesaage send successfully! Topic : %s Message : %s ", topic, message)
}

거기에 이렇게 추가 해준 뒤에 Topic 부분은 Json값에 따라 유동적으로 바뀌기 때문에

 

func makeTopicName(i *Input) (string, error) {
	var name string
	name = i.Service + "." + i.Task + "." + i.LogSource
	return name, nil
}

 

위와 같이 Input이라는 struct값 형식에 따라 json값을 입력받아 정해진 Topic의 형식대로 변환하여 name을 가져와서

func exam(inputjson *Input, in string) {
  topicname, _ := makeTopicName(inputjson)
      if topicname == "" {
          err := errors.New("Topic 이름이 빈값입니다")
          SenddingToPanic(err)
      }
      
      // JSON을 다시 보내주려면 JSON data로 바꾸어 주어야 한다.
      // (지금 상태는 Go struct형태임.) 어떤 인터페이스를 받아서 JSON형태로 Encodding을 해주는 Marshal(inputjson)를 사용해준다.
      jsondata, err := json.Marshal(inputjson)
      if err != nil {
          SenddingToPanic(err)
      }

      // Topic 메세지 보내기
      SendMessage(topicname, string(jsondata))
}

그 값을 SendMessage에 보내주었습니다.

 


추가로..

main()부분에 저렇게 나누지 않고

 

var kafkaClient sarama.AsyncProducer

func SendMessage(topic string, message string) {
    brokers := []string{"broker:9092"}
    kafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
    
    kafkaClient.Input() <- &sarama.ProducerMessage{
        Topic: topic,
        Partition: -1,
        Value: sarama.StringEncoder(message),
    }
}

 

이런식으로 써도 무방합니다.

 

또한 예제 코드와 다르게 나는 Partition을 8로 설정했는데 현재 설정된 Partition수가 8이어서 그렇습니다.

자신이 설정한 Partition수를 쓰면 됩니다.

728x90
반응형