-
-
Save palkan/a07581cf90fdd674169168122d3b5d33 to your computer and use it in GitHub Desktop.
Turbo Presence
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- index.html.erb -->
<div id="messages">
  <%# Initial presence counter; rendered into the element with id "presence". %>
  <%= render "channels/presence", channel: channel %>
  <%# Subscribes to the channel's stream and registers this viewer under the presence key channel.id. %>
  <%= turbo_presence_stream_from(channel, params: {channel_id: channel.id, model: Channel}, presence: channel.id) %>
  <%= render messages %>
</div>
<!-- _presence.html.erb -->
<%# Shows how many users are currently online in the given channel. %>
<div id="presence">
  👀 <%= channel.online_users.size %>
</div>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Chat channel record whose presence (online users) is tracked through
# Turbo::Presence and pushed to subscribers via Turbo Streams.
class Channel < ApplicationRecord
  # Re-renders the "channels/presence" partial for the channel referenced by
  # the subscription params and replaces the "presence" target for everyone
  # streaming from that channel.
  #
  # @param params [Hash] subscription params; must contain :channel_id
  def self.turbo_broadcast_presence(params)
    channel = find(params[:channel_id])
    channel.broadcast_replace_to channel,
      target: "presence",
      partial: "channels/presence",
      locals: {channel:}
  end

  # Users currently registered as present in this channel.
  #
  # Uses the record's own id: there is no `channel` method on a Channel
  # instance, so the previous `channel.id` raised NameError.
  #
  # @return [ActiveRecord::Relation] users whose ids are stored for this channel
  def online_users
    User.where(id: Turbo::Presence.for(id))
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Turbo, cable } from "@hotwired/turbo-rails"; | |
const { connectStreamSource, disconnectStreamSource } = Turbo; | |
const { subscribeTo } = cable; | |
// How often the "presence_keepalive" action is performed, in milliseconds.
const PRESENCE_KEEPALIVE_INTERVAL_MS = 60000;

/**
 * <turbo-cable-stream-source-presence> custom element.
 *
 * Subscribes to the channel described by its attributes, forwards received
 * data as "message" events (so Turbo can consume it as a stream source) and,
 * while the subscription is connected, periodically performs the
 * "presence_keepalive" channel action.
 */
class StreamSourceElement extends HTMLElement {
  async connectedCallback() {
    connectStreamSource(this);

    const element = this;
    // Timer handle shared by startPresence/stopPresence below.
    let presenceId = null;

    // Inside these callbacks `this` is the subscription object, which is why
    // `this.perform` and `this.startPresence` are reachable there.
    this.subscription = await subscribeTo(this.channel, {
      connected() {
        this.startPresence();
      },
      disconnected() {
        this.stopPresence();
      },
      received(data) {
        element.dispatchMessageEvent(data);
      },
      startPresence() {
        // Drop any previous timer first so repeated `connected` calls never
        // leave two keepalive loops running.
        this.stopPresence();
        presenceId = setInterval(
          () => this.perform("presence_keepalive"),
          PRESENCE_KEEPALIVE_INTERVAL_MS
        );
      },
      stopPresence() {
        if (presenceId) clearInterval(presenceId);
        presenceId = null;
      }
    });
  }

  disconnectedCallback() {
    disconnectStreamSource(this);
    if (this.subscription) {
      // Unsubscribing does not go through the `disconnected` callback, so the
      // keepalive timer has to be stopped explicitly; otherwise it keeps
      // performing actions on a subscription that no longer exists.
      this.subscription.stopPresence();
      this.subscription.unsubscribe();
    }
  }

  // Re-emits data received over the cable as a DOM "message" event.
  dispatchMessageEvent(data) {
    const event = new MessageEvent("message", { data });
    return this.dispatchEvent(event);
  }

  // Subscription identifier built from the element's attributes: the parsed
  // JSON in `params` (if any) merged with `channel`, `signed-stream-name`
  // and `presence`. The explicit attributes win over same-named params.
  get channel() {
    const channel = this.getAttribute("channel");
    const signed_stream_name = this.getAttribute("signed-stream-name");
    const presence = this.getAttribute("presence");

    const paramsJSON = this.getAttribute("params");
    const params = paramsJSON ? JSON.parse(paramsJSON) : {};

    return { ...params, channel, signed_stream_name, presence };
  }
}

customElements.define("turbo-cable-stream-source-presence", StreamSourceElement);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# Turbo Streams channel with presence tracking mixed in. This is the channel
# name the turbo_presence_stream_from helper falls back to when none is given.
class TurboChannel < Turbo::StreamsChannel
  include Turbo::Streams::Presence
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
module Turbo
  module Streams
    # Channel concern that records subscribers in Turbo::Presence and asks the
    # model named in the subscription params to broadcast the updated state.
    #
    # Presence is only active for subscriptions that carry a :presence param;
    # its value is the key users are grouped under.
    module Presence
      extend ActiveSupport::Concern

      included do
        # Was `:add_resence` — a typo that pointed the callback at a method
        # that does not exist.
        after_subscribe :add_presence, if: :presence_enabled?
        after_unsubscribe :remove_presence, if: :presence_enabled?
      end

      # Truthy when the subscription was created with a :presence param.
      def presence_enabled?() = params[:presence]

      # Delegates rendering/broadcasting to `<Model>.turbo_broadcast_presence`.
      #
      # NOTE(review): params[:model] comes from the client-side subscription
      # identifier and is constantized as-is. Consider restricting it to an
      # allow-list of models before resolving the constant.
      def broadcast_presence
        model = params[:model].classify.constantize
        model.turbo_broadcast_presence(params)
      end

      # Registers the connection's user, occasionally sweeps stale entries,
      # then broadcasts the new presence state.
      def add_presence
        ::Turbo::Presence.add(presence_identifier, connection.presence_user_id)
        maybe_expire
        broadcast_presence
      end

      # Channel action performed periodically by the client to refresh the
      # user's last-seen timestamp.
      def presence_keepalive
        ::Turbo::Presence.touch(presence_identifier, connection.presence_user_id)
      end

      # Unregisters the connection's user and broadcasts the new state.
      def remove_presence
        ::Turbo::Presence.remove(presence_identifier, connection.presence_user_id)
        broadcast_presence
      end

      # Key under which users of this subscription are grouped.
      def presence_identifier() = params[:presence]

      private

      # Runs the stale-entry sweep on roughly 30% of calls.
      #
      # Ideally, that should be moved to some background thread,
      # which calls .expire periodically.
      def maybe_expire
        return false if rand > 0.3

        ::Turbo::Presence.expire(presence_identifier)
      end
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
module Turbo
  # Redis-backed presence store.
  #
  # Per channel it keeps:
  #   * a sorted set "<PREFIX>/<channel_id>" of user ids scored by last-seen
  #     Unix time;
  #   * a counter "<PREFIX>/<channel_id>/<user_id>" with the number of open
  #     subscriptions for that user, so a user only leaves the set when the
  #     last subscription goes away.
  #
  # All public instance methods are also callable on the class itself; they
  # are forwarded to a lazily created shared instance.
  class Presence
    class << self
      def instance() = (@instance ||= new)

      delegate_missing_to :instance
    end

    PREFIX = "turbopresence"
    # Upper bound on optimistic-locking attempts in #remove.
    MAX_REMOVE_ATTEMPTS = 5

    # Raised internally when a WATCH-guarded transaction is aborted.
    class RedisRollback < StandardError; end

    private attr_reader :redis, :ttl

    # @param ttl [ActiveSupport::Duration, Integer] how long a user stays
    #   present without a keepalive
    def initialize(ttl: 5.minutes)
      @ttl = ttl
      @redis = Redis.new
    end

    # Registers one more subscription for the user and marks them as seen now.
    def add(channel_id, user_id)
      counter = counter_key(channel_id, user_id)

      redis.multi do |r|
        r.incr(counter)
        # EXPIRE takes a relative number of seconds. The previous code passed
        # an absolute Unix timestamp, which made the key effectively permanent.
        r.expire(counter, ttl.to_i)
        r.zadd(set_key(channel_id), Time.now.to_i, user_id.to_s)
      end
    end

    # Refreshes the user's last-seen time and keeps the subscription counter
    # alive for another ttl, so that it does not expire under a connection
    # that is still sending keepalives.
    #
    # @return the reply of the ZADD command
    def touch(channel_id, user_id)
      redis.multi do |r|
        r.expire(counter_key(channel_id, user_id), ttl.to_i)
        r.zadd(set_key(channel_id), Time.now.to_i, user_id.to_s)
      end.last
    end

    # Unregisters one subscription for the user. When it was the last one
    # (or the counter is already gone), the counter is deleted and the user is
    # removed from the channel's set; otherwise the counter is decremented.
    #
    # The counter is WATCHed so a concurrent add/remove aborts the transaction;
    # the operation is then retried up to MAX_REMOVE_ATTEMPTS times.
    #
    # @raise [RedisRollback] when every attempt was aborted
    def remove(channel_id, user_id)
      counter = counter_key(channel_id, user_id)
      attempts = 0

      begin
        redis.watch(counter) do |wr|
          remaining = wr.get(counter).to_i

          reply = wr.multi do |r|
            if remaining <= 1
              # Deleting instead of decrementing avoids leaving zero or
              # negative counters behind.
              r.del(counter)
              r.zrem(set_key(channel_id), user_id.to_s)
            else
              r.decr(counter)
            end
          end

          # A nil reply means EXEC was aborted because the counter changed.
          # No explicit UNWATCH is needed on success: EXEC clears all watches.
          raise RedisRollback if reply.nil?

          reply
        end
      rescue RedisRollback
        attempts += 1
        retry if attempts < MAX_REMOVE_ATTEMPTS
        raise
      end
    end

    # @return [Array<String>] ids of all users currently in the channel's set
    def for(channel_id)
      redis.zrange(set_key(channel_id), 0, -1)
    end

    # Removes users not seen within the ttl and returns their ids — we may
    # want to trigger a leave event for them.
    #
    # Reading and deleting happen in one transaction, so the returned ids are
    # exactly the ones that were removed.
    #
    # @return [Array<String>] ids that were expired
    def expire(channel_id)
      deadline = ttl.seconds.ago.to_i
      key = set_key(channel_id)

      expired_ids, _removed_count = redis.multi do |r|
        r.zrangebyscore(key, "-inf", deadline)
        r.zremrangebyscore(key, "-inf", deadline)
      end

      expired_ids
    end

    private

    # Key of the sorted set holding the channel's present user ids.
    def set_key(channel_id) = "#{PREFIX}/#{channel_id}"

    # Key of the per-user subscription counter within a channel.
    def counter_key(channel_id, user_id) = "#{PREFIX}/#{channel_id}/#{user_id}"
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module ApplicationHelper
  # Renders a <turbo-cable-stream-source-presence> element subscribed to the
  # given streamables.
  #
  # Any extra keyword arguments become element attributes. :channel defaults
  # to "TurboChannel" and :params, when given, is serialized to JSON.
  def turbo_presence_stream_from(*streamables, **attributes)
    overrides = {
      channel: attributes[:channel]&.to_s || "TurboChannel",
      "signed-stream-name": Turbo::StreamsChannel.signed_stream_name(streamables),
      params: attributes[:params]&.to_json
    }

    tag.turbo_cable_stream_source_presence(**attributes.merge(overrides))
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment