使用场景
云数据库服务端以前支持Node.JS和Java平台的Server SDK。现在介绍一下服务端为Python平台时的使用方法。
集成准备
Python环境配置
1.下载Python和PyCharm并安装。
2.使用安装的python本身作为解释器。
3.安装AGC Python SDK。
AGC环境配置
1.在AGC创建项目和应用
2.开通云数据库服务。
3.选择:我的项目-> 构建 – 云数据库,创建对象类型和存储区,对象类型各字段如下图,存储区名为“QuickStartDemo”。
4.返回项目设置界面,选择Server SDK 页签,在认证凭据处点击创建按钮,然后下载认证凭据。
5.在我的项目-云数据库中导出对象类型的JS件,导入项目目录中
功能实现
引入AGC与云数据库模块
from agconnect.common_server import AGCClient
from agconnect.common_server import CredentialParser
from agconnect.database_server import AGConnectCloudDBException
from agconnect.database_server import CloudDBZoneConfig
from agconnect.database_server import CloudDBZoneQuery
from agconnect.database_server import AGConnectCloudDB
from agconnect.database_server import TransactionFunction, Transaction
from agconnect.common_server import logger
将下载的凭据文件放入项目中,调用AGCClient.initialize方法初始化AGCClient实例
将配置开发环境中获取的认证凭据放置到自定义的目录,通过initialize方法初始化对应数据处理位置的AGCClient实例,调用get_instance取初始化后的AGCClient实例,然后开启存储区“QuickStartDemo”。
bj_wrapper = CloudDbZoneWrapper("clientCN", "CN")
zone_name = "QuickStartDemo"
credential_path = CredentialParser.to_credential("./agc-apiclient-1071113651286461824-7189597874176627946.json")
AGCClient.initialize(self.client_name, credential_path, self.region)
agc_client = AGCClient.get_instance(self.client_name)
logger.info(f"name {agc_client.get_name()}")
AGConnectCloudDB.initialize(agc_client)
cloud_db_zone_config = CloudDBZoneConfig(zone_name)
self.cloud_db_zone = AGConnectCloudDB.get_instance(agc_client).open_cloud_db_zone(cloud_db_zone_config)
写入数据
使用execute_upsert()将BookInfo对象写入至Cloud DB zone中,写入成功后,返回写入的数量;写入失败,捕获异常信息。
async def upsert_book(self, book_info):
if not self.cloud_db_zone:
logger.info("CloudDBClient is null, try re-initialize it")
return
try:
resp = await self.cloud_db_zone.execute_upsert(book_info)
print(resp)
logger.info(f'The number of upsert books is: {resp}')
except Exception as err:
logger.warning(f'upsertInfoBook=> {err}')
调用execute_query()查询数据
查询所有数据
async def query_all_books(self):
if not self.cloud_db_zone:
logger.info("CloudDBClient is null, try re-initialize it")
return
try:
cloud_db_zone_query = CloudDBZoneQuery.where(BookInfo)
resp = await self.cloud_db_zone.execute_query(cloud_db_zone_query)
len(resp.get_snapshot_objects())
logger.info(f'The number of query table is: {len(resp.get_snapshot_objects())}')
except Exception as err:
logger.warning(f'queryAllInfo=> {err}')
查询bookName为the Red And Black的图书
async def query_books_start_at(self):
obj = BookInfo()
obj.set_id(5)
obj.set_book_name("The Red And Black")
obj.set_price(10.99)
try:
cloud_db_zone_query = CloudDBZoneQuery.where(BookInfo).order_by_asc("bookName"). \
order_by_asc("price").start_at(obj)
resp = await self.cloud_db_zone.execute_query(cloud_db_zone_query)
print(len(resp.get_snapshot_objects()))
logger.info(f'The number of query table is: {len(resp.get_snapshot_objects())}')
except Exception as err:
logger.warning(f'queryAllInfo=> {err}')
调用execute_delete()删除数据
删除对应bookinfo对象的数据
async def delete_book(self, book_info):
if not self.cloud_db_zone:
logger.info("CloudDBClient is null, try re-initialize it")
return
try:
resp = await self.cloud_db_zone.execute_delete(book_info)
logger.info(f'The number of delete books is: {resp}')
except Exception as err:
logger.warning(f'deleteInfoBook=> {err}')
删除所有数据
async def delete_all_books(self, book_info):
if not self.cloud_db_zone:
logger.info("CloudDBClient is null, try re-initialize it")
return
try:
resp = await self.cloud_db_zone.execute_delete_all(book_info)
logger.info(f'The number of delete all books is: {resp}')
except Exception as err:
logger.warning(f'deleteAllInfoBook=> {err}')
调用run_transaction()执行事务,实现对云侧存储区数据的管理操作,包含数据的增、删、改、查操作。
使用以下方法,通过事务删除1900年之前发布的书。
async def delete_over_due_books(self, cloud_db_zone_query):
try:
transaction_function_obj = TransactionFunction()
async def apply(transaction: Transaction):
try:
data = await transaction.execute_query(cloud_db_zone_query)
logger.info(f'query entityone num: {str(len(data))}')
except AGConnectCloudDBException as error:
logger.error(error)
return False
return True
transaction_function_obj.apply = apply
res = await self.cloud_db_zone.run_transaction(transaction_function=transaction_function_obj)
print(res)
logger.info(f"the transaction result: {res}")
except AGConnectCloudDBException as err:
logger.error(err.get_error_message())
main.py中执行的方法
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
loop = asyncio.get_event_loop()
loop.run_until_complete(obj_wrapper.delete_all_books(BookInfo))
loop.run_until_complete(obj_wrapper.upsert_book(book))
loop.run_until_complete(obj_wrapper.upsert_book(book_info_list))
loop.run_until_complete(obj_wrapper.query_all_books())
try:
cloud_db_zone_query = CloudDBZoneQuery.where(BookInfo).less_than(
"price", 50)
res = loop.run_until_complete(obj_wrapper.query_books(cloud_db_zone_query))
print(res)
except Exception as err:
logger.warning(err)
loop.run_until_complete(obj_wrapper.query_books_with_order())
loop.run_until_complete(obj_wrapper.query_books_start_at())
loop.run_until_complete(obj_wrapper.query_average())
try:
date_obj = datetime.datetime.now()
cloud_db_zone_query = CloudDBZoneQuery.where(BookInfo).greater_than_equal_to("publishTime", date_obj)
loop.run_until_complete(obj_wrapper.delete_over_due_books(cloud_db_zone_query))
except Exception as err:
logger.warning(err)
loop.run_until_complete(obj_wrapper.delete_book(book))
loop.run_until_complete(obj_wrapper.delete_book(book_info_list))
功能测试
执行python main.py命令,服务依次执行:
删除所有数据:
插入一个bookinfo对象的数据:
插入bookinfo列表中的多条数据:
查询所有的书,打印出数量:
f'The number of query table is:5
查询价格低于50的书的数量:
f'The number of query table is:3
删除一个bookinfo对象的数据:
删除列表中所有数据:
以上就是服务端集成云数据Python SDK的使用方法,开发指导文档也会在日后上线。