1
0
mirror of https://github.com/ubf/ubf.git synced 2026-04-18 02:35:41 +00:00
Files
ubf/priv/python/jsonrpc.py
2009-06-12 11:43:38 +09:00

82 lines
2.1 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python
#
# $Id$
#
import json
import urllib2
import time
import pyubf
from pyubf import Atom, Integer
class JsonResError(RuntimeError):
    """Raised when a JSON-RPC response does not belong to the request sent.

    do_rpc raises this when the "id" member of the response differs from
    the id that was placed in the request.
    """
class JsonFault(RuntimeError):
    """Raised when a JSON-RPC response carries a non-null "error" member.

    do_rpc constructs it with two arguments: the fixed text "JSON Error"
    and the decoded error value, both available through ``args``.
    """
def do_rpc(UserId, method, *params):
    """Send one JSON-RPC request to the local RPC endpoint and return its result.

    Parameters:
        UserId -- tags the request id (via getNewId) and is used to derive
                  the 'X-temp-auth-info' header (via auth_cookie).
        method -- RPC method name; converted with pyubf.to_pyjson.
        params -- positional RPC arguments; each converted with
                  pyubf.to_pyjson.

    Returns:
        The "result" member of the response, converted back with
        pyubf.from_pyjson.

    Raises:
        JsonResError -- the response "id" differs from the request id.
        JsonFault    -- the response has a non-null "error" member.
        Errors raised by urllib2 while connecting or reading propagate
        unchanged.
    """
    # The standard-library json module (Python 2.6+) provides dumps/loads,
    # whereas the older python-json package provides write/read.  Accept
    # whichever one "import json" resolved to.
    encode = getattr(json, 'dumps', None) or json.write
    decode = getattr(json, 'loads', None) or json.read

    jsonId = pyubf.to_pyjson(getNewId(UserId))
    jsonReq = encode({
        'version': '1.1',
        'id': jsonId,
        'method': pyubf.to_pyjson(method),
        'params': [pyubf.to_pyjson(param) for param in params],
    })

    url = 'http://localhost:7590/rpc'
    headers = {'X-temp-auth-info': auth_cookie(UserId)}
    # Supplying a request body makes urllib2 issue a POST.
    res = urllib2.urlopen(urllib2.Request(url, jsonReq, headers))
    try:
        body = res.read()
    finally:
        # Always release the connection, even if reading fails.
        res.close()
    jsonRes = decode(body)

    if jsonRes["id"] != jsonId:
        raise JsonResError("Invalid request id (is: %s, expected: %s)" % (jsonRes["id"], jsonId))
    # A successful reply may omit "error" altogether; treat that like null.
    error = jsonRes.get("error")
    if error is not None:
        raise JsonFault("JSON Error", pyubf.from_pyjson(error))
    return pyubf.from_pyjson(jsonRes["result"])
def do_request(url, UserId, method, *params):
    """Fetch the local vcard resource for a user and return the raw body.

    Parameters:
        url    -- currently ignored; the request always goes to
                  'http://localhost:7590/vcard'.
                  NOTE(review): the argument has always been overwritten
                  here and the HOST header is pinned to localhost, so the
                  behaviour is kept; confirm whether callers expect the
                  passed url to be honoured.
        UserId -- used to derive the 'X-temp-auth-info' header.
        method, params -- accepted but not used.

    Returns:
        The response body as returned by read().

    Errors raised by urllib2 propagate unchanged.
    """
    url = 'http://localhost:7590/vcard'
    headers = {
        'X-temp-auth-info': auth_cookie(UserId),
        'HOST': 'localhost',
    }
    # No request body is supplied, so urllib2 issues a GET.
    res = urllib2.urlopen(urllib2.Request(url, None, headers))
    try:
        return res.read()
    finally:
        # Always release the connection, even if reading fails.
        res.close()
def getNewId(Tag):
    """Return a request id of the form '<Tag>_<current time>'.

    Both parts are rendered with str(); the time part comes from
    time.time() at the moment of the call.
    """
    return '_'.join([str(Tag), str(time.time())])
def auth_cookie(UserId):
    """Return the value sent in the 'X-temp-auth-info' header for UserId.

    The value is the text 'uraid' followed by str(UserId), reversed
    character by character (e.g. 1 -> '1diaru').
    """
    return ('uraid' + str(UserId))[::-1]
if __name__ == "__main__":
    def _profile_round_trip(user_id):
        """Read a profile, change its 'cos' entry, write it and read it back.

        Each reply is indexed as [0] status, [1] payload, [2] a value named
        ``ts`` (presumably a timestamp -- confirm against the server).  The
        asserts check that the write succeeds, that it yields a new ts, and
        that a second read returns the updated profile with that new ts.
        """
        # profile get
        first_get = do_rpc(user_id, 'profile_get', Atom('user'), None, Atom('infinity'))
        assert first_get[0] == 'ok'
        profile = first_get[1]
        ts = first_get[2]
        print(profile)

        # profile set
        profile[1].update({'cos' : Atom('paying')})
        set_reply = do_rpc(user_id, 'profile_set', profile, ts, Atom('infinity'))
        assert set_reply[0] == 'ok'
        # The write must hand back a ts different from the one we read.
        assert set_reply[1] != ts

        # profile get
        second_get = do_rpc(user_id, 'profile_get', Atom('user'), None, Atom('infinity'))
        assert second_get[0] == 'ok'
        assert second_get[1] == profile
        assert second_get[2] == set_reply[1]

    _profile_round_trip(1)