huaweicloud-sdk-python-v3/huaweicloud-sdk-iotdm/huaweicloudsdkiotdm/v5/iotdm_client.py

688 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
from __future__ import absolute_import
import importlib
import warnings
from huaweicloudsdkcore.client import Client, ClientBuilder
from huaweicloudsdkcore.utils import http_utils
from huaweicloudsdkcore.sdk_stream_request import SdkStreamRequest
try:
    from huaweicloudsdkcore.invoker.invoker import SyncInvoker
except ImportError as import_error:
    # The import failure is reported as a warning instead of being raised, so
    # the module itself still loads. NOTE(review): SyncInvoker stays undefined
    # in that case, so the ``*_invoker`` methods would fail when called.
    warnings.warn(
        str(import_error) + ", please check if you are using the same versions of 'huaweicloudsdkcore' and 'huaweicloudsdkiotdm'"
    )
class IoTDMClient(Client):
    """Synchronous client for the IoTDM v5 device-access instance API.

    Every operation is exposed in three forms:

    * ``<operation>(request)`` builds the HTTP description of the call and
      dispatches it through :meth:`_call_api`.
    * ``<operation>_invoker(request)`` returns a ``SyncInvoker`` wrapping the
      same HTTP description instead of dispatching it.
    * ``_<operation>_http_info(request)`` builds that HTTP description.
    """

    # Content types offered to ``http_utils.select_header_content_type``.
    _CONTENT_TYPE_JSON = 'application/json'
    _CONTENT_TYPE_JSON_UTF8 = 'application/json;charset=UTF-8'

    def __init__(self):
        super(IoTDMClient, self).__init__()
        # Package holding the request/response model classes of this service.
        self.model_package = importlib.import_module("huaweicloudsdkiotdm.v5.model")

    @classmethod
    def new_builder(cls, clazz=None):
        """Return a ``ClientBuilder`` for this client.

        :param clazz: Optional client class to build. When given, its
            ``__name__`` must be ``IoTDMClient``; when omitted, ``cls`` is used.
        :raises TypeError: If ``clazz`` is given and is not named ``IoTDMClient``.
        :return: A ``ClientBuilder`` bound to the selected class.
        """
        if not clazz:
            return ClientBuilder(cls)
        if clazz.__name__ != "IoTDMClient":
            raise TypeError("client type error, support client type is IoTDMClient")
        return ClientBuilder(clazz)

    @classmethod
    def _compose_http_info(cls, request, method, resource_path, response_type,
                           content_type, path_keys=(), query_keys=(),
                           has_body=False):
        """Build the keyword arguments that describe one HTTP call.

        Shared by all ``_<operation>_http_info`` builders, which differ only
        in the values passed here.

        :param request: Request model. Attributes named in its
            ``attribute_map`` are read when the request object has them.
        :param method: HTTP verb of the operation.
        :param resource_path: URL template of the endpoint.
        :param response_type: Name of the response model class.
        :param content_type: Content type offered to
            ``http_utils.select_header_content_type``.
        :param path_keys: Request attributes copied into ``path_params``.
        :param query_keys: Request attributes appended, in this order, to
            ``query_params`` as ``(name, value)`` tuples.
        :param has_body: Whether the request's ``body`` attribute is used as
            the payload.
        :return: ``dict`` suitable for ``_call_api(**http_info)`` or for
            constructing a ``SyncInvoker``.
        """
        local_var_params = {
            attr: getattr(request, attr)
            for attr in request.attribute_map
            if hasattr(request, attr)
        }

        path_params = {
            key: local_var_params[key]
            for key in path_keys
            if key in local_var_params
        }
        query_params = [
            (key, local_var_params[key])
            for key in query_keys
            if key in local_var_params
        ]

        body = None
        if has_body and 'body' in local_var_params:
            body = local_var_params['body']
        # A stream request always supplies its own payload, overriding ``body``.
        if isinstance(request, SdkStreamRequest):
            body = request.get_file_stream()

        header_params = {
            'Content-Type': http_utils.select_header_content_type([content_type]),
        }

        return {
            "method": method,
            "resource_path": resource_path,
            "request_type": request.__class__.__name__,
            "response_type": response_type,
            "cname": None,
            "collection_formats": {},
            "path_params": path_params,
            "query_params": query_params,
            "header_params": header_params,
            "post_params": {},
            "body": body,
            "response_headers": [],
        }

    def bind_instance_tags(self, request):
        """Add instance tags.

        Adds tags to an instance.

        Please refer to HUAWEI cloud API Explorer for details.

        :param request: Request instance for BindInstanceTags
        :type request: :class:`huaweicloudsdkiotdm.v5.BindInstanceTagsRequest`
        :rtype: :class:`huaweicloudsdkiotdm.v5.BindInstanceTagsResponse`
        """
        http_info = self._bind_instance_tags_http_info(request)
        return self._call_api(**http_info)

    def bind_instance_tags_invoker(self, request):
        """Return a ``SyncInvoker`` for the BindInstanceTags call."""
        http_info = self._bind_instance_tags_http_info(request)
        return SyncInvoker(self, http_info)

    @classmethod
    def _bind_instance_tags_http_info(cls, request):
        """Describe the BindInstanceTags HTTP call for ``request``."""
        return cls._compose_http_info(
            request,
            method="POST",
            resource_path="/v5/iot/{project_id}/iotda-instances/{instance_id}/bind-tags",
            response_type="BindInstanceTagsResponse",
            content_type=cls._CONTENT_TYPE_JSON_UTF8,
            path_keys=('instance_id',),
            has_body=True,
        )

    def change_instance_charge_mode(self, request):
        """Change the billing mode of an instance.

        Changes the billing mode of a device access instance; switching from
        pay-per-use to yearly/monthly billing is supported.
        Constraint: the billing mode can be changed to yearly/monthly only
        when the specification of the current instance supports
        yearly/monthly billing. For the supported specifications see
        [[Product specifications](https://support.huaweicloud.com/productdesc-iothub/iot_04_0014.html)](tag:hws)[[Product specifications](https://support.huaweicloud.com/intl/zh-cn/productdesc-iothub/iot_04_0014.html)](tag:hws_hk).

        Please refer to HUAWEI cloud API Explorer for details.

        :param request: Request instance for ChangeInstanceChargeMode
        :type request: :class:`huaweicloudsdkiotdm.v5.ChangeInstanceChargeModeRequest`
        :rtype: :class:`huaweicloudsdkiotdm.v5.ChangeInstanceChargeModeResponse`
        """
        http_info = self._change_instance_charge_mode_http_info(request)
        return self._call_api(**http_info)

    def change_instance_charge_mode_invoker(self, request):
        """Return a ``SyncInvoker`` for the ChangeInstanceChargeMode call."""
        http_info = self._change_instance_charge_mode_http_info(request)
        return SyncInvoker(self, http_info)

    @classmethod
    def _change_instance_charge_mode_http_info(cls, request):
        """Describe the ChangeInstanceChargeMode HTTP call for ``request``."""
        return cls._compose_http_info(
            request,
            method="POST",
            resource_path="/v5/iot/{project_id}/iotda-instances/{instance_id}/change-charge-mode",
            response_type="ChangeInstanceChargeModeResponse",
            content_type=cls._CONTENT_TYPE_JSON_UTF8,
            path_keys=('instance_id',),
            has_body=True,
        )

    def create_instance(self, request):
        """Create a device access instance.

        Call this API to create a device access instance. For the supported
        specifications see
        [[Product specifications](https://support.huaweicloud.com/productdesc-iothub/iot_04_0014.html)](tag:hws)[[Product specifications](https://support.huaweicloud.com/intl/zh-cn/productdesc-iothub/iot_04_0014.html)](tag:hws_hk).

        [Constraints:
        - Make sure the account balance is sufficient. This API cannot pay
          with coupons; when a yearly/monthly instance is created and the
          balance is insufficient, a pending-payment order is created.
        - To use a coupon, set the ``is_auto_pay`` field of the request to
          false and pay as described in
          ["Paying Yearly/Monthly Product Orders"](https://support.huaweicloud.com/api-bpconsole/api_order_00016.html#section0),
          or pay with the coupon on the Huawei Cloud website.
        - To unsubscribe from yearly/monthly resources, see
          ["Unsubscribing from Yearly/Monthly Resources"](https://support.huaweicloud.com/api-bpconsole/api_order_00019.html).](tag:hws)

        Please refer to HUAWEI cloud API Explorer for details.

        :param request: Request instance for CreateInstance
        :type request: :class:`huaweicloudsdkiotdm.v5.CreateInstanceRequest`
        :rtype: :class:`huaweicloudsdkiotdm.v5.CreateInstanceResponse`
        """
        http_info = self._create_instance_http_info(request)
        return self._call_api(**http_info)

    def create_instance_invoker(self, request):
        """Return a ``SyncInvoker`` for the CreateInstance call."""
        http_info = self._create_instance_http_info(request)
        return SyncInvoker(self, http_info)

    @classmethod
    def _create_instance_http_info(cls, request):
        """Describe the CreateInstance HTTP call for ``request``."""
        return cls._compose_http_info(
            request,
            method="POST",
            resource_path="/v5/iot/{project_id}/iotda-instances",
            response_type="CreateInstanceResponse",
            content_type=cls._CONTENT_TYPE_JSON_UTF8,
            has_body=True,
        )

    def delete_instance(self, request):
        """Delete an instance.

        Deletes a device access instance. Constraint: this API can only
        delete pay-per-use instances.

        Please refer to HUAWEI cloud API Explorer for details.

        :param request: Request instance for DeleteInstance
        :type request: :class:`huaweicloudsdkiotdm.v5.DeleteInstanceRequest`
        :rtype: :class:`huaweicloudsdkiotdm.v5.DeleteInstanceResponse`
        """
        http_info = self._delete_instance_http_info(request)
        return self._call_api(**http_info)

    def delete_instance_invoker(self, request):
        """Return a ``SyncInvoker`` for the DeleteInstance call."""
        http_info = self._delete_instance_http_info(request)
        return SyncInvoker(self, http_info)

    @classmethod
    def _delete_instance_http_info(cls, request):
        """Describe the DeleteInstance HTTP call for ``request``."""
        return cls._compose_http_info(
            request,
            method="DELETE",
            resource_path="/v5/iot/{project_id}/iotda-instances/{instance_id}",
            response_type="DeleteInstanceResponse",
            content_type=cls._CONTENT_TYPE_JSON,
            path_keys=('instance_id',),
        )

    def list_instances(self, request):
        """Query the instance list.

        Call this API to query the list of device access instances.

        Please refer to HUAWEI cloud API Explorer for details.

        :param request: Request instance for ListInstances
        :type request: :class:`huaweicloudsdkiotdm.v5.ListInstancesRequest`
        :rtype: :class:`huaweicloudsdkiotdm.v5.ListInstancesResponse`
        """
        http_info = self._list_instances_http_info(request)
        return self._call_api(**http_info)

    def list_instances_invoker(self, request):
        """Return a ``SyncInvoker`` for the ListInstances call."""
        http_info = self._list_instances_http_info(request)
        return SyncInvoker(self, http_info)

    @classmethod
    def _list_instances_http_info(cls, request):
        """Describe the ListInstances HTTP call for ``request``."""
        return cls._compose_http_info(
            request,
            method="GET",
            resource_path="/v5/iot/{project_id}/iotda-instances",
            response_type="ListInstancesResponse",
            content_type=cls._CONTENT_TYPE_JSON,
            query_keys=('offset', 'limit', 'marker', 'name', 'instance_type'),
        )

    def resize_instance(self, request):
        """Modify the specification of an instance.

        Changes the specification of a device access instance.

        Please refer to HUAWEI cloud API Explorer for details.

        :param request: Request instance for ResizeInstance
        :type request: :class:`huaweicloudsdkiotdm.v5.ResizeInstanceRequest`
        :rtype: :class:`huaweicloudsdkiotdm.v5.ResizeInstanceResponse`
        """
        http_info = self._resize_instance_http_info(request)
        return self._call_api(**http_info)

    def resize_instance_invoker(self, request):
        """Return a ``SyncInvoker`` for the ResizeInstance call."""
        http_info = self._resize_instance_http_info(request)
        return SyncInvoker(self, http_info)

    @classmethod
    def _resize_instance_http_info(cls, request):
        """Describe the ResizeInstance HTTP call for ``request``."""
        return cls._compose_http_info(
            request,
            method="POST",
            resource_path="/v5/iot/{project_id}/iotda-instances/{instance_id}/resize",
            response_type="ResizeInstanceResponse",
            content_type=cls._CONTENT_TYPE_JSON_UTF8,
            path_keys=('instance_id',),
            has_body=True,
        )

    def show_instance(self, request):
        """Query instance details.

        Queries the details of a device access instance.

        Please refer to HUAWEI cloud API Explorer for details.

        :param request: Request instance for ShowInstance
        :type request: :class:`huaweicloudsdkiotdm.v5.ShowInstanceRequest`
        :rtype: :class:`huaweicloudsdkiotdm.v5.ShowInstanceResponse`
        """
        http_info = self._show_instance_http_info(request)
        return self._call_api(**http_info)

    def show_instance_invoker(self, request):
        """Return a ``SyncInvoker`` for the ShowInstance call."""
        http_info = self._show_instance_http_info(request)
        return SyncInvoker(self, http_info)

    @classmethod
    def _show_instance_http_info(cls, request):
        """Describe the ShowInstance HTTP call for ``request``."""
        return cls._compose_http_info(
            request,
            method="GET",
            resource_path="/v5/iot/{project_id}/iotda-instances/{instance_id}",
            response_type="ShowInstanceResponse",
            content_type=cls._CONTENT_TYPE_JSON,
            path_keys=('instance_id',),
        )

    def unbind_instance_tags(self, request):
        """Delete instance tags.

        Removes tags from an instance.

        Please refer to HUAWEI cloud API Explorer for details.

        :param request: Request instance for UnbindInstanceTags
        :type request: :class:`huaweicloudsdkiotdm.v5.UnbindInstanceTagsRequest`
        :rtype: :class:`huaweicloudsdkiotdm.v5.UnbindInstanceTagsResponse`
        """
        http_info = self._unbind_instance_tags_http_info(request)
        return self._call_api(**http_info)

    def unbind_instance_tags_invoker(self, request):
        """Return a ``SyncInvoker`` for the UnbindInstanceTags call."""
        http_info = self._unbind_instance_tags_http_info(request)
        return SyncInvoker(self, http_info)

    @classmethod
    def _unbind_instance_tags_http_info(cls, request):
        """Describe the UnbindInstanceTags HTTP call for ``request``."""
        return cls._compose_http_info(
            request,
            method="POST",
            resource_path="/v5/iot/{project_id}/iotda-instances/{instance_id}/unbind-tags",
            response_type="UnbindInstanceTagsResponse",
            content_type=cls._CONTENT_TYPE_JSON_UTF8,
            path_keys=('instance_id',),
            has_body=True,
        )

    def update_instance(self, request):
        """Modify instance information.

        Changes the information of a device access instance.

        Please refer to HUAWEI cloud API Explorer for details.

        :param request: Request instance for UpdateInstance
        :type request: :class:`huaweicloudsdkiotdm.v5.UpdateInstanceRequest`
        :rtype: :class:`huaweicloudsdkiotdm.v5.UpdateInstanceResponse`
        """
        http_info = self._update_instance_http_info(request)
        return self._call_api(**http_info)

    def update_instance_invoker(self, request):
        """Return a ``SyncInvoker`` for the UpdateInstance call."""
        http_info = self._update_instance_http_info(request)
        return SyncInvoker(self, http_info)

    @classmethod
    def _update_instance_http_info(cls, request):
        """Describe the UpdateInstance HTTP call for ``request``."""
        return cls._compose_http_info(
            request,
            method="PUT",
            resource_path="/v5/iot/{project_id}/iotda-instances/{instance_id}",
            response_type="UpdateInstanceResponse",
            content_type=cls._CONTENT_TYPE_JSON_UTF8,
            path_keys=('instance_id',),
            has_body=True,
        )

    def _call_api(self, **kwargs):
        """Dispatch an HTTP description to ``do_http_request`` exactly once.

        Keys that ``do_http_request`` does not declare are dropped, so the
        call also works when its signature lacks some of the keys produced by
        the ``_<operation>_http_info`` builders.

        :param kwargs: HTTP description as built by ``_compose_http_info``.
        :return: Whatever ``do_http_request`` returns.
        """
        import inspect

        accepted = inspect.signature(self.do_http_request).parameters
        takes_any_keyword = any(
            param.kind is inspect.Parameter.VAR_KEYWORD
            for param in accepted.values()
        )
        # The accepted keys are decided from the signature up front instead of
        # retrying after a TypeError: a TypeError raised while the request is
        # being processed must propagate, and the request must never be
        # issued a second time.
        if takes_any_keyword:
            return self.do_http_request(**kwargs)
        http_info = {name: value for name, value in kwargs.items() if name in accepted}
        return self.do_http_request(**http_info)

    def call_api(self, resource_path, method, path_params=None, query_params=None, header_params=None, body=None,
                 post_params=None, cname=None, response_type=None, response_headers=None, auth_settings=None,
                 collection_formats=None, request_type=None):
        """Makes the HTTP request and returns deserialized data.

        :param resource_path: Path to method endpoint.
        :param method: Method to call.
        :param path_params: Path parameters in the url.
        :param query_params: Query parameters in the url.
        :param header_params: Header parameters to be placed in the request header.
        :param body: Request body.
        :param post_params: Request post form parameters,
            for `application/x-www-form-urlencoded`, `multipart/form-data`.
        :param cname: Used for obs endpoint.
        :param auth_settings: Auth Settings names for the request.
            Accepted for interface compatibility; it is not forwarded to
            ``do_http_request``.
        :param response_type: Response data type.
        :param response_headers: Header should be added to response data.
        :param collection_formats: dict of collection formats for path, query,
            header, and post parameters.
        :param request_type: Request data type.
        :return:
            Return the response directly.
        """
        return self.do_http_request(
            method=method,
            resource_path=resource_path,
            path_params=path_params,
            query_params=query_params,
            header_params=header_params,
            body=body,
            post_params=post_params,
            cname=cname,
            response_type=response_type,
            response_headers=response_headers,
            collection_formats=collection_formats,
            request_type=request_type)