Files
bird_list_ip/download.py

378 lines
20 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/python3
import re
import os
import sys
import ast
import requests
import ipaddress
from include import net_tree
from collections import defaultdict
from include.http_header import get_headers
# компилируем регулярку поиска ipv4 адреса
ipv4_find_str=re.compile(r"(?<![0-9.])(?!10\.|172\.(?:1[6-9]|2[0-9]|3[01])\.|192\.168\.)((?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[1-9])\.(?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(?:25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9]))(?:/(3[0-2]|[12][0-9]|[1-9]))?(?![0-9.])")
# компилируем регулярку поиска ipv6 адреса
ipv6_find_str=re.compile(r'(?<![0-9A-Fa-f:])([23][0-9A-Fa-f]{3}(?:(?::[0-9A-Fa-f]{1,4}){0,6}|(?:::[0-9A-Fa-f]{0,4})?)(?::[0-9A-Fa-f]{0,4})*)(?:/([1-9][0-9]?|1[01][0-9]|12[0-8]))?(?![0-9A-Fa-f:])')
# метод сбора словаря ip адресов ipv4 из текста
def ipv4_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
возвращает словарь ip
где:
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
listip=dict()
for c in ipv4_find_str.finditer(strip):
ip_str = c.group(1)
prefix_str = c.group(2)
# определяем префикс
if (prefix:=int(prefix_str) if prefix_str else 32) > size: continue
# проверка корректности IPv4
try:
ipv4_obj=ipaddress.IPv4Address(ip_str)
except:
continue
listip[f"{ip_str}/{prefix}"] = [int(ipv4_obj), prefix]
return listip
# метод сбора словаря ip адресов ipv6 из текста
def ipv6_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv6 из текста
возвращает словарь ip
где:
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
listip=dict()
for c in ipv6_find_str.finditer(strip):
ip_str = c.group(1)
prefix_str = c.group(2)
# определяем префикс
if (prefix:=int(prefix_str) if prefix_str else 128) > size: continue
# проверка корректности IPv6
try:
ipv6_obj=ipaddress.IPv6Address(ip_str)
except:
continue
listip[f"{ip_str}/{prefix}"] = [int(ipv6_obj), prefix]
return listip
# метод группировки словаря
def get_dict_groups(input:dict):
"""
Метод получения сгрупированных данных
community и ip адресов
возвращает словарь сгрупированных данных
данные встречающиеся один раз, не попадают в вывод
"""
# если словарь короче 2х
if len(input) < 2: return {}
# строим битовые маски
name_to_bit = {name: 1 << i for i, name in enumerate(input.keys())}
line_communities = defaultdict(set)
line_mask = defaultdict(int)
# походим по строкам и назначаем маску
for name, (communities, lines) in input.items():
bit = name_to_bit[name]
for line in lines:
line_mask[line] |= bit
# добавляем комьюнити этого списка к конкретной строке
line_communities[line].update(communities)
# группируем строки по маске
groups = defaultdict(list)
groups_communities = defaultdict(set)
for line, mask in line_mask.items():
groups[mask].append(line)
groups_communities[mask].update(line_communities[line])
# конвертируем маску в имена
bit_to_names = {}
for mask in groups.keys():
names = [name for name, bit in name_to_bit.items() if mask & bit]
list_name = "__".join(names)
bit_to_names[mask] = list_name
# возвращаем словарь сгрупированных данных
return { bit_to_names[mask]: [ groups_communities[mask], set(groups[mask]) ] for mask in groups if "__" in bit_to_names[mask] and groups_communities[mask] and groups[mask] }
# метод получения списка ip адресов
def list_ip(c_list: list = []):
"""
Метод получения списка ip адресов
возвращает кортеж из 2-х списков: ipv4 и ipv6
"""
try:
ipv4_list=dict()
ipv6_list=dict()
# определяем, будем сжимать или нет
compress=c_list[0].get('compress', True)
# какие типы обрабытываем, от какого размера
# по умолчанию:
# ipv4 обрабатываем (<=24)
# ipv6 игнорируем (<=64)
# какие типы обрабытываем, от какого размера
ipv4 = False if not (ipv4:=c_list[0].get('ipv4', True)) else (ipv4 if type(ipv4) is int else 24)
ipv6 = False if not (ipv6:=c_list[0].get('ipv6', False)) else (ipv6 if type(ipv6) is int else 64)
# пробегаем словарь выгрузки
for c_dict in c_list:
# если есть источник ссылка
if 'url' in list(c_dict):
# бежим весь список ссылок пока не код 200
for c_url in c_dict['url']:
try:
session = requests.Session()
session.headers.update(get_headers())
if (result:=session.get(c_url)) and result.status_code == 200 and result.text:
print(f"URL: {c_url}")
# пополняем словарь ipv4_list
if ipv4: ipv4_list.update(ipv4_find(result.text,ipv4))
# пополняем словарь ipv6_list
if ipv6: ipv6_list.update(ipv6_find(result.text,ipv6))
break
except requests.exceptions.MissingSchema: pass
print("Ошибка соединения")
# если есть статичные записи ipv4
if ipv4 and 'static4' in list(c_dict):
print(f"STATIC: IPv4")
# пополняем словарь ipv4_list
ipv4_list.update(ipv4_find(str(c_dict['static4']),ipv4))
# если есть статичные записи ipv6
if ipv6 and 'static6' in list(c_dict):
print(f"STATIC: IPv6")
# пополняем словарь ipv6_list
ipv6_list.update(ipv6_find(str(c_dict['static6']),ipv6))
# сжимаем подсети ipv4
if ipv4_list:
# создаем дерево
Root = net_tree.Node(net_tree.Net(0, 0, 4))
# добавляем IPv4 подсети
for ip_int, mask in sorted(ipv4_list.values(), key=lambda x: x[0]):
Root.insert(net_tree.Net(ip_int, mask, 4))
# считаем статистику
Root.finalize()
# сжатие по CIDR, если ключ сжимать
if compress: Root.collapse()
# получаем результат
ipv4_list = Root.export('route {addr}/{masklen} blackhole;')
else:
ipv4_list:bool=False
# сжимаем подсети ipv6
if ipv6_list:
# строим дерево
Root = net_tree.Node(net_tree.Net(1 << 127, 0, 6))
# добавляем IPv6 подсети
for ip_int, mask in sorted(ipv6_list.values(), key=lambda x: x[0]):
Root.insert(net_tree.Net(ip_int, mask, 6))
# считаем статистику
Root.finalize()
# сжатие по CIDR, если ключ сжимать
if compress: Root.collapse()
# получаем результат
ipv6_list = Root.export('route {addr}/{masklen} blackhole;')
else:
ipv6_list:bool=False
# возвращаем 2 списка маршрутов
return ipv4_list, ipv6_list
except Exception as e:
# исключение
print(f"Ошибка: {e}")
return False, False
# метод анализа элементов списка (аргументов)
def right_list(ip_list: list, args_list: list=[]):
"""
Метод анализа элементов списка выгрузок, возвращает актульный список выгрузок,
основываясь на списке аргументов, переданных вторым параметром
"""
# собираем словарь аргументов
# элементы списка начинающиеся с "-"
# попадают в off, остальные в on
c_dict = defaultdict(list)
for c in args_list:
key = "off" if c[0] == "-" else "on"
value = c[1:].upper() if c[0] == "-" else c.upper()
c_dict[key].append(value)
c_dict = dict(c_dict)
# если словарь аргументов не пустой
if c_dict:
# пробегаем список выгрузок
for c in ip_list[:]:
# пропускаем
if len(c_dict) == 2 and c in c_dict["on"] and not c in c_dict["off"]: continue
if len(c_dict) == 1 and (("on" in c_dict and c in c_dict["on"]) or ("off" in c_dict and not c in c_dict["off"])): continue
# удаляем элемент из списка
ip_list = list(filter(lambda x: x != c, ip_list))
return ip_list
# главная фукция
if __name__ == "__main__":
# словарь выгружаемых списков
ip_list = dict()
try:
# если файл list содержет json структуру, парсим его
with open(list_file:=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'list'), "r") as file:
ip_list = ast.literal_eval(file.read())
print(f"Список выгрузки из файла: {list_file}")
except (ValueError, SyntaxError):
try:
# если файл list ссылка, загружаем и парсим его
with open(list_file, "r") as file:
session = requests.Session()
session.headers.update(get_headers())
if (result:=session.get(url_list_file:=file.readline().strip())) and result.status_code == 200 and result.text:
ip_list = ast.literal_eval(result.text)
print(f"Список выгрузки по url: {url_list_file}")
except requests.exceptions.MissingSchema:
print(f"Невалидный URL на список выгрузки", file=sys.stderr)
sys.exit(1)
except (ValueError, SyntaxError):
print(f"Ошибочная структура json", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Ошибка: {e}", file=sys.stderr)
sys.exit(1)
except FileNotFoundError:
print(f"Файл со списками не найден", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Ошибка: {e}", file=sys.stderr)
sys.exit(1)
# проверяем на пустой список выгрузки
if (not ip_list):
print(f"Список выгрузки пустой", file=sys.stderr)
sys.exit(1)
# cловари для группировки
ipv4_dict=dict()
ipv6_dict=dict()
# список того, что будем выгружать/обновлять
download_ip_list=right_list(list(ip_list.keys()), sys.argv[1:])
# создаем дерриктори. для сохранения
outdir=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'unloading')
if not os.path.exists(outdir):
os.makedirs(outdir,exist_ok=True)
# создаём временный файл экспортируемой конфигурации, перед выгрузкой
open(ipv4_bird2_m4 := f"{outdir}/bird2_v4.m4.tmp", "w").close()
open(ipv6_bird2_m4 := f"{outdir}/bird2_v6.m4.tmp", "w").close()
# удаляем старые файлы группировок
[os.remove(path) for f in os.listdir(outdir) if "__" in f and os.path.isfile(path := os.path.join(outdir, f))]
# обходим массив списков для выгрузки
for clist, value in ip_list.items():
# имена выходых файлов
ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt"
ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt"
# извлекаем community
community=[c for c in value[0].get('community', "").split(",") if c]
# обновляем только указанные списки
if clist in download_ip_list:
print("")
# вычисляем кол-во записей прошлой выгрузки
ipv4_count_old = sum(1 for line in open(ipv4_out_file)) if os.path.isfile(ipv4_out_file) else 0
ipv6_count_old = sum(1 for line in open(ipv6_out_file)) if os.path.isfile(ipv6_out_file) else 0
# выполняем выгрузку
print(f"Выгружаю список IP: {clist}")
ipv4_list, ipv6_list=list_ip(value)
# сохраняем ipv4
if ipv4_list and len(ipv4_list.splitlines()) >= ipv4_count_old * 0.5:
# сохраняем в файл
with open(ipv4_out_file, "w") as file:
file.write(ipv4_list)
print(f"Файл выгрузки {ipv4_out_file} сохранён")
# сохраняем ipv6
if ipv6_list and len(ipv6_list.splitlines()) >= ipv6_count_old * 0.5:
# сохраняем в файл
with open(ipv6_out_file, "w") as file:
file.write(ipv6_list)
print(f"Файл выгрузки {ipv6_out_file} сохранён")
# открываем файл выгрузки и пополняем словарь для группировки ipv4
if os.path.exists(ipv4_out_file):
with open(ipv4_out_file, "r") as file:
ipv4_dict[clist] = [community,list(file.readlines())]
# открываем файл выгрузки и пополняем словарь для группировки ipv6
if os.path.exists(ipv6_out_file):
with open(ipv6_out_file, "r") as file:
ipv6_dict[clist] = [community,list(file.readlines())]
# обновляем временный файл конфигурации ipv4
# из группировок
if ipv4_dict:
for k,v in get_dict_groups(ipv4_dict).items():
# имена выходых файлов
ipv4_out_file=f"{outdir}/{k.lower()}_v4.txt"
# сохраняем в файл
with open(ipv4_out_file, "w") as file:
file.write("".join(v[1]))
print(f"Файл группировки {ipv4_out_file} сохранён")
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])])
with open(ipv4_bird2_m4, "a") as file:
file.write(f"protocol static static_{k.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_community} preference=400; accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
# обновляем временный файл конфигурации ipv6
# из группировок
if ipv6_dict:
for k,v in get_dict_groups(ipv6_dict).items():
# имена выходых файлов
ipv6_out_file=f"{outdir}/{k.lower()}_v6.txt"
# сохраняем в файл
with open(ipv6_out_file, "w") as file:
file.write("".join(v[1]))
print(f"Файл группировки {ipv6_out_file} сохранён")
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted(v[0])])
with open(ipv6_bird2_m4, "a") as file:
file.write(f"protocol static static_{k.lower()}_v6 {{\n\tipv6 {{ import filter {{ {bgp_community} preference=400; accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
# дополняем временный файл конфигурации всей выгрузкой ipv4 и ipv6
for clist, value in ip_list.items():
# имена выходых файлов
ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt"
ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt"
# список комьюнити маршрутов
bgp_community=" ".join([f"bgp_community.add(({str(c).replace(':',',')}));" for c in sorted([c for c in value[0].get('community', "").split(",") if c])])
if os.path.exists(ipv4_out_file):
# фильтер маршрутов ipv4, если список ignore существует в конфигурации
ip_addresses_filter=list()
if (ignore:=value[0].get('ignore', [])):
for c in right_list(list(ip_list.keys()), ignore):
if os.path.exists(f_open:=f"{outdir}/{c.lower()}_v4.txt"):
with open(f_open, "r") as file:
ip_addresses_filter.extend([line.strip().split()[1]+"+" for line in file])
with open(ipv4_bird2_m4, "a") as file:
bgp_filter=f"if net ~ [{','.join(ip_addresses_filter)}] then reject; " if ip_addresses_filter else ''
file.write(f"protocol static static_{clist.lower()}_v4 {{\n\tipv4 {{ import filter {{ {bgp_filter}{bgp_community} accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
if os.path.exists(ipv6_out_file):
# фильтер маршрутов ipv6, если список ignore существует в конфигурации
ip_addresses_filter=list()
if (ignore:=value[0].get('ignore', [])):
for c in right_list(list(ip_list.keys()), ignore):
if os.path.exists(f_open:=f"{outdir}/{c.lower()}_v6.txt"):
with open(f_open, "r") as file:
ip_addresses_filter.extend([line.strip().split()[1]+"+" for line in file])
with open(ipv6_bird2_m4, "a") as file:
bgp_filter=f"if net ~ [{','.join(ip_addresses_filter)}] then reject; " if ip_addresses_filter else ''
file.write(f"protocol static static_{clist.lower()}_v6 {{\n\tipv6 {{ import filter {{ {bgp_filter}{bgp_community} accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
# проверяем, что временный файл конфигурации ipv4 не пустой, сохраняем в постоянный
if os.path.exists(ipv4_bird2_m4) and os.path.getsize(ipv4_bird2_m4) != 0:
os.replace(ipv4_bird2_m4, ipv4_bird2_m4.removesuffix(".tmp"))
os.system("systemctl reload bird.service >/dev/null 2>&1 || \
systemctl restart bird.service >/dev/null 2>&1 && \
echo 'Новый файл конфигурации ipv6 применён' || echo '\\e[31mBird2 error...\\e[0m'")
# проверяем, что временный файл конфигурации ipv6 не пустой, сохраняем в постоянный
if os.path.exists(ipv6_bird2_m4) and os.path.getsize(ipv6_bird2_m4) != 0:
os.replace(ipv6_bird2_m4, ipv6_bird2_m4.removesuffix(".tmp"))
os.system("systemctl reload bird.service >/dev/null 2>&1 || \
systemctl restart bird.service >/dev/null 2>&1 && \
echo 'Новый файл конфигурации ipv6 применён' || echo '\\e[31mBird2 error...\\e[0m'")