python-benedict/benedict/utils/io_util.py

180 lines
4.1 KiB
Python
Raw Normal View History

2019-09-10 14:58:26 +00:00
# -*- coding: utf-8 -*-
from six import string_types
2019-10-03 16:45:22 +00:00
import base64
2019-09-10 14:58:26 +00:00
import errno
import json
import os
2019-10-14 12:47:11 +00:00
import re
2019-09-10 14:58:26 +00:00
import requests
2019-09-23 12:13:52 +00:00
import xmltodict
2019-09-20 14:21:04 +00:00
import toml
2019-09-17 09:50:06 +00:00
import yaml
2019-09-10 14:58:26 +00:00
try:
# python 3
from urllib.parse import unquote
from urllib.parse import unquote_plus
2019-10-14 12:47:11 +00:00
from urllib.parse import urlencode
from urllib.parse import parse_qs
except ImportError:
# python 2
from urllib import unquote
from urllib import unquote_plus
2019-10-14 12:47:11 +00:00
from urllib import urlencode
from urlparse import parse_qs
2019-10-03 16:45:22 +00:00
def decode_base64(s, **kwargs):
    """
    Decode a base64 string into bytes or, optionally, into text/data.

    The input is url-unquoted and any missing '=' padding is restored
    before decoding. Two options are popped from kwargs:
    - 'encoding' (default 'utf-8'): when falsy the raw bytes are returned.
    - 'format' (default None): forwarded to decode_bytes.
    """
    # undo url-encoding of chars (e.g. '%3D' -> '=')
    text = unquote(s)
    # restore the padding so that the length is a multiple of 4
    remainder = len(text) % 4
    if remainder:
        text = text + '=' * (4 - remainder)
    raw = base64.b64decode(text)
    encoding = kwargs.pop('encoding', 'utf-8')
    data_format = kwargs.pop('format', None)
    if not encoding:
        return raw
    return decode_bytes(raw, encoding=encoding, format=data_format)
def decode_bytes(b, encoding='utf-8', format=None, **kwargs):
    """
    Decode bytes to a string and, optionally, parse it in the given format.

    b: the bytes to decode.
    encoding: the codec used to decode the bytes (default 'utf-8').
    format: one of 'json', 'toml', 'yaml', 'xml' (case-insensitive);
        when falsy the cleaned string itself is returned.
    kwargs: forwarded to the format decoder.

    Raises KeyError if format is not one of the supported formats.
    """
    text = b.decode(encoding)
    # fix surrounding whitespace and trailing null chars
    text = text.strip().strip('\x00').strip()
    if not format:
        return text
    decoders = {
        'json': decode_json,
        'toml': decode_toml,
        'yaml': decode_yaml,
        'xml': decode_xml,
    }
    # case-insensitive lookup, consistent with encode_base64
    decoder = decoders[format.lower()]
    return decoder(text, **kwargs)
2019-10-03 16:45:22 +00:00
2019-09-10 14:58:26 +00:00
def decode_json(s, **kwargs):
    """
    Parse a JSON string; kwargs are forwarded to json.loads.
    """
    return json.loads(s, **kwargs)
2019-10-14 12:47:11 +00:00
def decode_query_string(s, **kwargs):
    """
    Parse a query string into a dict.

    The 'flat' option (default True) is popped from kwargs: when true each
    key maps to its first value, otherwise each key maps to the list of
    values returned by parse_qs.

    Raises ValueError if the string does not start like a query string.
    """
    flatten = kwargs.pop('flat', True)
    # the pattern is only anchored at the start of the string
    if re.match(r'^(([\w\-\%\+]+\=[\w\-\%\+]*)+([\&]{1})?)+', s) is None:
        raise ValueError('Invalid query string: {}'.format(s))
    parsed = parse_qs(s)
    if not flatten:
        return parsed
    return dict((name, values[0]) for name, values in parsed.items())
2019-09-23 12:13:52 +00:00
def decode_xml(s, **kwargs):
    """
    Parse an XML string with xmltodict.parse.

    Unless the caller provides 'dict_constructor', plain dict is used;
    the remaining kwargs are forwarded to xmltodict.parse.
    """
    if 'dict_constructor' not in kwargs:
        kwargs['dict_constructor'] = dict
    return xmltodict.parse(s, **kwargs)
2019-09-20 14:21:04 +00:00
def decode_toml(s, **kwargs):
    """
    Parse a TOML string; kwargs are forwarded to toml.loads.
    """
    return toml.loads(s, **kwargs)
2019-09-17 09:50:06 +00:00
def decode_yaml(s, **kwargs):
    """
    Parse a YAML string with yaml.load.

    Unless the caller provides 'Loader', yaml.Loader is used; the
    remaining kwargs are forwarded to yaml.load.
    """
    # NOTE(review): yaml.Loader can construct arbitrary Python objects;
    # for untrusted input pass Loader=yaml.SafeLoader explicitly.
    if 'Loader' not in kwargs:
        kwargs['Loader'] = yaml.Loader
    return yaml.load(s, **kwargs)
2019-09-10 14:58:26 +00:00
2019-10-03 16:45:22 +00:00
def encode_base64(d, **kwargs):
    """
    Serialize a value and return it encoded as a base64 string.

    Two options are popped from kwargs, the others are forwarded to the
    format encoder:
    - 'encoding' (default 'utf-8'): codec used to turn the serialized
      string into bytes and the base64 bytes back into a string.
    - 'format' (default 'json'): one of 'json', 'toml', 'yaml', 'xml'
      (case-insensitive). When it is falsy or not a supported format the
      value is base64-encoded as-is, so it must be a string or bytes.
    """
    # fall back to the raw value, otherwise 'data' would be unbound
    # when no encoder is selected
    data = d
    encoding = kwargs.pop('encoding', 'utf-8')
    encode_format = kwargs.pop('format', 'json')
    if encode_format:
        encoders = {
            'json': encode_json,
            'toml': encode_toml,
            'yaml': encode_yaml,
            'xml': encode_xml,
        }
        encode_func = encoders.get(encode_format.lower())
        if encode_func:
            data = encode_func(d, **kwargs)
    # b64encode requires bytes
    if isinstance(data, string_types):
        data = data.encode(encoding)
    return base64.b64encode(data).decode(encoding)
2019-09-10 14:58:26 +00:00
def encode_json(d, **kwargs):
    """
    Serialize a value to a JSON string; kwargs are forwarded to json.dumps.
    """
    return json.dumps(d, **kwargs)
2019-10-14 12:47:11 +00:00
def encode_query_string(d, **kwargs):
    """
    Build a query string from a mapping (or a sequence of pairs);
    kwargs are forwarded to urlencode.
    """
    return urlencode(d, **kwargs)
2019-09-20 14:21:04 +00:00
def encode_toml(d, **kwargs):
    """
    Serialize a dict to a TOML string; kwargs are forwarded to toml.dumps.
    """
    return toml.dumps(d, **kwargs)
2019-09-23 12:13:52 +00:00
def encode_xml(d, **kwargs):
    """
    Serialize a dict to an XML string; kwargs are forwarded to
    xmltodict.unparse.
    """
    return xmltodict.unparse(d, **kwargs)
2019-09-17 09:50:06 +00:00
def encode_yaml(d, **kwargs):
    """
    Serialize a value to a YAML string; kwargs are forwarded to yaml.dump.
    """
    return yaml.dump(d, **kwargs)
2019-09-10 14:58:26 +00:00
2019-10-03 16:42:44 +00:00
def read_content(s):
    """
    Resolve a string that is either a url, a filepath or the data itself.

    A string starting with 'http://' or 'https://' is fetched with
    read_url, the path of an existing file is read with read_file and
    any other string is returned unchanged.
    """
    if s.startswith(('http://', 'https://')):
        return read_url(s)
    if os.path.isfile(s):
        return read_file(s)
    # neither an url nor an existing file: s is the content itself
    return s
2019-09-10 14:58:26 +00:00
def read_file(filepath):
    """
    Return the whole content of the file at filepath, read in text mode.

    Errors raised by open/read propagate to the caller.
    """
    # the context manager closes the file even when read() raises
    with open(filepath, 'r') as handler:
        return handler.read()
def read_url(url, *args, **kwargs):
    """
    Perform a GET request and return the response body as text.

    args and kwargs are forwarded to requests.get.
    """
    return requests.get(url, *args, **kwargs).text
def write_file(filepath, content):
    """
    Write content to the file at filepath, creating missing directories.

    filepath: destination path; its parent directories are created when
        they do not exist yet.
    content: the string to write; an existing file is overwritten.

    Returns True. Errors raised while creating the directories (other
    than "already exists") or while writing propagate to the caller.
    """
    # https://stackoverflow.com/questions/12517451/automatically-creating-directories-with-file-output
    dirpath = os.path.dirname(filepath)
    # a bare filename has no directory component and os.makedirs('')
    # would fail, so only create directories when there is one
    if dirpath and not os.path.exists(dirpath):
        try:
            os.makedirs(dirpath)
        except OSError as e:
            # Guard against race condition
            if e.errno != errno.EEXIST:
                raise
    # the context manager closes the file even when write() raises
    with open(filepath, 'w+') as handler:
        handler.write(content)
    return True