Initial commit

This commit is contained in:
2025-11-17 20:39:26 +10:00
commit 0faee529ec
5 changed files with 412 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
__pycache__/
unloading/

9
LICENSE Normal file
View File

@@ -0,0 +1,9 @@
MIT License
Copyright (c) 2025 alan
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

3
README.md Normal file
View File

@@ -0,0 +1,3 @@
# bird_list_ip
Это скрипт, написанный на Python 3, для выгрузки пулов IP-адресов по странам и номерам AS.

185
download.py Executable file
View File

@@ -0,0 +1,185 @@
#!/usr/bin/python3
import re
import os
import sys
import net_tree
import requests
# Lists to download, keyed by list name (the lower-cased name is used for the
# output file names). Each value is a list of source entries; an entry may hold:
#   'url'     - alternative URLs for the same data, tried in order until one
#               answers with HTTP 200
#   'static4' - literal IPv4 network(s) that are always added
#   'ipv4' / 'ipv6' - enable the address family; True selects the default
#               prefix limit (24 for IPv4, 32 for IPv6), an int overrides it
ip_list = {
    'RU': [
        # Most of the RU segment
        { 'url': ['https://stat.ripe.net/data/country-resource-list/data.json?resource=RU'], 'ipv4': True, 'ipv6': False },
        { 'url': ['https://ipv4.fetus.jp/ru.txt'], 'ipv4': True, 'ipv6': False },
        { 'url': ['https://github.com/ipverse/rir-ip/blob/master/country/ru/aggregated.json'], 'ipv4': True, 'ipv6': False },
        # HLL LLC
        { 'url': ['https://bgp.he.net/AS51115#_prefixes', 'https://ipinfo.io/widget/demo/AS51115?dataset=asn', 'https://api.hackertarget.com/aslookup/?q=AS51115'], 'ipv4': True, 'ipv6': False },
        # Static entries
        { 'static4': '188.130.255.0/24', 'ipv4': True, 'ipv6': False },
    ],
    'CHINA': [
        # Most of the CN segment
        { 'url': ['https://stat.ripe.net/data/country-resource-list/data.json?resource=CN'], 'ipv4': True, 'ipv6': False },
        { 'url': ['https://ipv4.fetus.jp/cn.txt'], 'ipv4': True, 'ipv6': False },
        { 'url': ['https://github.com/ipverse/rir-ip/blob/master/country/cn/aggregated.json'], 'ipv4': True, 'ipv6': False },
        # Hong Kong
        { 'url': ['https://stat.ripe.net/data/country-resource-list/data.json?resource=HK'], 'ipv4': True, 'ipv6': False },
        { 'url': ['https://ipv4.fetus.jp/hk.txt'], 'ipv4': True, 'ipv6': False },
        { 'url': ['https://github.com/ipverse/rir-ip/blob/master/country/hk/aggregated.json'], 'ipv4': True, 'ipv6': False },
        # Alibaba (US) Technology Co., Ltd.
        { 'url': ['https://bgp.he.net/AS45102#_prefixes', 'https://ipinfo.io/widget/demo/AS45102?dataset=asn', 'https://api.hackertarget.com/aslookup/?q=AS45102'], 'ipv4': True, 'ipv6': False },
    ],
    'JAPAN': [
        # Most of the JP segment
        { 'url': ['https://stat.ripe.net/data/country-resource-list/data.json?resource=JP'], 'ipv4': True, 'ipv6': False },
        { 'url': ['https://ipv4.fetus.jp/jp.txt'], 'ipv4': True, 'ipv6': False },
        { 'url': ['https://github.com/ipverse/rir-ip/blob/master/country/jp/aggregated.json'], 'ipv4': True, 'ipv6': False },
    ],
    'KOREA': [
        # Most of the KR segment
        { 'url': ['https://stat.ripe.net/data/country-resource-list/data.json?resource=KR'], 'ipv4': True, 'ipv6': False },
        { 'url': ['https://ipv4.fetus.jp/kr.txt'], 'ipv4': True, 'ipv6': False },
        { 'url': ['https://github.com/ipverse/rir-ip/blob/master/country/kr/aggregated.json'], 'ipv4': True, 'ipv6': False },
        # LG DACOM Corporation
        { 'url': ['https://bgp.he.net/AS3786#_prefixes', 'https://ipinfo.io/widget/demo/AS3786?dataset=asn', 'https://api.hackertarget.com/aslookup/?q=AS3786'], 'ipv4': True, 'ipv6': False },
    ],
}
# Pre-compiled pattern for an IPv4 address with an optional "/prefix" suffix.
# Groups 1-4 capture the four octets, group 6 the prefix length (if present).
ipv4_find_str=re.compile(r"[^0-9.]?(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[1-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])(/([0-9]{1}[0-9]*))?[^0-9.]?")


def ipv4_find(strip:str, size:int):
    """
    Collect the IPv4 networks mentioned in a piece of text.

    strip - text to scan
    size  - largest prefix length to accept; networks written with a longer
            prefix are ignored, and addresses written without a prefix are
            kept (as /32) only when size is 32

    Returns a dict where:
        key   - network in "a.b.c.d/len" notation
        value - [address as an int, prefix length]
    """
    found = {}
    for match in ipv4_find_str.finditer(strip):
        # groups 1..4 are the four octets of the address
        octets = [match.group(idx) for idx in range(1, 5)]
        dotted = ".".join(str(octet) for octet in octets)
        value = 0
        for octet in octets:
            value = value * 256 + int(octet)
        # group 6 is the prefix length, None when the text had no "/len" part
        prefix = match.group(6)
        if prefix and int(prefix) <= size:
            found[dotted + "/" + str(prefix)] = [value, int(prefix)]
        elif size == 32:
            # no usable prefix: treat it as a single host
            found[f"{dotted}/32"] = [value, 32]
    return found
# Placeholder for the IPv6 counterpart of ipv4_find.
def ipv6_find(strip:str, size:int):
    """
    Collect the IPv6 networks mentioned in a piece of text.

    Not implemented yet: both arguments are ignored and an empty dict is
    always returned. The intended result shape mirrors ipv4_find:
        key   - network name
        value - [address as an int, prefix length]
    """
    return {}
# метод получения списка ip адресов
def list_ip(c_dict: dict = []):
"""
Метод получения списка ip адресов
возвращает кортеж из 2-х списков: ipv4 и ipv6
"""
try:
ipv4_list=dict()
ipv6_list=dict()
# пробегаем словарь выгрузки
for c_list in c_dict:
# какие типы обрабытываем, от какого размера
ipv4 = False if 'ipv4' not in list(c_list) or not c_list['ipv4'] else (c_list['ipv4'] if type(c_list['ipv4']) is int else 24)
ipv6 = False if 'ipv6' not in list(c_list) or not c_list['ipv6'] else (c_list['ipv6'] if type(c_list['ipv6']) is int else 32)
# если есть источник ссылка
if 'url' in list(c_list):
# бежим весь список ссылок пока не код 200
for c_url in c_list['url']:
if (result:=requests.get(c_url)) and result.status_code == 200 and result.text:
print(f"URL: {c_url}")
# пополняем словарь ipv4_list
if ipv4: ipv4_list.update(ipv4_find(result.text,ipv4))
# пополняем словарь ipv6_list
if ipv6: ipv6_list.update(ipv6_find(result.text,ipv6))
break
print("Ошибка соединения")
# если есть статичные записи ipv4
if ipv4 and 'static4' in list(c_list):
print(f"STATIC: IPv4")
# пополняем словарь ipv4_list
ipv4_list.update(ipv4_find(str(c_list['static4']),ipv4))
# если есть статичные записи ipv6
if ipv6 and 'static6' in list(c_list):
print(f"STATIC: IPv6")
# пополняем словарь ipv6_list
ipv6_list.update(ipv6_find(str(c_list['static6']),ipv6))
# сжимаем подсети ipv4
if ipv4_list:
# строим дерево
Root = net_tree.Node(net_tree.Net(0,0), 0)
# пробегаем в цикле
for c in ipv4_list.values():
# добавляем запись в дерево
Root.addSubnet(net_tree.Node(net_tree.Net(c[0], c[1]), 1))
Root.finishTreeFirst()
# жесткое сжатие в размер 30000 записей
#Root.collapseRoot(Root.real_ip_records_count - 30000)
# более мягкое сжатие
Root.collapse(1,Root.real_ip_records_count)
# возвращаем результат
ipv4_list:str=Root.returnCollapsedTree('route {addr}/{masklen} blackhole;')
else:
ipv4_list:bool=False
# сжимаем подсети ipv6
if ipv6_list:
None
else:
ipv6_list:bool=False
# возвращаем 2 списка маршрутов
return ipv4_list, ipv6_list
except Exception as e:
# исключение
print(f"Ошибка: {e}")
return False, False
def _count_lines(path):
    """Return the number of lines in the file at path, or 0 if it does not exist."""
    if not os.path.isfile(path):
        return 0
    # `with` closes the handle; the bare open() used before leaked it
    with open(path) as handle:
        return sum(1 for _ in handle)


# Script entry point: download every configured list into ./unloading/.
if __name__ == "__main__":
    # output directory next to this script
    outdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'unloading')
    os.makedirs(outdir, exist_ok=True)
    # walk over the configured lists
    for clist in ip_list:
        # output file names
        ipv4_out_file = f"{outdir}/{clist.lower()}_v4.txt"
        ipv6_out_file = f"{outdir}/{clist.lower()}_v6.txt"
        # number of records written by the previous run
        ipv4_count_old = _count_lines(ipv4_out_file)
        ipv6_count_old = _count_lines(ipv6_out_file)
        # download
        print(f"Выгружаю список IP: {clist}")
        ipv4_list, ipv6_list = list_ip(ip_list[clist])
        # Save only when the new list has at least 70% of the previous record
        # count, so a partially failed download does not replace a good file.
        if ipv4_list and len(ipv4_list.splitlines()) >= ipv4_count_old * 0.7:
            with open(ipv4_out_file, "w") as file:
                file.write(ipv4_list)
            print(f"Файл {ipv4_out_file} сохранён")
        if ipv6_list and len(ipv6_list.splitlines()) >= ipv6_count_old * 0.7:
            with open(ipv6_out_file, "w") as file:
                file.write(ipv6_list)
            print(f"Файл {ipv6_out_file} сохранён")
        print("")

213
net_tree.py Normal file
View File

@@ -0,0 +1,213 @@
# All 32 bits set (255.255.255.255).
BIG_MASK = 0xFFFFFFFF


def getMaskByMaskSize(mask_size):
    """Return the netmask, as a 32-bit int, for a prefix length of mask_size bits."""
    host_bits = 32 - mask_size
    host_part = (1 << host_bits) - 1
    # clearing the host bits of the all-ones value leaves the network mask
    return BIG_MASK ^ host_part
def getIpVolumeByMaskSize(mask_size):
    """Return how many addresses a network with prefix length mask_size contains."""
    host_bits = 32 - mask_size
    return 1 << host_bits
class Net:
    """An IPv4 network stored as an integer address plus a prefix length.

    Attributes:
        mask_size  - prefix length
        net        - network address as an int (host bits cleared)
        mask       - netmask as an int
        ip_volume  - number of addresses in the network
    """

    __slots__ = ['mask_size', 'net', 'mask', 'ip_volume']

    def __init__(self, net: int, mask_size: int):
        netmask = getMaskByMaskSize(mask_size)
        self.mask_size = mask_size
        self.mask = netmask
        # drop any host bits so equal networks compare equal
        self.net = net & netmask
        self.ip_volume = getIpVolumeByMaskSize(mask_size)

    def hasSubnet(self, Net: 'Net'):
        """Return a truthy value when Net is a strictly smaller network inside this one."""
        if Net.mask_size > self.mask_size:
            return self.net == Net.net & self.mask
        return 0

    def isSameNet(self, Net: 'Net'):
        """Return True when Net has the same address and prefix length."""
        return (Net.mask_size, Net.net) == (self.mask_size, self.net)

    def getCommonNet(self, OtherNet: 'Net', min_mask_size: int):
        """Return the smallest network containing both this network and OtherNet.

        Only prefix lengths that are shorter than both networks' own prefixes
        and not shorter than min_mask_size are considered. Returns 0 when no
        such network exists.
        """
        shortest = min(self.mask_size, OtherNet.mask_size)
        if shortest <= min_mask_size:
            return 0
        # try the longest candidate prefix first and widen step by step
        candidate = shortest - 1
        while candidate >= min_mask_size:
            mask = getMaskByMaskSize(candidate)
            if (self.net & mask) == (OtherNet.net & mask):
                return Net(self.net, candidate)
            candidate -= 1
        return 0

    def getAsString(self, fmt='{addr}/{masklen}'):
        """Render the network using fmt.

        Available placeholders: {addr} (dotted address), {mask} (dotted
        netmask) and {masklen} (prefix length).
        """
        def dotted(value):
            return '.'.join(str((value >> shift) & 0xFF) for shift in (24, 16, 8, 0))

        return fmt.format(addr=dotted(self.net), mask=dotted(self.mask), masklen=self.mask_size)
class Node:
    """Node of the binary tree used to aggregate IPv4 networks.

    A node is either a "real" network (one that was actually added) or a
    synthetic common parent of its children. After finishTreeFirst() every
    node knows how many real records and real addresses lie below it.

    weight is the collapse priority of a synthetic node:
        (records below - 1) / extra addresses that collapsing would add
    i.e. records saved per "fake" address. weight == 0 is a sentinel meaning
    "emit this node as one network" (real or already collapsed), so a
    synthetic node that has nothing to merge gets a negative weight instead.
    """

    __slots__ = ['net', 'child1', 'child2', 'is_real_net', 'real_ip_volume', 'real_ip_records_count', 'weight', 'max_child_weight', 'added_fake_ip_volume']

    def __init__(self, net: 'Net', is_real_net: int):
        self.net = net
        self.child1 = None
        self.child2 = None
        self.is_real_net = is_real_net
        # number of real addresses below this node
        self.real_ip_volume = 0
        # number of networks that would be emitted for this subtree
        self.real_ip_records_count = 0
        self.weight = 0.0
        # largest weight found among all descendants
        self.max_child_weight = 0.0
        # fake addresses already accounted for by collapses in this subtree
        self.added_fake_ip_volume = 0

    def getNet(self):
        """Return the network held by this node."""
        return self.net

    def addSubnet(self, NewNode: 'Node'):
        """Insert NewNode somewhere below this node.

        Returns 1 when the node was placed (or is already covered by a real
        network) and 0 when NewNode's network does not lie inside this node.
        """
        if self.net.isSameNet(NewNode.net):
            if not self.is_real_net and NewNode.is_real_net:
                # a real network replaces the synthetic node and everything below it
                self.is_real_net = 1
                self.child1 = None
                self.child2 = None
            return 1
        if self.is_real_net and self.net.hasSubnet(NewNode.net):
            # already covered by this real network
            return 1
        if not self.net.hasSubnet(NewNode.net):
            return 0
        # let an existing child take it if it fits there
        for Child in (self.child1, self.child2):
            if Child and Child.addSubnet(NewNode):
                return 1
        # otherwise try to join it with a child under a new common parent
        for slot in ('child1', 'child2'):
            Existing = getattr(self, slot)
            if not Existing:
                continue
            CommonNet = Existing.net.getCommonNet(NewNode.net, self.net.mask_size + 1)
            if CommonNet:
                CommonNode = Node(CommonNet, 0)
                CommonNode.addSubnet(NewNode)
                CommonNode.addSubnet(Existing)
                setattr(self, slot, CommonNode)
                return 1
        # nothing in common with the children: take a free slot
        if not self.child1:
            self.child1 = NewNode
        else:
            self.child2 = NewNode
        return 1

    def printTree(self, level):
        """Print this subtree, one node per line, indented by depth."""
        prefix = ' ' * level
        print(prefix + self.net.getAsString() + ' ' + str(self.real_ip_records_count))
        if self.child1:
            self.child1.printTree(level + 1)
        if self.child2:
            self.child2.printTree(level + 1)

    def finishTreeFirst(self):
        """Compute volumes, record counts and weights for the whole subtree.

        Must be called once after all networks were added and before any
        collapse / output method.
        """
        if self.is_real_net:
            self.real_ip_volume = self.net.ip_volume
            self.real_ip_records_count = 1
            self.weight = 0
            self.max_child_weight = 0
        else:
            self.real_ip_volume = 0
            self.real_ip_records_count = 0
            self.max_child_weight = 0
            for Child in (self.child1, self.child2):
                if Child:
                    Child.finishTreeFirst()
                    self.real_ip_volume += Child.real_ip_volume
                    self.real_ip_records_count += Child.real_ip_records_count
                    self.max_child_weight = max(self.max_child_weight, Child.weight, Child.max_child_weight)
            self.recalcWeight()

    def _collapseSelf(self, previously_added):
        """Mark this node as collapsed.

        previously_added is the fake volume that was accounted for in this
        subtree before the current collapse() call. Returns
        (records saved, newly added fake addresses).
        """
        self.weight = 0
        self.max_child_weight = 0
        total_fake = self.net.ip_volume - self.real_ip_volume
        self.added_fake_ip_volume = total_fake
        return self.real_ip_records_count - 1, total_fake - previously_added

    def collapse(self, min_weight, max_net_delta):
        """Collapse nodes of this subtree whose weight is at least min_weight.

        Children are only visited while fewer than max_net_delta records have
        been saved. Returns (records saved, fake addresses added).
        """
        # try to collapse this node as a whole
        if self.weight >= min_weight:
            return self._collapseSelf(self.added_fake_ip_volume)

        net_delta = 0
        fake_ip_delta = 0
        self.max_child_weight = 0
        for Child in (self.child1, self.child2):
            if not Child:
                continue
            if net_delta < max_net_delta and min_weight <= max(Child.weight, Child.max_child_weight):
                child_net_delta, child_fake_ip_count = Child.collapse(min_weight, max_net_delta - net_delta)
                net_delta += child_net_delta
                fake_ip_delta += child_fake_ip_count
            # refresh the cached maximum for every child, visited or not
            self.max_child_weight = max(self.max_child_weight, Child.weight, Child.max_child_weight)

        if net_delta > 0:
            self.added_fake_ip_volume += fake_ip_delta
            self.real_ip_records_count -= net_delta
            self.recalcWeight()
            # the children changed, so this node may now qualify itself
            if self.weight >= min_weight:
                # NOTE(review): the record count returned here is taken after
                # net_delta was subtracted, so the children's savings are not
                # included in it - kept as in the original, confirm intent.
                return self._collapseSelf(self.added_fake_ip_volume - fake_ip_delta)
        return net_delta, fake_ip_delta

    def collapseRoot(self, required_net_delta):
        """Collapse best-weight nodes first until required_net_delta records are saved.

        Stops early when a pass saves nothing, because then no further
        progress is possible and the loop would never end.
        """
        while required_net_delta > 0:
            delta, fake_ip_volume = self.collapse(self.max_child_weight, required_net_delta)
            if delta <= 0:
                break
            required_net_delta -= delta

    def printCollapsedTree(self, fmt='{addr}/{masklen}'):
        """Print the resulting networks, one per line, formatted with fmt."""
        if self.is_real_net or self.weight == 0:
            print(self.net.getAsString(fmt))
        else:
            for Child in (self.child1, self.child2):
                if Child:
                    Child.printCollapsedTree(fmt)

    def returnCollapsedTree(self, fmt='{addr}/{masklen}'):
        """Return the resulting networks as text, one fmt-formatted line each."""
        if self.is_real_net or self.weight == 0:
            return self.net.getAsString(fmt) + "\n"
        return "".join(
            Child.returnCollapsedTree(fmt)
            for Child in (self.child1, self.child2)
            if Child
        )

    def recalcWeight(self):
        """Recompute weight from the current record count and fake volume."""
        saved_records = self.real_ip_records_count - 1
        if saved_records <= 0:
            # Nothing to merge below this node. The weight must not be 0:
            # 0 means "collapsed", and the node would be emitted instead of
            # its single child, widening the output.
            self.weight = -1.0
            return
        fake_ip_delta = self.net.ip_volume - self.real_ip_volume - self.added_fake_ip_volume
        if fake_ip_delta:
            self.weight = saved_records / fake_ip_delta
        else:
            # collapsing adds no fake addresses at all
            self.weight = float('Inf')

    def getNotRealIpCount(self):
        """Return how many fake addresses the collapsed output contains."""
        if self.is_real_net:
            return 0
        if self.weight == 0:
            return self.net.ip_volume - self.real_ip_volume
        res = 0
        for Child in (self.child1, self.child2):
            if Child:
                res = res + Child.getNotRealIpCount()
        return res