728x90
반응형
간단하게 nodejs express를 통해 api 를 만들고 해당 api를 통해 producer -> kafka -> consumer로 전달되는 간단한 메시지 서버를 kafkajs를 이용해서 만들어 보겠습니다.
1. express 및 kafkajs 설치
npm 을 이용해서 express와 kafkajs를 설치합니다.
$ npm install express
$ npm install kafkajs
2. express 서버 구성
event 메시지를 받을 수 있는 간단한 api가 구현된 index.js 를 생성 합니다.
const express = require('express')
const app = express()
const port = 3000
app.post('/events/:event', (req, res) => {
res.send('event : ' + req.params.event + '\n')
})
app.listen(port, () => {
console.log(`kafka app listening on port ${port}`)
})
node 서버를 구동합니다.
$ node index.js
kafka app listening on port 3000
새로운 터미널에서 api 호출을 하여 api가 정상 동작하는 지 확인 합니다.
$curl -X POST localhost:3000/events/aa
event : aa
3. kafka 실행
2022.01.24 - [기타] - [kfaka] kafka 시작해보기
이전 글에서 실행한것과 같이 kafka 환경을 구성하기 위해 zookeeper server 와 kafka server를 구동하고 quickstart-event라는 topic을 생성해줍니다.
# zookeeper 실행
$ bin/zookeeper-server-start.sh config/zookeeper.properties
# kafka 실행
$ bin/kafka-server-start.sh config/server.properties
# Topic 생성
$ bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092
4. producer 만들기
위에서 만들었던 index.js를 조금 개조하여 producer.js를 만듭니다.
const express = require('express')
const app = express()
const port = 3000
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const producer = kafka.producer()
const initKafka = async () => {
await producer.connect()
}
app.post('/events/:event', async (req, res) => {
await producer.send({
topic: 'quickstart-events',
messages: [
{ value: req.params.event },
],
})
res.send('successfully stored event : '+ req.params.event + '\n')
})
app.listen(port, async () => {
console.log(`kafka app listening on port ${port}`)
})
initKafka();
그리고 producer를 실행해줍니다.
$ node producer.js
5. consumer 만들기
kafka 의 event를 수신할 consumer.js를 생성 합니다.
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const consumer = kafka.consumer({ groupId: 'test-group' })
const initKafka = async () => {
console.log('start subscribe')
await consumer.connect()
await consumer.subscribe({ topic: 'quickstart-events', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
})
}
initKafka()
새로운 터미널에서 consumer.js를 실행 해줍니다.
$ node consumer.js
6. API를 호출하여 event 전송
curl을 이용하여 간단히 API를 호출 한 뒤 producer가 생성한 event가 consumer에서 제대로 수신하는지 확인 해봅시다.
curl을 이용해 API를 호출 합니다.
$ curl -X POST localhost:3000/events/hello
successfully stored event : hello
$ curl -X POST localhost:3000/events/world
successfully stored event : world
consumer에서 로그를 확인하여 정상적으로 event를 subscribe했는지 확인해봅니다.
$ node consumer.js
start subscribe
{"level":"INFO","timestamp":"2022-01-25T08:46:24.685Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2022-01-25T08:46:24.706Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"test-group","memberId":"my-app-3e27af44-7dc0-4666-bdee-2a743110f384","leaderId":"my-app-3e27af44-7dc0-4666-bdee-2a743110f384","isLeader":true,"memberAssignment":{"quickstart-events":[0]},"groupProtocol":"RoundRobinAssigner","duration":21}
{ value: 'hello' }
{ value: 'world' }
hello와 world 두가지 event가 정상적으로 수신되는것을 확인했습니다.
728x90
반응형
'기타' 카테고리의 다른 글
[Raspberry Pi3] 라즈베리 파이3 모니터(HDMI) 화면 회전하기 (0) | 2022.02.28 |
---|---|
[kafka] kafkajs 사용하여 producer / consumer 서버 만들기 (kafkajs) (0) | 2022.01.25 |
[kfaka] kafka 시작해보기 (0) | 2022.01.24 |
[Elasticsearch] Elasticsearch 7.16.1 (Kubernetes readiness failed ) (0) | 2021.12.13 |
[전세대출] 카카오뱅크 전세 대출 후기 (0) | 2021.11.24 |
댓글