Kafka Consumer to insert mongoDB in python
글 작성자: 만렙개발자
카프카 컨수머를 파이썬으로 코딩하고,
받은 메세지의 레코드를 mongoDB로 pymongo를 통해 insert하는 예제 코드
from time import sleep import ujson from kafka import KafkaConsumer from pymongo import MongoClient pyclient = MongoClient("mongodb://localhost:3004/") quantum_db = pyclient["quantum"] information_db = quantum_db["information"] consumer = KafkaConsumer( 'QUANTUM_TEST', bootstrap_servers='quantum01.rnd:9092, quantum02.rnd:9092, quantum03.rnd:9092', auto_offset_reset='earliest', enable_auto_commit=True, group_id='analysis_1', # value_deserializer=lambda x: ujson.loads(x.decode('utf-8')) ) # 1) for loop 방식 # for message in consumer: # information_db.insert_one(message) # 2) while 방식 + poll & commit while True: message = consumer.poll() if len(message) == 0: sleep(1) for topic_partition, records in message.items(): for record in records: parsed_record = ujson.loads(record.value) information_db.insert_one(parsed_record) # consumer.commit() consumer.close()
컨수머는 다음 메세지가 올 때까지 대기하고 있음!
따라서 반복문의 형태로 간단히 만들면 됨
commit을 해야 이전에 어디까지 받았는지 알 수 있고,
이를 이용해 그 다음부터만 받을 수 있음.
enable_auto_commit은 default로 True.
만약 commit을 수동으로 하고 싶으면 False로!
value_deserializer를 입력하면, consumer.poll() 할 때 raw message가 아니라 deserialized되어서 옴.
ref: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
'✏️ 수동로깅 > dev_log' 카테고리의 다른 글
댓글
이 글 공유하기
다른 글
-
AttributeError: 'KNeighborsClassifier' object has no attribute 'n_samples_fit_'
AttributeError: 'KNeighborsClassifier' object has no attribute 'n_samples_fit_'
2020.07.07 -
파이썬으로 systemd 서비스(데몬)를 작성하기위한 튜토리얼
파이썬으로 systemd 서비스(데몬)를 작성하기위한 튜토리얼
2020.06.16 -
[opencv] ImportError: libSM.so.6: cannot open shared object file: No such file or directory
[opencv] ImportError: libSM.so.6: cannot open shared object file: No such file or directory
2020.03.15 -
[Google Colab] OSError: [Errno 107] Transport endpoint is not connected
[Google Colab] OSError: [Errno 107] Transport endpoint is not connected
2020.02.27
댓글을 사용할 수 없습니다.