mirror of
https://github.com/Mikaela/Limnoria.git
synced 2024-11-24 11:39:25 +01:00
1337 lines
48 KiB
Python
1337 lines
48 KiB
Python
|
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
|
||
|
#
|
||
|
# This software is subject to the provisions of the Zope Public License,
|
||
|
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
|
||
|
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
|
||
|
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||
|
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
|
||
|
# FOR A PARTICULAR PURPOSE.
|
||
|
|
||
|
ident = "$Id$"
|
||
|
|
||
|
from Utility import DOM, Collection, CollectionNS
|
||
|
from XMLSchema import XMLSchema, SchemaReader, WSDLToolsAdapter
|
||
|
from Namespaces import WSR, WSA
|
||
|
from StringIO import StringIO
|
||
|
import urllib
|
||
|
|
||
|
|
||
|
class WSDLReader:
|
||
|
"""A WSDLReader creates WSDL instances from urls and xml data."""
|
||
|
|
||
|
# Custom subclasses of WSDLReader may wish to implement a caching
|
||
|
# strategy or other optimizations. Because application needs vary
|
||
|
# so widely, we don't try to provide any caching by default.
|
||
|
|
||
|
def loadFromStream(self, stream, name=None):
|
||
|
"""Return a WSDL instance loaded from a stream object."""
|
||
|
document = DOM.loadDocument(stream)
|
||
|
wsdl = WSDL()
|
||
|
if name:
|
||
|
wsdl.location = name
|
||
|
elif hasattr(stream, 'name'):
|
||
|
wsdl.location = stream.name
|
||
|
wsdl.load(document)
|
||
|
return wsdl
|
||
|
|
||
|
def loadFromURL(self, url):
|
||
|
"""Return a WSDL instance loaded from the given url."""
|
||
|
document = DOM.loadFromURL(url)
|
||
|
wsdl = WSDL()
|
||
|
wsdl.location = url
|
||
|
wsdl.load(document)
|
||
|
return wsdl
|
||
|
|
||
|
def loadFromString(self, data):
|
||
|
"""Return a WSDL instance loaded from an xml string."""
|
||
|
return self.loadFromStream(StringIO(data))
|
||
|
|
||
|
def loadFromFile(self, filename):
|
||
|
"""Return a WSDL instance loaded from the given file."""
|
||
|
file = open(filename, 'rb')
|
||
|
try:
|
||
|
wsdl = self.loadFromStream(file)
|
||
|
finally:
|
||
|
file.close()
|
||
|
return wsdl
|
||
|
|
||
|
class WSDL:
|
||
|
"""A WSDL object models a WSDL service description. WSDL objects
|
||
|
may be created manually or loaded from an xml representation
|
||
|
using a WSDLReader instance."""
|
||
|
|
||
|
def __init__(self, targetNamespace=None, strict=1):
|
||
|
self.targetNamespace = targetNamespace or 'urn:this-document.wsdl'
|
||
|
self.documentation = ''
|
||
|
self.location = None
|
||
|
self.document = None
|
||
|
self.name = None
|
||
|
self.services = CollectionNS(self)
|
||
|
self.messages = CollectionNS(self)
|
||
|
self.portTypes = CollectionNS(self)
|
||
|
self.bindings = CollectionNS(self)
|
||
|
#self.imports = Collection(self)
|
||
|
self.types = Types(self)
|
||
|
self.extensions = []
|
||
|
self.strict = strict
|
||
|
|
||
|
def __del__(self):
|
||
|
if self.document is not None:
|
||
|
self.document.unlink()
|
||
|
|
||
|
version = '1.1'
|
||
|
|
||
|
def addService(self, name, documentation='', targetNamespace=None):
|
||
|
if self.services.has_key(name):
|
||
|
raise WSDLError(
|
||
|
'Duplicate service element: %s' % name
|
||
|
)
|
||
|
item = Service(name, documentation)
|
||
|
if targetNamespace:
|
||
|
item.targetNamespace = targetNamespace
|
||
|
self.services[name] = item
|
||
|
return item
|
||
|
|
||
|
def addMessage(self, name, documentation='', targetNamespace=None):
|
||
|
if self.messages.has_key(name):
|
||
|
raise WSDLError(
|
||
|
'Duplicate message element: %s.' % name
|
||
|
)
|
||
|
item = Message(name, documentation)
|
||
|
if targetNamespace:
|
||
|
item.targetNamespace = targetNamespace
|
||
|
self.messages[name] = item
|
||
|
return item
|
||
|
|
||
|
def addPortType(self, name, documentation='', targetNamespace=None):
|
||
|
if self.portTypes.has_key(name):
|
||
|
raise WSDLError(
|
||
|
'Duplicate portType element: name'
|
||
|
)
|
||
|
item = PortType(name, documentation)
|
||
|
if targetNamespace:
|
||
|
item.targetNamespace = targetNamespace
|
||
|
self.portTypes[name] = item
|
||
|
return item
|
||
|
|
||
|
def addBinding(self, name, type, documentation='', targetNamespace=None):
|
||
|
if self.bindings.has_key(name):
|
||
|
raise WSDLError(
|
||
|
'Duplicate binding element: %s' % name
|
||
|
)
|
||
|
item = Binding(name, type, documentation)
|
||
|
if targetNamespace:
|
||
|
item.targetNamespace = targetNamespace
|
||
|
self.bindings[name] = item
|
||
|
return item
|
||
|
|
||
|
#def addImport(self, namespace, location):
|
||
|
# item = ImportElement(namespace, location)
|
||
|
# self.imports[namespace] = item
|
||
|
# return item
|
||
|
|
||
|
def load(self, document):
|
||
|
# We save a reference to the DOM document to ensure that elements
|
||
|
# saved as "extensions" will continue to have a meaningful context
|
||
|
# for things like namespace references. The lifetime of the DOM
|
||
|
# document is bound to the lifetime of the WSDL instance.
|
||
|
self.document = document
|
||
|
|
||
|
definitions = DOM.getElement(document, 'definitions', None, None)
|
||
|
if definitions is None:
|
||
|
raise WSDLError(
|
||
|
'Missing <definitions> element.'
|
||
|
)
|
||
|
self.version = DOM.WSDLUriToVersion(definitions.namespaceURI)
|
||
|
NS_WSDL = DOM.GetWSDLUri(self.version)
|
||
|
|
||
|
self.targetNamespace = DOM.getAttr(definitions, 'targetNamespace',
|
||
|
None, None)
|
||
|
self.name = DOM.getAttr(definitions, 'name', None, None)
|
||
|
self.documentation = GetDocumentation(definitions)
|
||
|
|
||
|
# Resolve (recursively) any import elements in the document.
|
||
|
imported = {}
|
||
|
base_location = self.location
|
||
|
while 1:
|
||
|
#XXX
|
||
|
imports = []
|
||
|
for element in DOM.getElements(definitions, 'import', NS_WSDL):
|
||
|
location = DOM.getAttr(element, 'location')
|
||
|
# Resolve relative location, and save
|
||
|
location = urllib.basejoin(base_location, location)
|
||
|
|
||
|
if not imported.has_key(location):
|
||
|
imports.append(element)
|
||
|
|
||
|
if not imports:
|
||
|
break
|
||
|
for element in imports:
|
||
|
location = DOM.getAttr(element, 'location')
|
||
|
self._import(document, element, base_location)
|
||
|
location = urllib.basejoin(base_location, location)
|
||
|
imported[location] = 1
|
||
|
base_location = ''
|
||
|
|
||
|
#reader = SchemaReader(base_url=self.location)
|
||
|
for element in DOM.getElements(definitions, None, None):
|
||
|
targetNamespace = DOM.getAttr(element, 'targetNamespace')
|
||
|
localName = element.localName
|
||
|
|
||
|
if not DOM.nsUriMatch(element.namespaceURI, NS_WSDL):
|
||
|
if localName == 'schema':
|
||
|
reader = SchemaReader(base_url=self.location)
|
||
|
schema = reader.loadFromNode(WSDLToolsAdapter(self), element)
|
||
|
schema.setBaseUrl(self.location)
|
||
|
self.types.addSchema(schema)
|
||
|
else:
|
||
|
self.extensions.append(element)
|
||
|
continue
|
||
|
|
||
|
elif localName == 'message':
|
||
|
name = DOM.getAttr(element, 'name')
|
||
|
docs = GetDocumentation(element)
|
||
|
message = self.addMessage(name, docs, targetNamespace)
|
||
|
parts = DOM.getElements(element, 'part', NS_WSDL)
|
||
|
message.load(parts)
|
||
|
continue
|
||
|
|
||
|
elif localName == 'portType':
|
||
|
name = DOM.getAttr(element, 'name')
|
||
|
docs = GetDocumentation(element)
|
||
|
ptype = self.addPortType(name, docs, targetNamespace)
|
||
|
#operations = DOM.getElements(element, 'operation', NS_WSDL)
|
||
|
#ptype.load(operations)
|
||
|
ptype.load(element)
|
||
|
continue
|
||
|
|
||
|
elif localName == 'binding':
|
||
|
name = DOM.getAttr(element, 'name')
|
||
|
type = DOM.getAttr(element, 'type', default=None)
|
||
|
if type is None:
|
||
|
raise WSDLError(
|
||
|
'Missing type attribute for binding %s.' % name
|
||
|
)
|
||
|
type = ParseQName(type, element)
|
||
|
docs = GetDocumentation(element)
|
||
|
binding = self.addBinding(name, type, docs, targetNamespace)
|
||
|
operations = DOM.getElements(element, 'operation', NS_WSDL)
|
||
|
binding.load(operations)
|
||
|
binding.load_ex(GetExtensions(element))
|
||
|
continue
|
||
|
|
||
|
elif localName == 'service':
|
||
|
name = DOM.getAttr(element, 'name')
|
||
|
docs = GetDocumentation(element)
|
||
|
service = self.addService(name, docs, targetNamespace)
|
||
|
ports = DOM.getElements(element, 'port', NS_WSDL)
|
||
|
service.load(ports)
|
||
|
service.load_ex(GetExtensions(element))
|
||
|
continue
|
||
|
|
||
|
elif localName == 'types':
|
||
|
self.types.documentation = GetDocumentation(element)
|
||
|
base_location = DOM.getAttr(element, 'base-location')
|
||
|
if base_location:
|
||
|
element.removeAttribute('base-location')
|
||
|
base_location = base_location or self.location
|
||
|
reader = SchemaReader(base_url=base_location)
|
||
|
for item in DOM.getElements(element, None, None):
|
||
|
if item.localName == 'schema':
|
||
|
schema = reader.loadFromNode(WSDLToolsAdapter(self), item)
|
||
|
# XXX <types> could have been imported
|
||
|
#schema.setBaseUrl(self.location)
|
||
|
schema.setBaseUrl(base_location)
|
||
|
self.types.addSchema(schema)
|
||
|
else:
|
||
|
self.types.addExtension(item)
|
||
|
# XXX remove the attribute
|
||
|
# element.removeAttribute('base-location')
|
||
|
continue
|
||
|
|
||
|
def _import(self, document, element, base_location=None):
|
||
|
'''Algo take <import> element's children, clone them,
|
||
|
and add them to the main document. Support for relative
|
||
|
locations is a bit complicated. The orig document context
|
||
|
is lost, so we need to store base location in DOM elements
|
||
|
representing <types>, by creating a special temporary
|
||
|
"base-location" attribute, and <import>, by resolving
|
||
|
the relative "location" and storing it as "location".
|
||
|
|
||
|
document -- document we are loading
|
||
|
element -- DOM Element representing <import>
|
||
|
base_location -- location of document from which this
|
||
|
<import> was gleaned.
|
||
|
'''
|
||
|
namespace = DOM.getAttr(element, 'namespace', default=None)
|
||
|
location = DOM.getAttr(element, 'location', default=None)
|
||
|
if namespace is None or location is None:
|
||
|
raise WSDLError(
|
||
|
'Invalid import element (missing namespace or location).'
|
||
|
)
|
||
|
if base_location:
|
||
|
location = urllib.basejoin(base_location, location)
|
||
|
element.setAttributeNS(None, 'location', location)
|
||
|
|
||
|
#location = urllib.basejoin(self.location, location)
|
||
|
#obimport = self.addImport(namespace, location)
|
||
|
#obimport._loaded = 1
|
||
|
|
||
|
importdoc = DOM.loadFromURL(location)
|
||
|
try:
|
||
|
if location.find('#') > -1:
|
||
|
idref = location.split('#')[-1]
|
||
|
imported = DOM.getElementById(importdoc, idref)
|
||
|
else:
|
||
|
imported = importdoc.documentElement
|
||
|
if imported is None:
|
||
|
raise WSDLError(
|
||
|
'Import target element not found for: %s' % location
|
||
|
)
|
||
|
|
||
|
imported_tns = DOM.findTargetNS(imported)
|
||
|
if imported_tns != namespace:
|
||
|
return
|
||
|
|
||
|
if imported.localName == 'definitions':
|
||
|
imported_nodes = imported.childNodes
|
||
|
else:
|
||
|
imported_nodes = [imported]
|
||
|
parent = element.parentNode
|
||
|
for node in imported_nodes:
|
||
|
if node.nodeType != node.ELEMENT_NODE:
|
||
|
continue
|
||
|
child = DOM.importNode(document, node, 1)
|
||
|
parent.appendChild(child)
|
||
|
child.setAttribute('targetNamespace', namespace)
|
||
|
attrsNS = imported._attrsNS
|
||
|
for attrkey in attrsNS.keys():
|
||
|
if attrkey[0] == DOM.NS_XMLNS:
|
||
|
attr = attrsNS[attrkey].cloneNode(1)
|
||
|
child.setAttributeNode(attr)
|
||
|
|
||
|
#XXX Quick Hack, should be in WSDL Namespace.
|
||
|
if child.localName == 'import':
|
||
|
rlocation = child.getAttributeNS(None, 'location')
|
||
|
alocation = urllib.basejoin(location, rlocation)
|
||
|
child.setAttribute('location', alocation)
|
||
|
elif child.localName == 'types':
|
||
|
child.setAttribute('base-location', location)
|
||
|
|
||
|
finally:
|
||
|
importdoc.unlink()
|
||
|
return location
|
||
|
|
||
|
class Element:
|
||
|
"""A class that provides common functions for WSDL element classes."""
|
||
|
def __init__(self, name=None, documentation=''):
|
||
|
self.name = name
|
||
|
self.documentation = documentation
|
||
|
self.extensions = []
|
||
|
|
||
|
def addExtension(self, item):
|
||
|
self.extensions.append(item)
|
||
|
|
||
|
|
||
|
class ImportElement(Element):
|
||
|
def __init__(self, namespace, location):
|
||
|
self.namespace = namespace
|
||
|
self.location = location
|
||
|
|
||
|
_loaded = None
|
||
|
|
||
|
|
||
|
class Types(Collection):
|
||
|
default = lambda self,k: k.targetNamespace
|
||
|
def __init__(self, parent):
|
||
|
Collection.__init__(self, parent)
|
||
|
self.documentation = ''
|
||
|
self.extensions = []
|
||
|
|
||
|
def addSchema(self, schema):
|
||
|
name = schema.targetNamespace
|
||
|
self[name] = schema
|
||
|
return schema
|
||
|
|
||
|
def addExtension(self, item):
|
||
|
self.extensions.append(item)
|
||
|
|
||
|
|
||
|
class Message(Element):
|
||
|
def __init__(self, name, documentation=''):
|
||
|
Element.__init__(self, name, documentation)
|
||
|
self.parts = Collection(self)
|
||
|
|
||
|
def addPart(self, name, type=None, element=None):
|
||
|
if self.parts.has_key(name):
|
||
|
raise WSDLError(
|
||
|
'Duplicate message part element: %s' % name
|
||
|
)
|
||
|
if type is None and element is None:
|
||
|
raise WSDLError(
|
||
|
'Missing type or element attribute for part: %s' % name
|
||
|
)
|
||
|
item = MessagePart(name)
|
||
|
item.element = element
|
||
|
item.type = type
|
||
|
self.parts[name] = item
|
||
|
return item
|
||
|
|
||
|
def load(self, elements):
|
||
|
for element in elements:
|
||
|
name = DOM.getAttr(element, 'name')
|
||
|
part = MessagePart(name)
|
||
|
self.parts[name] = part
|
||
|
elemref = DOM.getAttr(element, 'element', default=None)
|
||
|
typeref = DOM.getAttr(element, 'type', default=None)
|
||
|
if typeref is None and elemref is None:
|
||
|
raise WSDLError(
|
||
|
'No type or element attribute for part: %s' % name
|
||
|
)
|
||
|
if typeref is not None:
|
||
|
part.type = ParseTypeRef(typeref, element)
|
||
|
if elemref is not None:
|
||
|
part.element = ParseTypeRef(elemref, element)
|
||
|
|
||
|
|
||
|
class MessagePart(Element):
|
||
|
def __init__(self, name):
|
||
|
Element.__init__(self, name, '')
|
||
|
self.element = None
|
||
|
self.type = None
|
||
|
|
||
|
def getWSDL(self):
|
||
|
"""Return the WSDL object that contains this Message Part."""
|
||
|
return self.parent().parent().parent().parent()
|
||
|
|
||
|
def getTypeDefinition(self):
|
||
|
wsdl = self.getWSDL()
|
||
|
nsuri,name = self.type
|
||
|
schema = wsdl.types.get(nsuri, {})
|
||
|
return schema.get(name)
|
||
|
|
||
|
def getElementDeclaration(self):
|
||
|
wsdl = self.getWSDL()
|
||
|
nsuri,name = self.element
|
||
|
schema = wsdl.types.get(nsuri, {})
|
||
|
return schema.get(name)
|
||
|
|
||
|
|
||
|
class PortType(Element):
|
||
|
'''PortType has a anyAttribute, thus must provide for an extensible
|
||
|
mechanism for supporting such attributes. ResourceProperties is
|
||
|
specified in WS-ResourceProperties. wsa:Action is specified in
|
||
|
WS-Address.
|
||
|
|
||
|
Instance Data:
|
||
|
name -- name attribute
|
||
|
resourceProperties -- optional. wsr:ResourceProperties attribute,
|
||
|
value is a QName this is Parsed into a (namespaceURI, name)
|
||
|
that represents a Global Element Declaration.
|
||
|
operations
|
||
|
'''
|
||
|
|
||
|
def __init__(self, name, documentation=''):
|
||
|
Element.__init__(self, name, documentation)
|
||
|
self.operations = Collection(self)
|
||
|
self.resourceProperties = None
|
||
|
|
||
|
def getWSDL(self):
|
||
|
return self.parent().parent()
|
||
|
|
||
|
def getResourceProperties(self):
|
||
|
return self.resourceProperties
|
||
|
|
||
|
def addOperation(self, name, documentation='', parameterOrder=None):
|
||
|
item = Operation(name, documentation, parameterOrder)
|
||
|
self.operations[name] = item
|
||
|
return item
|
||
|
|
||
|
def load(self, element):
|
||
|
self.name = DOM.getAttr(element, 'name')
|
||
|
self.documentation = GetDocumentation(element)
|
||
|
|
||
|
if DOM.hasAttr(element, 'ResourceProperties', WSR.PROPERTIES):
|
||
|
rpref = DOM.getAttr(element, 'ResourceProperties', WSR.PROPERTIES)
|
||
|
self.resourceProperties = ParseQName(rpref, element)
|
||
|
|
||
|
NS_WSDL = DOM.GetWSDLUri(self.getWSDL().version)
|
||
|
elements = DOM.getElements(element, 'operation', NS_WSDL)
|
||
|
for element in elements:
|
||
|
name = DOM.getAttr(element, 'name')
|
||
|
docs = GetDocumentation(element)
|
||
|
param_order = DOM.getAttr(element, 'parameterOrder', default=None)
|
||
|
if param_order is not None:
|
||
|
param_order = param_order.split(' ')
|
||
|
operation = self.addOperation(name, docs, param_order)
|
||
|
|
||
|
item = DOM.getElement(element, 'input', None, None)
|
||
|
if item is not None:
|
||
|
name = DOM.getAttr(item, 'name')
|
||
|
docs = GetDocumentation(item)
|
||
|
msgref = DOM.getAttr(item, 'message')
|
||
|
message = ParseQName(msgref, item)
|
||
|
action = DOM.getAttr(item, 'Action', WSA.ADDRESS, None)
|
||
|
operation.setInput(message, name, docs, action)
|
||
|
|
||
|
item = DOM.getElement(element, 'output', None, None)
|
||
|
if item is not None:
|
||
|
name = DOM.getAttr(item, 'name')
|
||
|
docs = GetDocumentation(item)
|
||
|
msgref = DOM.getAttr(item, 'message')
|
||
|
message = ParseQName(msgref, item)
|
||
|
action = DOM.getAttr(item, 'Action', WSA.ADDRESS, None)
|
||
|
operation.setOutput(message, name, docs, action)
|
||
|
|
||
|
for item in DOM.getElements(element, 'fault', None):
|
||
|
name = DOM.getAttr(item, 'name')
|
||
|
docs = GetDocumentation(item)
|
||
|
msgref = DOM.getAttr(item, 'message')
|
||
|
message = ParseQName(msgref, item)
|
||
|
action = DOM.getAttr(item, 'Action', WSA.ADDRESS, None)
|
||
|
operation.addFault(message, name, docs, action)
|
||
|
|
||
|
|
||
|
|
||
|
class Operation(Element):
|
||
|
def __init__(self, name, documentation='', parameterOrder=None):
|
||
|
Element.__init__(self, name, documentation)
|
||
|
self.parameterOrder = parameterOrder
|
||
|
self.faults = Collection(self)
|
||
|
self.input = None
|
||
|
self.output = None
|
||
|
|
||
|
def getPortType(self):
|
||
|
return self.parent().parent()
|
||
|
|
||
|
def getInputAction(self):
|
||
|
"""wsa:Action attribute"""
|
||
|
return GetWSAActionInput(self)
|
||
|
|
||
|
def getInputMessage(self):
|
||
|
if self.input is None:
|
||
|
return None
|
||
|
wsdl = self.getPortType().getWSDL()
|
||
|
return wsdl.messages[self.input.message]
|
||
|
|
||
|
def getOutputAction(self):
|
||
|
"""wsa:Action attribute"""
|
||
|
return GetWSAActionOutput(self)
|
||
|
|
||
|
def getOutputMessage(self):
|
||
|
if self.output is None:
|
||
|
return None
|
||
|
wsdl = self.getPortType().getWSDL()
|
||
|
return wsdl.messages[self.output.message]
|
||
|
|
||
|
def getFaultAction(self, name):
|
||
|
"""wsa:Action attribute"""
|
||
|
return GetWSAActionFault(self, name)
|
||
|
|
||
|
def getFaultMessage(self, name):
|
||
|
wsdl = self.getPortType().getWSDL()
|
||
|
return wsdl.messages[self.faults[name].message]
|
||
|
|
||
|
def addFault(self, message, name, documentation='', action=None):
|
||
|
if self.faults.has_key(name):
|
||
|
raise WSDLError(
|
||
|
'Duplicate fault element: %s' % name
|
||
|
)
|
||
|
item = MessageRole('fault', message, name, documentation, action)
|
||
|
self.faults[name] = item
|
||
|
return item
|
||
|
|
||
|
def setInput(self, message, name='', documentation='', action=None):
|
||
|
self.input = MessageRole('input', message, name, documentation, action)
|
||
|
return self.input
|
||
|
|
||
|
def setOutput(self, message, name='', documentation='', action=None):
|
||
|
self.output = MessageRole('output', message, name, documentation, action)
|
||
|
return self.output
|
||
|
|
||
|
|
||
|
class MessageRole(Element):
|
||
|
def __init__(self, type, message, name='', documentation='', action=None):
|
||
|
Element.__init__(self, name, documentation)
|
||
|
self.message = message
|
||
|
self.type = type
|
||
|
self.action = action
|
||
|
|
||
|
|
||
|
class Binding(Element):
|
||
|
def __init__(self, name, type, documentation=''):
|
||
|
Element.__init__(self, name, documentation)
|
||
|
self.operations = Collection(self)
|
||
|
self.type = type
|
||
|
|
||
|
def getWSDL(self):
|
||
|
"""Return the WSDL object that contains this binding."""
|
||
|
return self.parent().parent()
|
||
|
|
||
|
def getPortType(self):
|
||
|
"""Return the PortType object associated with this binding."""
|
||
|
return self.getWSDL().portTypes[self.type]
|
||
|
|
||
|
def findBinding(self, kind):
|
||
|
for item in self.extensions:
|
||
|
if isinstance(item, kind):
|
||
|
return item
|
||
|
return None
|
||
|
|
||
|
def findBindings(self, kind):
|
||
|
return [ item for item in self.extensions if isinstance(item, kind) ]
|
||
|
|
||
|
def addOperationBinding(self, name, documentation=''):
|
||
|
item = OperationBinding(name, documentation)
|
||
|
self.operations[name] = item
|
||
|
return item
|
||
|
|
||
|
def load(self, elements):
|
||
|
for element in elements:
|
||
|
name = DOM.getAttr(element, 'name')
|
||
|
docs = GetDocumentation(element)
|
||
|
opbinding = self.addOperationBinding(name, docs)
|
||
|
opbinding.load_ex(GetExtensions(element))
|
||
|
|
||
|
item = DOM.getElement(element, 'input', None, None)
|
||
|
if item is not None:
|
||
|
mbinding = MessageRoleBinding('input')
|
||
|
mbinding.documentation = GetDocumentation(item)
|
||
|
opbinding.input = mbinding
|
||
|
mbinding.load_ex(GetExtensions(item))
|
||
|
|
||
|
item = DOM.getElement(element, 'output', None, None)
|
||
|
if item is not None:
|
||
|
mbinding = MessageRoleBinding('output')
|
||
|
mbinding.documentation = GetDocumentation(item)
|
||
|
opbinding.output = mbinding
|
||
|
mbinding.load_ex(GetExtensions(item))
|
||
|
|
||
|
for item in DOM.getElements(element, 'fault', None):
|
||
|
name = DOM.getAttr(item, 'name')
|
||
|
mbinding = MessageRoleBinding('fault', name)
|
||
|
mbinding.documentation = GetDocumentation(item)
|
||
|
opbinding.faults[name] = mbinding
|
||
|
mbinding.load_ex(GetExtensions(item))
|
||
|
|
||
|
def load_ex(self, elements):
|
||
|
for e in elements:
|
||
|
ns, name = e.namespaceURI, e.localName
|
||
|
if ns in DOM.NS_SOAP_BINDING_ALL and name == 'binding':
|
||
|
transport = DOM.getAttr(e, 'transport', default=None)
|
||
|
style = DOM.getAttr(e, 'style', default='document')
|
||
|
ob = SoapBinding(transport, style)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
elif ns in DOM.NS_HTTP_BINDING_ALL and name == 'binding':
|
||
|
verb = DOM.getAttr(e, 'verb')
|
||
|
ob = HttpBinding(verb)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
else:
|
||
|
self.addExtension(e)
|
||
|
|
||
|
|
||
|
class OperationBinding(Element):
|
||
|
def __init__(self, name, documentation=''):
|
||
|
Element.__init__(self, name, documentation)
|
||
|
self.input = None
|
||
|
self.output = None
|
||
|
self.faults = Collection(self)
|
||
|
|
||
|
def getBinding(self):
|
||
|
"""Return the parent Binding object of the operation binding."""
|
||
|
return self.parent().parent()
|
||
|
|
||
|
def getOperation(self):
|
||
|
"""Return the abstract Operation associated with this binding."""
|
||
|
return self.getBinding().getPortType().operations[self.name]
|
||
|
|
||
|
def findBinding(self, kind):
|
||
|
for item in self.extensions:
|
||
|
if isinstance(item, kind):
|
||
|
return item
|
||
|
return None
|
||
|
|
||
|
def findBindings(self, kind):
|
||
|
return [ item for item in self.extensions if isinstance(item, kind) ]
|
||
|
|
||
|
def addInputBinding(self, binding):
|
||
|
if self.input is None:
|
||
|
self.input = MessageRoleBinding('input')
|
||
|
self.input.addExtension(binding)
|
||
|
return binding
|
||
|
|
||
|
def addOutputBinding(self, binding):
|
||
|
if self.output is None:
|
||
|
self.output = MessageRoleBinding('output')
|
||
|
self.output.addExtension(binding)
|
||
|
return binding
|
||
|
|
||
|
def addFaultBinding(self, name, binding):
|
||
|
fault = self.get(name, None)
|
||
|
if fault is None:
|
||
|
fault = MessageRoleBinding('fault', name)
|
||
|
fault.addExtension(binding)
|
||
|
return binding
|
||
|
|
||
|
def load_ex(self, elements):
|
||
|
for e in elements:
|
||
|
ns, name = e.namespaceURI, e.localName
|
||
|
if ns in DOM.NS_SOAP_BINDING_ALL and name == 'operation':
|
||
|
soapaction = DOM.getAttr(e, 'soapAction', default=None)
|
||
|
style = DOM.getAttr(e, 'style', default=None)
|
||
|
ob = SoapOperationBinding(soapaction, style)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
elif ns in DOM.NS_HTTP_BINDING_ALL and name == 'operation':
|
||
|
location = DOM.getAttr(e, 'location')
|
||
|
ob = HttpOperationBinding(location)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
else:
|
||
|
self.addExtension(e)
|
||
|
|
||
|
|
||
|
class MessageRoleBinding(Element):
|
||
|
def __init__(self, type, name='', documentation=''):
|
||
|
Element.__init__(self, name, documentation)
|
||
|
self.type = type
|
||
|
|
||
|
def findBinding(self, kind):
|
||
|
for item in self.extensions:
|
||
|
if isinstance(item, kind):
|
||
|
return item
|
||
|
return None
|
||
|
|
||
|
def findBindings(self, kind):
|
||
|
return [ item for item in self.extensions if isinstance(item, kind) ]
|
||
|
|
||
|
def load_ex(self, elements):
|
||
|
for e in elements:
|
||
|
ns, name = e.namespaceURI, e.localName
|
||
|
if ns in DOM.NS_SOAP_BINDING_ALL and name == 'body':
|
||
|
encstyle = DOM.getAttr(e, 'encodingStyle', default=None)
|
||
|
namespace = DOM.getAttr(e, 'namespace', default=None)
|
||
|
parts = DOM.getAttr(e, 'parts', default=None)
|
||
|
use = DOM.getAttr(e, 'use', default=None)
|
||
|
if use is None:
|
||
|
raise WSDLError(
|
||
|
'Invalid soap:body binding element.'
|
||
|
)
|
||
|
ob = SoapBodyBinding(use, namespace, encstyle, parts)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
|
||
|
elif ns in DOM.NS_SOAP_BINDING_ALL and name == 'fault':
|
||
|
encstyle = DOM.getAttr(e, 'encodingStyle', default=None)
|
||
|
namespace = DOM.getAttr(e, 'namespace', default=None)
|
||
|
name = DOM.getAttr(e, 'name', default=None)
|
||
|
use = DOM.getAttr(e, 'use', default=None)
|
||
|
if use is None or name is None:
|
||
|
raise WSDLError(
|
||
|
'Invalid soap:fault binding element.'
|
||
|
)
|
||
|
ob = SoapFaultBinding(name, use, namespace, encstyle)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
|
||
|
elif ns in DOM.NS_SOAP_BINDING_ALL and name in (
|
||
|
'header', 'headerfault'
|
||
|
):
|
||
|
encstyle = DOM.getAttr(e, 'encodingStyle', default=None)
|
||
|
namespace = DOM.getAttr(e, 'namespace', default=None)
|
||
|
message = DOM.getAttr(e, 'message')
|
||
|
part = DOM.getAttr(e, 'part')
|
||
|
use = DOM.getAttr(e, 'use')
|
||
|
if name == 'header':
|
||
|
_class = SoapHeaderBinding
|
||
|
else:
|
||
|
_class = SoapHeaderFaultBinding
|
||
|
message = ParseQName(message, e)
|
||
|
ob = _class(message, part, use, namespace, encstyle)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
|
||
|
elif ns in DOM.NS_HTTP_BINDING_ALL and name == 'urlReplacement':
|
||
|
ob = HttpUrlReplacementBinding()
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
|
||
|
elif ns in DOM.NS_HTTP_BINDING_ALL and name == 'urlEncoded':
|
||
|
ob = HttpUrlEncodedBinding()
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
|
||
|
elif ns in DOM.NS_MIME_BINDING_ALL and name == 'multipartRelated':
|
||
|
ob = MimeMultipartRelatedBinding()
|
||
|
self.addExtension(ob)
|
||
|
ob.load_ex(GetExtensions(e))
|
||
|
continue
|
||
|
|
||
|
elif ns in DOM.NS_MIME_BINDING_ALL and name == 'content':
|
||
|
part = DOM.getAttr(e, 'part', default=None)
|
||
|
type = DOM.getAttr(e, 'type', default=None)
|
||
|
ob = MimeContentBinding(part, type)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
|
||
|
elif ns in DOM.NS_MIME_BINDING_ALL and name == 'mimeXml':
|
||
|
part = DOM.getAttr(e, 'part', default=None)
|
||
|
ob = MimeXmlBinding(part)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
|
||
|
else:
|
||
|
self.addExtension(e)
|
||
|
|
||
|
|
||
|
class Service(Element):
|
||
|
def __init__(self, name, documentation=''):
|
||
|
Element.__init__(self, name, documentation)
|
||
|
self.ports = Collection(self)
|
||
|
|
||
|
def getWSDL(self):
|
||
|
return self.parent().parent()
|
||
|
|
||
|
def addPort(self, name, binding, documentation=''):
|
||
|
item = Port(name, binding, documentation)
|
||
|
self.ports[name] = item
|
||
|
return item
|
||
|
|
||
|
def load(self, elements):
|
||
|
for element in elements:
|
||
|
name = DOM.getAttr(element, 'name', default=None)
|
||
|
docs = GetDocumentation(element)
|
||
|
binding = DOM.getAttr(element, 'binding', default=None)
|
||
|
if name is None or binding is None:
|
||
|
raise WSDLError(
|
||
|
'Invalid port element.'
|
||
|
)
|
||
|
binding = ParseQName(binding, element)
|
||
|
port = self.addPort(name, binding, docs)
|
||
|
port.load_ex(GetExtensions(element))
|
||
|
|
||
|
def load_ex(self, elements):
|
||
|
for e in elements:
|
||
|
self.addExtension(e)
|
||
|
|
||
|
|
||
|
class Port(Element):
|
||
|
def __init__(self, name, binding, documentation=''):
|
||
|
Element.__init__(self, name, documentation)
|
||
|
self.binding = binding
|
||
|
|
||
|
def getService(self):
|
||
|
"""Return the Service object associated with this port."""
|
||
|
return self.parent().parent()
|
||
|
|
||
|
def getBinding(self):
|
||
|
"""Return the Binding object that is referenced by this port."""
|
||
|
wsdl = self.getService().getWSDL()
|
||
|
return wsdl.bindings[self.binding]
|
||
|
|
||
|
def getPortType(self):
|
||
|
"""Return the PortType object that is referenced by this port."""
|
||
|
wsdl = self.getService().getWSDL()
|
||
|
binding = wsdl.bindings[self.binding]
|
||
|
return wsdl.portTypes[binding.type]
|
||
|
|
||
|
def getAddressBinding(self):
|
||
|
"""A convenience method to obtain the extension element used
|
||
|
as the address binding for the port, or None if undefined."""
|
||
|
for item in self.extensions:
|
||
|
if isinstance(item, SoapAddressBinding) or \
|
||
|
isinstance(item, HttpAddressBinding):
|
||
|
return item
|
||
|
raise WSDLError(
|
||
|
'No address binding found in port.'
|
||
|
)
|
||
|
|
||
|
def load_ex(self, elements):
|
||
|
for e in elements:
|
||
|
ns, name = e.namespaceURI, e.localName
|
||
|
if ns in DOM.NS_SOAP_BINDING_ALL and name == 'address':
|
||
|
location = DOM.getAttr(e, 'location', default=None)
|
||
|
ob = SoapAddressBinding(location)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
elif ns in DOM.NS_HTTP_BINDING_ALL and name == 'address':
|
||
|
location = DOM.getAttr(e, 'location', default=None)
|
||
|
ob = HttpAddressBinding(location)
|
||
|
self.addExtension(ob)
|
||
|
continue
|
||
|
else:
|
||
|
self.addExtension(e)
|
||
|
|
||
|
|
||
|
class SoapBinding:
|
||
|
def __init__(self, transport, style='rpc'):
|
||
|
self.transport = transport
|
||
|
self.style = style
|
||
|
|
||
|
|
||
|
class SoapAddressBinding:
|
||
|
def __init__(self, location):
|
||
|
self.location = location
|
||
|
|
||
|
|
||
|
class SoapOperationBinding:
|
||
|
def __init__(self, soapAction=None, style=None):
|
||
|
self.soapAction = soapAction
|
||
|
self.style = style
|
||
|
|
||
|
|
||
|
class SoapBodyBinding:
|
||
|
def __init__(self, use, namespace=None, encodingStyle=None, parts=None):
|
||
|
if not use in ('literal', 'encoded'):
|
||
|
raise WSDLError(
|
||
|
'Invalid use attribute value: %s' % use
|
||
|
)
|
||
|
self.encodingStyle = encodingStyle
|
||
|
self.namespace = namespace
|
||
|
if type(parts) in (type(''), type(u'')):
|
||
|
parts = parts.split()
|
||
|
self.parts = parts
|
||
|
self.use = use
|
||
|
|
||
|
class SoapFaultBinding:
|
||
|
def __init__(self, name, use, namespace=None, encodingStyle=None):
|
||
|
if not use in ('literal', 'encoded'):
|
||
|
raise WSDLError(
|
||
|
'Invalid use attribute value: %s' % use
|
||
|
)
|
||
|
self.encodingStyle = encodingStyle
|
||
|
self.namespace = namespace
|
||
|
self.name = name
|
||
|
self.use = use
|
||
|
|
||
|
|
||
|
class SoapHeaderBinding:
|
||
|
def __init__(self, message, part, use, namespace=None, encodingStyle=None):
|
||
|
if not use in ('literal', 'encoded'):
|
||
|
raise WSDLError(
|
||
|
'Invalid use attribute value: %s' % use
|
||
|
)
|
||
|
self.encodingStyle = encodingStyle
|
||
|
self.namespace = namespace
|
||
|
self.message = message
|
||
|
self.part = part
|
||
|
self.use = use
|
||
|
|
||
|
tagname = 'header'
|
||
|
|
||
|
class SoapHeaderFaultBinding(SoapHeaderBinding):
|
||
|
tagname = 'headerfault'
|
||
|
|
||
|
|
||
|
class HttpBinding:
|
||
|
def __init__(self, verb):
|
||
|
self.verb = verb
|
||
|
|
||
|
class HttpAddressBinding:
|
||
|
def __init__(self, location):
|
||
|
self.location = location
|
||
|
|
||
|
|
||
|
class HttpOperationBinding:
|
||
|
def __init__(self, location):
|
||
|
self.location = location
|
||
|
|
||
|
class HttpUrlReplacementBinding:
|
||
|
pass
|
||
|
|
||
|
|
||
|
class HttpUrlEncodedBinding:
|
||
|
pass
|
||
|
|
||
|
|
||
|
class MimeContentBinding:
|
||
|
def __init__(self, part=None, type=None):
|
||
|
self.part = part
|
||
|
self.type = type
|
||
|
|
||
|
|
||
|
class MimeXmlBinding:
|
||
|
def __init__(self, part=None):
|
||
|
self.part = part
|
||
|
|
||
|
|
||
|
class MimeMultipartRelatedBinding:
|
||
|
def __init__(self):
|
||
|
self.parts = []
|
||
|
|
||
|
def load_ex(self, elements):
|
||
|
for e in elements:
|
||
|
ns, name = e.namespaceURI, e.localName
|
||
|
if ns in DOM.NS_MIME_BINDING_ALL and name == 'part':
|
||
|
self.parts.append(MimePartBinding())
|
||
|
continue
|
||
|
|
||
|
|
||
|
class MimePartBinding:
|
||
|
def __init__(self):
|
||
|
self.items = []
|
||
|
|
||
|
def load_ex(self, elements):
|
||
|
for e in elements:
|
||
|
ns, name = e.namespaceURI, e.localName
|
||
|
if ns in DOM.NS_MIME_BINDING_ALL and name == 'content':
|
||
|
part = DOM.getAttr(e, 'part', default=None)
|
||
|
type = DOM.getAttr(e, 'type', default=None)
|
||
|
ob = MimeContentBinding(part, type)
|
||
|
self.items.append(ob)
|
||
|
continue
|
||
|
|
||
|
elif ns in DOM.NS_MIME_BINDING_ALL and name == 'mimeXml':
|
||
|
part = DOM.getAttr(e, 'part', default=None)
|
||
|
ob = MimeXmlBinding(part)
|
||
|
self.items.append(ob)
|
||
|
continue
|
||
|
|
||
|
elif ns in DOM.NS_SOAP_BINDING_ALL and name == 'body':
|
||
|
encstyle = DOM.getAttr(e, 'encodingStyle', default=None)
|
||
|
namespace = DOM.getAttr(e, 'namespace', default=None)
|
||
|
parts = DOM.getAttr(e, 'parts', default=None)
|
||
|
use = DOM.getAttr(e, 'use', default=None)
|
||
|
if use is None:
|
||
|
raise WSDLError(
|
||
|
'Invalid soap:body binding element.'
|
||
|
)
|
||
|
ob = SoapBodyBinding(use, namespace, encstyle, parts)
|
||
|
self.items.append(ob)
|
||
|
continue
|
||
|
|
||
|
|
||
|
class WSDLError(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
|
||
|
def DeclareNSPrefix(writer, prefix, nsuri):
|
||
|
if writer.hasNSPrefix(nsuri):
|
||
|
return
|
||
|
writer.declareNSPrefix(prefix, nsuri)
|
||
|
|
||
|
def ParseTypeRef(value, element):
|
||
|
parts = value.split(':', 1)
|
||
|
if len(parts) == 1:
|
||
|
return (DOM.findTargetNS(element), value)
|
||
|
nsuri = DOM.findNamespaceURI(parts[0], element)
|
||
|
return (nsuri, parts[1])
|
||
|
|
||
|
def ParseQName(value, element):
|
||
|
nameref = value.split(':', 1)
|
||
|
if len(nameref) == 2:
|
||
|
nsuri = DOM.findNamespaceURI(nameref[0], element)
|
||
|
name = nameref[-1]
|
||
|
else:
|
||
|
nsuri = DOM.findTargetNS(element)
|
||
|
name = nameref[-1]
|
||
|
return nsuri, name
|
||
|
|
||
|
def GetDocumentation(element):
|
||
|
docnode = DOM.getElement(element, 'documentation', None, None)
|
||
|
if docnode is not None:
|
||
|
return DOM.getElementText(docnode)
|
||
|
return ''
|
||
|
|
||
|
def GetExtensions(element):
|
||
|
return [ item for item in DOM.getElements(element, None, None)
|
||
|
if item.namespaceURI != DOM.NS_WSDL ]
|
||
|
|
||
|
def GetWSAActionFault(operation, name):
|
||
|
"""Find wsa:Action attribute, and return value or WSA.FAULT
|
||
|
for the default.
|
||
|
"""
|
||
|
attr = operation.faults[name].action
|
||
|
if attr is not None:
|
||
|
return attr
|
||
|
return WSA.FAULT
|
||
|
|
||
|
def GetWSAActionInput(operation):
|
||
|
"""Find wsa:Action attribute, and return value or the default."""
|
||
|
attr = operation.input.action
|
||
|
if attr is not None:
|
||
|
return attr
|
||
|
portType = operation.getPortType()
|
||
|
targetNamespace = portType.getWSDL().targetNamespace
|
||
|
ptName = portType.name
|
||
|
msgName = operation.input.name
|
||
|
if not msgName:
|
||
|
msgName = operation.name + 'Request'
|
||
|
if targetNamespace.endswith('/'):
|
||
|
return '%s%s/%s' %(targetNamespace, ptName, msgName)
|
||
|
return '%s/%s/%s' %(targetNamespace, ptName, msgName)
|
||
|
|
||
|
def GetWSAActionOutput(operation):
|
||
|
"""Find wsa:Action attribute, and return value or the default."""
|
||
|
attr = operation.output.action
|
||
|
if attr is not None:
|
||
|
return attr
|
||
|
targetNamespace = operation.getPortType().getWSDL().targetNamespace
|
||
|
ptName = operation.getPortType().name
|
||
|
msgName = operation.output.name
|
||
|
if not msgName:
|
||
|
msgName = operation.name + 'Response'
|
||
|
if targetNamespace.endswith('/'):
|
||
|
return '%s%s/%s' %(targetNamespace, ptName, msgName)
|
||
|
return '%s/%s/%s' %(targetNamespace, ptName, msgName)
|
||
|
|
||
|
def FindExtensions(object, kind, t_type=type(())):
|
||
|
if isinstance(kind, t_type):
|
||
|
result = []
|
||
|
namespaceURI, name = kind
|
||
|
return [ item for item in object.extensions
|
||
|
if hasattr(item, 'nodeType') \
|
||
|
and DOM.nsUriMatch(namespaceURI, item.namespaceURI) \
|
||
|
and item.name == name ]
|
||
|
return [ item for item in object.extensions if isinstance(item, kind) ]
|
||
|
|
||
|
def FindExtension(object, kind, t_type=type(())):
|
||
|
if isinstance(kind, t_type):
|
||
|
namespaceURI, name = kind
|
||
|
for item in object.extensions:
|
||
|
if hasattr(item, 'nodeType') \
|
||
|
and DOM.nsUriMatch(namespaceURI, item.namespaceURI) \
|
||
|
and item.name == name:
|
||
|
return item
|
||
|
else:
|
||
|
for item in object.extensions:
|
||
|
if isinstance(item, kind):
|
||
|
return item
|
||
|
return None
|
||
|
|
||
|
|
||
|
class SOAPCallInfo:
|
||
|
"""SOAPCallInfo captures the important binding information about a
|
||
|
SOAP operation, in a structure that is easier to work with than
|
||
|
raw WSDL structures."""
|
||
|
|
||
|
def __init__(self, methodName):
|
||
|
self.methodName = methodName
|
||
|
self.inheaders = []
|
||
|
self.outheaders = []
|
||
|
self.inparams = []
|
||
|
self.outparams = []
|
||
|
self.retval = None
|
||
|
|
||
|
encodingStyle = DOM.NS_SOAP_ENC
|
||
|
documentation = ''
|
||
|
soapAction = None
|
||
|
transport = None
|
||
|
namespace = None
|
||
|
location = None
|
||
|
use = 'encoded'
|
||
|
style = 'rpc'
|
||
|
|
||
|
def addInParameter(self, name, type, namespace=None, element_type=0):
|
||
|
"""Add an input parameter description to the call info."""
|
||
|
parameter = ParameterInfo(name, type, namespace, element_type)
|
||
|
self.inparams.append(parameter)
|
||
|
return parameter
|
||
|
|
||
|
def addOutParameter(self, name, type, namespace=None, element_type=0):
|
||
|
"""Add an output parameter description to the call info."""
|
||
|
parameter = ParameterInfo(name, type, namespace, element_type)
|
||
|
self.outparams.append(parameter)
|
||
|
return parameter
|
||
|
|
||
|
def setReturnParameter(self, name, type, namespace=None, element_type=0):
|
||
|
"""Set the return parameter description for the call info."""
|
||
|
parameter = ParameterInfo(name, type, namespace, element_type)
|
||
|
self.retval = parameter
|
||
|
return parameter
|
||
|
|
||
|
def addInHeaderInfo(self, name, type, namespace, element_type=0,
|
||
|
mustUnderstand=0):
|
||
|
"""Add an input SOAP header description to the call info."""
|
||
|
headerinfo = HeaderInfo(name, type, namespace, element_type)
|
||
|
if mustUnderstand:
|
||
|
headerinfo.mustUnderstand = 1
|
||
|
self.inheaders.append(headerinfo)
|
||
|
return headerinfo
|
||
|
|
||
|
def addOutHeaderInfo(self, name, type, namespace, element_type=0,
|
||
|
mustUnderstand=0):
|
||
|
"""Add an output SOAP header description to the call info."""
|
||
|
headerinfo = HeaderInfo(name, type, namespace, element_type)
|
||
|
if mustUnderstand:
|
||
|
headerinfo.mustUnderstand = 1
|
||
|
self.outheaders.append(headerinfo)
|
||
|
return headerinfo
|
||
|
|
||
|
def getInParameters(self):
|
||
|
"""Return a sequence of the in parameters of the method."""
|
||
|
return self.inparams
|
||
|
|
||
|
def getOutParameters(self):
|
||
|
"""Return a sequence of the out parameters of the method."""
|
||
|
return self.outparams
|
||
|
|
||
|
def getReturnParameter(self):
|
||
|
"""Return param info about the return value of the method."""
|
||
|
return self.retval
|
||
|
|
||
|
def getInHeaders(self):
|
||
|
"""Return a sequence of the in headers of the method."""
|
||
|
return self.inheaders
|
||
|
|
||
|
def getOutHeaders(self):
|
||
|
"""Return a sequence of the out headers of the method."""
|
||
|
return self.outheaders
|
||
|
|
||
|
|
||
|
class ParameterInfo:
|
||
|
"""A ParameterInfo object captures parameter binding information."""
|
||
|
def __init__(self, name, type, namespace=None, element_type=0):
|
||
|
if element_type:
|
||
|
self.element_type = 1
|
||
|
if namespace is not None:
|
||
|
self.namespace = namespace
|
||
|
self.name = name
|
||
|
self.type = type
|
||
|
|
||
|
element_type = 0
|
||
|
namespace = None
|
||
|
default = None
|
||
|
|
||
|
|
||
|
class HeaderInfo(ParameterInfo):
|
||
|
"""A HeaderInfo object captures SOAP header binding information."""
|
||
|
def __init__(self, name, type, namespace, element_type=None):
|
||
|
ParameterInfo.__init__(self, name, type, namespace, element_type)
|
||
|
|
||
|
mustUnderstand = 0
|
||
|
actor = None
|
||
|
|
||
|
|
||
|
def callInfoFromWSDL(port, name):
|
||
|
"""Return a SOAPCallInfo given a WSDL port and operation name."""
|
||
|
wsdl = port.getService().getWSDL()
|
||
|
binding = port.getBinding()
|
||
|
portType = binding.getPortType()
|
||
|
operation = portType.operations[name]
|
||
|
opbinding = binding.operations[name]
|
||
|
messages = wsdl.messages
|
||
|
callinfo = SOAPCallInfo(name)
|
||
|
|
||
|
addrbinding = port.getAddressBinding()
|
||
|
if not isinstance(addrbinding, SoapAddressBinding):
|
||
|
raise ValueError, 'Unsupported binding type.'
|
||
|
callinfo.location = addrbinding.location
|
||
|
|
||
|
soapbinding = binding.findBinding(SoapBinding)
|
||
|
if soapbinding is None:
|
||
|
raise ValueError, 'Missing soap:binding element.'
|
||
|
callinfo.transport = soapbinding.transport
|
||
|
callinfo.style = soapbinding.style or 'document'
|
||
|
|
||
|
soap_op_binding = opbinding.findBinding(SoapOperationBinding)
|
||
|
if soap_op_binding is not None:
|
||
|
callinfo.soapAction = soap_op_binding.soapAction
|
||
|
callinfo.style = soap_op_binding.style or callinfo.style
|
||
|
|
||
|
parameterOrder = operation.parameterOrder
|
||
|
|
||
|
if operation.input is not None:
|
||
|
message = messages[operation.input.message]
|
||
|
msgrole = opbinding.input
|
||
|
|
||
|
mime = msgrole.findBinding(MimeMultipartRelatedBinding)
|
||
|
if mime is not None:
|
||
|
raise ValueError, 'Mime bindings are not supported.'
|
||
|
else:
|
||
|
for item in msgrole.findBindings(SoapHeaderBinding):
|
||
|
part = messages[item.message].parts[item.part]
|
||
|
header = callinfo.addInHeaderInfo(
|
||
|
part.name,
|
||
|
part.element or part.type,
|
||
|
item.namespace,
|
||
|
element_type = part.element and 1 or 0
|
||
|
)
|
||
|
header.encodingStyle = item.encodingStyle
|
||
|
|
||
|
body = msgrole.findBinding(SoapBodyBinding)
|
||
|
if body is None:
|
||
|
raise ValueError, 'Missing soap:body binding.'
|
||
|
callinfo.encodingStyle = body.encodingStyle
|
||
|
callinfo.namespace = body.namespace
|
||
|
callinfo.use = body.use
|
||
|
|
||
|
if body.parts is not None:
|
||
|
parts = []
|
||
|
for name in body.parts:
|
||
|
parts.append(message.parts[name])
|
||
|
else:
|
||
|
parts = message.parts.values()
|
||
|
|
||
|
for part in parts:
|
||
|
callinfo.addInParameter(
|
||
|
part.name,
|
||
|
part.element or part.type,
|
||
|
element_type = part.element and 1 or 0
|
||
|
)
|
||
|
|
||
|
if operation.output is not None:
|
||
|
try:
|
||
|
message = messages[operation.output.message]
|
||
|
except KeyError:
|
||
|
if self.strict:
|
||
|
raise RuntimeError(
|
||
|
"Recieved message not defined in the WSDL schema: %s" %
|
||
|
operation.output.message)
|
||
|
else:
|
||
|
message = wsdl.addMessage(operation.output.message)
|
||
|
print "Warning:", \
|
||
|
"Recieved message not defined in the WSDL schema.", \
|
||
|
"Adding it."
|
||
|
print "Message:", operation.output.message
|
||
|
|
||
|
msgrole = opbinding.output
|
||
|
|
||
|
mime = msgrole.findBinding(MimeMultipartRelatedBinding)
|
||
|
if mime is not None:
|
||
|
raise ValueError, 'Mime bindings are not supported.'
|
||
|
else:
|
||
|
for item in msgrole.findBindings(SoapHeaderBinding):
|
||
|
part = messages[item.message].parts[item.part]
|
||
|
header = callinfo.addOutHeaderInfo(
|
||
|
part.name,
|
||
|
part.element or part.type,
|
||
|
item.namespace,
|
||
|
element_type = part.element and 1 or 0
|
||
|
)
|
||
|
header.encodingStyle = item.encodingStyle
|
||
|
|
||
|
body = msgrole.findBinding(SoapBodyBinding)
|
||
|
if body is None:
|
||
|
raise ValueError, 'Missing soap:body binding.'
|
||
|
callinfo.encodingStyle = body.encodingStyle
|
||
|
callinfo.namespace = body.namespace
|
||
|
callinfo.use = body.use
|
||
|
|
||
|
if body.parts is not None:
|
||
|
parts = []
|
||
|
for name in body.parts:
|
||
|
parts.append(message.parts[name])
|
||
|
else:
|
||
|
parts = message.parts.values()
|
||
|
|
||
|
if parts:
|
||
|
# XXX no idea what this is for, but it breaks everything. jrb
|
||
|
#callinfo.setReturnParameter(
|
||
|
# parts[0].name,
|
||
|
# parts[0].element or parts[0].type,
|
||
|
# element_type = parts[0].element and 1 or 0
|
||
|
# )
|
||
|
#for part in parts[1:]:
|
||
|
for part in parts:
|
||
|
callinfo.addOutParameter(
|
||
|
part.name,
|
||
|
part.element or part.type,
|
||
|
element_type = part.element and 1 or 0
|
||
|
)
|
||
|
|
||
|
return callinfo
|