#!/usr/bin/python3 import re import os import sys import ast import requests from include import net_tree from collections import defaultdict from include.http_header import get_headers # компилируем регулярку поиска ipv4 адреса ipv4_find_str=re.compile(r"[^0-9.]?(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[1-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])(/([0-9]{1}[0-9]*))?[^0-9.]?") # метод сбора словаря ip адресов ipv4 из текста def ipv4_find(strip:str, size:int): """ Метод сбора словаря ip адресов ipv4 из текста возвращает словарь ip где: ключ - имя сети значение - [адрес в int формате, размер сети] """ listip=dict() for c in ipv4_find_str.finditer(strip): ip:int=0 key:str="" # 4 элемента кортежа, содержащие актеты адреса for i in range(1, 5): key+=str(c.group(i))+"." ip = ip * 256 + int(c.group(i)) # элемент кортежа, содержащий размер сети if c.group(6) and int(c.group(6))<=size: listip[key[:-1]+"/"+str(c.group(6))]=[ip,int(c.group(6))] # элемент кортежа, с размером сети, отсутствует elif size==32: listip[f"{key[:-1]}/32"]=[ip,32] return listip # метод сбора словаря ip адресов ipv4 из текста def ipv6_find(strip:str, size:int): """ Метод сбора словаря ip адресов ipv4 из текста возвращает словарь ip где: ключ - имя сети значение - [адрес в int формате, размер сети] """ return dict() # метод группировки словаря def get_dict_groups(input:dict): """ Метод получения сгрупированных данных community и ip адресов возвращает словарь сгрупированных данных данные встречающиеся один раз, не попадают в вывод """ # строим битовые маски name_to_bit = {name: 1 << i for i, name in enumerate(ipv4_dict.keys())} line_communities = defaultdict(set) line_mask = defaultdict(int) # походим по строкам и назначаем маску for name, (communities, lines) in ipv4_dict.items(): bit = name_to_bit[name] for line in lines: line_mask[line] |= bit # добавляем комьюнити этого списка к конкретной строке line_communities[line].update(communities) # группируем строки по маске groups = defaultdict(list) groups_communities = defaultdict(set) for line, mask in line_mask.items(): groups[mask].append(line) groups_communities[mask].update(line_communities[line]) # конвертируем маску в имена bit_to_names = {} for mask in groups.keys(): names = [name for name, bit in name_to_bit.items() if mask & bit] list_name = "__".join(names) bit_to_names[mask] = list_name # возвращаем словарь сгрупированных данных return { bit_to_names[mask]: [ groups_communities[mask], set(groups[mask]) ] for mask in groups if "__" in bit_to_names[mask] } # метод получения списка ip адресов def list_ip(c_dict: dict = [], compress: bool = True): """ Метод получения списка ip адресов возвращает кортеж из 2-х списков: ipv4 и ipv6 """ try: ipv4_list=dict() ipv6_list=dict() # пробегаем словарь выгрузки for c_list in c_dict: # какие типы обрабытываем, от какого размера ipv4 = False if 'ipv4' not in list(c_list) or not c_list['ipv4'] else (c_list['ipv4'] if type(c_list['ipv4']) is int else 24) ipv6 = False if 'ipv6' not in list(c_list) or not c_list['ipv6'] else (c_list['ipv6'] if type(c_list['ipv6']) is int else 32) # если есть источник ссылка if 'url' in list(c_list): # бежим весь список ссылок пока не код 200 for c_url in c_list['url']: try: session = requests.Session() session.headers.update(get_headers()) if (result:=session.get(c_url)) and result.status_code == 200 and result.text: print(f"URL: {c_url}") # пополняем словарь ipv4_list if ipv4: ipv4_list.update(ipv4_find(result.text,ipv4)) # пополняем словарь ipv6_list if ipv6: ipv6_list.update(ipv6_find(result.text,ipv6)) break except requests.exceptions.MissingSchema: pass print("Ошибка соединения") # если есть статичные записи ipv4 if ipv4 and 'static4' in list(c_list): print(f"STATIC: IPv4") # пополняем словарь ipv4_list ipv4_list.update(ipv4_find(str(c_list['static4']),ipv4)) # если есть статичные записи ipv6 if ipv6 and 'static6' in list(c_list): print(f"STATIC: IPv6") # пополняем словарь ipv6_list ipv6_list.update(ipv6_find(str(c_list['static6']),ipv6)) # если ключ не сжимать if ipv4_list and not compress: ipv4_list:str="\n".join([f"route {k} blackhole;" for k, v in ipv4_list.items() if isinstance(v, list)]) # сжимаем подсети ipv4 elif ipv4_list and compress: # строим дерево Root = net_tree.Node(net_tree.Net(0,0), 0) # пробегаем в цикле for c in ipv4_list.values(): # добавляем запись в дерево Root.addSubnet(net_tree.Node(net_tree.Net(c[0], c[1]), 1)) Root.finishTreeFirst() # жесткое сжатие в размер 30000 записей #Root.collapseRoot(Root.real_ip_records_count - 30000) # более мягкое сжатие Root.collapse(1,Root.real_ip_records_count) # возвращаем результат ipv4_list:str=Root.returnCollapsedTree('route {addr}/{masklen} blackhole;') else: ipv4_list:bool=False # если ключ не сжимать if ipv6_list and not compress: ipv6_list:str="\n".join([f"route {k} blackhole;" for k, v in ipv6_list.items() if isinstance(v, list)]) # сжимаем подсети ipv6 if ipv6_list and compress: None else: ipv6_list:bool=False # возвращаем 2 списка маршрутов return ipv4_list, ipv6_list except Exception as e: # исключение print(f"Ошибка: {e}") return False, False # главная фукция if __name__ == "__main__": # словарь выгружаемых списков ip_list = dict() try: # если файл list содержет json структуру, парсим его with open(list_file:=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'list'), "r") as file: ip_list = ast.literal_eval(file.read()) print(f"Список выгрузки из файла: {list_file}") except (ValueError, SyntaxError): try: # если файл list ссылка, загружаем и парсим его with open(list_file, "r") as file: session = requests.Session() session.headers.update(get_headers()) if (result:=session.get(url_list_file:=file.readline().strip())) and result.status_code == 200 and result.text: ip_list = ast.literal_eval(result.text) print(f"Список выгрузки по url: {url_list_file}") except requests.exceptions.MissingSchema: print(f"Невалидный URL на список выгрузки", file=sys.stderr) sys.exit(1) except (ValueError, SyntaxError): print(f"Ошибочная структура json", file=sys.stderr) sys.exit(1) except Exception as e: print(f"Ошибка: {e}", file=sys.stderr) sys.exit(1) except FileNotFoundError: print(f"Файл со списками не найден", file=sys.stderr) sys.exit(1) except Exception as e: print(f"Ошибка: {e}", file=sys.stderr) sys.exit(1) # проверяем на пустой список выгрузки if (not ip_list): print(f"Список выгрузки пустой", file=sys.stderr) sys.exit(1) # cловари для группировки ipv4_dict=dict() ipv6_dict=dict() # создаем дерриктори. для сохранения outdir=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'unloading') if not os.path.exists(outdir): os.makedirs(outdir,exist_ok=True) # создаём временный файл экспортируемой конфигурации, перед выгрузкой open(ipv4_bird2_m4 := f"{outdir}/bird2_v4.m4.tmp", "w").close() open(ipv6_bird2_m4 := f"{outdir}/bird2_v6.m4.tmp", "w").close() # удаляем старые файлы группировок [os.remove(path) for f in os.listdir(outdir) if "__" in f and os.path.isfile(path := os.path.join(outdir, f))] # обходим массив списков для выкрузки for clist in ip_list: # имена выходых файлов ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt" ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt" # преобразуем запись комьюнити в список ip_list[clist]['community']=list(ip_list[clist]['community'].split(",")) # если передан аргумент(ы) запуска, # значит пытаемся обновить только указанные списки if len(sys.argv)==1 or clist in sys.argv[1:]: print("") # вычисляем кол-во записей прошлой выгрузки ipv4_count_old = sum(1 for line in open(ipv4_out_file)) if os.path.isfile(ipv4_out_file) else 0 ipv6_count_old = sum(1 for line in open(ipv6_out_file)) if os.path.isfile(ipv6_out_file) else 0 # выполняем выгрузку print(f"Выгружаю список IP: {clist}") ipv4_list, ipv6_list=list_ip(ip_list[clist]['list'],ip_list[clist].get('compress', True)) # сохраняем ipv4 if ipv4_list and len(ipv4_list.splitlines()) >= ipv4_count_old * 0.5: # сохраняем в файл with open(ipv4_out_file, "w") as file: file.write(ipv4_list) print(f"Файл выгрузки {ipv4_out_file} сохранён") # сохраняем ipv6 if ipv6_list and len(ipv6_list.splitlines()) >= ipv6_count_old * 0.5: # сохраняем в файл with open(ipv6_out_file, "w") as file: file.write(ipv6_list) print(f"Файл выгрузки {ipv6_out_file} сохранён") # открываем файл выгрузки и пополняем словарь для группировки ipv4 if os.path.exists(ipv4_out_file): with open(ipv4_out_file, "r") as file: ipv4_dict[clist] = [ip_list[clist]['community'],list(file.readlines())] # открываем файл выгрузки и пополняем словарь для группировки ipv6 if os.path.exists(ipv6_out_file): with open(ipv6_out_file, "r") as file: ipv6_dict[clist] = [ip_list[clist]['community'],list(file.readlines())] # обновляем временный файл конфигурации ipv4 # из группировок if ipv4_dict: for k,v in get_dict_groups(ipv4_dict).items(): # имена выходых файлов ipv4_out_file=f"{outdir}/{k.lower()}_v4.txt" # сохраняем в файл with open(ipv4_out_file, "w") as file: file.write("".join(v[1])) print(f"Файл группировки {ipv4_out_file} сохранён") # список комьюнити маршрутов bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])]) with open(ipv4_bird2_m4, "a") as file: file.write(f"protocol static static_{k.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n") # обновляем временный файл конфигурации ipv6 # из группировок if ipv6_dict: for k,v in get_dict_groups(ipv6_dict).items(): # имена выходых файлов ipv6_out_file=f"{outdir}/{k.lower()}_v6.txt" # сохраняем в файл with open(ipv6_out_file, "w") as file: file.write("".join(v[1])) print(f"Файл группировки {ipv6_out_file} сохранён") # список комьюнити маршрутов bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])]) with open(ipv6_bird2_m4, "a") as file: file.write(f"protocol static static_{k.lower()}_v6 {{\n\tipv6 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n") # дополняем временный файл конфигурации всей выгрузкой ipv4 и ipv6 for clist in ip_list: # имена выходых файлов ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt" ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt" # список комьюнити маршрутов bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(ip_list[clist]['community'])]) if os.path.exists(ipv4_out_file): with open(ipv4_bird2_m4, "a") as file: file.write(f"protocol static static_{clist.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n") if os.path.exists(ipv6_out_file): with open(ipv6_bird2_m4, "a") as file: file.write(f"protocol static static_{clist.lower()}_v6 {{\n\tipv4 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n") # проверяем, что временный файл конфигурации ipv4 не пустой, сохраняем в постоянный if os.path.exists(ipv4_bird2_m4) and os.path.getsize(ipv4_bird2_m4) != 0: os.replace(ipv4_bird2_m4, ipv4_bird2_m4.removesuffix(".tmp")) os.system("systemctl reload bird.service") print(f"Новый файл конфигурации ipv4 применён") # проверяем, что временный файл конфигурации ipv6 не пустой, сохраняем в постоянный if os.path.exists(ipv6_bird2_m4) and os.path.getsize(ipv6_bird2_m4) != 0: os.replace(ipv6_bird2_m4, ipv6_bird2_m4.removesuffix(".tmp")) os.system("systemctl reload bird.service") print(f"Новый файл конфигурации ipv6 применён")