huaweicloud-sdk-python-v3/huaweicloud-sdk-vpc/huaweicloudsdkvpc/v2/vpc_async_client.py

3807 lines
117 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
from __future__ import absolute_import
import datetime
import re
import importlib
import six
from huaweicloudsdkcore.client import Client, ClientBuilder
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkcore.utils import http_utils
from huaweicloudsdkcore.sdk_stream_request import SdkStreamRequest
class VpcAsyncClient(Client):
"""
:param configuration: .Configuration object for this client
:param pool_threads: The number of threads to use for async requests
to the API. More threads means more concurrent API requests.
"""
PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types
NATIVE_TYPES_MAPPING = {
'int': int,
'long': int if six.PY3 else long,
'float': float,
'str': str,
'bool': bool,
'date': datetime.date,
'datetime': datetime.datetime,
'object': object,
}
def __init__(self):
super(VpcAsyncClient, self).__init__()
self.model_package = importlib.import_module("huaweicloudsdkvpc.v2.model")
self.preset_headers = {'User-Agent': 'HuaweiCloud-SDK-Python'}
@staticmethod
def new_builder(clazz):
return ClientBuilder(clazz)
def accept_vpc_peering_async(self, request):
"""接受对等连接请求
租户A名下的VPC申请和租户B的VPC建立对等连接需要等待租户B接受该请求。此接口用于租户接受其他租户发起的对等连接请求。
:param AcceptVpcPeeringRequest request
:return: AcceptVpcPeeringResponse
"""
return self.accept_vpc_peering_with_http_info(request)
def accept_vpc_peering_with_http_info(self, request):
"""接受对等连接请求
租户A名下的VPC申请和租户B的VPC建立对等连接需要等待租户B接受该请求。此接口用于租户接受其他租户发起的对等连接请求。
:param AcceptVpcPeeringRequest request
:return: AcceptVpcPeeringResponse
"""
all_params = ['peering_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'peering_id' in local_var_params:
path_params['peering_id'] = local_var_params['peering_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/peerings/{peering_id}/accept',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='AcceptVpcPeeringResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def create_port_async(self, request):
"""创建端口
创建端口。
:param CreatePortRequest request
:return: CreatePortResponse
"""
return self.create_port_with_http_info(request)
def create_port_with_http_info(self, request):
"""创建端口
创建端口。
:param CreatePortRequest request
:return: CreatePortResponse
"""
all_params = ['port']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/ports',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='CreatePortResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def create_security_group_async(self, request):
"""创建安全组
创建安全组。
:param CreateSecurityGroupRequest request
:return: CreateSecurityGroupResponse
"""
return self.create_security_group_with_http_info(request)
def create_security_group_with_http_info(self, request):
"""创建安全组
创建安全组。
:param CreateSecurityGroupRequest request
:return: CreateSecurityGroupResponse
"""
all_params = ['security_group']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/security-groups',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='CreateSecurityGroupResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def create_security_group_rule_async(self, request):
"""创建安全组规则
创建安全组规则。
:param CreateSecurityGroupRuleRequest request
:return: CreateSecurityGroupRuleResponse
"""
return self.create_security_group_rule_with_http_info(request)
def create_security_group_rule_with_http_info(self, request):
"""创建安全组规则
创建安全组规则。
:param CreateSecurityGroupRuleRequest request
:return: CreateSecurityGroupRuleResponse
"""
all_params = ['security_group_rule']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/security-group-rules',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='CreateSecurityGroupRuleResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def create_subnet_async(self, request):
"""创建子网
创建子网。
:param CreateSubnetRequest request
:return: CreateSubnetResponse
"""
return self.create_subnet_with_http_info(request)
def create_subnet_with_http_info(self, request):
"""创建子网
创建子网。
:param CreateSubnetRequest request
:return: CreateSubnetResponse
"""
all_params = ['subnet']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/subnets',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='CreateSubnetResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def create_vpc_peering_async(self, request):
"""创建对等连接
创建对等连接。
:param CreateVpcPeeringRequest request
:return: CreateVpcPeeringResponse
"""
return self.create_vpc_peering_with_http_info(request)
def create_vpc_peering_with_http_info(self, request):
"""创建对等连接
创建对等连接。
:param CreateVpcPeeringRequest request
:return: CreateVpcPeeringResponse
"""
all_params = ['peering']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/peerings',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='CreateVpcPeeringResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def delete_port_async(self, request):
"""删除端口
删除端口。
:param DeletePortRequest request
:return: DeletePortResponse
"""
return self.delete_port_with_http_info(request)
def delete_port_with_http_info(self, request):
"""删除端口
删除端口。
:param DeletePortRequest request
:return: DeletePortResponse
"""
all_params = ['port_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'port_id' in local_var_params:
path_params['port_id'] = local_var_params['port_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/ports/{port_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='DeletePortResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def delete_security_group_async(self, request):
"""删除安全组
删除安全组。
:param DeleteSecurityGroupRequest request
:return: DeleteSecurityGroupResponse
"""
return self.delete_security_group_with_http_info(request)
def delete_security_group_with_http_info(self, request):
"""删除安全组
删除安全组。
:param DeleteSecurityGroupRequest request
:return: DeleteSecurityGroupResponse
"""
all_params = ['security_group_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'security_group_id' in local_var_params:
path_params['security_group_id'] = local_var_params['security_group_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/security-groups/{security_group_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='DeleteSecurityGroupResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def delete_security_group_rule_async(self, request):
"""删除安全组规则
删除安全组规则。
:param DeleteSecurityGroupRuleRequest request
:return: DeleteSecurityGroupRuleResponse
"""
return self.delete_security_group_rule_with_http_info(request)
def delete_security_group_rule_with_http_info(self, request):
"""删除安全组规则
删除安全组规则。
:param DeleteSecurityGroupRuleRequest request
:return: DeleteSecurityGroupRuleResponse
"""
all_params = ['security_group_rule_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'security_group_rule_id' in local_var_params:
path_params['security_group_rule_id'] = local_var_params['security_group_rule_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/security-group-rules/{security_group_rule_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='DeleteSecurityGroupRuleResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def delete_subnet_async(self, request):
"""删除子网
删除子网
:param DeleteSubnetRequest request
:return: DeleteSubnetResponse
"""
return self.delete_subnet_with_http_info(request)
def delete_subnet_with_http_info(self, request):
"""删除子网
删除子网
:param DeleteSubnetRequest request
:return: DeleteSubnetResponse
"""
all_params = ['vpc_id', 'subnet_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'vpc_id' in local_var_params:
path_params['vpc_id'] = local_var_params['vpc_id']
if 'subnet_id' in local_var_params:
path_params['subnet_id'] = local_var_params['subnet_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/vpcs/{vpc_id}/subnets/{subnet_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='DeleteSubnetResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def delete_vpc_peering_async(self, request):
"""删除对等连接
删除对等连接。 可以在在本端或对端任何一端删除对等连接。
:param DeleteVpcPeeringRequest request
:return: DeleteVpcPeeringResponse
"""
return self.delete_vpc_peering_with_http_info(request)
def delete_vpc_peering_with_http_info(self, request):
"""删除对等连接
删除对等连接。 可以在在本端或对端任何一端删除对等连接。
:param DeleteVpcPeeringRequest request
:return: DeleteVpcPeeringResponse
"""
all_params = ['peering_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'peering_id' in local_var_params:
path_params['peering_id'] = local_var_params['peering_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/peerings/{peering_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='DeleteVpcPeeringResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def list_ports_async(self, request):
"""查询端口列表
查询提交请求的租户的所有端口单次查询最多返回2000条数据。
:param ListPortsRequest request
:return: ListPortsResponse
"""
return self.list_ports_with_http_info(request)
def list_ports_with_http_info(self, request):
"""查询端口列表
查询提交请求的租户的所有端口单次查询最多返回2000条数据。
:param ListPortsRequest request
:return: ListPortsResponse
"""
all_params = ['name', 'id', 'limit', 'admin_state_up', 'network_id', 'mac_address', 'device_id', 'device_owner', 'status', 'marker', 'fixed_ips', 'enterprise_project_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'name' in local_var_params:
query_params.append(('name', local_var_params['name']))
if 'id' in local_var_params:
query_params.append(('id', local_var_params['id']))
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'admin_state_up' in local_var_params:
query_params.append(('admin_state_up', local_var_params['admin_state_up']))
if 'network_id' in local_var_params:
query_params.append(('network_id', local_var_params['network_id']))
if 'mac_address' in local_var_params:
query_params.append(('mac_address', local_var_params['mac_address']))
if 'device_id' in local_var_params:
query_params.append(('device_id', local_var_params['device_id']))
if 'device_owner' in local_var_params:
query_params.append(('device_owner', local_var_params['device_owner']))
if 'status' in local_var_params:
query_params.append(('status', local_var_params['status']))
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
if 'fixed_ips' in local_var_params:
query_params.append(('fixed_ips', local_var_params['fixed_ips']))
if 'enterprise_project_id' in local_var_params:
query_params.append(('enterprise_project_id', local_var_params['enterprise_project_id']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/ports',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ListPortsResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def list_security_group_rules_async(self, request):
"""查询安全组规则列表
查询安全组规则列表。
:param ListSecurityGroupRulesRequest request
:return: ListSecurityGroupRulesResponse
"""
return self.list_security_group_rules_with_http_info(request)
def list_security_group_rules_with_http_info(self, request):
"""查询安全组规则列表
查询安全组规则列表。
:param ListSecurityGroupRulesRequest request
:return: ListSecurityGroupRulesResponse
"""
all_params = ['marker', 'limit', 'security_group_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'security_group_id' in local_var_params:
query_params.append(('security_group_id', local_var_params['security_group_id']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/security-group-rules',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ListSecurityGroupRulesResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def list_security_groups_async(self, request):
"""查询安全组列表
查询安全组列表
:param ListSecurityGroupsRequest request
:return: ListSecurityGroupsResponse
"""
return self.list_security_groups_with_http_info(request)
def list_security_groups_with_http_info(self, request):
"""查询安全组列表
查询安全组列表
:param ListSecurityGroupsRequest request
:return: ListSecurityGroupsResponse
"""
all_params = ['limit', 'marker', 'vpc_id', 'enterprise_project_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
if 'vpc_id' in local_var_params:
query_params.append(('vpc_id', local_var_params['vpc_id']))
if 'enterprise_project_id' in local_var_params:
query_params.append(('enterprise_project_id', local_var_params['enterprise_project_id']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/security-groups',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ListSecurityGroupsResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def list_subnets_async(self, request):
"""查询子网列表
查询子网列表
:param ListSubnetsRequest request
:return: ListSubnetsResponse
"""
return self.list_subnets_with_http_info(request)
def list_subnets_with_http_info(self, request):
"""查询子网列表
查询子网列表
:param ListSubnetsRequest request
:return: ListSubnetsResponse
"""
all_params = ['limit', 'marker', 'vpc_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
if 'vpc_id' in local_var_params:
query_params.append(('vpc_id', local_var_params['vpc_id']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/subnets',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ListSubnetsResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def list_vpc_peerings_async(self, request):
"""查询对等连接列表
查询提交请求的租户的所有对等连接。根据过滤条件进行过滤。
:param ListVpcPeeringsRequest request
:return: ListVpcPeeringsResponse
"""
return self.list_vpc_peerings_with_http_info(request)
def list_vpc_peerings_with_http_info(self, request):
"""查询对等连接列表
查询提交请求的租户的所有对等连接。根据过滤条件进行过滤。
:param ListVpcPeeringsRequest request
:return: ListVpcPeeringsResponse
"""
all_params = ['limit', 'marker', 'id', 'name', 'status', 'tenant_id', 'vpc_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
if 'id' in local_var_params:
query_params.append(('id', local_var_params['id']))
if 'name' in local_var_params:
query_params.append(('name', local_var_params['name']))
if 'status' in local_var_params:
query_params.append(('status', local_var_params['status']))
if 'tenant_id' in local_var_params:
query_params.append(('tenant_id', local_var_params['tenant_id']))
if 'vpc_id' in local_var_params:
query_params.append(('vpc_id', local_var_params['vpc_id']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/peerings',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ListVpcPeeringsResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def reject_vpc_peering_async(self, request):
"""拒绝对等连接请求
租户A名下的VPC申请和租户B的VPC建立对等连接需要等待租户B接受该请求。此接口用于租户拒绝其他租户发起的对等连接请求。
:param RejectVpcPeeringRequest request
:return: RejectVpcPeeringResponse
"""
return self.reject_vpc_peering_with_http_info(request)
def reject_vpc_peering_with_http_info(self, request):
"""拒绝对等连接请求
租户A名下的VPC申请和租户B的VPC建立对等连接需要等待租户B接受该请求。此接口用于租户拒绝其他租户发起的对等连接请求。
:param RejectVpcPeeringRequest request
:return: RejectVpcPeeringResponse
"""
all_params = ['peering_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'peering_id' in local_var_params:
path_params['peering_id'] = local_var_params['peering_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/peerings/{peering_id}/reject',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='RejectVpcPeeringResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def show_port_async(self, request):
"""查询端口
查询单个端口详情。
:param ShowPortRequest request
:return: ShowPortResponse
"""
return self.show_port_with_http_info(request)
def show_port_with_http_info(self, request):
"""查询端口
查询单个端口详情。
:param ShowPortRequest request
:return: ShowPortResponse
"""
all_params = ['port_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'port_id' in local_var_params:
path_params['port_id'] = local_var_params['port_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/ports/{port_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ShowPortResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def show_quota_async(self, request):
"""查询配额
查询单租户在VPC服务下的网络资源配额包括vpc配额、子网配额、安全组配额、安全组规则配额、弹性公网IP配额vpn配额等。
:param ShowQuotaRequest request
:return: ShowQuotaResponse
"""
return self.show_quota_with_http_info(request)
def show_quota_with_http_info(self, request):
"""查询配额
查询单租户在VPC服务下的网络资源配额包括vpc配额、子网配额、安全组配额、安全组规则配额、弹性公网IP配额vpn配额等。
:param ShowQuotaRequest request
:return: ShowQuotaResponse
"""
all_params = ['type']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'type' in local_var_params:
query_params.append(('type', local_var_params['type']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/quotas',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ShowQuotaResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def show_security_group_async(self, request):
"""查询安全组
查询单个安全组详情。
:param ShowSecurityGroupRequest request
:return: ShowSecurityGroupResponse
"""
return self.show_security_group_with_http_info(request)
def show_security_group_with_http_info(self, request):
"""查询安全组
查询单个安全组详情。
:param ShowSecurityGroupRequest request
:return: ShowSecurityGroupResponse
"""
all_params = ['security_group_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'security_group_id' in local_var_params:
path_params['security_group_id'] = local_var_params['security_group_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/security-groups/{security_group_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ShowSecurityGroupResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def show_security_group_rule_async(self, request):
"""查询安全组规则
查询单个安全组规则详情
:param ShowSecurityGroupRuleRequest request
:return: ShowSecurityGroupRuleResponse
"""
return self.show_security_group_rule_with_http_info(request)
def show_security_group_rule_with_http_info(self, request):
"""查询安全组规则
查询单个安全组规则详情
:param ShowSecurityGroupRuleRequest request
:return: ShowSecurityGroupRuleResponse
"""
all_params = ['security_group_rule_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'security_group_rule_id' in local_var_params:
path_params['security_group_rule_id'] = local_var_params['security_group_rule_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/security-group-rules/{security_group_rule_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ShowSecurityGroupRuleResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def show_subnet_async(self, request):
"""查询子网
查询子网详情。
:param ShowSubnetRequest request
:return: ShowSubnetResponse
"""
return self.show_subnet_with_http_info(request)
def show_subnet_with_http_info(self, request):
"""查询子网
查询子网详情。
:param ShowSubnetRequest request
:return: ShowSubnetResponse
"""
all_params = ['subnet_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'subnet_id' in local_var_params:
path_params['subnet_id'] = local_var_params['subnet_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/subnets/{subnet_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ShowSubnetResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def show_vpc_peering_async(self, request):
"""查询对等连接
查询对等连接详情。
:param ShowVpcPeeringRequest request
:return: ShowVpcPeeringResponse
"""
return self.show_vpc_peering_with_http_info(request)
def show_vpc_peering_with_http_info(self, request):
"""查询对等连接
查询对等连接详情。
:param ShowVpcPeeringRequest request
:return: ShowVpcPeeringResponse
"""
all_params = ['peering_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'peering_id' in local_var_params:
path_params['peering_id'] = local_var_params['peering_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/peerings/{peering_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ShowVpcPeeringResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def update_port_async(self, request):
"""更新端口
更新端口。
:param UpdatePortRequest request
:return: UpdatePortResponse
"""
return self.update_port_with_http_info(request)
def update_port_with_http_info(self, request):
"""更新端口
更新端口。
:param UpdatePortRequest request
:return: UpdatePortResponse
"""
all_params = ['port_id', 'port']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'port_id' in local_var_params:
path_params['port_id'] = local_var_params['port_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/ports/{port_id}',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='UpdatePortResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def update_subnet_async(self, request):
"""更新子网
更新子网。
:param UpdateSubnetRequest request
:return: UpdateSubnetResponse
"""
return self.update_subnet_with_http_info(request)
def update_subnet_with_http_info(self, request):
"""更新子网
更新子网。
:param UpdateSubnetRequest request
:return: UpdateSubnetResponse
"""
all_params = ['vpc_id', 'subnet_id', 'subnet']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'vpc_id' in local_var_params:
path_params['vpc_id'] = local_var_params['vpc_id']
if 'subnet_id' in local_var_params:
path_params['subnet_id'] = local_var_params['subnet_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/vpcs/{vpc_id}/subnets/{subnet_id}',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='UpdateSubnetResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def update_vpc_peering_async(self, request):
"""更新对等连接
更新对等连接。
:param UpdateVpcPeeringRequest request
:return: UpdateVpcPeeringResponse
"""
return self.update_vpc_peering_with_http_info(request)
def update_vpc_peering_with_http_info(self, request):
"""更新对等连接
更新对等连接。
:param UpdateVpcPeeringRequest request
:return: UpdateVpcPeeringResponse
"""
all_params = ['peering_id', 'peering']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'peering_id' in local_var_params:
path_params['peering_id'] = local_var_params['peering_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/peerings/{peering_id}',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='UpdateVpcPeeringResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def create_privateip_async(self, request):
"""申请私有IP
申请私有IP。
:param CreatePrivateipRequest request
:return: CreatePrivateipResponse
"""
return self.create_privateip_with_http_info(request)
def create_privateip_with_http_info(self, request):
"""申请私有IP
申请私有IP。
:param CreatePrivateipRequest request
:return: CreatePrivateipResponse
"""
all_params = ['privateips']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/privateips',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='CreatePrivateipResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def delete_privateip_async(self, request):
"""删除私有IP
删除私有IP。
:param DeletePrivateipRequest request
:return: DeletePrivateipResponse
"""
return self.delete_privateip_with_http_info(request)
def delete_privateip_with_http_info(self, request):
"""删除私有IP
删除私有IP。
:param DeletePrivateipRequest request
:return: DeletePrivateipResponse
"""
all_params = ['privateip_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'privateip_id' in local_var_params:
path_params['privateip_id'] = local_var_params['privateip_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/privateips/{privateip_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='DeletePrivateipResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def list_privateips_async(self, request):
"""查询私有IP列表
查询指定子网下的私有IP列表。
:param ListPrivateipsRequest request
:return: ListPrivateipsResponse
"""
return self.list_privateips_with_http_info(request)
def list_privateips_with_http_info(self, request):
"""查询私有IP列表
查询指定子网下的私有IP列表。
:param ListPrivateipsRequest request
:return: ListPrivateipsResponse
"""
all_params = ['subnet_id', 'limit', 'marker']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'subnet_id' in local_var_params:
path_params['subnet_id'] = local_var_params['subnet_id']
query_params = []
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/subnets/{subnet_id}/privateips',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ListPrivateipsResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def show_network_ip_availabilities_async(self, request):
"""查询网络IP使用情况
显示一个指定网络中的IPv4地址使用情况。 包括此网络中的IP总数以及已用IP总数以及网络下每一个子网的IP地址总数和可用IP地址总数。 > 须知 - 系统预留地址指的是子网的第1个以及最后4个地址一般用于网关、DHCP等服务。 - 这里以及下文描述的IP地址总数、已用IP地址总数不包含系统预留地址。 - 在分配IP时用户可以指定系统预留的IP地址。但是不论IP是如何分配的只要是处于系统预留IP地址段的IP均不会被统计到已用IP地址数目和IP地址总数中。
:param ShowNetworkIpAvailabilitiesRequest request
:return: ShowNetworkIpAvailabilitiesResponse
"""
return self.show_network_ip_availabilities_with_http_info(request)
def show_network_ip_availabilities_with_http_info(self, request):
"""查询网络IP使用情况
显示一个指定网络中的IPv4地址使用情况。 包括此网络中的IP总数以及已用IP总数以及网络下每一个子网的IP地址总数和可用IP地址总数。 > 须知 - 系统预留地址指的是子网的第1个以及最后4个地址一般用于网关、DHCP等服务。 - 这里以及下文描述的IP地址总数、已用IP地址总数不包含系统预留地址。 - 在分配IP时用户可以指定系统预留的IP地址。但是不论IP是如何分配的只要是处于系统预留IP地址段的IP均不会被统计到已用IP地址数目和IP地址总数中。
:param ShowNetworkIpAvailabilitiesRequest request
:return: ShowNetworkIpAvailabilitiesResponse
"""
all_params = ['network_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'network_id' in local_var_params:
path_params['network_id'] = local_var_params['network_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/network-ip-availabilities/{network_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ShowNetworkIpAvailabilitiesResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def show_privateip_async(self, request):
"""查询私有IP
指定ID查询私有IP。
:param ShowPrivateipRequest request
:return: ShowPrivateipResponse
"""
return self.show_privateip_with_http_info(request)
def show_privateip_with_http_info(self, request):
"""查询私有IP
指定ID查询私有IP。
:param ShowPrivateipRequest request
:return: ShowPrivateipResponse
"""
all_params = ['privateip_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'privateip_id' in local_var_params:
path_params['privateip_id'] = local_var_params['privateip_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/privateips/{privateip_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ShowPrivateipResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_add_firewall_rule_async(self, request):
"""插入网络ACL规则
插入一条网络ACL规则到某一网络ACL策略中。
:param NeutronAddFirewallRuleRequest request
:return: NeutronAddFirewallRuleResponse
"""
return self.neutron_add_firewall_rule_with_http_info(request)
def neutron_add_firewall_rule_with_http_info(self, request):
"""插入网络ACL规则
插入一条网络ACL规则到某一网络ACL策略中。
:param NeutronAddFirewallRuleRequest request
:return: NeutronAddFirewallRuleResponse
"""
all_params = ['firewall_policy_id', 'insert_firewall_rule']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_policy_id' in local_var_params:
path_params['firewall_policy_id'] = local_var_params['firewall_policy_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_policies/{firewall_policy_id}/insert_rule',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronAddFirewallRuleResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_create_firewall_group_async(self, request):
"""创建网络ACL组
创建网络ACL组
:param NeutronCreateFirewallGroupRequest request
:return: NeutronCreateFirewallGroupResponse
"""
return self.neutron_create_firewall_group_with_http_info(request)
def neutron_create_firewall_group_with_http_info(self, request):
"""创建网络ACL组
创建网络ACL组
:param NeutronCreateFirewallGroupRequest request
:return: NeutronCreateFirewallGroupResponse
"""
all_params = ['firewall_group']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_groups',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronCreateFirewallGroupResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_create_firewall_policy_async(self, request):
"""创建网络ACL策略
创建网络ACL策略。
:param NeutronCreateFirewallPolicyRequest request
:return: NeutronCreateFirewallPolicyResponse
"""
return self.neutron_create_firewall_policy_with_http_info(request)
def neutron_create_firewall_policy_with_http_info(self, request):
"""创建网络ACL策略
创建网络ACL策略。
:param NeutronCreateFirewallPolicyRequest request
:return: NeutronCreateFirewallPolicyResponse
"""
all_params = ['firewall_policy']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_policies',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronCreateFirewallPolicyResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_create_firewall_rule_async(self, request):
"""创建网络ACL规则
创建网络ACL规则。
:param NeutronCreateFirewallRuleRequest request
:return: NeutronCreateFirewallRuleResponse
"""
return self.neutron_create_firewall_rule_with_http_info(request)
def neutron_create_firewall_rule_with_http_info(self, request):
"""创建网络ACL规则
创建网络ACL规则。
:param NeutronCreateFirewallRuleRequest request
:return: NeutronCreateFirewallRuleResponse
"""
all_params = ['firewall_rule']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_rules',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronCreateFirewallRuleResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_delete_firewall_group_async(self, request):
"""删除网络ACL组
删除网络ACL组
:param NeutronDeleteFirewallGroupRequest request
:return: NeutronDeleteFirewallGroupResponse
"""
return self.neutron_delete_firewall_group_with_http_info(request)
def neutron_delete_firewall_group_with_http_info(self, request):
"""删除网络ACL组
删除网络ACL组
:param NeutronDeleteFirewallGroupRequest request
:return: NeutronDeleteFirewallGroupResponse
"""
all_params = ['firewall_group_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_group_id' in local_var_params:
path_params['firewall_group_id'] = local_var_params['firewall_group_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_groups/{firewall_group_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronDeleteFirewallGroupResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_delete_firewall_policy_async(self, request):
"""删除网络ACL策略
删除网络ACL策略。
:param NeutronDeleteFirewallPolicyRequest request
:return: NeutronDeleteFirewallPolicyResponse
"""
return self.neutron_delete_firewall_policy_with_http_info(request)
def neutron_delete_firewall_policy_with_http_info(self, request):
"""删除网络ACL策略
删除网络ACL策略。
:param NeutronDeleteFirewallPolicyRequest request
:return: NeutronDeleteFirewallPolicyResponse
"""
all_params = ['firewall_policy_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_policy_id' in local_var_params:
path_params['firewall_policy_id'] = local_var_params['firewall_policy_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_policies/{firewall_policy_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronDeleteFirewallPolicyResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_delete_firewall_rule_async(self, request):
"""删除网络ACL规则
删除网络ACL规则。
:param NeutronDeleteFirewallRuleRequest request
:return: NeutronDeleteFirewallRuleResponse
"""
return self.neutron_delete_firewall_rule_with_http_info(request)
def neutron_delete_firewall_rule_with_http_info(self, request):
"""删除网络ACL规则
删除网络ACL规则。
:param NeutronDeleteFirewallRuleRequest request
:return: NeutronDeleteFirewallRuleResponse
"""
all_params = ['firewall_rule_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_rule_id' in local_var_params:
path_params['firewall_rule_id'] = local_var_params['firewall_rule_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_rules/{firewall_rule_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronDeleteFirewallRuleResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_list_firewall_groups_async(self, request):
"""查询所有网络ACL组
查询提交请求的租户有权限操作的所有网络ACL组信息。单次查询最多返回2000条数据超过2000后会返回分页标记。
:param NeutronListFirewallGroupsRequest request
:return: NeutronListFirewallGroupsResponse
"""
return self.neutron_list_firewall_groups_with_http_info(request)
def neutron_list_firewall_groups_with_http_info(self, request):
"""查询所有网络ACL组
查询提交请求的租户有权限操作的所有网络ACL组信息。单次查询最多返回2000条数据超过2000后会返回分页标记。
:param NeutronListFirewallGroupsRequest request
:return: NeutronListFirewallGroupsResponse
"""
all_params = ['marker', 'limit', 'id', 'name', 'description', 'ingress_firewall_policy_id', 'egress_firewall_policy_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'id' in local_var_params:
query_params.append(('id', local_var_params['id']))
collection_formats['id'] = 'multi'
if 'name' in local_var_params:
query_params.append(('name', local_var_params['name']))
collection_formats['name'] = 'multi'
if 'description' in local_var_params:
query_params.append(('description', local_var_params['description']))
collection_formats['description'] = 'multi'
if 'ingress_firewall_policy_id' in local_var_params:
query_params.append(('ingress_firewall_policy_id', local_var_params['ingress_firewall_policy_id']))
if 'egress_firewall_policy_id' in local_var_params:
query_params.append(('egress_firewall_policy_id', local_var_params['egress_firewall_policy_id']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_groups',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronListFirewallGroupsResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_list_firewall_policies_async(self, request):
"""查询所有网络ACL策略
查询提交请求的租户有权限操作的所有网络ACL策略信息。单次查询最多返回2000条数据超过2000后会返回分页标记。
:param NeutronListFirewallPoliciesRequest request
:return: NeutronListFirewallPoliciesResponse
"""
return self.neutron_list_firewall_policies_with_http_info(request)
def neutron_list_firewall_policies_with_http_info(self, request):
"""查询所有网络ACL策略
查询提交请求的租户有权限操作的所有网络ACL策略信息。单次查询最多返回2000条数据超过2000后会返回分页标记。
:param NeutronListFirewallPoliciesRequest request
:return: NeutronListFirewallPoliciesResponse
"""
all_params = ['limit', 'marker', 'id', 'name', 'description', 'tenant_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
if 'id' in local_var_params:
query_params.append(('id', local_var_params['id']))
collection_formats['id'] = 'multi'
if 'name' in local_var_params:
query_params.append(('name', local_var_params['name']))
collection_formats['name'] = 'multi'
if 'description' in local_var_params:
query_params.append(('description', local_var_params['description']))
collection_formats['description'] = 'multi'
if 'tenant_id' in local_var_params:
query_params.append(('tenant_id', local_var_params['tenant_id']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_policies',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronListFirewallPoliciesResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_list_firewall_rules_async(self, request):
"""查询所有网络ACL规则
查询提交请求的租户有权限操作的所有网络ACL规则信息。单次查询最多返回2000条数据超过2000后会返回分页标记。
:param NeutronListFirewallRulesRequest request
:return: NeutronListFirewallRulesResponse
"""
return self.neutron_list_firewall_rules_with_http_info(request)
def neutron_list_firewall_rules_with_http_info(self, request):
"""查询所有网络ACL规则
查询提交请求的租户有权限操作的所有网络ACL规则信息。单次查询最多返回2000条数据超过2000后会返回分页标记。
:param NeutronListFirewallRulesRequest request
:return: NeutronListFirewallRulesResponse
"""
all_params = ['marker', 'limit', 'id', 'name', 'description', 'action', 'tenant_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'id' in local_var_params:
query_params.append(('id', local_var_params['id']))
collection_formats['id'] = 'multi'
if 'name' in local_var_params:
query_params.append(('name', local_var_params['name']))
collection_formats['name'] = 'multi'
if 'description' in local_var_params:
query_params.append(('description', local_var_params['description']))
collection_formats['description'] = 'multi'
if 'action' in local_var_params:
query_params.append(('action', local_var_params['action']))
if 'tenant_id' in local_var_params:
query_params.append(('tenant_id', local_var_params['tenant_id']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_rules',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronListFirewallRulesResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_remove_firewall_rule_async(self, request):
"""移除网络ACL规则
从某一网络ACL策略中移除一条网络ACL规则。
:param NeutronRemoveFirewallRuleRequest request
:return: NeutronRemoveFirewallRuleResponse
"""
return self.neutron_remove_firewall_rule_with_http_info(request)
def neutron_remove_firewall_rule_with_http_info(self, request):
"""移除网络ACL规则
从某一网络ACL策略中移除一条网络ACL规则。
:param NeutronRemoveFirewallRuleRequest request
:return: NeutronRemoveFirewallRuleResponse
"""
all_params = ['firewall_policy_id', 'remove_firewall_rule']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_policy_id' in local_var_params:
path_params['firewall_policy_id'] = local_var_params['firewall_policy_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_policies/{firewall_policy_id}/remove_rule',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronRemoveFirewallRuleResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_show_firewall_group_async(self, request):
"""查询特定网络ACL组详情
查询特定网络ACL组详情。
:param NeutronShowFirewallGroupRequest request
:return: NeutronShowFirewallGroupResponse
"""
return self.neutron_show_firewall_group_with_http_info(request)
def neutron_show_firewall_group_with_http_info(self, request):
"""查询特定网络ACL组详情
查询特定网络ACL组详情。
:param NeutronShowFirewallGroupRequest request
:return: NeutronShowFirewallGroupResponse
"""
all_params = ['firewall_group_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_group_id' in local_var_params:
path_params['firewall_group_id'] = local_var_params['firewall_group_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_groups/{firewall_group_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronShowFirewallGroupResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_show_firewall_policy_async(self, request):
"""查询特定网络ACL策略详情
查询特定网络ACL策略详情。
:param NeutronShowFirewallPolicyRequest request
:return: NeutronShowFirewallPolicyResponse
"""
return self.neutron_show_firewall_policy_with_http_info(request)
def neutron_show_firewall_policy_with_http_info(self, request):
"""查询特定网络ACL策略详情
查询特定网络ACL策略详情。
:param NeutronShowFirewallPolicyRequest request
:return: NeutronShowFirewallPolicyResponse
"""
all_params = ['firewall_policy_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_policy_id' in local_var_params:
path_params['firewall_policy_id'] = local_var_params['firewall_policy_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_policies/{firewall_policy_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronShowFirewallPolicyResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_show_firewall_rule_async(self, request):
"""查询特定网络ACL规则
查询特定网络ACL规则。
:param NeutronShowFirewallRuleRequest request
:return: NeutronShowFirewallRuleResponse
"""
return self.neutron_show_firewall_rule_with_http_info(request)
def neutron_show_firewall_rule_with_http_info(self, request):
"""查询特定网络ACL规则
查询特定网络ACL规则。
:param NeutronShowFirewallRuleRequest request
:return: NeutronShowFirewallRuleResponse
"""
all_params = ['firewall_rule_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_rule_id' in local_var_params:
path_params['firewall_rule_id'] = local_var_params['firewall_rule_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_rules/{firewall_rule_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronShowFirewallRuleResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_update_firewall_group_async(self, request):
"""更新网络ACL组
更新网络ACL组。
:param NeutronUpdateFirewallGroupRequest request
:return: NeutronUpdateFirewallGroupResponse
"""
return self.neutron_update_firewall_group_with_http_info(request)
def neutron_update_firewall_group_with_http_info(self, request):
"""更新网络ACL组
更新网络ACL组。
:param NeutronUpdateFirewallGroupRequest request
:return: NeutronUpdateFirewallGroupResponse
"""
all_params = ['firewall_group_id', 'firewall_group']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_group_id' in local_var_params:
path_params['firewall_group_id'] = local_var_params['firewall_group_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_groups/{firewall_group_id}',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronUpdateFirewallGroupResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_update_firewall_policy_async(self, request):
"""更新网络ACL策略
更新网络ACL策略。
:param NeutronUpdateFirewallPolicyRequest request
:return: NeutronUpdateFirewallPolicyResponse
"""
return self.neutron_update_firewall_policy_with_http_info(request)
def neutron_update_firewall_policy_with_http_info(self, request):
"""更新网络ACL策略
更新网络ACL策略。
:param NeutronUpdateFirewallPolicyRequest request
:return: NeutronUpdateFirewallPolicyResponse
"""
all_params = ['firewall_policy_id', 'firewall_policy']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_policy_id' in local_var_params:
path_params['firewall_policy_id'] = local_var_params['firewall_policy_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_policies/{firewall_policy_id}',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronUpdateFirewallPolicyResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def neutron_update_firewall_rule_async(self, request):
"""更新网络ACL规则
更新网络ACL规则。
:param NeutronUpdateFirewallRuleRequest request
:return: NeutronUpdateFirewallRuleResponse
"""
return self.neutron_update_firewall_rule_with_http_info(request)
def neutron_update_firewall_rule_with_http_info(self, request):
"""更新网络ACL规则
更新网络ACL规则。
:param NeutronUpdateFirewallRuleRequest request
:return: NeutronUpdateFirewallRuleResponse
"""
all_params = ['firewall_rule_id', 'firewall_rule']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'firewall_rule_id' in local_var_params:
path_params['firewall_rule_id'] = local_var_params['firewall_rule_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/fwaas/firewall_rules/{firewall_rule_id}',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='NeutronUpdateFirewallRuleResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def create_vpc_async(self, request):
"""创建VPC
创建虚拟私有云。
:param CreateVpcRequest request
:return: CreateVpcResponse
"""
return self.create_vpc_with_http_info(request)
def create_vpc_with_http_info(self, request):
"""创建VPC
创建虚拟私有云。
:param CreateVpcRequest request
:return: CreateVpcResponse
"""
all_params = ['vpc']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/vpcs',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='CreateVpcResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def create_vpc_route_async(self, request):
"""创建VPC路由
创建路由
:param CreateVpcRouteRequest request
:return: CreateVpcRouteResponse
"""
return self.create_vpc_route_with_http_info(request)
def create_vpc_route_with_http_info(self, request):
"""创建VPC路由
创建路由
:param CreateVpcRouteRequest request
:return: CreateVpcRouteResponse
"""
all_params = ['route']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/routes',
method='POST',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='CreateVpcRouteResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def delete_vpc_async(self, request):
"""删除VPC
删除虚拟私有云。
:param DeleteVpcRequest request
:return: DeleteVpcResponse
"""
return self.delete_vpc_with_http_info(request)
def delete_vpc_with_http_info(self, request):
"""删除VPC
删除虚拟私有云。
:param DeleteVpcRequest request
:return: DeleteVpcResponse
"""
all_params = ['vpc_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'vpc_id' in local_var_params:
path_params['vpc_id'] = local_var_params['vpc_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/vpcs/{vpc_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='DeleteVpcResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def delete_vpc_route_async(self, request):
"""删除VPC路由
删除路由
:param DeleteVpcRouteRequest request
:return: DeleteVpcRouteResponse
"""
return self.delete_vpc_route_with_http_info(request)
def delete_vpc_route_with_http_info(self, request):
"""删除VPC路由
删除路由
:param DeleteVpcRouteRequest request
:return: DeleteVpcRouteResponse
"""
all_params = ['route_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'route_id' in local_var_params:
path_params['route_id'] = local_var_params['route_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/routes/{route_id}',
method='DELETE',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='DeleteVpcRouteResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def list_vpc_routes_async(self, request):
"""查询VPC路由列表
查询提交请求的租户的所有路由列表,并根据过滤条件进行过滤。
:param ListVpcRoutesRequest request
:return: ListVpcRoutesResponse
"""
return self.list_vpc_routes_with_http_info(request)
def list_vpc_routes_with_http_info(self, request):
"""查询VPC路由列表
查询提交请求的租户的所有路由列表,并根据过滤条件进行过滤。
:param ListVpcRoutesRequest request
:return: ListVpcRoutesResponse
"""
all_params = ['limit', 'marker', 'id', 'type', 'vpc_id', 'destination', 'tenant_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
if 'id' in local_var_params:
query_params.append(('id', local_var_params['id']))
if 'type' in local_var_params:
query_params.append(('type', local_var_params['type']))
if 'vpc_id' in local_var_params:
query_params.append(('vpc_id', local_var_params['vpc_id']))
if 'destination' in local_var_params:
query_params.append(('destination', local_var_params['destination']))
if 'tenant_id' in local_var_params:
query_params.append(('tenant_id', local_var_params['tenant_id']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/routes',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ListVpcRoutesResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def list_vpcs_async(self, request):
"""查询VPC列表
查询虚拟私有云列表。
:param ListVpcsRequest request
:return: ListVpcsResponse
"""
return self.list_vpcs_with_http_info(request)
def list_vpcs_with_http_info(self, request):
"""查询VPC列表
查询虚拟私有云列表。
:param ListVpcsRequest request
:return: ListVpcsResponse
"""
all_params = ['limit', 'marker', 'id', 'enterprise_project_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
query_params = []
if 'limit' in local_var_params:
query_params.append(('limit', local_var_params['limit']))
if 'marker' in local_var_params:
query_params.append(('marker', local_var_params['marker']))
if 'id' in local_var_params:
query_params.append(('id', local_var_params['id']))
if 'enterprise_project_id' in local_var_params:
query_params.append(('enterprise_project_id', local_var_params['enterprise_project_id']))
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/vpcs',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ListVpcsResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def show_vpc_async(self, request):
"""查询VPC
查询虚拟私有云。
:param ShowVpcRequest request
:return: ShowVpcResponse
"""
return self.show_vpc_with_http_info(request)
def show_vpc_with_http_info(self, request):
"""查询VPC
查询虚拟私有云。
:param ShowVpcRequest request
:return: ShowVpcResponse
"""
all_params = ['vpc_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'vpc_id' in local_var_params:
path_params['vpc_id'] = local_var_params['vpc_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/vpcs/{vpc_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ShowVpcResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def show_vpc_route_async(self, request):
"""查询VPC路由
查询路由详情
:param ShowVpcRouteRequest request
:return: ShowVpcRouteResponse
"""
return self.show_vpc_route_with_http_info(request)
def show_vpc_route_with_http_info(self, request):
"""查询VPC路由
查询路由详情
:param ShowVpcRouteRequest request
:return: ShowVpcRouteResponse
"""
all_params = ['route_id']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'route_id' in local_var_params:
path_params['route_id'] = local_var_params['route_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json'])
auth_settings = []
return self.call_api(
resource_path='/v2.0/vpc/routes/{route_id}',
method='GET',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='ShowVpcRouteResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def update_vpc_async(self, request):
"""更新VPC
更新虚拟私有云。
:param UpdateVpcRequest request
:return: UpdateVpcResponse
"""
return self.update_vpc_with_http_info(request)
def update_vpc_with_http_info(self, request):
"""更新VPC
更新虚拟私有云。
:param UpdateVpcRequest request
:return: UpdateVpcResponse
"""
all_params = ['vpc_id', 'vpc']
local_var_params = {}
for attr in request.attribute_map:
if hasattr(request, attr):
local_var_params[attr] = getattr(request, attr)
collection_formats = {}
path_params = {}
if 'vpc_id' in local_var_params:
path_params['vpc_id'] = local_var_params['vpc_id']
query_params = []
header_params = {}
form_params = {}
body_params = None
if 'body' in local_var_params:
body_params = local_var_params['body']
if isinstance(request, SdkStreamRequest):
body_params = request.get_file_stream()
response_headers = []
header_params['Content-Type'] = http_utils.select_header_content_type(
['application/json;charset=UTF-8'])
auth_settings = []
return self.call_api(
resource_path='/v1/{project_id}/vpcs/{vpc_id}',
method='PUT',
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body_params,
post_params=form_params,
response_type='UpdateVpcResponse',
response_headers=response_headers,
auth_settings=auth_settings,
collection_formats=collection_formats,
request_type=request.__class__.__name__)
def call_api(self, resource_path, method, path_params=None, query_params=None, header_params=None, body=None,
post_params=None, response_type=None, response_headers=None, auth_settings=None,
collection_formats=None, request_type=None):
"""Makes the HTTP request and returns deserialized data.
:param resource_path: Path to method endpoint.
:param method: Method to call.
:param path_params: Path parameters in the url.
:param query_params: Query parameters in the url.
:param header_params: Header parameters to be
placed in the request header.
:param body: Request body.
:param post_params dict: Request post form parameters,
for `application/x-www-form-urlencoded`, `multipart/form-data`.
:param auth_settings list: Auth Settings names for the request.
:param response_type: Response data type.
:param response_headers: Header should be added to response data.
:param collection_formats: dict of collection formats for path, query,
header, and post parameters.
:param request_type: Request data type.
:return:
Return the response directly.
"""
return self.do_http_request(
method=method,
resource_path=resource_path,
path_params=path_params,
query_params=query_params,
header_params=header_params,
body=body,
post_params=post_params,
response_type=response_type,
response_headers=response_headers,
collection_formats=collection_formats,
request_type=request_type,
async_request=True)