Kafka Producer–Consumer with MySQL


Tech stack in this tutorial: Kafka, MySQL, and Python (the kafka-python and mysql-connector-python client libraries).


Let's begin.

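Step 1: Install Kafka and start a broker listening on localhost:9092, the address both scripts below connect to. Also install the Python client libraries they import: kafka-python (`from kafka import ...`) and mysql-connector-python (`import mysql.connector`).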

Step 2: Create ecommerce_data.csv in the directory you run the producer from. Sample content (the first rows of the original file):

date,product_id,city_id,orders
2019-12-16,1897,26,2
2019-12-16,4850,26,4
2019-12-16,2466,26,1
2019-12-16,637,26,1
2019-12-16,3497,26,184

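Step 3: Producer code. It reads ecommerce_data.csv and publishes each row as a JSON message to the producer-consumer-demo topic.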

import csv
import json
import os
import time

from kafka import KafkaProducer

# Kafka broker address(es) and the topic shared with the consumer script.
bootstrap_servers = ['localhost:9092']
topicName = 'producer-consumer-demo'


def read_records(lines):
    """Yield one dict per CSV data row, keyed by the header row.

    ``lines`` is any iterable of text lines (e.g. a file opened with
    ``newline=''``). The first line is the header
    (``date,product_id,city_id,orders``). Values are yielded as strings;
    the consumer converts the numeric columns.

    ``csv.DictReader`` skips empty lines and handles ``\\r\\n`` line endings,
    so blank lines in the file never become bogus messages.
    """
    yield from csv.DictReader(lines)


def main(path='ecommerce_data.csv'):
    """Publish every row of the CSV at ``path`` to ``topicName`` as JSON."""
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        retries=5,
        # json.dumps escapes non-ASCII characters by default, so the output
        # is always ASCII-encodable.
        value_serializer=lambda m: json.dumps(m).encode('ascii'),
    )
    try:
        with open(path, newline='', encoding='utf-8') as f:
            for content in read_records(f):
                print(content)
                ack = producer.send(topicName, content)
                # Wait for the broker's acknowledgement so a delivery failure
                # raises here instead of being silently lost.
                ack.get()
                # Optional: uncomment to slow the producer down and watch the
                # consumer insert rows one at a time.
                # time.sleep(2)
    finally:
        # Flush anything still buffered and release network resources.
        producer.close()


if __name__ == '__main__':
    main()

Step 4: Install MySQL

Step 5: Create the database and the required table

-- Database and table that the consumer script writes into.
-- IF NOT EXISTS makes the script safe to re-run.
CREATE DATABASE IF NOT EXISTS Kafka;

USE Kafka;

-- One row per CSV record / Kafka message.
-- Plain INT is used because integer display widths such as INTEGER(20)
-- are deprecated since MySQL 8.0.17.
-- The sample dates are YYYY-MM-DD strings, which MySQL stores in a DATE column.
CREATE TABLE IF NOT EXISTS orders (
    date       DATE,
    product_id INT,
    city_id    INT,
    orders     INT
);

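Step 6: Consumer code. It reads each message from the producer-consumer-demo topic and inserts it as a row into the orders table.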

import contextlib
import json
import os
import sys

import mysql.connector
from kafka import KafkaConsumer

# Kafka broker address(es) and the topic the producer script publishes to.
bootstrap_servers = ['localhost:9092']
topicName = 'producer-consumer-demo'

# Parameterized INSERT: values are passed separately to execute(), never
# formatted into the SQL text.
sql = "INSERT INTO orders(date, product_id, city_id, orders) VALUES (%s, %s, %s, %s)"


def to_row(msg):
    """Convert one decoded message dict into an ``orders`` table row tuple.

    The producer sends every CSV value as a string, so the numeric columns
    are converted to ``int`` here. Raises KeyError if a field is missing and
    ValueError if a numeric field is not an integer.
    """
    return (msg['date'], int(msg['product_id']), int(msg['city_id']), int(msg['orders']))


def main():
    """Consume messages from ``topicName`` and insert each into ``orders``.

    Runs until interrupted with Ctrl+C. The MySQL connection, the cursor and
    the Kafka consumer are closed on every exit path, including errors.
    """
    # ExitStack closes the registered resources in reverse order
    # (consumer, cursor, connection) however the block is left.
    with contextlib.ExitStack() as stack:
        mydb = stack.enter_context(contextlib.closing(mysql.connector.connect(
            host="localhost",
            user="root",
            # Set MYSQL_PASSWORD instead of hard-coding the password; the
            # literal is only a placeholder fallback.
            password=os.environ.get("MYSQL_PASSWORD", "XXXXXXXX"),
            database="Kafka",
        )))
        mycursor = stack.enter_context(contextlib.closing(mydb.cursor()))
        # No group_id means no committed offsets. So 'latest' makes every run
        # start at the end of the topic: start this consumer BEFORE the producer.
        consumer = stack.enter_context(contextlib.closing(KafkaConsumer(
            topicName,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='latest',
        )))

        try:
            for message in consumer:
                msg = json.loads(message.value)
                val = to_row(msg)
                print(val)
                mycursor.execute(sql, val)
                # Commit per message so each row is persisted immediately.
                mydb.commit()
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to stop this endless loop; the
            # ExitStack still disconnects from the server.
            pass


if __name__ == '__main__':
    sys.exit(main())

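Step 7: Start the consumer first, then run the producer, and check the rows inserted into the orders table (for example with `SELECT * FROM orders;`). The consumer uses auto_offset_reset='latest', so it does not read messages produced before it started.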