Requisito previo
Tienes que instalar la biblioteca de Python necesaria para leer datos de Kafka. Python3 se utiliza en este tutorial para escribir el script de consumidor y productor. Si el paquete pip no está instalado antes en su sistema operativo Linux, entonces debe instalar pip antes de instalar la biblioteca Kafka para python. python3-kafka se utiliza en este tutorial para leer datos de Kafka. Ejecute el siguiente comando para instalar la biblioteca.
$ pip instalar python3-kafkaLectura de datos de texto simple de Kafka
El productor puede enviar diferentes tipos de datos sobre un tema en particular que el consumidor puede leer. En esta parte de este tutorial se muestra cómo se pueden enviar y recibir datos de texto simple desde Kafka utilizando el productor y el consumidor.
Crea un archivo llamado productor1.py con el siguiente script de Python. Productor Kafka El módulo se importa de la biblioteca de Kafka. La lista de agentes debe definirse en el momento de la inicialización del objeto del productor para conectarse con el servidor de Kafka. El puerto predeterminado de Kafka es '9092'. El argumento bootstrap_servers se usa para definir el nombre de host con el puerto. 'First_Topic'se establece como un nombre de tema mediante el cual el productor enviará un mensaje de texto. A continuación, un simple mensaje de texto 'Hola de Kafka'se envía usando enviar() método de Productor Kafka al tema, 'First_Topic'.
productor1.py:
# Importar KafkaProducer de la biblioteca de Kafkade kafka import KafkaProducer
# Definir servidor con puerto
bootstrap_servers = ['localhost: 9092']
# Definir el nombre del tema donde se publicará el mensaje
topicName = 'First_Topic'
# Inicializar variable de productor
productor = KafkaProducer (bootstrap_servers = bootstrap_servers)
# Publicar texto en un tema definido
productor.enviar (topicName, b'Hola de kafka ... ')
# Imprimir mensaje
imprimir ("Mensaje enviado")
Crea un archivo llamado consumidor1.py con el siguiente script de Python. KafkaConsumidor El módulo se importa de la biblioteca de Kafka para leer datos de Kafka. sys El módulo se usa aquí para terminar el script. El mismo nombre de host y número de puerto del productor se utilizan en el script del consumidor para leer datos de Kafka. El nombre del tema del consumidor y del productor debe ser el mismo que es 'First_topic'. A continuación, el objeto consumidor se inicializa con los tres argumentos. Nombre del tema, identificación del grupo e información del servidor. por El bucle se usa aquí para leer el envío de texto del productor de Kafka.
consumidor1.py:
# Importar KafkaConsumer de la biblioteca de Kafkade kafka import KafkaConsumer
# Importar módulo sys
importar sys
# Definir servidor con puerto
bootstrap_servers = ['localhost: 9092']
# Definir el nombre del tema desde donde se recibirá el mensaje
topicName = 'First_Topic'
# Inicializar variable de consumidor
consumidor = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Leer e imprimir el mensaje del consumidor
para msg en consumidor:
print ("Nombre del tema =% s, Mensaje =% s"% (msg.tema, mensaje.valor))
# Terminar el guión
sys.Salida()
Producción:
Ejecute el siguiente comando desde una terminal para ejecutar el script de productor.
$ python3 productor1.pyLa siguiente salida aparecerá después de enviar el mensaje.
Ejecute el siguiente comando desde otro terminal para ejecutar el script del consumidor.
$ python3 consumidor1.pyLa salida muestra el nombre del tema y el mensaje de texto enviado por el productor.
Lectura de datos con formato JSON de Kafka
El productor de Kafka puede enviar datos formateados JSON y el consumidor de Kafka los puede leer utilizando el json módulo de python. En esta parte de este tutorial se muestra cómo se pueden serializar y deserializar los datos JSON antes de enviar y recibir los datos mediante el módulo python-kafka.
Crea una secuencia de comandos de Python llamada productor2.py con el siguiente script. Otro módulo llamado JSON se importa con Productor Kafka módulo aquí. value_serializer el argumento se usa con bootstrap_servers argumento aquí para inicializar el objeto del productor de Kafka. Este argumento indica que los datos JSON se codificarán usando 'utf-8'juego de caracteres en el momento del envío. A continuación, los datos con formato JSON se envían al tema denominado JSONtopic.
productor2.py:
# Importar KafkaProducer de la biblioteca de Kafkade kafka import KafkaProducer
# Importar módulo JSON para serializar datos
importar json
# Inicializar la variable de productor y establecer el parámetro para la codificación JSON
productor = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.vertederos (v).codificar ('utf-8'))
# Enviar datos en formato JSON
productor.enviar ('JSONtopic', 'nombre': 'fahmida', 'correo electrónico': '[correo electrónico protegido]')
# Imprimir mensaje
print ("Mensaje enviado a JSONtopic")
Crea una secuencia de comandos de Python llamada consumidor2.py con el siguiente script. KafkaConsumidor, sys y los módulos JSON se importan en este script. KafkaConsumidor El módulo se utiliza para leer datos con formato JSON de Kafka. El módulo JSON se utiliza para decodificar los datos JSON codificados enviados desde el productor de Kafka. Sys El módulo se usa para terminar el script. value_deserializer el argumento se usa con bootstrap_servers para definir cómo se decodificarán los datos JSON. próximo, por El bucle se utiliza para imprimir todos los registros del consumidor y los datos JSON recuperados de Kafka.
consumidor2.py:
# Importar KafkaConsumer de la biblioteca de Kafkade kafka import KafkaConsumer
# Importar módulo sys
importar sys
# Importar módulo json para serializar datos
importar json
# Inicialice la variable del consumidor y establezca la propiedad para la decodificación JSON
consumidor = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.cargas (m.decodificar ('utf-8')))
# Leer datos de kafka
para el mensaje en el consumidor:
print ("Registros del consumidor: \ n")
imprimir (mensaje)
print ("\ nLeyendo datos JSON \ n")
print ("Nombre:", mensaje [6] ['nombre'])
imprimir ("Correo electrónico:", mensaje [6] ['correo electrónico'])
# Terminar el guión
sys.Salida()
Producción:
Ejecute el siguiente comando desde una terminal para ejecutar el script de productor.
$ python3 productor2.pyEl script imprimirá el siguiente mensaje después de enviar los datos JSON.
Ejecute el siguiente comando desde otro terminal para ejecutar el script del consumidor.
$ python3 consumidor2.pyLa siguiente salida aparecerá después de ejecutar el script.
Conclusión:
Los datos se pueden enviar y recibir en diferentes formatos desde Kafka usando python. Los datos también se pueden almacenar en la base de datos y recuperar de la base de datos usando Kafka y Python. Ya en casa, este tutorial ayudará al usuario de Python a comenzar a trabajar con Kafka.