0%

NTRIP协议开发

调用库

1
2
3
4
5
6
7
8
9
10
11
from __future__ import annotations
import asyncio
import base64
import os
import queue
import socket
import sys
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

NTRIPcaster程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
NTRIP 2.0 Caster — GUI, Long‑running, Save Streams (Tkinter, stdlib only)
===========================================================================
- Single file, no third‑party dependencies
- Asyncio TCP server + Tkinter GUI (network runs in a dedicated thread/loop)
- Accepts NTRIP Server (SOURCE … /MOUNT) and NTRIP Client (GET /MOUNT)
- Keeps connections alive (caster不会主动断开),仅在对端关闭/异常/超时才清理
- Per‑mount server stream broadcast to clients with backpressure + timeouts
- Optional saving of:
• server→caster data (per mount)
• client→caster payload (per client)
When enabled from GUI, prompts for a save path and keeps writing until stopped
- Realtime stats in GUI: bytes in/out, client count, last chunk preview (hex)
- Port is user‑selectable from GUI

Pack to EXE (Windows):
pip install pyinstaller
pyinstaller --onefile --noconsole ntrip_caster_gui.py

Notes:
- Auth is not enforced here (any SOURCE/GET accepted). Hooks are marked if needed.
- Designed for robustness: per‑client write timeouts, dead client cleanup, bounded
queues for file writers to avoid unbounded RAM usage.
"""

File sink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# ------------------------- File sink (threaded) -------------------------
class FileSink:
    """Append byte chunks to a file from a dedicated writer thread.

    Call :meth:`write` to enqueue data and :meth:`close` to stop.  The queue
    is bounded so a slow disk cannot grow memory without limit: when it is
    full, :meth:`write` drops the chunk, counts it (see :meth:`dropped`) and
    returns ``False``.  Everything that was accepted before :meth:`close` is
    written to disk before the worker exits.

    Args:
        path: Destination file; opened in append mode, parent directories
            are created when missing.
        max_queue: Maximum number of queued chunks.
        chunk_flush: Number of chunks written between flush+fsync calls.
    """

    def __init__(self, path: str, max_queue: int = 10000, chunk_flush: int = 64):
        self.path = path
        self.q: "queue.Queue[Optional[bytes]]" = queue.Queue(max_queue)
        self._stop = threading.Event()
        self._closed = False
        self._dropped = 0
        # Kept for callers that want to warn only once about drops; this
        # class itself never reads it.
        self._warned = False
        self._chunk_flush = chunk_flush
        self._t = threading.Thread(target=self._run, daemon=True)
        self._t.start()

    def write(self, data: bytes) -> bool:
        """Enqueue ``data``; return ``False`` if it was dropped instead."""
        if self._closed:
            # Nobody will ever drain the queue again, so refuse the chunk
            # rather than pretend it was saved.
            self._dropped += 1
            return False
        try:
            self.q.put_nowait(data)
        except queue.Full:
            self._dropped += 1
            return False
        return True

    def close(self):
        """Stop accepting data and wait (up to 2 s) for the backlog to be
        written.  Safe to call more than once."""
        if self._closed:
            return
        self._closed = True
        # Set the flag BEFORE queuing the sentinel: if the queue is full and
        # the sentinel is lost, the worker still notices the flag once it has
        # drained the backlog, so it can neither hang nor skip queued data.
        self._stop.set()
        try:
            self.q.put_nowait(None)
        except queue.Full:
            pass
        self._t.join(timeout=2.0)

    def dropped(self) -> int:
        """Return how many chunks were rejected so far."""
        return self._dropped

    def _run(self):
        """Worker thread body: drain the queue into the file."""
        try:
            os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)
            f = open(self.path, "ab", buffering=1024 * 1024)
        except OSError:
            # Reject further writes instead of queueing for a dead worker;
            # re-raise so the failure is still reported by the thread hook.
            self._closed = True
            raise
        with f:
            pending = 0
            while True:
                if self._stop.is_set():
                    # Shutting down: take what is left without blocking.
                    try:
                        item = self.q.get_nowait()
                    except queue.Empty:
                        break
                else:
                    item = self.q.get()
                if item is None:
                    break
                f.write(item)
                pending += 1
                if pending >= self._chunk_flush:
                    f.flush()
                    os.fsync(f.fileno())
                    pending = 0
            try:
                f.flush()
                os.fsync(f.fileno())
            except OSError:
                # Best effort: the final fsync may be unsupported on some
                # targets; closing the file below still flushes the buffer.
                pass

Caster core

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# ----------------------------- Caster Core ------------------------------
@dataclass
class ClientConn:
    """One connected NTRIP client (a ``GET /MOUNT`` connection)."""

    # Stream used to push the mount's data to this client.
    writer: asyncio.StreamWriter
    # Peer address as reported for the connection.
    addr: Tuple[str, int]

    # Optional file sink receiving whatever the client sends upstream.
    save_sink: Optional[FileSink] = None

    # Wall-clock time (time.time()) at which this record was created.
    connected_at: float = field(default_factory=time.time)

    # Number of client->caster payload bytes received so far.
    bytes_up: int = 0

@dataclass
class MountState:
    """Everything the caster tracks for a single mountpoint."""

    # Mountpoint name without the leading slash.
    name: str

    # The single source (NTRIP server) feeding this mount, if any.
    server_writer: Optional[asyncio.StreamWriter] = None
    server_addr: Optional[Tuple[str, int]] = None

    # Clients currently subscribed to this mount.
    clients: List[ClientConn] = field(default_factory=list)

    # Statistics shown in the GUI: total bytes received from the source and a
    # hex preview of the most recent chunk.
    bytes_from_server: int = 0
    last_chunk_hex: str = ""

    # Optional file sink receiving the source stream.
    server_save_sink: Optional[FileSink] = None

class NtripCaster:
    """Asyncio NTRIP caster relaying one source stream per mount to clients.

    A connection whose request line starts with ``SOURCE`` becomes the data
    source of a mount; one starting with ``GET`` either receives the
    sourcetable (``/`` or ``/SOURCETABLE``) or subscribes to a mount.
    No authentication is enforced; hooks are marked in the handlers.

    State changes are reported to the GUI as tuples put on ``uiq``; the first
    element is the event kind: ``status``, ``warn``, ``error``,
    ``server_conn``, ``server_disc``, ``server_bytes``, ``client_conn``,
    ``client_disc``, ``client_dead``, ``client_bytes``.

    Args:
        uiq: Thread-safe queue receiving the event tuples.
        host: Interface to listen on.
        port: TCP port to listen on.
    """

    # Seconds one client may take to absorb a broadcast chunk before it is
    # considered dead and dropped.
    CLIENT_WRITE_TIMEOUT = 3.0

    def __init__(self, uiq: "queue.Queue[tuple]", host: str = "0.0.0.0", port: int = 2101):
        self.uiq = uiq
        self.host = host
        self.port = port
        self.server: Optional[asyncio.base_events.Server] = None
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.mounts: Dict[str, MountState] = {}
        # The asyncio primitives are created lazily inside the network loop
        # (see _ensure_primitives): this constructor runs in the GUI thread,
        # and on Python < 3.10 a Lock built here would bind to the wrong loop.
        self.lock: Optional[asyncio.Lock] = None
        self._stopping: Optional[asyncio.Event] = None
        # Writers of every open connection, so stop() can close them all.
        self._writers = set()

    def _ensure_primitives(self):
        """Create the lock/event on first use from inside the event loop."""
        if self.lock is None:
            self.lock = asyncio.Lock()
        if self._stopping is None:
            self._stopping = asyncio.Event()

    # ----------------------- lifecycle -----------------------
    async def start(self):
        """Bind the listening socket and report the bound addresses."""
        self._ensure_primitives()
        self.server = await asyncio.start_server(self._handle_conn, self.host, self.port)
        addrs = ", ".join(str(s.getsockname()) for s in self.server.sockets)
        self.uiq.put(("status", f"Listening on {addrs}"))

    async def serve_forever(self):
        """Serve until the server is closed.

        Raises:
            RuntimeError: if :meth:`start` has not been awaited first.
        """
        if self.server is None:
            raise RuntimeError("start() must be awaited before serve_forever()")
        async with self.server:
            await self.server.serve_forever()

    async def stop(self):
        """Close all sinks and connections, then the listening server."""
        self._ensure_primitives()
        self._stopping.set()
        sinks = []
        async with self.lock:
            for m in self.mounts.values():
                if m.server_save_sink:
                    sinks.append(m.server_save_sink)
                    m.server_save_sink = None
                for c in m.clients:
                    if c.save_sink:
                        sinks.append(c.save_sink)
                        c.save_sink = None
        # Everything up to server.close() runs without yielding to the loop:
        # closing the server cancels serve_forever(), after which the hosting
        # loop may be torn down, so the important cleanup must come first.
        for sink in sinks:
            sink.close()
        # Close live connections so their handlers finish; on Python 3.12+
        # Server.wait_closed() also waits for them.
        for w in list(self._writers):
            try:
                w.close()
            except Exception:
                pass
        if self.server:
            self.server.close()
            await self.server.wait_closed()

    # ---------------------- helpers --------------------------
    async def _read_headers(self, reader: asyncio.StreamReader, timeout: float = 15.0) -> str:
        """Read up to the blank line ending the request header and decode it."""
        data = await asyncio.wait_for(reader.readuntil(b"\r\n\r\n"), timeout=timeout)
        return data.decode(errors="ignore")

    async def _safe_close(self, writer: asyncio.StreamWriter, timeout: float = 2.0):
        """Close ``writer`` without raising and without waiting forever.

        A graceful close first flushes buffered data; if the peer has stopped
        reading that never completes, so after ``timeout`` seconds the
        transport is aborted instead.
        """
        try:
            writer.close()
            await asyncio.wait_for(writer.wait_closed(), timeout=timeout)
        except asyncio.TimeoutError:
            try:
                writer.transport.abort()
            except Exception:
                pass
        except Exception:
            # Best effort: the connection is gone either way.
            pass

    async def _respond_and_close(self, writer: asyncio.StreamWriter, response: bytes):
        """Send a final response and close, even if the peer already left."""
        try:
            writer.write(response)
            await writer.drain()
        except OSError:
            pass
        finally:
            await self._safe_close(writer)

    async def _close_sink(self, sink):
        """Close a file sink on a worker thread; its close() joins the writer
        thread and would otherwise stall the event loop."""
        try:
            await asyncio.get_running_loop().run_in_executor(None, sink.close)
        except Exception as e:
            self.uiq.put(("error", f"Sink close error: {e}"))

    # ------------------- connection handler ------------------
    async def _handle_conn(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """Entry point for every accepted connection: dispatch on the first
        request line."""
        addr = writer.get_extra_info("peername")
        self._writers.add(writer)
        try:
            if self._stopping is not None and self._stopping.is_set():
                return
            try:
                header = await self._read_headers(reader)
            except Exception:
                return  # no complete header in time, or peer vanished
            first = header.split("\r\n", 1)[0]
            if first.startswith("SOURCE"):
                await self._handle_source(header, reader, writer, addr)
            elif first.startswith("GET"):
                # Sourcetable requests ("GET /") are served by _handle_get.
                await self._handle_get(header, reader, writer, addr)
            else:
                await self._respond_and_close(
                    writer, b"HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n")
        except Exception as e:
            self.uiq.put(("error", f"Connection handler error {addr}: {e}"))
        finally:
            self._writers.discard(writer)
            await self._safe_close(writer)

    # ---------------------- SOURCE (server) -------------------
    async def _handle_source(self, header: str, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, addr):
        """Register the connection as the source of a mount and relay its
        stream until it ends.  Expected request: ``SOURCE <password> /MOUNT``."""
        line = header.split("\r\n", 1)[0]
        parts = line.split()
        if len(parts) < 3 or not parts[2].startswith('/'):
            await self._respond_and_close(writer, b"HTTP/1.1 400 Bad Request\r\n\r\n")
            return
        # password = parts[1]  # (Auth hook) validate here if needed
        mount = parts[2][1:]

        async with self.lock:
            st = self.mounts.get(mount)
            if not st:
                st = MountState(name=mount)
                self.mounts[mount] = st
            refused = st.server_writer is not None
            if not refused:
                st.server_writer = writer
                st.server_addr = addr
                st.last_chunk_hex = ""
        if refused:
            # Only one source per mount.
            self.uiq.put(("warn", f"Second SERVER refused for {mount} from {addr}"))
            await self._safe_close(writer)
            return

        # The acknowledgement is sent inside the try so that a failure here
        # still releases the mount in the finally clause.
        try:
            writer.write(b"ICY 200 OK\r\n\r\n")
            await writer.drain()
            self.uiq.put(("server_conn", mount, addr))
            while True:
                chunk = await reader.read(8192)
                if not chunk:
                    break
                if not await self._relay_chunk(mount, writer, chunk):
                    break
        except Exception as e:
            self.uiq.put(("error", f"Server stream error {addr}: {e}"))
        finally:
            async with self.lock:
                st = self.mounts.get(mount)
                if st and st.server_writer is writer:
                    st.server_writer = None
            await self._safe_close(writer)
            self.uiq.put(("server_disc", mount, addr))

    async def _relay_chunk(self, mount: str, writer: asyncio.StreamWriter, chunk: bytes) -> bool:
        """Account for, save and broadcast one chunk from a mount's source.

        Returns ``False`` when ``writer`` is no longer the mount's source.
        """
        async with self.lock:
            st = self.mounts.get(mount)
            if not st or st.server_writer is not writer:
                return False
            st.bytes_from_server += len(chunk)
            st.last_chunk_hex = chunk[:24].hex(" ")
            sink = st.server_save_sink
            if sink:
                sink.write(chunk)
            targets = list(st.clients)
        # Network writes happen outside the lock and in parallel, so a
        # stalled client costs at most one timeout and blocks nobody else.
        if targets:
            results = await asyncio.gather(*(self._send_chunk(c, chunk) for c in targets))
            dead = [c for c, ok in zip(targets, results) if not ok]
            if dead:
                await self._drop_clients(mount, dead)
        self.uiq.put(("server_bytes", mount))
        return True

    async def _send_chunk(self, client: ClientConn, chunk: bytes) -> bool:
        """Write ``chunk`` to one client; ``False`` on error or timeout."""
        try:
            client.writer.write(chunk)
            await asyncio.wait_for(client.writer.drain(), timeout=self.CLIENT_WRITE_TIMEOUT)
        except Exception:
            return False
        return True

    async def _drop_clients(self, mount: str, dead: List[ClientConn]):
        """Unregister clients that failed a broadcast and close them."""
        removed: List[ClientConn] = []
        async with self.lock:
            st = self.mounts.get(mount)
            for c in dead:
                if st is None:
                    break
                try:
                    st.clients.remove(c)
                except ValueError:
                    continue  # its own handler already cleaned it up
                removed.append(c)
        for c in removed:
            sink, c.save_sink = c.save_sink, None
            if sink:
                await self._close_sink(sink)
            try:
                # The peer is not reading; do not wait for a graceful flush.
                c.writer.transport.abort()
            except Exception:
                pass
            await self._safe_close(c.writer)
            self.uiq.put(("client_dead", mount, c.addr))

    # ------------------------ GET (client) --------------------
    def _parse_basic(self, header: str) -> Optional[Tuple[str, str]]:
        """Return ``(user, password)`` from an HTTP Basic ``Authorization``
        header line, or ``None`` when absent or malformed."""
        prefix = "authorization: basic "
        for ln in header.split("\r\n"):
            if not ln.lower().startswith(prefix):
                continue
            try:
                raw = base64.b64decode(ln[len(prefix):].strip()).decode("utf-8")
            except ValueError:
                # Covers invalid base64 and invalid UTF-8.
                return None
            user, sep, password = raw.partition(":")
            if sep:
                return user, password
        return None

    async def _handle_get(self, header: str, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, addr):
        """Serve the sourcetable or subscribe the connection to a mount."""
        line0 = header.split("\r\n", 1)[0]
        parts = line0.split()
        if len(parts) < 2:
            await self._respond_and_close(writer, b"HTTP/1.1 400 Bad Request\r\n\r\n")
            return
        path = parts[1]
        if path == "/" or path.upper() == "/SOURCETABLE":
            await self._send_sourcetable(writer)
            return
        mount = path[1:] if path.startswith('/') else path

        # (Auth hook) validate username/password if required
        # userpass = self._parse_basic(header)

        # Acknowledge before the client becomes visible to the broadcaster,
        # so stream data can never precede the response line.
        try:
            writer.write(b"ICY 200 OK\r\n\r\n")
            await writer.drain()
        except OSError:
            await self._safe_close(writer)
            return

        c = ClientConn(writer=writer, addr=addr)
        async with self.lock:
            st = self.mounts.get(mount)
            if not st:
                st = MountState(name=mount)
                self.mounts[mount] = st
            st.clients.append(c)
        self.uiq.put(("client_conn", mount, addr))

        # Read any client->caster payload (e.g. NMEA GGA) and optionally save
        # it.  There is deliberately no idle timeout: a client that never
        # sends anything stays connected until its socket closes or a
        # broadcast write to it fails.
        try:
            while True:
                data = await reader.read(4096)
                if not data:
                    break
                c.bytes_up += len(data)
                sink = c.save_sink
                if sink:
                    sink.write(data)
                self.uiq.put(("client_bytes", mount))
        except Exception as e:
            self.uiq.put(("error", f"Client payload error {addr}: {e}"))
        finally:
            async with self.lock:
                st = self.mounts.get(mount)
                if st:
                    st.clients = [cc for cc in st.clients if cc.writer is not writer]
            # Flush and release the per-client capture file, if any.
            sink, c.save_sink = c.save_sink, None
            if sink:
                await self._close_sink(sink)
            await self._safe_close(writer)
            self.uiq.put(("client_disc", mount, addr))

    async def _send_sourcetable(self, writer: asyncio.StreamWriter):
        """Send one minimal STR record per known mount, then close."""
        async with self.lock:
            names = list(self.mounts)
        body_lines = [
            f"STR;{name};{name};RTCM 3;;0;GPS;NET;;0.00000;0.00000;0;0;PyNtripCaster;none;N;N;0\r\n"
            for name in names
        ]
        body_lines.append("ENDSOURCETABLE\r\n")
        body = "".join(body_lines).encode("utf-8")
        await self._respond_and_close(writer, b"SOURCETABLE 200 OK\r\n\r\n" + body)

GUI

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# ------------------------------ GUI ------------------------------------
import tkinter as tk
from tkinter import ttk, filedialog, messagebox

class App(tk.Tk):
    """Tkinter front end for :class:`NtripCaster`.

    The caster runs on its own thread with its own asyncio loop and reports
    to the GUI through ``self.uiq``; the GUI polls that queue with ``after``
    and never touches Tk from the network thread.
    """

    def __init__(self):
        super().__init__()
        self.title("NTRIP 2.0 Caster — GUI")
        self.geometry("1080x700")
        self.minsize(980, 620)

        self.uiq: "queue.Queue[tuple]" = queue.Queue()
        self.caster: Optional[NtripCaster] = None
        self.net_thread: Optional[threading.Thread] = None

        self._build()
        # Stop the caster on window close so capture files get flushed.
        self.protocol("WM_DELETE_WINDOW", self._on_close)
        self.after(80, self._poll_queue)

    # ----------------- UI layout -----------------
    def _build(self):
        """Create all widgets."""
        top = ttk.Frame(self)
        top.pack(side=tk.TOP, fill=tk.X, padx=10, pady=8)
        ttk.Label(top, text="Listen Port:").pack(side=tk.LEFT)
        self.port_var = tk.StringVar(value="2101")
        ttk.Entry(top, textvariable=self.port_var, width=8).pack(side=tk.LEFT, padx=6)
        self.btn_start = ttk.Button(top, text="Start", command=self.on_start)
        self.btn_start.pack(side=tk.LEFT, padx=4)
        self.btn_stop = ttk.Button(top, text="Stop", command=self.on_stop, state=tk.DISABLED)
        self.btn_stop.pack(side=tk.LEFT, padx=4)
        self.status_var = tk.StringVar(value="Idle")
        ttk.Label(top, textvariable=self.status_var).pack(side=tk.RIGHT)

        main = ttk.Panedwindow(self, orient=tk.HORIZONTAL)
        main.pack(fill=tk.BOTH, expand=True, padx=10, pady=8)

        # Mounts panel
        left = ttk.Frame(main)
        main.add(left, weight=3)
        ttk.Label(left, text="Mountpoints").pack(anchor=tk.W)
        cols = ("mount", "srv_bytes", "clients", "preview")
        self.tree = ttk.Treeview(left, columns=cols, show='headings', height=12)
        self.tree.heading("mount", text="Mount")
        self.tree.heading("srv_bytes", text="Server Bytes")
        self.tree.heading("clients", text="Clients")
        self.tree.heading("preview", text="Last Chunk (hex)")
        self.tree.column("mount", width=180)
        self.tree.column("srv_bytes", width=120)
        self.tree.column("clients", width=80)
        self.tree.column("preview", width=380)
        self.tree.pack(fill=tk.BOTH, expand=True)

        ops = ttk.Frame(left)
        ops.pack(fill=tk.X, pady=6)
        ttk.Button(ops, text="Save SERVER stream (selected mount)", command=self.choose_server_save).pack(side=tk.LEFT, padx=4)
        ttk.Button(ops, text="Save CLIENT payload (pick client)", command=self.choose_client_save).pack(side=tk.LEFT, padx=4)

        # Clients panel
        right = ttk.Frame(main)
        main.add(right, weight=2)
        ttk.Label(right, text="Clients on selected mount").pack(anchor=tk.W)
        ccols = ("addr", "bytes_up", "saving")
        self.ctree = ttk.Treeview(right, columns=ccols, show='headings', height=12)
        self.ctree.heading("addr", text="Client Address")
        self.ctree.heading("bytes_up", text="Client→Caster Bytes")
        self.ctree.heading("saving", text="Saving")
        self.ctree.column("addr", width=220)
        self.ctree.column("bytes_up", width=160)
        self.ctree.column("saving", width=100)
        self.ctree.pack(fill=tk.BOTH, expand=True)

        # Logs
        ttk.Label(self, text="Logs").pack(anchor=tk.W, padx=10)
        self.logbox = tk.Text(self, height=10)
        self.logbox.pack(fill=tk.BOTH, expand=False, padx=10, pady=(0, 10))
        self._log("Ready.")

        # Bind selection change to refresh client list
        self.tree.bind('<<TreeviewSelect>>', lambda e: self._refresh_clients_panel())

    # ----------------- helpers -----------------
    def _log(self, msg: str):
        """Append a timestamped line to the log box and scroll to it."""
        ts = time.strftime('%H:%M:%S')
        self.logbox.insert(tk.END, f"[{ts}] {msg}\n")
        self.logbox.see(tk.END)

    def _find_row(self, mount: str):
        """Return the tree item id showing ``mount``, or ``None``."""
        for iid in self.tree.get_children(''):
            vals = self.tree.item(iid, 'values')
            if vals and vals[0] == mount:
                return iid
        return None

    def _ensure_row(self, mount: str):
        """Return the row for ``mount``, inserting an empty one if needed."""
        iid = self._find_row(mount)
        if iid is not None:
            return iid
        return self.tree.insert('', tk.END, values=(mount, 0, 0, ""))

    def _update_mount_row(self, mount: str):
        """Refresh (or create) the row of ``mount`` from the caster state."""
        if not self.caster:
            return
        st = self.caster.mounts.get(mount)
        if not st:
            return
        values = (mount, st.bytes_from_server, len(st.clients), st.last_chunk_hex)
        iid = self._find_row(mount)
        if iid is not None:
            self.tree.item(iid, values=values)
        else:
            self.tree.insert('', tk.END, values=values)

    def _refresh_clients_panel(self):
        """Rebuild the client list for the currently selected mount."""
        for iid in self.ctree.get_children(''):
            self.ctree.delete(iid)
        if not self.caster:
            return
        mount = self._selected_mount()
        if not mount:
            return
        st = self.caster.mounts.get(mount)
        if not st:
            return
        # Iterate over a copy: the network thread may change the list.
        for c in list(st.clients):
            saving = "Yes" if c.save_sink else "No"
            self.ctree.insert('', tk.END, values=(f"{c.addr[0]}:{c.addr[1]}", c.bytes_up, saving))

    def _selected_mount(self) -> Optional[str]:
        """Return the mount name of the focused tree row, if any."""
        item = self.tree.focus()
        if not item:
            return None
        vals = self.tree.item(item, 'values')
        return vals[0] if vals else None

    # --------------- start/stop -----------------
    def on_start(self):
        """Validate the port and launch the caster on a background thread."""
        if self.caster:
            return
        try:
            port = int(self.port_var.get())
            if not (1 <= port <= 65535):
                raise ValueError()
        except ValueError:
            messagebox.showerror("Invalid port", "Please enter a valid TCP port (1-65535)")
            return
        self.caster = NtripCaster(self.uiq, port=port)
        self.net_thread = threading.Thread(target=self._run_network, daemon=True)
        self.net_thread.start()
        self.btn_start.config(state=tk.DISABLED)
        self.btn_stop.config(state=tk.NORMAL)
        self._log(f"Starting caster on 0.0.0.0:{port} …")

    def _run_network(self):
        """Network thread body: own an event loop and run the caster in it."""
        # Keep a private reference: on_stop() clears self.caster.
        caster = self.caster
        if caster is None:
            return
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(caster.start())
        except Exception as e:
            # Typically the port is already in use.
            self.uiq.put(("error", f"Failed to start caster: {e}"))
            loop.close()
            self.uiq.put(("net_down", caster))
            return
        # Published only after a successful start, so on_stop() never
        # schedules work on a loop that is about to be closed.
        caster.loop = loop
        try:
            loop.run_until_complete(caster.serve_forever())
        except asyncio.CancelledError:
            # Normal shutdown: closing the server cancels serve_forever().
            pass
        except Exception as e:
            self.uiq.put(("error", f"Network loop error: {e}"))
        finally:
            caster.loop = None
            self._shutdown_loop(loop)
            self.uiq.put(("net_down", caster))

    def _shutdown_loop(self, loop, grace: float = 2.0):
        """Let pending tasks (notably the scheduled stop()) finish within
        ``grace`` seconds, cancel whatever remains, then close the loop."""
        try:
            pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
            if pending:
                loop.run_until_complete(asyncio.wait(pending, timeout=grace))
                leftovers = [t for t in pending if not t.done()]
                for t in leftovers:
                    t.cancel()
                if leftovers:
                    # Run the cancelled tasks so their finally blocks close
                    # the remaining connections.
                    loop.run_until_complete(asyncio.gather(*leftovers, return_exceptions=True))
        except Exception as e:
            self.uiq.put(("error", f"Network shutdown error: {e}"))
        finally:
            loop.close()

    def on_stop(self):
        """Ask the running caster to stop and reset the controls."""
        caster = self.caster
        loop = caster.loop if caster else None
        if not caster or not loop:
            return
        # Bind the coroutine to this caster now: self.caster is cleared below,
        # before the coroutine actually runs on the network thread.
        coro = caster.stop()
        try:
            asyncio.run_coroutine_threadsafe(coro, loop)
        except Exception as e:
            coro.close()
            self._log(f"Stop error: {e}")
        self.btn_start.config(state=tk.NORMAL)
        self.btn_stop.config(state=tk.DISABLED)
        self.status_var.set("Idle")
        self._log("Caster stopped.")
        self.caster = None

    def _on_close(self):
        """Window-close handler: stop the caster, give the network thread a
        moment to flush capture files, then destroy the window."""
        thread = self.net_thread
        self.on_stop()
        if thread is not None and thread.is_alive():
            thread.join(timeout=3.0)
        self.destroy()

    # --------------- saving ---------------------
    def choose_server_save(self):
        """Pick a file and start saving the selected mount's source stream."""
        mount = self._selected_mount()
        if not mount:
            messagebox.showinfo("Select", "Please select a mount in the left table first.")
            return
        if not self.caster:
            return
        st = self.caster.mounts.get(mount)
        if not st:
            return
        path = filedialog.asksaveasfilename(title=f"Save server stream for {mount}", defaultextension=".bin")
        if not path:
            return
        # Install the new sink before closing the old one so the network
        # thread never writes to a sink that is already closed.
        old, st.server_save_sink = st.server_save_sink, FileSink(path)
        if old:
            old.close()
        self._log(f"Saving SERVER stream of {mount} to: {path}")

    def choose_client_save(self):
        """Open a chooser to start saving one client's upstream payload."""
        mount = self._selected_mount()
        if not mount or not self.caster:
            messagebox.showinfo("Select", "Select a mount first.")
            return
        st = self.caster.mounts.get(mount)
        # Snapshot: list indices must keep matching the rows shown in the
        # dialog even if clients connect or leave while it is open.
        clients = list(st.clients) if st else []
        if not clients:
            messagebox.showinfo("Clients", "No clients on this mount.")
            return
        # simple chooser window
        win = tk.Toplevel(self)
        win.title("Pick client to save")
        lst = tk.Listbox(win, width=60, height=8)
        for idx, c in enumerate(clients):
            lst.insert(tk.END, f"[{idx}] {c.addr[0]}:{c.addr[1]}")
        lst.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)

        def _pick():
            sel = lst.curselection()
            if not sel:
                return
            c = clients[sel[0]]
            if not any(cc is c for cc in st.clients):
                messagebox.showinfo("Clients", "That client has disconnected.")
                win.destroy()
                return
            path = filedialog.asksaveasfilename(title=f"Save client payload {c.addr}", defaultextension=".bin")
            if path:
                old, c.save_sink = c.save_sink, FileSink(path)
                if old:
                    old.close()
                self._log(f"Saving CLIENT payload from {c.addr} to: {path}")
                self._refresh_clients_panel()
            win.destroy()

        ttk.Button(win, text="Select", command=_pick).pack(pady=8)

    # --------------- UI queue pump -------------
    def _poll_queue(self):
        """Drain the event queue, then reschedule itself."""
        try:
            while True:
                evt = self.uiq.get_nowait()
                try:
                    self._handle_evt(evt)
                except Exception as e:
                    # One bad event must not stop the polling loop.
                    self._log(f"ERROR: failed to handle event {evt!r}: {e}")
        except queue.Empty:
            pass
        finally:
            self.after(100, self._poll_queue)

    def _handle_evt(self, evt: tuple):
        """Apply one caster event to the widgets; unknown kinds are ignored."""
        kind = evt[0]
        if kind == "status":
            self.status_var.set(evt[1])
            self._log(evt[1])
        elif kind == "warn":
            self._log(f"WARN: {evt[1]}")
        elif kind == "error":
            self._log(f"ERROR: {evt[1]}")
        elif kind == "server_conn":
            _, mount, addr = evt
            self._ensure_row(mount)
            self._update_mount_row(mount)
            self._log(f"SERVER connected on {mount} from {addr}")
        elif kind == "server_disc":
            _, mount, addr = evt
            self._update_mount_row(mount)
            self._log(f"SERVER disconnected on {mount} from {addr}")
        elif kind == "server_bytes":
            _, mount = evt
            self._update_mount_row(mount)
        elif kind == "client_conn":
            _, mount, addr = evt
            self._ensure_row(mount)
            self._update_mount_row(mount)
            self._refresh_clients_panel()
            self._log(f"CLIENT connected to {mount} from {addr}")
        elif kind == "client_disc":
            _, mount, addr = evt
            self._update_mount_row(mount)
            self._refresh_clients_panel()
            self._log(f"CLIENT disconnected from {mount} {addr}")
        elif kind == "client_dead":
            _, mount, addr = evt
            self._update_mount_row(mount)
            self._refresh_clients_panel()
            self._log(f"Cleaned dead client on {mount}: {addr}")
        elif kind == "client_bytes":
            _, mount = evt
            # Only the selected mount's clients are on screen.
            if mount == self._selected_mount():
                self._refresh_clients_panel()
        elif kind == "net_down":
            _, caster = evt
            # The network thread ended on its own (e.g. the port was busy);
            # after a normal on_stop() self.caster is already cleared.
            if self.caster is caster:
                self.caster = None
                self.btn_start.config(state=tk.NORMAL)
                self.btn_stop.config(state=tk.DISABLED)
                self.status_var.set("Idle")
                self._log("Caster is not running.")

Main

1
2
3
4
# --------------------------------- main ---------------------------------
if __name__ == "__main__":
    # Build the main window and hand control to Tk's event loop.
    App().mainloop()

为了方便验证 NTRIP caster,以下创建 NTRIP server 用作数据输入流,NTRIP client 用作数据接收流。

NTRIPserver程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import socket
import time

def ntrip_server(caster_host="127.0.0.1", caster_port=2101,
                 mountpoint="RTCM32", password="sourcepass"):
    """Connect to a caster as an NTRIP source and push fake frames.

    Sends ``SOURCE <password> /<mountpoint>``, and once the caster answers
    with a 200 status, sends one dummy frame per second until interrupted
    with Ctrl+C or until the connection breaks.  Progress is printed.

    Args:
        caster_host: Caster host name or IPv4 address.
        caster_port: Caster TCP port.
        mountpoint: Mountpoint to feed (without leading slash).
        password: Source password placed in the request line.
    """
    # Build the NTRIP SOURCE request.
    request = (
        f"SOURCE {password} /{mountpoint}\r\n"
        f"Source-Agent: PythonNTRIPServer/0.1\r\n\r\n"
    )

    # The with-block closes the socket on every exit path, including
    # failures in connect/sendall/recv.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((caster_host, caster_port))
        sock.sendall(request.encode("utf-8"))

        # Read the caster's response.
        response = sock.recv(1024)
        print("Caster 响应:\n", response.decode(errors="ignore"))

        if b"ICY 200 OK" not in response and b"HTTP/1.1 200 OK" not in response:
            print("连接失败,可能密码错误或 mountpoint 不存在")
            return

        print("已连接 Caster,开始推送数据...")

        # Simulate a correction stream.
        try:
            counter = 0
            while True:
                # Stand-in for a binary RTCM frame.
                fake_rtcm = f"FAKE_RTCM_FRAME_{counter}".encode("utf-8")
                sock.sendall(fake_rtcm)
                print(f"推送 {len(fake_rtcm)} 字节: {fake_rtcm}")
                counter += 1
                time.sleep(1)
        except KeyboardInterrupt:
            print("用户中断,关闭连接")
        except OSError as exc:
            # The caster went away; report it instead of crashing.
            print(f"连接中断: {exc}")


if __name__ == "__main__":
    ntrip_server()

NTRIPclient程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import socket
import base64

def ntrip_client(caster_host="127.0.0.1", caster_port=2101,
                 mountpoint="RTCM32", username="demo", password="demopass"):
    """Connect to a caster as an NTRIP client and print received sizes.

    Sends ``GET /<mountpoint>`` with HTTP Basic credentials, and once the
    caster answers with a 200 status, reads the stream until the caster
    closes the connection, the connection breaks, or Ctrl+C is pressed.

    Args:
        caster_host: Caster host name or IPv4 address.
        caster_port: Caster TCP port.
        mountpoint: Mountpoint to subscribe to (without leading slash).
        username: User name for Basic authentication.
        password: Password for Basic authentication.
    """
    # Build the Basic Auth token.
    credentials = f"{username}:{password}"
    auth = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")

    # Build the NTRIP GET request.
    request = (
        f"GET /{mountpoint} HTTP/1.1\r\n"
        f"Host: {caster_host}:{caster_port}\r\n"
        f"Ntrip-Version: Ntrip/2.0\r\n"
        f"User-Agent: PythonNTRIPClient/0.1\r\n"
        f"Authorization: Basic {auth}\r\n"
        f"Connection: keep-alive\r\n\r\n"
    )

    # The with-block closes the socket on every exit path, including
    # failures in connect/sendall/recv.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((caster_host, caster_port))
        sock.sendall(request.encode("utf-8"))

        # Read the response header.  The first recv may already contain
        # stream bytes after the blank line; keep them instead of dropping.
        response = sock.recv(1024)
        header, sep, leftover = response.partition(b"\r\n\r\n")
        print("服务器响应头:\n", (header + sep).decode(errors="ignore"))

        # Check the status in the header only, not in payload bytes.
        if b"ICY 200 OK" not in header and b"HTTP/1.1 200 OK" not in header:
            print("连接失败")
            return

        print("已连接 NTRIP Caster,等待数据流...")

        if leftover:
            print(f"收到 {len(leftover)} 字节数据")

        # Keep reading the correction stream.
        try:
            while True:
                data = sock.recv(4096)
                if not data:
                    print("连接关闭")
                    break
                print(f"收到 {len(data)} 字节数据")
        except KeyboardInterrupt:
            print("用户中断")
        except OSError as exc:
            # The caster went away; report it instead of crashing.
            print(f"连接中断: {exc}")


if __name__ == "__main__":
    ntrip_client()