多测师是一家拥有先进的教学理念,强大的师资团队,业内好评甚多的接口自动化测试培训机构!

17727591462

联系电话

您现在所在位置:接口自动化测试培训 > 新闻资讯

使用Python进行dubbo接口测试

更新时间:2022-07-25 09:00:46 作者:多测师 浏览:289

  背景

  大家在测试dubbo接口是不是特别痛苦?因为dubbo接口并不是比较常见的http协议的,而是dubbo协议的,测试dubbo接口的有几种方法,譬如jmeter自定义sampler调用,java连接zookeeper中心调用dubbo,telnet命令调用dubbo等。

使用Python进行dubbo接口测试

  痛点

  相信大家都比较熟悉使用jmeter,看了上面的测试方案,肯定是首选jmeter,但是这里踩坑较多,比如下载的插件与dubbo版本不对应,有时候响应参数出现中文乱码,有时候需要反编译jar包查看对应的入参类型等等...

  解决痛点

  在网上搜索了一下和看了dubbo接口的用户手册,发现dubbo接口支持telnet命令执行,Python中刚好有个库可以执行telnet命令 telnetlib库 dubbo接口又是通过zookeeper中心进行注册服务的,那我直接通过zookeeper中心查询dubbo接口相关的信息(ip、端口、服务名、方法名和入参类型)然后模拟telnet命令进行dubbo接口调用,岂不是美滋滋!

  实现方案

  Python + fastapi + telnetlib + kazoo。

  调用流程

  telnet命令实操

  前提,我们得知了一个dubbo接口的ip和端口(通常可以通过服务名在zk中心搜索得知):

  telnet 192.168.xx.xx 32024 连接dubbo服务

  ls -l cn.com.api.dubbo.xxxxService 查询该服务的方法列表

  invoke xxxxService.xxxMethod(1234, "test") 调用服务的方法

  通过上述一系列的骚操作,相当于进行了dubbo接口的测试,当然,我们决对不可能通过终端敲命令进行dubbo接口测试,下面我们进行通过代码模拟telnet命令。

  核心源码分析

  zk中心搜索服务封装

  class GetDubboService(object):

  def __init__(self):

  #测试环境,ZK_CONFIG为zk中心的注册地址,可传入string或者list,譬如ZK_CONFIG = ['xxx','xxx','xxx']或者ZK_CONFIG = 'xxx'

  self.hosts = ZK_CONFIG

  self.zk = self.zk_conn()

  def zk_conn(self):

  try:

  zk = KazooClient(hosts=self.hosts, timeout=2)

  zk.start(2) # 与zookeeper连接

  except BaseException as e:

  return False

  return zk

  def get_dubbo_info(self, dubbo_service):

  global data

  dubbo_service_data = {}

  try:

  #先查出注册中心所有的dubbo服务

  all_node = self.zk.get_children('/dubbo')

  # 根据传入服务名匹配对应的服务

  node = [i for i in all_node if dubbo_service.lower() in i.lower()]

  # 查询dubbo服务的详细信息

  #遍历数据,过滤掉空数据

  for i in node:

  if self.zk.get_children(f'/dubbo/{i}/providers'):

  dubbo_data = self.zk.get_children(f'/dubbo/{i}/providers')

  for index, a in enumerate(dubbo_data):

  url = parse.urlparse(parse.unquote(a)).netloc

  host, port = url.split(":")

  conn = BmDubbo(host, port)

  #判断获取的ip地址是否连接成功,因为有些开发本地起了dubbo服务

  status = conn.command("")

  if status:

  data = dubbo_data[index]

  break

  self.zk.stop()

  except BaseException as e:

  return dubbo_service_data

  #parse.unquote 解码

  #parse.urlparse 解析URL

  #parse.query 获取查询参数

  #parse.parse_qsl 返回列表

  url_data = parse.urlparse(parse.unquote(data))

  query_data = dict(parse.parse_qsl(url_data.query))

  query_data['methods'] = query_data['methods'].split(",")

  dubbo_service_data['url'] = url_data.netloc

  dubbo_service_data['dubbo_service'] = dubbo_service

  dubbo_service_data.update(query_data)

  return dubbo_service_data

  telnet命令调用封装:

  class BmDubbo(object):

  prompt = 'dubbo>'

  def __init__(self, host, port):

  self.conn = self.conn(host, port)

  def conn(self,host, port):

  conn = telnetlib.Telnet()

  try:

  conn.open(host, port, timeout=1)

  except BaseException:

  return False

  return conn

  def command(self, str_=""):

  # 模拟cmd控制台 dubbo>invoke ...

  if self.conn :

  self.conn.write(str_.encode() + b'\n')

  data = self.conn.read_until(self.prompt.encode())

  return data

  else:

  return False

  def invoke(self, service_name, method_name, arg):

  command_str = "invoke {0}.{1}({2})".format(service_name, method_name, arg)

  data = self.command(command_str)

  try:

  # 字节数据解码 utf8

  data = data.decode("utf-8").split('\n')[0].strip()

  except BaseException:

  # 字节数据解码 gbk

  data = data.decode("gbk").split('\n')[0].strip()

  return data

  def ls_invoke(self, service_name):

  command_str = "ls -l {0}".format(service_name)

  data = self.command(command_str)

  if "No such service" in data.decode("utf-8"):

  return False

  else:

  data = data.decode("utf-8").split('\n')

  key = ['methodName', 'paramType','type']

  dubbo_list = []

  #这里解析有点复杂,可以自己通过telnet命令实操一下,ls -l xxx

  for i in range(0, len(data) - 1):

  value = []

  dubbo_name = data[i].strip().split(' ')[1]

  method_name = re.findall(r"(.*?)[(]", dubbo_name)[0]

  value.append(method_name)

  paramType = re.findall(r"[(](.*?)[)]", dubbo_name)[0]

  paramTypeList = paramType.split(',')

  if len(paramTypeList) ==1:

  paramTypeList = paramTypeList[0]

  value.append(paramTypeList)

  #这里我将传参类型分成了4大类

  if 'java.lang' in paramType or 'java.math' in paramType:

  value.append(0)

  elif not paramType:

  value.append(1)

  elif 'List' in paramType:

  value.append(2)

  else:

  value.append(3)

  dubbo_list.append(dict(zip(key, value)))

  return dubbo_list

  def param_data(self,service_name,method_name):

  #这里是根据服务名和方法名,找到对应的传参类型

  dubbo_data = self.ls_invoke(service_name)

  if dubbo_data:

  dubbo_list = dubbo_data

  if dubbo_list:

  for i in dubbo_list:

  for v in i.values():

  if v == method_name:

  param_key = ['paramType','type']

  param_value = [i.get('paramType'),i.get('type')]

  return dict(zip(param_key,param_value))

  else:

  return False

  else:

  return False

  dao层设计:

  class DubboHandle(object):

  @staticmethod

  def invoke(service_name, method_name, data):

  zk_conn = GetDubboService()

  if zk_conn.zk:

  zk_data = zk_conn.get_dubbo_info(service_name)

  if zk_data:

  host, port = zk_data['url'].split(":")

  service_name = zk_data['interface']

  boby = data.copy()

  conn = BmDubbo(host, port)

  status = conn.command("")

  if status:

  # 根据服务名和方法名,返回param方法名和类型

  param_data = conn.param_data(service_name, method_name)

  if param_data:

  type = param_data['type']

  param = param_data['paramType']

  # 传参类型为枚举值方法

  if type == 0 and isinstance(boby, dict):

  l_data = []

  for v in boby.values():

  if isinstance(v,str):

  v = f"'{v}'"

  elif isinstance(v,dict) or isinstance(v,list):

  v = json.dumps(v)

  v = f"'{v}'"

  l_data.append(str(v))

  boby = ','.join(l_data)

  # 无需传参

  elif type == 1:

  boby = ''

  # 传参类型为集合对象

  elif type == 2:

  # params 只有一个集合对象传参

  if isinstance(boby, list):

  boby = boby

  # params 一个集合对象后面跟着多个枚举值

  elif isinstance(boby, dict):

  set_list = []

  for v in boby.values():

  set_list.append(v)

  set_data = str(set_list)

  boby = set_data[1:-1]

  # 传参类型为自定义对象

  elif type == 3:

  # 兼容多个自定义对象传参

  if isinstance(param, list):

  dtoList = []

  for index, dto in enumerate(boby):

  dto.update({"class": param[index]})

  dtoList.append(json.dumps(dto))

  boby = ','.join(dtoList)

  elif isinstance(boby, dict):

  boby.update({"class": param})

  boby = json.dumps(boby)

  else:

  return None, f"data请求参数有误,请检查!"

  response_data = conn.invoke(service_name, method_name, boby)

  try:

  response_data = json.loads(response_data)

  except Exception as e:

  return None, f"解析json失败:{response_data}"

  return response_data, None

  else:

  return None, f"{service_name.split('.')[-1]}服务下不存在{method_name}方法"

  else:

  return None, f"{service_name}服务连接出错"

  else:

  return None, f"{service_name}没有在zk中心注册"

  else:

  return None, "zk服务连接失败"

  view层引用:

  @router.post('/invoke', name='dubbo业务请求接口')

  async def dubboInvoke(data: DubboInvokeBody):

  res_data, err = DubboHandle.invoke(data.serviceName, data.methodName, data.data)

  if err:

  return res_400(msg=err)

  return res_200(data=res_data)

  invoke接口传参说明

  原生对象或者自定义对象传参(xxDto、jsonObj、java.util.HashMap):

  {

  "serviceName": "xxxxxx",

  "methodName": "xxxxxx",

  "data": { //data传入对应的对象数据,一般为json格式的

  "productStoreQueryDTOS": [

  {

  "productNoNumDTOList": [

  {

  "num": 13,

  "productNo": "10000620"

  },

  {

  "num": 13,

  "productNo": "10000014"

  }

  ],

  "storeCode": "4401S1389"

  }

  ]

  }

  }

  枚举值类型传参(java.lang.String、java.lang.Integer):

  {

  "serviceName": "xxxx",

  "methodName": "xxxxx",

  "data": { //格式为json,枚举值顺序必须按照dubbo接口定义的传参顺序,注意是否为int还是string

  "account":"123456",

  "password":"3fd6ebe43dab8b6ce6d033a5da6e6ac5"

  }

  }

  方法名无需传参:

  {

  "serviceName": "xxxx",

  "methodName": "xxxxxx",

  "data":{} //传入空对象

  }

  集合对象传参(java.util.List):

  {

  "serviceName": "xxxx",

  "methodName": "xxxxxx",

  "data":{

  "List": [

  "1221323",

  "3242442"

  ]

  } //传入对象,里面嵌套数组

  }

  集合对象传参,后面跟着枚举值(java.util.List 、 java.lang.String 、 java.lang.Integer):

  {

  "serviceName": "xxxx",

  "methodName": "xxxxxx",

  "data":{

  "userCode": ["12345","686838"],

  "startTime": "2021-04-16 13:30:00",

  "endTime": "2021-04-16 14:30:00"

  }

  }

  多个自定义对象传参,对象顺序按照dubbo接口定义的传参顺序(xxdtox、xxdto):

  {

  "serviceName": "xxxx",

  "methodName": "xxxxxx",

  "data":[

  {

  "userCode": "7932723",

  "startTime": "2021-04-16 13:30:00",

  "endTime": "2021-04-16 14:30:00"

  },

  {

  "name": "fang",

  "age": "18"

  }

  ]

  }

  上述传参可以满足大部分入参类型~

  以上内容为大家介绍了使用Python进行dubbo接口测试,本文由多测师亲自撰写,希望对大家有所帮助。了解更多接口测试相关知识:https://www.aichudan.com/xwzx/

联系电话

17727591462

返回顶部