python3.8,启动脚本前打开终端控制台的屏幕录制权限
用到的依赖包: pip install aiohttp aiortc opencv-python mss numpy pyobjc-framework-Quartz
pip install pyautogui
pythonimport asyncio
import json
import cv2
import numpy as np
import mss
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
from av import VideoFrame
# --- 鼠标库 ---
try:
from Quartz import CGEventCreate, CGEventGetLocation
def get_mouse_pos_fast():
evt = CGEventCreate(None)
loc = CGEventGetLocation(evt)
return loc.x, loc.y
except ImportError:
import pyautogui
def get_mouse_pos_fast():
return pyautogui.position()
# ==========================================
# 🚀 终极画质配置
# ==========================================
# 设置为 True:启用原生点对点(不做任何缩放)
# 优点:清晰度天花板,文字锐利
# 缺点:带宽占用极大 (20-50Mbps),仅限局域网
USE_NATIVE_RESOLUTION = True
# 如果上面设为 False,则限制最大宽度 (如 2560 或 1920)
MAX_WIDTH = 2560
class ScreenCaptureTrack(VideoStreamTrack):
kind = "video"
def __init__(self):
super().__init__()
self.mss = mss.mss()
self.monitor = self.mss.monitors[1]
# 获取原始物理分辨率
self.raw_w = self.monitor['width']
self.raw_h = self.monitor['height']
# 决定最终推流分辨率
if USE_NATIVE_RESOLUTION:
self.width = self.raw_w
self.height = self.raw_h
else:
# 等比缩放逻辑
if self.raw_w > MAX_WIDTH:
ratio = MAX_WIDTH / self.raw_w
self.width = int(self.raw_w * ratio)
self.height = int(self.raw_h * ratio)
else:
self.width = self.raw_w
self.height = self.raw_h
# ⚠️ 编码器硬性要求:宽和高必须是偶数
# 如果是奇数,我们减去 1 像素,否则会报错或绿屏
if self.width % 2 != 0: self.width -= 1
if self.height % 2 != 0: self.height -= 1
print(f"✨ 终极画质模式: {'原生 Retina' if USE_NATIVE_RESOLUTION else '缩放模式'}")
print(f"📺 物理分辨率: {self.raw_w}x{self.raw_h}")
print(f"🚀 发送分辨率: {self.width}x{self.height}")
async def recv(self):
pts, time_base = await self.next_timestamp()
# 1. 抓取 (原生 BGRA 数据)
sct_img = self.mss.grab(self.monitor)
img = np.array(sct_img)
# 2. 预处理
# 如果需要缩放(非原生模式),使用 INTER_AREA 插值
if self.width != self.raw_w or self.height != self.raw_h:
img = cv2.resize(img, (self.width, self.height), interpolation=cv2.INTER_AREA)
else:
# 如果是原生模式,但原始分辨率是奇数,需要裁剪掉最后 1 像素
if img.shape[1] != self.width or img.shape[0] != self.height:
img = img[:self.height, :self.width]
# 3. 极速转色 (BGRA -> BGR) - 这是最耗时的步骤之一
# M1 上这一步由于内存带宽大,速度很快
img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
# 4. 封装
frame = VideoFrame.from_ndarray(img, format="bgr24")
frame.pts = pts
frame.time_base = time_base
return frame
pcs = set()
async def offer(request):
params = await request.json()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
pc = RTCPeerConnection()
pcs.add(pc)
pc.addTrack(ScreenCaptureTrack())
@pc.on("datachannel")
def on_datachannel(channel):
if channel.label == "mouse":
asyncio.ensure_future(send_mouse_data(channel))
@pc.on("connectionstatechange")
async def on_connectionstatechange():
if pc.connectionState == "failed":
await pc.close()
pcs.discard(pc)
await pc.setRemoteDescription(offer)
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
return web.json_response({
"sdp": pc.localDescription.sdp,
"type": pc.localDescription.type
})
async def send_mouse_data(channel):
with mss.mss() as sct:
monitor = sct.monitors[1]
w, h = monitor['width'], monitor['height']
l, t = monitor['left'], monitor['top']
while channel.readyState == "open":
try:
mx, my = get_mouse_pos_fast()
pct_x = (mx - l) / w
pct_y = (my - t) / h
msg = json.dumps({"x": round(pct_x, 5), "y": round(pct_y, 5)})
channel.send(msg)
await asyncio.sleep(0.015)
except Exception:
break
async def on_shutdown(app):
coros = [pc.close() for pc in pcs]
await asyncio.gather(*coros)
pcs.clear()
async def index(request):
return web.Response(content_type="text/html", text=HTML_TEMPLATE)
# --- 前端代码:暴力 50Mbps 带宽 ---
# --- 前端代码:暴力 50Mbps 带宽 + 像素化小鼠标 ---
HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Native Retina Stream</title>
<style>
body { margin: 0; background: #000; height: 100vh; width: 100vw; overflow: hidden; display: flex; justify-content: center; align-items: center; }
#container { position: relative; width: 100%; height: 100%; display: flex; justify-content: center; align-items: center; }
video {
max-width: 100%; max-height: 100%;
object-fit: contain;
cursor: none;
box-shadow: none;
}
#remote-cursor {
position: absolute; top: 0; left: 0;
/* 【改动 1】极致小尺寸:12px */
width: 12px; height: 12px;
pointer-events: none; z-index: 999; display: none;
transition: none;
/* 像素化渲染 */
image-rendering: pixelated;
image-rendering: crisp-edges;
filter: none;
}
#stats {
position: absolute; top: 10px; left: 10px; color: lime; font-family: monospace;
background: rgba(0,0,0,0.5); padding: 5px; border-radius: 5px; pointer-events: none;
}
</style>
</head>
<body>
<div id="stats">连接中...</div>
<div id="container">
<video id="video" autoplay playsinline muted></video>
<svg id="remote-cursor" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M5 5L12 21L14.5 13.5L22 11L5 5Z" fill="black" stroke="white" stroke-width="2"/>
</svg>
</div>
<script>
const pc = new RTCPeerConnection();
const video = document.getElementById('video');
const cursor = document.getElementById('remote-cursor');
const stats = document.getElementById('stats');
function updateCursor(x_pct, y_pct) {
if (video.videoWidth === 0) return;
const vw = video.videoWidth;
const vh = video.videoHeight;
const cw = video.clientWidth;
const ch = video.clientHeight;
const videoRatio = vw / vh;
const containerRatio = cw / ch;
let drawWidth, drawHeight, startX, startY;
if (containerRatio > videoRatio) {
drawHeight = ch;
drawWidth = ch * videoRatio;
startX = (cw - drawWidth) / 2;
startY = 0;
} else {
drawWidth = cw;
drawHeight = cw / videoRatio;
startX = 0;
startY = (ch - drawHeight) / 2;
}
const rect = video.getBoundingClientRect();
const finalX = rect.left + startX + (drawWidth * x_pct);
const finalY = rect.top + startY + (drawHeight * y_pct);
cursor.style.transform = `translate3d(${finalX}px, ${finalY}px, 0)`;
cursor.style.display = 'block';
}
const dc = pc.createDataChannel("mouse", {ordered: false, maxRetransmits: 0});
dc.onmessage = (e) => requestAnimationFrame(() => {
const data = JSON.parse(e.data);
updateCursor(data.x, data.y);
});
pc.ontrack = (evt) => {
if (evt.track.kind === 'video') {
video.srcObject = evt.streams[0];
video.onloadedmetadata = () => {
stats.innerText = `RES: ${video.videoWidth}x${video.videoHeight}`;
};
}
};
async function start() {
pc.addTransceiver('video', {direction: 'recvonly'});
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
const res = await fetch('/offer', {
method: 'POST',
body: JSON.stringify({sdp: pc.localDescription.sdp, type: pc.localDescription.type}),
headers: {'Content-Type': 'application/json'}
});
const ansData = await res.json();
let sdp = ansData.sdp;
// 50Mbps 暴力带宽
const bandwidth = 50000;
if(sdp.indexOf("b=AS:") === -1) {
sdp = sdp.replace(/m=video (.*)\\r\\n/g, "m=video $1\\r\\nb=AS:" + bandwidth + "\\r\\n");
} else {
sdp = sdp.replace(/b=AS:\\d+/, "b=AS:" + bandwidth);
}
await pc.setRemoteDescription(new RTCSessionDescription({sdp: sdp, type: ansData.type}));
}
start();
</script>
</body>
</html>
"""
if __name__ == "__main__":
app = web.Application()
app.on_shutdown.append(on_shutdown)
app.router.add_get("/", index)
app.router.add_post("/offer", offer)
print("🚀 终极画质版启动: http://0.0.0.0:5080")
print("注意:此模式需要非常好的 Wi-Fi (5Ghz) 或有线网络")
web.run_app(app, host="0.0.0.0", port=5080)
本文作者:lsq_137
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!