Правка вывода в терминал и другое по мелочи.

This commit is contained in:
2025-12-05 00:37:17 +10:00
parent e3e3c2ce31
commit ddb21076a0

View File

@@ -8,6 +8,8 @@ import requests
import ipaddress import ipaddress
from include import net_tree from include import net_tree
from collections import defaultdict from collections import defaultdict
from time import sleep as time_sleep
from shutil import get_terminal_size
from include.http_header import get_headers from include.http_header import get_headers
# компилируем регулярку поиска ipv4 адреса # компилируем регулярку поиска ipv4 адреса
@@ -15,16 +17,26 @@ ipv4_find_str=re.compile(r"(?<![0-9.])(?!10\.|172\.(?:1[6-9]|2[0-9]|3[01])\.|192
# компилируем регулярку поиска ipv6 адреса # компилируем регулярку поиска ipv6 адреса
ipv6_find_str=re.compile(r'(?<![0-9A-Fa-f:])([23][0-9A-Fa-f]{3}(?:(?::[0-9A-Fa-f]{1,4}){0,6}|(?:::[0-9A-Fa-f]{0,4})?)(?::[0-9A-Fa-f]{0,4})*)(?:/([1-9][0-9]?|1[01][0-9]|12[0-8]))?(?![0-9A-Fa-f:])') ipv6_find_str=re.compile(r'(?<![0-9A-Fa-f:])([23][0-9A-Fa-f]{3}(?:(?::[0-9A-Fa-f]{1,4}){0,6}|(?:::[0-9A-Fa-f]{0,4})?)(?::[0-9A-Fa-f]{0,4})*)(?:/([1-9][0-9]?|1[01][0-9]|12[0-8]))?(?![0-9A-Fa-f:])')
# метод сбора словаря ip адресов ipv4 из текста # метод вывода прогресса на экран
def progres_print(lstr:list, mtype:int=0, ss:float=0.1):
"""
Метод вывода на экран прогресса выполнения
"""
term_width = get_terminal_size().columns
mtype: str = '33' if mtype == 1 else '31' if mtype == 2 else '32'
cstr: str = f"\033[{mtype}m | \033[0m".join(lstr)[:term_width]
print(f"\r{' ' * term_width}\r\033[{mtype}m>>>\033[0m {cstr}", end='', flush=True)
time_sleep(ss)
# метод сбора set адресов ipv4 из текста
def ipv4_find(strip:str, size:int): def ipv4_find(strip:str, size:int):
""" """
Метод сбора словаря ip адресов ipv4 из текста Метод сбора set адресов ipv4 из текста
возвращает словарь ip возвращает set ip
где: где:
ключ - имя сети
значение - [адрес в int формате, размер сети] значение - [адрес в int формате, размер сети]
""" """
listip=dict() ips=set()
for c in ipv4_find_str.finditer(strip): for c in ipv4_find_str.finditer(strip):
ip_str = c.group(1) ip_str = c.group(1)
prefix_str = c.group(2) prefix_str = c.group(2)
@@ -32,22 +44,20 @@ def ipv4_find(strip:str, size:int):
if (prefix:=int(prefix_str) if prefix_str else 32) > size: continue if (prefix:=int(prefix_str) if prefix_str else 32) > size: continue
# проверка корректности IPv4 # проверка корректности IPv4
try: try:
ipv4_obj=ipaddress.IPv4Address(ip_str) ips.add((int(ipaddress.IPv4Address(ip_str)), prefix))
except: except (ValueError, ipaddress.AddressValueError):
continue continue
listip[f"{ip_str}/{prefix}"] = [int(ipv4_obj), prefix] return ips
return listip
# метод сбора словаря ip адресов ipv6 из текста # метод сбора set адресов ipv6 из текста
def ipv6_find(strip:str, size:int): def ipv6_find(strip:str, size:int):
""" """
Метод сбора словаря ip адресов ipv6 из текста Метод сбора set адресов ipv6 из текста
возвращает словарь ip возвращает set ip
где: где:
ключ - имя сети
значение - [адрес в int формате, размер сети] значение - [адрес в int формате, размер сети]
""" """
listip=dict() ips=set()
for c in ipv6_find_str.finditer(strip): for c in ipv6_find_str.finditer(strip):
ip_str = c.group(1) ip_str = c.group(1)
prefix_str = c.group(2) prefix_str = c.group(2)
@@ -55,11 +65,10 @@ def ipv6_find(strip:str, size:int):
if (prefix:=int(prefix_str) if prefix_str else 128) > size: continue if (prefix:=int(prefix_str) if prefix_str else 128) > size: continue
# проверка корректности IPv6 # проверка корректности IPv6
try: try:
ipv6_obj=ipaddress.IPv6Address(ip_str) ips.add((int(ipaddress.IPv6Address(ip_str)), prefix))
except: except (ValueError, ipaddress.AddressValueError):
continue continue
listip[f"{ip_str}/{prefix}"] = [int(ipv6_obj), prefix] return ips
return listip
# метод группировки словаря # метод группировки словаря
def get_dict_groups(input:dict): def get_dict_groups(input:dict):
@@ -109,8 +118,8 @@ def list_ip(c_list: list = []):
возвращает кортеж из 2-х списков: ipv4 и ipv6 возвращает кортеж из 2-х списков: ipv4 и ipv6
""" """
try: try:
ipv4_list=dict() ipv4_list=set()
ipv6_list=dict() ipv6_list=set()
# определяем, будем сжимать или нет # определяем, будем сжимать или нет
compress=c_list[0].get('compress', True) compress=c_list[0].get('compress', True)
# какие типы обрабытываем, от какого размера # какие типы обрабытываем, от какого размера
@@ -121,32 +130,37 @@ def list_ip(c_list: list = []):
ipv4 = False if not (ipv4:=c_list[0].get('ipv4', True)) else (ipv4 if type(ipv4) is int else 24) ipv4 = False if not (ipv4:=c_list[0].get('ipv4', True)) else (ipv4 if type(ipv4) is int else 24)
ipv6 = False if not (ipv6:=c_list[0].get('ipv6', False)) else (ipv6 if type(ipv6) is int else 64) ipv6 = False if not (ipv6:=c_list[0].get('ipv6', False)) else (ipv6 if type(ipv6) is int else 64)
c_list_len=len(c_list)
# пробегаем словарь выгрузки # пробегаем словарь выгрузки
for c_dict in c_list: for i, c_dict in enumerate(c_list, start=1):
# прогрес %
percent = int(i / c_list_len * 100)
# если есть источник ссылка # если есть источник ссылка
if 'url' in list(c_dict): if 'url' in list(c_dict):
# бежим весь список ссылок пока не код 200 # бежим весь список ссылок пока не код 200
for c_url in c_dict['url']: for c_url in c_dict['url']:
progres_print([c_url, f"{percent}%"])
try: try:
session = requests.Session() session = requests.Session()
session.headers.update(get_headers()) session.headers.update(get_headers())
if (result:=session.get(c_url)) and result.status_code == 200 and result.text: if (result:=session.get(c_url, timeout=(5, 60), stream=True)) and result.status_code == 200 and result.text:
print(f"URL: {c_url}")
# пополняем словарь ipv4_list # пополняем словарь ipv4_list
if ipv4: ipv4_list.update(ipv4_find(result.text,ipv4)) if ipv4: ipv4_list.update(ipv4_find(result.text,ipv4))
# пополняем словарь ipv6_list # пополняем словарь ipv6_list
if ipv6: ipv6_list.update(ipv6_find(result.text,ipv6)) if ipv6: ipv6_list.update(ipv6_find(result.text,ipv6))
break break
except requests.exceptions.MissingSchema: pass except requests.exceptions.RequestException: pass
print("Ошибка соединения") progres_print([c_url, f"{percent}%"],1,1)
# если есть статичные записи ipv4 # если есть статичные записи ipv4
if ipv4 and 'static4' in list(c_dict): if ipv4 and 'static4' in list(c_dict):
print(f"STATIC: IPv4") progres_print(["StaticIPv4", f"{percent}%"])
# пополняем словарь ipv4_list # пополняем словарь ipv4_list
ipv4_list.update(ipv4_find(str(c_dict['static4']),ipv4)) ipv4_list.update(ipv4_find(str(c_dict['static4']),ipv4))
# если есть статичные записи ipv6 # если есть статичные записи ipv6
if ipv6 and 'static6' in list(c_dict): if ipv6 and 'static6' in list(c_dict):
print(f"STATIC: IPv6") progres_print(["StaticIPv6", f"{percent}%"])
# пополняем словарь ipv6_list # пополняем словарь ipv6_list
ipv6_list.update(ipv6_find(str(c_dict['static6']),ipv6)) ipv6_list.update(ipv6_find(str(c_dict['static6']),ipv6))
@@ -155,7 +169,7 @@ def list_ip(c_list: list = []):
# создаем дерево # создаем дерево
Root = net_tree.Node(net_tree.Net(0, 0, 4)) Root = net_tree.Node(net_tree.Net(0, 0, 4))
# добавляем IPv4 подсети # добавляем IPv4 подсети
for ip_int, mask in sorted(ipv4_list.values(), key=lambda x: x[0]): for ip_int, mask in sorted(ipv4_list, key=lambda x: x[0]):
Root.insert(net_tree.Net(ip_int, mask, 4)) Root.insert(net_tree.Net(ip_int, mask, 4))
# считаем статистику # считаем статистику
Root.finalize() Root.finalize()
@@ -171,7 +185,7 @@ def list_ip(c_list: list = []):
# строим дерево # строим дерево
Root = net_tree.Node(net_tree.Net(1 << 127, 0, 6)) Root = net_tree.Node(net_tree.Net(1 << 127, 0, 6))
# добавляем IPv6 подсети # добавляем IPv6 подсети
for ip_int, mask in sorted(ipv6_list.values(), key=lambda x: x[0]): for ip_int, mask in sorted(ipv6_list, key=lambda x: x[0]):
Root.insert(net_tree.Net(ip_int, mask, 6)) Root.insert(net_tree.Net(ip_int, mask, 6))
# считаем статистику # считаем статистику
Root.finalize() Root.finalize()
@@ -185,7 +199,7 @@ def list_ip(c_list: list = []):
return ipv4_list, ipv6_list return ipv4_list, ipv6_list
except Exception as e: except Exception as e:
# исключение # исключение
print(f"Ошибка: {e}") print(f"\nОшибка: {e}")
return False, False return False, False
# метод анализа элементов списка (аргументов) # метод анализа элементов списка (аргументов)
@@ -222,18 +236,18 @@ if __name__ == "__main__":
# если файл list содержет json структуру, парсим его # если файл list содержет json структуру, парсим его
with open(list_file:=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'list'), "r") as file: with open(list_file:=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'list'), "r") as file:
ip_list = ast.literal_eval(file.read()) ip_list = ast.literal_eval(file.read())
print(f"Список выгрузки из файла: {list_file}") print(f"Список выгрузки (файл): {list_file}")
except (ValueError, SyntaxError): except (ValueError, SyntaxError):
try: try:
# если файл list ссылка, загружаем и парсим его # если файл list ссылка, загружаем и парсим его
with open(list_file, "r") as file: with open(list_file, "r") as file:
session = requests.Session() session = requests.Session()
session.headers.update(get_headers()) session.headers.update(get_headers())
if (result:=session.get(url_list_file:=file.readline().strip())) and result.status_code == 200 and result.text: if (result:=session.get(url_list_file:=file.readline().strip(), timeout=(5, 5))) and result.status_code == 200 and result.text:
ip_list = ast.literal_eval(result.text) ip_list = ast.literal_eval(result.text)
print(f"Список выгрузки по url: {url_list_file}") print(f"Список выгрузки (url): {url_list_file}")
except requests.exceptions.MissingSchema: except requests.exceptions.RequestException:
print(f"Невалидный URL на список выгрузки", file=sys.stderr) print(f"Невалидный URL/ошибка выгрузки", file=sys.stderr)
sys.exit(1) sys.exit(1)
except (ValueError, SyntaxError): except (ValueError, SyntaxError):
print(f"Ошибочная структура json", file=sys.stderr) print(f"Ошибочная структура json", file=sys.stderr)
@@ -281,20 +295,20 @@ if __name__ == "__main__":
ipv4_count_old = sum(1 for line in open(ipv4_out_file)) if os.path.isfile(ipv4_out_file) else 0 ipv4_count_old = sum(1 for line in open(ipv4_out_file)) if os.path.isfile(ipv4_out_file) else 0
ipv6_count_old = sum(1 for line in open(ipv6_out_file)) if os.path.isfile(ipv6_out_file) else 0 ipv6_count_old = sum(1 for line in open(ipv6_out_file)) if os.path.isfile(ipv6_out_file) else 0
# выполняем выгрузку # выполняем выгрузку
print(f"Выгружаю список IP: {clist}") print(f"Выгрузка списка IP: {clist}")
ipv4_list, ipv6_list=list_ip(value) ipv4_list, ipv6_list=list_ip(value)
# сохраняем ipv4 # сохраняем ipv4
if ipv4_list and len(ipv4_list.splitlines()) >= ipv4_count_old * 0.5: if ipv4_list and len(ipv4_list.splitlines()) >= ipv4_count_old * 0.5:
# сохраняем в файл # сохраняем в файл
with open(ipv4_out_file, "w") as file: with open(ipv4_out_file, "w") as file:
file.write(ipv4_list) file.write(ipv4_list)
print(f"Файл выгрузки {ipv4_out_file} сохранён") progres_print([f"Файл {ipv4_out_file}", "сохранён"])
# сохраняем ipv6 # сохраняем ipv6
if ipv6_list and len(ipv6_list.splitlines()) >= ipv6_count_old * 0.5: if ipv6_list and len(ipv6_list.splitlines()) >= ipv6_count_old * 0.5:
# сохраняем в файл # сохраняем в файл
with open(ipv6_out_file, "w") as file: with open(ipv6_out_file, "w") as file:
file.write(ipv6_list) file.write(ipv6_list)
print(f"Файл выгрузки {ipv6_out_file} сохранён") progres_print([f"Файл {ipv6_out_file}", "сохранён"])
# открываем файл выгрузки и пополняем словарь для группировки ipv4 # открываем файл выгрузки и пополняем словарь для группировки ipv4
if os.path.exists(ipv4_out_file): if os.path.exists(ipv4_out_file):
with open(ipv4_out_file, "r") as file: with open(ipv4_out_file, "r") as file:
@@ -304,6 +318,7 @@ if __name__ == "__main__":
with open(ipv6_out_file, "r") as file: with open(ipv6_out_file, "r") as file:
ipv6_dict[clist] = [community,list(file.readlines())] ipv6_dict[clist] = [community,list(file.readlines())]
print("\n\nКонфигурация Bird2:")
# обновляем временный файл конфигурации ipv4 # обновляем временный файл конфигурации ipv4
# из группировок # из группировок
if ipv4_dict: if ipv4_dict:
@@ -313,11 +328,13 @@ if __name__ == "__main__":
# сохраняем в файл # сохраняем в файл
with open(ipv4_out_file, "w") as file: with open(ipv4_out_file, "w") as file:
file.write("".join(v[1])) file.write("".join(v[1]))
print(f"Файл группировки {ipv4_out_file} сохранён") progres_print([f"Группировка {ipv4_out_file}", "сохранена"])
# список комьюнити маршрутов # список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])]) bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])])
with open(ipv4_bird2_m4, "a") as file: with open(ipv4_bird2_m4, "a") as file:
file.write(f"protocol static static_{k.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community} preference=400; accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n") file.write(f"protocol static static_{k.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community} preference=400; accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
progres_print([ipv4_bird2_m4, f"добавлен {k}"])
# обновляем временный файл конфигурации ipv6 # обновляем временный файл конфигурации ipv6
# из группировок # из группировок
if ipv6_dict: if ipv6_dict:
@@ -327,11 +344,12 @@ if __name__ == "__main__":
# сохраняем в файл # сохраняем в файл
with open(ipv6_out_file, "w") as file: with open(ipv6_out_file, "w") as file:
file.write("".join(v[1])) file.write("".join(v[1]))
print(f"Файл группировки {ipv6_out_file} сохранён") progres_print([f"Группировка {ipv6_out_file}", "сохранена"])
# список комьюнити маршрутов # список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])]) bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])])
with open(ipv6_bird2_m4, "a") as file: with open(ipv6_bird2_m4, "a") as file:
file.write(f"protocol static static_{k.lower()}_v6 {{\n\tipv6 {{ import filter {{ {bgp_community} preference=400; accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n") file.write(f"protocol static static_{k.lower()}_v6 {{\n\tipv6 {{ import filter {{ {bgp_community} preference=400; accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
progres_print([ipv6_bird2_m4, f"добавлен {k}"])
# дополняем временный файл конфигурации всей выгрузкой ipv4 и ipv6 # дополняем временный файл конфигурации всей выгрузкой ipv4 и ipv6
for clist, value in ip_list.items(): for clist, value in ip_list.items():
@@ -352,6 +370,7 @@ if __name__ == "__main__":
with open(ipv4_bird2_m4, "a") as file: with open(ipv4_bird2_m4, "a") as file:
bgp_filter=f"if net ~ [{','.join(ip_addresses_filter)}] then reject; " if ip_addresses_filter else '' bgp_filter=f"if net ~ [{','.join(ip_addresses_filter)}] then reject; " if ip_addresses_filter else ''
file.write(f"protocol static static_{clist.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_filter}{bgp_community} accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n") file.write(f"protocol static static_{clist.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_filter}{bgp_community} accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
progres_print([ipv4_bird2_m4, f"добавлен {clist}"])
if os.path.exists(ipv6_out_file): if os.path.exists(ipv6_out_file):
# фильтер маршрутов ipv6, если список ignore существует в конфигурации # фильтер маршрутов ipv6, если список ignore существует в конфигурации
ip_addresses_filter=set() ip_addresses_filter=set()
@@ -364,7 +383,9 @@ if __name__ == "__main__":
with open(ipv6_bird2_m4, "a") as file: with open(ipv6_bird2_m4, "a") as file:
bgp_filter=f"if net ~ [{','.join(ip_addresses_filter)}] then reject; " if ip_addresses_filter else '' bgp_filter=f"if net ~ [{','.join(ip_addresses_filter)}] then reject; " if ip_addresses_filter else ''
file.write(f"protocol static static_{clist.lower()}_v6 {{\n\tipv6 {{ import filter {{ {bgp_filter}{bgp_community} accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n") file.write(f"protocol static static_{clist.lower()}_v6 {{\n\tipv6 {{ import filter {{ {bgp_filter}{bgp_community} accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
progres_print([ipv6_bird2_m4, f"добавлен {clist}"])
print("\n")
# проверяем, что временный файл конфигурации ipv4 не пустой, сохраняем в постоянный # проверяем, что временный файл конфигурации ipv4 не пустой, сохраняем в постоянный
if os.path.exists(ipv4_bird2_m4) and os.path.getsize(ipv4_bird2_m4) != 0: if os.path.exists(ipv4_bird2_m4) and os.path.getsize(ipv4_bird2_m4) != 0:
os.replace(ipv4_bird2_m4, ipv4_bird2_m4.removesuffix(".tmp")) os.replace(ipv4_bird2_m4, ipv4_bird2_m4.removesuffix(".tmp"))