Skip to content

Instantly share code, notes, and snippets.

@palkan
Last active February 17, 2024 00:18
Show Gist options
  • Save palkan/a07581cf90fdd674169168122d3b5d33 to your computer and use it in GitHub Desktop.
Save palkan/a07581cf90fdd674169168122d3b5d33 to your computer and use it in GitHub Desktop.
Turbo Presence
<!-- index.html.erb -->
<div id="messages">
<%# Initial presence counter for this channel (rendered from the channels/presence partial). %>
<%= render "channels/presence", channel: %>
<%# Presence-aware stream source: `params` and `presence` are handed to the turbo_presence_stream_from helper as element attributes. %>
<%= turbo_presence_stream_from channel, params: {channel_id: channel.id, model: Channel}, presence: channel.id %>
<%= render messages %>
</div>
<!-- _presence.html.erb -->
<%# Shows how many users `channel.online_users` returns; expects a `channel` local. %>
<div id="presence">
👀 <%= channel.online_users.size %>
</div>
# A channel record whose live audience is tracked through Turbo::Presence.
class Channel < ApplicationRecord
  # Re-renders the presence partial for the channel identified by
  # +params[:channel_id]+ and broadcasts it to that channel's stream,
  # replacing the element targeted as "presence".
  #
  # @param params [Hash] subscription params; must contain +:channel_id+
  def self.turbo_broadcast_presence(params)
    channel = Channel.find(params[:channel_id])
    channel.broadcast_replace_to channel, target: "presence",
      partial: "channels/presence",
      locals: {channel:}
  end

  # Users whose ids are currently registered in Turbo::Presence for this
  # channel.
  #
  # @return [ActiveRecord::Relation] relation over User
  def online_users
    # Use this record's own id: there is no `channel` local or method inside
    # a Channel instance, so the previous `channel.id` raised NameError.
    User.where(id: Turbo::Presence.for(id))
  end
end
import { Turbo, cable } from "@hotwired/turbo-rails";
const { connectStreamSource, disconnectStreamSource } = Turbo;
const { subscribeTo } = cable;
/**
 * Custom element that connects itself as a Turbo stream source, subscribes to
 * the cable channel described by its attributes and, while the subscription
 * is connected, performs the "presence_keepalive" action every 60 seconds.
 */
class StreamSourceElement extends HTMLElement {
  async connectedCallback() {
    connectStreamSource(this);
    const element = this;
    // Handle of the keepalive timer; shared by the subscription callbacks below.
    let presenceId;

    this.subscription = await subscribeTo(this.channel, {
      disconnected() {
        this.stopPresence();
      },
      received(data) {
        element.dispatchMessageEvent(data);
      },
      connected() {
        this.startPresence();
      },
      startPresence() {
        // Clear any previous timer first so repeated `connected` callbacks
        // (e.g. after a reconnect) never stack several keepalive intervals.
        this.stopPresence();
        presenceId = setInterval(() => this.perform("presence_keepalive"), 60000);
      },
      stopPresence() {
        if (presenceId) clearInterval(presenceId);
        presenceId = undefined;
      }
    });
  }

  disconnectedCallback() {
    disconnectStreamSource(this);
    if (this.subscription) {
      // Stop the keepalive timer explicitly: removing the element must not
      // leave an interval performing actions on an unsubscribed subscription.
      this.subscription.stopPresence();
      this.subscription.unsubscribe();
    }
  }

  /** Re-dispatches data received over the cable as a "message" event on this element. */
  dispatchMessageEvent(data) {
    const event = new MessageEvent("message", { data });
    return this.dispatchEvent(event);
  }

  /**
   * Subscription identifier built from the element's attributes: the parsed
   * JSON of `params` (if present) merged with `channel`,
   * `signed-stream-name` and `presence`.
   */
  get channel() {
    const channel = this.getAttribute("channel");
    const signed_stream_name = this.getAttribute("signed-stream-name");
    const paramsJSON = this.getAttribute("params");
    const params = paramsJSON ? JSON.parse(paramsJSON) : {};
    const presence = this.getAttribute("presence");

    return { ...params, channel, signed_stream_name, presence };
  }
}

customElements.define("turbo-cable-stream-source-presence", StreamSourceElement);
# frozen_string_literal: true
# Turbo streams channel with the Turbo::Streams::Presence concern mixed in.
# It is the default channel name used by the turbo_presence_stream_from helper.
class TurboChannel < Turbo::StreamsChannel
include Turbo::Streams::Presence
end
# frozen_string_literal: true
module Turbo
  module Streams
    # Channel concern that registers the connection's user in Turbo::Presence
    # on subscribe, removes it on unsubscribe, and asks the model named in the
    # subscription params to broadcast the refreshed presence state.
    #
    # Presence tracking is active only when the subscription carries a
    # +presence+ param, which doubles as the presence identifier.
    #
    # NOTE(review): public methods of a channel can be invoked by clients as
    # actions. Only #presence_keepalive is performed by the client-side
    # element; consider making the remaining helpers private.
    module Presence
      extend ActiveSupport::Concern

      included do
        # The callback name was previously misspelled as :add_resence, which
        # made every presence-enabled subscription fail with NoMethodError.
        after_subscribe :add_presence, if: :presence_enabled?
        after_unsubscribe :remove_presence, if: :presence_enabled?
      end

      # @return [Object, nil] truthy when the subscription asked for presence
      def presence_enabled?() = params[:presence]

      # Delegates the actual broadcast to the model class named by
      # +params[:model]+, which must respond to +turbo_broadcast_presence+.
      def broadcast_presence
        # NOTE(review): params[:model] comes from the client-side subscription
        # identifier; constantizing it lets the client choose any loadable
        # constant. Consider restricting it to an allowlist.
        model = params[:model].classify.constantize
        model.turbo_broadcast_presence(params)
      end

      # Registers the current user, occasionally prunes stale entries and
      # broadcasts the updated presence state.
      def add_presence
        # Fully qualified on purpose: a bare `Presence` would resolve to this
        # very module and `::Presence` to an unrelated top-level constant.
        ::Turbo::Presence.add(presence_identifier, connection.presence_user_id)
        maybe_expire
        broadcast_presence
      end

      # Action performed periodically by the client to refresh the user's
      # last-seen timestamp.
      def presence_keepalive
        ::Turbo::Presence.touch(presence_identifier, connection.presence_user_id)
      end

      # Unregisters the current user and broadcasts the updated presence state.
      def remove_presence
        ::Turbo::Presence.remove(presence_identifier, connection.presence_user_id)
        broadcast_presence
      end

      # @return [Object] identifier under which presence is stored
      def presence_identifier() = params[:presence]

      private

      # Prunes expired presence entries on roughly 30% of calls.
      def maybe_expire
        # Ideally, that should be moved to some background thread,
        # which calls .expire periodically.
        return false if rand > 0.3

        ::Turbo::Presence.expire(presence_identifier)
      end
    end
  end
end
# frozen_string_literal: true
module Turbo
  # Redis-backed presence registry.
  #
  # For every channel it keeps a sorted set of user ids scored by the time
  # they were last seen, plus a per-user counter of how many times the user
  # was added (e.g. several open tabs). Class-level calls are forwarded to a
  # lazily created shared instance.
  class Presence
    class << self
      def instance() = (@instance ||= new)

      delegate_missing_to :instance
    end

    PREFIX = "turbopresence"

    # Upper bound on optimistic-locking retries in #remove.
    MAX_REMOVE_ATTEMPTS = 5

    # Raised internally when a WATCH-guarded transaction was aborted.
    class RedisRollback < StandardError; end

    private attr_reader :redis, :ttl

    # @param ttl [ActiveSupport::Duration, Integer] seconds after which an
    #   entry that was not touched is considered stale
    def initialize(ttl: 5.minutes)
      @ttl = ttl
      @redis = Redis.new
    end

    # Registers one more session of +user_id+ in +channel_id+.
    def add(channel_id, user_id)
      counter = counter_key(channel_id, user_id)

      redis.multi do |r|
        r.incr(counter)
        # EXPIRE takes a relative TTL in seconds; the previous code passed an
        # absolute Unix timestamp, so the counter effectively never expired.
        r.expire(counter, ttl.to_i)
        r.zadd(set_key(channel_id), Time.now.to_i, user_id.to_s)
      end
    end

    # Refreshes the last-seen time of +user_id+ in +channel_id+.
    #
    # @return [Object] the ZADD reply
    def touch(channel_id, user_id)
      redis.multi do |r|
        # Keep the session counter alive for as long as the user keeps
        # sending keepalives.
        r.expire(counter_key(channel_id, user_id), ttl.to_i)
        r.zadd(set_key(channel_id), Time.now.to_i, user_id.to_s)
      end.last
    end

    # Drops one session of +user_id+; the user leaves the channel's set when
    # the last session goes away.
    #
    # @raise [RedisRollback] when the transaction keeps being aborted after
    #   MAX_REMOVE_ATTEMPTS attempts
    def remove(channel_id, user_id)
      attempts ||= 0 # survives `retry`, which re-runs the method body
      counter = counter_key(channel_id, user_id)

      redis.watch(counter) do |wr|
        sessions = wr.get(counter).to_i

        # No explicit UNWATCH afterwards: EXEC clears all watches whether or
        # not the transaction was applied.
        reply = wr.multi do |r|
          if sessions > 1
            r.decr(counter)
          else
            # Last (or already expired) session: drop the counter instead of
            # leaving a zero/negative value behind, and remove the member
            # under the same string form that #add stored.
            r.del(counter)
            r.zrem(set_key(channel_id), user_id.to_s)
          end
        end

        raise RedisRollback if reply.nil?

        reply
      end
    rescue RedisRollback
      attempts += 1
      raise if attempts >= MAX_REMOVE_ATTEMPTS

      retry
    end

    # @return [Array<String>] ids of all users registered in +channel_id+
    def for(channel_id)
      redis.zrange(set_key(channel_id), 0, -1)
    end

    # Removes users not seen within +ttl+ from +channel_id+.
    #
    # @return [Array<String>] the ids that were removed
    def expire(channel_id)
      deadline = ttl.seconds.ago.to_i
      key = set_key(channel_id)

      # Read and delete in one transaction so the returned ids are exactly
      # the ones removed — we may want to trigger a leave event for them.
      expired_ids, = redis.multi do |r|
        r.zrangebyscore(key, "-inf", deadline)
        r.zremrangebyscore(key, "-inf", deadline)
      end

      expired_ids
    end

    private

    # Key of the sorted set holding the users present in a channel.
    def set_key(channel_id) = "#{PREFIX}/#{channel_id}"

    # Key of the per-user session counter within a channel.
    def counter_key(channel_id, user_id) = "#{PREFIX}/#{channel_id}/#{user_id}"
  end
end
# Application-wide view helpers.
module ApplicationHelper
  # Builds the presence-aware stream source element for the given streamables
  # by delegating to +tag.turbo_cable_stream_source_presence+.
  #
  # @param streamables [Array] objects used to compute the signed stream name
  # @param attributes [Hash] extra element attributes; +:channel+ defaults to
  #   "TurboChannel" and +:params+ is serialized to JSON when present
  def turbo_presence_stream_from(*streamables, **attributes)
    channel_name = attributes[:channel]&.to_s || "TurboChannel"
    signed_name = Turbo::StreamsChannel.signed_stream_name(streamables)
    params_json = attributes[:params]&.to_json

    element_attributes = attributes.merge(
      channel: channel_name,
      "signed-stream-name": signed_name,
      params: params_json
    )

    tag.turbo_cable_stream_source_presence(**element_attributes)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment