Files
bird_list_ip/download.py

344 lines
18 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/python3
import re
import os
import sys
import ast
import requests
import ipaddress
from include import net_tree
from collections import defaultdict
from include.http_header import get_headers
# компилируем регулярку поиска ipv4 адреса
ipv4_find_str=re.compile(r"(?<![0-9.])(?!10\.|172\.(?:1[6-9]|2[0-9]|3[01])\.|192\.168\.)((?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[1-9])\.(?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9]))(?:/(3[0-2]|[12][0-9]|[1-9]))?(?![0-9.])")
# компилируем регулярку поиска ipv6 адреса
ipv6_find_str=re.compile(r'(?<![0-9A-Fa-f:])([23][0-9A-Fa-f]{3}(?:(?::[0-9A-Fa-f]{1,4}){0,6}|(?:::[0-9A-Fa-f]{0,4})?)(?::[0-9A-Fa-f]{0,4})*)(?:/([1-9][0-9]?|1[01][0-9]|12[0-8]))?(?![0-9A-Fa-f:])')
# метод сбора словаря ip адресов ipv4 из текста
def ipv4_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
возвращает словарь ip
где:
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
listip=dict()
for c in ipv4_find_str.finditer(strip):
ip_str = c.group(1)
prefix_str = c.group(2)
# определяем префикс
if (prefix:=int(prefix_str) if prefix_str else 32) > size: continue
# проверка корректности IPv4
try:
ipv4_obj=ipaddress.IPv4Address(ip_str)
except:
continue
listip[f"{ip_str}/{prefix}"] = [int(ipv4_obj), prefix]
return listip
# метод сбора словаря ip адресов ipv6 из текста
def ipv6_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
возвращает словарь ip
где:
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
listip=dict()
for c in ipv6_find_str.finditer(strip):
ip_str = c.group(1)
prefix_str = c.group(2)
# определяем префикс
if (prefix:=int(prefix_str) if prefix_str else 128) > size: continue
# проверка корректности IPv4
try:
ipv6_obj=ipaddress.IPv6Address(ip_str)
except:
continue
listip[f"{ip_str}/{prefix}"] = [int(ipv6_obj), prefix]
return listip
# метод группировки словаря
def get_dict_groups(input:dict):
"""
Метод получения сгрупированных данных
community и ip адресов
возвращает словарь сгрупированных данных
данные встречающиеся один раз, не попадают в вывод
"""
# если словарь короче 2х
if len(input) < 2: return {}
# строим битовые маски
name_to_bit = {name: 1 << i for i, name in enumerate(input.keys())}
line_communities = defaultdict(set)
line_mask = defaultdict(int)
# походим по строкам и назначаем маску
for name, (communities, lines) in input.items():
bit = name_to_bit[name]
for line in lines:
line_mask[line] |= bit
# добавляем комьюнити этого списка к конкретной строке
line_communities[line].update(communities)
# группируем строки по маске
groups = defaultdict(list)
groups_communities = defaultdict(set)
for line, mask in line_mask.items():
groups[mask].append(line)
groups_communities[mask].update(line_communities[line])
# конвертируем маску в имена
bit_to_names = {}
for mask in groups.keys():
names = [name for name, bit in name_to_bit.items() if mask & bit]
list_name = "__".join(names)
bit_to_names[mask] = list_name
# возвращаем словарь сгрупированных данных
return { bit_to_names[mask]: [ groups_communities[mask], set(groups[mask]) ] for mask in groups if "__" in bit_to_names[mask] and groups_communities[mask] and groups[mask] }
# метод получения списка ip адресов
def list_ip(c_list: list = []):
"""
Метод получения списка ip адресов
возвращает кортеж из 2-х списков: ipv4 и ipv6
"""
try:
ipv4_list=dict()
ipv6_list=dict()
# определяем, будем сжимать или нет
compress=c_list[0].get('compress', True)
# какие типы обрабытываем, от какого размера
# по умолчанию:
# ipv4 обрабатываем (<=24)
# ipv6 игнорируем (<=64)
# какие типы обрабытываем, от какого размера
ipv4 = False if not (ipv4:=c_list[0].get('ipv4', True)) else (ipv4 if type(ipv4) is int else 24)
ipv6 = False if not (ipv6:=c_list[0].get('ipv6', False)) else (ipv6 if type(ipv6) is int else 64)
# пробегаем словарь выгрузки
for c_dict in c_list:
# если есть источник ссылка
if 'url' in list(c_dict):
# бежим весь список ссылок пока не код 200
for c_url in c_dict['url']:
try:
session = requests.Session()
session.headers.update(get_headers())
if (result:=session.get(c_url)) and result.status_code == 200 and result.text:
print(f"URL: {c_url}")
# пополняем словарь ipv4_list
if ipv4: ipv4_list.update(ipv4_find(result.text,ipv4))
# пополняем словарь ipv6_list
if ipv6: ipv6_list.update(ipv6_find(result.text,ipv6))
break
except requests.exceptions.MissingSchema: pass
print("Ошибка соединения")
# если есть статичные записи ipv4
if ipv4 and 'static4' in list(c_dict):
print(f"STATIC: IPv4")
# пополняем словарь ipv4_list
ipv4_list.update(ipv4_find(str(c_dict['static4']),ipv4))
# если есть статичные записи ipv6
if ipv6 and 'static6' in list(c_dict):
print(f"STATIC: IPv6")
# пополняем словарь ipv6_list
ipv6_list.update(ipv6_find(str(c_dict['static6']),ipv6))
# если ключ не сжимать
if ipv4_list and not compress:
ipv4_list:str="".join([f"route {k} blackhole;\n" for k, v in ipv4_list.items() if isinstance(v, list)])
# сжимаем подсети ipv4
elif ipv4_list and compress:
# строим дерево
Root = net_tree.Node(net_tree.Net(0, 0), 0)
# пробегаем в цикле
for c in ipv4_list.values():
# добавляем запись в дерево
Root.addSubnet(net_tree.Node(net_tree.Net(c[0], c[1]), 1))
Root.finishTreeFirst()
# жесткое сжатие в размер 30000 записей
#Root.collapseRoot(Root.real_ip_records_count - 30000)
# более мягкое сжатие
Root.collapse(1,Root.real_ip_records_count)
# возвращаем результат
ipv4_list:str=Root.returnCollapsedTree('route {addr}/{masklen} blackhole;')
else:
ipv4_list:bool=False
# если ключ не сжимать
if ipv6_list and not compress:
ipv6_list:str="".join([f"route {k} blackhole;\n" for k, v in ipv6_list.items() if isinstance(v, list)])
# сжимаем подсети ipv6
elif ipv6_list and compress:
# строим дерево
Root = net_tree.Node(net_tree.Net(1 << 127, 0), 0)
# пробегаем в цикле
for c in ipv6_list.values():
# добавляем запись в дерево
Root.addSubnet(net_tree.Node(net_tree.Net(c[0], c[1]), 1))
Root.finishTreeFirst()
# жесткое сжатие в размер 30000 записей
#Root.collapseRoot(Root.real_ip_records_count - 30000)
# более мягкое сжатие
Root.collapse(1,Root.real_ip_records_count)
# возвращаем результат
ipv6_list:str=Root.returnCollapsedTree('route {addr}/{masklen} blackhole;')
else:
ipv6_list:bool=False
# возвращаем 2 списка маршрутов
return ipv4_list, ipv6_list
except Exception as e:
# исключение
print(f"Ошибка: {e}")
return False, False
# главная фукция
if __name__ == "__main__":
# словарь выгружаемых списков
ip_list = dict()
try:
# если файл list содержет json структуру, парсим его
with open(list_file:=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'list'), "r") as file:
ip_list = ast.literal_eval(file.read())
print(f"Список выгрузки из файла: {list_file}")
except (ValueError, SyntaxError):
try:
# если файл list ссылка, загружаем и парсим его
with open(list_file, "r") as file:
session = requests.Session()
session.headers.update(get_headers())
if (result:=session.get(url_list_file:=file.readline().strip())) and result.status_code == 200 and result.text:
ip_list = ast.literal_eval(result.text)
print(f"Список выгрузки по url: {url_list_file}")
except requests.exceptions.MissingSchema:
print(f"Невалидный URL на список выгрузки", file=sys.stderr)
sys.exit(1)
except (ValueError, SyntaxError):
print(f"Ошибочная структура json", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Ошибка: {e}", file=sys.stderr)
sys.exit(1)
except FileNotFoundError:
print(f"Файл со списками не найден", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Ошибка: {e}", file=sys.stderr)
sys.exit(1)
# проверяем на пустой список выгрузки
if (not ip_list):
print(f"Список выгрузки пустой", file=sys.stderr)
sys.exit(1)
# cловари для группировки
ipv4_dict=dict()
ipv6_dict=dict()
# создаем дерриктори. для сохранения
outdir=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'unloading')
if not os.path.exists(outdir):
os.makedirs(outdir,exist_ok=True)
# создаём временный файл экспортируемой конфигурации, перед выгрузкой
open(ipv4_bird2_m4 := f"{outdir}/bird2_v4.m4.tmp", "w").close()
open(ipv6_bird2_m4 := f"{outdir}/bird2_v6.m4.tmp", "w").close()
# удаляем старые файлы группировок
[os.remove(path) for f in os.listdir(outdir) if "__" in f and os.path.isfile(path := os.path.join(outdir, f))]
# обходим массив списков для выкрузки
for clist, value in ip_list.items():
# имена выходых файлов
ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt"
ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt"
# извлекаем community
community=[c for c in value[0].get('community', "").split(",") if c]
# если передан аргумент(ы) запуска,
# значит пытаемся обновить только указанные списки
if len(sys.argv)==1 or clist in sys.argv[1:]:
print("")
# вычисляем кол-во записей прошлой выгрузки
ipv4_count_old = sum(1 for line in open(ipv4_out_file)) if os.path.isfile(ipv4_out_file) else 0
ipv6_count_old = sum(1 for line in open(ipv6_out_file)) if os.path.isfile(ipv6_out_file) else 0
# выполняем выгрузку
print(f"Выгружаю список IP: {clist}")
ipv4_list, ipv6_list=list_ip(value)
# сохраняем ipv4
if ipv4_list and len(ipv4_list.splitlines()) >= ipv4_count_old * 0.5:
# сохраняем в файл
with open(ipv4_out_file, "w") as file:
file.write(ipv4_list)
print(f"Файл выгрузки {ipv4_out_file} сохранён")
# сохраняем ipv6
if ipv6_list and len(ipv6_list.splitlines()) >= ipv6_count_old * 0.5:
# сохраняем в файл
with open(ipv6_out_file, "w") as file:
file.write(ipv6_list)
print(f"Файл выгрузки {ipv6_out_file} сохранён")
# открываем файл выгрузки и пополняем словарь для группировки ipv4
if os.path.exists(ipv4_out_file):
with open(ipv4_out_file, "r") as file:
ipv4_dict[clist] = [community,list(file.readlines())]
# открываем файл выгрузки и пополняем словарь для группировки ipv6
if os.path.exists(ipv6_out_file):
with open(ipv6_out_file, "r") as file:
ipv6_dict[clist] = [community,list(file.readlines())]
# обновляем временный файл конфигурации ipv4
# из группировок
if ipv4_dict:
for k,v in get_dict_groups(ipv4_dict).items():
# имена выходых файлов
ipv4_out_file=f"{outdir}/{k.lower()}_v4.txt"
# сохраняем в файл
with open(ipv4_out_file, "w") as file:
file.write("".join(v[1]))
print(f"Файл группировки {ipv4_out_file} сохранён")
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])])
with open(ipv4_bird2_m4, "a") as file:
file.write(f"protocol static static_{k.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community} preference=400; accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
# обновляем временный файл конфигурации ipv6
# из группировок
if ipv6_dict:
for k,v in get_dict_groups(ipv6_dict).items():
# имена выходых файлов
ipv6_out_file=f"{outdir}/{k.lower()}_v6.txt"
# сохраняем в файл
with open(ipv6_out_file, "w") as file:
file.write("".join(v[1]))
print(f"Файл группировки {ipv6_out_file} сохранён")
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])])
with open(ipv6_bird2_m4, "a") as file:
file.write(f"protocol static static_{k.lower()}_v6 {{\n\tipv6 {{ import filter {{ {bgp_community} preference=400; accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
# дополняем временный файл конфигурации всей выгрузкой ipv4 и ipv6
for clist, value in ip_list.items():
# имена выходых файлов
ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt"
ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt"
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted([c for c in value[0].get('community', "").split(",") if c])])
if os.path.exists(ipv4_out_file):
with open(ipv4_bird2_m4, "a") as file:
file.write(f"protocol static static_{clist.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
if os.path.exists(ipv6_out_file):
with open(ipv6_bird2_m4, "a") as file:
file.write(f"protocol static static_{clist.lower()}_v6 {{\n\tipv4 {{ import filter {{ {bgp_community} accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
# проверяем, что временный файл конфигурации ipv4 не пустой, сохраняем в постоянный
if os.path.exists(ipv4_bird2_m4) and os.path.getsize(ipv4_bird2_m4) != 0:
os.replace(ipv4_bird2_m4, ipv4_bird2_m4.removesuffix(".tmp"))
os.system("systemctl reload bird.service")
print(f"Новый файл конфигурации ipv4 применён")
# проверяем, что временный файл конфигурации ipv6 не пустой, сохраняем в постоянный
if os.path.exists(ipv6_bird2_m4) and os.path.getsize(ipv6_bird2_m4) != 0:
os.replace(ipv6_bird2_m4, ipv6_bird2_m4.removesuffix(".tmp"))
os.system("systemctl reload bird.service")
print(f"Новый файл конфигурации ipv6 применён")