Добавлена полная поддержка выгрузки ipv6 по аналогии с ipv4. По умолчанию ipv6 не выгружается, см. конфигурацию. Изменена структура списка выгрузки. Параметры конфигурации вынесены глобально для каждой выгрузки, со значениями по умолчанию и являются не обязательными.

This commit is contained in:
2025-11-21 17:44:10 +10:00
parent a4eee42ec6
commit 87bba5279a
3 changed files with 254 additions and 166 deletions

View File

@@ -5,12 +5,15 @@ import os
import sys
import ast
import requests
import ipaddress
from include import net_tree
from collections import defaultdict
from include.http_header import get_headers
# компилируем регулярку поиска ipv4 адреса
ipv4_find_str=re.compile(r"[^0-9.]?(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[1-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])(/([0-9]{1}[0-9]*))?[^0-9.]?")
ipv4_find_str=re.compile(r"(?<![0-9.])(?!10\.|172\.(?:1[6-9]|2[0-9]|3[01])\.|192\.168\.)((?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[1-9])\.(?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9]))(?:/(3[0-2]|[12][0-9]|[1-9]))?(?![0-9.])")
# компилируем регулярку поиска ipv6 адреса
ipv6_find_str=re.compile(r'(?<![0-9A-Fa-f:])([23][0-9A-Fa-f]{3}(?:(?::[0-9A-Fa-f]{1,4}){0,6}|(?:::[0-9A-Fa-f]{0,4})?)(?::[0-9A-Fa-f]{0,4})*)(?:/([1-9][0-9]?|1[01][0-9]|12[0-8]))?(?![0-9A-Fa-f:])')
# метод сбора словаря ip адресов ipv4 из текста
def ipv4_find(strip:str, size:int):
@@ -23,21 +26,19 @@ def ipv4_find(strip:str, size:int):
"""
listip=dict()
for c in ipv4_find_str.finditer(strip):
ip:int=0
key:str=""
# 4 элемента кортежа, содержащие актеты адреса
for i in range(1, 5):
key+=str(c.group(i))+"."
ip = ip * 256 + int(c.group(i))
# элемент кортежа, содержащий размер сети
if c.group(6) and int(c.group(6))<=size:
listip[key[:-1]+"/"+str(c.group(6))]=[ip,int(c.group(6))]
# элемент кортежа, с размером сети, отсутствует
elif size==32:
listip[f"{key[:-1]}/32"]=[ip,32]
ip_str = c.group(1)
prefix_str = c.group(2)
# определяем префикс
if (prefix:=int(prefix_str) if prefix_str else 32) > size: continue
# проверка корректности IPv4
try:
ipv4_obj=ipaddress.IPv4Address(ip_str)
except:
continue
listip[f"{ip_str}/{prefix}"] = [int(ipv4_obj), prefix]
return listip
# метод сбора словаря ip адресов ipv4 из текста
# метод сбора словаря ip адресов ipv6 из текста
def ipv6_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
@@ -46,7 +47,19 @@ def ipv6_find(strip:str, size:int):
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
return dict()
listip=dict()
for c in ipv6_find_str.finditer(strip):
ip_str = c.group(1)
prefix_str = c.group(2)
# определяем префикс
if (prefix:=int(prefix_str) if prefix_str else 128) > size: continue
# проверка корректности IPv4
try:
ipv6_obj=ipaddress.IPv6Address(ip_str)
except:
continue
listip[f"{ip_str}/{prefix}"] = [int(ipv6_obj), prefix]
return listip
# метод группировки словаря
def get_dict_groups(input:dict):
@@ -56,13 +69,16 @@ def get_dict_groups(input:dict):
возвращает словарь сгрупированных данных
данные встречающиеся один раз, не попадают в вывод
"""
# если словарь короче 2х
if len(input) < 2: return {}
# строим битовые маски
name_to_bit = {name: 1 << i for i, name in enumerate(ipv4_dict.keys())}
name_to_bit = {name: 1 << i for i, name in enumerate(input.keys())}
line_communities = defaultdict(set)
line_mask = defaultdict(int)
# походим по строкам и назначаем маску
for name, (communities, lines) in ipv4_dict.items():
for name, (communities, lines) in input.items():
bit = name_to_bit[name]
for line in lines:
line_mask[line] |= bit
@@ -84,10 +100,10 @@ def get_dict_groups(input:dict):
bit_to_names[mask] = list_name
# возвращаем словарь сгрупированных данных
return { bit_to_names[mask]: [ groups_communities[mask], set(groups[mask]) ] for mask in groups if "__" in bit_to_names[mask] }
return { bit_to_names[mask]: [ groups_communities[mask], set(groups[mask]) ] for mask in groups if "__" in bit_to_names[mask] and groups_communities[mask] and groups[mask] }
# метод получения списка ip адресов
def list_ip(c_dict: dict = [], compress: bool = True):
def list_ip(c_list: list = []):
"""
Метод получения списка ip адресов
возвращает кортеж из 2-х списков: ipv4 и ipv6
@@ -95,15 +111,22 @@ def list_ip(c_dict: dict = [], compress: bool = True):
try:
ipv4_list=dict()
ipv6_list=dict()
# определяем, будем сжимать или нет
compress=c_list[0].get('compress', True)
# какие типы обрабытываем, от какого размера
# по умолчанию:
# ipv4 обрабатываем (<=24)
# ipv6 игнорируем (<=64)
# какие типы обрабытываем, от какого размера
ipv4 = False if not (ipv4:=c_list[0].get('ipv4', True)) else (ipv4 if type(ipv4) is int else 24)
ipv6 = False if not (ipv6:=c_list[0].get('ipv6', False)) else (ipv6 if type(ipv6) is int else 64)
# пробегаем словарь выгрузки
for c_list in c_dict:
# какие типы обрабытываем, от какого размера
ipv4 = False if 'ipv4' not in list(c_list) or not c_list['ipv4'] else (c_list['ipv4'] if type(c_list['ipv4']) is int else 24)
ipv6 = False if 'ipv6' not in list(c_list) or not c_list['ipv6'] else (c_list['ipv6'] if type(c_list['ipv6']) is int else 32)
for c_dict in c_list:
# если есть источник ссылка
if 'url' in list(c_list):
if 'url' in list(c_dict):
# бежим весь список ссылок пока не код 200
for c_url in c_list['url']:
for c_url in c_dict['url']:
try:
session = requests.Session()
session.headers.update(get_headers())
@@ -117,15 +140,15 @@ def list_ip(c_dict: dict = [], compress: bool = True):
except requests.exceptions.MissingSchema: pass
print("Ошибка соединения")
# если есть статичные записи ipv4
if ipv4 and 'static4' in list(c_list):
if ipv4 and 'static4' in list(c_dict):
print(f"STATIC: IPv4")
# пополняем словарь ipv4_list
ipv4_list.update(ipv4_find(str(c_list['static4']),ipv4))
ipv4_list.update(ipv4_find(str(c_dict['static4']),ipv4))
# если есть статичные записи ipv6
if ipv6 and 'static6' in list(c_list):
if ipv6 and 'static6' in list(c_dict):
print(f"STATIC: IPv6")
# пополняем словарь ipv6_list
ipv6_list.update(ipv6_find(str(c_list['static6']),ipv6))
ipv6_list.update(ipv6_find(str(c_dict['static6']),ipv6))
# если ключ не сжимать
if ipv4_list and not compress:
@@ -133,7 +156,7 @@ def list_ip(c_dict: dict = [], compress: bool = True):
# сжимаем подсети ipv4
elif ipv4_list and compress:
# строим дерево
Root = net_tree.Node(net_tree.Net(0,0), 0)
Root = net_tree.Node(net_tree.Net(0, 0), 0)
# пробегаем в цикле
for c in ipv4_list.values():
# добавляем запись в дерево
@@ -152,8 +175,20 @@ def list_ip(c_dict: dict = [], compress: bool = True):
if ipv6_list and not compress:
ipv6_list:str="".join([f"route {k} blackhole;\n" for k, v in ipv6_list.items() if isinstance(v, list)])
# сжимаем подсети ipv6
if ipv6_list and compress:
None
elif ipv6_list and compress:
# строим дерево
Root = net_tree.Node(net_tree.Net(1 << 127, 0), 0)
# пробегаем в цикле
for c in ipv6_list.values():
# добавляем запись в дерево
Root.addSubnet(net_tree.Node(net_tree.Net(c[0], c[1]), 1))
Root.finishTreeFirst()
# жесткое сжатие в размер 30000 записей
#Root.collapseRoot(Root.real_ip_records_count - 30000)
# более мягкое сжатие
Root.collapse(1,Root.real_ip_records_count)
# возвращаем результат
ipv6_list:str=Root.returnCollapsedTree('route {addr}/{masklen} blackhole;')
else:
ipv6_list:bool=False
@@ -216,12 +251,12 @@ if __name__ == "__main__":
# удаляем старые файлы группировок
[os.remove(path) for f in os.listdir(outdir) if "__" in f and os.path.isfile(path := os.path.join(outdir, f))]
# обходим массив списков для выкрузки
for clist in ip_list:
for clist, value in ip_list.items():
# имена выходых файлов
ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt"
ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt"
# преобразуем запись комьюнити в список
ip_list[clist]['community']=list(ip_list[clist]['community'].split(","))
# извлекаем community
community=[c for c in value[0].get('community', "").split(",") if c]
# если передан аргумент(ы) запуска,
# значит пытаемся обновить только указанные списки
if len(sys.argv)==1 or clist in sys.argv[1:]:
@@ -231,7 +266,7 @@ if __name__ == "__main__":
ipv6_count_old = sum(1 for line in open(ipv6_out_file)) if os.path.isfile(ipv6_out_file) else 0
# выполняем выгрузку
print(f"Выгружаю список IP: {clist}")
ipv4_list, ipv6_list=list_ip(ip_list[clist]['list'],ip_list[clist].get('compress', True))
ipv4_list, ipv6_list=list_ip(value)
# сохраняем ipv4
if ipv4_list and len(ipv4_list.splitlines()) >= ipv4_count_old * 0.5:
# сохраняем в файл
@@ -247,11 +282,11 @@ if __name__ == "__main__":
# открываем файл выгрузки и пополняем словарь для группировки ipv4
if os.path.exists(ipv4_out_file):
with open(ipv4_out_file, "r") as file:
ipv4_dict[clist] = [ip_list[clist]['community'],list(file.readlines())]
ipv4_dict[clist] = [community,list(file.readlines())]
# открываем файл выгрузки и пополняем словарь для группировки ipv6
if os.path.exists(ipv6_out_file):
with open(ipv6_out_file, "r") as file:
ipv6_dict[clist] = [ip_list[clist]['community'],list(file.readlines())]
ipv6_dict[clist] = [community,list(file.readlines())]
# обновляем временный файл конфигурации ipv4
# из группировок
@@ -266,7 +301,7 @@ if __name__ == "__main__":
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])])
with open(ipv4_bird2_m4, "a") as file:
file.write(f"protocol static static_{k.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
file.write(f"protocol static static_{k.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community}accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
# обновляем временный файл конфигурации ipv6
# из группировок
if ipv6_dict:
@@ -280,21 +315,21 @@ if __name__ == "__main__":
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])])
with open(ipv6_bird2_m4, "a") as file:
file.write(f"protocol static static_{k.lower()}_v6 {{\n\tipv6 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
file.write(f"protocol static static_{k.lower()}_v6 {{\n\tipv6 {{ import filter {{ {bgp_community}accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
# дополняем временный файл конфигурации всей выгрузкой ipv4 и ipv6
for clist in ip_list:
for clist, value in ip_list.items():
# имена выходых файлов
ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt"
ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt"
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(ip_list[clist]['community'])])
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted([c for c in value[0].get('community', "").split(",") if c])])
if os.path.exists(ipv4_out_file):
with open(ipv4_bird2_m4, "a") as file:
file.write(f"protocol static static_{clist.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
file.write(f"protocol static static_{clist.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community}accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
if os.path.exists(ipv6_out_file):
with open(ipv6_bird2_m4, "a") as file:
file.write(f"protocol static static_{clist.lower()}_v6 {{\n\tipv4 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
file.write(f"protocol static static_{clist.lower()}_v6 {{\n\tipv4 {{ import filter {{ {bgp_community}accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
# проверяем, что временный файл конфигурации ipv4 не пустой, сохраняем в постоянный
if os.path.exists(ipv4_bird2_m4) and os.path.getsize(ipv4_bird2_m4) != 0: