huaweicloud-sdk-python-v3/huaweicloud-sdk-meeting/huaweicloudsdkmeeting/v1/meeting_credentials.py

56 lines
2.2 KiB
Python

import base64
import datetime
import functools
import json
from concurrent.futures.thread import ThreadPoolExecutor
from pprint import pprint
from huaweicloudsdkcore.auth.credentials import Credentials
from huaweicloudsdkcore.exceptions.exceptions import SdkException
from huaweicloudsdkcore.sdk_request import SdkRequest
class MeetingCredentials(Credentials):
    """Credentials that sign Meeting requests with a cached access token.

    The token is obtained by POSTing the account name to
    ``/v1/usg/acs/auth/account`` with an HTTP Basic ``Authorization`` header
    built from ``_username`` and ``_password``.  It is cached on the instance
    and fetched again once it is more than twelve hours old.
    """

    # Path of the account-authentication request that returns the token.
    _AUTH_PATH = '/v1/usg/acs/auth/account'
    # Age beyond which the cached token is fetched again.
    _TOKEN_MAX_AGE = datetime.timedelta(hours=12)

    def __init__(self):
        # Cached access token and the local time at which it was obtained.
        self._token = None
        self._last_token_date = None
        # Account credentials; both must be set before the first request,
        # otherwise building the Basic authorization header raises TypeError.
        self._username = None
        self._password = None

    def get_update_path_params(self):
        """Supply no path parameters; always returns ``None``."""
        # NOTE(review): presumably overrides a hook on ``Credentials`` --
        # confirm that ``None`` is an acceptable return value there.
        return None

    def process_auth_request(self, request, http_client, executor=None):
        """Schedule :meth:`process_request` and return its future.

        :param request: request to be signed.
        :param http_client: client used to fetch a token when needed.
        :param executor: executor to run the signing on.  When ``None`` a
            single-worker ``ThreadPoolExecutor`` is created for this call.
        :return: the future returned by ``executor.submit``; its result is
            the signed request.
        """
        if executor is not None:
            return executor.submit(self.process_request, request, http_client)

        own_executor = ThreadPoolExecutor(max_workers=1)
        try:
            return own_executor.submit(self.process_request, request, http_client)
        finally:
            # The executor was created only for this call.  A non-blocking
            # shutdown still runs the submitted task but lets the worker
            # thread exit afterwards instead of idling forever.
            own_executor.shutdown(wait=False)

    def process_request(self, request, http_client):
        """Attach the access token to ``request`` and return it.

        A new token is fetched first when none is cached or the cached one
        is older than ``_TOKEN_MAX_AGE``.

        :param request: request to sign; its ``header_params`` and ``uri``
            are modified in place.
        :param http_client: client used to perform the token request.
        :return: the same ``request`` object.
        """
        if self._token_expired(datetime.datetime.now()):
            self._refresh_token(request, http_client)

        request.header_params["X-Auth-Token"] = self._token
        request.uri = request.resource_path
        return request

    def _token_expired(self, now_time):
        """Return True when no usable token is cached at ``now_time``."""
        if self._token is None or self._last_token_date is None:
            return True
        return now_time - self._last_token_date > self._TOKEN_MAX_AGE

    def _refresh_token(self, request, http_client):
        """Fetch a new access token from ``request.host`` and cache it."""
        basic_pair = (self._username + ':' + self._password).encode('utf-8')
        authorization = "Basic " + base64.b64encode(basic_pair).decode('utf-8')
        headers = {'Authorization': authorization, 'Content-Type': 'application/json'}
        body = {'account': self._username, 'clientType': 0}

        sdk_request = SdkRequest('POST', 'https', request.host, self._AUTH_PATH, [],
                                 headers, json.dumps(body), [], self._AUTH_PATH)
        response = http_client.do_request_sync(sdk_request, [], [])
        content = json.loads(response.content.decode())

        # Update the cache only after the response parsed successfully, so a
        # failed refresh leaves the previous state untouched.
        self._token = content['accessToken']
        self._last_token_date = datetime.datetime.now()