Добавлен параметр конфигурации "consider", отвечающий за фильрацию маршрутов по перечисленным спискам. Мелкие исправления.

This commit is contained in:
2025-12-03 19:01:01 +10:00
parent 501f556050
commit 1d1ec425bf
2 changed files with 58 additions and 23 deletions

View File

@@ -41,7 +41,7 @@ def ipv4_find(strip:str, size:int):
# метод сбора словаря ip адресов ipv6 из текста
def ipv6_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
Метод сбора словаря ip адресов ipv6 из текста
возвращает словарь ip
где:
ключ - имя сети
@@ -53,7 +53,7 @@ def ipv6_find(strip:str, size:int):
prefix_str = c.group(2)
# определяем префикс
if (prefix:=int(prefix_str) if prefix_str else 128) > size: continue
# проверка корректности IPv4
# проверка корректности IPv6
try:
ipv6_obj=ipaddress.IPv6Address(ip_str)
except:
@@ -170,7 +170,7 @@ def list_ip(c_list: list = []):
if ipv6_list:
# строим дерево
Root = net_tree.Node(net_tree.Net(1 << 127, 0, 6))
# добавляем IPv4 подсети
# добавляем IPv6 подсети
for ip_int, mask in sorted(ipv6_list.values(), key=lambda x: x[0]):
Root.insert(net_tree.Net(ip_int, mask, 6))
# считаем статистику
@@ -188,6 +188,32 @@ def list_ip(c_list: list = []):
print(f"Ошибка: {e}")
return False, False
# метод анализа элементов списка (аргументов)
def right_list(ip_list: list, args_list: list=[]):
"""
Метод анализа элементов списка выгрузок, возвращает актульный список выгрузок,
основываясь на списке аргументов, переданных вторым параметром
"""
# собираем словарь аргументов
# элементы списка начинающиеся с "-"
# попадают в off, остальные в on
c_dict = defaultdict(list)
for c in args_list:
key = "off" if c[0] == "-" else "on"
value = c[1:].upper() if c[0] == "-" else c.upper()
c_dict[key].append(value)
c_dict = dict(c_dict)
# если словарь аргументов не пустой
if c_dict:
# пробегаем список выгрузок
for c in ip_list[:]:
# пропускаем
if len(c_dict) == 2 and c in c_dict["on"] and not c in c_dict["off"]: continue
if len(c_dict) == 1 and (("on" in c_dict and c in c_dict["on"]) or ("off" in c_dict and not c in c_dict["off"])): continue
# удаляем элемент из списка
ip_list = list(filter(lambda x: x != c, ip_list))
return ip_list
# главная фукция
if __name__ == "__main__":
# словарь выгружаемых списков
@@ -230,13 +256,8 @@ if __name__ == "__main__":
# cловари для группировки
ipv4_dict=dict()
ipv6_dict=dict()
# словарь аргументов вызова
arg_dict = defaultdict(list)
for c in sys.argv[1:]:
key = "off" if c[0] == "-" else "on"
value = c[1:].upper() if c[0] == "-" else c.upper()
arg_dict[key].append(value)
arg_dict = dict(arg_dict)
# список того, что будем выгружать/обновлять
download_ip_list=right_list(list(ip_list.keys()), sys.argv[1:])
# создаем дерриктори. для сохранения
outdir=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'unloading')
if not os.path.exists(outdir):
@@ -246,18 +267,15 @@ if __name__ == "__main__":
open(ipv6_bird2_m4 := f"{outdir}/bird2_v6.m4.tmp", "w").close()
# удаляем старые файлы группировок
[os.remove(path) for f in os.listdir(outdir) if "__" in f and os.path.isfile(path := os.path.join(outdir, f))]
# обходим массив списков для выкрузки
# обходим массив списков для выгрузки
for clist, value in ip_list.items():
# имена выходых файлов
ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt"
ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt"
# извлекаем community
community=[c for c in value[0].get('community', "").split(",") if c]
# если передан аргумент(ы) запуска,
# значит пытаемся обновить только указанные списки
if not arg_dict or \
(len(arg_dict) == 2 and clist in arg_dict["on"] and not clist in arg_dict["off"]) or \
(len(arg_dict) == 1 and (("on" in arg_dict and clist in arg_dict["on"]) or ("off" in arg_dict and not clist in arg_dict["off"]))):
# обновляем только указанные списки
if clist in download_ip_list:
print("")
# вычисляем кол-во записей прошлой выгрузки
ipv4_count_old = sum(1 for line in open(ipv4_out_file)) if os.path.isfile(ipv4_out_file) else 0
@@ -323,19 +341,35 @@ if __name__ == "__main__":
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted([c for c in value[0].get('community', "").split(",") if c])])
if os.path.exists(ipv4_out_file):
# фильтер маршрутов ipv4, если список consider существует в конфигурации
ip_addresses_filter=list()
if (consider:=value[0].get('consider', [])):
for c in right_list(list(ip_list.keys()), consider):
if os.path.exists(f_open:=f"{outdir}/{c.lower()}_v4.txt"):
with open(f_open, "r") as file:
ip_addresses_filter.extend([line.strip().split()[1]+"+" for line in file])
with open(ipv4_bird2_m4, "a") as file:
file.write(f"protocol static static_{clist.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
file.write(f"protocol static static_{clist.lower()}_v4 {{\n\tipv4 {{ import filter {{ {f'if net !~ [{','.join(ip_addresses_filter)}] then reject; ' if ip_addresses_filter else ''}{bgp_community} accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
if os.path.exists(ipv6_out_file):
# фильтер маршрутов ipv6, если список consider существует в конфигурации
ip_addresses_filter=list()
if (consider:=value[0].get('consider', [])):
for c in right_list(list(ip_list.keys()), consider):
if os.path.exists(f_open:=f"{outdir}/{c.lower()}_v6.txt"):
with open(f_open, "r") as file:
ip_addresses_filter.extend([line.strip().split()[1]+"+" for line in file])
with open(ipv6_bird2_m4, "a") as file:
file.write(f"protocol static static_{clist.lower()}_v6 {{\n\tipv4 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
file.write(f"protocol static static_{clist.lower()}_v6 {{\n\tipv6 {{ import filter {{ {f'if net !~ [{','.join(ip_addresses_filter)}] then reject; ' if ip_addresses_filter else ''}{bgp_community} accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
# проверяем, что временный файл конфигурации ipv4 не пустой, сохраняем в постоянный
if os.path.exists(ipv4_bird2_m4) and os.path.getsize(ipv4_bird2_m4) != 0:
os.replace(ipv4_bird2_m4, ipv4_bird2_m4.removesuffix(".tmp"))
os.system("systemctl reload bird.service")
print(f"Новый файл конфигурации ipv4 применён")
os.system("systemctl reload bird.service >/dev/null 2>&1 || \
systemctl restart bird.service >/dev/null 2>&1 && \
echo 'Новый файл конфигурации ipv6 применён' || echo '\\e[31mBird2 error...\\e[0m'")
# проверяем, что временный файл конфигурации ipv6 не пустой, сохраняем в постоянный
if os.path.exists(ipv6_bird2_m4) and os.path.getsize(ipv6_bird2_m4) != 0:
os.replace(ipv6_bird2_m4, ipv6_bird2_m4.removesuffix(".tmp"))
os.system("systemctl reload bird.service")
print(f"Новый файл конфигурации ipv6 применён")
os.system("systemctl reload bird.service >/dev/null 2>&1 || \
systemctl restart bird.service >/dev/null 2>&1 && \
echo 'Новый файл конфигурации ipv6 применён' || echo '\\e[31mBird2 error...\\e[0m'")