Client/net/UnicastConnection.py

112 lines
3.8 KiB
Python
Raw Permalink Normal View History

import socket
import ssl
import time
class UnicastConnection:
STRICT_SSL_VALIDATION = True
BUFFER_SIZE=8000
def __init__(self, sock=None):
if sock is None:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.sock = sock
self.is_connected = False
self.is_tls = False
self.ssock = None
self.ca_path = None
self.ssl_info = None
def set_ca_path(self, ca_path):
self.ca_path = ca_path
def connect(self, host, port, tls):
print("[UnicastConnection::connect]")
self.is_tls = tls
if self.is_tls:
print("[UnicastConnection::connect] Attempting SSL/TLS connection")
context = ssl.create_default_context()
if UnicastConnection.STRICT_SSL_VALIDATION:
if self.ca_path:
print("[UnicastConnection::connect] loading CA.CRT")
context.load_verify_locations(self.ca_path)
else:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
self.ssock = context.wrap_socket(self.sock, server_hostname=host)
self.ssock.connect((host,port))
certinfo = self.ssock.getpeercert()
if certinfo:
cert_subject = dict(x[0] for x in certinfo['subject'])
cert_issuer = dict(x[0] for x in certinfo['issuer'])
print(cert_subject['commonName'])
print(cert_issuer['commonName'])
self.ssl_info = f"Certificate Informatinon:\nCN={cert_subject['commonName']}\nIssuer={cert_issuer['commonName']}\nTLS version: {self.ssock.version()}"
self.is_connected = True
else:
print("[UnicastConnection::connect] Attemping unencypted connection")
self.sock.connect((host, port))
self.is_connected = True
def read_line(self):
print("[UnicastConnection::read_line] begin")
if self.is_tls:
line = self.ssock.recv(1000)
else:
line = self.sock.recv(1000)
message = line.decode('utf-8')
print(f"[UnicastConnection::read_line] Received: '{message}'")
return message
def send_message(self, message):
print(f"[UnicastConnection::send_message] sending message '{message}'")
if self.is_tls:
self.ssock.sendall(message.encode('utf-8'))
else:
self.sock.sendall(message.encode('utf-8'))
def send_file(self, filepath):
time.sleep(1)
self.nextrun = True
f = open(filepath, "rb")
while self.nextrun:
l = f.read(self.BUFFER_SIZE)
while (l):
if self.is_tls:
self.ssock.send(l)
else:
self.sock.send(l)
l = f.read(self.BUFFER_SIZE)
if not l:
f.close()
self.nextrun = True
break
def receive_file(self, filename, savepath, filesize):
self.nextrun = True
remaining = filesize
with open(f"{savepath}/{filename}", 'wb') as f:
while remaining > 0 and self.nextrun:
if self.is_tls:
data = self.ssock.recv(self.BUFFER_SIZE)
else:
data = self.sock.recv(self.BUFFER_SIZE)
if not data:
f.close()
self.nextrun = False
break
f.write(data)
remaining = remaining - len(data)
print(f"Received = {len(data)} - Remaining = {remaining} / {filesize}")
time.sleep(1)
def get_certificate_info(self):
return self.ssl_info
def close(self):
self.sock.close()