forked from uncloud/uncloud
84 lines
2 KiB
Python
84 lines
2 KiB
Python
import subprocess as sp
|
|
import random
|
|
import logging
|
|
import socket
|
|
from contextlib import closing
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def random_bytes(num=6):
|
|
return [random.randrange(256) for _ in range(num)]
|
|
|
|
|
|
def generate_mac(
|
|
uaa=False, multicast=False, oui=None, separator=":", byte_fmt="%02x"
|
|
):
|
|
mac = random_bytes()
|
|
if oui:
|
|
if type(oui) == str:
|
|
oui = [int(chunk) for chunk in oui.split(separator)]
|
|
mac = oui + random_bytes(num=6 - len(oui))
|
|
else:
|
|
if multicast:
|
|
mac[0] |= 1 # set bit 0
|
|
else:
|
|
mac[0] &= ~1 # clear bit 0
|
|
if uaa:
|
|
mac[0] &= ~(1 << 1) # clear bit 1
|
|
else:
|
|
mac[0] |= 1 << 1 # set bit 1
|
|
return separator.join(byte_fmt % b for b in mac)
|
|
|
|
|
|
def create_dev(script, _id, dev, ip=None):
|
|
command = [
|
|
"sudo",
|
|
"-p",
|
|
"Enter password to create network devices for vm: ",
|
|
script,
|
|
str(_id),
|
|
dev,
|
|
]
|
|
if ip:
|
|
command.append(ip)
|
|
try:
|
|
output = sp.check_output(command, stderr=sp.PIPE)
|
|
except Exception:
|
|
logger.exception("Creation of interface %s failed.", dev)
|
|
return None
|
|
else:
|
|
return output.decode("utf-8").strip()
|
|
|
|
|
|
def delete_network_interface(iface):
|
|
try:
|
|
sp.check_output(
|
|
[
|
|
"sudo",
|
|
"-p",
|
|
"Enter password to remove {} network device: ".format(
|
|
iface
|
|
),
|
|
"ip",
|
|
"link",
|
|
"del",
|
|
iface,
|
|
],
|
|
stderr=sp.PIPE,
|
|
)
|
|
except Exception:
|
|
logger.exception("Interface %s Deletion failed", iface)
|
|
|
|
|
|
def find_free_port():
|
|
with closing(
|
|
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
) as s:
|
|
try:
|
|
s.bind(("", 0))
|
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
except Exception:
|
|
return None
|
|
else:
|
|
return s.getsockname()[1]
|