[바미] Golang 카프카 삽질기...
현재 Golang으로 구현한 log 수집 서버는 log파일에 log들을 수집하는데 파일에서 카프카로 도입하도록 수정되었습니다.
카프카를 쓰려면 주피카를 써야 하고, 이 둘에 대한 설명은 다음과 같습니다.
- 카프카
- 메시징 큐의 일종입니다.
- 그리고 다른 메시징 큐 activemq, rabbitmq 대비 우수한 TPS 를 나타냅니다.따라서 대용량의 실시간 로그 처리에 특화되어 있습니다.
- 메시지를 기본적으로 저장하는 기존 메시징 시스템과는 달리 파일 시스템에 저장합니다. 따라서 데이터 유실에 안전합니다.
- 구성은 Producer, Broker(kafka 서버) , Consumer 로 되어있습니다.
- 기존 메시징 시스템과의 차이점
기존시스템은 Broker가 Consumer 에게 들어오는 메시지를 Push 방식으로 진행.
카프카의 Consumer가 브로커로부터 메시지를 가져오기 때문에 자신이 처리량 만큼 처리 할 수 있어 최적의 성능을 낼 수 있습니다.
- 성능비교 그래프
- Zookeeper
- Zookeeper 란 카프카 서버(클러스터)를 관리해주는 것이다.
- 예를들어, 카프카 서버가 Broker 1,2,3 이렇게 3대가 존재합니다. 클러스터가 되어있는 상태이므로 모든 데이터는 공유가 됩니다.
- 만약 1번이 리더고 2,3이 팔로워이면 1번 서버가 죽으면 2번 서버가 리더가 되는 방식으로 진행됩니다.
- 1번 서버가 재시작 되면 1번은 이제 팔로워가 됩니다.
- 이런 역할을 해주는 것이 Zookeeper 입니다.
참조 : jjjwodls.github.io/etc/2020/01/07/01-Kafka-Setup.html
그리고 Golang에서 Kafka의 종류는 2가지 정도인데 confluent-kafka-go와 sarma입니다.
이 둘을 비교해보니 confluent-kafka-go는 메모리를 20MB 정도 기본으로 사용하고 sarma는 최대 3MB를 사용하여 메모리를 더 적게 사용하면서 성능은 비슷했습니다.
그래서 sarama에 대해 찾아보았습니다.
그래서 현재 해본 부분이
package main
import (
"github.com/Shopify/sarama"
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"time"
)
var (
addr = flag.String("addr", ":8080", "The address to bind to")
brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
keyFile = flag.String("key", "", "The optional key file for client authentication")
caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
)
func main() {
flag.Parse()
if *verbose {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
if *brokers == "" {
flag.PrintDefaults()
os.Exit(1)
}
brokerList := strings.Split(*brokers, ",")
log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))
server := &Server{
DataCollector: newDataCollector(brokerList),
AccessLogProducer: newAccessLogProducer(brokerList),
}
defer func() {
if err := server.Close(); err != nil {
log.Println("Failed to close server", err)
}
}()
log.Fatal(server.Run(*addr))
}
func createTlsConfiguration() (t *tls.Config) {
if *certFile != "" && *keyFile != "" && *caFile != "" {
cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
if err != nil {
log.Fatal(err)
}
caCert, err := ioutil.ReadFile(*caFile)
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
t = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: *verifySsl,
}
}
// will be nil by default if nothing is provided
return t
}
type Server struct {
DataCollector sarama.SyncProducer
AccessLogProducer sarama.AsyncProducer
}
func (s *Server) Close() error {
if err := s.DataCollector.Close(); err != nil {
log.Println("Failed to shut down data collector cleanly", err)
}
if err := s.AccessLogProducer.Close(); err != nil {
log.Println("Failed to shut down access log producer cleanly", err)
}
return nil
}
func (s *Server) Handler() http.Handler {
return s.withAccessLog(s.collectQueryStringData())
}
func (s *Server) Run(addr string) error {
httpServer := &http.Server{
Addr: addr,
Handler: s.Handler(),
}
log.Printf("Listening for requests on %s...\n", addr)
return httpServer.ListenAndServe()
}
func (s *Server) collectQueryStringData() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
// We are not setting a message key, which means that all messages will
// be distributed randomly over the different partitions.
partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
Topic: "important",
Value: sarama.StringEncoder(r.URL.RawQuery),
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Failed to store your data:, %s", err)
} else {
// The tuple (topic, partition, offset) can be used as a unique identifier
// for a message in a Kafka cluster.
fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
}
})
}
type accessLogEntry struct {
Method string `json:"method"`
Host string `json:"host"`
Path string `json:"path"`
IP string `json:"ip"`
ResponseTime float64 `json:"response_time"`
encoded []byte
err error
}
func (ale *accessLogEntry) ensureEncoded() {
if ale.encoded == nil && ale.err == nil {
ale.encoded, ale.err = json.Marshal(ale)
}
}
func (ale *accessLogEntry) Length() int {
ale.ensureEncoded()
return len(ale.encoded)
}
func (ale *accessLogEntry) Encode() ([]byte, error) {
ale.ensureEncoded()
return ale.encoded, ale.err
}
func (s *Server) withAccessLog(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
started := time.Now()
next.ServeHTTP(w, r)
entry := &accessLogEntry{
Method: r.Method,
Host: r.Host,
Path: r.RequestURI,
IP: r.RemoteAddr,
ResponseTime: float64(time.Since(started)) / float64(time.Second),
}
// We will use the client's IP address as key. This will cause
// all the access log entries of the same IP address to end up
// on the same partition.
s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
Topic: "access_log",
Key: sarama.StringEncoder(r.RemoteAddr),
Value: entry,
}
})
}
func newDataCollector(brokerList []string) sarama.SyncProducer {
// For the data collector, we are looking for strong consistency semantics.
// Because we don't change the flush settings, sarama will try to produce messages
// as fast as possible to keep latency low.
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
config.Producer.Return.Successes = true
tlsConfig := createTlsConfiguration()
if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}
// On the broker side, you may want to change the following settings to get
// stronger consistency guarantees:
// - For your broker, set `unclean.leader.election.enable` to false
// - For the topic, you could increase `min.insync.replicas`.
producer, err := sarama.NewSyncProducer(brokerList, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
return producer
}
func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
// For the access log, we are looking for AP semantics, with high throughput.
// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
config := sarama.NewConfig()
tlsConfig := createTlsConfiguration()
if tlsConfig != nil {
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}
config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
config.Producer.Compression = sarama.CompressionSnappy // Compress messages
config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
producer, err := sarama.NewAsyncProducer(brokerList, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
// We will just log to STDOUT if we're not able to produce messages.
// Note: messages will only be returned here after all retry attempts are exhausted.
go func() {
for err := range producer.Errors() {
log.Println("Failed to write access log entry:", err)
}
}()
return producer
}
이 부분도 해보고,
package main
import (
"log"
"os"
"os/signal"
"time"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Consumer.Fetch.Default = 100000
config.ChannelBufferSize = 1024
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln(err)
}
}()
partitionConsumer, err := consumer.ConsumePartition("observations.json", 0, sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Fatalln(err)
}
}()
consumed := uint64(0)
start := time.Now()
// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
ConsumerLoop:
for {
select {
case <-partitionConsumer.Messages():
consumed++
case <-signals:
break ConsumerLoop
}
}
end := time.Now()
elapsed := uint64(end.Sub(start).Seconds())
log.Printf("Consumed: %v\n", consumed)
log.Printf("Elapsed Time: %v\n", elapsed)
log.Printf("TPS: %v\n", consumed/elapsed)
}
이 부분도 해보고,
package main
import (
"github.com/Shopify/sarama"
)
var kafkaClient sarama.AsyncProducer
func main() {
brokers := []string{"broker:9092"}
kafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
}
func SendMessage(message string) {
kafkaClient.Input() <- &sarama.ProducerMessage{
Topic: "cbsong",
Partition: -1,
Value: sarama.StringEncoder(message),
}
}
이 부분도 해보았으나, 실행 자체가 되지 않아
처음에 라즈베리파이에 깔아보기도 하고, 도커에도 시도를 했으나 둘 다 되지 않아
주피카와 kafka 부분을 설치해서 실행해보려고 윈도우에 까는 것을 시도하였습니다.
방법은 여기 참조
그런데....
이런식의 알 수 없는 에러를 찾다 찾다 시간만 허비했습니다...

무엇을 잘 못하고 있는지 모르겠습니다.
왜인지 모르겠지만 집컴퓨터에서 되지 않았는데 회사 컴퓨터, 노트북에서 Java JDK 11버전 설치, Path 설정 해주니 Zookeeper, Kafka 실행이 되었습니다.
방법은 여기 참조.
[바미] Windows 환경에서 Kafka 서버 셋팅하기.
https://kafka.apache.org/downloads Apache Kafka Apache Kafka: A Distributed Streaming Platform. kafka.apache.org 위 링크에서 다운로드를 진행 하시면 되는데요 src가 아니라 Binary 다운로드를 진행해야..
codesk.tistory.com