項目環(huán)境搭建:{vmware(虛擬機網(wǎng)絡配置),CentOS,MySql,Docker,kafka}

環(huán)境:CentOS 7.6 64bit,裝在了VM虛擬機中。

00、配置網(wǎng)絡環(huán)境(因為使用了vm虛擬機。非VM環(huán)境就無需)

#第一步:修改網(wǎng)絡配置(使用NAT模式),打開以下網(wǎng)絡配置文件,路徑和修改內容如下:
vi /etc/sysconfig/network-scripts/ifcfg-ens33
--修改
ONBOOT=yes 
BOOTPROTO=static //靜態(tài)網(wǎng)絡IP   dhcp 動態(tài)獲取網(wǎng)絡IP
--添加
IPADDR=192.168.58.xxx
NETMASK=255.255.255.0
GATEWAY=192.168.58.xxx
DNS1=114.114.114.114
--刪除
UUID
--進行網(wǎng)絡測試
ip addr
systemctl restart network.service
ping www.baidu.com

#第二步:關閉網(wǎng)絡防火墻:
systemctl stop firewalld (本次服務內關閉防火墻)
systemctl disable firewalld(禁用防火墻服務,服務器重啟后防火墻禁用)

#第三步:關閉軟件安裝限制:
vi /etc/selinux/config
--修改配置
SELINUX=disabled

01、MySQL安裝

、下載并安裝mysql:

wget http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm
yum -y install mysql57-community-release-el7-10.noarch.rpm
yum -y install mysql-community-server
#遇到錯誤:
失敗的軟件包是:mysql-community-client-5.7.41-1.el7.x86_64
GPG  密鑰配置為:file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql
#依次執(zhí)行以下命令
rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022
yum install -y mysql-community-server

、啟動并查看狀態(tài)MySQL:

systemctl start  mysqld.service
systemctl status mysqld.service

、查看MySQL的默認密碼:

grep "password" /var/log/mysqld.log
#如下結果,最后類似的隨機字符串“3Xn1mKp.TnQ(”就是默認密碼
2023-03-16T00:29:16.010934Z 1 [Note] A temporary password is generated for root@localhost: 3Xn1mKp.TnQ(

、登錄進MySQL

mysql -uroot -p

、修改默認密碼(設置密碼需要有大小寫符號組合---安全性),把下面的my passrod替換成自己的密碼

ALTER USER 'root'@'localhost' IDENTIFIED BY 'my password';

#如下設置會報錯,因為密碼太簡單;這個與驗證密碼策略validate_password_policy的值有關。默認是1,
#所以剛開始設置的密碼必須符合長度,且必須含有數(shù)字,小寫或大寫字母,特殊字符。如果不想設置8位,或者想設置簡單點,可以選擇Policy0.
ALTER USER 'root'@'localhost' IDENTIFIED BY '123456';
Your password does not satisfy the current policy requirements

#修改validate_password_policy參數(shù)的值(等級為0)
mysql> set global validate_password_policy=0;
ALTER USER 'root'@'localhost' IDENTIFIED BY 'Aa123456';

、開啟遠程訪問 (把下面的my passrod替換成自己的密碼)

grant all privileges on *.* to 'root'@'%' identified by 'my password' with grant option;
flush privileges;
exit

、在云服務上增加MySQL的端口:

02、Docker環(huán)境

首先我們需要安裝GCC相關的環(huán)境:

yum -y install gcc

yum -y install gcc-c++

安裝Docker需要的依賴軟件包:

yum install -y yum-utils device-mapper-persistent-data lvm2

設置國內的鏡像(提高速度)

yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

更新yum軟件包索引:

yum makecache fast

安裝DOCKER CE(注意:Docker分為CE版和EE版,一般我們用CE版就夠用了)

yum -y install docker-ce

啟動Docker:

systemctl start docker

下載回來的Docker版本::

docker version

來一發(fā)HelloWorld:

docker run hello-world

03、Docker compose環(huán)境

Compose 是用于定義和運行多容器 Docker 應用程序的工具。通過 Compose,您可以使用 YML 文件來配置應用程序需要的所有服務。然后,使用一個命令,就可以從 YML 文件配置中創(chuàng)建并啟動所有服務.
運行以下命令以下載 Docker Compose 的當前穩(wěn)定版本:

sudo curl -L "https://github.com/docker/compose/releases/download/1.24.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

將可執(zhí)行權限應用于二進制文件:

sudo chmod +x /usr/local/bin/docker-compose

創(chuàng)建軟鏈:

sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

測試是否安裝成功:

docker-compose --version

04、kafka

kafka >>> compose 文件

新建搭建kafka環(huán)境的docker-compose.yml文件,內容如下:
文件內TODO 中的ip需要改成自己的,并且如果你用的是云服務器,那需要把端口給打開。

version: '3'
services:
  zookepper:
    image: wurstmeister/zookeeper                    # 原鏡像`wurstmeister/zookeeper`
    container_name: zookeeper                        # 容器名為'zookeeper'
    volumes:                                         # 數(shù)據(jù)卷掛載路徑設置,將本機目錄映射到容器目錄
      - "/etc/localtime:/etc/localtime"
    ports:                                           # 映射端口
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka                                # 原鏡像`wurstmeister/kafka`
    container_name: kafka                                    # 容器名為'kafka'
    volumes:                                                 # 數(shù)據(jù)卷掛載路徑設置,將本機目錄映射到容器目錄
      - "/etc/localtime:/etc/localtime"
    environment:                                                       # 設置環(huán)境變量,相當于docker run命令中的-e
      KAFKA_BROKER_ID: 0                                               # 在kafka集群中,每個kafka都有一個BROKER_ID來區(qū)分自己
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://ip:9092 # TODO 將kafka的地址端口注冊給zookeeper
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092                        # 配置kafka的監(jiān)聽端口
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181                # zookeeper地址
      KAFKA_CREATE_TOPICS: "hello_world"
    ports:                              # 映射端口
      - "9092:9092"
    depends_on:                         # 解決容器依賴啟動先后問題
      - zookepper

  kafka-manager:
    image: sheepkiller/kafka-manager                         # 原鏡像`sheepkiller/kafka-manager`
    container_name: kafka-manager                            # 容器名為'kafka-manager'
    environment:                        # 設置環(huán)境變量,相當于docker run命令中的-e
      ZK_HOSTS: zookeeper:2181  #  zookeeper地址
      APPLICATION_SECRET: xxxxx
      KAFKA_MANAGER_AUTH_ENABLED: "true"  # 開啟kafka-manager權限校驗
      KAFKA_MANAGER_USERNAME: admin       # 登陸賬戶
      KAFKA_MANAGER_PASSWORD: 123456      # 登陸密碼
    ports:                              # 映射端口
      - "9000:9000"
    depends_on:                         # 解決容器依賴啟動先后問題
      - kafka

kafka >>> 啟動kafka

在存放docker-compose.yml的目錄下執(zhí)行啟動命令:

#格式為docker-compose up [options] [SERVICE...],
#該命令可以自動完成包括構建鏡像,(重新)創(chuàng)建服務,啟動服務,并關聯(lián)服務相關容器的一系列操作。
#-d表示后臺執(zhí)行
docker-compose up -d

可以查看下docker鏡像運行的情況:

docker ps 

進入kafka 的容器:

docker exec -it kafka sh

創(chuàng)建一個topic(這里我的topicName就叫austin,你們可以改成自己的)

$KAFKA_HOME/bin/kafka-topics.sh --create --topic austin --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1 

查看剛創(chuàng)建的topic信息:

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic austin

啟動一個消費者:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic austin

新增一個窗口,啟動一個生產者:

docker exec -it kafka sh
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=austin --broker-list kafka:9092

kafka >>> Java程序驗證

引入Kafka依賴(SpringBoot有默認的版本,不需要寫version)

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

yml配置文件

# 以逗號分隔的主機:端口對列表,用于建立與Kafka群集的初始連接
spring.kafka.bootstrap-servers=xx.xx.xx.xx:9092
#如果該值大于零時,表示啟用重試失敗的發(fā)送次數(shù)
spring.kafka.producer.retries=3
#每當多個記錄被發(fā)送到同一分區(qū)時,生產者將嘗試將記錄一起批量處理為更少的請求,
#這有助于提升客戶端和服務器上的性能,此配置控制默認批量大?。ㄒ宰止?jié)為單位),默認值為16384
spring.kafka.producer.batch-size=16384
#生產者可用于緩沖等待發(fā)送到服務器的記錄的內存總字節(jié)數(shù),默認值為33554432
spring.kafka.producer.buffer-memory=33554432
#procedure要求leader在考慮完成請求之前收到的確認數(shù),用于控制發(fā)送記錄在服務端的持久化,其值可以為如下:
#acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無法保證服務器已收到記錄,并且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。
#acks = 1 這意味著leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄后立即失敗,但在將數(shù)據(jù)復制到所有的副本服務器之前,則記錄將會丟失。
#acks = all 這意味著leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當于acks = -1的設置。
#可以設置的值為:all, -1, 0, 1
spring.kafka.producer.acks=1
#key的Serializer類,實現(xiàn)類實現(xiàn)了接口org.apache.kafka.common.serialization.Serializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#用于標識此使用者所屬的使用者組的唯一字符串。
spring.kafka.consumer.group-id=default‐group
#如果為true,則消費者的偏移量將在后臺定期提交,默認值為true
spring.kafka.consumer.enable-auto-commit=false
#當Kafka中沒有初始偏移量或者服務器上不再存在當前偏移量時該怎么辦,默認值為latest,表示自動將偏移重置為最新的偏移量
#可選的值為latest, earliest, none
spring.kafka.consumer.auto-offset-reset=earliest
#值的反序列化器類,實現(xiàn)類實現(xiàn)了接口org.apache.kafka.common.serialization.Deserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#密鑰的反序列化器類,實現(xiàn)類實現(xiàn)了接口org.apache.kafka.common.serialization.Deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#偵聽器的AckMode,參見https://docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets
#當enable.auto.commit的值設置為false時,該值會生效;為true時不會生效
spring.kafka.listener.ack-mode=manual_immediate

定義一個實體類:

@Data
@Accessors(chain = true)
public class UserLog {
    private String username;
    private String userid;
    private String state;
}

定義生產者(austin是topicName):

@Component
public class UserLogProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 發(fā)送數(shù)據(jù)   
     * @param userid
     */
    public void sendLog(String userid){
        UserLog userLog = new UserLog();
        userLog.setUsername("jhp").setUserid(userid).setState("0");
        System.err.println("發(fā)送用戶日志數(shù)據(jù):"+userLog);
        kafkaTemplate.send("austin", JSON.toJSONString(userLog));
    }
}

定義消費者:

@Component
@Slf4j
public class UserLogConsumer {
    @KafkaListener(topics = {"austin"},groupId = "austinGroup2")
    public void consumer(ConsumerRecord<?,?> consumerRecord){
        //判斷是否為null
        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        log.error(">>>austinGroup2>>>>>>> record =" + kafkaMessage);
        if(kafkaMessage.isPresent()){
            //得到Optional實例中的值
            Object message = kafkaMessage.get();
            System.err.println("消費消息:"+message);
        }

    }

    /**
     * 不指定消費組消費
     *
     * @param record
     * @param ack
     */
    @KafkaListener(topics = "austin",groupId = "austinGroup1")
    public void listenerOne(ConsumerRecord<?,?> record, Acknowledgment ack) {
        //判斷是否為null
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        log.error(">>>austinGroup1>>>>>>> record =" + kafkaMessage);
        if(kafkaMessage.isPresent()){
            //得到Optional實例中的值
            Object message = kafkaMessage.get();
            System.err.println("消費消息:"+message);
        }
        //手動提交offset
        ack.acknowledge();
    }
}

定義接口:

@RestController
public class KafkaTestController {

    @Autowired
    private UserLogProducer userLogProducer;

    /**
     * test insert
     */
    @GetMapping("/kafka/insert")
    public String insert(String userId) {
        userLogProducer.sendLog(userId);
        return null;
    }

}

測試:http://localhost:8080/kafka/insert?userId=33

發(fā)送用戶日志數(shù)據(jù):UserLog(username=jhp, userid=33, state=0)
消費消息:{"state":"0","userid":"33","username":"jhp"}
消費消息:{"state":"0","userid":"33","username":"jhp"}
2023-03-20 02:00:59.269 ERROR 56940 --- [ntainer#0-0-C-1] c.e.kafkademo.config.UserLogConsumer     : >>>austinGroup2>>>>>>> record =Optional[{"state":"0","userid":"33","username":"jhp"}]
2023-03-20 02:00:59.270 ERROR 56940 --- [ntainer#1-0-C-1] c.e.kafkademo.config.UserLogConsumer     : >>>austinGroup1>>>>>>> record =Optional[{"state":"0","userid":"33","username":"jhp"}]

05、Redis

Redis >>> compose 文件

安裝Redis的環(huán)境跟上次Kafka是一樣的,為了方便我就繼續(xù)用docker-compose的方式來進行
首先,我們新建一個文件夾redis,然后在該目錄下創(chuàng)建出data文件夾、redis.conf文件和docker-compose.yaml文件

redis.conf文件的內容如下(后面的配置可在這更改,比如requirepass 我指定的密碼為123456)

protected-mode no
port 6379
timeout 0
save 900 1 
save 300 10
save 60 10000
rdbcompression yes
dbfilename dump.rdb
dir /data
appendonly yes
appendfsync everysec
requirepass 123456

docker-compose.yaml的文件內容如下:

version: '3'
services:
  redis:
    image: redis:latest
    container_name: redis
    restart: always
    ports:
      - 6379:6379
    volumes:
      - ./redis.conf:/usr/local/etc/redis/redis.conf:rw
      - ./data:/data:rw
    command:
      /bin/bash -c "redis-server /usr/local/etc/redis/redis.conf "

配置的工作就完了,如果是云服務器,記得開redis端口6379

Redis >>> 啟動Redis

啟動Redis跟之前安裝Kafka的時候就差不多

# docker-compose up解決錯誤ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?  
# If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
#如果發(fā)生以上錯誤提示,可能是:docker服務沒啟動,那就啟動(命令如下:sudo systemctl start docker)

docker-compose up -d

docker ps

docker exec -it redis redis-cli

進入redis客戶端了之后,我們想看驗證下是否正常。(在正式輸入命令之前,我們需要通過密碼校驗,在配置文件下配置的密碼是123456)

auth 123456

然后隨意看看命令是不是正常

set 1 1
get 1
keys *

Redis >>> Java程序驗證

在SpringBoot環(huán)境下,使用Redis就非常簡單了(再次體現(xiàn)出使用SpringBoot的好處)。我們只需要在pom文件下引入對應的依賴,并且在配置文件下配置host/port和password就搞掂了。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容