代碼整體分為兩個部分:造數(shù)據(jù)和發(fā)送數(shù)據(jù)。
在造數(shù)據(jù)部分,我們需要注意到其中使用了shot類型的數(shù)據(jù)。由于Python中沒有直接的shot類型,我們可以通過限制int類型并將其轉(zhuǎn)換為相應(yīng)的十六進制數(shù)值來替代。此外,在造數(shù)據(jù)函數(shù)中還包含了取反轉(zhuǎn)hex、有無符號和浮點型轉(zhuǎn)hex的功能。
在發(fā)送數(shù)據(jù)部分,我們建立連接并設(shè)置發(fā)送間隔,然后通過TCP或UDP發(fā)送數(shù)據(jù)。在TCP通信中,為了正確關(guān)閉連接,我們不僅需要調(diào)用close方法,還需要調(diào)用shutdown方法以發(fā)送揮手包。而在UDP通信中,發(fā)送數(shù)據(jù)相對簡單,不需要過多考慮異常情況。對于串口通信,我們同樣可以選擇合適的串口,并根據(jù)自定義協(xié)議將造好的數(shù)據(jù)按照一定的分包方式發(fā)送出去。
優(yōu)化的思路如下:
結(jié)合造數(shù)據(jù)和發(fā)送數(shù)據(jù)的部分,因為它們使用的數(shù)據(jù)是相同的。這樣可以減少重復代碼,提高代碼的可讀性和可維護性。
使用Python的struct模塊來編碼和解碼二進制數(shù)據(jù)。struct模塊能夠更高效地實現(xiàn)數(shù)據(jù)的打包和解包操作,使代碼更加簡潔和可靠。
在發(fā)送數(shù)據(jù)時,使用socket模塊的sendall()函數(shù)發(fā)送完整的消息,并通過頭部信息指示消息的長度,確保接收方能夠正確接收到完整的消息。
在處理串口通信時,使用PySerial庫來簡化串口操作。通過PySerial,可以方便地進行串口初始化、讀寫串口、設(shè)置波特率等操作,提高開發(fā)效率。
在數(shù)據(jù)解析過程中,可以考慮使用Pandas庫來進行數(shù)據(jù)處理。Pandas提供了DataFrame等數(shù)據(jù)結(jié)構(gòu),可以方便地對數(shù)據(jù)進行過濾、排序、分組、統(tǒng)計等操作,減少手動處理的工作量。
import time
from socket import *
import struct
import math
import datetime
import threading
def type_hex(value_, type_, Positive_and_Negative=0) -> str:
"""
輸入值和數(shù)據(jù)類型 默認為正數(shù) 并根據(jù)對應(yīng)的解析方式返回16進制的值
:param value_: 值
:param type_: 數(shù)據(jù)類型
:param Positive_and_Negative: 正負項
:return: 16進制的值
"""
if Positive_and_Negative == 1:
if type_ == 'f':
data10 = float(value_)
data10hex = float_to_hex(data10)[2:]
data10hex = bin(int(data10hex, 16))[2:]
data10hex = hex(int('1' + data10hex.rjust(31, '0'), 2))[2:]
return data10hex.rjust(8, '0')
elif type_ == 'i':
data10 = int(value_)
return the_not(data10, 4)
elif type_ == 's':
data10 = int(value_)
return the_not(data10, 2)
elif type_ == 'c':
data10 = hex(int(value_))[2:]
return data10.rjust(2, '0')
elif Positive_and_Negative == 0:
if type_ == 'f':
data10 = float(value_)
data10hex = float_to_hex(data10)[2:]
return data10hex.rjust(8, '0')
elif type_ == 'i':
data8hex = hex(int(value_))[2:]
return data8hex.rjust(8, '0')
elif type_ == 's':
data8hex = hex(int(value_))[2:]
return data8hex.rjust(4, '0')
elif type_ == 'c':
data10 = hex(int(value_))[2:]
return data10.rjust(2, '0')
def the_not(value, pos):
"""
指定位數(shù)取反
:param value:數(shù)據(jù)
:param pos: 位數(shù)
leng 傳遞二進制長度 - 1
將 獲取到的數(shù)據(jù)轉(zhuǎn)換為2進制后,將數(shù)據(jù)的最高位去除,
如數(shù)據(jù)長度未滿足pos長度將會在高位補零,并將其余所
有位數(shù)0和1置換在從最高位補1并返回
:return:
"""
if value == 0:
value = 1
value = value - 1
value = "{0:b}".format(value)
a = ((pos * 8) - 1)
value = value.rjust(a, '0')
value = value.replace('1', '2')
value = value.replace('0', '1')
value = value.replace('2', '0')
value = '1' + value
value = hex((int(value, 2)))[2:]
value = value.rjust((pos * 2), '0')
return value
# -> object 描述返回值數(shù)據(jù)類型
def Override_override(ov: object, value: object, leng: object) -> object:
"""
更改字符串指定位置的字符
:param ov: 被更改的老字符串
:param value: 更改的值
:param leng: 起始位置
:return: 更改后的字符串
"""
len_ = int(len(value))
# print(f"ov值為:{ov},新的值為:{ov[:leng * 2] + value + ov[leng + (leng * 2):]}")
return ov[:leng * 2] + value + ov[len_ + (leng * 2):]
def float_to_hex(f):
return hex(struct.unpack('<I', struct.pack('<f', f))[0])
def print_progress(num, cont):
# 進度條
total_length = 30
progress = int(cont / num * total_length)
bar = '['+'■' * progress + ' ' * (total_length - progress) + ']'
print(f'Progress:{bar}{cont}/{num}', end='\r')
def type_hex_new(value_, type_, Positive_and_Negative=0) -> str:
"""
輸入值和數(shù)據(jù)類型 默認為正數(shù) 并根據(jù)對應(yīng)的解析方式返回 置返的16進制的值
:param value_:
:param type_:
:param Positive_and_Negative:
:return:
"""
hex_ = type_hex(value_, type_, Positive_and_Negative=Positive_and_Negative)
if type_ == 'f':
a = int(hex_[0:2], 16)
b = int(hex_[2:4], 16) * 256
c = int(hex_[4:6], 16) * (256 * 256)
d = int(hex_[6:8], 16) * (256 * 256 * 256)
max_ = hex(int(a + b + c + d))[2:]
max_ = max_.rjust(8, '0')
return max_
elif type_ == 'i':
a = int(hex_[0:2], 16)
b = int(hex_[2:4], 16) * 256
c = int(hex_[4:6], 16) * (256 * 256)
d = int(hex_[6:8], 16) * (256 * 256 * 256)
max_ = hex(int(a + b + c + d))[2:]
max_ = max_.rjust(8, '0')
return max_
# = '0x' + hex_[0:2]
elif type_ == 's':
a = int(hex_[0:2], 16)
b = int(hex_[2:4], 16) * 256
max_ = hex(int(a + b))[2:]
max_ = max_.rjust(4, '0')
return max_
elif type_ == 'c':
return hex_
def doConnect(host, port):
# TCP
tcp_soket = socket(AF_INET, SOCK_STREAM)
# tcp_soket = socket(AF_INET, SOCK_DGRAM)
try:
tcp_soket.connect((host, port))
except:
...
return tcp_soket
def Generate_data(message_len: int, type_='00', models: str = '01') -> object:
"""
輸入 數(shù)據(jù)長度 和數(shù)據(jù)類型,以及分機號,返回一個長度400的數(shù)組
data_type:更改為0 返回的數(shù)據(jù)為1024隨機數(shù)據(jù)類型,1為自定義數(shù)據(jù)
:type type_: object
:param message_len: 報文長度 1 - 1024 int類型
:param type_: 報文類型 01 00 str
:param models: 分機號 為 01 - 04 str
:return:返回制造的數(shù)據(jù)數(shù)組
"""
message_list = []
data_num = 0
for z in range(-200, 201):
z = int(math.fabs(z))
a = int((z * z) / (2 * 100))
data_type = 1
if data_type == 1:
# char
Data1 = type_hex(str(z % 2), 'c')
# shoar
data2 = type_hex_new(str(31267 + a), 's')
# float
data3 = type_hex_new(str((z * z) / (2 * 100)), 'f')
# int
data4 = type_hex_new(str(100000 + (z * 100000)), 'i')
# char
data5 = type_hex(str(((z + 1) % 2) * 5), 'c')
# float
data6 = type_hex_new(str(0.99 + (z * 10)), 'f')
# shoar 負數(shù) 遞增
data7 = type_hex_new(str(31267 - z), 's', Positive_and_Negative=1)
# int1
data8 = type_hex_new(str(800000 + (a * 50000)), 'i')
# int 負數(shù)
data9 = type_hex_new(str(100000 + (z * 100000)), 'i', Positive_and_Negative=1)
# float 負數(shù)
data10 = type_hex_new(str((z * z) / (2 * 100)), 'f', Positive_and_Negative=1)
# char 55
data11 = type_hex_new(str(z % 2 * 85), 'c')
# shoar 負數(shù)遞減
data12 = type_hex_new(str(31267 + z), 's', Positive_and_Negative=1)
# float 改
data13 = type_hex_new(str(0.4 + (z * 2)), 'f')
# int 改
data14 = type_hex_new(str(100000 + (z * 83000)), 'i')
# float 改
data15 = type_hex_new(str(0.3 + (z * 0.1)), 'f')
# shoar
data16 = type_hex_new(str(150 * z), 's')
# char
data17 = type_hex_new(str(z % 2 * 165), 'c')
# shoar 負數(shù)
data18 = type_hex_new(str((30 * a)), 's')
# int
data19 = type_hex_new(str((a * 20000)), 'i')
# char
data20 = type_hex(str((z % 2) * 90), 'c')
Data_Sum = '0'
#設(shè)置長度
Data_Sum = Data_Sum.ljust((2 * message_len), "0")
Data_Sum = Override_override(Data_Sum, Data1, 0)
Data_Sum = Override_override(Data_Sum, data2, 1)
Data_Sum = Override_override(Data_Sum, data3, 3)
Data_Sum = Override_override(Data_Sum, data4, 7)
Data_Sum = Override_override(Data_Sum, data5, 11)
Data_Sum = Override_override(Data_Sum, data6, 12)
Data_Sum = Override_override(Data_Sum, data7, 16)
Data_Sum = Override_override(Data_Sum, data8, 18)
Data_Sum = Override_override(Data_Sum, data9, 22)
Data_Sum = Override_override(Data_Sum, data10, 26)
Data_Sum = Override_override(Data_Sum, data11, 30)
Data_Sum = Override_override(Data_Sum, data12, 31)
Data_Sum = Override_override(Data_Sum, data13, 33)
Data_Sum = Override_override(Data_Sum, data14, 37)
Data_Sum = Override_override(Data_Sum, data15, 41)
Data_Sum = Override_override(Data_Sum, data16, 45)
Data_Sum = Override_override(Data_Sum, data17, 47)
Data_Sum = Override_override(Data_Sum, data18, 48)
Data_Sum = Override_override(Data_Sum, data19, 50)
Data_Sum = Override_override(Data_Sum, data20, 54)
for value_w in range(1, 18):
Data_Sum = Override_override(Data_Sum, Data_Sum[0:55 * 2], 55 * value_w)
start = 990
Data_Sum = Override_override(Data_Sum, Data1, start + 0)
Data_Sum = Override_override(Data_Sum, data2, start + 1)
Data_Sum = Override_override(Data_Sum, data3, start + 3)
Data_Sum = Override_override(Data_Sum, data4, start + 7)
Data_Sum = Override_override(Data_Sum, data5, start + 11)
Data_Sum = Override_override(Data_Sum, data6, start + 12)
Data_Sum = Override_override(Data_Sum, data7, start + 16)
Data_Sum = Override_override(Data_Sum, data8, start + 18)
Data_Sum = Override_override(Data_Sum, data9, start + 22)
Data_Sum = Override_override(Data_Sum, data10, start + 26)
Data_Sum = Override_override(Data_Sum, data11, start + 30)
Data_Sum = Override_override(Data_Sum, data12, start + 31)
Data_Sum = Override_override(Data_Sum, data20, start + 33)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * 2)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * 3)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * 4)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * 5)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * 6)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * 7)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * 8)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * 9)
elif data_type == 0:
data_num = data_num + 1
if data_num > 255:
break
# int 正
# data_1 = type_hex_new(str(100000 + (z * 100000)), 'i')
# # flot 負數(shù)
# data_1 = type_hex_new(str((z * z) / (2 * 100)), 'f', Positive_and_Negative=1)
# # char
# data_1 = type_hex(str(z % 2), 'c')
# data_1 = type_hex_new(str(31267 - z), 's', Positive_and_Negative=1)
# 報文長度
Data_Sum = str()
data_num1 = hex(data_num)[2:].rjust(2, '0')
# print(data_num)
for i in range(message_len + 1):
Data_Sum = Data_Sum + data_num1
# Data_Sum = Data_Sum.rjust(2,message_len, data_num)
# 填滿1024
# for value_w in range(0, message_len, 2):
#
# Data_Sum = Override_override(Data_Sum, hex(value_w), 2)
# 設(shè)置報文類型
Message_type = type_
#
# if type_ == '00' and '01':
# Message_type = type_
# else:
# if z % 2 == 0:
# Message_type = '00'
# else:
# Message_type = '01'
# 設(shè)置長度
byte_len = int(message_len) * 2
Data_Sum = Data_Sum[0:byte_len]
Message_len = hex(int(byte_len / 2))[2:].rjust(4, '0')
# test = (z, '=======', '1', Data_Sum)
#
# with open('test.txt', 'a') as f:
# f.write(str(test) + '\n')
data_sum = ()
message = '01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 %s %s %s ' \
'00 00 00 00 00 00 %s 30 30' % (models, Message_type, Message_len, Data_Sum)
message_list.append(Data_Sum)
return message_list
class Tup0001:
def __init__(self, serve_ip, serve_port):
self.serve_ip = serve_ip
self.serve_port = serve_port
def send_(self, send_gap=0.05, num_=1000000000000000000000000000, time_=1000000000000000000000000000,
data=None):
"""
不設(shè)置默認為 100000000000000000000000000000000
:param time_: 發(fā)送時間
:param num_: 發(fā)送個數(shù)
:param send_gap: 發(fā)送間隔浮點數(shù)
:param data: 導入數(shù)組data
:return:
"""
# 建立連接
datalen = len(data[0].replace(' ', ''))/2
tcp_soket = doConnect(self.serve_ip, self.serve_port)
send_port = tcp_soket.getsockname()
# 記錄開始時間 data易于查看 time用于計算
now1_time = datetime.datetime.now()
Labial_time = time.time()
time_start = Labial_time
# 設(shè)置發(fā)送時間
Send_time = Labial_time + time_ + 1
# 設(shè)置發(fā)送個數(shù)
Send_num = num_
# 12小時發(fā)送時間 === 43200
print("------------------------------------------------------------------")
# 打印當前時間
print(now1_time)
# 計數(shù)器
counter = 0
while Labial_time <= Send_time - 1 and counter <= Send_num - 1:
# 每400包打印一次信息
# print(Labial_time, '發(fā)送個數(shù):', counter,'開始時間 =====', now1_time)
l = len(data)
for z in range(l):
Labial_time = time.time()
if Labial_time >= Send_time - 1 or counter > Send_num - 1:
break
try:
tcp_soket.send(bytes.fromhex(data[z]))
counter = counter + 1
except ConnectionAbortedError:
counter = counter - 1
now_time = datetime.datetime.now()
print('\r\nsocket error,do reconnect ', 'Aborted')
print(f"分機號{data[0].replace(' ', '')[60:62]}")
print(f"斷開端口號為:{send_port}")
print('報文個數(shù)=====', counter)
print('開始時間 =====', now1_time)
print('出錯時間 =====', now_time)
print(f"上位機應(yīng)接收:{(datalen - 20) * counter}B")
time.sleep(10)
tcp_soket = doConnect(self.serve_ip, self.serve_port)
send_port = tcp_soket.getsockname()
tcp_soket.send(bytes.fromhex(data[z]))
counter = counter + 1
except ConnectionResetError:
counter = counter - 1
now_time = datetime.datetime.now()
print('\r\nsocket error,do reconnect ', 'Reseterror')
print(f"分機號{data[0].replace(' ', '')[60:62]}")
print(f"斷開端口號為:{send_port}")
print('報文個數(shù)=====', counter)
print('開始時間 =====', now1_time)
print('出錯時間 =====', now_time)
print(f"上位機應(yīng)接收:{(datalen - 20) * counter}B")
time.sleep(10)
# timeee = timeee - 3
tcp_soket = doConnect(self.serve_ip, self.serve_port)
send_port = tcp_soket.getsockname()
tcp_soket.send(bytes.fromhex(data[z]))
counter = counter + 1
# 發(fā)送間隔
time.sleep(send_gap)
print_progress(num_, counter)
now_time = datetime.datetime.now()
# print('time :', now_time)
tcp_soket.shutdown(1)
time.sleep(0.1)
tcp_soket.close()
now_time = datetime.datetime.now()
send_time = time.time() - time_start
print('報文個數(shù)=====', counter)
print(f"上位機發(fā)送數(shù)量:{(datalen) * counter}B")
print(f"分機號{data[0].replace(' ', '')[60:62]}下位機應(yīng)接收:{(datalen - 20) * counter}B")
print(f"發(fā)送端口號為:{send_port}")
print(f'傳輸速率為:{(datalen) * counter / send_time / 1024 / 1024} MB/S')
print(f'發(fā)送頻率為{counter / send_time}條/秒')
print('開始時間 =====', now1_time)
print('結(jié)束時間 =====', now_time)
print("------------------------------------------------------------------")
if __name__ == '__main__':
ip_port = [('192.188.1.200', 52245), ('192.188.1.223', 10241), ('192.168.2.175', 502), ("192.168.2.253", 10268)]
w = 1
prject = Tup0001(ip_port[w][0], ip_port[w][1])
data1 = Generate_data(1024, type_='01', models='01')
data2 = Generate_data(1, type_='01', models='02')
# data3 = Generate_data(1024*10, type_='01', models='03')
# data4 = Generate_data(1024*10, type_='01', models='04')
# data5 = Generate_data(1024*8, type_='00', models='05')
# data6 = Generate_data(1024 * 8, type_='00', models='06')
# data7 = Generate_data(1024 * 8, type_='00', models='07')
# data8 = Generate_data(1024 * 8, type_='00', models='08')
# data9 = Generate_data(1024 * 8, type_='01', models='09')
thread1 = threading.Thread(target=prject.send_,
kwargs={"num_": 3000, "send_gap": 0.05, "data": data1})
thread2 = threading.Thread(target=prject.send_,
kwargs={"num_": 3000, "send_gap": 0.2, "data": data2})
thread3 = threading.Thread(target=prject.send_,
kwargs={"num_": 3000, "send_gap": 0.2, "data": data2})
thread4 = threading.Thread(target=prject.send_,
kwargs={"num_": 3000, "send_gap": 0.2, "data": data2})
# thread5 = threading.Thread(target=prject.send_,
# kwargs={"num_": 3000, "send_gap": 0.001, "data": data5})
# thread6 = threading.Thread(target=prject.send_,
# kwargs={"num_": 3000, "send_gap": 0.001, "data": data6})
# thread7 = threading.Thread(target=prject.send_,
# kwargs={"num_": 3000, "send_gap": 0.001, "data": data7})
# thread8 = threading.Thread(target=prject.send_,
# kwargs={"num_": 3000, "send_gap": 0.001, "data": data8})
# thread9 = threading.Thread(target=prject.send_,
# kwargs={"num_": 3000, "send_gap": 0.001, "data": data9})
# Plist = [thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8]
# Plist = [thread1, thread2]
Plist = [thread1]
# # #,thread2,thread3,thread4
# # # print(data1[0])
# # # print(data3[0])
for i in Plist:
i.start()
# i.join()
#
for j in Plist:
j.join()
UDP根據(jù)指定協(xié)議發(fā)包
import time
from socket import *
import struct
import math
import datetime
import threading
def the_not(value, pos):
"""
指定位數(shù)取反
:param value:數(shù)據(jù)
:param pos: 位數(shù)
leng 傳遞二進制長度 - 1
將 獲取到的數(shù)據(jù)轉(zhuǎn)換為2進制后,將數(shù)據(jù)的最高位去除,
如數(shù)據(jù)長度未滿足pos長度將會在高位補零,并將其余所
有位數(shù)0和1置換在從最高位補1并返回
:return:
"""
if value == 0:
value = 1
value = value - 1
value = "{0:b}".format(value)
a = ((pos * 8) - 1)
value = value.rjust(a, '0')
value = value.replace('1', '2')
value = value.replace('0', '1')
value = value.replace('2', '0')
value = '1' + value
value = hex((int(value, 2)))[2:]
value = value.rjust((pos * 2), '0')
return value
# -> object 描述返回值數(shù)據(jù)類型
def Override_override(ov: object, value: object, leng: object) -> object:
"""
更改字符串指定位置的字符
:param ov: 被更改的老字符串
:param value: 更改的值
:param leng: 起始位置
:return: 更改后的字符串
"""
len_ = int(len(value))
# print(f"ov值為:{ov},新的值為:{ov[:leng * 2] + value + ov[leng + (leng * 2):]}")
return ov[:leng * 2] + value + ov[len_ + (leng * 2):]
def float_to_hex(f):
return hex(struct.unpack('<I', struct.pack('<f', f))[0])
def type_hex(value_, type_, Positive_and_Negative=0) -> str:
"""
輸入值和數(shù)據(jù)類型 默認為正數(shù) 并根據(jù)對應(yīng)的解析方式返回16進制的值
:param value_: 值
:param type_: 數(shù)據(jù)類型
:param Positive_and_Negative: 正負項
:return: 16進制的值
"""
if Positive_and_Negative == 1:
if type_ == 'f':
data10 = float(value_)
data10hex = float_to_hex(data10)[2:]
data10hex = bin(int(data10hex, 16))[2:]
data10hex = hex(int('1' + data10hex.rjust(31, '0'), 2))[2:]
return data10hex.rjust(8, '0')
elif type_ == 'i':
data10 = int(value_)
return the_not(data10, 4)
elif type_ == 's':
data10 = int(value_)
return the_not(data10, 2)
elif type_ == 'c':
data10 = hex(int(value_))[2:]
return data10.rjust(2, '0')
elif Positive_and_Negative == 0:
if type_ == 'f':
data10 = float(value_)
data10hex = float_to_hex(data10)[2:]
return data10hex.rjust(8, '0')
elif type_ == 'i':
data8hex = hex(int(value_))[2:]
return data8hex.rjust(8, '0')
elif type_ == 's':
data8hex = hex(int(value_))[2:]
return data8hex.rjust(4, '0')
elif type_ == 'c':
data10 = hex(int(value_))[2:]
return data10.rjust(2, '0')
def type_hex_new(value_, type_, Positive_and_Negative=0) -> str:
"""
輸入值和數(shù)據(jù)類型 默認為正數(shù) 并根據(jù)對應(yīng)的解析方式返回 置返的16進制的值
:param value_:
:param type_:
:param Positive_and_Negative:
:return:
"""
hex_ = type_hex(value_, type_, Positive_and_Negative=Positive_and_Negative)
if type_ == 'f':
a = int(hex_[0:2], 16)
b = int(hex_[2:4], 16) * 256
c = int(hex_[4:6], 16) * (256 * 256)
d = int(hex_[6:8], 16) * (256 * 256 * 256)
max_ = hex(int(a + b + c + d))[2:]
max_ = max_.rjust(8, '0')
return max_
elif type_ == 'i':
a = int(hex_[0:2], 16)
b = int(hex_[2:4], 16) * 256
c = int(hex_[4:6], 16) * (256 * 256)
d = int(hex_[6:8], 16) * (256 * 256 * 256)
max_ = hex(int(a + b + c + d))[2:]
max_ = max_.rjust(8, '0')
return max_
# = '0x' + hex_[0:2]
elif type_ == 's':
a = int(hex_[0:2], 16)
b = int(hex_[2:4], 16) * 256
max_ = hex(int(a + b))[2:]
max_ = max_.rjust(4, '0')
return max_
elif type_ == 'c':
return hex_
def doConnect(host, port):
# tcp_soket = socket(AF_INET, SOCK_STREAM)
tcp_soket = socket(AF_INET, SOCK_DGRAM)
try:
tcp_soket.connect((host, port))
except:
...
return tcp_soket
def Generate_data(message_len: int, type_='00', models: str = '01') -> object:
"""
輸入 數(shù)據(jù)長度 和數(shù)據(jù)類型,以及分機號,返回一個長度400的數(shù)組
data_type:更改為0 返回的數(shù)據(jù)為1024隨機數(shù)據(jù)類型,1為自定義數(shù)據(jù)
:type type_: object
:param message_len: 報文長度 1 - 1024 int類型
:param type_: 報文類型 01 00 str
:param models: 分機號 為 01 - 04 str
:return:返回制造的數(shù)據(jù)數(shù)組
"""
message_list = []
data_num = 0
for z in range(-200, 201):
z = int(math.fabs(z))
a = int((z * z) / (2 * 100))
data_type = 1
if data_type == 1:
# char
Data1 = type_hex(str(z % 2), 'c')
# shoar
data2 = type_hex_new(str(31267 + a), 's')
# float
data3 = type_hex_new(str((z * z) / (2 * 100)), 'f')
# int
data4 = type_hex_new(str(100000 + (z * 100000)), 'i')
# char
data5 = type_hex(str(((z + 1) % 2) * 5), 'c')
# float
data6 = type_hex_new(str(0.99 + (z * 10)), 'f')
# shoar 負數(shù) 遞增
data7 = type_hex_new(str(31267 - z), 's', Positive_and_Negative=1)
# int1
data8 = type_hex_new(str(800000 + (a * 50000)), 'i')
# int 負數(shù)
data9 = type_hex_new(str(100000 + (z * 100000)), 'i', Positive_and_Negative=1)
# float 負數(shù)
data10 = type_hex_new(str((z * z) / (2 * 100)), 'f', Positive_and_Negative=1)
# char 55
data11 = type_hex_new(str(z % 2 * 85), 'c')
# shoar 負數(shù)遞減
data12 = type_hex_new(str(31267 + z), 's', Positive_and_Negative=1)
# float 改
data13 = type_hex_new(str(0.4 + (z * 2)), 'f')
# int 改
data14 = type_hex_new(str(100000 + (z * 83000)), 'i')
# float 改
data15 = type_hex_new(str(0.3 + (z * 0.1)), 'f')
# shoar
data16 = type_hex_new(str(150 * z), 's')
# char
data17 = type_hex_new(str(z % 2 * 165), 'c')
# shoar 負數(shù)
data18 = type_hex_new(str((30 * a)), 's')
# int
data19 = type_hex_new(str((a * 20000)), 'i')
# char
data20 = type_hex(str((z % 2) * 90), 'c')
Data_Sum = '0'
#設(shè)置長度
Data_Sum = Data_Sum.ljust((2 * message_len), "0")
Data_Sum = Override_override(Data_Sum, Data1, 0)
Data_Sum = Override_override(Data_Sum, data2, 1)
Data_Sum = Override_override(Data_Sum, data3, 3)
Data_Sum = Override_override(Data_Sum, data4, 7)
Data_Sum = Override_override(Data_Sum, data5, 11)
Data_Sum = Override_override(Data_Sum, data6, 12)
Data_Sum = Override_override(Data_Sum, data7, 16)
Data_Sum = Override_override(Data_Sum, data8, 18)
Data_Sum = Override_override(Data_Sum, data9, 22)
Data_Sum = Override_override(Data_Sum, data10, 26)
Data_Sum = Override_override(Data_Sum, data11, 30)
Data_Sum = Override_override(Data_Sum, data12, 31)
Data_Sum = Override_override(Data_Sum, data13, 33)
Data_Sum = Override_override(Data_Sum, data14, 37)
Data_Sum = Override_override(Data_Sum, data15, 41)
Data_Sum = Override_override(Data_Sum, data16, 45)
Data_Sum = Override_override(Data_Sum, data17, 47)
Data_Sum = Override_override(Data_Sum, data18, 48)
Data_Sum = Override_override(Data_Sum, data19, 50)
Data_Sum = Override_override(Data_Sum, data20, 54)
for value_w in range(1, 18):
Data_Sum = Override_override(Data_Sum, Data_Sum[0:55 * 2], 55 * value_w)
start = 990
Data_Sum = Override_override(Data_Sum, Data1, start + 0)
Data_Sum = Override_override(Data_Sum, data2, start + 1)
Data_Sum = Override_override(Data_Sum, data3, start + 3)
Data_Sum = Override_override(Data_Sum, data4, start + 7)
Data_Sum = Override_override(Data_Sum, data5, start + 11)
Data_Sum = Override_override(Data_Sum, data6, start + 12)
Data_Sum = Override_override(Data_Sum, data7, start + 16)
Data_Sum = Override_override(Data_Sum, data8, start + 18)
Data_Sum = Override_override(Data_Sum, data9, start + 22)
Data_Sum = Override_override(Data_Sum, data10, start + 26)
Data_Sum = Override_override(Data_Sum, data11, start + 30)
Data_Sum = Override_override(Data_Sum, data12, start + 31)
Data_Sum = Override_override(Data_Sum, data20, start + 33)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*2)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*3)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*4)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*5)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*6)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*7)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * 8)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024 * 9)
elif data_type == 0:
data_num = data_num + 1
if data_num > 255:
break
# int 正
# data_1 = type_hex_new(str(100000 + (z * 100000)), 'i')
# # flot 負數(shù)
# data_1 = type_hex_new(str((z * z) / (2 * 100)), 'f', Positive_and_Negative=1)
# # char
# data_1 = type_hex(str(z % 2), 'c')
# data_1 = type_hex_new(str(31267 - z), 's', Positive_and_Negative=1)
# 報文長度
Data_Sum = str()
data_num1 = hex(data_num)[2:].rjust(2, '0')
# print(data_num)
for i in range(message_len + 1):
Data_Sum = Data_Sum + data_num1
# Data_Sum = Data_Sum.rjust(2,message_len, data_num)
# 填滿1024
# for value_w in range(0, message_len, 2):
#
# Data_Sum = Override_override(Data_Sum, hex(value_w), 2)
# 設(shè)置報文類型
Message_type = type_
#
# if type_ == '00' and '01':
# Message_type = type_
# else:
# if z % 2 == 0:
# Message_type = '00'
# else:
# Message_type = '01'
# 設(shè)置長度
byte_len = int(message_len) * 2
Data_Sum = Data_Sum[0:byte_len]
Message_len = hex(int(byte_len / 2))[2:].rjust(4, '0')
# test = (z, '=======', '1', Data_Sum)
#
# with open('test.txt', 'a') as f:
# f.write(str(test) + '\n')
data_sum = ()
message = '01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 %s %s %s ' \
'00 00 00 00 00 00 %s 30 30' % (models, Message_type, Message_len, Data_Sum)
message_list.append(Data_Sum)
return message_list
class Tup0001:
def __init__(self, serve_ip, serve_port):
self.serve_ip = serve_ip
self.serve_port = serve_port
def send_(self, send_gap=0.05, num_=1000000000000000000000000000, time_=1000000000000000000000000000,
data=None):
"""
不設(shè)置默認為 100000000000000000000000000000000
:param time_: 發(fā)送時間
:param num_: 發(fā)送個數(shù)
:param send_gap: 發(fā)送間隔浮點數(shù)
:param data: 導入數(shù)組data
:return:
"""
# 建立連接
datalen = len(data[0].replace(' ', ''))/2
tcp_soket = doConnect(self.serve_ip, self.serve_port)
send_port = tcp_soket.getsockname()
# 記錄開始時間 data易于查看 time用于計算
now1_time = datetime.datetime.now()
Labial_time = time.time()
time_start = Labial_time
# 設(shè)置發(fā)送時間
Send_time = Labial_time + time_ + 1
# 設(shè)置發(fā)送個數(shù)
Send_num = num_
# 12小時發(fā)送時間 === 43200
# 打印當前時間
print(now1_time)
# 計數(shù)器
counter = 0
while Labial_time <= Send_time - 1 and counter <= Send_num - 1:
# 每400包打印一次信息
# print(Labial_time, '發(fā)送個數(shù):', counter,'開始時間 =====', now1_time)
l = len(data)
for z in range(l):
Labial_time = time.time()
if Labial_time >= Send_time - 1 or counter > Send_num - 1:
break
try:
tcp_soket.send(bytes.fromhex(data[z]))
counter = counter + 1
except ConnectionAbortedError:
counter = counter - 1
now_time = datetime.datetime.now()
print('\r\nsocket error,do reconnect ', 'Aborted')
print(f"分機號{data[0].replace(' ', '')[60:62]}")
print('報文個數(shù)=====', counter)
print('開始時間 =====', now1_time)
print('出錯時間 =====', now_time)
print(f"上位機應(yīng)接收:{(datalen - 20) * counter}B")
time.sleep(10)
tcp_soket = doConnect(self.serve_ip, self.serve_port)
tcp_soket.send(bytes.fromhex(data[z]))
counter = counter + 1
except ConnectionResetError:
counter = counter - 1
now_time = datetime.datetime.now()
print('\r\nsocket error,do reconnect ', 'Reseterror')
print(f"分機號{data[0].replace(' ', '')[60:62]}")
print('報文個數(shù)=====', counter)
print('開始時間 =====', now1_time)
print('出錯時間 =====', now_time)
print(f"上位機應(yīng)接收:{(datalen - 20) * counter}B")
time.sleep(10)
# timeee = timeee - 3
tcp_soket = doConnect(self.serve_ip, self.serve_port)
tcp_soket.send(bytes.fromhex(data[z]))
counter = counter + 1
# 發(fā)送間隔
time.sleep(send_gap)
now_time = datetime.datetime.now()
# print('time :', now_time)
tcp_soket.shutdown(1)
time.sleep(0.1)
tcp_soket.close()
now_time = datetime.datetime.now()
send_time = time.time() - time_start
print('報文個數(shù)=====', counter)
print(f"上位機發(fā)送數(shù)量:{(datalen) * counter}B")
print(f"分機號{data[0].replace(' ', '')[60:62]}下位機應(yīng)接收:{(datalen - 20) * counter}B")
print(f"端口號為:{send_port}")
print(f'傳輸速率為:{(datalen) * counter / send_time / 1024 / 1024} MB/S')
print(f'發(fā)送頻率為{counter/send_time}條/秒')
print('開始時間 =====', now1_time)
print('結(jié)束時間 =====', now_time)
print("------------------------------------------------------------")
if __name__ == '__main__':
ip_port = [('192.188.1.200', 52245), ('192.188.1.223', 10242), ('10.4.0.9', 8080), ("192.168.2.253", 10268)]
w = 1
prject = Tup0001(ip_port[w][0], ip_port[w][1])
data1 = Generate_data(1, type_='01', models='01')
data2 = Generate_data(1024 * 10, type_='01', models='02')
data3 = Generate_data(1024 * 10, type_='01', models='03')
data4 = Generate_data(1024 * 10, type_='01', models='04')
data5 = Generate_data(1024 * 10, type_='00', models='05')
data6 = Generate_data(1024 * 10, type_='00', models='06')
data7 = Generate_data(1024 * 10, type_='00', models='07')
data8 = Generate_data(1024 * 10, type_='00', models='08')
data9 = Generate_data(1024 * 10, type_='01', models='09')
thread1 = threading.Thread(target=prject.send_,
kwargs={"num_": 3000, "send_gap": 0.05, "data": data1})
thread2 = threading.Thread(target=prject.send_,
kwargs={"num_": 1000, "send_gap": 0.2, "data": data2})
thread3 = threading.Thread(target=prject.send_,
kwargs={"num_": 1000, "send_gap": 0.2, "data": data3})
thread4 = threading.Thread(target=prject.send_,
kwargs={"num_": 1000, "send_gap": 0.2, "data": data4})
thread5 = threading.Thread(target=prject.send_,
kwargs={"num_": 1000, "send_gap": 0.2, "data": data5})
thread6 = threading.Thread(target=prject.send_,
kwargs={"num_": 1000, "send_gap": 0.2, "data": data6})
thread7 = threading.Thread(target=prject.send_,
kwargs={"num_": 1000, "send_gap": 0.2, "data": data7})
thread8 = threading.Thread(target=prject.send_,
kwargs={"num_": 1000, "send_gap": 0.2, "data": data8})
thread9 = threading.Thread(target=prject.send_,
kwargs={"num_": 1000, "send_gap": 0.2, "data": data9})
# Plist = [thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8]
# # #,thread2,thread3,thread4
# Plist = [thread1, thread2, thread3, thread4]
Plist = [thread1]
# # # print(data1[0])
# # # print(data3[0])
for i in Plist:
i.start()
# i.join()
#
for j in Plist:
j.join()
import serial
import serial.tools.list_ports
import threading
import time
import struct
import math
import datetime
import threading
def the_not(value, pos):
"""
指定位數(shù)取反
:param value:數(shù)據(jù)
:param pos: 位數(shù)
leng 傳遞二進制長度 - 1
將 獲取到的數(shù)據(jù)轉(zhuǎn)換為2進制后,將數(shù)據(jù)的最高位去除,
如數(shù)據(jù)長度未滿足pos長度將會在高位補零,并將其余所
有位數(shù)0和1置換在從最高位補1并返回
:return:
"""
if value == 0:
value = 1
value = value - 1
value = "{0:b}".format(value)
a = ((pos * 8) - 1)
value = value.rjust(a, '0')
value = value.replace('1', '2')
value = value.replace('0', '1')
value = value.replace('2', '0')
value = '1' + value
value = hex((int(value, 2)))[2:]
value = value.rjust((pos * 2), '0')
return value
# -> object 描述返回值數(shù)據(jù)類型
def Override_override(ov: object, value: object, leng: object) -> object:
"""
更改字符串指定位置的字符
:param ov: 被更改的老字符串
:param value: 更改的值
:param leng: 起始位置
:return: 更改后的字符串
"""
len_ = int(len(value))
# print(f"ov值為:{ov},新的值為:{ov[:leng * 2] + value + ov[leng + (leng * 2):]}")
return ov[:leng * 2] + value + ov[len_ + (leng * 2):]
def float_to_hex(f):
return hex(struct.unpack('<I', struct.pack('<f', f))[0])
def type_hex(value_, type_, Positive_and_Negative=0) -> str:
"""
輸入值和數(shù)據(jù)類型 默認為正數(shù) 并根據(jù)對應(yīng)的解析方式返回16進制的值
:param value_: 值
:param type_: 數(shù)據(jù)類型
:param Positive_and_Negative: 正負項
:return: 16進制的值
"""
if Positive_and_Negative == 1:
if type_ == 'f':
data10 = float(value_)
data10hex = float_to_hex(data10)[2:]
data10hex = bin(int(data10hex, 16))[2:]
data10hex = hex(int('1' + data10hex.rjust(31, '0'), 2))[2:]
return data10hex.rjust(8, '0')
elif type_ == 'i':
data10 = int(value_)
return the_not(data10, 4)
elif type_ == 's':
data10 = int(value_)
return the_not(data10, 2)
elif type_ == 'c':
data10 = hex(int(value_))[2:]
return data10.rjust(2, '0')
elif Positive_and_Negative == 0:
if type_ == 'f':
data10 = float(value_)
data10hex = float_to_hex(data10)[2:]
return data10hex.rjust(8, '0')
elif type_ == 'i':
data8hex = hex(int(value_))[2:]
return data8hex.rjust(8, '0')
elif type_ == 's':
data8hex = hex(int(value_))[2:]
return data8hex.rjust(4, '0')
elif type_ == 'c':
data10 = hex(int(value_))[2:]
return data10.rjust(2, '0')
def type_hex_new(value_, type_, Positive_and_Negative=0) -> str:
"""
輸入值和數(shù)據(jù)類型 默認為正數(shù) 并根據(jù)對應(yīng)的解析方式返回 置返的16進制的值
:param value_:
:param type_:
:param Positive_and_Negative:
:return:
"""
hex_ = type_hex(value_, type_, Positive_and_Negative=Positive_and_Negative)
if type_ == 'f':
a = int(hex_[0:2], 16)
b = int(hex_[2:4], 16) * 256
c = int(hex_[4:6], 16) * (256 * 256)
d = int(hex_[6:8], 16) * (256 * 256 * 256)
max_ = hex(int(a + b + c + d))[2:]
max_ = max_.rjust(8, '0')
return max_
elif type_ == 'i':
a = int(hex_[0:2], 16)
b = int(hex_[2:4], 16) * 256
c = int(hex_[4:6], 16) * (256 * 256)
d = int(hex_[6:8], 16) * (256 * 256 * 256)
max_ = hex(int(a + b + c + d))[2:]
max_ = max_.rjust(8, '0')
return max_
# = '0x' + hex_[0:2]
elif type_ == 's':
a = int(hex_[0:2], 16)
b = int(hex_[2:4], 16) * 256
max_ = hex(int(a + b))[2:]
max_ = max_.rjust(4, '0')
return max_
elif type_ == 'c':
return hex_
def Generate_data(message_len: int, type_='00', models: str = '01') -> object:
"""
輸入 數(shù)據(jù)長度 和數(shù)據(jù)類型,以及分機號,返回一個長度400的數(shù)組
data_type:更改為0 返回的數(shù)據(jù)為1024隨機數(shù)據(jù)類型,1為自定義數(shù)據(jù)
:type type_: object
:param message_len: 報文長度 1 - 1024 int類型
:param type_: 報文類型 01 00 str
:param models: 分機號 為 01 - 04 str
:return:返回制造的數(shù)據(jù)數(shù)組
"""
message_list = []
data_num = 0
for z in range(-200, 201):
z = int(math.fabs(z))
a = int((z * z) / (2 * 100))
data_type = 1
if data_type == 1:
# char
Data1 = type_hex(str(z % 2), 'c')
# shoar
data2 = type_hex_new(str(31267 + a), 's')
# float
data3 = type_hex_new(str((z * z) / (2 * 100)), 'f')
# int
data4 = type_hex_new(str(100000 + (z * 100000)), 'i')
# char
data5 = type_hex(str(((z + 1) % 2) * 5), 'c')
# float
data6 = type_hex_new(str(0.99 + (z * 10)), 'f')
# shoar 負數(shù) 遞增
data7 = type_hex_new(str(31267 - z), 's', Positive_and_Negative=1)
# int1
data8 = type_hex_new(str(800000 + (a * 50000)), 'i')
# int 負數(shù)
data9 = type_hex_new(str(100000 + (z * 100000)), 'i', Positive_and_Negative=1)
# float 負數(shù)
data10 = type_hex_new(str((z * z) / (2 * 100)), 'f', Positive_and_Negative=1)
# char 55
data11 = type_hex_new(str(z % 2 * 85), 'c')
# shoar 負數(shù)遞減
data12 = type_hex_new(str(31267 + z), 's', Positive_and_Negative=1)
# float 改
data13 = type_hex_new(str(0.4 + (z * 2)), 'f')
# int 改
data14 = type_hex_new(str(100000 + (z * 83000)), 'i')
# float 改
data15 = type_hex_new(str(0.3 + (z * 0.1)), 'f')
# shoar
data16 = type_hex_new(str(150 * z), 's')
# char
data17 = type_hex_new(str(z % 2 * 165), 'c')
# shoar 負數(shù)
data18 = type_hex_new(str((30 * a)), 's')
# int
data19 = type_hex_new(str((a * 20000)), 'i')
# char
data20 = type_hex(str((z % 2) * 90), 'c')
Data_Sum = '0'
#設(shè)置長度
Data_Sum = Data_Sum.ljust((2 * message_len), "0")
Data_Sum = Override_override(Data_Sum, Data1, 0)
Data_Sum = Override_override(Data_Sum, data2, 1)
Data_Sum = Override_override(Data_Sum, data3, 3)
Data_Sum = Override_override(Data_Sum, data4, 7)
Data_Sum = Override_override(Data_Sum, data5, 11)
Data_Sum = Override_override(Data_Sum, data6, 12)
Data_Sum = Override_override(Data_Sum, data7, 16)
Data_Sum = Override_override(Data_Sum, data8, 18)
Data_Sum = Override_override(Data_Sum, data9, 22)
Data_Sum = Override_override(Data_Sum, data10, 26)
Data_Sum = Override_override(Data_Sum, data11, 30)
Data_Sum = Override_override(Data_Sum, data12, 31)
Data_Sum = Override_override(Data_Sum, data13, 33)
Data_Sum = Override_override(Data_Sum, data14, 37)
Data_Sum = Override_override(Data_Sum, data15, 41)
Data_Sum = Override_override(Data_Sum, data16, 45)
Data_Sum = Override_override(Data_Sum, data17, 47)
Data_Sum = Override_override(Data_Sum, data18, 48)
Data_Sum = Override_override(Data_Sum, data19, 50)
Data_Sum = Override_override(Data_Sum, data20, 54)
for value_w in range(1, 18):
Data_Sum = Override_override(Data_Sum, Data_Sum[0:55 * 2], 55 * value_w)
start = 990
Data_Sum = Override_override(Data_Sum, Data1, start + 0)
Data_Sum = Override_override(Data_Sum, data2, start + 1)
Data_Sum = Override_override(Data_Sum, data3, start + 3)
Data_Sum = Override_override(Data_Sum, data4, start + 7)
Data_Sum = Override_override(Data_Sum, data5, start + 11)
Data_Sum = Override_override(Data_Sum, data6, start + 12)
Data_Sum = Override_override(Data_Sum, data7, start + 16)
Data_Sum = Override_override(Data_Sum, data8, start + 18)
Data_Sum = Override_override(Data_Sum, data9, start + 22)
Data_Sum = Override_override(Data_Sum, data10, start + 26)
Data_Sum = Override_override(Data_Sum, data11, start + 30)
Data_Sum = Override_override(Data_Sum, data12, start + 31)
Data_Sum = Override_override(Data_Sum, data20, start + 33)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*2)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*3)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*4)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*5)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*6)
Data_Sum = Override_override(Data_Sum, Data_Sum, 1024*7)
elif data_type == 0:
data_num = data_num + 1
if data_num > 255:
break
# int 正
# data_1 = type_hex_new(str(100000 + (z * 100000)), 'i')
# # flot 負數(shù)
# data_1 = type_hex_new(str((z * z) / (2 * 100)), 'f', Positive_and_Negative=1)
# # char
# data_1 = type_hex(str(z % 2), 'c')
# data_1 = type_hex_new(str(31267 - z), 's', Positive_and_Negative=1)
# 報文長度
Data_Sum = str()
data_num1 = hex(data_num)[2:].rjust(2, '0')
# print(data_num)
for i in range(message_len + 1):
Data_Sum = Data_Sum + data_num1
# Data_Sum = Data_Sum.rjust(2,message_len, data_num)
# 填滿1024
# for value_w in range(0, message_len, 2):
#
# Data_Sum = Override_override(Data_Sum, hex(value_w), 2)
# 設(shè)置報文類型
Message_type = type_
#
# if type_ == '00' and '01':
# Message_type = type_
# else:
# if z % 2 == 0:
# Message_type = '00'
# else:
# Message_type = '01'
# 設(shè)置長度
byte_len = int(message_len) * 2
Data_Sum = Data_Sum[0:byte_len]
Message_len = hex(int(byte_len / 2))[2:].rjust(4, '0')
# test = (z, '=======', '1', Data_Sum)
#
# with open('test.txt', 'a') as f:
# f.write(str(test) + '\n')
data_sum = ()
message = '01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 %s %s %s ' \
'00 00 00 00 00 00 %s 30 30' % (models, Message_type, Message_len, Data_Sum)
message_list.append(Data_Sum)
return message_list
class RS485:
def __init__(self, baudrate, pods):
self.port = pods
self.baudrate = baudrate
self.ser = serial.Serial(self.port, self.baudrate)
def slice_data(slef,data):
data = data.replace(' ', '')
def split_string(data):
sub = 20
length = len(data)
avg_length = length // sub # 每個子字符串的平均長度
remainder = length % sub # 余數(shù),用于判斷最后一個子字符串的長度
result = []
start = 0
end = avg_length
for i in range(sub):
# 判斷是否為最后一個子字符串,并調(diào)整結(jié)束索引
if i == 7:
end += remainder
substring = data[start:end]
result.append(substring)
start = end
end += avg_length
return result
def generate_data_packets(datalist):
for index,value in enumerate(datalist):
datalist[index] = '5a5a' + hex(int(index + 1))[2:].rjust(2, '0')+ hex(int(len(value)/2 + 5))[2:].rjust(2, '0') + value + 'cf'
return datalist
return generate_data_packets(split_string(data))
def send_com(self, send_gap=0.5, num_=1000000000000000000000000000, time_=1000000000000000000000000000,
data=None):
# 獲取data長度
datalen = len(data[0].replace(' ', ''))/2
# 讀取當前時間
now1_time = datetime.datetime.now()
Labial_time = time.time()
time_start = Labial_time
Send_time = Labial_time + time_ + 1
# 設(shè)置發(fā)送個數(shù)
Send_num = num_
# 12小時發(fā)送時間 === 43200
# 打印當前時間
print(Labial_time)
# 計數(shù)器
counter = 0
while Labial_time <= Send_time - 1 and counter <= Send_num - 1:
for z in range(len(data)):
#更新時間
Labial_time = time.time()
if Labial_time >= Send_time - 1 or counter > Send_num - 1:
break
for j in self.slice_data(data[z]):
self.ser.write(bytes.fromhex(j))
print(j)
time.sleep(send_gap)
counter = counter + 1
now_time = datetime.datetime.now()
print('報文個數(shù)=====', counter)
print(f"上位機發(fā)送數(shù)量:{(datalen) * counter}B")
print(f"分機號{data[0].replace(' ', '')[60:62]}下位機應(yīng)接收:{(datalen - 20) * counter}B")
print(f'傳輸速率為:{(datalen) * counter / Send_time / 1024 / 1024} MB/S')
print('開始時間 =====', now1_time)
print('結(jié)束時間 =====', now_time)
if __name__ == '__main__':
data1 = Generate_data(400, type_='01', models='03')
data2 = Generate_data(1024*8, type_='01', models='02')
data3 = Generate_data(1024*8, type_='01', models='03')
data4 = Generate_data(1024*8, type_='01', models='04')
def print_available_ports():
ports = serial.tools.list_ports.comports()
port_list = []
for port in ports:
port_list.append(port.device)
return port_list
pods = print_available_ports()
for i in range(len(pods)):
print(f"串口:{i}\t{pods[i]}")
num = int(input("輸入對應(yīng)編號選擇響應(yīng)的串口"))
# num = 1
# 串口配置
port = pods[num]
baudrate = 115200
prject = RS485(baudrate,pods[num])
# 創(chuàng)建串口對象
print(len(data1))
prject.send_com(num_=1000, data=data1)