Files
bird_list_ip/include/net_tree.py

291 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#
# CIDR AGGREGATOR (IPv4 + IPv6)
#
def mask_to_int(mask_size, total_bits):
return ((1 << total_bits) - 1) ^ ((1 << (total_bits - mask_size)) - 1)
def ip_volume(mask_size, total_bits):
return 1 << (total_bits - mask_size)
class Net:
__slots__ = ["version","bits","mask_size","net","mask","volume"]
def __init__(self, net: int, mask_size: int, version: int=4):
self.version, self.bits = (4, 32) if version==4 else (6, 128)
self.mask_size = mask_size
self.mask = mask_to_int(mask_size, self.bits)
self.net = net & self.mask
self.volume = ip_volume(mask_size, self.bits)
#
# --- IP CONVERSION ---
#
def __int_to_ipv4(self, n):
return ".".join(str((n >> (24 - 8*i)) & 0xFF) for i in range(4))
def __int_to_ipv6(self, n):
blocks = [(n >> (112 - 16*i)) & 0xFFFF for i in range(8)]
best_start = -1
best_len = 0
cur_start = -1
cur_len = 0
for i in range(8):
if blocks[i] == 0:
if cur_start < 0:
cur_start = i
cur_len = 1
else:
cur_len += 1
else:
if cur_len > best_len:
best_len = cur_len
best_start = cur_start
cur_start = -1
cur_len = 0
if cur_len > best_len:
best_len = cur_len
best_start = cur_start
if best_len > 1:
new = []
i = 0
while i < 8:
if i == best_start:
new.append('')
i += best_len
else:
new.append(format(blocks[i], 'x'))
i += 1
res = ":".join(new)
while ":::" in res:
res = res.replace(":::", "::")
return res
return ":".join(format(b, 'x') for b in blocks)
def __int_to_ip(self, n):
return self.__int_to_ipv4(n) if self.version == 4 else self.__int_to_ipv6(n)
#
# --- PUBLIC API ---
#
def getAsString(self, fmt='{addr}/{masklen}'):
return fmt.format(
addr=self.__int_to_ip(self.net),
masklen=self.mask_size
)
def is_adjacent(self, other):
if self.version != other.version: return False
if self.mask_size != other.mask_size: return False
step = 1 << (self.bits - self.mask_size)
return self.net + step == other.net or other.net + step == self.net
def supernet(self):
if self.mask_size == 0:
return self
new_mask = self.mask_size - 1
new_mask_int = mask_to_int(new_mask, self.bits)
new_net = self.net & new_mask_int
return Net(new_net, new_mask, self.version)
class Node:
__slots__ = [
"net", "child0", "child1",
"is_real",
"real_volume", "real_count",
"fake_volume", "weight", "max_child_weight"
]
def __init__(self, net: Net):
self.net = net
self.child0 = None
self.child1 = None
self.is_real = False
self.real_volume = 0
self.real_count = 0
self.fake_volume = 0
self.weight = 0
self.max_child_weight = 0
#
# INSERT NETWORK INTO TRIE
#
def insert(self, new_net: Net):
return self.__insert(new_net, level=0)
def __insert(self, new_net: Net, level):
# если дошли до маски сети — это лист
if level == new_net.mask_size:
if not self.is_real:
self.is_real = True
self.child0 = None
self.child1 = None
return
# разбираем бит адреса
bit_pos = self.net.bits - 1 - level
direction = (new_net.net >> bit_pos) & 1
if direction == 0:
if not self.child0:
child_net = Net(new_net.net & mask_to_int(level+1, self.net.bits), level+1, self.net.version)
self.child0 = Node(child_net)
self.child0.__insert(new_net, level+1)
else:
if not self.child1:
child_net = Net(new_net.net & mask_to_int(level+1, self.net.bits), level+1, self.net.version)
self.child1 = Node(child_net)
self.child1.__insert(new_net, level+1)
#
# CALCULATE WEIGHTS
#
def finalize(self):
if self.is_real:
self.real_volume = self.net.volume
self.real_count = 1
self.fake_volume = 0
self.weight = 0
self.max_child_weight = 0
return
self.real_volume = 0
self.real_count = 0
self.fake_volume = 0
self.max_child_weight = 0
for ch in (self.child0, self.child1):
if ch:
ch.finalize()
self.real_volume += ch.real_volume
self.real_count += ch.real_count
self.fake_volume += ch.fake_volume
self.max_child_weight = max(self.max_child_weight, ch.weight, ch.max_child_weight)
self.__recalc()
def __recalc(self):
missing = self.net.volume - self.real_volume - self.fake_volume
if missing > 0:
self.weight = (self.real_count - 1) / (missing ** 0.5)
else:
self.weight = float('inf')
#
# COLLAPSE / AGGREGATE
#
def collapse(self, min_weight=0, max_delta=float('inf')):
if self.is_real:
return 0,0
delta = 0
fake = 0
# сворачиваем детей
for ch in (self.child0, self.child1):
if ch:
d, f = ch.collapse(min_weight, max_delta - delta)
delta += d
fake += f
# попытаемся объединить
if self.child0 and self.child1:
c0 = self.child0
c1 = self.child1
if (c0.is_real and c1.is_real and
c0.net.is_adjacent(c1.net)):
super_net = c0.net.supernet()
# превращаем текущий узел в супернет
self.net = super_net
self.is_real = True
self.child0 = None
self.child1 = None
self.real_volume = c0.real_volume + c1.real_volume
self.fake_volume = super_net.volume - self.real_volume
self.real_count = 1
self.weight = 0
self.max_child_weight = 0
return delta + 2, fake + self.fake_volume
# пересчитываем статистику
if not self.is_real:
self.real_volume = 0
self.real_count = 0
self.fake_volume = 0
self.max_child_weight = 0
for ch in (self.child0, self.child1):
if ch:
self.real_volume += ch.real_volume
self.real_count += ch.real_count
self.fake_volume += ch.fake_volume
self.max_child_weight = max(self.max_child_weight, ch.weight, ch.max_child_weight)
self.__recalc()
return delta, fake
def export_compress(self, fmt='{addr}/{masklen}'):
# считаем статистику
self.finalize()
# сжимаем
self.collapse()
result = []
def walk(node):
if node is None:
return
# если суперсеть реальная > дети не нужны
if node.is_real:
result.append(node.net.getAsString(fmt))
return
walk(node.child0)
walk(node.child1)
walk(self)
return "\n".join(result)
def export(self, fmt='{addr}/{masklen}'):
# считаем статистику
self.finalize()
result = []
def walk(node):
if node is None:
return
if node.is_real:
# дети полностью покрывают диапазон родителя?
child_real_vol = 0
for ch in (node.child0, node.child1):
if ch:
child_real_vol += ch.real_volume
# если дети полностью покрывают родителя -> родители не нужны
if child_real_vol >= node.net.volume:
walk(node.child0)
walk(node.child1)
return
# иначе выводим родителя и детей
result.append(node.net.getAsString(fmt))
walk(node.child0)
walk(node.child1)
walk(self)
return "\n".join(result)