본문 바로가기
기타

[kafka] kafkajs 사용하여 producer / consumer 서버 만들기 (kafkajs)

by guru_k 2022. 1. 25.
728x90
반응형

간단하게 nodejs express를 통해 api 를 만들고 해당 api를 통해 producer -> kafka -> consumer로 전달되는 간단한 메시지 서버를 kafkajs를 이용해서 만들어 보겠습니다.

1. express 및 kafkajs 설치

npm 을 이용해서 express와 kafkajs를 설치합니다.

$ npm install express
$ npm install kafkajs

2. express 서버 구성

event 메시지를 받을 수 있는 간단한 api가 구현된 index.js 를 생성 합니다.

const express = require('express')
const app = express()
const port = 3000

app.post('/events/:event', (req, res) => {
  res.send('event : ' + req.params.event + '\n')
})

app.listen(port, () => {
  console.log(`kafka app listening on port ${port}`)
})

node 서버를 구동합니다.

$ node index.js
kafka app listening on port 3000

새로운 터미널에서 api 호출을 하여 api가 정상 동작하는 지 확인 합니다.

$curl -X POST localhost:3000/events/aa
event : aa

3. kafka 실행

2022.01.24 - [기타] - [kfaka] kafka 시작해보기

 

[kfaka] kafka 시작해보기

kafka 를 통해서 간단히 events 쓰기 / 읽기를 구현해보고 node.js에서 해당 내용을 구현해보겠습니다. 1. kafka 설치 kafka 다운로드 및 설치 진행 $ tar -xzf kafka_2.13-3.0.0.tgz $ cd kafka_2.13-3.0.0 2. ka..

gihyun.com

이전 글에서 실행한것과 같이 kafka 환경을 구성하기 위해 zookeeper server 와 kafka server를 구동하고 quickstart-event라는 topic을 생성해줍니다.

# zookeeper 실행
$ bin/zookeeper-server-start.sh config/zookeeper.properties

# kafka 실행
$ bin/kafka-server-start.sh config/server.properties

# Topic 생성
$ bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092

 

4. producer 만들기

위에서 만들었던  index.js를 조금 개조하여 producer.js를 만듭니다.

const express = require('express')
const app = express()
const port = 3000

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const producer = kafka.producer()

const initKafka = async () => {
  await producer.connect()
}

app.post('/events/:event', async (req, res) => {
  await producer.send({
    topic: 'quickstart-events',
    messages: [
      { value: req.params.event },
    ],
  })
  res.send('successfully stored event : '+ req.params.event + '\n')
})

app.listen(port, async  () => {
  console.log(`kafka app listening on port ${port}`)
})

initKafka();

그리고 producer를 실행해줍니다.

$ node producer.js

5. consumer 만들기

kafka 의 event를 수신할 consumer.js를 생성 합니다.

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const consumer = kafka.consumer({ groupId: 'test-group' })

const initKafka = async () => {
  console.log('start subscribe')
  await consumer.connect()
  await consumer.subscribe({ topic: 'quickstart-events', fromBeginning: true })
  await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      value: message.value.toString(),
    })
  },
})
}

initKafka()

새로운 터미널에서 consumer.js를 실행 해줍니다.

$ node consumer.js

6. API를 호출하여 event 전송

curl을 이용하여 간단히 API를 호출 한 뒤 producer가 생성한 event가 consumer에서 제대로 수신하는지 확인 해봅시다.

curl을 이용해 API를 호출 합니다.

$ curl -X POST localhost:3000/events/hello
successfully stored event : hello
$ curl -X POST localhost:3000/events/world
successfully stored event : world

consumer에서 로그를 확인하여 정상적으로 event를 subscribe했는지 확인해봅니다.

$ node consumer.js
start subscribe
{"level":"INFO","timestamp":"2022-01-25T08:46:24.685Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2022-01-25T08:46:24.706Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"test-group","memberId":"my-app-3e27af44-7dc0-4666-bdee-2a743110f384","leaderId":"my-app-3e27af44-7dc0-4666-bdee-2a743110f384","isLeader":true,"memberAssignment":{"quickstart-events":[0]},"groupProtocol":"RoundRobinAssigner","duration":21}
{ value: 'hello' }
{ value: 'world' }

hello와 world 두가지 event가 정상적으로 수신되는것을 확인했습니다.

728x90
반응형

댓글