Files
bird_list_ip/download.py

215 lines
11 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/python3
import re
import os
import sys
import ast
import requests
from include import net_tree
from include.http_header import get_headers
# компилируем регулярку поиска ipv4 адреса
ipv4_find_str=re.compile(r"[^0-9.]?(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])\.(25[0-5]|2[0-4][0-9]|1?[0-9][0-9]|[0-9])(/([0-9]{1}[0-9]*))?[^0-9.]?")
# метод сбора словаря ip адресов ipv4 из текста
def ipv4_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
возвращает словарь ip
где:
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
listip=dict()
for c in ipv4_find_str.finditer(strip):
ip:int=0
key:str=""
# 4 элемента кортежа, содержащие актеты адреса
for i in range(1, 5):
key+=str(c.group(i))+"."
ip = ip * 256 + int(c.group(i))
# элемент кортежа, содержащий размер сети
if c.group(6) and int(c.group(6))<=size:
listip[key[:-1]+"/"+str(c.group(6))]=[ip,int(c.group(6))]
# элемент кортежа, с размером сети, отсутствует
elif size==32:
listip[f"{key[:-1]}/32"]=[ip,32]
return listip
# метод сбора словаря ip адресов ipv4 из текста
def ipv6_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
возвращает словарь ip
где:
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
return dict()
# метод получения списка ip адресов
def list_ip(c_dict: dict = []):
"""
Метод получения списка ip адресов
возвращает кортеж из 2-х списков: ipv4 и ipv6
"""
try:
ipv4_list=dict()
ipv6_list=dict()
# пробегаем словарь выгрузки
for c_list in c_dict:
# какие типы обрабытываем, от какого размера
ipv4 = False if 'ipv4' not in list(c_list) or not c_list['ipv4'] else (c_list['ipv4'] if type(c_list['ipv4']) is int else 24)
ipv6 = False if 'ipv6' not in list(c_list) or not c_list['ipv6'] else (c_list['ipv6'] if type(c_list['ipv6']) is int else 32)
# если есть источник ссылка
if 'url' in list(c_list):
# бежим весь список ссылок пока не код 200
for c_url in c_list['url']:
try:
session = requests.Session()
session.headers.update(get_headers())
if (result:=session.get(c_url)) and result.status_code == 200 and result.text:
print(f"URL: {c_url}")
# пополняем словарь ipv4_list
if ipv4: ipv4_list.update(ipv4_find(result.text,ipv4))
# пополняем словарь ipv6_list
if ipv6: ipv6_list.update(ipv6_find(result.text,ipv6))
break
except requests.exceptions.MissingSchema: pass
print("Ошибка соединения")
# если есть статичные записи ipv4
if ipv4 and 'static4' in list(c_list):
print(f"STATIC: IPv4")
# пополняем словарь ipv4_list
ipv4_list.update(ipv4_find(str(c_list['static4']),ipv4))
# если есть статичные записи ipv6
if ipv6 and 'static6' in list(c_list):
print(f"STATIC: IPv6")
# пополняем словарь ipv6_list
ipv6_list.update(ipv6_find(str(c_list['static6']),ipv6))
# сжимаем подсети ipv4
if ipv4_list:
# строим дерево
Root = net_tree.Node(net_tree.Net(0,0), 0)
# пробегаем в цикле
for c in ipv4_list.values():
# добавляем запись в дерево
Root.addSubnet(net_tree.Node(net_tree.Net(c[0], c[1]), 1))
Root.finishTreeFirst()
# жесткое сжатие в размер 30000 записей
#Root.collapseRoot(Root.real_ip_records_count - 30000)
# более мягкое сжатие
Root.collapse(1,Root.real_ip_records_count)
# возвращаем результат
ipv4_list:str=Root.returnCollapsedTree('route {addr}/{masklen} blackhole;')
else:
ipv4_list:bool=False
# сжимаем подсети ipv6
if ipv6_list:
None
else:
ipv6_list:bool=False
# возвращаем 2 списка маршрутов
return ipv4_list, ipv6_list
except Exception as e:
# исключение
print(f"Ошибка: {e}")
return False, False
# главная фукция
if __name__ == "__main__":
# словарь выгружаемых списков
ip_list = dict()
try:
# если файл list содержет json структуру, парсим его
with open(list_file:=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'list'), "r") as file:
ip_list = ast.literal_eval(file.read())
print(f"Список выгрузки из файла: {list_file}")
except (ValueError, SyntaxError):
try:
# если файл list ссылка, загружаем и парсим его
with open(list_file, "r") as file:
session = requests.Session()
session.headers.update(get_headers())
if (result:=session.get(url_list_file:=file.readline().strip())) and result.status_code == 200 and result.text:
ip_list = ast.literal_eval(result.text)
print(f"Список выгрузки по url: {url_list_file}")
except requests.exceptions.MissingSchema:
print(f"Невалидный URL на список выгрузки", file=sys.stderr)
sys.exit(1)
except (ValueError, SyntaxError):
print(f"Ошибочная структура json", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Ошибка: {e}", file=sys.stderr)
sys.exit(1)
except FileNotFoundError:
print(f"Файл со списками не найден", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Ошибка: {e}", file=sys.stderr)
sys.exit(1)
# проверяем на пустой список выгрузки
if (not ip_list):
print(f"Список выгрузки пустой", file=sys.stderr)
sys.exit(1)
# создаем дерриктори. для сохранения
outdir=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'unloading')
if not os.path.exists(outdir):
os.makedirs(outdir,exist_ok=True)
# создаём временный файл экспортируемой конфигурации, перед выгрузкой
open(ipv4_bird2_m4 := f"{outdir}/bird2_v4.m4.tmp", "w").close()
open(ipv6_bird2_m4 := f"{outdir}/bird2_v6.m4.tmp", "w").close()
# обходим массив списков для выкрузки
for clist in ip_list:
# имена выходых файлов
ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt"
ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt"
# если передан аргумент(ы) запуска,
# значит пытаемся обновить только указанные списки
if len(sys.argv)==1 or clist in sys.argv[1:]:
print("")
# вычисляем кол-во записей прошлой выгрузки
ipv4_count_old = sum(1 for line in open(ipv4_out_file)) if os.path.isfile(ipv4_out_file) else 0
ipv6_count_old = sum(1 for line in open(ipv6_out_file)) if os.path.isfile(ipv6_out_file) else 0
# выполняем выгрузку
print(f"Выгружаю список IP: {clist}")
ipv4_list, ipv6_list=list_ip(ip_list[clist]['list'])
# сохраняем ipv4
if ipv4_list and len(ipv4_list.splitlines()) >= ipv4_count_old * 0.5:
# сохраняем в файл
with open(ipv4_out_file, "w") as file:
file.write(ipv4_list)
print(f"Файл выгрузки {ipv4_out_file} сохранён")
# сохраняем ipv6
if ipv6_list and len(ipv6_list.splitlines()) >= ipv6_count_old * 0.5:
# сохраняем в файл
with open(ipv6_out_file, "w") as file:
file.write(ipv6_list)
print(f"Файл выгрузки {ipv6_out_file} сохранён")
# собираем комьюнити маршрутов
bgp_community=str()
for c in str(ip_list[clist]['community']).split(","):
bgp_community+=f"bgp_community.add(({str(c).replace(':',',')})); "
# обновляем временный файл конфигурации ipv4
with open(ipv4_bird2_m4, "a") as file:
file.write(f"protocol static static_{clist.lower()} {{\n\tipv4 {{ import filter {{ {bgp_community}accept; }}; }};\n\tinclude \"{ipv4_out_file}\";\n}}\n")
# обновляем временный файл конфигурации ipv6
with open(ipv6_bird2_m4, "a") as file:
file.write(f"protocol static static_{clist.lower()} {{\n\tipv6 {{ import filter {{ {bgp_community}accept; }}; }};\n\tinclude \"{ipv6_out_file}\";\n}}\n")
# проверяем, что временный файл конфигурации ipv4 не пустой, сохраняем в постоянный
if os.path.exists(ipv4_bird2_m4) and os.path.getsize(ipv4_bird2_m4) != 0:
os.replace(ipv4_bird2_m4, ipv4_bird2_m4.removesuffix(".tmp"))
os.system("systemctl reload bird.service")
print(f"Новый файл конфигурации ipv4 применён")
# проверяем, что временный файл конфигурации ipv6 не пустой, сохраняем в постоянный
if os.path.exists(ipv6_bird2_m4) and os.path.getsize(ipv6_bird2_m4) != 0:
os.replace(ipv6_bird2_m4, ipv6_bird2_m4.removesuffix(".tmp"))
os.system("systemctl reload bird.service")
print(f"Новый файл конфигурации ipv6 применён")