File: //proc/thread-self/root/lib/mysqlsh/python-packages/mysql_gadgets/common/connection_parser.py
#
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
"""
This module contains the methods to support common operations
over the ip address or hostnames and parsing of connection strings.
"""
import re
import os
import logging
# Use backported OrderedDict if not available (for Python 2.6)
try:
from collections import OrderedDict
except ImportError:
from ordered_dict_backport import OrderedDict
from mysql_gadgets.common.logger import CustomLevelLogger
from mysql_gadgets.exceptions import GadgetCnxFormatError
logging.setLoggerClass(CustomLevelLogger)
_LOGGER = logging.getLogger(__name__)
_BAD_CONN_FORMAT = (u"Connection '{0}' cannot be parsed. Please review the "
u"used connection string (accepted formats: "
u"<user>@<host>[:<port>][:<socket>]")
_BAD_QUOTED_HOST = u"Connection '{0}' has a malformed quoted host"
_INVALID_PORT = (u"Invalid port '{0}' for connection string '{1}'. "
u"Port must be >= 0 and <= 65535.")
_UNPARSED_CONN_FORMAT = ("Connection '{0}' not parsed completely. Parsed "
"elements '{1}', unparsed elements '{2}'")
_CONN_QUOTEDHOST = re.compile(
r"((?:^[\'].*[\'])|(?:^[\"].*[\"]))" # quoted host name
r"(?:\:(\d+))?" # Optional port number
r"(?:\:([\/\\w+.\w+.\-]+))?" # Optional path to socket
)
_CONN_LOGINPATH = re.compile(
r"((?:\\\"|[^:])+|(?:\\\'|[^:])+)" # login-path
r"(?:\:(\d+))?" # Optional port number
r"(?:\:([\/\\w+.\w+.\-]+))?" # Optional path to socket
)
_CONN_CONFIGPATH = re.compile(
r"([\w\:]+(?:\\\"|[^[])+|(?:\\\'|[^[])+)" # config-path
r"(?:\[([^]]+))?", # group
re.U
)
_CONN_ANY_HOST = re.compile(
r"""(
(?![.-]) # Match cannot start by '.' or '-'
[\w._-]* # start with 0 or more: alphanum or '%'
% # must have a wildcard '%' at least
[\w.%_-]* # then 0 or more: alphanum, '%', '.' or '-'
(?<![.-]) # but cannot be ended by '.' or '-'.
)
(?::(.+))?$ # capture all the rest
""", re.VERBOSE)
_CONN_HOST_NAME = re.compile(
r"""(
(?:
(?:
(?:
(?!-) # must not start with hyphen '-'
(?:[\w\d-])* # must not end with the hyphen
[A-Za-z] # starts with a character from the alphabet
(?:[\w\d-])*
(?:
(?<!-) # end capturing if a '-' is followed by '.'
)
){1,63} # limited length for segment
)
(?: # following segments
(?:\.)? # the segment separator the dot '.'
(?:
(?!-)
[\w\d-]{1,63} # last segment
(?<!-) #shuld not end with hyphen
)
)*
)
)
(.*) # capture all the rest
""", re.VERBOSE)
_CONN_IPV4_NUM_ONLY = re.compile(
r"""(
(?: # start of the IPv4 1st group
25[0-4] # this match numbers 250 to 254
| # or
2[0-4]\d # this match numbers from 200 to 249
| # or
1\d\d # this match numbers from 100 to 199
| # or
[1-9]{0,1}\d # this match numbers from 0 to 99
)
(?: # start of the 3 next groups
\. # the prefix '.' like in '.255'
(?:
25[0-4]|2[0-4]\d|1\d\d|[1-9]?\d
# same group as before
)
)
{3} # but it will match 3 times of it and prefixed by '.'
)
(?:\:{0,1}(.*))
""", re.VERBOSE)
_CONN_PORT_ONLY = re.compile(
r"""(?:
\]{0,1} # the ']' of IPv6 -optional
\:{0,1} # the ':' for port number
(
-\d+|\d* # matches any sequence of numbers,
) # including negative/invalid port numbers
) # end of port number group
(?:\:{0,1}(.*)) # all the rest to extract the socket
""", re.VERBOSE)
_CONN_SOCKET_ONLY = re.compile(
r"""(?: # Not capturing group of ':'
\:{0,1}
([ # Capturing '\' or '/' file name.ext
\/\\w+.\w+.\-
]+ # to match a path
)
)?
(.*) # all the rest to advice the user.
""", re.VERBOSE)
_CONN_IPV6 = re.compile(
r"""
\[{0,1} # the optional heading '['
(
(?!.*::.*::) # Only a single whildcard allowed
(?:(?!:)|:(?=:)) # Colon iff it would be part of a wildcard
(?: # Repeat 6 times:
[0-9a-f]{0,4} # A group of at most four hexadecimal digits
(?:(?<=::)|(?<!::):) # Colon unless preceded by wildcard
){6} # expecting 6 groups
(?: # Either
[0-9a-f]{0,4} # Another group
(?:(?<=::)|(?<!::):) # Colon unless preceded by wildcard
[0-9a-f]{0,4} # Last group
(?:(?<=::) # Colon iff preceded by exacly one colon
|(?<!:)
|(?<=:)(?<!::):
)
)
)
(?:
\]{0,1}\:{0,1}(.*) # optional closing ']' and group for the rest
)
""", re.VERBOSE)
# Type of address amd Key names for the dictionary IP_MATCHERS
HN = "hostname"
IPV4 = "IPv4"
IPV6 = "IPv6"
ANY_LIKE = "host like"
# This dictionary is used to identify the matched type..
IP_MATCHERS = OrderedDict([
(IPV4, _CONN_IPV4_NUM_ONLY),
(IPV6, _CONN_IPV6),
(ANY_LIKE, _CONN_ANY_HOST),
(HN, _CONN_HOST_NAME),
])
_DEFAULT_PORT = 3306
def hostname_is_ip(hostname):
"""Check if specified hostname is an IP address.
:param hostname: hostname or IP address to check.
:type hostname: string
:return: True if the specified 'hostname' is a valid IP address (IPv4 or
IPv6), otherwise False.
:rtype: boolean
"""
if len(hostname.split(":")) <= 1: # if fewer colons, must be IPv4
grp = _CONN_IPV4_NUM_ONLY.match(hostname)
else:
grp = _CONN_IPV6.match(hostname)
if not grp:
return False
return True
def parse_connection(connection_str, options=None):
"""Parse connection values.
The function parses a connection string with the following format:
user@host[:port][:socket].
A dictionary is returned containing the connection parameters. The
function is designed so that it shall be possible to use it with a
``connect`` call in the following manner:
cnx_options = parse_connection(connection_values, options)
cnx = mysql.connector.connect(**cnx_options)
Notes:
This method validates IPv4 addresses and standard IPv6 addresses.
This method accepts quoted host portion strings. If the host is marked
with quotes, the code extracts this without validation and assigns it to
the host variable in the returned tuple. This allows users to specify host
names and IP addresses that are outside of the supported validation.
:param connection_str: Connection string in the form:
user@host[:port][:socket].
:type connection_str: string
:param options: Dictionary of options (e.g. charset, ssl_cert,
ssl_ca, ssl_key, ssl).
:type options: dictionary
:return: dictionary with the parsed connection values (user, host, port,
etc.).
:rtype: dictionary
:raises GadgetCnxFormatError: if a parsing error occurs.
"""
if options is None:
options = {}
# SSL options, must not be overwritten with those from options.
ssl_ca = None
ssl_cert = None
ssl_key = None
ssl = None
# Split on the '@' to determine the connection string format.
# The user/password may have the '@' character, split by last occurrence.
conn_format = connection_str.rsplit('@', 1)
if len(conn_format) == 2:
# Handle as in the format: user@host[:port][:socket]
user, hostportsock = conn_format
# Handle host, port and socket
if len(hostportsock) <= 0 or len(user) <= 0:
raise GadgetCnxFormatError(
_BAD_CONN_FORMAT.format(connection_str))
if hostportsock[0] in ('"', "'"):
# Need to strip the quotes
res = _match(_CONN_QUOTEDHOST, hostportsock)
if res and len(res) == 3:
host = res[0]
port = res[1]
socket = res[2]
else:
raise GadgetCnxFormatError(
_BAD_CONN_FORMAT.format(connection_str))
if host[0] == '"':
host = host.strip('"')
if host and host[0] == "'":
host = host.strip("'")
# Raise exception if no host is specified, e.g. user@""
if not host:
raise GadgetCnxFormatError(
_BAD_CONN_FORMAT.format(connection_str))
else:
host, port, socket, _ = parse_server_address(hostportsock)
else:
# Unrecognized format
raise GadgetCnxFormatError(_BAD_CONN_FORMAT.format(connection_str))
# Get character-set from options
if isinstance(options, dict):
charset = options.get("charset", None)
# If one SSL option was found before, not mix with those in options.
if not ssl_cert and not ssl_ca and not ssl_key and not ssl:
ssl_cert = options.get("ssl_cert", None)
ssl_ca = options.get("ssl_ca", None)
ssl_key = options.get("ssl_key", None)
ssl = options.get("ssl", None)
else:
charset = None
ssl_cert = None
ssl_ca = None
ssl_key = None
ssl = None
_LOGGER.warning("options is not instance of dict: %s", options)
# Set parsed connection values
connection = {
"user": user,
"host": host,
}
if charset:
connection["charset"] = charset
if ssl_cert:
connection["ssl_cert"] = ssl_cert
if ssl_ca:
connection["ssl_ca"] = ssl_ca
if ssl_key:
connection["ssl_key"] = ssl_key
if ssl:
connection["ssl"] = ssl
# If a port was specified we use the port, if a socket was specified
# we use the socket, else if we specify neither, we use the
# default port.
if port:
port = int(port)
# Check if port range is valid.
if port < 0 or port > 65535:
raise GadgetCnxFormatError(_INVALID_PORT.format(port,
connection_str))
connection["port"] = port
elif socket is not None and os.name == "posix":
connection['unix_socket'] = socket
# Port is mandatory to create Server instance.
connection["port"] = None
else:
connection["port"] = _DEFAULT_PORT
return connection
def parse_server_address(connection_str):
"""Parses host, port and socket from the given connection string.
:param connection_str: Connection string in the form:
user@host[:port][:socket].
:type connection_str: string
:return: a tuple of (host, port, socket, add_type) where add_type is
the name of the parser that successfully parsed the hostname
from the connection string.
:rtype: tuple
:raises GadgetCnxFormatError: if a parsing error occurs.
"""
# Default values to return.
host = None
port = None
socket = None
address_type = None
unparsed = None
# From the matchers look the one that match a host.
for ip_matcher in IP_MATCHERS:
try:
group = _match(IP_MATCHERS[ip_matcher], connection_str)
if group:
host = group[0]
if ip_matcher == IPV6:
host = "[%s]" % host
if group[1]:
part2_port_socket = _match(_CONN_PORT_ONLY, group[1],
throw_error=False)
if not part2_port_socket:
unparsed = group[1]
else:
port = part2_port_socket[0]
if part2_port_socket[1]:
part4 = _match(_CONN_SOCKET_ONLY,
part2_port_socket[1],
throw_error=False)
if not part4:
unparsed = part2_port_socket[1]
else:
socket = part4[0]
unparsed = part4[1]
# If host is match we stop looking as is the most significant.
if host:
address_type = ip_matcher
break
# ignore the error trying to match.
except GadgetCnxFormatError:
pass
# we must alert, that the connection could not be parsed.
if host is None:
raise GadgetCnxFormatError(_BAD_CONN_FORMAT.format(connection_str))
_verify_parsing(connection_str, host, port, socket, address_type, unparsed)
_LOGGER.debug("->parse_server_address \n host: %s \n address_type: %s",
host, address_type)
return host, port, socket, address_type
def _verify_parsing(connection_str, host, port, socket, address_type,
unparsed):
"""Verify connection string parsing.
Verify that the connection string was totally parsed and not parts of
it where not matched, otherwise raise an error.
:param connection_str: Connection string in the form:
user@host[:port][:socket].
:type connection_str: string
:param host: the parsed host
:type host: string
:param port: the parsed port
:type port: string
:param socket: the parsed socket
:type socket: string
:param address_type: Type of the parsed host, one of the following value:
"IPv4", "IPv6" or "hostname"
:type address_type: string
:param unparsed: unparsed string (not identified part)
:type unparsed: string
:raises GadgetCnxFormatError: if a part of the connection string was not
identified while parsed.
"""
exp_connection_str = connection_str
# _LOGGER.debug("exp_connection_str %s", exp_connection_str)
parsed_connection_list = []
if host:
# _LOGGER.debug("host %s", host)
if address_type == IPV6 and "[" not in connection_str:
host = host.replace("[", "")
host = host.replace("]", "")
parsed_connection_list.append(host)
if port:
# _LOGGER.debug("port %s", port)
parsed_connection_list.append(port)
if socket:
# _LOGGER.debug("socket %s", socket)
parsed_connection_list.append(socket)
parsed_connection = ":".join(parsed_connection_list)
# _LOGGER.debug('parsed_connection %s', parsed_connection)
diff = None
if not unparsed:
# _LOGGER.debug('not unparsed found, creating diff')
diff = connection_str.replace(host, "")
if port:
diff = diff.replace(port, "")
if socket:
diff = diff.replace(socket, "")
# _LOGGER.debug("diff %s", diff)
# _LOGGER.debug("unparsed %s", unparsed)
if unparsed or (exp_connection_str != parsed_connection and
(diff and diff != ":")):
# _LOGGER.debug("raising exception")
parsed_args = "host:%s, port:%s, socket:%s" % (host, port, socket)
err_msg = _UNPARSED_CONN_FORMAT.format(connection_str, parsed_args,
unparsed)
# _LOGGER.warning(err_msg)
raise GadgetCnxFormatError(err_msg)
def _match(pattern, connection_str, throw_error=True):
"""Check pattern match with connection string.
Tries to match a pattern with the connection string and returns the
groups.
:param pattern: Regular expression object used to parse the connection
string.
:type pattern: re.RegexObject
:param connection_str: Connection string in the form:
user@host[:port][:socket].
:type connection_str: string
:param throw_error: Indicate if an exception is raised (True) if the
connection string does not match the pattern, or False
is returned. By default: True.
:type throw_error: boolean
:return: tuple containing all the subgroups of the matched pattern. If no
match is found False is returned if 'throw_error' is set to False.
:rtype: tuple or boolean
:raises GadgetCnxFormatError: if connection string does not match the
given pattern and throw_error is set to True.
"""
grp = pattern.match(connection_str)
if not grp:
if throw_error:
raise GadgetCnxFormatError(_BAD_CONN_FORMAT.format(connection_str))
return False
return grp.groups()
def clean_IPv6(host_address):
"""Clean IPv6 host address.
:param host_address: host address (IPv6)
:type host_address: string
:return: the given host address without '[' and ']' characters (removed).
:rtype: string
"""
if host_address:
host_address = host_address.replace("[", "")
host_address = host_address.replace("]", "")
return host_address
def parse_user_host(user_host):
"""Parse user, passwd, host, port from user:passwd@host
:param user_host: MySQL user string (user:passwd@host)
:type user_host: string
:return: Tuple with the user and host.
:rtype: tuple
:raises: GadgetError if the user and host could not be parsed
"""
no_ticks = user_host.replace("'", "")
try:
conn_values = parse_connection(no_ticks)
except GadgetCnxFormatError as err:
raise GadgetCnxFormatError("Cannot parse user@host from: {0}."
"".format(no_ticks), cause=err)
return (conn_values['user'], conn_values['host'])