Skip to content
Snippets Groups Projects
Commit 989565bb authored by Florian Gransee's avatar Florian Gransee Committed by Luca Johannes Nendel
Browse files

Resolve "replace Kafka with MQTT"

parent c33a3555
No related branches found
No related tags found
1 merge request!9Resolve "replace Kafka with MQTT"
......@@ -22,7 +22,7 @@ that repo:
cp .env.example .env
```
The settings from the example are ok for local testing and development.
Postgres, Minio and Kafka services are exposed on localhost, so you can
Postgres, Minio and MQTT services are exposed on localhost, so you can
access them with clients from your machine.
When using this in (semi-) production (i.g. on a server) some settings,
......@@ -38,8 +38,7 @@ issued by
docker-compose up -d
```
It will take some seconds until everything is up. Especially the kafka
service is very costly and will engage your CPU and CPU fan.
It will take some seconds until everything is up.
## 3. Create a thing
......@@ -48,17 +47,14 @@ series data in one or more data streams. In ZID/TSM we follow the
approach, that an end user is able to create a new *thing* and all its
settings for its infrastructure like database credentials or parser
properties. When somebody enters or changes settings of a *thing* these
changes are populated to *action services* by kafka events.
changes are populated to *action services* by MQTT events.
As long as ZID/TSM doesn't have a graphical end user frontend we have to
produce events by ourselves. We directly use the kafka container for
produce events by ourselves. We directly use the MQTT container for
that:
```bash
cat thing-event-msg.json | tr -d '\n' | docker-compose exec -T kafka kafka-console-producer.sh --broker-list kafka:9092 --topic thing_created
# Be aware of the `tr` step - `kafka-console-producer` is processing all
# input line by line and will break multiline (JSON) strings otherwise.
cat thing-event-msg.json | docker-compose exec -T mqtt-broker sh -c "mosquitto_pub -t thing_created -u \$MQTT_USER -P \$MQTT_PASSWORD -s"
```
The dispatcher action services will create
......@@ -77,7 +73,7 @@ Now you can go to the fresh new bucket in the
and upload a `csv` file.
The dispatcher action service called *run-process-new-file-service* gets
notified by a kafka event produced by minio and will forward the file
notified by a MQTT event produced by minio and will forward the file
resource and the necessary settings to the scheduler. The scheduler
starts the extractor wo will parse the data and write it to the things
database.
......@@ -95,14 +91,6 @@ docker-compose down --timeout 0 -v --remove-orphans && ./remove-all-data.sh
All data is lost with this. Be careful!
# Doing it with Mosquitto MQTT instead of Apache Kafka
**Step 3 is:**
```bash
cat thing-event-msg.json | docker-compose exec -T mqtt-broker sh -c "mosquitto_pub -t thing_created -u \$MQTT_USER -P \$MQTT_PASSWORD -s"
```
# Further thoughts and hints
## Configuring and operating Mosquitto MQTT broker
......@@ -162,14 +150,6 @@ For dynamic acls from database: https://gist.github.com/TheAshwanik/7ed2a3032ca1
mc admin info myminio/ --json | jq .info.sqsARN
```
## Kafka
Debugging Kafka events:
```bash
docker-compose logs --follow kafkacat
```
## Naming conventions
Human readable ID for projects and things: Use UUID as suffix and
......
......@@ -6,9 +6,9 @@ services:
ports:
- "${POSTGRES_PORT:-127.0.0.1:5432}:5432"
environment:
- POSTGRES_USER=${POSTGRES_USER:?Please define an postgres user!}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:?Please define an postgres password!}
- PGDATA=/var/lib/postgresql/data/pgdata
POSTGRES_USER: ${POSTGRES_USER:?Please define an postgres user!}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?Please define an postgres password!}
PGDATA: /var/lib/postgresql/data/pgdata
volumes:
- ./data/postgres/data:/var/lib/postgresql/data
- ./data/postgres/postgres-ddl.sql:/docker-entrypoint-initdb.d/postgres-ddl.sql
......@@ -23,30 +23,24 @@ services:
image: minio/minio
restart: ${RESTART:-on-failure}
depends_on:
kafka:
condition: service_healthy
mqtt-broker:
condition: service_healthy
- mqtt-broker
ports:
- "${MINIO_API_PORT:-127.0.0.1:9000}:9000"
- "${MINIO_CONSOLE_PORT:-127.0.0.1:9001}:9001"
environment:
MINIO_ROOT_USER: ${MINIO_ROOT_USER:?Please define an minio root user!}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:?Please define an minio root user password!}
MINIO_NOTIFY_KAFKA_ENABLE: "on"
MINIO_NOTIFY_KAFKA_BROKERS: "kafka:9092"
MINIO_NOTIFY_KAFKA_TOPIC: "minio-bucket-notifications"
MINIO_NOTIFY_MQTT_ENABLE_LOCAL_BROKER: "on"
MINIO_NOTIFY_MQTT_BROKER_LOCAL_BROKER: "tcp://mqtt-broker:1883"
MINIO_NOTIFY_MQTT_TOPIC_LOCAL_BROKER: "minio-bucket-notifications"
MINIO_NOTIFY_MQTT_USERNAME_LOCAL_BROKER: "${MQTT_USER:?Please define a mqtt user!}"
MINIO_NOTIFY_MQTT_PASSWORD_LOCAL_BROKER: "${MQTT_PASSWORD:?Please define a mqtt password!}"
MINIO_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL_LOCAL_BROKER: "60s"
# MINIO_NOTIFY_MQTT_QOS_LOCAL_BROKER: "<string>"
# MINIO_NOTIFY_MQTT_QOS_LOCAL_BROKER: "<string>"
MINIO_NOTIFY_MQTT_RECONNECT_INTERVAL_LOCAL_BROKER: "60s"
# MINIO_NOTIFY_MQTT_QUEUE_DIR_LOCAL_BROKER: "<string>"
# MINIO_NOTIFY_MQTT_QUEUE_LIMIT_LOCAL_BROKER: "<string>"
# MINIO_NOTIFY_MQTT_COMMENT_LOCAL_BROKER: "<string>"
# MINIO_NOTIFY_MQTT_QUEUE_DIR_LOCAL_BROKER: "<string>"
# MINIO_NOTIFY_MQTT_QUEUE_LIMIT_LOCAL_BROKER: "<string>"
# MINIO_NOTIFY_MQTT_COMMENT_LOCAL_BROKER: "<string>"
MINIO_SERVER_URL: "${MINIO_SERVER_URL:-}"
volumes:
- ./data/minio/vol0:/vol0
......@@ -67,61 +61,23 @@ services:
timeout: 5s
retries: 15
zookeeper:
image: docker.io/bitnami/zookeeper:3.7
restart: ${RESTART:-on-failure}
volumes:
- ./data/zookeeper:/bitnami/zookeeper
user: "${UID}:0"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
healthcheck:
test: [ "CMD", "zkServer.sh", "status" ]
interval: 2s
timeout: 10s
retries: 15
kafka:
image: docker.io/bitnami/kafka:3
hostname: kafka
restart: on-failure
ports:
- "${KAFKA_PORT:-127.0.0.1:9092}:9092"
volumes:
- ./data/kafka:/bitnami/kafka
user: "${UID}:0"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_ADVERTISED_PORT=9092
depends_on:
zookeeper:
condition: service_healthy
healthcheck:
test: ["CMD", "kafka-topics.sh", "--bootstrap-server", "127.0.0.1:9092", "--list"]
interval: 2s
timeout: 10s
retries: 15
dispatcher-producer:
image: git.ufz.de:4567/rdm-software/timeseries-management/tsm-dispatcher/dispatcher:latest
command:
- --version
dispatcher-minio-new-thing-runner:
image: git.ufz.de:4567/rdm-software/timeseries-management/tsm-dispatcher/dispatcher:latest
restart: ${RESTART:-on-failure}
depends_on:
kafka:
condition: service_healthy
mqtt-broker:
condition: service_started
minio:
condition: service_healthy
command:
- --topic
- thing_created
- --kafka-server
- kafka:9092
- --mqtt-broker
- mqtt-broker:1883
- --mqtt-user
- ${MQTT_USER:?Please define a mqtt user!}
- --mqtt-password
- ${MQTT_PASSWORD:?Please define a mqtt password!}
- run-create-thing-on-minio-action-service
- --minio_secure
- "${MINIO_SECURE:-False}"
......@@ -133,15 +89,17 @@ services:
image: git.ufz.de:4567/rdm-software/timeseries-management/tsm-dispatcher/dispatcher:latest
restart: ${RESTART:-on-failure}
depends_on:
kafka:
condition: service_healthy
minio:
condition: service_healthy
- mqtt-broker
- postgres
command:
- --topic
- thing_created
- --kafka-server
- kafka:9092
- --mqtt-broker
- mqtt-broker:1883
- --mqtt-user
- ${MQTT_USER:?Please define a mqtt user!}
- --mqtt-password
- ${MQTT_PASSWORD:?Please define a mqtt password!}
- run-create-database-schema-action-service
- postgresql://${CREATEDB_POSTGRES_USER:?Please define an postgres user!}:${CREATEDB_POSTGRES_PASSWORD:?Please define an postgres password!}@${CREATEDB_POSTGRES_HOST:-postgres}/${CREATEDB_POSTGRES_DATABASE:-postgres}
......@@ -149,15 +107,19 @@ services:
image: git.ufz.de:4567/rdm-software/timeseries-management/tsm-dispatcher/dispatcher:latest
restart: on-failure
depends_on:
kafka:
condition: service_healthy
mqtt-broker:
condition: service_started
minio:
condition: service_healthy
command:
- --topic
- minio-bucket-notifications
- --kafka-server
- kafka:9092
- --mqtt-broker
- mqtt-broker:1883
- --mqtt-user
- ${MQTT_USER:?Please define a mqtt user!}
- --mqtt-password
- ${MQTT_PASSWORD:?Please define a mqtt password!}
- run-process-new-file-service
- --minio_secure
- "${MINIO_SECURE:-False}"
......@@ -166,26 +128,6 @@ services:
- ${MINIO_ROOT_PASSWORD:?Please define an minio root user password!}
- http://extractor:5000/extractor/run
# for debugging kafka events
kafkacat:
restart: ${RESTART:-on-failure}
image: confluentinc/cp-kafkacat
command:
- kafkacat
- -b
- kafka:9092
- -C
- -u
- -G
- 5867ab54
- thing_created
- minio-bucket-notifications
depends_on:
kafka:
condition: service_healthy
minio:
condition: service_healthy
extractor:
image: git.ufz.de:4567/rdm-software/timeseries-management/tsm-basic-demo-scheduler/basic_demo_scheduler:latest
restart: ${RESTART:-on-failure}
......@@ -232,6 +174,7 @@ services:
- -t
- thedoors-057d8bba-40b3-11ec-a337-125e5a40a849/#
grafana:
restart: ${RESTART:-on-failure}
image: grafana/grafana:latest
......@@ -254,4 +197,3 @@ services:
- ${GRAFANA_TLS_KEY_PATH:-/tmp/c8cf2d92-73cd-11ec-b035-54e1ad7c5c19}:/etc/ssl/private.key:ro
ports:
- "${GRAFANA_PORT:-127.0.0.1:3000}:3000"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment