Detalles del proyecto

  • Inicio
  • / Detalles del proyecto

image

01 Oct 2025

Real-Time MariaDB - Elasticsearch Replication Pipeline

Repositorio en Github

Diseñé e implementé un sistema de replicación de datos en tiempo real capaz de manejar millones de inserciones de filas por día. El sistema utiliza MariaDB como fuente de datos, Debezium para la captura de cambios, Kafka y Kafka Connect para el streaming y el almacenamiento en búfer, y Elasticsearch como destino para búsquedas y análisis de alta velocidad. Esta arquitectura garantiza una sincronización casi en tiempo real entre las bases de datos transaccionales y analíticas.

Inicio

Los siguientes pasos permiten configurar una maquina virtual Linux Ubuntu 20.02 Server para replicar la tabla historico DATO_GPS en un entorno local, y utilizar un script para leer los archivos binarios de mariadb una vez configurados.

Verificar Binary logs Maria DB

Actualizar, instalar y configurar los siguientes paquetes.

sudo apt update
sudo apt upgrade -y
sudo apt install mariadb-server mariadb-client -y
sudo apt install vim ssh jq gdu lsof -y
sudo systemctl status mariadb
sudo systemctl enable --now mariadb
sudo mariadb-secure-installation
sudo mysql -u root -p

Maria DB local server settings

Configurar el contenedor local de Maria DB, a excepcion de dos configuraciones, son las mismas que se tienen configuradas en el servidor de recorridos Ondash Maria DB.

sudo cp /etc/mysql/mariadb.conf.d/50-server.cnf /etc/mysql/mariadb.conf.d/50-server.cnf.bk
sudo truncate -s 0 /etc/mysql/mariadb.conf.d/50-server.cnf
sudo vim /etc/mysql/mariadb.conf.d/50-server.cnf
[server]
[mysqld]
pid-file                = /run/mysqld/mysqld.pid
basedir                 = /usr

[mysqld]
user		= mysql
bind-address		= 0.0.0.0
key_buffer_size		= 16M
thread_cache_size       = 32

max_connections        = 300

log_error = /var/log/mysql/error.log
slow_query_log		= 1
slow_query_log_file	= /var/log/mysql/mysql-slow.log
long_query_time = 1 

innodb_buffer_pool_size = 512M # Para entorno local, de otra forma el uso de RAM se eleva
innodb_buffer_pool_instances = 16
innodb_flush_log_at_trx_commit = 2
innodb_flush_method = O_DIRECT
innodb_log_buffer_size = 128M # Para entorno local, de otra forma el uso de RAM se eleva
innodb_log_file_size = 1G
innodb_log_files_in_group = 2
innodb_autoinc_lock_mode = 2
innodb_read_io_threads = 16
innodb_write_io_threads = 16
innodb_io_capacity = 200
innodb_io_capacity_max = 8000
innodb_purge_threads = 4
innodb_thread_concurrency = 0
innodb_file_per_table=1
local-infile=1
event_scheduler=ON
performance_schema = ON
innodb_file_format=Barracuda
innodb_large_prefix=1
innodb_compression_level=6
tmp_table_size = 256M
max_heap_table_size = 256M
query_cache_type = 0
query_cache_size = 0
max_allowed_packet     = 1G
expire_logs_days        = 10
character-set-server  = utf8mb4
collation-server      = utf8mb4_general_ci

server_id = 1                  # mandatory
log_bin = /var/lib/mysql/mariadb-bin  # path + base name for binlogs
binlog_format = ROW             # safest for replication & Debezium
expire_logs_days = 3            # keep 3 days of binary logs
max_binlog_size = 100G          # optional, rotates after 100 GB
sync_binlog=1
innodb_flush_log_at_trx_commit=1
binlog_row_image = FULL
binlog_row_metadata = FULL

[embedded]
[mariadb]
[mariadb-10.6]

La configuracion:

server_id = 1                  # mandatory
log_bin = /var/lib/mysql/mariadb-bin  # path + base name for binlogs
binlog_format = ROW             # safest for replication & Debezium
expire_logs_days = 3            # keep 3 days of binary logs
max_binlog_size = 100G          # optional, rotates after 100 GB
sync_binlog=1
innodb_flush_log_at_trx_commit=1
binlog_row_image = FULL
binlog_row_metadata = FULL

Es necesaria para el mejor funcionamiento del stack Kafka+Debezium+Elasticsearch.

Configurar usuario para lectura de los archivos binarios tipo log

El usuario python-user y replicator son solo para pruebas. A la hora de configurar el stack principal no son necesarios.

sudo mysql -u root -p
create database historial_dato_gps;
CREATE USER 'python_user'@'127.0.0.1' IDENTIFIED BY 'MySecret123';
GRANT ALL PRIVILEGES ON historial_dato_gps.* TO 'python_user'@'127.0.0.1';
FLUSH PRIVILEGES;


CREATE USER 'replicator'@'%' IDENTIFIED BY 'replica_pass'; GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%'; FLUSH PRIVILEGES;

-- 1. Allow replicator to read table metadata
GRANT SELECT ON historial_dato_gps.* TO 'replicator'@'%';

-- 2. Keep replication privileges
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%';

FLUSH PRIVILEGES;

Mariadb database table schema

Crear la tabla DATO_GPS para las pruebas

use historial_dato_gps;
CREATE TABLE `DATO_GPS_FULL` (
  `Id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `Nombre` varchar(250) NOT NULL,
  `Latitud` varchar(100) NOT NULL,
  `Longitud` varchar(100) NOT NULL,
  `Fecha` date DEFAULT NULL,
  `Hora` varchar(10) NOT NULL,
  `Gmt` varchar(5) NOT NULL DEFAULT '0',
  `TimeStamp` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp(),
  `IdTipo_dato_gps` int(11) NOT NULL DEFAULT 1,
  `Velocidad` int(11) DEFAULT 0,
  `Direccion` int(11) DEFAULT -1,
  `Senal` int(11) DEFAULT -1,
  `Satelites` int(11) NOT NULL DEFAULT -1,
  `Inputs` varchar(250) NOT NULL DEFAULT '',
  `Odometro` double NOT NULL DEFAULT -1,
  `Bateria_gps` double NOT NULL DEFAULT -1,
  `Bateria_veh` double NOT NULL DEFAULT -1,
  `Ignicion` tinyint(1) NOT NULL DEFAULT 2,
  `IdConductor` int(11) unsigned NOT NULL DEFAULT 0,
  `Buffered` tinyint(1) NOT NULL DEFAULT 0,
  `Contador_pasajeros` int(11) NOT NULL DEFAULT -1,
  `Datos_extras` text DEFAULT NULL,
  `Estado` tinyint(1) NOT NULL DEFAULT 1,
  PRIMARY KEY (`Id`,`TimeStamp`),
  KEY `index_nombre_timestamp` (`Nombre`,`TimeStamp`)
) ENGINE=InnoDB AUTO_INCREMENT=153365573382 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8;

Required ssh config file inside the container main user

  • Para poder obtener los datos de recorridos ondash, se recomienda configurar el archivo /home/$USER/.ssh/config para facilitar el uso del comando que se va a encargar de abrir el tunnel SSH a el puerto 3306 de Maria DB.
mkdir -p ~/.ssh
touch ~/.ssh/redgps.pem
touch ~/.ssh/config
chmod 700 ~/.ssh
chmod 600 ~/.ssh/redgps.pem # Agregar llave SSH personal ondash recorridos process
chmod 600 ~/.ssh/config
# config file
# Recorridos ondash
Host redgps-ondash-recorridos-process
  HostName 144.126.216.142
  User spalestina # usuario de la llave ssh
  Port 6611
  IdentityFile ~/.ssh/redgps.pem
  HostKeyAlgorithms +ssh-rsa
  PubkeyAcceptedAlgorithms +ssh-rsa
  ServerAliveInterval 120 # mantiene la conexion abierta, previene cierres
  • Realizar conexion SSH, se abre un tunnel a el puerto 3306 en el servidor (Maria DB recorridos Ondash) y el puerto 3310 en la maquina virtual. Gracias a este tunnel, el Script que se va a encargar de conectarse a recorridos ondash puede hacerlo sin necesidad de configurar el firewall en DigitalOcean.
ssh -f -N -L 3310:localhost:3306 spalestina@redgps-ondash-recorridos-process
  • Una vez abierto el tunnel en el background, puedes hacer uso de mysql-client (instalado anteriormente) para poder conectarse a Maria DB de recorridos ondash.
mysql -u merlaboro_gps -p --host=127.0.0.1 -P 3310 --password=GXO4hT34xCv90xY
  • Validar que a nivel local el usuario python_user tiene acceso.
mysql -u python_user -p --host=127.0.0.1 -P 3306 --password=MySecret123

Python setup and dependencies

  • Instalar dependencias a nivel apt
sudo apt install python3 python3-virtualenv python3-pip -y
  • Crear entorno virtual en python para poder tener un entorno de desarrollo, necesario para los scripts de pruebas y de replicacion.
mkdir -p ~/dev/python-replica-ondash
cd ~/dev/python-replica-ondash
virtualenv venv -p python3
source venv/bin/activate
pip install mysql-connector-python

Script de replicacion con Python

  • El script se encarga de obtener el last id mas reciente en recorridos ondash para realizar consultas e insertar los reportes en bulk en tu base de datos Mariadb local.

replicar_dato_gps.py

import mysql.connector
import json
import os
from datetime import datetime

# Configuration
REMOTE_DB = {
    'host': '127.0.0.1',
    'port': 3310,
    'user': 'merlaboro_gps',
    'password': 'GXO4hT34xCv90xY',
    'database': 'historial_dato_gps'
}

LOCAL_DB = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'python_user',
    'password': 'MySecret123',
    'database': 'historial_dato_gps'
}

JSON_FILE = 'last_id.json'
BATCH_SIZE = 1000

def read_last_id():
    if os.path.exists(JSON_FILE):
        with open(JSON_FILE, 'r') as f:
            data = json.load(f)
            return data.get('last_id')
    return None

def save_last_id(last_id):
    with open(JSON_FILE, 'w') as f:
        json.dump({'last_id': last_id, 'updated_at': str(datetime.now())}, f, indent=2)

remote_conn = mysql.connector.connect(**REMOTE_DB)
remote_cursor = remote_conn.cursor(dictionary=True)

local_conn = mysql.connector.connect(**LOCAL_DB)
local_cursor = local_conn.cursor()

last_id = read_last_id()
if last_id is None:
    remote_cursor.execute(f"SELECT Id FROM DATO_GPS_FULL ORDER BY Id ASC LIMIT {BATCH_SIZE}")
    first_rows = remote_cursor.fetchall()
    if not first_rows:
        print("Remote table is empty.")
        remote_cursor.close()
        remote_conn.close()
        local_cursor.close()
        local_conn.close()
        exit()
    last_id = first_rows[0]['Id'] - 1

print(f"Starting replication from Id > {last_id}")

while True:
    # Fetch batch from remote
    remote_cursor.execute("""
        SELECT *
        FROM DATO_GPS_FULL
        WHERE Id > %s
        ORDER BY Id ASC
        LIMIT %s
    """, (last_id, BATCH_SIZE))

    rows = remote_cursor.fetchall()
    if not rows:
        print("No more rows to replicate.")
        break

    for row in rows:
        columns = ", ".join(row.keys())
        placeholders = ", ".join(["%s"] * len(row))
        values = list(row.values())

        insert_query = f"""
            INSERT INTO DATO_GPS_FULL ({columns})
            VALUES ({placeholders})
            ON DUPLICATE KEY UPDATE
        """ + ", ".join([f"{col}=VALUES({col})" for col in row.keys() if col != 'Id'])

        local_cursor.execute(insert_query, values)
        last_id = row['Id']

    local_conn.commit()
    save_last_id(last_id)
    print(f"Processed batch up to Id {last_id}")

remote_cursor.close()
remote_conn.close()
local_cursor.close()
local_conn.close()
print("Replication finished.")

Python Binary manual replication script

El siguiente script solo funciona para demostrar que el servidor Maria DB tiene bien configurados los binary logs necesarios para el stack Kafka+Debezium+Elasticsearch.

read_binary_test.py

# Paquete necesario
pip install mysql-replication
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent

stream = BinLogStreamReader(
    connection_settings={
        "host": "127.0.0.1",
        "port": 3306,
        "user": "replicator",
        "passwd": "replica_pass"
    },
    server_id=101,
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    blocking=True,
    only_schemas=["historial_dato_gps"],
    only_tables=["DATO_GPS_FULL"]
)

for binlogevent in stream:
    for row in binlogevent.rows:
        print(row)

stream.close()

Todos los pasos anteriormente proporcionados pueden funcionar en un docker estilo:

docker run -itd --name mi_contenedor_ubuntu ubuntu:latest

Pero se recomienda utilizar Oracle Virtualbox, o QEMU KVM para crear la maquina virtual. De igual forma es posible realizarlo en un Desktop o Servidor Linux con Ubuntu o Debian.

El Stack

Configuración de MariaDB con registros binarios (Binary Logs)

  1. MariaDB con Binary Logs habilitados
  • binlog_format = ROW Activa el registro de cambios a nivel de fila. Esto significa que cada vez que una fila se inserta, actualiza o elimina, se registra su valor exacto antes y después del cambio. Este formato es obligatorio para sistemas de Change Data Capture (CDC) como Debezium, porque permite reconstruir los eventos con precisión.

  • Usuario con privilegio REPLICATION SLAVE Este permiso permite a Debezium (u otro consumidor) conectarse al servidor MariaDB y “leer” los binlogs, como si fuera un servidor de réplica. Así, Debezium puede extraer en tiempo real todos los cambios que ocurren en la base de datos.

🔹 Objetivo: MariaDB actúa como la fuente de datos. Su tarea es registrar todos los cambios que ocurren en las tablas (inserciones, actualizaciones, eliminaciones) para que otros sistemas puedan capturarlos y procesarlos en tiempo real.

Kafka + Zookeeper (o KRaft)

  1. Kafka y Zookeeper (o KRaft si usas Kafka reciente)
  • Apache Kafka es un sistema de mensajería distribuido que permite enviar, almacenar y procesar flujos de datos en tiempo real.

  • Zookeeper (usado en versiones más antiguas de Kafka) coordina los nodos del clúster Kafka. En versiones nuevas (Kafka 3.x+), KRaft reemplaza a Zookeeper, integrando la coordinación directamente dentro de Kafka.

🔹 Objetivo: Kafka es el canal intermedio: recibe los eventos de cambios de Debezium y los distribuye a otros sistemas interesados (por ejemplo, Elasticsearch, sistemas de análisis o microservicios).

Kafka Connect + Debezium MySQL/MariaDB Connector

  1. Kafka Connect con el conector Debezium para MySQL/MariaDB
  • Kafka Connect es una herramienta para integrar sistemas externos con Kafka de forma modular, usando conectores.

  • Debezium es un conector especializado que lee los binlogs de MariaDB/MySQL y publica los eventos de cambio en Kafka.

  • Cada cambio (INSERT, UPDATE, DELETE) se convierte en un mensaje estructurado (JSON o Avro) y se envía a un topic en Kafka.

🔹 Objetivo: Debezium convierte los cambios de la base de datos en eventos de datos en tiempo real, permitiendo que cualquier otro sistema pueda reaccionar a ellos inmediatamente.

Kafka Connect Elasticsearch Sink Connector

  1. Conector Kafka → Elasticsearch
  • Este conector lee los mensajes (eventos de cambio) desde los topics de Kafka.

  • Luego escribe esos eventos en un índice de Elasticsearch, transformando los datos según sea necesario.

  • Se puede usar el conector oficial de Confluent o el plugin de Elasticsearch para Kafka Connect.

🔹 Objetivo: Sirve como puente de salida para enviar los datos procesados por Kafka hacia Elasticsearch, manteniendo los índices actualizados en tiempo real.

Elasticsearch

  1. Elasticsearch
  • Es un motor de búsqueda y análisis de datos altamente eficiente y escalable.

  • Al recibir los datos desde Kafka, los indexa para permitir búsquedas rápidas, agregaciones y visualizaciones.

  • Puede integrarse fácilmente con herramientas como Kibana para monitorear y explorar los datos.

🔹 Objetivo: Elasticsearch actúa como el destino final del flujo de datos, almacenando la información de MariaDB de forma optimizada para búsquedas y análisis.

| Componente | Rol | Descripción resumida | | ---------------------------- | --------------------- | ------------------------------------------------------------------ | | MariaDB | Fuente | Contiene los datos originales y genera binlogs con los cambios. | | Debezium (Kafka Connect) | Capturador de cambios | Lee los binlogs y convierte los cambios en eventos estructurados. | | Kafka | Intermediario | Transporta los eventos de datos en tiempo real a los consumidores. | | Elasticsearch Connector | Sink (destino) | Toma los eventos de Kafka y los envía a Elasticsearch. | | Elasticsearch | Destino y analizador | Indexa y permite consultar los datos con baja latencia. |

TLDR

Este stack implementa un sistema de replicación y análisis en tiempo real entre MariaDB y Elasticsearch:

Cada vez que algo cambia en MariaDB → Debezium lo detecta al instante → lo envía a Kafka → Kafka lo entrega a Elasticsearch → Elasticsearch refleja el cambio casi en tiempo real.

Esto es ideal para:

  • Dashboards en vivo,

  • Monitoreo de sistemas,

  • Sincronización entre bases de datos,

  • Auditorías de datos,

  • Procesos de event-driven architecture (arquitectura basada en eventos).

  • Totalizadores

Instalar y configurar El Stack

Paso 1: Instalar y configurar MariaDB con logs binarios

1.1 Instalar Maria DB

sudo apt update
sudo apt upgrade
sudo apt install mariadb-server -y
sudo apt install zip unzip -y
sudo systemctl enable mariadb
sudo systemctl start mariadb
sudo mysql_secure_installation

1.2 Habilitar Logs Binarios con formato ROW

sudo vim /etc/mysql/mariadb.conf.d/50-server.cnf
[server]
[mysqld]
pid-file                = /run/mysqld/mysqld.pid
basedir                 = /usr

[mysqld]
user		= mysql
bind-address		= 0.0.0.0
key_buffer_size		= 16M
thread_cache_size       = 32

max_connections        = 300

log_error = /var/log/mysql/error.log
slow_query_log		= 1
slow_query_log_file	= /var/log/mysql/mysql-slow.log
long_query_time = 1 

innodb_buffer_pool_size = 512M # Para entorno local, de otra forma el uso de RAM se eleva
innodb_buffer_pool_instances = 16
innodb_flush_log_at_trx_commit = 2
innodb_flush_method = O_DIRECT
innodb_log_buffer_size = 128M # Para entorno local, de otra forma el uso de RAM se eleva
innodb_log_file_size = 1G
innodb_log_files_in_group = 2
innodb_autoinc_lock_mode = 2
innodb_read_io_threads = 16
innodb_write_io_threads = 16
innodb_io_capacity = 200
innodb_io_capacity_max = 8000
innodb_purge_threads = 4
innodb_thread_concurrency = 0
innodb_file_per_table=1
local-infile=1
event_scheduler=ON
performance_schema = ON
innodb_file_format=Barracuda
innodb_large_prefix=1
innodb_compression_level=6
tmp_table_size = 256M
max_heap_table_size = 256M
query_cache_type = 0
query_cache_size = 0
max_allowed_packet     = 1G
expire_logs_days        = 10
character-set-server  = utf8mb4
collation-server      = utf8mb4_general_ci

server_id = 1                  # mandatory
log_bin = /var/lib/mysql/mariadb-bin  # path + base name for binlogs
binlog_format = ROW             # safest for replication & Debezium
expire_logs_days = 3            # keep 3 days of binary logs
max_binlog_size = 100G          # optional, rotates after 100 GB
sync_binlog=1
innodb_flush_log_at_trx_commit=1
binlog_row_image = FULL
binlog_row_metadata = FULL

[embedded]
[mariadb]
[mariadb-10.6]

Crear el directorio si no existe, y verificar permisos:

sudo mkdir -p /var/log/mysql
sudo chown mysql:mysql /var/log/mysql

Reiniciar MariaDB:

sudo systemctl restart mariadb
sudo systemctl status mariadb

1.3 Verificar que los logs binarios esten activos

Acceder:

sudo mysql -u root -p

Validar:

SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

Valores esperados ON y ROW

1.4 Crear usuario debezium

CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz-pass';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

A estas alturas, Maria DB esta listo para funcionar con logs binarios

Parte 2: Instalar y configurar Kafka (modo KRaft)

Kafka depende de Java 17+, instalar:

2.1 Instalar Open JDK Java 17

sudo apt install openjdk-17-jdk -y
java -version
javac -version

# alternativa: sudo apt install default-jdk
# java --version
# javac --version

2.2 Descargar Kafka

Ir a sitio web Apache Kafka -> Sitio Web Seleccionar el mas reciente (4.1.0 a fecha de escribir este archivo). Ejemplo:

mkdir ~/kafka
cd ~/kafka
wget https://dlcdn.apache.org/kafka/4.1.0/kafka_2.13-4.1.0.tgz
tar -xvzf kafka_2.13-4.1.0.tgz
sudo mv kafka_2.13-4.1.0 /opt/kafka

2.3 Configurar Kafka para el modo KRaft

Crear el identificador UUID Unico del sistema (este id debe guardarse y usarse siempre).

/opt/kafka/bin/kafka-storage.sh random-uuid

# output sample: BuNJMMTISGGD7vClL5fCyQ

Copiar dicho UUID, despues es necesario realizar el formato del almacenamiento (remplazar <UUID>):

sudo /opt/kafka/bin/kafka-storage.sh format -t <UUID> -c /opt/kafka/config/controller.properties

Respaldar archivo de configuracion por defecto del servidor Kafka

sudo cp /opt/kafka/config/controller.properties /opt/kafka/config/controller.properties.bk

Editar archivo de configuracion para el servidor Kafka:

sudo vim /opt/kafka/config/controller.properties

Configuracion minima (debe ser tal cual se muestra, buscar y remplazar o agregar):

process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093

listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://127.0.0.1:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
inter.broker.listener.name=PLAINTEXT

log.dirs=/tmp/kraft-combined-logs
num.network.threads=3
num.io.threads=8

Una vez editado el archivo con la configuracion minima, reiniciar el directorio de los metadatos de KRaft

sudo /opt/kafka/bin/kafka-storage.sh format -t <UUID> -c /opt/kafka/config/controller.properties

2.4 Iniciar servidor Kafka

# daemon background
sudo /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/controller.properties
# persistent mode, can see logging
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/controller.properties

Revisar si se ejecuto el modo background:

ps aux | grep kafka

2.5 Probar funcionamiento de Kafka

  • Crear un topic:
sudo /opt/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092
  • Lista de topics:
sudo /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • Producir mensajes:
sudo /opt/kafka/bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092

(escribir algunos mensajes en el prompt, cerrar con CTRL+C)

Consumir mensajes:

sudo /opt/kafka/bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092

Si ves mensajes , Kafka se encuentra funcionando de la manera correcta.

Parte 3: Instalar Kafka Connect

Kafka Connect esta incluido por defecto en la instalacion de Kafka previamente realizada. Solo es neceario configurar e instalar los plugins requeridos.

3.1 Crear directorio para los plugins de Kafka Connect

sudo mkdir -p /opt/kafka/plugins
sudo chown -R $USER:$USER /opt/kafka/plugins

3.2 Instalar Kafka Connect Plugin Debezium MySQL Connector (compatible con Maria DB)

Descargar la version mas reciente de Debezium MySQL Connector Link

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mariadb/3.2.2.Final/debezium-connector-mariadb-3.2.2.Final-plugin.tar.gz
tar -xvzf debezium-connector-mariadb-3.2.2.Final-plugin.tar.gz -C /opt/kafka/plugins

3.3 Instalar Kafka Connect Plugin Elasticsearch Connector

ElasticSearch Sink Connector: Source

wget https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/15.0.1/confluentinc-kafka-connect-elasticsearch-15.0.1.zip
unzip confluentinc-kafka-connect-elasticsearch-15.0.1.zip -d /opt/kafka/plugins

Ahora /opt/kafka/plugins contiene los plugins Debezium MySQL y Elasticsearch

3.4 Configurar Plugin Kafka Connect Worker

Copiar el archivo de configuracion por defecto en la distribucion:

cp /opt/kafka/config/connect-distributed.properties /opt/kafka/config/connect.properties

Editar /opt/kafka/config/connect.properties:

sudo vim /opt/kafka/config/connect.properties
bootstrap.servers=localhost:9092

group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

plugin.path=/opt/kafka/plugins

3.5 Iniciar Kafka Connect

sudo /opt/kafka/bin/connect-distributed.sh -daemon /opt/kafka/config/connect.properties

Probar que se encuentra funcionando correctamente:

curl -s http://localhost:8083/connectors

Resultado esperado es: [] (arreglo vacio, todavia no hay conecciones).

Kafka Connect esta ejecutandose y esta listo para configurar conecciones.

Parte 4: Instalar Elasticsearch

Configurar un solo nodo (suficiente para un entorno de desarrollo y pruebas).

4.1 Instalar dependencias

sudo apt install apt-transport-https curl gnupg -y

4.2 Instalar Elasticsearch mediante llave GPG

sudo apt install openjdk-17-jdk -y
java -version
javac -version

sudo wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo tee /usr/share/keyrings/elasticsearch-keyring.asc
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.asc] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list

sudo apt update
sudo apt install elasticsearch -y

4.3 Configurar Elasticsearch

Respaldar configuracion por defecto:

sudo cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bk

Editar configuracion:

sudo vim /etc/elasticsearch/elasticsearch.yml

Configuracion minima desarrollo:

cluster.name: es-cluster
node.name: es-node-1

path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch

network.host: 127.0.0.1
http.port: 9200

discovery.type: single-node

# Disable all security
xpack.security.enabled: false
xpack.security.http.ssl.enabled: false
xpack.security.transport.ssl.enabled: false

# Comment https

# Enable encryption for HTTP API client connections, such as Kibana, Logstash, and Agents
# xpack.security.http.ssl:
#   enabled: true
#   keystore.path: certs/http.p12

# Enable encryption and mutual authentication between cluster nodes
# xpack.security.transport.ssl:
#   enabled: true
#   verification_mode: certificate
#   keystore.path: certs/transport.p12
#   truststore.path: certs/transport.p12




Importante, en el archivo /etc/elasticsearch/elasticsearch.yml buscar una linea parecida a:

cluster.initial_master_nodes: ["spc-server-tests"]

Comentar la linea o el siguiente error va a mostrarse:

[2025-10-17T15:40:23,681][ERROR][o.e.b.Elasticsearch      ] [es-node-1] fatal exception while booting Elasticsearch
java.lang.IllegalArgumentException: setting [cluster.initial_master_nodes] is not allowed when [discovery.type] is set to [single-node]
	at org.elasticsearch.cluster.coordination.ClusterBootstrapService.<init>(ClusterBootstrapService.java:88) ~[elasticsearch-8.19.5.jar:?]
	at org.elasticsearch.cluster.coordination.Coordinator.<init>(Coordinator.java:299) ~[elasticsearch-8.19.5.jar:?]
	at org.elasticsearch.discovery.DiscoveryModule.<init>(DiscoveryModule.java:199) ~[elasticsearch-8.19.5.jar:?]
	at org.elasticsearch.node.NodeConstruction.createDiscoveryModule(NodeConstruction.java:1635) ~[elasticsearch-8.19.5.jar:?]
	at org.elasticsearch.node.NodeConstruction.construct(NodeConstruction.java:1100) ~[elasticsearch-8.19.5.jar:?]
	at org.elasticsearch.node.NodeConstruction.prepareConstruction(NodeConstruction.java:292) ~[elasticsearch-8.19.5.jar:?]
	at org.elasticsearch.node.Node.<init>(Node.java:201) ~[elasticsearch-8.19.5.jar:?]
	at org.elasticsearch.bootstrap.Elasticsearch$1.<init>(Elasticsearch.java:402) ~[elasticsearch-8.19.5.jar:?]
	at org.elasticsearch.bootstrap.Elasticsearch.initPhase3(Elasticsearch.java:402) ~[elasticsearch-8.19.5.jar:?]
	at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:99) ~[elasticsearch-8.19.5.jar:?]

Validar logs

tail -n 200 -f /var/log/elasticsearch/es-cluster.log

Start & enable service:

sudo systemctl enable elasticsearch
sudo systemctl start elasticsearch

Check status:

curl -s http://localhost:9200

Elasticsearch should reply with cluster info.

4.4 Restringir uso de memoria RAM para el servicio elasticsearch

Respaldar el archivo jvm.options

sudo cp /etc/elasticsearch/jvm.options /etc/elasticsearch/jvm.options.bk

Establecer maximo uso de RAM a 2GB

sudo vim /etc/elasticsearch/jvm.options

Buscar seccion dentro del archivo jvm.options

## IMPORTANT: JVM heap size
-Xms2g
-Xms2g

Parte 5: Configurar Debezium Source Connector (MariaDB -> Kafka)

Recordatorio de terminar de escribir este blog. Lo tengo pero lo escribi todo en ingles primero, tengo que traducirlo...

Proyectos similares

image
04 Nov 2025

REST API con .NET para manejo de subscripciones con Stripe

A full-stack platform for service scheduling

image
01 Oct 2025

Real-Time MariaDB - Elasticsearch Replication Pipeline

Diseñé e implementé un sistema de replicación de datos en tiempo real capaz de manejar millones de inserciones de filas por día.

image
10 Apr 2024

Captura de pantalla en video con Python

En este proyecto desarrollé un grabador de pantalla en Python capaz de capturar en video la actividad del escritorio en Windows.