#!/usr/bin/env python
# UTIL -- Utility classes and functions for the Data Lab client interfaces.
from __future__ import print_function
__authors__ = 'Mike Fitzpatrick <fitz@noao.edu>'
__version__ = '20190422' # yyyymmdd
Utilities for managing the use of Data Lab auth tokens.
Import via
.. code-block:: python
from dl import Util
from dl.Util import multimethod, def_token
import os
import mimetypes
import random
import string
from functools import partial
import ConfigParser # Python 2
except ImportError:
import configparser as ConfigParser # Python 3
# =========================================================================
# MULTIMETHOD -- An object class to manage class methods.
# Globals
method_registry = {} # Class-method registry
[docs]def add_doc(value):
'''Decorator to set the 'Call docstring' ipython field.
def _doc(func):
func.__doc__ = value
return func
return _doc
class MultiMethod(object):
'''MultiMethod -- An object class to manage the module functions
such that functions may be overloaded and the appropriate functions
is dispatched depending on the calling arguments.
def __init__(self, module, name, cm, func):
self.module = module
self.name = name
self.func = func
self.cm = cm
self.obj = None
self.nargs = None
self.methodmap = {}
def __call__(self, *args, **kw):
'''Call the appropriate instance of the function.
# DEBUG - Above docstring roduces the 'Call Docstring' in ipython '??'
# Lookup the function to call in the method map.
if self.cm:
reg_name = self.module + '.' + self.name + '.' + str(len(args)-1)
reg_name = self.module + '.' + self.name + '.' + str(len(args))
function = self.methodmap.get(reg_name)
if function is None:
raise TypeError("No MultiFunction match found for " + reg_name)
# Call the function with all original args/keywords and return result.
if self.cm:
return function(self.obj, *args, **kw)
return function(*args, **kw)
def __repr__(self):
return self.func.__repr__()
def __get__(self, obj, objtype):
self.obj = obj
f = partial(self.__call__, obj)
f.__doc__ = self.func.__doc__
f.__dict__ = self.func.__dict__
f.__module__ = self.func.__module__
f.__defaults__ = self.func.__defaults__
if self.cm:
f.__dir__ = dir(self.obj)
#return partial(self.__call__, obj)
return f
__doc__ = property(lambda self:self.func.__doc__)
__annotations__ = property(lambda self:self.func.__annotations__)
__name__ = property(lambda self:self.func.__name__)
__module__ = property(lambda self:self.func.__module__)
def getdoc(self):
# DEBUG - Produces the 'Docstring' value in ipython '??'
return self.func.__doc__
def register(self, nargs, function, module):
'''Register the method based on the number of method arguments.
Duplicates are rejected when two method names with the same
number of arguments are registered. For generality, we
construct a registry id from the method name and no. of args.
reg_name = module + '.' + function.__name__ + '.' + str(nargs)
if reg_name in self.methodmap:
raise TypeError("duplicate registration")
self.methodmap[reg_name] = function
self.func = function
self.nargs = nargs
[docs]def multimethod(module, nargs, cm):
'''Wrapper function to implement multimethod for functions. The
identifying signature in this case is the number of required
method parameters. When methods are called, all original arguments
and keywords are passed.
def register(function):
'''multimethod register()
function = getattr(function, "__lastreg__", function)
name = function.__name__
mf = registry.get(name)
if mf is None:
mf = registry[name] = MultiMethod(module, name, cm, function)
mf.register(nargs, function, module)
mf.__lastreg__ = function
return mf
#if cm or nargs > 0:
if nargs > 0:
return mf
mf.__call__ = classmethod(function)
return mf.__lastreg__
if module not in method_registry.keys():
method_registry[module] = {}
registry = method_registry[module]
return register
# =========================================================================
# Globals
ANON_TOKEN = 'anonymous.0.0.anon_access'
# READTOKENFILE -- Read the contents of the named token file. If it
# doesn't exist, default to the anonymous token.
[docs]def readTokenFile (tok_file):
if TOK_DEBUG: print ('readTokenFile: ' + tok_file)
if not os.path.exists(tok_file):
if TOK_DEBUG: print ('returning ANON_TOKEN')
return ANON_TOKEN # FIXME -- print a warning?
tok_fd = open(tok_file, "r")
user_tok = tok_fd.read(128).strip('\n') # read the old token
if TOK_DEBUG: print ('returning user_tok: ' + user_tok)
return user_tok # return named user tok
# DEF_TOKEN -- Utility method to get the default user token to be passed
# by a Data Lab client call.
[docs]def def_token(tok):
''' Get a default token. If no token is provided, check for an
existing $HOME/.datalab/id_token.<user> file and return that if
it exists, otherwise default to the ANON_TOKEN.
If a token string is provided, return it directly. The value
may also simply be a username, in which case the same check for
a token ID file is done.
home = '%s/.datalab' % os.path.expanduser('~')
if tok is None or tok == '':
# Read the $HOME/.datalab/dl.conf file
config = ConfigParser.RawConfigParser(allow_no_value=True)
if os.path.exists('%s/dl.conf' % home):
config.read('%s/dl.conf' % home)
_status = config.get('login','status')
if _status == 'loggedin':
# Return the currently logged-in user.
_user = config.get('login','user')
tok_file = ('%s/id_token.%s' % (home, _user))
if TOK_DEBUG: print ('returning loggedin user: %s' % tok_file)
return readTokenFile(tok_file)
# Nobody logged in so return 'anonymous'
if TOK_DEBUG: print ('returning ANON_TOKEN')
# No token supplied, not logged-in, check for a login user token.
tok_file = ('%s/id_token.%s' % (home, os.getlogin()))
if TOK_DEBUG: print ('tok_file: %s' % tok_file)
if not os.path.exists(home) or not os.path.exists(tok_file):
if TOK_DEBUG: print ('returning ANON_TOKEN')
return readTokenFile(tok_file)
# Check for a plane user name or valid token. If we're given a
# token just return it. If it may be a user name, look for a token
# id file and return that, otherwise we're just anonymous.
if len(tok.split('.')) >= 4: # looks like a token
if TOK_DEBUG: print ('returning input tok: ' + tok)
return tok
elif len(tok.split('.')) == 1: # user name maybe?
tok_file = ('%s/id_token.%s' % (home, tok))
return readTokenFile(tok_file)
if TOK_DEBUG: print ('returning ANON_TOKEN')
"""Encode multipart form data to upload files via POST."""
_BOUNDARY_CHARS = string.digits + string.ascii_letters
[docs]def encode_multipart(fields, files, boundary=None):
""" Encode dict of form fields and dict of files as multipart/form-data.
Return tuple of (body_string, headers_dict). Each value in files is
a dict with required keys 'filename' and 'content', and optional
'mimetype' (if not specified, tries to guess mime type or uses
..code-block:: python
>>> body, headers = encode_multipart({'FIELD': 'VALUE'},
... {'FILE': {'filename': 'F.TXT', 'content': 'CONTENT'}},
... boundary='BOUNDARY')
>>> print('\\n'.join(repr(l) for l in body.split('\\r\\n')))
'Content-Disposition: form-data; name="FIELD"'
'Content-Disposition: form-data; name="FILE"; filename="F.TXT"'
'Content-Type: text/plain'
>>> print(sorted(headers.items()))
[('Content-Length', '193'), ('Content-Type', 'multipart/form-data; boundary=BOUNDARY')]
>>> len(body)
def escape_quote(s):
return s.replace('"', '\\"')
if boundary is None:
boundary = ''.join(random.choice(_BOUNDARY_CHARS) for i in range(30))
lines = []
for name, value in fields.items():
'Content-Disposition: form-data; name="{0}"'.format(escape_quote(name)),
for name, value in files.items():
filename = value['filename']
if 'mimetype' in value:
mimetype = value['mimetype']
mimetype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
'Content-Disposition: form-data; name="{0}"; filename="{1}"'.format(
escape_quote(name), escape_quote(filename)),
'Content-Type: {0}'.format(mimetype),
body = '\r\n'.join(lines)
headers = {
'Content-Type': 'multipart/form-data; boundary={0}'.format(boundary),
'Content-Length': str(len(body)),
return (body, headers)
import urllib2
import formdata
fields = {'name': 'BOB SMITH'}
files = {'file': {'filename': 'F.DAT', 'content': 'DATA HERE'}}
data, headers = formdata.encode_multipart(fields, files)
request = urllib2.Request('http://httpbin.org/post', data=data, headers=headers)
f = urllib2.urlopen(request)
print f.read()