프로그래밍(Web)/업무관련

[바미] Golang 카프카 삽질기...

Bami 2021. 2. 3. 23:36
728x90
반응형

현재 Golang으로 구현한 log 수집 서버는 log파일에 log들을 수집하는데 파일에서 카프카로 도입하도록 수정되었습니다. 

 

카프카를 쓰려면 주피카를 써야 하고, 이 둘에 대한 설명은 다음과 같습니다.

  • 카프카
    • 메시징 큐의 일종입니다.
    • 그리고 다른 메시징 큐 activemq, rabbitmq 대비 우수한 TPS 를 나타냅니다.따라서 대용량의 실시간 로그 처리에 특화되어 있습니다.
    • 메시지를 기본적으로 저장하는 기존 메시징 시스템과는 달리 파일 시스템에 저장합니다. 따라서 데이터 유실에 안전합니다.
    • 구성은 Producer, Broker(kafka 서버) , Consumer 로 되어있습니다.
    • 기존 메시징 시스템과의 차이점

기존시스템은 Broker가 Consumer 에게 들어오는 메시지를 Push 방식으로 진행.

카프카의 Consumer가 브로커로부터 메시지를 가져오기 때문에 자신이 처리량 만큼 처리 할 수 있어 최적의 성능을 낼 수 있습니다.

  • 성능비교 그래프


  • Zookeeper
    • Zookeeper 란 카프카 서버(클러스터)를 관리해주는 것이다.
    • 예를들어, 카프카 서버가 Broker 1,2,3 이렇게 3대가 존재합니다. 클러스터가 되어있는 상태이므로 모든 데이터는 공유가 됩니다.
    • 만약 1번이 리더고 2,3이 팔로워이면 1번 서버가 죽으면 2번 서버가 리더가 되는 방식으로 진행됩니다.
    • 1번 서버가 재시작 되면 1번은 이제 팔로워가 됩니다.
    • 이런 역할을 해주는 것이 Zookeeper 입니다.

참조 : jjjwodls.github.io/etc/2020/01/07/01-Kafka-Setup.html

 

그리고 Golang에서 Kafka의 종류는 2가지 정도인데 confluent-kafka-go와 sarma입니다.

이 둘을 비교해보니 confluent-kafka-go는 메모리를 20MB 정도 기본으로 사용하고 sarma는 최대 3MB를 사용하여 메모리를 더 적게 사용하면서 성능은 비슷했습니다. 

 

그래서 sarama에 대해 찾아보았습니다.

 

그래서 현재 해본 부분이

package main

import (
	"github.com/Shopify/sarama"

	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"strings"
	"time"
)

var (
	addr      = flag.String("addr", ":8080", "The address to bind to")
	brokers   = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
	verbose   = flag.Bool("verbose", false, "Turn on Sarama logging")
	certFile  = flag.String("certificate", "", "The optional certificate file for client authentication")
	keyFile   = flag.String("key", "", "The optional key file for client authentication")
	caFile    = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
	verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
)

func main() {
	flag.Parse()

	if *verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	if *brokers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	brokerList := strings.Split(*brokers, ",")
	log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

	server := &Server{
		DataCollector:     newDataCollector(brokerList),
		AccessLogProducer: newAccessLogProducer(brokerList),
	}
	defer func() {
		if err := server.Close(); err != nil {
			log.Println("Failed to close server", err)
		}
	}()

	log.Fatal(server.Run(*addr))
}

func createTlsConfiguration() (t *tls.Config) {
	if *certFile != "" && *keyFile != "" && *caFile != "" {
		cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
		if err != nil {
			log.Fatal(err)
		}

		caCert, err := ioutil.ReadFile(*caFile)
		if err != nil {
			log.Fatal(err)
		}

		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM(caCert)

		t = &tls.Config{
			Certificates:       []tls.Certificate{cert},
			RootCAs:            caCertPool,
			InsecureSkipVerify: *verifySsl,
		}
	}
	// will be nil by default if nothing is provided
	return t
}

type Server struct {
	DataCollector     sarama.SyncProducer
	AccessLogProducer sarama.AsyncProducer
}

func (s *Server) Close() error {
	if err := s.DataCollector.Close(); err != nil {
		log.Println("Failed to shut down data collector cleanly", err)
	}

	if err := s.AccessLogProducer.Close(); err != nil {
		log.Println("Failed to shut down access log producer cleanly", err)
	}

	return nil
}

func (s *Server) Handler() http.Handler {
	return s.withAccessLog(s.collectQueryStringData())
}

func (s *Server) Run(addr string) error {
	httpServer := &http.Server{
		Addr:    addr,
		Handler: s.Handler(),
	}

	log.Printf("Listening for requests on %s...\n", addr)
	return httpServer.ListenAndServe()
}

func (s *Server) collectQueryStringData() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}

		// We are not setting a message key, which means that all messages will
		// be distributed randomly over the different partitions.
		partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
			Topic: "important",
			Value: sarama.StringEncoder(r.URL.RawQuery),
		})

		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "Failed to store your data:, %s", err)
		} else {
			// The tuple (topic, partition, offset) can be used as a unique identifier
			// for a message in a Kafka cluster.
			fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
		}
	})
}

type accessLogEntry struct {
	Method       string  `json:"method"`
	Host         string  `json:"host"`
	Path         string  `json:"path"`
	IP           string  `json:"ip"`
	ResponseTime float64 `json:"response_time"`

	encoded []byte
	err     error
}

func (ale *accessLogEntry) ensureEncoded() {
	if ale.encoded == nil && ale.err == nil {
		ale.encoded, ale.err = json.Marshal(ale)
	}
}

func (ale *accessLogEntry) Length() int {
	ale.ensureEncoded()
	return len(ale.encoded)
}

func (ale *accessLogEntry) Encode() ([]byte, error) {
	ale.ensureEncoded()
	return ale.encoded, ale.err
}

func (s *Server) withAccessLog(next http.Handler) http.Handler {

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started := time.Now()

		next.ServeHTTP(w, r)

		entry := &accessLogEntry{
			Method:       r.Method,
			Host:         r.Host,
			Path:         r.RequestURI,
			IP:           r.RemoteAddr,
			ResponseTime: float64(time.Since(started)) / float64(time.Second),
		}

		// We will use the client's IP address as key. This will cause
		// all the access log entries of the same IP address to end up
		// on the same partition.
		s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
			Topic: "access_log",
			Key:   sarama.StringEncoder(r.RemoteAddr),
			Value: entry,
		}
	})
}

func newDataCollector(brokerList []string) sarama.SyncProducer {

	// For the data collector, we are looking for strong consistency semantics.
	// Because we don't change the flush settings, sarama will try to produce messages
	// as fast as possible to keep latency low.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	config.Producer.Return.Successes = true
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Enable = true
	}

	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.

	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	return producer
}

func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {

	// For the access log, we are looking for AP semantics, with high throughput.
	// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
	config := sarama.NewConfig()
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// We will just log to STDOUT if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return producer
}

 

이 부분도 해보고, 

 

package main

import (
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Consumer.Fetch.Default = 100000
	config.ChannelBufferSize = 1024

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition("observations.json", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	consumed := uint64(0)
	start := time.Now()

	// Trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

ConsumerLoop:
	for {
		select {
		case <-partitionConsumer.Messages():
			consumed++
		case <-signals:
			break ConsumerLoop
		}
	}

	end := time.Now()
	elapsed := uint64(end.Sub(start).Seconds())
	log.Printf("Consumed: %v\n", consumed)
	log.Printf("Elapsed Time: %v\n", elapsed)
	log.Printf("TPS: %v\n", consumed/elapsed)
}

 

이 부분도 해보고,

 

package main

import (
	"github.com/Shopify/sarama"
)

var kafkaClient sarama.AsyncProducer

func main() {
	brokers := []string{"broker:9092"}
	kafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
}

func SendMessage(message string) {
	kafkaClient.Input() <- &sarama.ProducerMessage{
		Topic:     "cbsong",
		Partition: -1,
		Value:     sarama.StringEncoder(message),
	}
}

 

 

이 부분도 해보았으나, 실행 자체가 되지 않아 

 

처음에 라즈베리파이에 깔아보기도 하고, 도커에도 시도를 했으나 둘 다 되지 않아

 

주피카와 kafka 부분을 설치해서 실행해보려고 윈도우에 까는 것을 시도하였습니다.

 

방법은 여기 참조

 

그런데....

 

 

이런식의 알 수 없는 에러를 찾다 찾다 시간만 허비했습니다...

 

 

무엇을 잘 못하고 있는지 모르겠습니다.

 


왜인지 모르겠지만 집컴퓨터에서 되지 않았는데 회사 컴퓨터, 노트북에서 Java JDK 11버전 설치, Path 설정 해주니 Zookeeper, Kafka 실행이 되었습니다.

 

방법은 여기 참조.

 

[바미] Windows 환경에서 Kafka 서버 셋팅하기.

https://kafka.apache.org/downloads Apache Kafka Apache Kafka: A Distributed Streaming Platform. kafka.apache.org 위 링크에서 다운로드를 진행 하시면 되는데요 src가 아니라 Binary 다운로드를 진행해야..

codesk.tistory.com

 

728x90
반응형