728x90
반응형
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
var kafkaClient sarama.AsyncProducer
func main() {
brokers := []string{"broker:9092"}
kafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
}
func SendMessage(topic string, message string) {
kafkaClient.Input() <- &sarama.ProducerMessage{
Topic: topic,
Partition: -1,
Value: sarama.StringEncoder(message),
}
}
코드는 위의 코드를 이용하면 됐었습니다.
main.go의 main()부분에
brokers := []string{"brocker:9092", "brocker:9092", "brocker:9092"}
logweb.KafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
추가하여 넣었는데 brocker:9092부분에 현재 할당 된 brocker의 ip:port를 써 주면 됩니다.
만약 포트번호는 같은데 ip가 다르다면
brokers := []string{"192.111.222:9092", "192.111.223:9092", "192.111.224:9092"}
logweb.KafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
이런식으로 넣어주면 됩니다.
logweb.KafkaClient인 이유는 KafkaClient변수가 외부 폴더의 파일에 존재하기 때문입니다.
그 후 카프카 메세지를 보내는 곳(기존에는 log파일에 데이터 보내는 곳)이 app.go에 있어서
func SendMessage(topic string, message string) {
KafkaClient.Input() <- &sarama.ProducerMessage{
Topic: topic,
Partition: 8,
Value: sarama.StringEncoder(message),
}
log.Println("[DEBUG]", "Mesaage send successfully! Topic : %s Message : %s ", topic, message)
}
거기에 이렇게 추가 해준 뒤에 Topic 부분은 Json값에 따라 유동적으로 바뀌기 때문에
func makeTopicName(i *Input) (string, error) {
var name string
name = i.Service + "." + i.Task + "." + i.LogSource
return name, nil
}
위와 같이 Input이라는 struct값 형식에 따라 json값을 입력받아 정해진 Topic의 형식대로 변환하여 name을 가져와서
func exam(inputjson *Input, in string) {
topicname, _ := makeTopicName(inputjson)
if topicname == "" {
err := errors.New("Topic 이름이 빈값입니다")
SenddingToPanic(err)
}
// JSON을 다시 보내주려면 JSON data로 바꾸어 주어야 한다.
// (지금 상태는 Go struct형태임.) 어떤 인터페이스를 받아서 JSON형태로 Encodding을 해주는 Marshal(inputjson)를 사용해준다.
jsondata, err := json.Marshal(inputjson)
if err != nil {
SenddingToPanic(err)
}
// Topic 메세지 보내기
SendMessage(topicname, string(jsondata))
}
그 값을 SendMessage에 보내주었습니다.
추가로..
main()부분에 저렇게 나누지 않고
var kafkaClient sarama.AsyncProducer
func SendMessage(topic string, message string) {
brokers := []string{"broker:9092"}
kafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
kafkaClient.Input() <- &sarama.ProducerMessage{
Topic: topic,
Partition: -1,
Value: sarama.StringEncoder(message),
}
}
이런식으로 써도 무방합니다.
또한 예제 코드와 다르게 나는 Partition을 8로 설정했는데 현재 설정된 Partition수가 8이어서 그렇습니다.
자신이 설정한 Partition수를 쓰면 됩니다.
728x90
반응형
'프로그래밍(Web) > 업무관련' 카테고리의 다른 글
[바미] Golang 같은 변수의 값 체크 시 “suspect or ” warnning 해결 방법 (0) | 2021.05.03 |
---|---|
[바미] 고통스런 yarn build (0) | 2021.03.19 |
[바미] kafka - ErrLeaderNotAvailable에러 해결하기. (0) | 2021.02.16 |
[바미] Golang sarama 연동하기(http_server) (0) | 2021.02.09 |
[바미] Golang sarama예제로 Kafka 연동하기(interceptors). (0) | 2021.02.09 |