Kafka-Zookeeper在Rainbond部署使用以及应用制作分享

1. 简介

什么是Kafka?

Apache Kafka是一个分布式流媒体平台,用于构建实时数据管道和流媒体应用。它是水平可扩展的、故障导通的。

什么是Zookeeper?

https://zookeeper.apache.org

ZooKeeper是用于维护配置信息、命名、提供分布式同步和提供组服务的集中式服务。

本次文章中 Kafka Zookeeper 均是基于bitnami制作的镜像进行修改,并部署在Rainbond中。

2.快速安装

Kafka-Zookeeper已发布在开源应用商店中,可从开源应用商店一键部署。

3.如何伸缩Kafka

通过快速复制功能,复制出另一个Kafka组件,依赖于Zookeeper

4.如何伸缩Zookeeper

首先关闭Zookeeper组件,进入组件中 > 伸缩 > 实例数量,手动进行伸缩实例,修改完启动组件即可。

缩小实例也是如此,减少实例数量。

5. 如何使用

5.1 外部访问

  1. 进入 Zookeeper 组件中,打开Zookeeper 2181的对外端口(默认已打开,进入组件内查看端口即可)。

  2. 进入 每个 Kafka 组件中,修改环境变量,IP+PORT从组件的对外端口中获取。

    KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://xxx:xxx
    
  3. 打开 Kafka 客户端工具,连接 Zookeeper Kafka 的对外端口地址即可访问。

6.应用制作过程分享

基于 bitnami-docker- zookeeper bitnami-docker-kafka 进行的修改

6.1 Zookeeper的改动

Fork bitnami 的仓库进行修改,新增了rainbond-env.sh 文件,修改内容如下:

#!/bin/bash
set -o errexit
set -o nounset
set -o pipefail
# set -o xtrace # Uncomment this line for debugging purposes

## This script works with Rainbond

# define zookeeper id
export ZOO_SERVER_ID=`expr ${HOSTNAME#*-} + 1`

# define zookeeper server list

ZOO_SERVERS_LIST=()

for (( i = 0;i < $SERVICE_POD_NUM;i++)); do
    if [ $SERVICE_POD_NUM != 1 ]; then
        ZOO_SERVERS_LIST[$i]="$SERVICE_NAME-$i.$SERVICE_NAME:2888:3888"
    fi
done

if [ $SERVICE_POD_NUM != 1 ]; then
    echo "zookeeper cluster nodes are ${ZOO_SERVERS_LIST[@]}"
    export ZOO_SERVERS=$(echo ${ZOO_SERVERS_LIST[@]} | tr ' ' ',')
fi

# set default_java_mem_opts
case ${MEMORY_SIZE} in
    "micro")
       export ZOO_HEAP_SIZE=128
       echo "Optimizing java process for 128M Memory...." >&2
       ;;
    "small")
       export ZOO_HEAP_SIZE=256
       echo "Optimizing java process for 256M Memory...." >&2
       ;;
    "medium")
       export ZOO_HEAP_SIZE=512
       echo "Optimizing java process for 512M Memory...." >&2
       ;;
    "large")
       export ZOO_HEAP_SIZE=1024
       echo "Optimizing java process for 1G Memory...." >&2
       ;;
    "2xlarge")
       export ZOO_HEAP_SIZE=2048
       echo "Optimizing java process for 2G Memory...." >&2
       ;;
    "4xlarge")
       export ZOO_HEAP_SIZE=4096
       echo "Optimizing java process for 4G Memory...." >&2
       ;;
    "8xlarge")
       export ZOO_HEAP_SIZE=8192
       echo "Optimizing java process for 8G Memory...." >&2
       ;;
    16xlarge|32xlarge|64xlarge)
       export ZOO_HEAP_SIZE=16384
       echo "Optimizing java process for biger Memory...." >&2
       ;;
esac

以上 rainbond-env.sh 脚本文件包含了:

  1. 根据系统环境变量 HOSTNAME 自动获取 ZOO_SERVER_ID 的值,并且动态伸缩实例时自增。
  2. 根据系统环境变量 SERVICE_POD_NUM 自动获取当前有几个POD,通过 SERVICE_NAME 获取当前POD的访问地址,并写入到变量 ZOO_SERVERS 中,达到动态伸缩实例,自动组成集群。
  3. 根据 动态配置JVM内存 动态调整Zookeeper的JVM内存。

修改完以上内容,可在Rainbond平台上进行源码构建,并可达到动态伸缩实例的效果。

6.2 Kafka的改动

Fork bitnami 的仓库进行修改,新增了rainbond-env.sh 文件,修改内容如下:

#!/bin/bash
set -o errexit
set -o nounset
set -o pipefail
# set -o xtrace # Uncomment this line for debugging purposes

## This script works with Rainbond

# define kafka broker id
KAFKA_CFG_BROKER_ID=${HOSTNAME#*-}

# define zk server Addr
if [ -z $DEPEND_SERVICE ]; then
    ZooKeeperServer=$(nslookup ${DEPEND_SERVICE%:*} | grep Address | sed '1d' | awk '{print $2":2181"}')
    export KAFKA_CFG_ZOOKEEPER_CONNECT=$(echo $ZooKeeperServer | tr ' ' ',')
fi

# set default_java_mem_opts
case ${MEMORY_SIZE} in
    "micro")
       export KAFKA_HEAP_OPTS="-Xmx128m -Xms128m"
       echo "Optimizing java process for 128M Memory...." >&2
       ;;
    "small")
       export KAFKA_HEAP_OPTS="-Xmx256m -Xms256m"
       echo "Optimizing java process for 256M Memory...." >&2
       ;;
    "medium")
       export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
       echo "Optimizing java process for 512M Memory...." >&2
       ;;
    "large")
       export KAFKA_HEAP_OPTS="-Xmx1024m -Xms1024m"
       echo "Optimizing java process for 1G Memory...." >&2
       ;;
    "2xlarge")
       export KAFKA_HEAP_OPTS="-Xmx2048m -Xms2048m"
       echo "Optimizing java process for 2G Memory...." >&2
       ;;
    "4xlarge")
       export KAFKA_HEAP_OPTS="-Xmx4096m -Xms4096m"
       echo "Optimizing java process for 4G Memory...." >&2
       ;;
    "8xlarge")
       export KAFKA_HEAP_OPTS="-Xmx8192m -Xms8192m"
       echo "Optimizing java process for 8G Memory...." >&2
       ;;
    16xlarge|32xlarge|64xlarge)
       export KAFKA_HEAP_OPTS="-Xmx16384m -Xms16384m"
       echo "Optimizing java process for biger Memory...." >&2
       ;;
esac

以上 rainbond-env.sh 脚本文件包含了:

  1. 根据系统环境变量 HOSTNAME 自动获取 KAFKA_CFG_BROKER_ID 的值。
  2. 当依赖了Zookeeper组件后,会自动注入环境变量 DEPEND_SERVICE ,使用 环境变量 DEPEND_SERVICE 解析出Zookeeper的 IPIP 写入到 KAFKA_CFG_ZOOKEEPER_CONNECT 环境变量中。
  3. 根据 动态配置JVM内存 动态调整Kafka的JVM内存。

修改完以上内容,可在Rainbond平台上进行源码构建,并且需依赖Zookeeper组件。