使用Python进行dubbo接口测试
更新时间:2022-07-25 09:00:46 作者:多测师 浏览:289
背景
大家在测试dubbo接口是不是特别痛苦?因为dubbo接口并不是比较常见的http协议的,而是dubbo协议的,测试dubbo接口的有几种方法,譬如jmeter自定义sampler调用,java连接zookeeper中心调用dubbo,telnet命令调用dubbo等。
痛点
相信大家都比较熟悉使用jmeter,看了上面的测试方案,肯定是首选jmeter,但是这里踩坑较多,比如下载的插件与dubbo版本不对应,有时候响应参数出现中文乱码,有时候需要反编译jar包查看对应的入参类型等等...
解决痛点
在网上搜索了一下和看了dubbo接口的用户手册,发现dubbo接口支持telnet命令执行,Python中刚好有个库可以执行telnet命令 telnetlib库 dubbo接口又是通过zookeeper中心进行注册服务的,那我直接通过zookeeper中心查询dubbo接口相关的信息(ip、端口、服务名、方法名和入参类型)然后模拟telnet命令进行dubbo接口调用,岂不是美滋滋!
实现方案
Python + fastapi + telnetlib + kazoo。
调用流程
telnet命令实操
前提,我们得知了一个dubbo接口的ip和端口(通常可以通过服务名在zk中心搜索得知):
telnet 192.168.xx.xx 32024 连接dubbo服务
ls -l cn.com.api.dubbo.xxxxService 查询该服务的方法列表
invoke xxxxService.xxxMethod(1234, "test") 调用服务的方法
通过上述一系列的骚操作,相当于进行了dubbo接口的测试,当然,我们决对不可能通过终端敲命令进行dubbo接口测试,下面我们进行通过代码模拟telnet命令。
核心源码分析
zk中心搜索服务封装
class GetDubboService(object):
def __init__(self):
#测试环境,ZK_CONFIG为zk中心的注册地址,可传入string或者list,譬如ZK_CONFIG = ['xxx','xxx','xxx']或者ZK_CONFIG = 'xxx'
self.hosts = ZK_CONFIG
self.zk = self.zk_conn()
def zk_conn(self):
try:
zk = KazooClient(hosts=self.hosts, timeout=2)
zk.start(2) # 与zookeeper连接
except BaseException as e:
return False
return zk
def get_dubbo_info(self, dubbo_service):
global data
dubbo_service_data = {}
try:
#先查出注册中心所有的dubbo服务
all_node = self.zk.get_children('/dubbo')
# 根据传入服务名匹配对应的服务
node = [i for i in all_node if dubbo_service.lower() in i.lower()]
# 查询dubbo服务的详细信息
#遍历数据,过滤掉空数据
for i in node:
if self.zk.get_children(f'/dubbo/{i}/providers'):
dubbo_data = self.zk.get_children(f'/dubbo/{i}/providers')
for index, a in enumerate(dubbo_data):
url = parse.urlparse(parse.unquote(a)).netloc
host, port = url.split(":")
conn = BmDubbo(host, port)
#判断获取的ip地址是否连接成功,因为有些开发本地起了dubbo服务
status = conn.command("")
if status:
data = dubbo_data[index]
break
self.zk.stop()
except BaseException as e:
return dubbo_service_data
#parse.unquote 解码
#parse.urlparse 解析URL
#parse.query 获取查询参数
#parse.parse_qsl 返回列表
url_data = parse.urlparse(parse.unquote(data))
query_data = dict(parse.parse_qsl(url_data.query))
query_data['methods'] = query_data['methods'].split(",")
dubbo_service_data['url'] = url_data.netloc
dubbo_service_data['dubbo_service'] = dubbo_service
dubbo_service_data.update(query_data)
return dubbo_service_data
telnet命令调用封装:
class BmDubbo(object):
prompt = 'dubbo>'
def __init__(self, host, port):
self.conn = self.conn(host, port)
def conn(self,host, port):
conn = telnetlib.Telnet()
try:
conn.open(host, port, timeout=1)
except BaseException:
return False
return conn
def command(self, str_=""):
# 模拟cmd控制台 dubbo>invoke ...
if self.conn :
self.conn.write(str_.encode() + b'\n')
data = self.conn.read_until(self.prompt.encode())
return data
else:
return False
def invoke(self, service_name, method_name, arg):
command_str = "invoke {0}.{1}({2})".format(service_name, method_name, arg)
data = self.command(command_str)
try:
# 字节数据解码 utf8
data = data.decode("utf-8").split('\n')[0].strip()
except BaseException:
# 字节数据解码 gbk
data = data.decode("gbk").split('\n')[0].strip()
return data
def ls_invoke(self, service_name):
command_str = "ls -l {0}".format(service_name)
data = self.command(command_str)
if "No such service" in data.decode("utf-8"):
return False
else:
data = data.decode("utf-8").split('\n')
key = ['methodName', 'paramType','type']
dubbo_list = []
#这里解析有点复杂,可以自己通过telnet命令实操一下,ls -l xxx
for i in range(0, len(data) - 1):
value = []
dubbo_name = data[i].strip().split(' ')[1]
method_name = re.findall(r"(.*?)[(]", dubbo_name)[0]
value.append(method_name)
paramType = re.findall(r"[(](.*?)[)]", dubbo_name)[0]
paramTypeList = paramType.split(',')
if len(paramTypeList) ==1:
paramTypeList = paramTypeList[0]
value.append(paramTypeList)
#这里我将传参类型分成了4大类
if 'java.lang' in paramType or 'java.math' in paramType:
value.append(0)
elif not paramType:
value.append(1)
elif 'List' in paramType:
value.append(2)
else:
value.append(3)
dubbo_list.append(dict(zip(key, value)))
return dubbo_list
def param_data(self,service_name,method_name):
#这里是根据服务名和方法名,找到对应的传参类型
dubbo_data = self.ls_invoke(service_name)
if dubbo_data:
dubbo_list = dubbo_data
if dubbo_list:
for i in dubbo_list:
for v in i.values():
if v == method_name:
param_key = ['paramType','type']
param_value = [i.get('paramType'),i.get('type')]
return dict(zip(param_key,param_value))
else:
return False
else:
return False
dao层设计:
class DubboHandle(object):
@staticmethod
def invoke(service_name, method_name, data):
zk_conn = GetDubboService()
if zk_conn.zk:
zk_data = zk_conn.get_dubbo_info(service_name)
if zk_data:
host, port = zk_data['url'].split(":")
service_name = zk_data['interface']
boby = data.copy()
conn = BmDubbo(host, port)
status = conn.command("")
if status:
# 根据服务名和方法名,返回param方法名和类型
param_data = conn.param_data(service_name, method_name)
if param_data:
type = param_data['type']
param = param_data['paramType']
# 传参类型为枚举值方法
if type == 0 and isinstance(boby, dict):
l_data = []
for v in boby.values():
if isinstance(v,str):
v = f"'{v}'"
elif isinstance(v,dict) or isinstance(v,list):
v = json.dumps(v)
v = f"'{v}'"
l_data.append(str(v))
boby = ','.join(l_data)
# 无需传参
elif type == 1:
boby = ''
# 传参类型为集合对象
elif type == 2:
# params 只有一个集合对象传参
if isinstance(boby, list):
boby = boby
# params 一个集合对象后面跟着多个枚举值
elif isinstance(boby, dict):
set_list = []
for v in boby.values():
set_list.append(v)
set_data = str(set_list)
boby = set_data[1:-1]
# 传参类型为自定义对象
elif type == 3:
# 兼容多个自定义对象传参
if isinstance(param, list):
dtoList = []
for index, dto in enumerate(boby):
dto.update({"class": param[index]})
dtoList.append(json.dumps(dto))
boby = ','.join(dtoList)
elif isinstance(boby, dict):
boby.update({"class": param})
boby = json.dumps(boby)
else:
return None, f"data请求参数有误,请检查!"
response_data = conn.invoke(service_name, method_name, boby)
try:
response_data = json.loads(response_data)
except Exception as e:
return None, f"解析json失败:{response_data}"
return response_data, None
else:
return None, f"{service_name.split('.')[-1]}服务下不存在{method_name}方法"
else:
return None, f"{service_name}服务连接出错"
else:
return None, f"{service_name}没有在zk中心注册"
else:
return None, "zk服务连接失败"
view层引用:
@router.post('/invoke', name='dubbo业务请求接口')
async def dubboInvoke(data: DubboInvokeBody):
res_data, err = DubboHandle.invoke(data.serviceName, data.methodName, data.data)
if err:
return res_400(msg=err)
return res_200(data=res_data)
invoke接口传参说明
原生对象或者自定义对象传参(xxDto、jsonObj、java.util.HashMap):
{
"serviceName": "xxxxxx",
"methodName": "xxxxxx",
"data": { //data传入对应的对象数据,一般为json格式的
"productStoreQueryDTOS": [
{
"productNoNumDTOList": [
{
"num": 13,
"productNo": "10000620"
},
{
"num": 13,
"productNo": "10000014"
}
],
"storeCode": "4401S1389"
}
]
}
}
枚举值类型传参(java.lang.String、java.lang.Integer):
{
"serviceName": "xxxx",
"methodName": "xxxxx",
"data": { //格式为json,枚举值顺序必须按照dubbo接口定义的传参顺序,注意是否为int还是string
"account":"123456",
"password":"3fd6ebe43dab8b6ce6d033a5da6e6ac5"
}
}
方法名无需传参:
{
"serviceName": "xxxx",
"methodName": "xxxxxx",
"data":{} //传入空对象
}
集合对象传参(java.util.List):
{
"serviceName": "xxxx",
"methodName": "xxxxxx",
"data":{
"List": [
"1221323",
"3242442"
]
} //传入对象,里面嵌套数组
}
集合对象传参,后面跟着枚举值(java.util.List 、 java.lang.String 、 java.lang.Integer):
{
"serviceName": "xxxx",
"methodName": "xxxxxx",
"data":{
"userCode": ["12345","686838"],
"startTime": "2021-04-16 13:30:00",
"endTime": "2021-04-16 14:30:00"
}
}
多个自定义对象传参,对象顺序按照dubbo接口定义的传参顺序(xxdtox、xxdto):
{
"serviceName": "xxxx",
"methodName": "xxxxxx",
"data":[
{
"userCode": "7932723",
"startTime": "2021-04-16 13:30:00",
"endTime": "2021-04-16 14:30:00"
},
{
"name": "fang",
"age": "18"
}
]
}
上述传参可以满足大部分入参类型~
以上内容为大家介绍了使用Python进行dubbo接口测试,本文由多测师亲自撰写,希望对大家有所帮助。了解更多接口测试相关知识:https://www.aichudan.com/xwzx/