Kafka Consumer to insert mongoDB in python
글 작성자: 만렙개발자
카프카 컨수머를 파이썬으로 코딩하고,
받은 메세지의 레코드를 mongoDB로 pymongo를 통해 insert하는 예제 코드
from time import sleep
import ujson
from kafka import KafkaConsumer
from pymongo import MongoClient
pyclient = MongoClient("mongodb://localhost:3004/")
quantum_db = pyclient["quantum"]
information_db = quantum_db["information"]
consumer = KafkaConsumer(
'QUANTUM_TEST',
bootstrap_servers='quantum01.rnd:9092, quantum02.rnd:9092, quantum03.rnd:9092',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='analysis_1',
# value_deserializer=lambda x: ujson.loads(x.decode('utf-8'))
)
# 1) for loop 방식
# for message in consumer:
# information_db.insert_one(message)
# 2) while 방식 + poll & commit
while True:
message = consumer.poll()
if len(message) == 0: sleep(1)
for topic_partition, records in message.items():
for record in records:
parsed_record = ujson.loads(record.value)
information_db.insert_one(parsed_record)
# consumer.commit()
consumer.close()
컨수머는 다음 메세지가 올 때까지 대기하고 있음!
따라서 반복문의 형태로 간단히 만들면 됨
commit을 해야 이전에 어디까지 받았는지 알 수 있고,
이를 이용해 그 다음부터만 받을 수 있음.
enable_auto_commit은 default로 True.
만약 commit을 수동으로 하고 싶으면 False로!
value_deserializer를 입력하면, consumer.poll() 할 때 raw message가 아니라 deserialized되어서 옴.
ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
'✏️ 수동로깅 > dev_log' 카테고리의 다른 글
댓글
이 글 공유하기
다른 글
-
AttributeError: 'KNeighborsClassifier' object has no attribute 'n_samples_fit_'
AttributeError: 'KNeighborsClassifier' object has no attribute 'n_samples_fit_'
2020.07.07 -
파이썬으로 systemd 서비스(데몬)를 작성하기위한 튜토리얼
파이썬으로 systemd 서비스(데몬)를 작성하기위한 튜토리얼
2020.06.16 -
[opencv] ImportError: libSM.so.6: cannot open shared object file: No such file or directory
[opencv] ImportError: libSM.so.6: cannot open shared object file: No such file or directory
2020.03.15 -
[Google Colab] OSError: [Errno 107] Transport endpoint is not connected
[Google Colab] OSError: [Errno 107] Transport endpoint is not connected
2020.02.27