Limnoria/others/SOAPpy/SOAPBuilder.py

621 lines
21 KiB
Python

"""
################################################################################
# Copyright (c) 2003, Pfizer
# Copyright (c) 2001, Cayce Ullman.
# Copyright (c) 2001, Brian Matthews.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# Neither the name of actzero, inc. nor the names of its contributors may
# be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
################################################################################
"""
ident = '$Id$'
from version import __version__
import cgi
import copy
from wstools.XMLname import toXMLname, fromXMLname
import fpconst
# SOAPpy modules
from Config import Config
from NS import NS
from Types import *
# Test whether this Python version has Types.BooleanType
# If it doesn't have it, then False and True are serialized as integers
try:
BooleanType
pythonHasBooleanType = 1
except NameError:
pythonHasBooleanType = 0
################################################################################
# SOAP Builder
################################################################################
class SOAPBuilder:
_xml_top = '<?xml version="1.0"?>\n'
_xml_enc_top = '<?xml version="1.0" encoding="%s"?>\n'
_env_top = '%(ENV_T)s:Envelope %(ENV_T)s:encodingStyle="%(ENC)s"' % \
NS.__dict__
_env_bot = '</%(ENV_T)s:Envelope>\n' % NS.__dict__
# Namespaces potentially defined in the Envelope tag.
_env_ns = {NS.ENC: NS.ENC_T, NS.ENV: NS.ENV_T,
NS.XSD: NS.XSD_T, NS.XSD2: NS.XSD2_T, NS.XSD3: NS.XSD3_T,
NS.XSI: NS.XSI_T, NS.XSI2: NS.XSI2_T, NS.XSI3: NS.XSI3_T}
def __init__(self, args = (), kw = {}, method = None, namespace = None,
header = None, methodattrs = None, envelope = 1, encoding = 'UTF-8',
use_refs = 0, config = Config, noroot = 0):
# Test the encoding, raising an exception if it's not known
if encoding != None:
''.encode(encoding)
self.args = args
self.kw = kw
self.envelope = envelope
self.encoding = encoding
self.method = method
self.namespace = namespace
self.header = header
self.methodattrs= methodattrs
self.use_refs = use_refs
self.config = config
self.out = []
self.tcounter = 0
self.ncounter = 1
self.icounter = 1
self.envns = {}
self.ids = {}
self.depth = 0
self.multirefs = []
self.multis = 0
self.body = not isinstance(args, bodyType)
self.noroot = noroot
def build(self):
if Config.debug: print "In build."
ns_map = {}
# Cache whether typing is on or not
typed = self.config.typed
if self.header:
# Create a header.
self.dump(self.header, "Header", typed = typed)
self.header = None # Wipe it out so no one is using it.
if self.body:
# Call genns to record that we've used SOAP-ENV.
self.depth += 1
body_ns = self.genns(ns_map, NS.ENV)[0]
self.out.append("<%sBody>\n" % body_ns)
if self.method:
self.depth += 1
a = ''
if self.methodattrs:
for (k, v) in self.methodattrs.items():
a += ' %s="%s"' % (k, v)
if self.namespace: # Use the namespace info handed to us
methodns, n = self.genns(ns_map, self.namespace)
else:
methodns, n = '', ''
self.out.append('<%s%s%s%s%s>\n' % (
methodns, self.method, n, a, self.genroot(ns_map)))
try:
if type(self.args) != TupleType:
args = (self.args,)
else:
args = self.args
for i in args:
self.dump(i, typed = typed, ns_map = ns_map)
if hasattr(self.config, "argsOrdering") and self.config.argsOrdering.has_key(self.method):
for k in self.config.argsOrdering.get(self.method):
self.dump(self.kw.get(k), k, typed = typed, ns_map = ns_map)
else:
for (k, v) in self.kw.items():
self.dump(v, k, typed = typed, ns_map = ns_map)
except RecursionError:
if self.use_refs == 0:
# restart
b = SOAPBuilder(args = self.args, kw = self.kw,
method = self.method, namespace = self.namespace,
header = self.header, methodattrs = self.methodattrs,
envelope = self.envelope, encoding = self.encoding,
use_refs = 1, config = self.config)
return b.build()
raise
if self.method:
self.out.append("</%s%s>\n" % (methodns, self.method))
self.depth -= 1
if self.body:
# dump may add to self.multirefs, but the for loop will keep
# going until it has used all of self.multirefs, even those
# entries added while in the loop.
self.multis = 1
for obj, tag in self.multirefs:
self.dump(obj, tag, typed = typed, ns_map = ns_map)
self.out.append("</%sBody>\n" % body_ns)
self.depth -= 1
if self.envelope:
e = map (lambda ns: ' xmlns:%s="%s"' % (ns[1], ns[0]),
self.envns.items())
self.out = ['<', self._env_top] + e + ['>\n'] + \
self.out + \
[self._env_bot]
if self.encoding != None:
self.out.insert(0, self._xml_enc_top % self.encoding)
return ''.join(self.out).encode(self.encoding)
self.out.insert(0, self._xml_top)
return ''.join(self.out)
def gentag(self):
if Config.debug: print "In gentag."
self.tcounter += 1
return "v%d" % self.tcounter
def genns(self, ns_map, nsURI):
if nsURI == None:
return ('', '')
if type(nsURI) == TupleType: # already a tuple
if len(nsURI) == 2:
ns, nsURI = nsURI
else:
ns, nsURI = None, nsURI[0]
else:
ns = None
if ns_map.has_key(nsURI):
return (ns_map[nsURI] + ':', '')
if self._env_ns.has_key(nsURI):
ns = self.envns[nsURI] = ns_map[nsURI] = self._env_ns[nsURI]
return (ns + ':', '')
if not ns:
ns = "ns%d" % self.ncounter
self.ncounter += 1
ns_map[nsURI] = ns
if self.config.buildWithNamespacePrefix:
return (ns + ':', ' xmlns:%s="%s"' % (ns, nsURI))
else:
return ('', ' xmlns="%s"' % (nsURI))
def genroot(self, ns_map):
if self.noroot:
return ''
if self.depth != 2:
return ''
ns, n = self.genns(ns_map, NS.ENC)
return ' %sroot="%d"%s' % (ns, not self.multis, n)
# checkref checks an element to see if it needs to be encoded as a
# multi-reference element or not. If it returns None, the element has
# been handled and the caller can continue with subsequent elements.
# If it returns a string, the string should be included in the opening
# tag of the marshaled element.
def checkref(self, obj, tag, ns_map):
if self.depth < 2:
return ''
if not self.ids.has_key(id(obj)):
n = self.ids[id(obj)] = self.icounter
self.icounter = n + 1
if self.use_refs == 0:
return ''
if self.depth == 2:
return ' id="i%d"' % n
self.multirefs.append((obj, tag))
else:
if self.use_refs == 0:
raise RecursionError, "Cannot serialize recursive object"
n = self.ids[id(obj)]
if self.multis and self.depth == 2:
return ' id="i%d"' % n
self.out.append('<%s href="#i%d"%s/>\n' %
(tag, n, self.genroot(ns_map)))
return None
# dumpers
def dump(self, obj, tag = None, typed = 1, ns_map = {}):
if Config.debug: print "In dump.", "obj=", obj
ns_map = ns_map.copy()
self.depth += 1
if type(tag) not in (NoneType, StringType, UnicodeType):
raise KeyError, "tag must be a string or None"
try:
meth = getattr(self, "dump_" + type(obj).__name__)
except AttributeError:
if type(obj) == LongType:
obj_type = "integer"
elif pythonHasBooleanType and type(obj) == BooleanType:
obj_type = "boolean"
else:
obj_type = type(obj).__name__
self.out.append(self.dumper(None, obj_type, obj, tag, typed,
ns_map, self.genroot(ns_map)))
else:
meth(obj, tag, typed, ns_map)
self.depth -= 1
# generic dumper
def dumper(self, nsURI, obj_type, obj, tag, typed = 1, ns_map = {},
rootattr = '', id = '',
xml = '<%(tag)s%(type)s%(id)s%(attrs)s%(root)s>%(data)s</%(tag)s>\n'):
if Config.debug: print "In dumper."
if nsURI == None:
nsURI = self.config.typesNamespaceURI
tag = tag or self.gentag()
tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
a = n = t = ''
if typed and obj_type:
ns, n = self.genns(ns_map, nsURI)
ins = self.genns(ns_map, self.config.schemaNamespaceURI)[0]
t = ' %stype="%s%s"%s' % (ins, ns, obj_type, n)
try: a = obj._marshalAttrs(ns_map, self)
except: pass
try: data = obj._marshalData()
except:
if (obj_type != "string"): # strings are already encoded
data = cgi.escape(str(obj))
else:
data = obj
return xml % {"tag": tag, "type": t, "data": data, "root": rootattr,
"id": id, "attrs": a}
def dump_float(self, obj, tag, typed = 1, ns_map = {}):
if Config.debug: print "In dump_float."
tag = tag or self.gentag()
tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
if Config.strict_range:
doubleType(obj)
if fpconst.isPosInf(obj):
obj = "INF"
elif fpconst.isNegInf(obj):
obj = "-INF"
elif fpconst.isNaN(obj):
obj = "NaN"
else:
obj = str(obj)
# Note: python 'float' is actually a SOAP 'double'.
self.out.append(self.dumper(None, "double", obj, tag, typed, ns_map,
self.genroot(ns_map)))
def dump_string(self, obj, tag, typed = 0, ns_map = {}):
if Config.debug: print "In dump_string."
tag = tag or self.gentag()
tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
id = self.checkref(obj, tag, ns_map)
if id == None:
return
try: data = obj._marshalData()
except: data = obj
self.out.append(self.dumper(None, "string", cgi.escape(data), tag,
typed, ns_map, self.genroot(ns_map), id))
dump_str = dump_string # For Python 2.2+
dump_unicode = dump_string
def dump_None(self, obj, tag, typed = 0, ns_map = {}):
if Config.debug: print "In dump_None."
tag = tag or self.gentag()
tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
ns = self.genns(ns_map, self.config.schemaNamespaceURI)[0]
self.out.append('<%s %snull="1"%s/>\n' %
(tag, ns, self.genroot(ns_map)))
dump_NoneType = dump_None # For Python 2.2+
def dump_list(self, obj, tag, typed = 1, ns_map = {}):
if Config.debug: print "In dump_list.", "obj=", obj
tag = tag or self.gentag()
tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
if type(obj) == InstanceType:
data = obj.data
else:
data = obj
id = self.checkref(obj, tag, ns_map)
if id == None:
return
try:
sample = data[0]
empty = 0
except:
# preserve type if present
if getattr(obj,"_typed",None) and getattr(obj,"_type",None):
if getattr(obj, "_complexType", None):
sample = typedArrayType(typed=obj._type,
complexType = obj._complexType)
sample._typename = obj._type
obj._ns = NS.URN
else:
sample = typedArrayType(typed=obj._type)
else:
sample = structType()
empty = 1
# First scan list to see if all are the same type
same_type = 1
if not empty:
for i in data[1:]:
if type(sample) != type(i) or \
(type(sample) == InstanceType and \
sample.__class__ != i.__class__):
same_type = 0
break
ndecl = ''
if same_type:
if (isinstance(sample, structType)) or \
type(sample) == DictType or \
(isinstance(sample, anyType) and \
(getattr(sample, "_complexType", None) and \
sample._complexType)): # force to urn struct
try:
tns = obj._ns or NS.URN
except:
tns = NS.URN
ns, ndecl = self.genns(ns_map, tns)
try:
typename = sample._typename
except:
typename = "SOAPStruct"
t = ns + typename
elif isinstance(sample, anyType):
ns = sample._validNamespaceURI(self.config.typesNamespaceURI,
self.config.strictNamespaces)
if ns:
ns, ndecl = self.genns(ns_map, ns)
t = ns + sample._type
else:
t = 'ur-type'
else:
typename = type(sample).__name__
# For Python 2.2+
if type(sample) == StringType: typename = 'string'
# HACK: python 'float' is actually a SOAP 'double'.
if typename=="float": typename="double"
t = self.genns(ns_map, self.config.typesNamespaceURI)[0] + \
typename
else:
t = self.genns(ns_map, self.config.typesNamespaceURI)[0] + \
"ur-type"
try: a = obj._marshalAttrs(ns_map, self)
except: a = ''
ens, edecl = self.genns(ns_map, NS.ENC)
ins, idecl = self.genns(ns_map, self.config.schemaNamespaceURI)
self.out.append(
'<%s %sarrayType="%s[%d]" %stype="%sArray"%s%s%s%s%s%s>\n' %
(tag, ens, t, len(data), ins, ens, ndecl, edecl, idecl,
self.genroot(ns_map), id, a))
typed = not same_type
try: elemsname = obj._elemsname
except: elemsname = "item"
for i in data:
self.dump(i, elemsname, typed, ns_map)
self.out.append('</%s>\n' % tag)
dump_tuple = dump_list
def dump_dictionary(self, obj, tag, typed = 1, ns_map = {}):
if Config.debug: print "In dump_dictionary."
tag = tag or self.gentag()
tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
id = self.checkref(obj, tag, ns_map)
if id == None:
return
try: a = obj._marshalAttrs(ns_map, self)
except: a = ''
self.out.append('<%s%s%s%s>\n' %
(tag, id, a, self.genroot(ns_map)))
for (k, v) in obj.items():
if k[0] != "_":
self.dump(v, k, 1, ns_map)
self.out.append('</%s>\n' % tag)
dump_dict = dump_dictionary # For Python 2.2+
def dump_instance(self, obj, tag, typed = 1, ns_map = {}):
if Config.debug: print "In dump_instance.", "obj=", obj, "tag=", tag
if not tag:
# If it has a name use it.
if isinstance(obj, anyType) and obj._name:
tag = obj._name
else:
tag = self.gentag()
tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
if isinstance(obj, arrayType): # Array
self.dump_list(obj, tag, typed, ns_map)
return
if isinstance(obj, faultType): # Fault
cns, cdecl = self.genns(ns_map, NS.ENC)
vns, vdecl = self.genns(ns_map, NS.ENV)
self.out.append('''<%sFault %sroot="1"%s%s>
<faultcode>%s</faultcode>
<faultstring>%s</faultstring>
''' % (vns, cns, vdecl, cdecl, obj.faultcode, obj.faultstring))
if hasattr(obj, "detail"):
self.dump(obj.detail, "detail", typed, ns_map)
self.out.append("</%sFault>\n" % vns)
return
r = self.genroot(ns_map)
try: a = obj._marshalAttrs(ns_map, self)
except: a = ''
if isinstance(obj, voidType): # void
self.out.append("<%s%s%s></%s>\n" % (tag, a, r, tag))
return
id = self.checkref(obj, tag, ns_map)
if id == None:
return
if isinstance(obj, structType):
# Check for namespace
ndecl = ''
ns = obj._validNamespaceURI(self.config.typesNamespaceURI,
self.config.strictNamespaces)
if ns:
ns, ndecl = self.genns(ns_map, ns)
tag = ns + tag
self.out.append("<%s%s%s%s%s>\n" % (tag, ndecl, id, a, r))
keylist = obj.__dict__.keys()
# first write out items with order information
if hasattr(obj, '_keyord'):
for i in range(len(obj._keyord)):
self.dump(obj._aslist(i), obj._keyord[i], 1, ns_map)
keylist.remove(obj._keyord[i])
# now write out the rest
for k in keylist:
if (k[0] != "_"):
self.dump(getattr(obj,k), k, 1, ns_map)
if isinstance(obj, bodyType):
self.multis = 1
for v, k in self.multirefs:
self.dump(v, k, typed = typed, ns_map = ns_map)
self.out.append('</%s>\n' % tag)
elif isinstance(obj, anyType):
t = ''
if typed:
ns = obj._validNamespaceURI(self.config.typesNamespaceURI,
self.config.strictNamespaces)
if ns:
ons, ondecl = self.genns(ns_map, ns)
ins, indecl = self.genns(ns_map,
self.config.schemaNamespaceURI)
t = ' %stype="%s%s"%s%s' % \
(ins, ons, obj._type, ondecl, indecl)
self.out.append('<%s%s%s%s%s>%s</%s>\n' %
(tag, t, id, a, r, obj._marshalData(), tag))
else: # Some Class
self.out.append('<%s%s%s>\n' % (tag, id, r))
for (k, v) in obj.__dict__.items():
if k[0] != "_":
self.dump(v, k, 1, ns_map)
self.out.append('</%s>\n' % tag)
################################################################################
# SOAPBuilder's more public interface
################################################################################
def buildSOAP(args=(), kw={}, method=None, namespace=None, header=None,
methodattrs=None,envelope=1,encoding='UTF-8',config=Config,noroot = 0):
t = SOAPBuilder(args=args,kw=kw, method=method, namespace=namespace,
header=header, methodattrs=methodattrs,envelope=envelope,
encoding=encoding, config=config,noroot=noroot)
return t.build()