Files
bird_list_ip/download.py

216 lines
10 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/python3
import re
import os
import sys
import ast
import net_tree
import requests
# компилируем регулярку поиска ipv4 адреса
ipv4_find_str=re.compile(r"[^0-9.]?(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])(/([0-9]{1}[0-9]*))?[^0-9.]?")
# заголовок HTTP запроса
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}
# метод сбора словаря ip адресов ipv4 из текста
def ipv4_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
возвращает словарь ip
где:
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
listip=dict()
for c in ipv4_find_str.finditer(strip):
ip:int=0
key:str=""
# 4 элемента кортежа, содержащие актеты адреса
for i in range(1, 5):
key+=str(c.group(i))+"."
ip = ip * 256 + int(c.group(i))
# элемент кортежа, содержащий размер сети
if c.group(6) and int(c.group(6))<=size:
listip[key[:-1]+"/"+str(c.group(6))]=[ip,int(c.group(6))]
# элемент кортежа, с размером сети, отсутствует
elif size==32:
listip[f"{key[:-1]}/32"]=[ip,32]
return listip
# метод сбора словаря ip адресов ipv4 из текста
def ipv6_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
возвращает словарь ip
где:
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
return dict()
# метод получения списка ip адресов
def list_ip(c_dict: dict = []):
"""
Метод получения списка ip адресов
возвращает кортеж из 2-х списков: ipv4 и ipv6
"""
try:
ipv4_list=dict()
ipv6_list=dict()
# пробегаем словарь выгрузки
for c_list in c_dict:
# какие типы обрабытываем, от какого размера
ipv4 = False if 'ipv4' not in list(c_list) or not c_list['ipv4'] else (c_list['ipv4'] if type(c_list['ipv4']) is int else 24)
ipv6 = False if 'ipv6' not in list(c_list) or not c_list['ipv6'] else (c_list['ipv6'] if type(c_list['ipv6']) is int else 32)
# если есть источник ссылка
if 'url' in list(c_list):
# бежим весь список ссылок пока не код 200
for c_url in c_list['url']:
try:
if (result:=requests.get(c_url, headers=headers)) and result.status_code == 200 and result.text:
print(f"URL: {c_url}")
# пополняем словарь ipv4_list
if ipv4: ipv4_list.update(ipv4_find(result.text,ipv4))
# пополняем словарь ipv6_list
if ipv6: ipv6_list.update(ipv6_find(result.text,ipv6))
break
except requests.exceptions.MissingSchema: pass
print("Ошибка соединения")
# если есть статичные записи ipv4
if ipv4 and 'static4' in list(c_list):
print(f"STATIC: IPv4")
# пополняем словарь ipv4_list
ipv4_list.update(ipv4_find(str(c_list['static4']),ipv4))
# если есть статичные записи ipv6
if ipv6 and 'static6' in list(c_list):
print(f"STATIC: IPv6")
# пополняем словарь ipv6_list
ipv6_list.update(ipv6_find(str(c_list['static6']),ipv6))
# сжимаем подсети ipv4
if ipv4_list:
# строим дерево
Root = net_tree.Node(net_tree.Net(0,0), 0)
# пробегаем в цикле
for c in ipv4_list.values():
# добавляем запись в дерево
Root.addSubnet(net_tree.Node(net_tree.Net(c[0], c[1]), 1))
Root.finishTreeFirst()
# жесткое сжатие в размер 30000 записей
#Root.collapseRoot(Root.real_ip_records_count - 30000)
# более мягкое сжатие
Root.collapse(1,Root.real_ip_records_count)
# возвращаем результат
ipv4_list:str=Root.returnCollapsedTree(' route {addr}/{masklen} blackhole;')
else:
ipv4_list:bool=False
# сжимаем подсети ipv6
if ipv6_list:
None
else:
ipv6_list:bool=False
# возвращаем 2 списка маршрутов
return ipv4_list, ipv6_list
except Exception as e:
# исключение
print(f"Ошибка: {e}")
return False, False
# главная фукция
if __name__ == "__main__":
# словарь выгружаемых списков
ip_list = dict()
try:
# если файл list содержет json структуру, парсим его
with open(list_file:=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'list'), "r") as file:
ip_list = ast.literal_eval(file.read())
print(f"Список выгрузки из файла: {list_file}")
except (ValueError, SyntaxError):
try:
# если файл list ссылка, загружаем и парсим его
with open(list_file, "r") as file:
if (result:=requests.get(url_list_file:=file.readline().strip(), headers=headers)) and result.status_code == 200 and result.text:
ip_list = ast.literal_eval(result.text)
print(f"Список выгрузки по url: {url_list_file}")
except requests.exceptions.MissingSchema:
print(f"Невалидный URL на список выгрузки", file=sys.stderr)
sys.exit(1)
except (ValueError, SyntaxError):
print(f"Ошибочная структура json", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Ошибка: {e}", file=sys.stderr)
sys.exit(1)
except FileNotFoundError:
print(f"Файл со списками не найден", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Ошибка: {e}", file=sys.stderr)
sys.exit(1)
# если передан аргумент(ы) запуска,
# значит пытаемся обновить только указанные списки
if len(sys.argv) > 1:
ip_list = {k: v for k, v in ip_list.items() if k in sys.argv[1:]}
# проверяем на пустой список выгрузки
if (not ip_list):
print(f"Список выгрузки пустой", file=sys.stderr)
sys.exit(1)
# создаем дерриктори. для сохранения
outdir=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'unloading')
if not os.path.exists(outdir):
os.makedirs(outdir,exist_ok=True)
# создаём временный файл, перед выгрузкой
open(ipv4_out_file := f"{outdir}/bird2_v4.m4.tmp", "w").close()
open(ipv6_out_file := f"{outdir}/bird2_v6.m4.tmp", "w").close()
# обходим массив списков для выкрузки
for clist in ip_list:
# выполняем выгрузку
print(f"Выгружаю список IP: {clist}")
ipv4_list, ipv6_list=list_ip(ip_list[clist]['list'])
# сохраняем ipv4
if ipv4_list:
# собираем комьюнити маршрутов
bgp_community=str()
for c in str(ip_list[clist]['community']).split(","):
bgp_community+=f"bgp_community.add(({str(c).replace(':',',')})); "
# сохраняем в файл
with open(ipv4_out_file, "a") as file:
file.write(f"protocol static static_{clist.lower()} {{\n ipv4 {{ import filter {{ {bgp_community}accept; }}; }};\n{ipv4_list}}}\n")
print(f"Временный файл {ipv4_out_file} создан")
# проверяем, что файл не пустой, сохраняем в постоянный
if os.path.exists(ipv4_out_file) and os.path.getsize(ipv4_out_file) != 0:
print(f"Новый файл выгрузки ipv4 премененён")
os.replace(ipv4_out_file, ipv4_out_file.removesuffix(".tmp"))
# сохраняем ipv6
if ipv6_list:
# собираем комьюнити маршрутов
bgp_community=str()
for c in str(ip_list[clist]['community']).split(","):
bgp_community+=f"bgp_community.add(({str(c).replace(':',',')})); "
# сохраняем в файл
with open(ipv6_out_file, "a") as file:
file.write(f"protocol static static_{clist.lower()} {{\n ipv6 {{ import filter {{ {bgp_community}accept; }}; }};\n{ipv6_list}}}\n")
print(f"Временный файл {ipv6_out_file} создан")
# проверяем, что файл не пустой, сохраняем в постоянный
if os.path.exists(ipv6_out_file) and os.path.getsize(ipv6_out_file) != 0:
print(f"Новый файл выгрузки ipv6 премененён")
os.replace(ipv6_out_file, ipv6_out_file.removesuffix(".tmp"))
print("")
# bird2 reload
os.system("systemctl reload bird.service")