#
# CIDR AGGREGATOR (IPv4 + IPv6)
#
"""Aggregate IPv4/IPv6 networks with a binary prefix trie.

``Net`` is a single CIDR prefix stored as integers; ``Node`` is a trie node
keyed by the successive address bits of the prefixes inserted into it.
"""


def mask_to_int(mask_size, total_bits):
    """Return the netmask with the top ``mask_size`` of ``total_bits`` set."""
    return ((1 << total_bits) - 1) ^ ((1 << (total_bits - mask_size)) - 1)


def ip_volume(mask_size, total_bits):
    """Return how many addresses a ``/mask_size`` prefix contains."""
    return 1 << (total_bits - mask_size)


class Net:
    """A CIDR network held as an integer address plus a prefix length.

    Attributes:
        version: 4 or 6.
        bits: address width in bits (32 for IPv4, 128 for IPv6).
        mask_size: prefix length.
        net: network address as an integer, host bits cleared.
        mask: netmask as an integer.
        volume: number of addresses covered by the network.
    """

    __slots__ = ["version", "bits", "mask_size", "net", "mask", "volume"]

    def __init__(self, net: int, mask_size: int, version: int = 4):
        """Build a network, clearing any host bits set in ``net``.

        Args:
            net: address as an integer.
            mask_size: prefix length, ``0 <= mask_size <= bits``.
            version: 4 selects IPv4; any other value is treated as IPv6.

        Raises:
            ValueError: if ``mask_size`` is outside the valid range.
        """
        self.version, self.bits = (4, 32) if version == 4 else (6, 128)
        # A negative prefix length would otherwise yield a nonsensical mask
        # instead of an error; an oversized one already raised ValueError
        # from the negative shift, so the exception type is unchanged.
        if not 0 <= mask_size <= self.bits:
            raise ValueError(
                "mask_size must be in 0..%d, got %r" % (self.bits, mask_size)
            )
        self.mask_size = mask_size
        self.mask = mask_to_int(mask_size, self.bits)
        self.net = net & self.mask
        self.volume = ip_volume(mask_size, self.bits)

    #
    # --- IP CONVERSION ---
    #
    def __int_to_ipv4(self, n):
        """Format a 32-bit integer as a dotted quad."""
        return ".".join(str((n >> (24 - 8 * i)) & 0xFF) for i in range(4))

    def __int_to_ipv6(self, n):
        """Format a 128-bit integer as compressed IPv6 text.

        The first longest run of two or more zero groups is replaced by
        ``::``; a lone zero group is printed as ``0``.
        """
        groups = [(n >> (112 - 16 * i)) & 0xFFFF for i in range(8)]

        # Locate the first longest run of zero groups (strict ">" keeps the
        # earliest run on ties).
        best_start, best_len = -1, 0
        run_start, run_len = -1, 0
        for i, group in enumerate(groups):
            if group == 0:
                if run_len == 0:
                    run_start = i
                run_len += 1
                if run_len > best_len:
                    best_start, best_len = run_start, run_len
            else:
                run_len = 0

        if best_len > 1:
            # Join head and tail around a literal "::" so that a run touching
            # either end of the address still yields "::" ("::1",
            # "2001:db8::", "::") rather than a single dangling colon.
            head = ":".join(format(g, "x") for g in groups[:best_start])
            tail = ":".join(
                format(g, "x") for g in groups[best_start + best_len:]
            )
            return head + "::" + tail
        return ":".join(format(g, "x") for g in groups)

    def __int_to_ip(self, n):
        """Format ``n`` according to this network's IP version."""
        if self.version == 4:
            return self.__int_to_ipv4(n)
        return self.__int_to_ipv6(n)

    #
    # --- PUBLIC API ---
    #
    def getAsString(self, fmt='{addr}/{masklen}'):
        """Render the network through ``fmt``.

        ``fmt`` is a ``str.format`` template that may use the fields
        ``addr`` (textual network address) and ``masklen`` (prefix length).
        """
        return fmt.format(
            addr=self.__int_to_ip(self.net),
            masklen=self.mask_size,
        )

    def is_adjacent(self, other):
        """Return True if ``other`` has the same version and prefix length
        and starts exactly one network-size before or after this one."""
        if self.version != other.version:
            return False
        if self.mask_size != other.mask_size:
            return False
        step = 1 << (self.bits - self.mask_size)
        return self.net + step == other.net or other.net + step == self.net

    def supernet(self):
        """Return the network one bit shorter that contains this one.

        A ``/0`` network has no supernet and is returned unchanged.
        """
        if self.mask_size == 0:
            return self
        new_mask = self.mask_size - 1
        new_net = self.net & mask_to_int(new_mask, self.bits)
        return Net(new_net, new_mask, self.version)


class Node:
    """Node of a binary prefix trie of networks.

    Attributes:
        net: prefix represented by this node.
        child0, child1: subtrees for the next address bit being 0 / 1.
        is_real: True if this exact prefix was inserted (or produced by
            ``collapse``), False for a purely intermediate node.
        real_volume, real_count, fake_volume, weight, max_child_weight:
            statistics filled in by ``finalize`` and ``collapse``.
    """

    __slots__ = [
        "net", "child0", "child1", "is_real",
        "real_volume", "real_count", "fake_volume",
        "weight", "max_child_weight",
    ]

    def __init__(self, net: Net):
        self.net = net
        self.child0 = None
        self.child1 = None
        self.is_real = False
        self.real_volume = 0
        self.real_count = 0
        self.fake_volume = 0
        self.weight = 0
        self.max_child_weight = 0

    #
    # INSERT NETWORK INTO TRIE
    #
    def insert(self, new_net: Net):
        """Insert ``new_net`` into the trie, treating this node as level 0.

        Raises:
            ValueError: if ``new_net`` has a different IP version than this
                node; its bits would otherwise be read at the wrong width.
        """
        if new_net.version != self.net.version:
            raise ValueError(
                "cannot insert an IPv%d network into an IPv%d trie"
                % (new_net.version, self.net.version)
            )
        return self.__insert(new_net, level=0)

    def __insert(self, new_net: Net, level):
        """Descend one address bit per level until the prefix length."""
        # Reached the network's prefix length: this node is the network.
        if level == new_net.mask_size:
            if not self.is_real:
                self.is_real = True
                self.child0 = None
                self.child1 = None
            return

        # Pick the branch from the address bit at this level.
        bit_pos = self.net.bits - 1 - level
        direction = (new_net.net >> bit_pos) & 1

        child = self.child1 if direction else self.child0
        if not child:
            child_net = Net(
                new_net.net & mask_to_int(level + 1, self.net.bits),
                level + 1,
                self.net.version,
            )
            child = Node(child_net)
            if direction:
                self.child1 = child
            else:
                self.child0 = child
        child.__insert(new_net, level + 1)

    #
    # CALCULATE WEIGHTS
    #
    def finalize(self):
        """Recompute the statistics of this subtree bottom-up.

        A real node counts as one network covering its whole volume; the
        recursion does not descend below it.
        """
        if self.is_real:
            self.real_volume = self.net.volume
            self.real_count = 1
            self.fake_volume = 0
            self.weight = 0
            self.max_child_weight = 0
            return

        for ch in (self.child0, self.child1):
            if ch:
                ch.finalize()
        self.__aggregate_children()

    def __aggregate_children(self):
        """Rebuild this node's statistics from its children's current ones."""
        self.real_volume = 0
        self.real_count = 0
        self.fake_volume = 0
        self.max_child_weight = 0
        for ch in (self.child0, self.child1):
            if ch:
                self.real_volume += ch.real_volume
                self.real_count += ch.real_count
                self.fake_volume += ch.fake_volume
                self.max_child_weight = max(
                    self.max_child_weight, ch.weight, ch.max_child_weight
                )
        self.__recalc()

    def __recalc(self):
        """Set ``weight`` from the real-network count and the uncovered volume."""
        missing = self.net.volume - self.real_volume - self.fake_volume
        if missing > 0:
            self.weight = (self.real_count - 1) / (missing ** 0.5)
        else:
            self.weight = float('inf')

    #
    # COLLAPSE / AGGREGATE
    #
    def collapse(self, min_weight=0, max_delta=float('inf')):
        """Merge pairs of real, adjacent sibling networks into their supernet.

        Children are collapsed first, so merges propagate upwards.

        Args:
            min_weight: forwarded unchanged to the recursive calls; it does
                not currently influence any decision in this method.
            max_delta: forwarded to the recursive calls reduced by the delta
                accumulated so far; likewise not used for any decision.

        Returns:
            A ``(delta, fake)`` tuple: ``delta`` grows by 2 for every merge
            performed in this subtree, ``fake`` accumulates the
            ``fake_volume`` of every merged node.
        """
        if self.is_real:
            return 0, 0

        delta = 0
        fake = 0

        # Collapse the children first.
        for ch in (self.child0, self.child1):
            if ch:
                d, f = ch.collapse(min_weight, max_delta - delta)
                delta += d
                fake += f

        # Try to merge the two children into this node.
        if self.child0 and self.child1:
            c0 = self.child0
            c1 = self.child1
            if c0.is_real and c1.is_real and c0.net.is_adjacent(c1.net):
                super_net = c0.net.supernet()
                # Turn this node into the supernet.
                self.net = super_net
                self.is_real = True
                self.child0 = None
                self.child1 = None
                self.real_volume = c0.real_volume + c1.real_volume
                self.fake_volume = super_net.volume - self.real_volume
                self.real_count = 1
                self.weight = 0
                self.max_child_weight = 0
                return delta + 2, fake + self.fake_volume

        # No merge happened here: refresh the statistics from the
        # (possibly collapsed) children.
        self.__aggregate_children()
        return delta, fake

    def export_compress(self, fmt='{addr}/{masklen}'):
        """Finalize, collapse, and return the aggregated networks.

        Returns one network per line, each rendered through ``fmt``, in
        trie order (bit-0 branch before bit-1 branch). Collapsing modifies
        the trie in place.
        """
        # Compute the statistics.
        self.finalize()
        # Aggregate.
        self.collapse()

        result = []

        def walk(node):
            if node is None:
                return
            # A real (super)network makes everything below it redundant.
            if node.is_real:
                result.append(node.net.getAsString(fmt))
                return
            walk(node.child0)
            walk(node.child1)

        walk(self)
        return "\n".join(result)

    def export(self, fmt='{addr}/{masklen}'):
        """Finalize and return the stored networks without aggregating.

        Returns one network per line, each rendered through ``fmt``, in
        trie order with a parent before the networks nested below it.
        """
        # Compute the statistics.
        self.finalize()

        result = []

        def walk(node):
            if node is None:
                return
            if node.is_real:
                # Do the children fully cover the parent's range? If so the
                # parent is omitted and only the children are listed.
                # NOTE(review): finalize() does not descend below a real
                # node, so the children's real_volume appears to stay at its
                # initial 0 here and the parent is always printed - confirm
                # whether that is intended.
                child_real_vol = 0
                for ch in (node.child0, node.child1):
                    if ch:
                        child_real_vol += ch.real_volume
                if child_real_vol < node.net.volume:
                    result.append(node.net.getAsString(fmt))
            walk(node.child0)
            walk(node.child1)

        walk(self)
        return "\n".join(result)