问题描述
客户要求我们开发一个后台实时定位系统,该系统能够实时监控客户系统里面会员所在的位置,并将会员的位置信息显示在地图上。服务器后端是PHP开发的后台,主要是讲究效率。会员展示的前端是一个微信小程序,那么,前端可通过微信小程序提供的wx.startLocationUpdateBackground、wx.onLocationChange来实时获取当前的位置信息。
具体构思如下:
- 在会员进入相应的小程序页面时,调用wx.startLocationUpdateBackground方法,开启后台定位功能。
- 当会员的位置发生变化时,会触发wx.onLocationChange事件,我们可以获取到当前的位置信息,并使用地图API将会员的位置提交到API服务。
- API服务接收到会员的位置信息后,将会员的位置信息使用redis.publish方法发布到指定的频道,供后台实时定位系统接收。
- 使用docker服务部署一个python服务,该服务监听redis的指定频道,并接收到会员的位置信息后,判断是否有操作员在监听该会员的位置信息,如果有,则将会员的位置信息发送给操作员。
- 操作员客户端收到会员的位置信息后,将会员的位置信息显示在地图上。
下面是具体的实现步骤:
- 准备工作:申请微信小程序的appid和secret,并在微信小程序后台配置好后台定位功能。其实在测试阶段,这些wx.startLocationUpdateBackground、wx.onLocationChange在体验版中都是开放的,但在正式版中需要申请权限。我们可以先在体验版中测试后台定位功能是否正常工作。
- 小程序端:在小程序端,我们需要调用wx.startLocationUpdateBackground方法开启后台定位功能,并在wx.onLocationChange事件中获取当前的位置信息。
methods: {
locationListener() {
uni.startLocationUpdateBackground({
success: () => {
uni.onLocationChange((res) => {
this.$api.sendRequest({
url: '/api/location/move',
data: res
})
});
},
fail: err => {
console.log('监听位置失败 -> ', err);
}
})
},
locationStart() {
uni.getSetting({
success: (res) => {
if (res.authSetting['scope.userLocationBackground']) {
this.locationListener();
} else {
uni.authorize({
scope: 'scope.userLocationBackground',
success: () => {
this.locationListener();
},
fail: () => {
uni.showModal({
title: '提示',
content: '授权失败,点击右上角设置位置为使用时和离开后!',
success: r => {
if (r.confirm) {
uni.openSetting({
complete: t => {
if (t.authSetting['scope.userLocationBackground']) {
this.locationListener();
} else {
console.log('授权失败 ->', t);
}
}
})
}
}
})
}
})
}
}
})
}
}
- 修改mainifest.json文件,添加后台定位权限
"mp-weixin" : {
"permissions": {
"scope.userLocationBackground" : {
"desc" : "为了给您提供更精准、便捷的服务,我们需要获取您的后台持续获取位置信息。我们承诺会严格保护您的隐私,仅在实现上述功能时使用您的位置信息,感谢您的理解与支持!"
},
"scope.userLocation" : {
"desc" : "为了为您提供更好更精准的服务,需要获得您当前位置信息。"
}
},
"requiredPrivateInfos" : [
"onLocationChange",
"startLocationUpdateBackground"
],
"requiredBackgroundModes" : [ "location" ],
}
- PHP服务端
Cache::store('redis')->publish("location_update", json_encode([
'uid' => $uid,
'longitude' => input('longitude', ''),
'latitude' => input('latitude', ''),
]));
- Python服务端 (这里需要注意:访问的redis服务端是宿主机的redis,所以需要将redis的host设置为宿主机的IP地址,否则会出现连接失败的情况,因为服务器的python环境是python2。如果服务器的python本身就是python3,不需要构建docker镜像,直接运行即可。)
#!/usr/bin/env python3
import asyncio
import json
import logging
from redis.asyncio import Redis
from redis.exceptions import RedisError
from websockets.server import serve, WebSocketServerProtocol
from typing import Set, Dict
from datetime import datetime
# 设置更详细的日志格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class LocationServer:
def __init__(self):
self.connections: Dict[str, Set[WebSocketServerProtocol]] = {}
self.redis = None
self.pubsub = None
self.redis_config = {
'host': '127.0.0.1',
'port': 6379,
'password': None,
'db': 0,
'decode_responses': True,
'retry_on_timeout': True,
'socket_keepalive': True
}
def log_connections_status(self):
"""记录当前连接状态"""
total_connections = sum(len(clients) for clients in self.connections.values())
logger.info(f"Current connections status:")
logger.info(f"Total connected clients: {total_connections}")
for uid, clients in self.connections.items():
logger.info(f"Member {uid}: {len(clients)} client(s)")
async def init_redis(self):
max_retries = 5
retry_count = 0
while retry_count < max_retries:
try:
if self.redis:
await self.redis.close()
self.redis = Redis(**self.redis_config)
self.pubsub = self.redis.pubsub()
await self.redis.ping()
logger.info("Redis connection established")
return True
except Exception as e:
retry_count += 1
logger.error(f"Redis connection attempt {retry_count} failed: {e}")
if retry_count < max_retries:
await asyncio.sleep(5)
else:
logger.error("Max retries reached, could not connect to Redis")
return False
async def redis_subscriber(self):
while True:
try:
if self.redis is None or not await self.redis.ping():
logger.info("Trying to connect to Redis...")
if not await self.init_redis():
await asyncio.sleep(5)
continue
await self.pubsub.subscribe('location_update')
logger.info("Successfully subscribed to location_update channel")
while True:
message = await self.pubsub.get_message(ignore_subscribe_messages=True)
if message and message['type'] == 'message':
try:
data = json.loads(message['data'])
uid = str(data.get('uid'))
logger.info(f"Received location update for member {uid}")
if uid in self.connections and self.connections[uid]:
client_count = len(self.connections[uid])
logger.info(f"Found {client_count} client(s) for member {uid}")
disconnected = set()
success_count = 0
for websocket in self.connections[uid]:
try:
await websocket.send(json.dumps(data))
success_count += 1
logger.info(f"Successfully sent location update to a client for member {uid}")
except Exception as e:
logger.error(f"Failed to send to websocket: {e}")
disconnected.add(websocket)
for ws in disconnected:
self.connections[uid].remove(ws)
logger.info(f"Removed disconnected client for member {uid}")
if not self.connections[uid]:
del self.connections[uid]
logger.info(f"Removed member {uid} as no clients remaining")
logger.info(f"Location update sent to {success_count}/{client_count} clients for member {uid}")
else:
logger.info(f"No clients found for member {uid}")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON message: {e}")
await asyncio.sleep(0.1)
except RedisError as e:
logger.error(f"Redis error: {e}")
await asyncio.sleep(5)
except Exception as e:
logger.error(f"Unexpected error: {e}")
await asyncio.sleep(5)
async def register(self, websocket: WebSocketServerProtocol, uid: str):
"""注册新的WebSocket连接"""
if uid not in self.connections:
self.connections[uid] = set()
self.connections[uid].add(websocket)
logger.info(f"New connection registered for uid: {uid}")
self.log_connections_status()
async def unregister(self, websocket: WebSocketServerProtocol, uid: str):
"""注销WebSocket连接"""
if uid in self.connections:
self.connections[uid].remove(websocket)
if not self.connections[uid]:
del self.connections[uid]
logger.info(f"Connection unregistered for uid: {uid}")
self.log_connections_status()
async def handler(self, websocket: WebSocketServerProtocol):
"""处理WebSocket连接"""
client_address = websocket.remote_address
logger.info(f"New WebSocket connection from {client_address}")
uid = None
try:
message = await websocket.recv()
logger.info(f"Received initial message from {client_address}: {message}")
data = json.loads(message)
uid = str(data.get('uid'))
if not uid:
logger.warning(f"No uid provided from {client_address}")
await websocket.close(1002, "uid required")
return
await self.register(websocket, uid)
logger.info(f"Client {client_address} registered for member {uid}")
try:
async for message in websocket:
logger.debug(f"Received message from {client_address}: {message}")
except Exception as e:
logger.error(f"Error in websocket message loop for {client_address}: {e}")
finally:
if uid:
await self.unregister(websocket, uid)
logger.info(f"Client {client_address} unregistered for member {uid}")
except Exception as e:
logger.error(f"WebSocket handler error for {client_address}: {e}")
if uid:
await self.unregister(websocket, uid)
async def main():
server = LocationServer()
logger.info("Starting Location WebSocket Server...")
subscriber_task = asyncio.create_task(server.redis_subscriber())
logger.info("Redis subscriber task created")
async with serve(server.handler, "0.0.0.0", 30582):
logger.info("WebSocket server is running on ws://0.0.0.0:30582")
try:
await asyncio.Future()
except Exception as e:
logger.error(f"Server error: {e}")
finally:
subscriber_task.cancel()
logger.info("Server shutting down...")
if __name__ == "__main__":
asyncio.run(main())
- 操作员客户端
const ws = new WebSocket('ws://demo.com:30582');
ws.onopen = () => {
console.log('WebSocket 已连接');
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// 刷新地图显示当前位置
consol.log("当前位置发生变化 ->" + data);
// 这里可以进行地图画线、标注等操作,每个人使用的地图不一样,需要将经纬度转换为自己使用的坐标系
};
ws.onclose = (event) => {
console.log('WebSocket 已关闭');
};
ws.onerror = (event) => {
console.log('WebSocket 发生错误');
};
总结
通过以上步骤,我们可以开发出一个后台实时定位系统,该系统能够实时监控客户系统里面会员所在的位置,并将会员的位置信息显示在地图上。服务器后端是PHP开发的后台,最终实现成功也就那么一会,不需要太多的技术难度,比如安装各种库、各种配置、各种环境,仅需要在python运行前运行 pip install websockets redis 即可。调试时,有错误直接修改代码即可,不需要重启服务。
标签:info,uid,self,redis,实时,connections,所在位置,后台,logger From: https://www.cnblogs.com/quaki/p/18654955