291 lines
8.7 KiB
Python
291 lines
8.7 KiB
Python
#
|
|
# CIDR AGGREGATOR (IPv4 + IPv6)
|
|
#
|
|
|
|
def mask_to_int(mask_size, total_bits):
|
|
return ((1 << total_bits) - 1) ^ ((1 << (total_bits - mask_size)) - 1)
|
|
|
|
|
|
def ip_volume(mask_size, total_bits):
|
|
return 1 << (total_bits - mask_size)
|
|
|
|
|
|
class Net:
|
|
__slots__ = ["version","bits","mask_size","net","mask","volume"]
|
|
|
|
def __init__(self, net: int, mask_size: int, version: int=4):
|
|
self.version, self.bits = (4, 32) if version==4 else (6, 128)
|
|
self.mask_size = mask_size
|
|
self.mask = mask_to_int(mask_size, self.bits)
|
|
self.net = net & self.mask
|
|
self.volume = ip_volume(mask_size, self.bits)
|
|
|
|
#
|
|
# --- IP CONVERSION ---
|
|
#
|
|
def __int_to_ipv4(self, n):
|
|
return ".".join(str((n >> (24 - 8*i)) & 0xFF) for i in range(4))
|
|
|
|
def __int_to_ipv6(self, n):
|
|
blocks = [(n >> (112 - 16*i)) & 0xFFFF for i in range(8)]
|
|
|
|
best_start = -1
|
|
best_len = 0
|
|
cur_start = -1
|
|
cur_len = 0
|
|
|
|
for i in range(8):
|
|
if blocks[i] == 0:
|
|
if cur_start < 0:
|
|
cur_start = i
|
|
cur_len = 1
|
|
else:
|
|
cur_len += 1
|
|
else:
|
|
if cur_len > best_len:
|
|
best_len = cur_len
|
|
best_start = cur_start
|
|
cur_start = -1
|
|
cur_len = 0
|
|
|
|
if cur_len > best_len:
|
|
best_len = cur_len
|
|
best_start = cur_start
|
|
|
|
if best_len > 1:
|
|
new = []
|
|
i = 0
|
|
while i < 8:
|
|
if i == best_start:
|
|
new.append('')
|
|
i += best_len
|
|
else:
|
|
new.append(format(blocks[i], 'x'))
|
|
i += 1
|
|
res = ":".join(new)
|
|
while ":::" in res:
|
|
res = res.replace(":::", "::")
|
|
return res
|
|
|
|
return ":".join(format(b, 'x') for b in blocks)
|
|
|
|
def __int_to_ip(self, n):
|
|
return self.__int_to_ipv4(n) if self.version == 4 else self.__int_to_ipv6(n)
|
|
|
|
#
|
|
# --- PUBLIC API ---
|
|
#
|
|
def getAsString(self, fmt='{addr}/{masklen}'):
|
|
return fmt.format(
|
|
addr=self.__int_to_ip(self.net),
|
|
masklen=self.mask_size
|
|
)
|
|
|
|
def is_adjacent(self, other):
|
|
if self.version != other.version: return False
|
|
if self.mask_size != other.mask_size: return False
|
|
step = 1 << (self.bits - self.mask_size)
|
|
return self.net + step == other.net or other.net + step == self.net
|
|
|
|
def supernet(self):
|
|
if self.mask_size == 0:
|
|
return self
|
|
new_mask = self.mask_size - 1
|
|
new_mask_int = mask_to_int(new_mask, self.bits)
|
|
new_net = self.net & new_mask_int
|
|
return Net(new_net, new_mask, self.version)
|
|
|
|
|
|
class Node:
|
|
__slots__ = [
|
|
"net", "child0", "child1",
|
|
"is_real",
|
|
"real_volume", "real_count",
|
|
"fake_volume", "weight", "max_child_weight"
|
|
]
|
|
|
|
def __init__(self, net: Net):
|
|
self.net = net
|
|
self.child0 = None
|
|
self.child1 = None
|
|
self.is_real = False
|
|
|
|
self.real_volume = 0
|
|
self.real_count = 0
|
|
self.fake_volume = 0
|
|
self.weight = 0
|
|
self.max_child_weight = 0
|
|
|
|
#
|
|
# INSERT NETWORK INTO TRIE
|
|
#
|
|
def insert(self, new_net: Net):
|
|
return self.__insert(new_net, level=0)
|
|
|
|
def __insert(self, new_net: Net, level):
|
|
# если дошли до маски сети — это лист
|
|
if level == new_net.mask_size:
|
|
if not self.is_real:
|
|
self.is_real = True
|
|
self.child0 = None
|
|
self.child1 = None
|
|
return
|
|
|
|
# разбираем бит адреса
|
|
bit_pos = self.net.bits - 1 - level
|
|
direction = (new_net.net >> bit_pos) & 1
|
|
|
|
if direction == 0:
|
|
if not self.child0:
|
|
child_net = Net(new_net.net & mask_to_int(level+1, self.net.bits), level+1, self.net.version)
|
|
self.child0 = Node(child_net)
|
|
self.child0.__insert(new_net, level+1)
|
|
|
|
else:
|
|
if not self.child1:
|
|
child_net = Net(new_net.net & mask_to_int(level+1, self.net.bits), level+1, self.net.version)
|
|
self.child1 = Node(child_net)
|
|
self.child1.__insert(new_net, level+1)
|
|
|
|
#
|
|
# CALCULATE WEIGHTS
|
|
#
|
|
def finalize(self):
|
|
if self.is_real:
|
|
self.real_volume = self.net.volume
|
|
self.real_count = 1
|
|
self.fake_volume = 0
|
|
self.weight = 0
|
|
self.max_child_weight = 0
|
|
return
|
|
|
|
self.real_volume = 0
|
|
self.real_count = 0
|
|
self.fake_volume = 0
|
|
self.max_child_weight = 0
|
|
|
|
for ch in (self.child0, self.child1):
|
|
if ch:
|
|
ch.finalize()
|
|
self.real_volume += ch.real_volume
|
|
self.real_count += ch.real_count
|
|
self.fake_volume += ch.fake_volume
|
|
self.max_child_weight = max(self.max_child_weight, ch.weight, ch.max_child_weight)
|
|
|
|
self.__recalc()
|
|
|
|
def __recalc(self):
|
|
missing = self.net.volume - self.real_volume - self.fake_volume
|
|
if missing > 0:
|
|
self.weight = (self.real_count - 1) / (missing ** 0.5)
|
|
else:
|
|
self.weight = float('inf')
|
|
|
|
#
|
|
# COLLAPSE / AGGREGATE
|
|
#
|
|
def collapse(self, min_weight=0, max_delta=float('inf')):
|
|
if self.is_real:
|
|
return 0,0
|
|
|
|
delta = 0
|
|
fake = 0
|
|
|
|
# сворачиваем детей
|
|
for ch in (self.child0, self.child1):
|
|
if ch:
|
|
d, f = ch.collapse(min_weight, max_delta - delta)
|
|
delta += d
|
|
fake += f
|
|
|
|
# попытаемся объединить
|
|
if self.child0 and self.child1:
|
|
c0 = self.child0
|
|
c1 = self.child1
|
|
|
|
if (c0.is_real and c1.is_real and
|
|
c0.net.is_adjacent(c1.net)):
|
|
|
|
super_net = c0.net.supernet()
|
|
|
|
# превращаем текущий узел в супернет
|
|
self.net = super_net
|
|
self.is_real = True
|
|
self.child0 = None
|
|
self.child1 = None
|
|
|
|
self.real_volume = c0.real_volume + c1.real_volume
|
|
self.fake_volume = super_net.volume - self.real_volume
|
|
self.real_count = 1
|
|
self.weight = 0
|
|
self.max_child_weight = 0
|
|
|
|
return delta + 2, fake + self.fake_volume
|
|
|
|
# пересчитываем статистику
|
|
if not self.is_real:
|
|
self.real_volume = 0
|
|
self.real_count = 0
|
|
self.fake_volume = 0
|
|
self.max_child_weight = 0
|
|
|
|
for ch in (self.child0, self.child1):
|
|
if ch:
|
|
self.real_volume += ch.real_volume
|
|
self.real_count += ch.real_count
|
|
self.fake_volume += ch.fake_volume
|
|
self.max_child_weight = max(self.max_child_weight, ch.weight, ch.max_child_weight)
|
|
self.__recalc()
|
|
return delta, fake
|
|
|
|
def export_compress(self, fmt='{addr}/{masklen}'):
|
|
# считаем статистику
|
|
self.finalize()
|
|
# сжимаем
|
|
self.collapse()
|
|
result = []
|
|
|
|
def walk(node):
|
|
if node is None:
|
|
return
|
|
# если суперсеть реальная –> дети не нужны
|
|
if node.is_real:
|
|
result.append(node.net.getAsString(fmt))
|
|
return
|
|
walk(node.child0)
|
|
walk(node.child1)
|
|
|
|
walk(self)
|
|
return "\n".join(result)
|
|
|
|
|
|
def export(self, fmt='{addr}/{masklen}'):
|
|
# считаем статистику
|
|
self.finalize()
|
|
result = []
|
|
|
|
def walk(node):
|
|
if node is None:
|
|
return
|
|
|
|
if node.is_real:
|
|
# дети полностью покрывают диапазон родителя?
|
|
child_real_vol = 0
|
|
for ch in (node.child0, node.child1):
|
|
if ch:
|
|
child_real_vol += ch.real_volume
|
|
|
|
# если дети полностью покрывают родителя -> родители не нужны
|
|
if child_real_vol >= node.net.volume:
|
|
walk(node.child0)
|
|
walk(node.child1)
|
|
return
|
|
|
|
# иначе выводим родителя и детей
|
|
result.append(node.net.getAsString(fmt))
|
|
|
|
walk(node.child0)
|
|
walk(node.child1)
|
|
|
|
walk(self)
|
|
return "\n".join(result) |