프로그래밍(Web)/업무관련

[바미] Golang sarama 연동하기(http_server)

Bami 2021. 2. 9. 16:45
728x90
반응형

github.com/Shopify/sarama/tree/65f0fec86aabe011db77ad641d31fddf14f3ca41/examples/http_server

 

Shopify/sarama

Sarama is a Go library for Apache Kafka 0.8, and up. - Shopify/sarama

github.com

 

이 부분 실행 방법에 대해 써보려고 합니다.

 

먼저

func (s *Server) withAccessLog(next http.Handler) http.Handler {

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started := time.Now()

		next.ServeHTTP(w, r)

		entry := &accessLogEntry{
			Method:       r.Method,
			Host:         r.Host,
			Path:         r.RequestURI,
			IP:           r.RemoteAddr,
			ResponseTime: float64(time.Since(started)) / float64(time.Second),
		}

		// We will use the client's IP address as key. This will cause
		// all the access log entries of the same IP address to end up
		// on the same partition.
		s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
			Topic: "important",
			Key:   sarama.StringEncoder(r.RemoteAddr),
			Value: entry,
		}
	})
}

이 부분의 Topic 부분을 자신이 생성한 Topic 이름을 적어주어야 합니다.

그렇지 않고 실행 했을 시 

이러한 문구가 계속 출력되게 됩니다.

 

그래서 자신이 생성한 Topic이름으로 수정 해주고

func (s *Server) withAccessLog(next http.Handler) http.Handler {

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started := time.Now()

		next.ServeHTTP(w, r)

		entry := &accessLogEntry{
			Method:       r.Method,
			Host:         r.Host,
			Path:         r.RequestURI,
			IP:           r.RemoteAddr,
			ResponseTime: float64(time.Since(started)) / float64(time.Second),
		}

		// We will use the client's IP address as key. This will cause
		// all the access log entries of the same IP address to end up
		// on the same partition.
		s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
			Topic: "cbsong",
			Key:   sarama.StringEncoder(r.RemoteAddr),
			Value: entry,
		}
	})
}

 

VScode 터미널 창에 각 flag 값에 맞게

 

 go run http_server.go -addr=":8080" -brokers="localhost:9092" -verbose=true -certificate="" -key="" -ca="" -verify=false

위와 같이 쓰면

 

PS C:\Users\DEV\Documents\Go\sarama-master\examples\http_server> go run http_server.go -addr=":8080" -brokers="localhost:9092" -verbose=true -certificate="" -key="" -ca="" -verify=false
2021/02/09 16:40:16 Kafka brokers: localhost:9092
[sarama] 2021/02/09 16:40:16 Initializing new client
[sarama] 2021/02/09 16:40:16 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2021/02/09 16:40:16 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2021/02/09 16:40:16 client/metadata fetching metadata for all topics from broker localhost:9092
[sarama] 2021/02/09 16:40:16 Connected to broker at localhost:9092 (unregistered)
[sarama] 2021/02/09 16:40:16 client/brokers registered new broker #0 at D-cbsong:9092
[sarama] 2021/02/09 16:40:16 Successfully initialized new client
[sarama] 2021/02/09 16:40:16 Initializing new client
[sarama] 2021/02/09 16:40:16 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2021/02/09 16:40:16 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2021/02/09 16:40:16 client/metadata fetching metadata for all topics from broker localhost:9092
[sarama] 2021/02/09 16:40:16 Connected to broker at localhost:9092 (unregistered)
[sarama] 2021/02/09 16:40:16 client/brokers registered new broker #0 at D-cbsong:9092
[sarama] 2021/02/09 16:40:16 Successfully initialized new client
2021/02/09 16:40:16 Listening for requests on :8080...
[sarama] 2021/02/09 16:40:23 client/metadata fetching metadata for [important] from broker localhost:9092
[sarama] 2021/02/09 16:40:23 client/metadata found some partitions to be leaderless
[sarama] 2021/02/09 16:40:23 client/metadata retrying after 250ms... (3 attempts remaining)
[sarama] 2021/02/09 16:40:23 client/metadata fetching metadata for [important] from broker localhost:9092
[sarama] 2021/02/09 16:40:23 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2021/02/09 16:40:23 producer/broker/0 starting up
[sarama] 2021/02/09 16:40:23 producer/broker/0 state change to [open] on important/0
[sarama] 2021/02/09 16:40:23 Connected to broker at D-cbsong:9092 (registered as #0)
[sarama] 2021/02/09 16:40:23 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2021/02/09 16:40:23 producer/broker/0 starting up
[sarama] 2021/02/09 16:40:23 producer/broker/0 state change to [open] on cbsong/0
[sarama] 2021/02/09 16:40:23 Connected to broker at D-cbsong:9092 (registered as #0)

이렇게 출력값이 뜨면서 정상적으로 실행되는 것을 볼 수 있습니다.

 

해당 flag부분들의 설명은 

 

var (
	addr      = flag.String("addr", ":8080", "The address to bind to")
	brokers   = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
	verbose   = flag.Bool("verbose", false, "Turn on Sarama logging")
	certFile  = flag.String("certificate", "", "The optional certificate file for client authentication")
	keyFile   = flag.String("key", "", "The optional key file for client authentication")
	caFile    = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
	verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
)

여기에서 볼 수 있습니다.

addr기본적으로 접속할 주소 번호, brokersserver.properties의 

 

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

 

이 부분이 0이라서 0을 쓰는줄 알았는데 그게 아니라 Consumer 생성을 진행할 때 사용 하는 명령어 중에

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic [topicName]

여기서 사용하는 localhost번호를 쓰는 것이였습니다...

 

 

localhost:8080/에 접속하니

 

새로고침 할 때마다 0/14 부분의 숫자가 늘어나는 것을 확인 할 수 있습니다. 

 

그 후 custormer 명령 프롬프트 창을 보면 

 

 

이런식으로 

 

http_server.go 에서 

 

type accessLogEntry struct {
	Method       string  `json:"method"`
	Host         string  `json:"host"`
	Path         string  `json:"path"`
	IP           string  `json:"ip"`
	ResponseTime float64 `json:"response_time"`

	encoded []byte
	err     error
}

 

형태의 값이 출력 되는 것을 알 수 있습니다.

728x90
반응형