Producer–Consumer with MySQL
Tech stack in this tutorial: Kafka, MySQL
Let's begin.
Step 1: Install Kafka
Step 2: Sample content of the ecommerce_data.csv file (an excerpt of the original file)
date,product_id,city_id,orders
2019-12-16,1897,26,2
2019-12-16,4850,26,4
2019-12-16,2466,26,1
2019-12-16,637,26,1
2019-12-16,3497,26,184
Step 3: Sample Producer code
import csv
import json
import os
import time

from kafka import KafkaProducer
# Kafka broker(s) to connect to and the topic the rows are published on.
bootstrap_servers = ['localhost:9092']
topicName = 'producer-consumer-demo'


def iter_csv_rows(lines):
    """Yield one dict per CSV data row, keyed by the header row.

    Args:
        lines: Any iterable of text lines (an open file works), the first
            of which is the header.

    Yields:
        ``dict`` mapping each header name to the string value found in the
        same column of the row. Blank lines are skipped, and an empty input
        yields nothing.
    """
    # csv.reader copes with "\r\n" line endings and quoted fields, which a
    # plain split(',') / strip('\n') does not.
    reader = csv.reader(lines)
    header = next(reader, None)
    if header is None:
        return
    for row in reader:
        if not row:
            # A blank line would otherwise become a message with no usable
            # fields.
            continue
        yield dict(zip(header, row))


def main():
    """Publish every row of ecommerce_data.csv to Kafka as a JSON message."""
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        retries=5,
        value_serializer=lambda m: json.dumps(m).encode('ascii'),
    )
    try:
        # newline='' is what the csv module asks for when reading files.
        with open('ecommerce_data.csv', newline='') as f:
            for content in iter_csv_rows(f):
                print(content)
                # Wait for the broker's acknowledgement so a failed send
                # surfaces here; the timeout keeps an unreachable broker
                # from hanging the script forever.
                producer.send(topicName, content).get(timeout=30)
                # time.sleep(2)  # uncomment to slow the stream down
    finally:
        producer.close()


if __name__ == "__main__":
    main()
Step 4: Install MySQL
Step 5: Create the database and the required table
-- Database and table the consumer script writes to.
-- IF NOT EXISTS lets this snippet be re-run without errors.
CREATE DATABASE IF NOT EXISTS Kafka;
USE Kafka;
-- Integer display widths such as INTEGER(20) are deprecated since
-- MySQL 8.0.17 and never changed the storable range, so plain INT is used.
-- The date column stays textual, matching the string the consumer inserts.
CREATE TABLE IF NOT EXISTS orders (
    date VARCHAR(20),
    product_id INT,
    city_id INT,
    orders INT
);
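Step 6: Sample consumer code. Start the consumer before running the producer: it uses auto_offset_reset='latest', so it only receives messages published after it has started.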
from kafka import KafkaConsumer
import sys
import json
import mysql.connector
# Kafka broker(s) to connect to and the topic the rows are read from.
bootstrap_servers = ['localhost:9092']
topicName = 'producer-consumer-demo'

# Parameterised statement: the values are passed separately to execute(),
# so message content is never spliced into the SQL text.
INSERT_ORDER_SQL = (
    "INSERT INTO orders(date, product_id, city_id, orders) "
    "VALUES (%s, %s, %s, %s)"
)


def parse_order(raw):
    """Decode one message payload into a row for the orders table.

    Args:
        raw: JSON text (``bytes`` or ``str``) holding an object with the
            keys ``date``, ``product_id``, ``city_id`` and ``orders``.

    Returns:
        A ``(date, product_id, city_id, orders)`` tuple with the last three
        fields converted to ``int``, or ``None`` when the payload is not
        valid JSON, lacks one of the keys, or holds a non-integer value.
    """
    try:
        msg = json.loads(raw)
        return (
            msg['date'],
            int(msg['product_id']),
            int(msg['city_id']),
            int(msg['orders']),
        )
    except (KeyError, TypeError, ValueError):
        # JSON decoding errors are ValueError subclasses, so this covers
        # undecodable payloads as well as missing or bad fields.
        return None


def main():
    """Insert every consumed order message into MySQL until interrupted."""
    mydb = mysql.connector.connect(
        host="localhost",
        user="root",
        password="XXXXXXXX",
        database="Kafka",
    )
    try:
        mycursor = mydb.cursor()
        # 'latest' means only messages published after this consumer starts
        # are delivered.
        consumer = KafkaConsumer(
            topicName,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='latest',
        )
        try:
            for message in consumer:
                val = parse_order(message.value)
                if val is None:
                    # One bad message must not stop the whole stream.
                    print("Skipping malformed message:", message.value,
                          file=sys.stderr)
                    continue
                print(val)
                mycursor.execute(INSERT_ORDER_SQL, val)
                mydb.commit()
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to stop the consumer; fall through
            # to the cleanup below and exit normally.
            pass
        finally:
            consumer.close()
    finally:
        # Disconnect from the server on every exit path, not only Ctrl+C.
        mydb.close()


if __name__ == "__main__":
    main()
Step 7: Data inserted into the table after running the producer and the consumer