Project overview
Data type
Beijing historical weather data
Development environment
CentOS 7
Software versions
Python 3.8.18, Hadoop 3.2.0, Spark 3.1.2, MySQL 5.7.38, Scala 2.12.18, JDK 8, Kafka 2.8.2
Development language
Python
Development workflow
Data upload (HDFS) -> data analysis (Spark) -> write data to Kafka (Python) -> real-time analysis (Spark) -> data storage (MySQL) -> backend (Flask) -> frontend (HTML + JS + CSS)
Visualization charts

2025-06-14_190858.jpg

2025-06-14_190904.jpg
Steps
Install Python packages
pip3 install kafka-python==2.0.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install pandas==2.0.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install flask==3.0.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install flask-cors==4.0.1 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install pyecharts==2.0.4 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install pymysql==1.1.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
Start MySQL
# Check whether MySQL is running (to start it: systemctl start mysqld.service)
systemctl status mysqld.service
# Open the MySQL shell
# MySQL username: root, password: 123456
mysql -uroot -p123456
Start Hadoop
# To leave safe mode if needed: hdfs dfsadmin -safemode leave
# Start Hadoop
bash /export/software/hadoop-3.2.0/sbin/start-hadoop.sh
Start Kafka
# Start ZooKeeper
sh /export/software/kafka_2.12-2.8.2/bin/zookeeper-server-start.sh -daemon /export/software/kafka_2.12-2.8.2/config/zookeeper.properties
# Start Kafka
sh /export/software/kafka_2.12-2.8.2/bin/kafka-server-start.sh -daemon /export/software/kafka_2.12-2.8.2/config/server.properties
# Create the topic
/export/software/kafka_2.12-2.8.2/bin/kafka-topics.sh --create --topic weather --replication-factor 1 --partitions 1 --zookeeper master:2181
# Start a console consumer
/export/software/kafka_2.12-2.8.2/bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic weather
# Stop Kafka
# sh /export/software/kafka_2.12-2.8.2/bin/kafka-server-stop.sh
# Stop ZooKeeper
# sh /export/software/kafka_2.12-2.8.2/bin/zookeeper-server-stop.sh
Prepare the directory
mkdir -p /data/jobs/project/
cd /data/jobs/project/
# Upload the "beijing_weather_data.csv" file from the "data" directory
head -5 beijing_weather_data.csv
Upload the file to HDFS
cd /data/jobs/project/
hdfs dfs -mkdir -p /data/source/
hdfs dfs -rm -r -f '/data/source/*'
hdfs dfs -put -f beijing_weather_data.csv /data/source/
hdfs dfs -ls /data/source/
Create the MySQL database
CREATE DATABASE IF NOT EXISTS echarts CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Spark data analysis
cd /data/jobs/project/
# Upload the "data_process.py" file from the "pyspark" directory
spark-submit \
--master local[*] \
--jars /export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
--driver-class-path /export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
/data/jobs/project/data_process.py /data/source/
# You can verify the results in the MySQL shell
# select * from weather_info limit 10;
# select * from weather_year_h_temp limit 10;
# select * from weather_min_max_temp limit 10;
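The same check can be scripted with the pymysql package installed earlier. The following is only a minimal sketch: it assumes the three tables live in the echarts database and that MySQL runs on localhost with the root/123456 credentials shown above. The names TABLES, count_query, and main are illustrative and do not come from the project files.

```python
# Sketch: count the rows Spark wrote to each result table.
# Assumptions: database "echarts", MySQL on localhost, root/123456.

TABLES = ["weather_info", "weather_year_h_temp", "weather_min_max_temp"]


def count_query(table):
    """Build a row-count query, accepting only the known table names."""
    if table not in TABLES:
        raise ValueError(f"unexpected table: {table}")
    return f"SELECT COUNT(*) FROM `{table}`"


def main():
    # Imported lazily so count_query() can be used without pymysql installed.
    import pymysql

    conn = pymysql.connect(
        host="localhost",      # assumption: MySQL runs on this machine
        user="root",
        password="123456",
        database="echarts",    # assumption: the Spark job writes here
        charset="utf8mb4",
    )
    try:
        with conn.cursor() as cur:
            for table in TABLES:
                cur.execute(count_query(table))
                (row_count,) = cur.fetchone()
                print(f"{table}: {row_count} rows")
    finally:
        conn.close()
```

Calling main() after the spark-submit job finishes prints one line per table. A count of zero suggests the job did not write to that table.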
Spark real-time computation
cd /data/jobs/project/
# Upload the "data_process_realtime.py" file from the "pyspark" directory
# Upload the "spark-sql-kafka-0-10_2.12-3.1.2.jar" file from the "pyspark" directory
spark-submit \
--master local[*] \
--jars /data/jobs/project/spark-sql-kafka-0-10_2.12-3.1.2.jar,/export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,com.mysql:mysql-connector-j:8.0.33 \
--driver-class-path /export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
/data/jobs/project/data_process_realtime.py
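The contents of data_process_realtime.py are not shown here. As a rough illustration of the kind of job this command launches, the sketch below reads the weather topic with Structured Streaming and appends each micro-batch to MySQL through JDBC. It is not the project's script: the target table weather_realtime_raw, the MySQL host, and the decision to store the raw message text are all assumptions made for the example.

```python
# Sketch: Kafka -> Spark Structured Streaming -> MySQL (via foreachBatch).
# Broker and topic come from the Kafka commands above; the rest is assumed.

KAFKA_OPTIONS = {
    "kafka.bootstrap.servers": "master:9092",
    "subscribe": "weather",
    "startingOffsets": "latest",
}

JDBC_OPTIONS = {
    # Assumption: MySQL is reachable on localhost with the echarts database.
    "url": "jdbc:mysql://localhost:3306/echarts?useSSL=false",
    "driver": "com.mysql.cj.jdbc.Driver",
    "user": "root",
    "password": "123456",
    "dbtable": "weather_realtime_raw",  # hypothetical table name
}


def write_batch(batch_df, batch_id):
    """Append one micro-batch to MySQL using the batch JDBC writer."""
    batch_df.write.format("jdbc").options(**JDBC_OPTIONS).mode("append").save()


def main():
    # Imported lazily: pyspark is only available inside a Spark environment.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("weather_realtime_sketch").getOrCreate()
    raw = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
    # Kafka delivers the payload as binary; cast it to a readable string.
    messages = raw.selectExpr("CAST(value AS STRING) AS message", "timestamp")
    query = messages.writeStream.foreachBatch(write_batch).start()
    query.awaitTermination()
```

foreachBatch is used because Spark has no built-in streaming JDBC sink, so each micro-batch is written with the ordinary batch writer. A real job would normally parse the message into typed columns and aggregate before writing.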
Kafka producer: read the CSV and write it to Kafka
cd /data/jobs/project/
# Upload the "csv_to_kafka.py" file from the "pyspark" directory
# Send the data to Kafka
python3 csv_to_kafka.py
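The source of csv_to_kafka.py is not included here. A producer of this kind could look roughly like the sketch below, which uses the kafka-python package installed earlier. The assumptions are that the CSV is UTF-8 with a header row, that each row is sent as one JSON message, and that a 0.5 second pause between rows is enough to imitate a live feed. The function names and the delay are illustrative, not taken from the project.

```python
# Sketch: replay a CSV file into the "weather" topic, one JSON message per row.
import csv
import json
import time


def csv_messages(lines):
    """Yield one UTF-8 encoded JSON message per CSV data row."""
    for row in csv.DictReader(lines):
        yield json.dumps(row, ensure_ascii=False).encode("utf-8")


def main(path="beijing_weather_data.csv", topic="weather", delay=0.5):
    # Imported lazily so csv_messages() works without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="master:9092")
    # Assumption: the file is UTF-8 encoded; adjust if it is not.
    with open(path, newline="", encoding="utf-8") as f:
        for message in csv_messages(f):
            producer.send(topic, message)
            time.sleep(delay)  # throttle so the stream resembles live data
    producer.flush()  # block until buffered messages are delivered
    producer.close()
```

Calling main() starts the replay, and the console consumer started earlier should then print one JSON object per CSV row. Keeping the serialization in csv_messages() separate from the sending loop makes the message format easy to inspect without a running broker.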
Start the visualization
mkdir -p /data/jobs/project/myapp/
cd /data/jobs/project/myapp/
# Upload all files and folders from the "可视化" (visualization) directory
# To run locally on Windows: python app.py
python3 app.py pro
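The app.py file itself is not reproduced here. To give a sense of how the Flask backend in the workflow might hand query results to the charts, here is a minimal sketch. It assumes each chart is fed by rows of (label, value) pairs and that the frontend calls a JSON endpoint. The route /api/year_high_temp, the fetch_rows callable, and both function names are hypothetical.

```python
# Sketch: a Flask endpoint that turns database rows into chart-ready JSON.


def rows_to_series(rows):
    """Split (label, value) rows into parallel x/y lists for a chart."""
    return {
        "x": [row[0] for row in rows],
        "y": [float(row[1]) for row in rows],  # Decimal or str -> float
    }


def create_app(fetch_rows):
    """Build the app; fetch_rows is any callable returning (label, value) rows."""
    # Imported lazily so rows_to_series() works without Flask installed.
    from flask import Flask, jsonify
    from flask_cors import CORS

    app = Flask(__name__)
    CORS(app)  # let the HTML/JS frontend call the API from another origin

    @app.route("/api/year_high_temp")  # hypothetical route
    def year_high_temp():
        return jsonify(rows_to_series(fetch_rows()))

    return app
```

For a quick local test, create_app(lambda: [("2020", 38.5)]).run(port=5000) serves a single data point. In a real deployment fetch_rows would query MySQL, for example with pymysql.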