服務(wù)端:
-
ss = socket()創(chuàng)建一個(gè)服務(wù)器套接字 -
ss.bind()綁定服務(wù)器套接字
inf_loop: # 服務(wù)器無限循環(huán)
cs = ss.recvfrom() 或者ss.sendto() # 與客戶端對(duì)話
ss.close() # 關(guān)閉服務(wù)器套接字
客戶端:
-
cs = socket()創(chuàng)建一個(gè)客戶端套接字
comm_loop: # 客戶端通訊循環(huán)
cs.recvfrom() 或者cs.sendto() # 與客戶端對(duì)話
cs.close() # 關(guān)閉服務(wù)器套接字
服務(wù)端代碼實(shí)現(xiàn):
from socket import *
ip_port = ('127.0.0.1', 8080)
buffer_size = 1024
udp_server = socket(AF_INET, SOCK_DGRAM) # 數(shù)據(jù)報(bào)
udp_server.bind(ip_port)
while True:
data = udp_server.recvfrom(buffer_size)
print(data)
客戶端代碼實(shí)現(xiàn):
from socket import *
ip_port = ('127.0.0.1', 8080)
udp_client = socket(AF_INET, SOCK_DGRAM) # 數(shù)據(jù)報(bào)
while True:
udp_client.sendto('hello udp'.encode('utf-8'), ip_port)
基于UDP實(shí)現(xiàn)NTP服務(wù)
服務(wù)端:
from socket import *
import time
ip_port=('127.0.0.1',8080)
buffer_size=1024
udp_server=socket(AF_INET,SOCK_DGRAM) #數(shù)據(jù)報(bào)
udp_server.bind(ip_port)
while True:
data,addr=udp_server.recvfrom(buffer_size)
print(data)
if not data:
fmt='%Y-%m-%d %X'
else:
fmt=data.decode('utf-8')
back_time=time.strftime(fmt)
udp_server.sendto(back_time.encode('utf-8'),addr)
客戶端:
from socket import *
ip_port = ('127.0.0.1', 8080)
buf_size=1024
udp_client = socket(AF_INET, SOCK_DGRAM) # 數(shù)據(jù)報(bào)
while True:
msg = input("請(qǐng)輸入發(fā)送內(nèi)容")
udp_client.sendto(msg.encode('utf-8'), ip_port)
data, addr = udp_client.recvfrom(buf_size)
print("ntp服務(wù)器的標(biāo)準(zhǔn)時(shí)間是", data.decode("utf-8"))