Monotone-Parent: 11e7712d6d915167dfc15be7f67ab23f7f9e7cc7

Monotone-Revision: 1c96c7fdd2f2dab8f4bc3173deb36b069ade3bf1

Monotone-Author: wsourdeau@inverse.ca
Monotone-Date: 2009-08-05T15:34:45
Monotone-Branch: ca.inverse.sogo
maint-2.0.2
Wolfgang Sourdeau 2009-08-05 15:34:45 +00:00
parent d6bb70c29c
commit 5b3592c974
5 changed files with 351 additions and 0 deletions

14
Tests/README 100644
View File

@ -0,0 +1,14 @@
setup
-----
(you need "python-xml" in order to run the scripts)
1) copy testconfig.py.in to testconfig.py (make sure to never EVER add it to monotone)
2) edit testconfig.py to suit your environment
3) run the test scripts
other
-----
propfind.py - a sample implementation of a PROPFIND request using webdavlib

39
Tests/propfind.py 100755
View File

@ -0,0 +1,39 @@
#!/usr/bin/python
from testconfig import hostname, port, username, password
import webdavlib
import sys
import getopt
def parseArguments():
arguments = {}
depth = "0"
quiet = False
(opts, args) = getopt.getopt(sys.argv[1:], "d:q", ["depth=", "quiet"])
for pair in opts:
print pair
if (pair[0] == "-d" or pair[0] == "--depth"):
depth = pair[1]
elif (pair[0] == "-q" or pair[0] == "--quiet"):
quiet = True
# print "depth: " + depth
nargs = len(args)
if (nargs > 0):
resource = args[0]
if (nargs > 1):
properties = args[1:]
else:
properties = [ "allprop" ]
else:
print "resource required"
sys.exit(-1)
client = webdavlib.WebDAVClient(hostname, port, username, password)
propfind = webdavlib.WebDAVPROPFIND(resource, properties, depth)
client.execute(propfind)

View File

@ -0,0 +1,4 @@
hostname = "localhost"
port = "80"
username = "myuser"
password = "mypass"

View File

@ -0,0 +1,71 @@
#!/usr/bin/python
from testconfig import hostname, port, username, password
import sys
import unittest
import webdavlib
import xml.xpath
import time
resource = '/SOGo/dav/%s/Calendar/test-webdavsync/' % username
class WebdavSyncTest(unittest.TestCase):
def setUp(self):
self.client = webdavlib.WebDAVClient(hostname, port,
username, password)
def tearDown(self):
delete = webdavlib.WebDAVDELETE(resource)
self.client.execute(delete)
def _xpath_query(self, query, top_node):
xpath_context = xml.xpath.CreateContext(top_node)
xpath_context.setNamespaces({ "D": "DAV:" })
return xml.xpath.Evaluate(query, None, xpath_context)
def test(self):
# missing tests:
# invalid tokens: negative, non-numeric, > current timestamp
# non-empty collections: token validity, status codes for added,
# modified and removed elements
# preparation
mkcol = webdavlib.WebDAVMKCOL(resource)
self.client.execute(mkcol)
self.assertEquals(mkcol.response["status"], 201,
"preparation: failure creating collection (code != 201)")
# test queries:
# empty collection:
# without a token (query1)
# with a token (query2)
# non-empty collection:
# without a token (query3)
# with a token (query4)
query1 = webdavlib.WebDAVSyncQuery(resource, None, [ "getetag" ])
self.client.execute(query1)
self.assertEquals(query1.response["status"], 207,
("query1: invalid status code: %d (!= 207)"
% query1.response["status"]))
token_node = self._xpath_query("/D:multistatus/D:sync-token",
query1.response["document"])[0]
# Implicit "assertion": we expect SOGo to return a token node, with a
# non-empty numerical value. Anything else will trigger an exception
token = int(token_node.childNodes[0].nodeValue)
self.assertTrue(token > 0)
self.assertTrue(token < int(query1.start))
# we make sure that any token is invalid when the collection is empty
query2 = webdavlib.WebDAVSyncQuery(resource, "1234", [ "getetag" ])
self.client.execute(query2)
self.assertEquals(query2.response["status"], 403)
cond_nodes = self._xpath_query("/D:error/D:valid-sync-token",
query2.response["document"])
self.assertTrue(len(cond_nodes) > 0,
"expected 'valid-sync-token' condition error")
if __name__ == "__main__":
unittest.main()

223
Tests/webdavlib.py 100644
View File

@ -0,0 +1,223 @@
import httplib
import M2Crypto.httpslib
import time
import xml.sax.saxutils
import xml.dom.ext.reader.Sax2
import sys
class WebDAVClient:
def __init__(self, hostname, port, username, password, forcessl = False):
if port == "443" or forcessl:
self.conn = M2Crypto.httpslib.HTTPSConnection(hostname, int(port),
True)
else:
self.conn = httplib.HTTPConnection(hostname, port, True)
self.simpleauth_hash = (("%s:%s" % (username, password))
.encode('base64')[:-1])
def _prepare_headers(self, query, body):
headers = { "User-Agent": "Mozilla/5.0",
"content-length": len(body),
"authorization": "Basic %s" % self.simpleauth_hash }
if query.__dict__.has_key("query") and query.depth is not None:
headers["depth"] = query.depth
if query.__dict__.has_key("content_type"):
headers["content-type"] = query.content_type
return headers
def execute(self, query):
body = query.render()
query.start = time.time()
self.conn.request(query.method, query.url,
body, self._prepare_headers(query, body))
query.set_response(self.conn.getresponse());
query.duration = time.time() - query.start
class HTTPSimpleQuery:
method = None
def __init__(self, url):
self.url = url
self.response = None
self.start = -1
self.duration = -1
def render():
return None
class HTTPGET(HTTPSimpleQuery):
method = "GET"
class HTTPQuery(HTTPSimpleQuery):
def __init__(self, url, content_type):
HTTPSimpleQuery.__init__(self, url)
self.content_type = content_type
def set_response(self, http_response):
headers = {}
for rk, rv in http_response.getheaders():
k = rk.lower()
headers[k] = rv
self.response = { "headers": headers,
"status": http_response.status,
"version": http_response.version,
"body": http_response.read() }
class HTTPPUT(HTTPQuery):
method = "PUT"
def __init__(self, url, content, content_type = "application/octet-stream"):
HTTPQuery.__init__(self, url, content_type)
self.content = content
def render(self):
return self.content
class WebDAVQuery(HTTPQuery):
method = None
def __init__(self, url, depth = None):
HTTPQuery.__init__(self, url, "application/xml; charset=\"utf-8\"")
self.depth = depth
self.ns_mgr = _WD_XMLNS_MGR()
self.top_node = None
self.xml_response = None
def render(self):
if self.top_node is not None:
text = ("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n%s"
% self.top_node.render(self.ns_mgr.render()))
else:
text = ""
return text
def render_tag(self, tag):
cb = tag.find("}")
if cb > -1:
ns = tag[1:cb]
real_tag = tag[cb+1:]
new_tag = self.ns_mgr.register(real_tag, ns)
else:
new_tag = tag
return new_tag
def set_response(self, http_response):
HTTPQuery.set_response(self, http_response)
headers = self.response["headers"]
if (headers.has_key("content-type")
and headers.has_key("content-length")
and (headers["content-type"].startswith("application/xml")
or headers["content-type"].startswith("text/xml"))
and int(headers["content-length"]) > 0):
dom_response = xml.dom.ext.reader.Sax2.FromXml(self.response["body"])
self.response["document"] = dom_response.documentElement
class WebDAVMKCOL(WebDAVQuery):
method = "MKCOL"
class WebDAVDELETE(WebDAVQuery):
method = "DELETE"
class WebDAVREPORT(WebDAVQuery):
method = "REPORT"
class WebDAVPROPFIND(WebDAVQuery):
method = "PROPFIND"
def __init__(self, url, properties, depth = None):
WebDAVQuery.__init__(self, url, depth)
self.top_node = _WD_XMLTreeElement("propfind")
props = _WD_XMLTreeElement("prop")
self.top_node.append(props)
for prop in properties:
prop_tag = self.render_tag(prop)
props.append(_WD_XMLTreeElement(prop_tag))
class WebDAVSyncQuery(WebDAVREPORT):
def __init__(self, url, token, properties):
WebDAVQuery.__init__(self, url)
self.top_node = _WD_XMLTreeElement("sync-collection")
sync_token = _WD_XMLTreeElement("sync-token")
self.top_node.append(sync_token)
if token is not None:
sync_token.append(_WD_XMLTreeTextNode(token))
props = _WD_XMLTreeElement("prop")
self.top_node.append(props)
for prop in properties:
prop_tag = self.render_tag(prop)
props.append(_WD_XMLTreeElement(prop_tag))
# private classes to handle XML stuff
class _WD_XMLNS_MGR:
def __init__(self):
self.xmlns = {}
self.counter = 0
def render(self):
text = " xmlns=\"DAV:\""
for k in self.xmlns:
text = text + " xmlns:%s=\"%s\"" % (self.xmlns[k], k)
return text
def create_key(self, namespace):
new_nssym = "n%d" % self.counter
self.counter = self.counter + 1
self.xmlns[namespace] = new_nssym
return new_nssym
def register(self, tag, namespace):
if namespace != "DAV:":
if self.xmlns.has_key(namespace):
key = self.xmlns[namespace]
else:
key = self.create_key(namespace)
else:
key = None
if key is not None:
newTag = "%s:%s" % (key, tag)
else:
newTag = tag
return newTag
class _WD_XMLTreeElement:
def __init__(self, tag):
self.tag = tag
self.children = []
def append(self, child):
self.children.append(child)
def render(self, ns_text = None):
text = "<" + self.tag
if ns_text is not None:
text = text + ns_text
count = len(self.children)
if count > 0:
text = text + ">"
for x in range(0, count):
text = text + self.children[x].render()
text = text + "</" + self.tag + ">"
else:
text = text + "/>"
return text
class _WD_XMLTreeTextNode:
def __init__(self, text):
self.text = xml.sax.saxutils.escape(text)
def render(self):
return self.text