Files
nym/scripts/node_api_check.py
T
2025-02-11 05:39:00 -06:00

418 lines
18 KiB
Python
Executable File

#!/usr/bin/python3
# TODO (in later versions)
# - make option to run in sandbox env
# - pull endpoints from: https://validator.nymtech.net/api/v1/openapi.json
# - try this https://stackoverflow.com/questions/15431044/can-i-set-max-retries-for-requests-request/35504626#35504626
import requests as r
import argparse
import sys
import os
import pandas as pd
import json
import urllib3
import time
from json import JSONDecodeError
from tabulate import tabulate
class MainFunctions:
def __init__(self):
self.api_url = "https://validator.nymtech.net/api/v1"
self.api_endpoints_json = "api_endpoints.json"
self.output = Output()
def display_results(self, args):
id_key = args.id
mode, host, version, mix_id, role, node_df, node_dict, api_data, swagger_data, routing_history = self.collect_all_results(args)
print("\n============================================================")
print("\nNYM NODE INFO\n")
print(f"Type = {mode}")
if role:
print(f"Mode = {role}")
print(f"Identity Key = {id_key}")
print(f"Host = {host}")
print(f"Version = {version}")
toc = "Swagger page is not accessible. T&Cs are unrecognized and treated as False!"
if swagger_data:
toc = swagger_data["/auxiliary-details"]["accepted_operator_terms_and_conditions"]
print(f"T&Cs = {toc}")
if mix_id:
print(f"Mix ID = {mix_id}")
print("\n\nNODE RESULTS FROM UNFILTERED QUERY\n")
if args.markdown:
node_markdown = self._dataframe_to_markdown(node_df, ["RESULT"], ["API ENDPOINT"])
print(node_markdown, "\n")
else:
self.print_neat_dict(node_dict)
print(f"\n\nNODE RESULTS FROM {self.api_url.upper()}\n")
if args.markdown:
api_df = self._json_to_dataframe(api_data)
node_markdown = self._dataframe_to_markdown(api_df, ["RESULT"], ["API EDNPOINT"])
print(node_markdown, "\n")
else:
self.print_neat_dict(api_data)
if swagger_data:
print(f"\n\nNODE RESULTS FROM SWAGGER PAGE\n")
if args.markdown:
swagger_df = self._json_to_dataframe(swagger_data)
node_markdown = self._dataframe_to_markdown(swagger_df, ["RESULT"], ["API EDNPOINT"])
print(node_markdown, "\n")
else:
swagger_data = self._json_neat_format(swagger_data)
print(swagger_data)
else:
swagger_data = f"\nSwagger API endpoints of node {id_key} hosted on IP: {host} are not responding. Maybe you querying a deprecated version of nym-mixnode or the VPS ports are not open correctly.\n"
if routing_history:
print(f"\n\nNODE UPTIME HISTORY\n")
if args.markdown:
routing_history_df = self._json_to_dataframe(routing_history)
print(routing_history_df.to_markdown(index = False))
else:
self.print_neat_dict(routing_history)
routing_history = self._json_neat_format(routing_history)
else:
routing_history = " "
if args.output or args.output == "":
node_dict = self._json_neat_format(node_dict)
api_data = self._json_neat_format(api_data)
if role:
data_list = [f"Id. Key = {id_key}", f"Host = {host}", f"Type = {mode}", f"Mode = {role}", f"T&Cs = {toc}", node_dict, api_data, swagger_data, routing_history]
else:
data_list = [f"Id. Key = {id_key}", f"Host = {host}", f"Type = {mode}", f"T&Cs = {toc}", node_dict, api_data, swagger_data, routing_history]
self.output.concat_to_file(args, data_list)
def collect_all_results(self,args):
id_key = args.id
gateways_unfiltered, mixnodes_unfiltered = self.get_unfiltered_data()
gateways_df = self._json_to_dataframe(gateways_unfiltered)
gateways_df = self._set_index_to_empty(gateways_df)
mixnodes_df = self._json_to_dataframe(mixnodes_unfiltered)
mixnodes_df = self._set_index_to_empty(mixnodes_df)
mode, node_df, node_dict = self.get_node_df(id_key, gateways_df, mixnodes_df, gateways_unfiltered, mixnodes_unfiltered)
host, version, mix_id, role, api_data, swagger_data, routing_history = self.get_node_data(mode, node_dict, id_key, args)
return mode, host, version, mix_id, role, node_df, node_dict, api_data, swagger_data, routing_history
def get_node_df(self,id_key, gateways_df, mixnodes_df, gateways_unfiltered,mixnodes_unfiltered):
if id_key in mixnodes_df['mixnode_details.bond_information.mix_node.identity_key'].values:
node_df = mixnodes_df.loc[mixnodes_df['mixnode_details.bond_information.mix_node.identity_key'] == id_key]
node_dict = next((mn for mn in mixnodes_unfiltered if mn['mixnode_details']['bond_information']['mix_node']['identity_key'] == f"{id_key}"), None)
mode = "mixnode"
elif id_key in gateways_df["gateway_bond.gateway.identity_key"].values:
node_df = gateways_df.loc[gateways_df["gateway_bond.gateway.identity_key"] == id_key]
node_dict = next((gw for gw in gateways_unfiltered if gw['gateway_bond']['gateway']['identity_key'] == f"{id_key}"), None)
mode = "gateway"
else:
print(f"The identity key '{id_key}' does not exist.")
return mode, node_df, node_dict
def get_unfiltered_data(self):
print("INFO: Starting to query /detailed-unfiltered endpoint...")
gateways_unfiltered = r.get(f"{self.api_url}/status/gateways/detailed-unfiltered").json()
mixnodes_unfiltered = r.get(f"{self.api_url}/status/mixnodes/detailed-unfiltered").json()
return gateways_unfiltered, mixnodes_unfiltered
def get_node_data(self,mode, node_dict, id_key, args):
print("INFO: Sorting out data from the unfiltered endpoint...")
identity = id_key
endpoint_json = self.api_endpoints_json
with open(endpoint_json, "r") as f:
dicts = json.load(f)
endpoints = dicts[mode]
swagger = dicts["swagger"]
if args.no_verloc_metrics:
swagger.remove("/metrics/verloc")
api_data = {}
swagger_data = {}
routing_history = {}
mix_id = None
role = None
if mode == "gateway":
host = node_dict["gateway_bond"]["gateway"]["host"]
version = node_dict["gateway_bond"]["gateway"]["version"]
for key in endpoints:
endpoint = key.replace("{identity}", identity)
url = f"{self.api_url}{endpoint}"
print(f"Querying {url}")
value = r.get(url).json()
api_data[endpoint] = value
routing_history = api_data[f"/status/gateway/{identity}/history"]["history"]
del api_data[f"/status/gateway/{identity}/history"]["history"]
swagger_data = self.get_swagger_data(host,swagger,swagger_data)
if swagger_data:
if swagger_data["/roles"]["network_requester_enabled"] == True and swagger_data["/roles"]["ip_packet_router_enabled"] == True:
role = "exit-gateway"
else:
role = "entry-gateway"
else:
role = None
elif mode == "mixnode":
mix_id = str(node_dict["mixnode_details"]["bond_information"]["mix_id"])
for key in endpoints:
endpoint = key.replace("{mix_id}", mix_id)
url = f"{self.api_url}{endpoint}"
print(f"Querying {url}")
try:
value = r.get(url).json()
api_data[endpoint] = value
except (JSONDecodeError, json.JSONDecodeError, r.exceptions.JSONDecodeError):
print(f"Error: Endpoint {url} results in 404: Not Found!")
host = node_dict["mixnode_details"]["bond_information"]["mix_node"]["host"]
version = node_dict["mixnode_details"]["bond_information"]["mix_node"]["version"]
routing_history = api_data[f"/status/mixnode/{mix_id}/history"]["history"]
del api_data[f"/status/mixnode/{mix_id}/history"]["history"]
swagger_data = self.get_swagger_data(host,swagger,swagger_data)
else:
print(f"The mode type {mode} is not recognized!")
sys.exit(-1)
host = str(host)
if args.no_routing_history == True:
routing_history = None
else:
routing_history = routing_history
return host, version, mix_id, role, api_data, swagger_data, routing_history
def get_swagger_data(self,host,swagger,swagger_data):
print("INFO: Starting to query SWAGGER API endpoints...")
urls = [
f"http://{host}:8080/api/v1",
f"https://{host}/api/v1",
f"http://{host}/api/v1"
]
responding_url = self.get_swagger_response(urls)
if responding_url:
for endpoint in swagger:
print(f"Querying {responding_url}{endpoint}")
value = self.try_query_swagger(responding_url,endpoint)
if value:
swagger_data[endpoint] = value
else:
swagger_data = {}
return swagger_data
def try_query_swagger(self, base_url, endpoint):
url = f"{base_url}{endpoint}"
value = None
try:
value = r.get(url, timeout=2).json()
except (r.exceptions.ConnectionError, urllib3.exceptions.ProtocolError) as e:
print(f"Error: Connection error when querying {url}: {e}") # No break because you could be dealing with a different protocol
except (JSONDecodeError, json.JSONDecodeError, r.exceptions.JSONDecodeError, ConnectionResetError) as e:
print(f"Error: JSON decode error when querying {url}: {e}")
except r.exceptions.ConnectTimeout as e:
print(f"Error: Connection timeout when querying {url}: {e}")
except Exception as e:
print(f"Error: An unexpected error occurred when querying {url}: {e}")
return value
def get_swagger_response(self,urls):
responding_url = None
for base_url in urls:
endpoint = "/health"
value = self.try_query_swagger(base_url, endpoint)
if value:
responding_url = base_url
print(f"INFO: Swagger API is accessible via {responding_url}, we are going to proceed with querying Swagger endpoints...")
break
else:
print(f"Swagger API was unreachable via {base_url}, we cannot proceed with querying Swagger end points!")
return responding_url
def _set_index_to_empty(self, df):
index_len = pd.RangeIndex(len(df.index))
new_index = []
for x in index_len:
x = ""
new_index.append(x)
df.index = new_index
return df
def _dataframe_to_markdown(self,df,col_names, index_names=""):
df = df.T
df.index.names = index_names
df.columns = col_names
markdown = df.to_markdown()
return markdown
def format_dataframe(self, df):
df = self._json_to_dataframe(df)
df = df.T
return df
def print_neat_dict(self, dictionary, indent=4):
neat_dictionary = self._json_neat_format(dictionary)
print(neat_dictionary)
def _json_neat_format(self,dictionary,indent=4):
dictionary = json.dumps(dictionary, indent = indent)
return dictionary
def _json_to_dataframe(self,json):
df = pd.json_normalize(json)
return df
class Output():
def __init__(self):
self.home = os.path.expanduser('~')
self.pwd = os.path.dirname(os.path.realpath(__file__))
def concat_to_file(self,args, data_list):
filename = self.init_output_file(args)
with open(f"{filename}", "w") as output_file:
for name in data_list:
output_file.write(name)
output_file.write("\n")
print(f"\nResults were exported to {filename}.")
def init_output_file(self,args):
filename = self.get_filename(args)
os.system(f"touch {filename}")
return filename
def get_filename(self,args):
path = args.output
id_key = args.id
file = f"api_output_{id_key}.txt"
if path == "":
filename = file
else:
if path[-1] != "/":
path = path + "/"
if path[0] == "~":
path = self.home + path[1:]
filename = f"{path}{file}"
return filename
class VersionCount():
def __init__(self):
self.functions = MainFunctions()
self.mixnodes_version_column = 'mixnode_details.bond_information.mix_node.version'
self.gateways_version_column = 'gateway_bond.gateway.version'
def display_results(self, args):
df_final = self.fetch_results(args)
if args.markdown:
table = df_final.to_markdown(index=False)
else:
table = tabulate(df_final)
print(table)
def fetch_results(self, args):
gateways_unfiltered, mixnodes_unfiltered = self.functions.get_unfiltered_data()
df_gateways = self.functions._json_to_dataframe(gateways_unfiltered)
df_mixnodes = self.functions._json_to_dataframe(mixnodes_unfiltered)
versions = list(args.version)
mixnodes_version_column = self.mixnodes_version_column
gateways_version_column = self.gateways_version_column
mixnodes_sum = self.version_count(df_mixnodes, mixnodes_version_column, versions, "mixnode")
gateways_sum = self.version_count(df_gateways, gateways_version_column, versions, "gateway")
df_final = self.final_summary(mixnodes_sum, gateways_sum, versions)
return df_final
def version_count(self, df, column, versions, mode):
count_all = []
for version in versions:
version_sum = df[f'{column}'].value_counts()[f'{version}']
result = {"Node type": mode, "Version": version, "Summary":version_sum}
count_all.append(result)
return count_all
def final_summary(self, mixnodes_sum, gateways_sum, versions):
list_final = mixnodes_sum + gateways_sum
df_final = pd.DataFrame(list_final)
col_names = df_final.columns
total_summary = df_final['Summary'].sum()
if len(versions) > 1:
mixnodes_total = df_final.loc[df_final['Node type'] == 'mixnode', 'Summary'].sum()
gateways_total = df_final.loc[df_final['Node type'] == 'gateway', 'Summary'].sum()
df_append = pd.DataFrame([["mixnodes",f"versions: {versions}", f"{mixnodes_total}"],["gateways",f"versions: {versions}",f"{gateways_total}"]],columns=col_names)
df_final = pd.concat([df_final, df_append], ignore_index=True)
for version in versions:
version_total = df_final.loc[df_final['Version'] == f'{version}', 'Summary'].sum()
df_append = pd.DataFrame([["all nodes",f"{version}", f"{version_total}"]],columns=col_names)
df_final = pd.concat([df_final, df_append], ignore_index=True)
df_append = pd.DataFrame([["TOTAL SUMMARY",f"{versions}", f"{total_summary}"]],columns=col_names)
df_final = pd.concat([df_final, df_append], ignore_index=True)
return df_final
class ArgParser:
def __init__(self):
"""init for parser"""
self.functions = MainFunctions()
self.version_count = VersionCount()
def parser_main(self):
"""Main function initializing ArgumentParser, storing arguments and executing commands."""
# Top level parser
parser = argparse.ArgumentParser(
prog= "Nym-node API check",
description='''Run through all endpoints and print results.'''
)
parser.add_argument("-V","--version", action="version", version='%(prog)s 0.1.1')
# sub-command parsers
subparsers = parser.add_subparsers()
parser_pull_stats = subparsers.add_parser('query_stats',help='Get all nodes API endpoints', aliases=['q','query'])
parser_version_count = subparsers.add_parser('version_count', help='Sum of nodes in given version(s)', aliases=['v','version'])
# pull_stats arguments
parser_pull_stats.add_argument("id", help="supply nym-node identity key")
parser_pull_stats.add_argument("--no_routing_history", help="Display node stats without routing history", action="store_true")
parser_pull_stats.add_argument("--no_verloc_metrics", help="Display node stats without verloc metrics", action="store_true")
parser_pull_stats.add_argument("-m","--markdown",help="Display results in markdown format", action="store_true")
parser_pull_stats.add_argument("-o","--output",help="Save results to file (in current dir or supply with path without filename)", nargs='?',const="", type=str)
parser_pull_stats.set_defaults(func=self.functions.display_results)
# version_count arguments
parser_version_count.add_argument('version', help="supply node versions separated with space", nargs='+')
parser_version_count.add_argument("-m","--markdown",help="Display results in markdown format", action="store_true")
parser_version_count.set_defaults(func=self.version_count.display_results)
args = parser.parse_args()
try:
func = args.func
try:
args.func(args)
except (AttributeError, KeyError) as e:
msg = f"{e}.\nPlease run python {__file__} --help"
self.panic(msg)
except UnboundLocalError as e:
msg = f"{e}.\nPlease provide a correct node identity key."
self.panic(msg)
except FileNotFoundError as e:
msg = f"{e}.\nMake sure your <PATH> supplied to --output is correct."
self.panic(msg)
except AttributeError:
parser.print_help(sys.stderr)
def panic(self,msg):
"""Error message print"""
print(f"error: {msg}", file=sys.stderr)
sys.exit(-1)
if __name__ == '__main__':
node_check = ArgParser()
node_check.parser_main()