commit bdb5ca8a95e4729fa5ee2d08f24a7781a55dcaca
parent 9f51669735dbb91f5fbb0295379f1f2bf02edf59
Author: William Casarin <jb55@jb55.com>
Date: Mon, 22 Aug 2022 16:54:46 -0700
switch to nostr.js
Signed-off-by: William Casarin <jb55@jb55.com>
Diffstat:
5 files changed, 200 insertions(+), 36 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -0,0 +1 @@
+*.mp4
diff --git a/Makefile b/Makefile
@@ -0,0 +1,4 @@
+dist: fake
+ rsync -avzP ./ charon:/www/damus.io/
+
+.PHONY: fake
diff --git a/web/comments.js b/web/comments.js
@@ -8,18 +8,20 @@ function uuidv4() {
async function comments_init(thread)
{
- const relay = await Relay("wss://relay.damus.io")
+ const pool = RelayPool(["wss://relay.damus.io"])
const now = (new Date().getTime()) / 1000
const model = {events: [], profiles: {}}
const comments_id = uuidv4()
const profiles_id = uuidv4()
- model.pool = relay
+ model.pool = pool
model.el = document.querySelector("#comments")
- relay.subscribe(comments_id, {kinds: [1], "#e": [thread]})
+ pool.on('open', relay => {
+ relay.subscribe(comments_id, {kinds: [1], "#e": [thread]})
+ });
- relay.event = (sub_id, ev) => {
+ pool.on('event', (sub_id, ev) => {
if (sub_id === comments_id) {
if (ev.content !== "")
insert_event_sorted(model.events, ev)
@@ -32,17 +34,17 @@ async function comments_init(thread)
console.log("failed to parse", ev.content)
}
}
- }
+ })
- relay.eose = async (sub_id) => {
+ pool.on('eose', async (sub_id) => {
if (sub_id === comments_id) {
handle_comments_loaded(profiles_id, model)
} else if (sub_id === profiles_id) {
handle_profiles_loaded(profiles_id, model)
}
- }
+ })
- return relay
+ return pool
}
function handle_profiles_loaded(profiles_id, model) {
diff --git a/web/damus.js b/web/damus.js
@@ -6,21 +6,40 @@ function uuidv4() {
);
}
+function insert_event_sorted(evs, new_ev) {
+ for (let i = 0; i < evs.length; i++) {
+ const ev = evs[i]
+
+ if (new_ev.id === ev.id) {
+ return false
+ }
+
+ if (new_ev.created_at > ev.created_at) {
+ evs.splice(i, 0, new_ev)
+ return true
+ }
+ }
+
+ evs.push(new_ev)
+ return true
+}
async function damus_web_init(thread)
{
- const relay = await Relay("wss://relay.damus.io")
+ const pool = RelayPool(["wss://relay.damus.io"])
const now = (new Date().getTime()) / 1000
const model = {events: [], profiles: {}}
const comments_id = uuidv4()
const profiles_id = uuidv4()
- model.pool = relay
+ model.pool = pool
model.el = document.querySelector("#posts")
- relay.subscribe(comments_id, {kinds: [1], limit: 100})
+ pool.on('open', relay => {
+ relay.subscribe(comments_id, {kinds: [1], limit: 100})
+ });
- relay.event = (sub_id, ev) => {
+ pool.on('event', (relay, sub_id, ev) => {
if (sub_id === comments_id) {
if (ev.content !== "")
insert_event_sorted(model.events, ev)
@@ -33,17 +52,17 @@ async function damus_web_init(thread)
console.log("failed to parse", ev.content)
}
}
- }
+ })
- relay.eose = async (sub_id) => {
+ pool.on('eose', async (relay, sub_id) => {
if (sub_id === comments_id) {
handle_comments_loaded(profiles_id, model)
} else if (sub_id === profiles_id) {
handle_profiles_loaded(profiles_id, model)
}
- }
+ })
- return relay
+ return pool
}
function handle_profiles_loaded(profiles_id, model) {
diff --git a/web/nostr.js b/web/nostr.js
@@ -1,19 +1,97 @@
+const RelayPool = (function nostrlib() {
+const WS = typeof WebSocket !== 'undefined' ? WebSocket : require('ws')
-function insert_event_sorted(evs, new_ev) {
- for (let i = 0; i < evs.length; i++) {
- const ev = evs[i]
+function RelayPool(relays, opts)
+{
+ if (!(this instanceof RelayPool))
+ return new RelayPool(relays)
- if (new_ev.id === ev.id) {
- return false
+ this.onfn = {}
+ this.relays = []
+
+ for (const relay of relays) {
+ this.add(relay)
+ }
+
+ return this
+}
+
+RelayPool.prototype.close = function relayPoolClose() {
+ for (const relay of this.relays) {
+ relay.close()
+ }
+}
+
+RelayPool.prototype.on = function relayPoolOn(method, fn) {
+ for (const relay of this.relays) {
+ this.onfn[method] = fn
+ relay.onfn[method] = fn.bind(null, relay)
+ }
+}
+
+RelayPool.prototype.has = function relayPoolHas(relayUrl) {
+ for (const relay of this.relays) {
+ if (relay.relay === relayUrl)
+ return true
+ }
+
+ return false
+}
+
+RelayPool.prototype.setupHandlers = function relayPoolSetupHandlers(method, fn)
+{
+ // setup its message handlers with the ones we have already
+ for (const handler of Object.keys(this.on)) {
+ for (const relay of this.relays) {
+ relay.onfn[handler] = this.onfn[handler].bind(null, relay)
}
+ }
+}
+
+RelayPool.prototype.remove = function relayPoolRemove(url) {
+ let i = 0
- if (new_ev.created_at > ev.created_at) {
- evs.splice(i, 0, new_ev)
+ for (const relay of this.relays) {
+ if (relay.url === url) {
+ relay.ws && relay.ws.close()
+ this.relays = this.replays.splice(i, 1)
return true
}
+
+ i += 1
}
- evs.push(new_ev)
+ return false
+}
+
+RelayPool.prototype.subscribe = function relayPoolSubscribe(...args) {
+ for (const relay of this.relays) {
+ relay.subscribe(...args)
+ }
+}
+
+RelayPool.prototype.unsubscribe = function relayPoolUnsubscibe(...args) {
+ for (const relay of this.relays) {
+ relay.unsubscribe(...args)
+ }
+}
+
+RelayPool.prototype.add = function relayPoolAdd(relay) {
+ if (relay instanceof Relay) {
+ if (this.has(relay.url))
+ return false
+
+ this.relays.push(relay)
+ this.setupHandlers()
+ return true
+ }
+
+ if (this.has(relay))
+ return false
+
+ const r = Relay(relay, this.opts)
+ this.relays.push(r)
+ this.setupHandlers()
return true
}
@@ -22,26 +100,81 @@ function Relay(relay, opts={})
if (!(this instanceof Relay))
return new Relay(relay, opts)
- this.relay = relay
+ this.url = relay
this.opts = opts
+ if (opts.reconnect == null)
+ opts.reconnect = true
+
const me = this
+ me.onfn = {}
+
+ init_websocket(me)
+
+ return this
+}
+
+function init_websocket(me) {
+ const ws = me.ws = new WS(me.url);
return new Promise((resolve, reject) => {
- const ws = me.ws = new WebSocket(relay);
let resolved = false
ws.onmessage = (m) => { handle_nostr_message(me, m) }
- ws.onclose = () => { me.close && me.close() }
- ws.onerror = () => { me.error && me.error() }
+ ws.onclose = () => {
+ if (me.onfn.close)
+ me.onfn.close()
+ if (me.reconnecting)
+ return reject(new Error("close during reconnect"))
+ if (!me.manualClose && me.opts.reconnect)
+ reconnect(me)
+ }
+ ws.onerror = () => {
+ if (me.onfn.error)
+ me.onfn.error()
+ if (me.reconnecting)
+ return reject(new Error("error during reconnect"))
+ if (me.opts.reconnect)
+ reconnect(me)
+ }
ws.onopen = () => {
- if (resolved) {
- me.open.bind(me)
- return
- }
+ if (me.onfn.open)
+ me.onfn.open()
+
+ if (resolved) return
resolved = true
resolve(me)
}
- })
+ });
+}
+
+function sleep(ms) {
+ return new Promise(resolve => setTimeout(resolve, ms));
+}
+
+async function reconnect(me)
+{
+ const reconnecting = true
+ let n = 100
+ try {
+ me.reconnecting = true
+ await init_websocket(me)
+ me.reconnecting = false
+ } catch {
+ console.error(`error thrown during reconnect... trying again in ${n} ms`)
+ await sleep(n)
+ n *= 1.5
+ }
+}
+
+Relay.prototype.on = function relayOn(method, fn) {
+ this.onfn[method] = fn
+}
+
+Relay.prototype.close = function relayClose() {
+ if (this.ws) {
+ this.manualClose = true
+ this.ws.close()
+ }
}
Relay.prototype.subscribe = function relay_subscribe(sub_id, ...filters) {
@@ -62,12 +195,17 @@ function handle_nostr_message(relay, msg)
case "EVENT":
if (data.length < 3)
return
- return relay.event && relay.event(data[1], data[2])
+ return relay.onfn.event && relay.onfn.event(data[1], data[2])
case "EOSE":
- return relay.eose && relay.eose(data[1])
+ return relay.onfn.eose && relay.onfn.eose(data[1])
case "NOTICE":
- return relay.note && relay.note(...data.slice(1))
+ return relay.onfn.notice && relay.onfn.notice(...data.slice(1))
}
}
}
+return RelayPool
+})()
+
+if (typeof module !== 'undefined' && module.exports)
+ module.exports = RelayPool