Source code for lakesuperior.util.toolbox

import logging
import os
import re

from collections import defaultdict
from hashlib import sha1

from rdflib import Graph
from rdflib.term import URIRef, Variable

from lakesuperior.dictionaries.namespaces import ns_collection as nsc
from lakesuperior.store.ldp_rs import ROOT_RSRC_URI


# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# The module docstring is assigned explicitly because it comes after the
# imports instead of being the first statement of the file.
__doc__ = ''' Utility to translate and generate strings and other objects. '''


def fsize_fmt(num, suffix='b'):
    """
    Render a byte count as a human-readable size using 1024-based units.

    Adapted from Python 2 code on
    https://stackoverflow.com/a/1094933/3758232

    :param int num: Size value in bytes.
    :param str suffix: Suffix label (defaults to ``b``).

    :rtype: str
    :return: Size expressed in the largest unit that fits.
    """
    prefixes = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z')
    idx = 0
    # Scale down until the value drops below one block of the next unit, or
    # until the known prefixes run out.
    while idx < len(prefixes) and not abs(num) < 1024.0:
        num /= 1024.0
        idx += 1

    if idx == len(prefixes):
        # Larger than anything expressible with the prefixes above.
        return f'{num:.1f} Y{suffix}'

    return f'{num:3.1f} {prefixes[idx]}{suffix}'
def get_tree_size(path, follow_symlinks=True):
    """
    Return total size of files in given path and subdirs.

    Adapted from https://www.python.org/dev/peps/pep-0471/

    :param str path: Directory to measure.
    :param bool follow_symlinks: Whether symbolic links are resolved, both
        when deciding if an entry is a directory and when reading its size.
        The setting is applied at every level of the tree.

    :rtype: int
    :return: Sum of ``st_size`` for all non-directory entries found.
    """
    total = 0
    # The context manager closes the scandir iterator (and its file
    # descriptor) promptly, even if an entry raises.
    with os.scandir(path) as entries:
        for entry in entries:
            if entry.is_dir(follow_symlinks=follow_symlinks):
                # Propagate the symlink policy; otherwise nested directories
                # would silently fall back to the default.
                total += get_tree_size(
                    entry.path, follow_symlinks=follow_symlinks)
            else:
                total += entry.stat(
                    follow_symlinks=follow_symlinks).st_size

    return total
def replace_term_domain(term, search, replace):
    """
    Replace the domain of a term.

    Only a leading occurrence of ``search`` is substituted; the same string
    appearing later in the term (e.g. in a path or query) is left alone.

    :param rdflib.URIRef term: The term (URI) to change.
    :param str search: Domain string to replace.
    :param str replace: Domain string to use for replacement.

    :rtype: rdflib.URIRef
    :return: A new term; unchanged in value if it does not start with
        ``search``.
    """
    s = str(term)
    if s.startswith(search):
        # Swap the prefix only: str.replace() would also rewrite any later
        # occurrence of the domain string.
        s = replace + s[len(search):]

    return URIRef(s)
def parse_rfc7240(h_str):
    """
    Parse ``Prefer`` header as per https://tools.ietf.org/html/rfc7240

    The ``cgi.parse_header`` standard method does not work with all possible
    use cases for this header.

    Preferences are separated on every comma and parameters on every
    semicolon, so quoted values containing those characters are not
    supported. Names and values are split on the first ``=`` only, so values
    may themselves contain ``=`` (e.g. a URI with a query string).

    :param str h_str: The header(s) as a comma-separated list of Prefer
        statements, excluding the ``Prefer:`` token.

    :rtype: dict
    :return: Mapping of preference name to a dict that may hold a ``value``
        key (if the preference has a value) and a ``parameters`` key (a dict
        of parameter name to value, or ``None`` for valueless parameters).
    """
    parsed_hdr = defaultdict(dict)

    # Split up headers by comma.
    for hdr in h_str.split(','):
        hdr = hdr.strip()
        if not hdr:
            # Empty header or stray comma: nothing to record.
            continue

        # Split up tokens by semicolon; the first one is the preference
        # itself, any others are its parameters.
        prefer_token, *param_tokens = (
            token.strip() for token in hdr.split(';'))

        parsed_pref = defaultdict(dict)

        # If the preference has a '=', it has a value, else none.
        prefer_name, has_value, prefer_value = prefer_token.partition('=')
        if has_value:
            parsed_pref['value'] = prefer_value.strip().strip('"')

        for param_token in param_tokens:
            if not param_token:
                # Trailing or doubled ';' carries no parameter.
                continue
            param_name, has_value, param_value = param_token.partition('=')
            parsed_pref['parameters'][param_name.strip().strip('"')] = (
                param_value.strip().strip('"') if has_value else None)

        parsed_hdr[prefer_name.strip()] = parsed_pref

    return parsed_hdr
def split_uuid(uuid):
    """
    Build a pairtree path from a UID, mimicking FCREPO4 behavior.

    The first eight characters are cut into four two-character segments,
    followed by the full, unmodified UID as the last segment.

    :param str uuid: UUID to split.

    :rtype: str
    """
    segments = [uuid[start:start + 2] for start in range(0, 8, 2)]
    segments.append(uuid)

    return '/'.join(segments)
def rel_uri_to_urn(uri, uid):
    """
    Turn a relative URIRef (e.g. ``<>``) into an URN.

    :param URIRef uri: The URI to convert.
    :param str uid: Resource UID that the URI should be relative to.

    :return: The URN built from ``uid`` if the input is an empty URI,
        otherwise the input URI unchanged.
    :rtype: URIRef
    """
    # FIXME This only accounts for empty URIs, not all relative URIs.
    if str(uri) == '':
        return nsc['fcres'][uid]

    return uri
#return URIRef( # re.sub('<#([^>]+)>', f'<{base_uri}#\\1>', str(uri)) # .replace('<>', f'<{base_uri}>'))
def rel_uri_to_urn_string(string, uid):
    """
    Rewrite relative URIs found in a SPARQL or RDF string as internal URNs.

    Fragment-only references (``<#frag>``) and empty references (``<>``) are
    resolved against the URN built from ``uid``.

    :param str string: Input string.
    :param str uid: Resource UID to build the base URN from.

    :rtype: str
    :return: Modified string.
    """
    urn = str(nsc['fcres'][uid])

    # Fragments first, then the bare empty reference.
    with_fragments = re.sub('<#([^>]+)>', f'<{urn}#\\1>', string)

    return with_fragments.replace('<>', f'<{urn}>')
class RequestUtils:
    """
    Utilities that require access to an HTTP request context.

    Initialize this within a Flask request context.
    """
    def __init__(self):
        """Store the web root read from the Flask ``g`` object."""
        from flask import g

        self.webroot = g.webroot

    def uid_to_uri(self, uid):
        """
        Convert a UID to a URI by appending it to the web root.

        :param str uid: Resource UID.

        :rtype: rdflib.URIRef
        """
        return URIRef(self.webroot + uid)

    def uri_to_uid(self, uri):
        """
        Convert an absolute URI (internal or external) to a UID.

        :param rdflib.URIRef uri: URI in either the internal (``fcres``)
            namespace or under the web root.

        :rtype: str
        """
        if uri.startswith(nsc['fcres']):
            return str(uri).replace(nsc['fcres'], '')
        else:
            return '/' + str(uri).replace(self.webroot, '').strip('/')

    def localize_uri_string(self, s):
        """
        Convert URIs into URNs in a string using the application base URI.

        A string equal to the web root (ignoring surrounding slashes) maps to
        the root resource URI.

        :param str s: Input string.

        :rtype: str
        """
        if s.strip('/') == self.webroot:
            return str(ROOT_RSRC_URI)
        else:
            return s.rstrip('/').replace(
                    self.webroot, str(nsc['fcres']))

    def localize_term(self, uri):
        """
        Localize an individual term.

        :param rdflib.URIRef uri: Input URI.

        :rtype: rdflib.URIRef
        """
        return URIRef(self.localize_uri_string(str(uri)))

    def localize_triple(self, trp):
        """
        Localize terms in a triple.

        Only the subject and object are considered; the predicate is passed
        through untouched.

        :param tuple(rdflib.URIRef) trp: The triple to be converted.

        :rtype: tuple(rdflib.URIRef)
        """
        s, p, o = trp
        if s.startswith(self.webroot):
            s = self.localize_term(s)
        # NOTE(review): the object is only tested with startswith(); a
        # literal whose text begins with the web root would also be turned
        # into a URIRef. Confirm this is intended.
        if o.startswith(self.webroot):
            o = self.localize_term(o)

        return s, p, o

    def localize_graph(self, gr):
        """
        Localize a graph.

        :param rdflib.Graph gr: Graph with web root-based URIs.

        :rtype: rdflib.Graph
        :return: New graph with localized identifier and triples.
        """
        l_id = self.localize_term(gr.identifier)
        l_gr = Graph(identifier=l_id)
        for trp in gr:
            l_gr.add(self.localize_triple(trp))

        return l_gr

    def localize_payload(self, data):
        """
        Localize an RDF stream with domain-specific URIs.

        The web root followed by a slash is replaced first, then any
        remaining bare web root; both become the internal namespace followed
        by a slash.

        :param bytes data: Binary RDF data.

        :rtype: bytes
        """
        return data.replace(
            (self.webroot + '/').encode('utf-8'),
            (nsc['fcres'] + '/').encode('utf-8')
        ).replace(
            self.webroot.encode('utf-8'),
            (nsc['fcres'] + '/').encode('utf-8')
        )

    def localize_ext_str(self, s, urn):
        """
        Convert global URIs to local in a SPARQL or RDF string.

        Also replace empty URIs (`<>`) with a fixed local URN and take care
        of fragments and relative URIs.

        This is a 3-pass replacement. First, global URIs whose webroot matches
        the application ones are replaced with internal URIs. Then, relative
        URIs are converted to absolute using the internal URI as the base;
        finally, the root node is appropriately addressed.

        :param str s: Input string.
        :param urn: Local URN used as the base for empty, fragment-only and
            query-only URI references.

        :rtype: str
        """
        # Escape the whole web root: it is user configuration and commonly
        # holds regex metacharacters such as the dots in a host name.
        esc_webroot = re.escape(self.webroot)

        # Pass 1: absolute URIs under the web root.
        loc_ptn1 = r'<{}/?(.*?)>'.format(esc_webroot)
        loc_sub1 = '<{}/\\1>'.format(nsc['fcres'])
        s1 = re.sub(loc_ptn1, loc_sub1, s)

        # Pass 2: empty, fragment-only or query-only references.
        loc_ptn2 = r'<([#?].*?)?>'
        loc_sub2 = '<{}\\1>'.format(urn)
        s2 = re.sub(loc_ptn2, loc_sub2, s1)

        # Pass 3: the bare internal namespace is the root resource.
        loc_ptn3 = r'<{}([#?].*?)?>'.format(nsc['fcres'])
        loc_sub3 = '<{}\\1>'.format(ROOT_RSRC_URI)
        s3 = re.sub(loc_ptn3, loc_sub3, s2)

        return s3

    def globalize_string(self, s):
        """
        Convert URNs into URIs in a string using the application base URI.

        :param str s: Input string.

        :rtype: str
        """
        return s.replace(str(nsc['fcres']), self.webroot)

    def globalize_term(self, urn):
        """
        Convert an URN into an URI using the application base URI.

        :param rdflib.URIRef urn: Input URN.

        :rtype: rdflib.URIRef
        """
        return URIRef(self.globalize_string(str(urn)))

    def globalize_triple(self, trp):
        """
        Globalize terms in a triple.

        Only the subject and object are considered; the predicate is passed
        through untouched.

        :param tuple(rdflib.URIRef) trp: The triple to be converted.

        :rtype: tuple(rdflib.URIRef)
        """
        s, p, o = trp
        if s.startswith(nsc['fcres']):
            s = self.globalize_term(s)
        if o.startswith(nsc['fcres']):
            o = self.globalize_term(o)

        return s, p, o

    def globalize_imr(self, imr):
        """
        Globalize an Imr.

        :param imr: Object exposing a ``uri`` attribute and iterable as
            triples.

        :rtype: rdflib.Graph
        """
        g_gr = Graph(identifier=self.globalize_term(imr.uri))
        for trp in imr:
            g_gr.add(self.globalize_triple(trp))

        return g_gr

    def globalize_graph(self, gr):
        """
        Globalize a graph.

        :param rdflib.Graph gr: Graph with internal URNs.

        :rtype: rdflib.Graph
        :return: New graph with globalized identifier and triples.
        """
        g_id = self.globalize_term(gr.identifier)
        g_gr = Graph(identifier=g_id)
        for trp in gr:
            g_gr.add(self.globalize_triple(trp))

        return g_gr

    def globalize_rsrc(self, rsrc):
        """
        Globalize a resource.

        :param rsrc: Object exposing ``graph`` and ``identifier`` attributes.

        :return: The result of ``resource()`` called on the globalized graph
            with the globalized identifier.
        """
        gr = rsrc.graph
        urn = rsrc.identifier

        global_gr = self.globalize_graph(gr)
        global_uri = self.globalize_term(urn)

        return global_gr.resource(global_uri)