Compare commits

...

8 Commits

Author SHA1 Message Date
Abelfreyja
0eea339e41 adds the ability for shards to sync with the main server regarding their caches, when pulling files from main server the shard can stream it to the client directly while downloading and add info for server to report to client regarding file locations across shards 2026-01-14 16:52:06 +09:00
e8c56bb3bc Merge pull request 'Updated links for Xivapi sources' (#54) from xivapi-link-changes into master
Reviewed-on: #54
2026-01-05 12:03:09 +00:00
8e0dcc6792 Updated links for Xivapi sources 2026-01-05 12:01:53 +00:00
ac4174f6e0 Fix moderator status on broadcast (#51)
Co-authored-by: cake <admin@cakeandbanana.nl>
Reviewed-on: #51
Co-authored-by: cake <cake@noreply.git.lightless-sync.org>
Co-committed-by: cake <cake@noreply.git.lightless-sync.org>
2026-01-04 05:14:37 +00:00
9fffaf7df2 fix-prune (#50)
Co-authored-by: Tsubasahane <wozaiha@gmail.com>
Co-authored-by: cake <admin@cakeandbanana.nl>
Reviewed-on: #50
Co-authored-by: cake <cake@noreply.git.lightless-sync.org>
Co-committed-by: cake <cake@noreply.git.lightless-sync.org>
2026-01-04 05:14:22 +00:00
ca0c548373 AddBan changes for ban admin panel rework (#52)
Co-authored-by: cake <admin@cakeandbanana.nl>
Reviewed-on: #52
Co-authored-by: cake <cake@noreply.git.lightless-sync.org>
Co-committed-by: cake <cake@noreply.git.lightless-sync.org>
2026-01-04 05:12:00 +00:00
1716750347 Location Sharing (#49)
Authored-by: Tsubasahane <wozaiha@gmail.com>
Reviewed-on: #49
Co-authored-by: Tsubasa <tsubasa@noreply.git.lightless-sync.org>
Co-committed-by: Tsubasa <tsubasa@noreply.git.lightless-sync.org>
2025-12-31 17:32:00 +00:00
d39a922482 Merge pull request 'Trying a different method of setting access time.' (#48) from cache-test-fix into master
Reviewed-on: #48
2025-12-23 17:14:08 +00:00
23 changed files with 2609 additions and 147 deletions

5
.gitignore vendored
View File

@@ -350,4 +350,7 @@ MigrationBackup/
.ionide/
# docker run data
Docker/run/data/
Docker/run/data/
#idea files
*.idea

View File

@@ -40,5 +40,6 @@ namespace LightlessSyncServer.Hubs
public Task Client_GposeLobbyPushPoseData(UserData userData, PoseData poseData) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
public Task Client_GposeLobbyPushWorldData(UserData userData, WorldData worldData) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
public Task Client_ChatReceive(ChatMessageDto message) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
public Task Client_SendLocationToClient(LocationDto locationDto, DateTimeOffset expireAt) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
}
}

View File

@@ -28,25 +28,72 @@ public partial class LightlessHub
var (userHasRights, group) = await TryValidateGroupModeratorOrOwner(dto.Group.GID).ConfigureAwait(false);
if (!userHasRights) return;
var (userExists, groupPair) = await TryValidateUserInGroup(dto.Group.GID, dto.User.UID).ConfigureAwait(false);
if (!userExists) return;
var targetUid = dto.User.UID?.Trim();
if (string.IsNullOrWhiteSpace(targetUid)) return;
if (groupPair.IsModerator || string.Equals(group.OwnerUID, dto.User.UID, StringComparison.Ordinal)) return;
if (string.Equals(group.OwnerUID, targetUid, StringComparison.Ordinal))
return;
var alias = string.IsNullOrEmpty(groupPair.GroupUser.Alias) ? "-" : groupPair.GroupUser.Alias;
var ban = new GroupBan()
var groupPair = await DbContext.GroupPairs
.Include(p => p.GroupUser)
.SingleOrDefaultAsync(p => p.GroupGID == dto.Group.GID && p.GroupUserUID == targetUid, cancellationToken: RequestAbortedToken).ConfigureAwait(false);
if (groupPair?.IsModerator == true)
return;
var now = DateTime.UtcNow;
var existingBan = await DbContext.Set<GroupBan>().SingleOrDefaultAsync(b => b.GroupGID == dto.Group.GID && b.BannedUserUID == targetUid, cancellationToken: RequestAbortedToken).ConfigureAwait(false);
var userExists = await DbContext.Users.AsNoTracking().AnyAsync(u => u.UID == targetUid || u.Alias == targetUid, RequestAbortedToken).ConfigureAwait(false);
if (!userExists && existingBan == null)
return;
const string marker = " (Alias at time of ban:";
string suffix;
if (existingBan?.BannedReason is { } existingReason)
{
BannedByUID = UserUID,
BannedReason = $"{reason} (Alias at time of ban: {alias})",
BannedOn = DateTime.UtcNow,
BannedUserUID = dto.User.UID,
GroupGID = dto.Group.GID,
};
var idx = existingReason.IndexOf(marker, StringComparison.Ordinal);
suffix = idx >= 0 ? existingReason.Substring(startIndex: idx) : string.Empty;
}
else
{
var alias = groupPair?.GroupUser?.Alias;
alias = string.IsNullOrWhiteSpace(alias) ? "-" : alias;
suffix = $" (Alias at time of ban: {alias})";
}
DbContext.Add(ban);
await DbContext.SaveChangesAsync().ConfigureAwait(false);
var baseReason = (reason ?? string.Empty).Trim();
var finalReason = string.IsNullOrEmpty(suffix) ? baseReason : (baseReason + suffix);
await GroupRemoveUser(dto).ConfigureAwait(false);
if (existingBan != null)
{
existingBan.BannedByUID = UserUID;
existingBan.BannedReason = finalReason;
DbContext.Update(existingBan);
}
else
{
var ban = new GroupBan
{
BannedByUID = UserUID,
BannedReason = finalReason,
BannedOn = now,
BannedUserUID = targetUid,
GroupGID = dto.Group.GID,
};
DbContext.Add(ban);
}
await DbContext.SaveChangesAsync(RequestAbortedToken).ConfigureAwait(false);
if (groupPair != null)
{
await GroupRemoveUser(dto).ConfigureAwait(false);
}
_logger.LogCallInfo(LightlessHubLogger.Args(dto, "Success"));
}
@@ -326,7 +373,7 @@ public partial class LightlessHub
await Clients.User(UserUID).Client_GroupSendFullInfo(new GroupFullInfoDto(newGroup.ToGroupData(), self.ToUserData(),
newGroup.ToEnum(), initialPrefPermissions.ToEnum(), initialPair.ToEnum(), new(StringComparer.Ordinal), 1))
.ConfigureAwait(false);
_logger.LogCallInfo(LightlessHubLogger.Args(gid));
return new GroupJoinDto(newGroup.ToGroupData(), passwd, initialPrefPermissions.ToEnum());
@@ -400,9 +447,9 @@ public partial class LightlessHub
var banEntries = await DbContext.GroupBans.Include(b => b.BannedUser).Where(g => g.GroupGID == dto.Group.GID).AsNoTracking().ToListAsync(cancellationToken: RequestAbortedToken).ConfigureAwait(false);
List<BannedGroupUserDto> bannedGroupUsers = banEntries.Select(b =>
List<BannedGroupUserDto> bannedGroupUsers = [.. banEntries.Select(b =>
new BannedGroupUserDto(group.ToGroupData(), b.BannedUser.ToUserData(), b.BannedReason, b.BannedOn,
b.BannedByUID)).ToList();
b.BannedByUID))];
_logger.LogCallInfo(LightlessHubLogger.Args(dto, bannedGroupUsers.Count));
@@ -831,7 +878,7 @@ public partial class LightlessHub
}
var data = await DbContext.GroupProfiles
.Include(gp => gp.Group)
.Include(gp => gp.Group)
.FirstOrDefaultAsync(
g => g.Group.GID == dto.Group.GID || g.Group.Alias == dto.Group.AliasOrGID,
cancellationToken
@@ -862,85 +909,85 @@ public partial class LightlessHub
[Authorize(Policy = "Identified")]
public async Task GroupSetProfile(GroupProfileDto dto)
{
_logger.LogCallInfo(LightlessHubLogger.Args(dto));
_logger.LogCallInfo(LightlessHubLogger.Args(dto));
var cancellationToken = RequestAbortedToken;
var cancellationToken = RequestAbortedToken;
if (dto.Group == null) return;
if (dto.Group == null) return;
var (hasRights, group) = await TryValidateGroupModeratorOrOwner(dto.Group.GID).ConfigureAwait(false);
if (!hasRights) return;
var (hasRights, group) = await TryValidateGroupModeratorOrOwner(dto.Group.GID).ConfigureAwait(false);
if (!hasRights) return;
var groupProfileDb = await DbContext.GroupProfiles
.Include(g => g.Group)
.FirstOrDefaultAsync(g => g.GroupGID == dto.Group.GID, cancellationToken)
var groupProfileDb = await DbContext.GroupProfiles
.Include(g => g.Group)
.FirstOrDefaultAsync(g => g.GroupGID == dto.Group.GID, cancellationToken)
.ConfigureAwait(false);
ImageCheckService.ImageLoadResult profileResult = new();
ImageCheckService.ImageLoadResult bannerResult = new();
//Avatar image validation
if (!string.IsNullOrEmpty(dto.PictureBase64))
{
profileResult = await ImageCheckService.ValidateImageAsync(dto.PictureBase64, banner: false, RequestAbortedToken).ConfigureAwait(false);
if (!profileResult.Success)
{
await Clients.Caller.Client_ReceiveServerMessage(MessageSeverity.Error, profileResult.ErrorMessage).ConfigureAwait(false);
return;
}
}
//Banner image validation
if (!string.IsNullOrEmpty(dto.BannerBase64))
{
bannerResult = await ImageCheckService.ValidateImageAsync(dto.BannerBase64, banner: true, RequestAbortedToken).ConfigureAwait(false);
if (!bannerResult.Success)
{
await Clients.Caller.Client_ReceiveServerMessage(MessageSeverity.Error, bannerResult.ErrorMessage).ConfigureAwait(false);
return;
}
}
var sanitizedProfileImage = profileResult?.Base64Image;
var sanitizedBannerImage = bannerResult?.Base64Image;
if (groupProfileDb == null)
{
groupProfileDb = new GroupProfile
{
GroupGID = dto.Group.GID,
Group = group,
ProfileDisabled = dto.IsDisabled ?? false,
IsNSFW = dto.IsNsfw ?? false,
};
groupProfileDb.UpdateProfileFromDto(dto, sanitizedProfileImage, sanitizedBannerImage);
await DbContext.GroupProfiles.AddAsync(groupProfileDb, cancellationToken).ConfigureAwait(false);
}
else
{
groupProfileDb.Group ??= group;
groupProfileDb.UpdateProfileFromDto(dto, sanitizedProfileImage, sanitizedBannerImage);
}
var userIds = await DbContext.GroupPairs
.Where(p => p.GroupGID == groupProfileDb.GroupGID)
.Select(p => p.GroupUserUID)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
if (userIds.Count > 0)
{
var profileDto = groupProfileDb.ToDTO();
await Clients.Users(userIds).Client_GroupSendProfile(profileDto)
.ConfigureAwait(false);
}
ImageCheckService.ImageLoadResult profileResult = new();
ImageCheckService.ImageLoadResult bannerResult = new();
//Avatar image validation
if (!string.IsNullOrEmpty(dto.PictureBase64))
{
profileResult = await ImageCheckService.ValidateImageAsync(dto.PictureBase64, banner: false, RequestAbortedToken).ConfigureAwait(false);
if (!profileResult.Success)
{
await Clients.Caller.Client_ReceiveServerMessage(MessageSeverity.Error, profileResult.ErrorMessage).ConfigureAwait(false);
return;
}
}
//Banner image validation
if (!string.IsNullOrEmpty(dto.BannerBase64))
{
bannerResult = await ImageCheckService.ValidateImageAsync(dto.BannerBase64, banner: true, RequestAbortedToken).ConfigureAwait(false);
if (!bannerResult.Success)
{
await Clients.Caller.Client_ReceiveServerMessage(MessageSeverity.Error, bannerResult.ErrorMessage).ConfigureAwait(false);
return;
}
}
var sanitizedProfileImage = profileResult?.Base64Image;
var sanitizedBannerImage = bannerResult?.Base64Image;
if (groupProfileDb == null)
{
groupProfileDb = new GroupProfile
{
GroupGID = dto.Group.GID,
Group = group,
ProfileDisabled = dto.IsDisabled ?? false,
IsNSFW = dto.IsNsfw ?? false,
};
groupProfileDb.UpdateProfileFromDto(dto, sanitizedProfileImage, sanitizedBannerImage);
await DbContext.GroupProfiles.AddAsync(groupProfileDb, cancellationToken).ConfigureAwait(false);
}
else
{
groupProfileDb.Group ??= group;
groupProfileDb.UpdateProfileFromDto(dto, sanitizedProfileImage, sanitizedBannerImage);
}
var userIds = await DbContext.GroupPairs
.Where(p => p.GroupGID == groupProfileDb.GroupGID)
.Select(p => p.GroupUserUID)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
if (userIds.Count > 0)
{
var profileDto = groupProfileDb.ToDTO();
await Clients.Users(userIds).Client_GroupSendProfile(profileDto)
.ConfigureAwait(false);
}
await DbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
await DbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
}
[Authorize(Policy = "Identified")]
@@ -1103,11 +1150,11 @@ public partial class LightlessHub
return false;
}
var (isOwner, _) = await TryValidateOwner(dto.GID).ConfigureAwait(false);
if (!isOwner)
var (isOwnerOrMod, _) = await TryValidateGroupModeratorOrOwner(dto.GID).ConfigureAwait(false);
if (!isOwnerOrMod)
{
_logger.LogCallWarning(LightlessHubLogger.Args("Unauthorized syncshell broadcast change", "User", UserUID, "GID", dto.GID));
await Clients.Caller.Client_ReceiveServerMessage(MessageSeverity.Error, "You must be the owner of the syncshell to broadcast it.");
await Clients.Caller.Client_ReceiveServerMessage(MessageSeverity.Error, "You must be the owner or moderator of the syncshell to broadcast it.");
return false;
}

View File

@@ -12,8 +12,6 @@ using LightlessSyncShared.Utils;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using SixLabors.ImageSharp;
using SixLabors.ImageSharp.PixelFormats;
using StackExchange.Redis;
using System.Text;
using System.Text.Json;
@@ -1232,6 +1230,120 @@ public partial class LightlessHub
errorMessage = string.Empty;
return true;
}
[Authorize(Policy = "Identified")]
public async Task UpdateLocation(LocationDto dto, bool offline = false)
{
_logger.LogCallInfo(LightlessHubLogger.Args(UserUID,dto));
if (string.IsNullOrEmpty(dto.User.UID))
{
_logger.LogCallWarning(LightlessHubLogger.Args("LocationDto with no userinfo :",UserUID, dto));
return;
}
if (!string.Equals(UserUID, dto.User.UID, StringComparison.Ordinal))
{
_logger.LogCallWarning(LightlessHubLogger.Args("LocationDto with another UID :",UserUID, dto));
return;
}
var key = $"Location:{UserUID}";
if (offline)
{
var allUsers = await GetSharingUsers().ConfigureAwait(false);
await _redis.RemoveAsync(key, CommandFlags.FireAndForget).ConfigureAwait(false);
await Clients.Users(allUsers.Keys).Client_SendLocationToClient(dto, DateTimeOffset.MinValue).ConfigureAwait(false);
}
else
{
var currentLocation = await _redis.GetAsync<LocationDto>(key).ConfigureAwait(false);
if (currentLocation != dto)
{
var allUsers = await GetSharingUsers().ConfigureAwait(false);
await _redis.AddAsync(key, dto).ConfigureAwait(false);
var sendTasks = allUsers.Select(pair => Clients.User(pair.Key).Client_SendLocationToClient(dto, pair.Value));
await Task.WhenAll(sendTasks).ConfigureAwait(false);
}
}
}
private async Task<Dictionary<string, DateTimeOffset>> GetSharingUsers()
{
return await DbContext.Permissions.AsNoTracking()
.Where(x => x.UserUID == UserUID && x.ShareLocationUntil > DateTimeOffset.UtcNow)
.ToDictionaryAsync(x => x.OtherUserUID, x => x.ShareLocationUntil, RequestAbortedToken)
.ConfigureAwait(false);
}
[Authorize(Policy = "Identified")]
public async Task<(List<LocationWithTimeDto>, List<SharingStatusDto>)> RequestAllLocationInfo()
{
_logger.LogCallInfo();
var locationWithTime = await GetLocationWithTime().ConfigureAwait(false);
var sharingStatus = await GetSharingStatus().ConfigureAwait(false);
return (locationWithTime, sharingStatus);
}
private async Task<List<LocationWithTimeDto>> GetLocationWithTime()
{
var dictionary = await DbContext.Permissions.AsNoTracking()
.Where(x => x.OtherUserUID == UserUID && x.ShareLocationUntil > DateTimeOffset.UtcNow)
.ToDictionaryAsync(x => x.UserUID, x => x.ShareLocationUntil, cancellationToken: RequestAbortedToken)
.ConfigureAwait(false);
if (dictionary.Count == 0)
{
return [];
}
var redisKeys = dictionary.Keys.Select(uid => $"Location:{uid}").ToHashSet(StringComparer.Ordinal);
var data = await _redis.GetAllAsync<LocationDto>(redisKeys).ConfigureAwait(false);
var result = new List<LocationWithTimeDto>();
foreach (var (userUid, expireAt) in dictionary)
{
var redisKey = $"Location:{userUid}";
if (data.TryGetValue(redisKey, out var locationDto) && locationDto is not null)
{
result.Add(new LocationWithTimeDto(locationDto, expireAt));
}
}
return result;
}
private async Task<List<SharingStatusDto>> GetSharingStatus()
{
return await DbContext.Permissions.AsNoTracking()
.Where(x => x.UserUID == UserUID && x.ShareLocationUntil > DateTimeOffset.UtcNow)
.Select(x => new SharingStatusDto(new UserData(x.OtherUserUID), x.ShareLocationUntil))
.ToListAsync(cancellationToken: RequestAbortedToken).ConfigureAwait(false);
}
[Authorize(Policy = "Identified")]
public async Task<bool> ToggleLocationSharing(LocationSharingToggleDto dto)
{
_logger.LogCallInfo(LightlessHubLogger.Args(UserUID,dto));
try
{
await DbContext.Permissions.Where(x => x.UserUID == UserUID && dto.users.Contains(x.OtherUserUID))
.ExecuteUpdateAsync(setter =>
setter.SetProperty(x => x.ShareLocationUntil, dto.duration.ToUniversalTime()),
cancellationToken: RequestAbortedToken).ConfigureAwait(false);
//update user's location for target users
var currentLocation = await _redis.GetAsync<LocationDto>($"Location:{UserUID}").ConfigureAwait(false);
await Clients.Users(dto.users).Client_SendLocationToClient(currentLocation, dto.duration.ToUniversalTime())
.ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogError(ex, "ToggleLocationSharing error:");
return false;
}
return true;
}
[GeneratedRegex(@"^([a-z0-9_ '+&,\.\-\{\}]+\/)+([a-z0-9_ '+&,\.\-\{\}]+\.[a-z]{3,4})$", RegexOptions.IgnoreCase | RegexOptions.Compiled | RegexOptions.ECMAScript)]
private static partial Regex GamePathRegex();

View File

@@ -16,6 +16,8 @@ using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using StackExchange.Redis.Extensions.Core.Abstractions;
using System.Collections.Concurrent;
using LightlessSync.API.Dto.CharaData;
using LightlessSync.API.Dto.User;
using LightlessSyncServer.Services.Interfaces;
namespace LightlessSyncServer.Hubs;
@@ -217,6 +219,8 @@ public partial class LightlessHub : Hub<ILightlessHub>, ILightlessHub
await RemoveUserFromRedis().ConfigureAwait(false);
_lightlessCensus.ClearStatistics(UserUID);
await UpdateLocation(new LocationDto(new UserData(UserUID), new LocationInfo()), offline: true).ConfigureAwait(false);
await SendOfflineToAllPairedUsers().ConfigureAwait(false);

View File

@@ -72,7 +72,7 @@ public class LightlessCensus : IHostedService
Dictionary<ushort, short> worldDcs = new();
var dcs = await client.GetStringAsync("https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/WorldDCGroupType.csv", cancellationToken).ConfigureAwait(false);
var dcs = await client.GetStringAsync("https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/en/WorldDCGroupType.csv", cancellationToken).ConfigureAwait(false);
// dc: https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/WorldDCGroupType.csv
// id, name, region
@@ -92,7 +92,7 @@ public class LightlessCensus : IHostedService
_dcs[id] = name;
}
var worlds = await client.GetStringAsync("https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/World.csv", cancellationToken).ConfigureAwait(false);
var worlds = await client.GetStringAsync("https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/en/World.csv", cancellationToken).ConfigureAwait(false);
// world: https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/World.csv
// id, internalname, name, region, usertype, datacenter, ispublic
@@ -114,7 +114,7 @@ public class LightlessCensus : IHostedService
_logger.LogInformation("World: ID: {id}, Name: {name}, DC: {dc}", id, name, dc);
}
var races = await client.GetStringAsync("https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/Race.csv", cancellationToken).ConfigureAwait(false);
var races = await client.GetStringAsync("https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/en/Race.csv", cancellationToken).ConfigureAwait(false);
// race: https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/Race.csv
// id, masc name, fem name, other crap I don't care about
@@ -134,7 +134,7 @@ public class LightlessCensus : IHostedService
_logger.LogInformation("Race: ID: {id}, Name: {name}", id, name);
}
var tribe = await client.GetStringAsync("https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/Tribe.csv", cancellationToken).ConfigureAwait(false);
var tribe = await client.GetStringAsync("https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/en/Tribe.csv", cancellationToken).ConfigureAwait(false);
// tribe: https://raw.githubusercontent.com/xivapi/ffxiv-datamining/master/csv/Tribe.csv
// id masc name, fem name, other crap I don't care about

View File

@@ -2,15 +2,19 @@
using LightlessSyncShared.Data;
using LightlessSyncShared.Models;
using Microsoft.EntityFrameworkCore;
using StackExchange.Redis.Extensions.Core.Abstractions;
namespace LightlessSyncServer.Services
{
public class PruneService(LightlessDbContext dbContext) : IPruneService
public class PruneService(LightlessDbContext dbContext, IRedisDatabase redis) : IPruneService
{
private readonly LightlessDbContext _dbContext = dbContext;
private readonly IRedisDatabase _redis = redis;
public async Task<int> CountPrunableUsersAsync(string groupGid, int days, CancellationToken ct)
{
var onlineUids = await GetOnlineUidsAsync().ConfigureAwait(false);
var allGroupUsers = await _dbContext.GroupPairs
.Include(p => p.GroupUser)
.Include(p => p.Group)
@@ -20,17 +24,14 @@ namespace LightlessSyncServer.Services
var inactivitySpan = GetInactivitySpan(days);
var now = DateTime.UtcNow;
var usersToPrune = allGroupUsers.Where(p =>
!p.IsPinned &&
!p.IsModerator &&
!string.Equals(p.Group.OwnerUID, p.GroupUserUID, StringComparison.Ordinal) &&
p.GroupUser.LastLoggedIn < now - inactivitySpan);
return usersToPrune.Count();
var usersToPrune = GetPruneUserList(allGroupUsers, onlineUids, inactivitySpan, now);
return usersToPrune.Count;
}
public async Task<IReadOnlyList<GroupPair>> ExecutePruneAsync(string groupGid, int days, CancellationToken ct)
{
var onlineUids = await GetOnlineUidsAsync().ConfigureAwait(false);
var allGroupUsers = await _dbContext.GroupPairs
.Include(p => p.GroupUser)
.Include(p => p.Group)
@@ -40,12 +41,7 @@ namespace LightlessSyncServer.Services
var inactivitySpan = GetInactivitySpan(days);
var now = DateTime.UtcNow;
var usersToPrune = allGroupUsers.Where(p =>
!p.IsPinned &&
!p.IsModerator &&
!string.Equals(p.Group.OwnerUID, p.GroupUserUID, StringComparison.Ordinal) &&
p.GroupUser.LastLoggedIn < now - inactivitySpan)
.ToList();
var usersToPrune = GetPruneUserList(allGroupUsers, onlineUids, inactivitySpan, now);
_dbContext.GroupPairs.RemoveRange(usersToPrune);
await _dbContext.SaveChangesAsync(ct).ConfigureAwait(false);
@@ -53,8 +49,52 @@ namespace LightlessSyncServer.Services
return usersToPrune;
}
private static TimeSpan GetInactivitySpan(int days) => days == 0
? TimeSpan.FromMinutes(15)
: TimeSpan.FromDays(days);
private static List<GroupPair> GetPruneUserList(
List<GroupPair> allGroupUsers,
HashSet<string> onlineUids,
TimeSpan inactivitySpan,
DateTime now)
{
return
[
.. allGroupUsers.Where(p =>
!p.IsPinned &&
!p.IsModerator &&
!string.Equals(p.Group.OwnerUID, p.GroupUserUID, StringComparison.Ordinal) &&
!onlineUids.Contains(p.GroupUserUID) &&
p.GroupUser.LastLoggedIn < now - inactivitySpan),
];
}
private async Task<HashSet<string>> GetOnlineUidsAsync()
{
var keys = await _redis.SearchKeysAsync("UID:*").ConfigureAwait(false);
var set = new HashSet<string>(StringComparer.Ordinal);
foreach (var k in keys)
{
if (string.IsNullOrEmpty(k)) continue;
const string prefix = "UID:";
if (k.StartsWith(prefix, StringComparison.Ordinal))
{
var uid = k.Substring(prefix.Length);
if (!string.IsNullOrEmpty(uid))
set.Add(uid);
}
else
{
var idx = k.IndexOf(':', StringComparison.Ordinal);
if (idx >= 0 && idx < k.Length - 1)
set.Add(k[(idx + 1)..]);
}
}
return set;
}
private static TimeSpan GetInactivitySpan(int days) =>
days == 0 ? TimeSpan.FromHours(2) : TimeSpan.FromDays(days);
}
}
}

View File

@@ -24,7 +24,7 @@ namespace LightlessSyncServer.Worker
var hubContext = scope.ServiceProvider.GetRequiredService<IHubContext<LightlessHub>>();
var groups = await db.Groups
.Where(g => g.AutoPruneEnabled && g.AutoPruneDays > 0)
.Where(g => g.AutoPruneEnabled)
.ToListAsync(stoppingToken).ConfigureAwait(false);
foreach (var group in groups)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,30 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace LightlessSyncServer.Migrations
{
/// <inheritdoc />
public partial class ShareLocation : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.AddColumn<DateTimeOffset>(
name: "share_location_until",
table: "user_permission_sets",
type: "timestamp with time zone",
nullable: false,
defaultValue: new DateTimeOffset(new DateTime(1, 1, 1, 0, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 0, 0, 0, 0)));
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropColumn(
name: "share_location_until",
table: "user_permission_sets");
}
}
}

View File

@@ -941,6 +941,10 @@ namespace LightlessSyncServer.Migrations
.HasColumnType("boolean")
.HasColumnName("is_paused");
b.Property<DateTimeOffset>("ShareLocationUntil")
.HasColumnType("timestamp with time zone")
.HasColumnName("share_location_until");
b.Property<bool>("Sticky")
.HasColumnType("boolean")
.HasColumnName("sticky");

View File

@@ -0,0 +1,14 @@
namespace LightlessSyncShared.Models;
public sealed record ShardFileInventoryUpdateDto
{
public long Sequence { get; init; }
public bool IsFullSnapshot { get; init; }
public List<string> Added { get; init; } = new();
public List<string> Removed { get; init; } = new();
}
public sealed record ShardFileInventoryUpdateAckDto
{
public long AppliedSequence { get; init; }
}

View File

@@ -15,4 +15,5 @@ public class UserPermissionSet
public bool DisableAnimations { get; set; } = false;
public bool DisableVFX { get; set; } = false;
public bool DisableSounds { get; set; } = false;
public DateTimeOffset ShareLocationUntil { get; set; } = DateTimeOffset.MinValue;
}

View File

@@ -1,6 +1,7 @@
using LightlessSync.API.Routes;
using LightlessSyncShared.Utils.Configuration;
using LightlessSyncStaticFilesServer.Services;
using LightlessSyncShared.Models;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
@@ -63,4 +64,19 @@ public class MainController : ControllerBase
return BadRequest();
}
}
[HttpPost(LightlessFiles.Main_ShardFiles)]
public IActionResult ShardFilesUpdate([FromBody] ShardFileInventoryUpdateDto update)
{
try
{
var applied = _shardRegistrationService.ApplyFileInventoryUpdate(LightlessUser, update);
return Ok(new ShardFileInventoryUpdateAckDto { AppliedSequence = applied });
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Shard file inventory update failed: {shard}", LightlessUser);
return BadRequest();
}
}
}

View File

@@ -82,7 +82,9 @@ public class ServerFilesController : ControllerBase
}
[HttpGet(LightlessFiles.ServerFiles_GetSizes)]
public async Task<IActionResult> FilesGetSizes([FromBody] List<string> hashes)
public async Task<IActionResult> FilesGetSizes(
[FromBody] List<string> hashes,
[FromQuery(Name = "avoidHost")] List<string>? avoidHosts = null)
{
using var dbContext = await _lightlessDbContext.CreateDbContextAsync();
var forbiddenFiles = await dbContext.ForbiddenUploadEntries.
@@ -94,27 +96,97 @@ public class ServerFilesController : ControllerBase
.Select(k => new { k.Hash, k.Size, k.RawSize })
.ToListAsync().ConfigureAwait(false);
var allFileShards = _shardRegistrationService.GetConfigurationsByContinent(Continent);
var avoidHostSet = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
if (avoidHosts != null)
{
foreach (var host in avoidHosts)
{
if (!string.IsNullOrWhiteSpace(host))
{
avoidHostSet.Add(host);
}
}
}
var allFileShards = _shardRegistrationService.GetShardEntriesByContinent(Continent);
var shardContexts = new List<ShardSelectionContext>(allFileShards.Count);
foreach (var shard in allFileShards)
{
shardContexts.Add(new ShardSelectionContext(
shard.ShardName,
shard.Config,
new Regex(shard.Config.FileMatch, RegexOptions.Compiled)));
}
foreach (var file in cacheFile)
{
var forbiddenFile = forbiddenFiles.SingleOrDefault(f => string.Equals(f.Hash, file.Hash, StringComparison.OrdinalIgnoreCase));
Uri? baseUrl = null;
Uri? queuedBaseUrl = null;
Uri? directBaseUrl = null;
var queuedUrls = new List<string>();
var hasFileUrls = new List<string>();
var hasFileDirectUrls = new List<string>();
var pullThroughUrls = new List<string>();
var pullThroughDirectUrls = new List<string>();
if (forbiddenFile == null)
{
var matchingShards = allFileShards.Where(f => new Regex(f.FileMatch).IsMatch(file.Hash)).ToList();
var matchingShards = shardContexts
.Where(f => f.FileMatchRegex.IsMatch(file.Hash))
.ToList();
var shard = matchingShards.SelectMany(g => g.RegionUris)
.OrderBy(g => Guid.NewGuid()).FirstOrDefault();
foreach (var shardEntry in matchingShards)
{
var regionUris = shardEntry.GetRegionUris(avoidHostSet);
baseUrl = shard.Value ?? _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
if (regionUris.Count == 0)
{
continue;
}
foreach (var uri in regionUris)
{
AddBaseUrl(queuedUrls, uri);
}
var hasFile = !string.IsNullOrEmpty(shardEntry.ShardName)
&& _shardRegistrationService.ShardHasFile(shardEntry.ShardName, file.Hash);
var baseList = hasFile ? hasFileUrls : pullThroughUrls;
var directList = hasFile ? hasFileDirectUrls : pullThroughDirectUrls;
foreach (var uri in regionUris)
{
AddCandidate(baseList, directList, uri, file.Hash);
}
}
if (queuedUrls.Count == 0)
{
var fallback = _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
if (fallback != null && (avoidHostSet.Count == 0 || !IsAvoidedHost(fallback, avoidHostSet)))
{
AddBaseUrl(queuedUrls, fallback);
}
}
if (hasFileUrls.Count == 0 && pullThroughUrls.Count == 0)
{
var fallback = _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
if (fallback != null && (avoidHostSet.Count == 0 || !IsAvoidedHost(fallback, avoidHostSet)))
{
AddCandidate(pullThroughUrls, pullThroughDirectUrls, fallback, file.Hash);
}
}
queuedBaseUrl = SelectPreferredBase(queuedUrls);
directBaseUrl = SelectPreferredBase(hasFileUrls, pullThroughUrls);
}
var cdnDownloadUrl = string.Empty;
if (forbiddenFile == null)
{
var directUri = _cdnDownloadUrlService.TryCreateDirectDownloadUri(baseUrl, file.Hash);
var directUri = _cdnDownloadUrlService.TryCreateDirectDownloadUri(directBaseUrl, file.Hash);
if (directUri != null)
{
cdnDownloadUrl = directUri.ToString();
@@ -128,8 +200,10 @@ public class ServerFilesController : ControllerBase
IsForbidden = forbiddenFile != null,
Hash = file.Hash,
Size = file.Size,
Url = baseUrl?.ToString() ?? string.Empty,
Url = queuedBaseUrl?.ToString() ?? string.Empty,
CDNDownloadUrl = cdnDownloadUrl,
HasFileDirectUrls = hasFileDirectUrls,
PullThroughDirectUrls = pullThroughDirectUrls,
RawSize = file.RawSize
});
}
@@ -144,22 +218,127 @@ public class ServerFilesController : ControllerBase
return Ok(JsonSerializer.Serialize(allFileShards.SelectMany(t => t.RegionUris.Select(v => v.Value.ToString()))));
}
private static bool IsAvoidedHost(Uri uri, HashSet<string> avoidHosts)
{
if (avoidHosts.Count == 0)
return false;
var host = uri.Host;
if (!string.IsNullOrWhiteSpace(host) && avoidHosts.Contains(host))
return true;
var authority = uri.Authority;
if (!string.IsNullOrWhiteSpace(authority) && avoidHosts.Contains(authority))
return true;
var absolute = uri.ToString().TrimEnd('/');
return avoidHosts.Contains(absolute);
}
private sealed class ShardSelectionContext
{
private List<Uri>? _cachedUris;
private List<Uri>? _cachedAvoidedUris;
public ShardSelectionContext(string shardName, ShardConfiguration config, Regex fileMatchRegex)
{
ShardName = shardName;
Config = config;
FileMatchRegex = fileMatchRegex;
}
public string ShardName { get; }
public ShardConfiguration Config { get; }
public Regex FileMatchRegex { get; }
public List<Uri> GetRegionUris(HashSet<string> avoidHosts)
{
if (_cachedUris == null)
{
_cachedUris = Config.RegionUris.Values.ToList();
}
if (avoidHosts.Count == 0)
{
return _cachedUris;
}
_cachedAvoidedUris ??= _cachedUris.Where(u => !IsAvoidedHost(u, avoidHosts)).ToList();
return _cachedAvoidedUris.Count > 0 ? _cachedAvoidedUris : _cachedUris;
}
}
private void AddCandidate(List<string> baseUrls, List<string> directUrls, Uri baseUri, string hash)
{
var baseUrl = baseUri.ToString();
if (baseUrls.Any(u => string.Equals(u, baseUrl, StringComparison.OrdinalIgnoreCase)))
return;
baseUrls.Add(baseUrl);
var direct = _cdnDownloadUrlService.TryCreateDirectDownloadUri(baseUri, hash);
directUrls.Add(direct?.ToString() ?? string.Empty);
}
private static void AddBaseUrl(List<string> baseUrls, Uri baseUri)
{
var baseUrl = baseUri.ToString();
if (baseUrls.Any(u => string.Equals(u, baseUrl, StringComparison.OrdinalIgnoreCase)))
return;
baseUrls.Add(baseUrl);
}
private static Uri? SelectPreferredBase(List<string> urls)
{
if (urls.Count == 0)
return null;
var selected = urls[Random.Shared.Next(urls.Count)];
return Uri.TryCreate(selected, UriKind.Absolute, out var uri) ? uri : null;
}
private static Uri? SelectPreferredBase(List<string> hasFileUrls, List<string> pullThroughUrls)
{
var list = hasFileUrls.Count > 0 ? hasFileUrls : pullThroughUrls;
if (list.Count == 0)
return null;
var selected = list[Random.Shared.Next(list.Count)];
return Uri.TryCreate(selected, UriKind.Absolute, out var uri) ? uri : null;
}
[HttpGet(LightlessFiles.ServerFiles_DirectDownload + "/{hash}")]
[AllowAnonymous]
public async Task<IActionResult> DownloadFileDirect(string hash, [FromQuery] long expires, [FromQuery] string signature)
{
var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature).ConfigureAwait(false);
var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature, HttpContext.RequestAborted).ConfigureAwait(false);
return result.Status switch
{
CDNDownloadsService.ResultStatus.Disabled => NotFound(),
CDNDownloadsService.ResultStatus.Unauthorized => Unauthorized(),
CDNDownloadsService.ResultStatus.NotFound => NotFound(),
CDNDownloadsService.ResultStatus.Success => PhysicalFile(result.File!.FullName, "application/octet-stream"),
CDNDownloadsService.ResultStatus.Success => BuildDirectDownloadResult(result),
_ => NotFound()
};
}
private IActionResult BuildDirectDownloadResult(CDNDownloadsService.Result result)
{
if (result.Stream != null)
{
if (result.ContentLength.HasValue)
{
Response.ContentLength = result.ContentLength.Value;
}
return new FileStreamResult(result.Stream, "application/octet-stream");
}
return PhysicalFile(result.File!.FullName, "application/octet-stream");
}
[HttpPost(LightlessFiles.ServerFiles_FilesSend)]
public async Task<IActionResult> FilesSend([FromBody] FilesSendDto filesSendDto)
{

View File

@@ -20,15 +20,30 @@ public class ShardServerFilesController : ControllerBase
[AllowAnonymous]
public async Task<IActionResult> DownloadFileDirect(string hash, [FromQuery] long expires, [FromQuery] string signature)
{
var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature).ConfigureAwait(false);
var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature, HttpContext.RequestAborted).ConfigureAwait(false);
return result.Status switch
{
CDNDownloadsService.ResultStatus.Disabled => NotFound(),
CDNDownloadsService.ResultStatus.Unauthorized => Unauthorized(),
CDNDownloadsService.ResultStatus.NotFound => NotFound(),
CDNDownloadsService.ResultStatus.Success => PhysicalFile(result.File!.FullName, "application/octet-stream"),
CDNDownloadsService.ResultStatus.Success => BuildDirectDownloadResult(result),
_ => NotFound()
};
}
private IActionResult BuildDirectDownloadResult(CDNDownloadsService.Result result)
{
if (result.Stream != null)
{
if (result.ContentLength.HasValue)
{
Response.ContentLength = result.ContentLength.Value;
}
return new FileStreamResult(result.Stream, "application/octet-stream");
}
return PhysicalFile(result.File!.FullName, "application/octet-stream");
}
}

View File

@@ -1,4 +1,5 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace LightlessSyncStaticFilesServer.Services;
@@ -13,7 +14,7 @@ public class CDNDownloadsService
Success
}
public readonly record struct Result(ResultStatus Status, FileInfo? File);
public readonly record struct Result(ResultStatus Status, FileInfo? File, Stream? Stream, long? ContentLength);
private readonly CDNDownloadUrlService _cdnDownloadUrlService;
private readonly CachedFileProvider _cachedFileProvider;
@@ -26,31 +27,31 @@ public class CDNDownloadsService
public bool DownloadsEnabled => _cdnDownloadUrlService.DirectDownloadsEnabled;
public async Task<Result> GetDownloadAsync(string hash, long expiresUnixSeconds, string signature)
public async Task<Result> GetDownloadAsync(string hash, long expiresUnixSeconds, string signature, CancellationToken ct)
{
if (!_cdnDownloadUrlService.DirectDownloadsEnabled)
{
return new Result(ResultStatus.Disabled, null);
return new Result(ResultStatus.Disabled, null, null, null);
}
if (string.IsNullOrEmpty(signature) || string.IsNullOrEmpty(hash))
{
return new Result(ResultStatus.Unauthorized, null);
return new Result(ResultStatus.Unauthorized, null, null, null);
}
hash = hash.ToUpperInvariant();
if (!_cdnDownloadUrlService.TryValidateSignature(hash, expiresUnixSeconds, signature))
{
return new Result(ResultStatus.Unauthorized, null);
return new Result(ResultStatus.Unauthorized, null, null, null);
}
var fileInfo = await _cachedFileProvider.DownloadAndGetLocalFileInfo(hash).ConfigureAwait(false);
if (fileInfo == null)
var fileResult = await _cachedFileProvider.GetFileStreamForDirectDownloadAsync(hash, ct).ConfigureAwait(false);
if (fileResult == null)
{
return new Result(ResultStatus.NotFound, null);
return new Result(ResultStatus.NotFound, null, null, null);
}
return new Result(ResultStatus.Success, fileInfo);
return new Result(ResultStatus.Success, fileResult.Value.File, fileResult.Value.Stream, fileResult.Value.ContentLength);
}
}

View File

@@ -16,6 +16,7 @@ public sealed class CachedFileProvider : IDisposable
private readonly FileStatisticsService _fileStatisticsService;
private readonly LightlessMetrics _metrics;
private readonly ServerTokenGenerator _generator;
private readonly IShardFileInventoryReporter _inventoryReporter;
private readonly Uri _remoteCacheSourceUri;
private readonly string _hotStoragePath;
private readonly ConcurrentDictionary<string, Task> _currentTransfers = new(StringComparer.Ordinal);
@@ -27,13 +28,15 @@ public sealed class CachedFileProvider : IDisposable
private bool _isDistributionServer;
public CachedFileProvider(IConfigurationService<StaticFilesServerConfiguration> configuration, ILogger<CachedFileProvider> logger,
FileStatisticsService fileStatisticsService, LightlessMetrics metrics, ServerTokenGenerator generator)
FileStatisticsService fileStatisticsService, LightlessMetrics metrics, ServerTokenGenerator generator,
IShardFileInventoryReporter inventoryReporter)
{
_configuration = configuration;
_logger = logger;
_fileStatisticsService = fileStatisticsService;
_metrics = metrics;
_generator = generator;
_inventoryReporter = inventoryReporter;
_remoteCacheSourceUri = configuration.GetValueOrDefault<Uri>(nameof(StaticFilesServerConfiguration.DistributionFileServerAddress), null);
_isDistributionServer = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.IsDistributionNode), false);
_hotStoragePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
@@ -97,10 +100,11 @@ public sealed class CachedFileProvider : IDisposable
_metrics.IncGauge(MetricsAPI.GaugeFilesTotal);
_metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash).Length);
_inventoryReporter.ReportAdded(hash);
response.Dispose();
}
private bool TryCopyFromColdStorage(string hash, string destinationFilePath)
private bool TryCopyFromColdStorage(string hash, string destinationFilePath)
{
if (!_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false)) return false;
@@ -124,6 +128,7 @@ public sealed class CachedFileProvider : IDisposable
_metrics.IncGauge(MetricsAPI.GaugeFilesTotal);
_metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, new FileInfo(destinationFilePath).Length);
_inventoryReporter.ReportAdded(hash);
return true;
}
catch (Exception ex)
@@ -134,6 +139,93 @@ public sealed class CachedFileProvider : IDisposable
return false;
}
public async Task<CachedFileStreamResult?> GetFileStreamForDirectDownloadAsync(string hash, CancellationToken ct)
{
var destinationFilePath = FilePathUtil.GetFilePath(_hotStoragePath, hash);
var existing = GetLocalFilePath(hash);
if (existing != null)
{
return new CachedFileStreamResult(existing, null, existing.Length);
}
if (TryCopyFromColdStorage(hash, destinationFilePath))
{
var coldFile = GetLocalFilePath(hash);
if (coldFile != null)
{
return new CachedFileStreamResult(coldFile, null, coldFile.Length);
}
}
if (_remoteCacheSourceUri == null)
{
return null;
}
TaskCompletionSource<bool>? completion = null;
await _downloadSemaphore.WaitAsync(ct).ConfigureAwait(false);
try
{
if (_currentTransfers.TryGetValue(hash, out var downloadTask)
&& !(downloadTask?.IsCompleted ?? true))
{
completion = null;
}
else
{
completion = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_currentTransfers[hash] = completion.Task;
_metrics.IncGauge(MetricsAPI.GaugeFilesDownloadingFromCache);
}
}
finally
{
_downloadSemaphore.Release();
}
if (completion == null)
{
var waited = await DownloadAndGetLocalFileInfo(hash).ConfigureAwait(false);
if (waited == null) return null;
return new CachedFileStreamResult(waited, null, waited.Length);
}
var downloadUrl = LightlessFiles.DistributionGetFullPath(_remoteCacheSourceUri, hash);
_logger.LogInformation("Did not find {hash}, streaming from {server}", hash, downloadUrl);
using var requestMessage = new HttpRequestMessage(HttpMethod.Get, downloadUrl);
requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _generator.Token);
HttpResponseMessage response;
try
{
response = await _httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, ct).ConfigureAwait(false);
response.EnsureSuccessStatusCode();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to stream {url}", downloadUrl);
FinalizeStreamingDownload(hash, null, destinationFilePath, completion, false, 0);
return null;
}
var tempFileName = destinationFilePath + ".dl";
var fileStream = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.Read, bufferSize: 64 * 1024, useAsync: true);
var sourceStream = await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false);
var stream = new StreamingCacheWriteStream(
sourceStream,
fileStream,
response,
bytesWritten => FinalizeStreamingDownload(hash, tempFileName, destinationFilePath, completion, true, bytesWritten),
bytesWritten => FinalizeStreamingDownload(hash, tempFileName, destinationFilePath, completion, false, bytesWritten),
_logger);
return new CachedFileStreamResult(null, stream, response.Content.Headers.ContentLength);
}
public async Task DownloadFileWhenRequired(string hash)
{
var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash);
@@ -219,4 +311,174 @@ public sealed class CachedFileProvider : IDisposable
{
return hashes.Exists(_currentTransfers.Keys.Contains);
}
}
private void FinalizeStreamingDownload(string hash, string? tempFileName, string destinationFilePath,
TaskCompletionSource<bool> completion, bool success, long bytesWritten)
{
try
{
if (success)
{
if (!string.IsNullOrEmpty(tempFileName))
{
File.Move(tempFileName, destinationFilePath, true);
}
var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash);
if (fi != null)
{
_metrics.IncGauge(MetricsAPI.GaugeFilesTotal);
_metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, fi.Length);
_fileStatisticsService.LogFile(hash, fi.Length);
_inventoryReporter.ReportAdded(hash);
}
}
else if (!string.IsNullOrEmpty(tempFileName))
{
try { File.Delete(tempFileName); } catch { /* ignore */ }
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to finalize streaming download for {hash} after {bytes} bytes", hash, bytesWritten);
}
finally
{
_metrics.DecGauge(MetricsAPI.GaugeFilesDownloadingFromCache);
_currentTransfers.Remove(hash, out _);
completion.TrySetResult(success);
}
}
private sealed class StreamingCacheWriteStream : Stream
{
private readonly Stream _source;
private readonly FileStream _cacheStream;
private readonly HttpResponseMessage _response;
private readonly Action<long> _onSuccess;
private readonly Action<long> _onFailure;
private readonly ILogger _logger;
private long _bytesWritten;
private int _completed;
public StreamingCacheWriteStream(Stream source, FileStream cacheStream, HttpResponseMessage response,
Action<long> onSuccess, Action<long> onFailure, ILogger logger)
{
_source = source;
_cacheStream = cacheStream;
_response = response;
_onSuccess = onSuccess;
_onFailure = onFailure;
_logger = logger;
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => _bytesWritten;
public override long Position
{
get => _bytesWritten;
set => throw new NotSupportedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
if (Volatile.Read(ref _completed) != 0)
{
return 0;
}
try
{
int bytesRead = _source.Read(buffer, offset, count);
if (bytesRead > 0)
{
_cacheStream.Write(buffer, offset, bytesRead);
_bytesWritten += bytesRead;
return bytesRead;
}
Complete(true);
return 0;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Streaming download failed while reading");
Complete(false);
throw;
}
}
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
if (Volatile.Read(ref _completed) != 0)
{
return 0;
}
try
{
int bytesRead = await _source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
if (bytesRead > 0)
{
await _cacheStream.WriteAsync(buffer.Slice(0, bytesRead), cancellationToken).ConfigureAwait(false);
_bytesWritten += bytesRead;
return bytesRead;
}
Complete(true);
return 0;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Streaming download failed while reading");
Complete(false);
throw;
}
}
public override void Flush()
{
// no-op
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
Complete(false);
}
base.Dispose(disposing);
}
private void Complete(bool success)
{
if (Interlocked.Exchange(ref _completed, 1) != 0)
{
return;
}
try { _cacheStream.Flush(); } catch { /* ignore */ }
try { _cacheStream.Dispose(); } catch { /* ignore */ }
try { _source.Dispose(); } catch { /* ignore */ }
try { _response.Dispose(); } catch { /* ignore */ }
if (success)
{
_onSuccess(_bytesWritten);
}
else
{
_onFailure(_bytesWritten);
}
}
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
}
public readonly record struct CachedFileStreamResult(FileInfo? File, Stream? Stream, long? ContentLength);

View File

@@ -1,5 +1,6 @@
using LightlessSyncShared.Services;
using LightlessSyncShared.Utils.Configuration;
using LightlessSyncShared.Models;
using System.Collections.Concurrent;
using System.Collections.Frozen;
@@ -11,8 +12,16 @@ public class MainServerShardRegistrationService : IHostedService
private readonly IConfigurationService<StaticFilesServerConfiguration> _configurationService;
private readonly ConcurrentDictionary<string, ShardConfiguration> _shardConfigs = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, DateTime> _shardHeartbeats = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, ShardFileInventory> _shardFileInventory = new(StringComparer.Ordinal);
private readonly CancellationTokenSource _periodicCheckCts = new();
private sealed class ShardFileInventory
{
public long Sequence { get; set; }
public HashSet<string> Files { get; set; } = new(StringComparer.OrdinalIgnoreCase);
public object SyncRoot { get; } = new();
}
public MainServerShardRegistrationService(ILogger<MainServerShardRegistrationService> logger,
IConfigurationService<StaticFilesServerConfiguration> configurationService)
{
@@ -32,6 +41,7 @@ public class MainServerShardRegistrationService : IHostedService
_shardHeartbeats[shardName] = DateTime.UtcNow;
_shardConfigs[shardName] = shardConfiguration;
_shardFileInventory.TryAdd(shardName, new ShardFileInventory());
}
public void UnregisterShard(string shardName)
@@ -40,6 +50,7 @@ public class MainServerShardRegistrationService : IHostedService
_shardHeartbeats.TryRemove(shardName, out _);
_shardConfigs.TryRemove(shardName, out _);
_shardFileInventory.TryRemove(shardName, out _);
}
public List<ShardConfiguration> GetConfigurationsByContinent(string continent)
@@ -56,6 +67,94 @@ public class MainServerShardRegistrationService : IHostedService
} }];
}
public List<(string ShardName, ShardConfiguration Config)> GetShardEntriesByContinent(string continent)
{
var shardConfigs = _shardConfigs
.Where(v => v.Value.Continents.Contains(continent, StringComparer.OrdinalIgnoreCase))
.Select(kvp => (kvp.Key, kvp.Value))
.ToList();
if (shardConfigs.Any()) return shardConfigs;
shardConfigs = _shardConfigs
.Where(v => v.Value.Continents.Contains("*", StringComparer.OrdinalIgnoreCase))
.Select(kvp => (kvp.Key, kvp.Value))
.ToList();
if (shardConfigs.Any()) return shardConfigs;
var fallback = new ShardConfiguration()
{
Continents = ["*"],
FileMatch = ".*",
RegionUris = new(StringComparer.Ordinal)
{
{ "Central", _configurationService.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl)) }
}
};
return [(string.Empty, fallback)];
}
public long ApplyFileInventoryUpdate(string shardName, ShardFileInventoryUpdateDto update)
{
if (!_shardConfigs.ContainsKey(shardName))
throw new InvalidOperationException("Shard not registered");
var inventory = _shardFileInventory.GetOrAdd(shardName, _ => new ShardFileInventory());
lock (inventory.SyncRoot)
{
if (update.IsFullSnapshot && update.Sequence <= inventory.Sequence)
{
inventory.Files = new HashSet<string>(update.Added ?? [], StringComparer.OrdinalIgnoreCase);
inventory.Sequence = update.Sequence;
return inventory.Sequence;
}
if (update.Sequence <= inventory.Sequence)
{
return inventory.Sequence;
}
if (update.IsFullSnapshot)
{
inventory.Files = new HashSet<string>(update.Added ?? [], StringComparer.OrdinalIgnoreCase);
}
else
{
if (update.Added != null)
{
foreach (var hash in update.Added)
{
if (!string.IsNullOrWhiteSpace(hash))
inventory.Files.Add(hash);
}
}
if (update.Removed != null)
{
foreach (var hash in update.Removed)
{
if (!string.IsNullOrWhiteSpace(hash))
inventory.Files.Remove(hash);
}
}
}
inventory.Sequence = update.Sequence;
return inventory.Sequence;
}
}
public bool ShardHasFile(string shardName, string hash)
{
if (!_shardFileInventory.TryGetValue(shardName, out var inventory))
return false;
lock (inventory.SyncRoot)
{
return inventory.Files.Contains(hash);
}
}
public void ShardHeartbeat(string shardName)
{
if (!_shardConfigs.ContainsKey(shardName))
@@ -87,6 +186,7 @@ public class MainServerShardRegistrationService : IHostedService
{
_shardHeartbeats.TryRemove(kvp.Key, out _);
_shardConfigs.TryRemove(kvp.Key, out _);
_shardFileInventory.TryRemove(kvp.Key, out _);
}
}

View File

@@ -12,13 +12,16 @@ public class ShardFileCleanupService : IHostedService
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
private readonly ILogger<MainFileCleanupService> _logger;
private readonly LightlessMetrics _metrics;
private readonly IShardFileInventoryReporter _inventoryReporter;
private CancellationTokenSource _cleanupCts;
public ShardFileCleanupService(LightlessMetrics metrics, ILogger<MainFileCleanupService> logger, IConfigurationService<StaticFilesServerConfiguration> configuration)
public ShardFileCleanupService(LightlessMetrics metrics, ILogger<MainFileCleanupService> logger, IConfigurationService<StaticFilesServerConfiguration> configuration,
IShardFileInventoryReporter inventoryReporter)
{
_metrics = metrics;
_logger = logger;
_configuration = configuration;
_inventoryReporter = inventoryReporter;
_cacheDir = _configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
}
@@ -99,6 +102,7 @@ public class ShardFileCleanupService : IHostedService
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, oldestFile.Length);
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
_logger.LogInformation("Deleting {oldestFile} with size {size}MiB", oldestFile.FullName, ByteSize.FromBytes(oldestFile.Length).MebiBytes);
ReportRemoval(oldestFile.Name);
oldestFile.Delete();
}
}
@@ -135,6 +139,7 @@ public class ShardFileCleanupService : IHostedService
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
_logger.LogInformation("File outdated: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
ReportRemoval(file.Name);
file.Delete();
}
else if (forcedDeletionAfterHours > 0 && file.LastWriteTime < prevTimeForcedDeletion)
@@ -142,6 +147,7 @@ public class ShardFileCleanupService : IHostedService
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
_logger.LogInformation("File forcefully deleted: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
ReportRemoval(file.Name);
file.Delete();
}
else if (file.Length == 0 && !string.Equals(file.Extension, ".dl", StringComparison.OrdinalIgnoreCase))
@@ -149,6 +155,7 @@ public class ShardFileCleanupService : IHostedService
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
_logger.LogInformation("File with size 0 deleted: {filename}", file.Name);
ReportRemoval(file.Name);
file.Delete();
}
@@ -160,4 +167,12 @@ public class ShardFileCleanupService : IHostedService
_logger.LogWarning(ex, "Error during file cleanup of old files");
}
}
private void ReportRemoval(string fileName)
{
if (fileName.Length == 40 && fileName.All(char.IsAsciiLetterOrDigit))
{
_inventoryReporter.ReportRemoved(fileName);
}
}
}

View File

@@ -0,0 +1,282 @@
using LightlessSync.API.Routes;
using LightlessSyncShared.Models;
using LightlessSyncShared.Services;
using LightlessSyncShared.Utils;
using LightlessSyncShared.Utils.Configuration;
using System.Net.Http.Json;
using System.Linq;
namespace LightlessSyncStaticFilesServer.Services;
public interface IShardFileInventoryReporter
{
void ReportAdded(string hash);
void ReportRemoved(string hash);
}
public sealed class NullShardFileInventoryReporter : IShardFileInventoryReporter
{
public void ReportAdded(string hash) { }
public void ReportRemoved(string hash) { }
}
public sealed class ShardFileInventoryReporter : IHostedService, IShardFileInventoryReporter
{
private static readonly TimeSpan ResyncInterval = TimeSpan.FromMinutes(30);
private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(10);
private static readonly TimeSpan BatchDelay = TimeSpan.FromSeconds(2);
private readonly IConfigurationService<StaticFilesServerConfiguration> _configurationService;
private readonly ILogger<ShardFileInventoryReporter> _logger;
private readonly HttpClient _httpClient = new();
private readonly string _cacheDir;
private readonly object _pendingLock = new();
private readonly SemaphoreSlim _signal = new(0);
private HashSet<string> _pendingAdds = new(StringComparer.OrdinalIgnoreCase);
private HashSet<string> _pendingRemoves = new(StringComparer.OrdinalIgnoreCase);
private CancellationTokenSource? _cts;
private Task? _processTask;
private Task? _resyncTask;
private long _sequence;
private bool _resyncRequested;
public ShardFileInventoryReporter(
IConfigurationService<StaticFilesServerConfiguration> configurationService,
ILogger<ShardFileInventoryReporter> logger,
ServerTokenGenerator serverTokenGenerator)
{
_configurationService = configurationService;
_logger = logger;
_cacheDir = configurationService.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
_httpClient.DefaultRequestHeaders.Authorization =
new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", serverTokenGenerator.Token);
}
public Task StartAsync(CancellationToken cancellationToken)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_resyncRequested = true;
_signal.Release();
_processTask = Task.Run(() => ProcessUpdatesAsync(_cts.Token), _cts.Token);
_resyncTask = Task.Run(() => ResyncLoopAsync(_cts.Token), _cts.Token);
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
if (_cts == null)
return;
_cts.Cancel();
try
{
if (_processTask != null) await _processTask.ConfigureAwait(false);
}
catch
{
// ignore
}
try
{
if (_resyncTask != null) await _resyncTask.ConfigureAwait(false);
}
catch
{
// ignore
}
_httpClient.Dispose();
}
public void ReportAdded(string hash)
{
if (!IsValidHash(hash))
return;
lock (_pendingLock)
{
_pendingAdds.Add(hash);
_pendingRemoves.Remove(hash);
}
_signal.Release();
}
public void ReportRemoved(string hash)
{
if (!IsValidHash(hash))
return;
lock (_pendingLock)
{
_pendingRemoves.Add(hash);
_pendingAdds.Remove(hash);
}
_signal.Release();
}
private async Task ResyncLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
await Task.Delay(ResyncInterval, ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
_resyncRequested = true;
_signal.Release();
}
}
private async Task ProcessUpdatesAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
await _signal.WaitAsync(BatchDelay, ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
while (!ct.IsCancellationRequested)
{
ShardFileInventoryUpdateDto? update = null;
if (_resyncRequested)
{
var snapshot = BuildSnapshot();
update = new ShardFileInventoryUpdateDto
{
Sequence = Interlocked.Increment(ref _sequence),
IsFullSnapshot = true,
Added = snapshot
};
}
else
{
HashSet<string> adds;
HashSet<string> removes;
lock (_pendingLock)
{
if (_pendingAdds.Count == 0 && _pendingRemoves.Count == 0)
break;
adds = _pendingAdds;
removes = _pendingRemoves;
_pendingAdds = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
_pendingRemoves = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
}
update = new ShardFileInventoryUpdateDto
{
Sequence = Interlocked.Increment(ref _sequence),
Added = adds.ToList(),
Removed = removes.ToList()
};
}
if (update == null)
break;
await SendUpdateWithRetryAsync(update, ct).ConfigureAwait(false);
if (update.IsFullSnapshot)
{
lock (_pendingLock)
{
_pendingAdds.Clear();
_pendingRemoves.Clear();
_resyncRequested = false;
}
}
}
}
}
private async Task SendUpdateWithRetryAsync(ShardFileInventoryUpdateDto update, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
await SendUpdateAsync(update, ct).ConfigureAwait(false);
return;
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to send shard file inventory update (seq {seq})", update.Sequence);
try
{
await Task.Delay(RetryDelay, ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
throw;
}
}
}
}
private async Task SendUpdateAsync(ShardFileInventoryUpdateDto update, CancellationToken ct)
{
var mainServer = _configurationService.GetValue<Uri>(nameof(StaticFilesServerConfiguration.MainFileServerAddress));
if (mainServer == null)
throw new InvalidOperationException("Main server address is not configured.");
using var response = await _httpClient.PostAsJsonAsync(
LightlessFiles.MainShardFilesFullPath(mainServer),
update,
ct).ConfigureAwait(false);
response.EnsureSuccessStatusCode();
var ack = await response.Content.ReadFromJsonAsync<ShardFileInventoryUpdateAckDto>(cancellationToken: ct)
.ConfigureAwait(false);
if (ack == null || ack.AppliedSequence < update.Sequence)
throw new InvalidOperationException($"Main server did not apply update {update.Sequence}.");
}
private List<string> BuildSnapshot()
{
var hashes = new List<string>();
if (string.IsNullOrWhiteSpace(_cacheDir) || !Directory.Exists(_cacheDir))
return hashes;
foreach (var file in Directory.EnumerateFiles(_cacheDir, "*", SearchOption.AllDirectories))
{
var name = Path.GetFileName(file);
if (name.EndsWith(".dl", StringComparison.OrdinalIgnoreCase))
continue;
if (IsValidHash(name))
hashes.Add(name);
}
return hashes;
}
private static bool IsValidHash(string hash)
{
return hash.Length == 40 && hash.All(char.IsAsciiLetterOrDigit);
}
}

View File

@@ -97,6 +97,7 @@ public class Startup
// specific services
if (_isMain)
{
services.AddSingleton<IShardFileInventoryReporter, NullShardFileInventoryReporter>();
services.AddSingleton<IClientReadyMessageService, MainClientReadyMessageService>();
services.AddHostedService<MainFileCleanupService>();
services.AddSingleton<IConfigurationService<StaticFilesServerConfiguration>, LightlessConfigurationServiceServer<StaticFilesServerConfiguration>>();
@@ -184,6 +185,9 @@ public class Startup
}
else
{
services.AddSingleton<ShardFileInventoryReporter>();
services.AddSingleton<IShardFileInventoryReporter>(sp => sp.GetRequiredService<ShardFileInventoryReporter>());
services.AddHostedService(sp => sp.GetRequiredService<ShardFileInventoryReporter>());
services.AddSingleton<ShardRegistrationService>();
services.AddHostedService(s => s.GetRequiredService<ShardRegistrationService>());
services.AddSingleton<IClientReadyMessageService, ShardClientReadyMessageService>();