글 작성자: 만렙개발자

카프카 컨수머를 파이썬으로 코딩하고,

받은 메세지의 레코드를 mongoDB로 pymongo를 통해 insert하는 예제 코드

from time import sleep
import ujson
from kafka import KafkaConsumer
from pymongo import MongoClient

pyclient = MongoClient("mongodb://localhost:3004/")
quantum_db = pyclient["quantum"]
information_db = quantum_db["information"]

consumer = KafkaConsumer(
    'QUANTUM_TEST',
     bootstrap_servers='quantum01.rnd:9092, quantum02.rnd:9092, quantum03.rnd:9092',
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='analysis_1',
     # value_deserializer=lambda x: ujson.loads(x.decode('utf-8'))
)

# 1) for loop 방식
# for message in consumer:
#     information_db.insert_one(message)

# 2) while 방식 + poll & commit
while True:
    message = consumer.poll()
    if len(message) == 0: sleep(1)
    for topic_partition, records in message.items():
      for record in records:
        parsed_record = ujson.loads(record.value)
        information_db.insert_one(parsed_record)
    # consumer.commit()
consumer.close()

컨수머는 다음 메세지가 올 때까지 대기하고 있음!

따라서 반복문의 형태로 간단히 만들면 됨


commit을 해야 이전에 어디까지 받았는지 알 수 있고,
이를 이용해 그 다음부터만 받을 수 있음.

enable_auto_commit은 default로 True.

만약 commit을 수동으로 하고 싶으면 False로!

 

value_deserializer를 입력하면, consumer.poll() 할 때 raw message가 아니라 deserialized되어서 옴.

 

 

 

ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html