글 작성자: 만렙개발자

카프카 컨수머를 파이썬으로 코딩하고,

받은 메세지의 레코드를 mongoDB로 pymongo를 통해 insert하는 예제 코드

from time import sleep
import ujson
from kafka import KafkaConsumer
from pymongo import MongoClient
pyclient = MongoClient("mongodb://localhost:3004/")
quantum_db = pyclient["quantum"]
information_db = quantum_db["information"]
consumer = KafkaConsumer(
'QUANTUM_TEST',
bootstrap_servers='quantum01.rnd:9092, quantum02.rnd:9092, quantum03.rnd:9092',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='analysis_1',
# value_deserializer=lambda x: ujson.loads(x.decode('utf-8'))
)
# 1) for loop 방식
# for message in consumer:
# information_db.insert_one(message)
# 2) while 방식 + poll & commit
while True:
message = consumer.poll()
if len(message) == 0: sleep(1)
for topic_partition, records in message.items():
for record in records:
parsed_record = ujson.loads(record.value)
information_db.insert_one(parsed_record)
# consumer.commit()
consumer.close()

컨수머는 다음 메세지가 올 때까지 대기하고 있음!

따라서 반복문의 형태로 간단히 만들면 됨


commit을 해야 이전에 어디까지 받았는지 알 수 있고,
이를 이용해 그 다음부터만 받을 수 있음.

enable_auto_commit은 default로 True.

만약 commit을 수동으로 하고 싶으면 False로!

 

value_deserializer를 입력하면, consumer.poll() 할 때 raw message가 아니라 deserialized되어서 옴.

 

 

 

ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html