bird_list_ip/download.py

202 lines
11 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
import re
import os
import sys
import net_tree
import requests
# массив выгружаемых списков
ip_list = {
'RU': [
# Большая часть RU сегмента
{ 'url': ['https://stat.ripe.net/data/country-resource-list/data.json?resource=RU'], 'ipv4': True, 'ipv6': False },
{ 'url': ['https://ipv4.fetus.jp/ru.txt'], 'ipv4': True, 'ipv6': False },
{ 'url': ['https://github.com/ipverse/rir-ip/blob/master/country/ru/aggregated.json'], 'ipv4': True, 'ipv6': False },
# HLL LLC
{ 'url': ['https://bgp.he.net/AS51115#_prefixes', 'https://ipinfo.io/widget/demo/AS51115?dataset=asn', 'https://api.hackertarget.com/aslookup/?q=AS51115'], 'ipv4': True, 'ipv6': False },
# STATIC
{ 'static4': '188.130.255.0/24', 'ipv4': True, 'ipv6': False },
],
'CHINA': [
# Большая часть CH сегмента
{ 'url': ['https://stat.ripe.net/data/country-resource-list/data.json?resource=CN'], 'ipv4': True, 'ipv6': False },
{ 'url': ['https://ipv4.fetus.jp/cn.txt'], 'ipv4': True, 'ipv6': False },
{ 'url': ['https://github.com/ipverse/rir-ip/blob/master/country/cn/aggregated.json'], 'ipv4': True, 'ipv6': False },
# Hong Kong
{ 'url': ['https://stat.ripe.net/data/country-resource-list/data.json?resource=HK'], 'ipv4': True, 'ipv6': False },
{ 'url': ['https://ipv4.fetus.jp/hk.txt'], 'ipv4': True, 'ipv6': False },
{ 'url': ['https://github.com/ipverse/rir-ip/blob/master/country/hk/aggregated.json'], 'ipv4': True, 'ipv6': False },
# Alibaba (US) Technology Co., Ltd.
{ 'url': ['https://bgp.he.net/AS45102#_prefixes', 'https://ipinfo.io/widget/demo/AS45102?dataset=asn', 'https://api.hackertarget.com/aslookup/?q=AS45102'], 'ipv4': True, 'ipv6': False },
],
'JAPAN': [
# Большая часть KR сегмента
{ 'url': ['https://stat.ripe.net/data/country-resource-list/data.json?resource=JP'], 'ipv4': True, 'ipv6': False },
{ 'url': ['https://ipv4.fetus.jp/jp.txt'], 'ipv4': True, 'ipv6': False },
{ 'url': ['https://github.com/ipverse/rir-ip/blob/master/country/jp/aggregated.json'], 'ipv4': True, 'ipv6': False },
],
'KOREA': [
# Большая часть KR сегмента
{ 'url': ['https://stat.ripe.net/data/country-resource-list/data.json?resource=KR'], 'ipv4': True, 'ipv6': False },
{ 'url': ['https://ipv4.fetus.jp/kr.txt'], 'ipv4': True, 'ipv6': False },
{ 'url': ['https://github.com/ipverse/rir-ip/blob/master/country/kr/aggregated.json'], 'ipv4': True, 'ipv6': False },
# LG DACOM Corporation
{ 'url': ['https://bgp.he.net/AS3786#_prefixes', 'https://ipinfo.io/widget/demo/AS3786?dataset=asn', 'https://api.hackertarget.com/aslookup/?q=AS3786'], 'ipv4': True, 'ipv6': False },
],
'GOOGLE': [
# MTS, TTK, Megafon
{ 'url': ['https://bgp.he.net/AS15169#_prefixes', 'https://ipinfo.io/widget/demo/AS15169?dataset=asn', 'https://api.hackertarget.com/aslookup/?q=AS15169', 'https://www.gstatic.com/ipranges/goog.json'], 'static4': '212.188.0.0/17,188.43.0.0/16,178.176.0.0/14', 'ipv4': True, 'ipv6': False },
],
'RKN': [
{ 'url': ['https://reestr.rublacklist.net/api/v3/ips/', 'https://antifilter.network/download/ip.lst'], 'ipv4': 32, 'ipv6': False },
],
'MSFT': [
{ 'url': ['https://bgp.he.net/AS8075#_prefixes', 'https://ipinfo.io/widget/demo/AS8075?dataset=asn', 'https://api.hackertarget.com/aslookup/?q=AS8075'], 'ipv4': True, 'ipv6': False },
],
'VALVE': [
{ 'url': ['https://bgp.he.net/AS32590#_prefixes', 'https://ipinfo.io/widget/demo/AS32590?dataset=asn', 'https://api.hackertarget.com/aslookup/?q=AS32590'], 'ipv4': True, 'ipv6': False },
],
'AMAZONE': [
{ 'url': ['https://bgp.he.net/AS16509#_prefixes', 'https://ipinfo.io/widget/demo/AS16509?dataset=asn', 'https://api.hackertarget.com/aslookup/?q=AS16509'], 'ipv4': True, 'ipv6': False },
]
}
# компилируем регулярку поиска ipv4 адреса
ipv4_find_str=re.compile(r"[^0-9.]?(25[0-5]|2[0-4][0-9]|[1-9][0-9][0-9]?|[1-9])\.(25[0-5]|2[0-4][0-9]|[1-9][0-9][0-9]?|[0-9])\.(25[0-5]|2[0-4][0-9]|[1-9][0-9][0-9]?|[0-9])\.(25[0-5]|2[0-4][0-9]|[1-9][0-9][0-9]?|[0-9])(/([0-9]{1}[0-9]*))?[^0-9.]?")
# метод сбора словаря ip адресов ipv4 из текста
def ipv4_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
возвращает словарь ip
где:
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
listip=dict()
for c in ipv4_find_str.finditer(strip):
ip:int=0
key:str=""
# 4 элемента кортежа, содержащие актеты адреса
for i in range(1, 5):
key+=str(c.group(i))+"."
ip = ip * 256 + int(c.group(i))
# элемент кортежа, содержащий размер сети
if c.group(6) and int(c.group(6))<=size:
listip[key[:-1]+"/"+str(c.group(6))]=[ip,int(c.group(6))]
# элемент кортежа, с размером сети, отсутствует
elif size==32:
listip[f"{key[:-1]}/32"]=[ip,32]
return listip
# метод сбора словаря ip адресов ipv4 из текста
def ipv6_find(strip:str, size:int):
"""
Метод сбора словаря ip адресов ipv4 из текста
возвращает словарь ip
где:
ключ - имя сети
значение - [адрес в int формате, размер сети]
"""
return dict()
# метод получения списка ip адресов
def list_ip(c_dict: dict = []):
"""
Метод получения списка ip адресов
возвращает кортеж из 2-х списков: ipv4 и ipv6
"""
try:
ipv4_list=dict()
ipv6_list=dict()
# пробегаем словарь выгрузки
for c_list in c_dict:
# какие типы обрабытываем, от какого размера
ipv4 = False if 'ipv4' not in list(c_list) or not c_list['ipv4'] else (c_list['ipv4'] if type(c_list['ipv4']) is int else 24)
ipv6 = False if 'ipv6' not in list(c_list) or not c_list['ipv6'] else (c_list['ipv6'] if type(c_list['ipv6']) is int else 32)
# если есть источник ссылка
if 'url' in list(c_list):
# бежим весь список ссылок пока не код 200
for c_url in c_list['url']:
if (result:=requests.get(c_url)) and result.status_code == 200 and result.text:
print(f"URL: {c_url}")
# пополняем словарь ipv4_list
if ipv4: ipv4_list.update(ipv4_find(result.text,ipv4))
# пополняем словарь ipv6_list
if ipv6: ipv6_list.update(ipv6_find(result.text,ipv6))
break
print("Ошибка соединения")
# если есть статичные записи ipv4
if ipv4 and 'static4' in list(c_list):
print(f"STATIC: IPv4")
# пополняем словарь ipv4_list
ipv4_list.update(ipv4_find(str(c_list['static4']),ipv4))
# если есть статичные записи ipv6
if ipv6 and 'static6' in list(c_list):
print(f"STATIC: IPv6")
# пополняем словарь ipv6_list
ipv6_list.update(ipv6_find(str(c_list['static6']),ipv6))
# сжимаем подсети ipv4
if ipv4_list:
# строим дерево
Root = net_tree.Node(net_tree.Net(0,0), 0)
# пробегаем в цикле
for c in ipv4_list.values():
# добавляем запись в дерево
Root.addSubnet(net_tree.Node(net_tree.Net(c[0], c[1]), 1))
Root.finishTreeFirst()
# жесткое сжатие в размер 30000 записей
#Root.collapseRoot(Root.real_ip_records_count - 30000)
# более мягкое сжатие
Root.collapse(1,Root.real_ip_records_count)
# возвращаем результат
ipv4_list:str=Root.returnCollapsedTree('route {addr}/{masklen} blackhole;')
else:
ipv4_list:bool=False
# сжимаем подсети ipv6
if ipv6_list:
None
else:
ipv6_list:bool=False
# возвращаем 2 списка маршрутов
return ipv4_list, ipv6_list
except Exception as e:
# исключение
print(f"Ошибка: {e}")
return False, False
# главная фукция
if __name__ == "__main__":
# создаем дерриктори. для сохранения
outdir=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'unloading')
if not os.path.exists(outdir):
os.makedirs(outdir,exist_ok=True)
# обходим массив списков для выкрузки
for clist in ip_list:
# имена выходых файлов
ipv4_out_file=f"{outdir}/{clist.lower()}_v4.txt"
ipv6_out_file=f"{outdir}/{clist.lower()}_v6.txt"
# вычисляем кол-во записей прошлой выгрузки
ipv4_count_old = sum(1 for line in open(ipv4_out_file)) if os.path.isfile(ipv4_out_file) else 0
ipv6_count_old = sum(1 for line in open(ipv6_out_file)) if os.path.isfile(ipv6_out_file) else 0
# выполняем выгрузку
print(f"Выгружаю список IP: {clist}")
ipv4_list, ipv6_list=list_ip(ip_list[clist])
# сохраняем ipv4
if ipv4_list and len(ipv4_list.splitlines()) >= ipv4_count_old * 0.7:
# сохраняем в файл
with open(ipv4_out_file, "w") as file:
file.write(ipv4_list)
print(f"Файл {ipv4_out_file} сохранён")
# сохраняем ipv6
if ipv6_list and len(ipv6_list.splitlines()) >= ipv6_count_old * 0.7:
# сохраняем в файл
with open(ipv6_out_file, "w") as file:
file.write(ipv6_list)
print(f"Файл {ipv6_out_file} сохранён")
print("")