Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/python-nbxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmmanuel Gil Peyrot <linkmauve@linkmauve.fr>2020-04-18 16:14:23 +0300
committerPhilipp Hörist <philipp@hoerist.com>2020-04-19 08:35:06 +0300
commitf48e6f59858cfa7a836a12f2a54b26f48e04c60a (patch)
tree235ec30b7c7938337ecf8f9a00a0a7c7621f59bf
parent197813d79753f318714a6c28eedec2d56fba59c8 (diff)
Add typing hints to simplexml
-rw-r--r--nbxmpp/simplexml.py180
1 files changed, 106 insertions, 74 deletions
diff --git a/nbxmpp/simplexml.py b/nbxmpp/simplexml.py
index 67427b1..f72cde5 100644
--- a/nbxmpp/simplexml.py
+++ b/nbxmpp/simplexml.py
@@ -18,14 +18,19 @@ nodes and XML streams. I'm personally using it in many other separate
projects. It is designed to be as standalone as possible
"""
+from __future__ import annotations
+
import logging
import xml.parsers.expat
from xml.parsers.expat import ExpatError
from copy import deepcopy
+from typing import Dict, List, Optional, Union, Iterator, Callable, Any
+
+Attrs = Dict[str, str]
log = logging.getLogger('nbxmpp.simplexml')
-def XMLescape(txt):
+def XMLescape(txt: str) -> str:
"""
Return provided string with symbols & < > " replaced by their respective XML
entities
@@ -54,10 +59,19 @@ class Node:
on the classes tree).
"""
- FORCE_NODE_RECREATION = 0
+ name: str
+ namespace: str
+ attrs: Attrs
+ data: List[str]
+ kids: List[Union[Node, str]]
+ parent: Optional[Node]
+ nsd: Dict[str, str]
+ nsp_cache: Dict[Any, Any]
- def __init__(self, tag=None, attrs=None, payload=None, parent=None, nsp=None,
- node_built=False, node=None):
+ FORCE_NODE_RECREATION = False
+
+ def __init__(self, tag: Optional[str] = None, attrs: Optional[Attrs] = None, payload: Optional[Union[Node, str, List[Union[Node, str]]]] = None, parent: Optional[Node] = None, nsp: Optional[Dict[Any, Any]] = None,
+ node_built: bool = False, node: Optional[Union[Node, Any]] = None) -> None:
"""
Takes "tag" argument as the name of node (prepended by namespace, if
needed and separated from it by a space), attrs dictionary as the set of
@@ -121,7 +135,7 @@ class Node:
else:
self.data.append(str(i))
- def lookup_nsp(self, pfx=''):
+ def lookup_nsp(self, pfx: str = '') -> str:
ns = self.nsd.get(pfx, None)
if ns is None:
ns = self.nsp_cache.get(pfx, None)
@@ -133,7 +147,7 @@ class Node:
return 'http://www.gajim.org/xmlns/undeclared'
return ns
- def __str__(self, fancy=0):
+ def __str__(self, fancy: int = 0) -> str:
"""
Method used to dump node into textual representation. If "fancy" argument
is set to True produces indented output for readability
@@ -178,7 +192,7 @@ class Node:
s += "\n"
return s
- def addChild(self, name=None, attrs=None, payload=None, namespace=None, node=None):
+ def addChild(self, name: Optional[str] = None, attrs: Optional[Attrs] = None, payload: Optional[List[Any]] = None, namespace: Optional[str] = None, node: Optional[Node] = None) -> Node:
"""
If "node" argument is provided, adds it as child node. Else creates new
node from the other arguments' values and adds it as well
@@ -199,25 +213,25 @@ class Node:
self.kids.append(newnode)
return newnode
- def addData(self, data):
+ def addData(self, data: Any) -> None:
"""
Add some CDATA to node
"""
self.data.append(str(data))
- def clearData(self):
+ def clearData(self) -> None:
"""
Remove all CDATA from the node
"""
self.data = []
- def delAttr(self, key):
+ def delAttr(self, key: str) -> None:
"""
Delete an attribute "key"
"""
del self.attrs[key]
- def delChild(self, node, attrs=None):
+ def delChild(self, node: Union[Node, str], attrs: Optional[Attrs] = None) -> Optional[Node]:
"""
Delete the "node" from the node's childs list, if "node" is an instance.
Else delete the first node that have specified name and (optionally)
@@ -225,10 +239,11 @@ class Node:
"""
if not isinstance(node, Node):
node = self.getTag(node, attrs)
+ assert isinstance(node, Node)
self.kids.remove(node)
return node
- def getAttrs(self, copy=False):
+ def getAttrs(self, copy: bool = False) -> Attrs:
"""
Return all node's attributes as dictionary
"""
@@ -236,49 +251,49 @@ class Node:
return deepcopy(self.attrs)
return self.attrs
- def getAttr(self, key):
+ def getAttr(self, key: str) -> Optional[str]:
"""
Return value of specified attribute
"""
return self.attrs.get(key)
- def getChildren(self):
+ def getChildren(self) -> List[Union[Node, str]]:
"""
Return all node's child nodes as list
"""
return self.kids
- def getData(self):
+ def getData(self) -> str:
"""
Return all node CDATA as string (concatenated)
"""
return ''.join(self.data)
- def getName(self):
+ def getName(self) -> str:
"""
Return the name of node
"""
return self.name
- def getNamespace(self):
+ def getNamespace(self) -> str:
"""
Return the namespace of node
"""
return self.namespace
- def getParent(self):
+ def getParent(self) -> Optional[Node]:
"""
Returns the parent of node (if present)
"""
return self.parent
- def getPayload(self):
+ def getPayload(self) -> List[Union[Node, str]]:
"""
Return the payload of node i.e. list of child nodes and CDATA entries.
F.e. for "<node>text1<nodea/><nodeb/> text2</node>" will be returned
list: ['text1', <nodea instance>, <nodeb instance>, ' text2']
"""
- ret = []
+ ret: List[Union[Node, str]] = []
for i in range(len(self.kids)+len(self.data)+1):
try:
if self.data[i]:
@@ -291,33 +306,35 @@ class Node:
pass
return ret
- def getTag(self, name, attrs=None, namespace=None):
+ def getTag(self, name: str, attrs: Optional[Attrs] = None, namespace: Optional[str] = None) -> Optional[Node]:
"""
Filter all child nodes using specified arguments as filter. Return the
first found or None if not found
"""
- return self.getTags(name, attrs, namespace, one=1)
+ tag = self.getTags(name, attrs, namespace, one=True)
+ assert not isinstance(tag, list)
+ return tag
- def getTagAttr(self, tag, attr, namespace=None):
+ def getTagAttr(self, tag: str, attr: str, namespace: Optional[str] = None) -> Optional[str]:
"""
Return attribute value of the child with specified name (or None if no
such attribute)
"""
- try:
- return self.getTag(tag, namespace=namespace).attrs[attr]
- except Exception:
+ node = self.getTag(tag, namespace=namespace)
+ if node is None:
return None
+ return node.getAttr(attr)
- def getTagData(self, tag):
+ def getTagData(self, tag: str) -> Optional[str]:
"""
Return cocatenated CDATA of the child with specified name
"""
- try:
- return self.getTag(tag).getData()
- except Exception:
+ node = self.getTag(tag)
+ if node is None:
return None
+ return node.getData()
- def getTags(self, name, attrs=None, namespace=None, one=0):
+ def getTags(self, name: str, attrs: Optional[Attrs] = None, namespace: Optional[str] = None, one: bool = False) -> Union[List[Node], Node, None]:
"""
Filter all child nodes using specified arguments as filter. Returns the
list of nodes found
@@ -340,7 +357,7 @@ class Node:
return nodes
return None
- def iterTags(self, name, attrs=None, namespace=None):
+ def iterTags(self, name: str, attrs: Optional[Attrs] = None, namespace: Optional[str] = None) -> Iterator[Node]:
"""
Iterate over all children using specified arguments as filter
"""
@@ -357,31 +374,31 @@ class Node:
else:
yield node
- def setAttr(self, key, val):
+ def setAttr(self, key: str, val: str) -> None:
"""
Set attribute "key" with the value "val"
"""
self.attrs[key] = val
- def setData(self, data):
+ def setData(self, data: Any) -> None:
"""
Set node's CDATA to provided string. Resets all previous CDATA!
"""
self.data = [str(data)]
- def setName(self, val):
+ def setName(self, val: str) -> None:
"""
Change the node name
"""
self.name = val
- def setNamespace(self, namespace):
+ def setNamespace(self, namespace: str) -> None:
"""
Changes the node namespace
"""
self.namespace = namespace
- def setParent(self, node):
+ def setParent(self, node: Node) -> None:
"""
Set node's parent to "node". WARNING: do not checks if the parent already
present and not removes the node from the list of childs of previous
@@ -389,7 +406,7 @@ class Node:
"""
self.parent = node
- def setPayload(self, payload, add=0):
+ def setPayload(self, payload: Union[List[Union[Node, str]], Node, str], add: bool = False) -> None:
"""
Set node payload according to the list specified. WARNING: completely
replaces all node's previous content. If you wish just to add child or
@@ -402,17 +419,17 @@ class Node:
else:
self.kids = payload
- def setTag(self, name, attrs=None, namespace=None):
+ def setTag(self, name: str, attrs: Optional[Attrs] = None, namespace: Optional[str] = None) -> Node:
"""
Same as getTag but if the node with specified namespace/attributes not
found, creates such node and returns it
"""
- node = self.getTags(name, attrs, namespace=namespace, one=1)
+ node = self.getTags(name, attrs, namespace=namespace, one=True)
if node:
return node
return self.addChild(name, attrs, namespace=namespace)
- def setTagAttr(self, tag, attr, val, namespace=None):
+ def setTagAttr(self, tag: str, attr: str, val: str, namespace: Optional[str] = None) -> None:
"""
Create new node (if not already present) with name "tag" and set it's
attribute "attr" to value "val"
@@ -422,7 +439,7 @@ class Node:
except Exception:
self.addChild(tag, namespace=namespace, attrs={attr: val})
- def setTagData(self, tag, val, attrs=None):
+ def setTagData(self, tag: str, val: str, attrs: Optional[Attrs] = None) -> None:
"""
Creates new node (if not already present) with name "tag" and
(optionally) attributes "attrs" and sets it's CDATA to string "val"
@@ -432,7 +449,7 @@ class Node:
except Exception:
self.addChild(tag, attrs, payload = [str(val)])
- def getXmlLang(self):
+ def getXmlLang(self) -> Optional[str]:
lang = self.attrs.get('xml:lang')
if lang is not None:
return lang
@@ -441,37 +458,37 @@ class Node:
return self.parent.getXmlLang()
return None
- def has_attr(self, key):
+ def has_attr(self, key: str) -> bool:
"""
Check if node have attribute "key"
"""
return key in self.attrs
- def __getitem__(self, item):
+ def __getitem__(self, item: str) -> Optional[str]:
"""
Return node's attribute "item" value
"""
return self.getAttr(item)
- def __setitem__(self, item, val):
+ def __setitem__(self, item: str, val: str) -> None:
"""
Set node's attribute "item" value
"""
- return self.setAttr(item, val)
+ self.setAttr(item, val)
- def __delitem__(self, item):
+ def __delitem__(self, item: str) -> None:
"""
Delete node's attribute "item"
"""
- return self.delAttr(item)
+ self.delAttr(item)
- def __contains__(self, item):
+ def __contains__(self, item: str) -> bool:
"""
Check if node has attribute "item"
"""
return self.has_attr(item)
- def __getattr__(self, attr):
+ def __getattr__(self, attr: str) -> Union['T', 'NT']:
"""
Reduce memory usage caused by T/NT classes - use memory only when needed
"""
@@ -533,8 +550,23 @@ class NodeBuilder:
XML handler
"""
- def __init__(self, data=None, initial_node=None,
- dispatch_depth=1, finished=True):
+ _parser: Any
+ Parse: Callable[[str, bool], None]
+ __depth: int
+ __last_depth: int
+ __max_depth: int
+ _dispatch_depth: int
+ _document_attrs: Optional[Attrs]
+ _document_nsp: Optional[Dict[str, str]]
+ _mini_dom: Optional[Node]
+ last_is_data: bool
+ _ptr: Optional[Node]
+ data_buffer: Optional[List[str]]
+ streamError: str
+ _is_stream: bool
+
+ def __init__(self, data: Optional[str] = None, initial_node: Optional[Node] = None,
+ dispatch_depth: int = 1, finished: bool = True) -> None:
"""
Take two optional parameters: "data" and "initial_node"
@@ -565,7 +597,7 @@ class NodeBuilder:
self._document_attrs = None
self._document_nsp = None
self._mini_dom = initial_node
- self.last_is_data = 1
+ self.last_is_data = True
self._ptr = None
self.data_buffer = None
self.streamError = ''
@@ -573,13 +605,13 @@ class NodeBuilder:
if data:
self._parser.Parse(data, finished)
- def check_data_buffer(self):
+ def check_data_buffer(self) -> None:
if self.data_buffer:
self._ptr.data.append(''.join(self.data_buffer))
del self.data_buffer[:]
self.data_buffer = None
- def destroy(self):
+ def destroy(self) -> None:
"""
Method used to allow class instance to be garbage-collected
"""
@@ -589,7 +621,7 @@ class NodeBuilder:
self._parser.CharacterDataHandler = None
self._parser.StartNamespaceDeclHandler = None
- def starttag(self, tag, attrs):
+ def starttag(self, tag: str, attrs: Attrs) -> None:
"""
XML Parser callback. Used internally
"""
@@ -627,9 +659,9 @@ class NodeBuilder:
raise ValueError(str(e))
if not self.last_is_data and self._ptr.parent:
self._ptr.parent.data.append('')
- self.last_is_data = 0
+ self.last_is_data = False
- def _check_stream_start(self, ns, tag):
+ def _check_stream_start(self, ns: str, tag: str) -> None:
if self._is_stream:
if ns != 'http://etherx.jabber.org/streams' or tag != 'stream':
raise ValueError('Incorrect stream start: (%s,%s). Terminating.'
@@ -637,7 +669,7 @@ class NodeBuilder:
else:
self.stream_header_received()
- def endtag(self, tag):
+ def endtag(self, tag: str) -> None:
"""
XML Parser callback. Used internally
"""
@@ -656,70 +688,70 @@ class NodeBuilder:
else:
log.debug("Got higher than dispatch level. Stream terminated?")
self._dec_depth()
- self.last_is_data = 0
+ self.last_is_data = False
if self.__depth == 0:
self.stream_footer_received()
- def handle_cdata(self, data):
+ def handle_cdata(self, data: str) -> None:
if self.last_is_data:
if self.data_buffer:
self.data_buffer.append(data)
elif self._ptr:
self.data_buffer = [data]
- self.last_is_data = 1
+ self.last_is_data = True
@staticmethod
- def handle_invalid_xmpp_element(*args):
+ def handle_invalid_xmpp_element(*args: Any) -> None:
raise ExpatError('Found invalid xmpp stream element: %s' % str(args))
- def handle_namespace_start(self, _prefix, _uri):
+ def handle_namespace_start(self, _prefix: str, _uri: str) -> None:
"""
XML Parser callback. Used internally
"""
self.check_data_buffer()
- def getDom(self):
+ def getDom(self) -> Optional[Node]:
"""
Return just built Node
"""
self.check_data_buffer()
return self._mini_dom
- def dispatch(self, stanza):
+ def dispatch(self, stanza: Any) -> None:
"""
Get called when the NodeBuilder reaches some level of depth on it's way
up with the built node as argument. Can be redefined to convert incoming
XML stanzas to program events
"""
- def stream_header_received(self):
+ def stream_header_received(self) -> None:
"""
Method called when stream just opened
"""
self.check_data_buffer()
- def stream_footer_received(self):
+ def stream_footer_received(self) -> None:
"""
Method called when stream just closed
"""
self.check_data_buffer()
- def has_received_endtag(self, level=0):
+ def has_received_endtag(self, level: int = 0) -> bool:
"""
Return True if at least one end tag was seen (at level)
"""
return self.__depth <= level < self.__max_depth
- def _inc_depth(self):
+ def _inc_depth(self) -> None:
self.__last_depth = self.__depth
self.__depth += 1
self.__max_depth = max(self.__depth, self.__max_depth)
- def _dec_depth(self):
+ def _dec_depth(self) -> None:
self.__last_depth = self.__depth
self.__depth -= 1
-def XML2Node(xml):
+def XML2Node(xml: str) -> Optional[Node]:
"""
Convert supplied textual string into XML node. Handy f.e. for reading
configuration file. Raises xml.parser.expat.parsererror if provided string
@@ -727,7 +759,7 @@ def XML2Node(xml):
"""
return NodeBuilder(xml).getDom()
-def BadXML2Node(xml):
+def BadXML2Node(xml: str) -> Optional[Node]:
"""
Convert supplied textual string into XML node. Survives if xml data is
cutted half way round. I.e. "<html>some text <br>some more text". Will raise